beam-commits mailing list archives

From mergebot-r...@apache.org
Subject [beam-site] 01/01: Prepare repository for deployment.
Date Fri, 22 Sep 2017 19:08:43 GMT
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit ed155be716bba45cfc332afa22cca447d24af929
Author: Mergebot <mergebot@apache.org>
AuthorDate: Fri Sep 22 19:08:40 2017 +0000

    Prepare repository for deployment.
---
 content/documentation/io/built-in/index.html       |    2 +-
 content/documentation/io/io-toc/index.html         |    2 +-
 .../pipelines/create-your-pipeline/index.html      |    2 +-
 .../pipelines/design-your-pipeline/index.html      |    4 +-
 .../pipelines/test-your-pipeline/index.html        |   12 +-
 content/documentation/programming-guide/index.html | 1923 +++++++++++++++-----
 .../documentation/sdks/python-custom-io/index.html |    2 +-
 .../get-started/mobile-gaming-example/index.html   |    8 +-
 content/get-started/wordcount-example/index.html   |    6 +-
 9 files changed, 1461 insertions(+), 500 deletions(-)

diff --git a/content/documentation/io/built-in/index.html b/content/documentation/io/built-in/index.html
index 27de854..533391f 100644
--- a/content/documentation/io/built-in/index.html
+++ b/content/documentation/io/built-in/index.html
@@ -149,7 +149,7 @@
 
 <p>This table contains the currently available I/O transforms.</p>
 
-<p>Consult the <a href="/documentation/programming-guide#io">Programming Guide I/O section</a> for general usage instructions, and see the javadoc/pydoc for the particular I/O transforms.</p>
+<p>Consult the <a href="/documentation/programming-guide#pipeline-io">Programming Guide I/O section</a> for general usage instructions, and see the javadoc/pydoc for the particular I/O transforms.</p>
 
 <table class="table table-bordered">
 <tr>
diff --git a/content/documentation/io/io-toc/index.html b/content/documentation/io/io-toc/index.html
index e700e34..32a6861 100644
--- a/content/documentation/io/io-toc/index.html
+++ b/content/documentation/io/io-toc/index.html
@@ -147,7 +147,7 @@
 
 <h2 id="using-pipeline-io">Using Pipeline I/O</h2>
 <ul>
-  <li><a href="/documentation/programming-guide#io">Programming Guide: Using I/O Transforms</a></li>
+  <li><a href="/documentation/programming-guide#pipeline-io">Programming Guide: Using I/O Transforms</a></li>
   <li><a href="/documentation/io/built-in/">Built-in I/O Transforms</a></li>
 </ul>
 
diff --git a/content/documentation/pipelines/create-your-pipeline/index.html b/content/documentation/pipelines/create-your-pipeline/index.html
index 759b0f3..988c6e3 100644
--- a/content/documentation/pipelines/create-your-pipeline/index.html
+++ b/content/documentation/pipelines/create-your-pipeline/index.html
@@ -170,7 +170,7 @@
 
 <p>In the Beam SDKs, each pipeline is represented by an explicit object of type <code class="highlighter-rouge">Pipeline</code>. Each <code class="highlighter-rouge">Pipeline</code> object is an independent entity that encapsulates both the data the pipeline operates over and the transforms that get applied to that data.</p>
 
-<p>To create a pipeline, declare a <code class="highlighter-rouge">Pipeline</code> object, and pass it some <a href="/documentation/programming-guide#options">configuration options</a>.</p>
+<p>To create a pipeline, declare a <code class="highlighter-rouge">Pipeline</code> object, and pass it some <a href="/documentation/programming-guide#configuring-pipeline-options">configuration options</a>.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Start by defining the options for the pipeline.</span>
 <span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">create</span><span class="o">();</span>
diff --git a/content/documentation/pipelines/design-your-pipeline/index.html b/content/documentation/pipelines/design-your-pipeline/index.html
index ea5b37f..f2e7ff8 100644
--- a/content/documentation/pipelines/design-your-pipeline/index.html
+++ b/content/documentation/pipelines/design-your-pipeline/index.html
@@ -169,7 +169,7 @@
 <ul>
   <li><strong>Where is your input data stored?</strong> How many sets of input data do you have? This will determine what kinds of <code class="highlighter-rouge">Read</code> transforms you’ll need to apply at the start of your pipeline.</li>
   <li><strong>What does your data look like?</strong> It might be plaintext, formatted log files, or rows in a database table. Some Beam transforms work exclusively on <code class="highlighter-rouge">PCollection</code>s of key/value pairs; you’ll need to determine if and how your data is keyed and how to best represent that in your pipeline’s <code class="highlighter-rouge">PCollection</code>(s).</li>
-  <li><strong>What do you want to do with your data?</strong> The core transforms in the Beam SDKs are general purpose. Knowing how you need to change or manipulate your data will determine how you build core transforms like <a href="/documentation/programming-guide/#transforms-pardo">ParDo</a>, or when you use pre-written transforms included with the Beam SDKs.</li>
+  <li><strong>What do you want to do with your data?</strong> The core transforms in the Beam SDKs are general purpose. Knowing how you need to change or manipulate your data will determine how you build core transforms like <a href="/documentation/programming-guide/#pardo">ParDo</a>, or when you use pre-written transforms included with the Beam SDKs.</li>
   <li><strong>What does your output data look like, and where should it go?</strong> This will determine what kinds of <code class="highlighter-rouge">Write</code> transforms you’ll need to apply at the end of your pipeline.</li>
 </ul>
 
@@ -222,7 +222,7 @@
 
 <h3 id="a-single-transform-that-produces-multiple-outputs">A single transform that produces multiple outputs</h3>
 
-<p>Another way to branch a pipeline is to have a <strong>single</strong> transform output to multiple <code class="highlighter-rouge">PCollection</code>s by using <a href="/documentation/programming-guide/#transforms-outputs">tagged outputs</a>. Transforms that produce more than one output process each element of the input once, and output to zero or more <code class="highlighter-rouge">PCollection</code>s.</p>
+<p>Another way to branch a pipeline is to have a <strong>single</strong> transform output to multiple <code class="highlighter-rouge">PCollection</code>s by using <a href="/documentation/programming-guide/#additional-outputs">tagged outputs</a>. Transforms that produce more than one output process each element of the input once, and output to zero or more <code class="highlighter-rouge">PCollection</code>s.</p>
 
 <p>Figure 3 below illustrates the same example described above, but with one transform that produces multiple outputs. Names that start with ‘A’ are added to the main output <code class="highlighter-rouge">PCollection</code>, and names that start with ‘B’ are added to an additional output <code class="highlighter-rouge">PCollection</code>.</p>
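As a point of reference, a minimal Java sketch of a ParDo with tagged outputs for this 'A'/'B' example might look roughly as follows; the tag and variable names are illustrative, and a PCollection<String> called names plus the usual org.apache.beam.sdk.transforms and org.apache.beam.sdk.values imports are assumed:

    // Tags identifying the main output ('A' names) and the additional output ('B' names).
    final TupleTag<String> startsWithATag = new TupleTag<String>(){};
    final TupleTag<String> startsWithBTag = new TupleTag<String>(){};

    PCollectionTuple results = names.apply("SplitNames",
        ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                if (c.element().startsWith("A")) {
                  c.output(c.element());                  // main output
                } else if (c.element().startsWith("B")) {
                  c.output(startsWithBTag, c.element());  // additional, tagged output
                }
              }
            })
            .withOutputTags(startsWithATag, TupleTagList.of(startsWithBTag)));

    PCollection<String> aNames = results.get(startsWithATag);  // main output PCollection
    PCollection<String> bNames = results.get(startsWithBTag);  // additional output PCollection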
 
diff --git a/content/documentation/pipelines/test-your-pipeline/index.html b/content/documentation/pipelines/test-your-pipeline/index.html
index 6952ed3..1dda75c 100644
--- a/content/documentation/pipelines/test-your-pipeline/index.html
+++ b/content/documentation/pipelines/test-your-pipeline/index.html
@@ -180,8 +180,8 @@
 <p>The Beam SDKs provide a number of ways to unit test your pipeline code, from the lowest to the highest levels. From the lowest to the highest level, these are:</p>
 
 <ul>
-  <li>You can test the individual function objects, such as <a href="/documentation/programming-guide/#transforms-pardo">DoFn</a>s, inside your pipeline’s core transforms.</li>
-  <li>You can test an entire <a href="/documentation/programming-guide/#transforms-composite">Composite Transform</a> as a unit.</li>
+  <li>You can test the individual function objects, such as <a href="/documentation/programming-guide/#pardo">DoFn</a>s, inside your pipeline’s core transforms.</li>
+  <li>You can test an entire <a href="/documentation/programming-guide/#composite-transforms">Composite Transform</a> as a unit.</li>
   <li>You can perform an end-to-end test for an entire pipeline.</li>
 </ul>
 
@@ -197,7 +197,7 @@
 
 <ol>
   <li>Create a <code class="highlighter-rouge">DoFnTester</code>. You’ll need to pass an instance of the <code class="highlighter-rouge">DoFn</code> you want to test to the static factory method for <code class="highlighter-rouge">DoFnTester</code>.</li>
-  <li>Create one or more main test inputs of the appropriate type for your <code class="highlighter-rouge">DoFn</code>. If your <code class="highlighter-rouge">DoFn</code> takes side inputs and/or produces <a href="/documentation/programming-guide#transforms-outputs">multiple outputs</a>, you should also create the side inputs and the output tags.</li>
+  <li>Create one or more main test inputs of the appropriate type for your <code class="highlighter-rouge">DoFn</code>. If your <code class="highlighter-rouge">DoFn</code> takes side inputs and/or produces <a href="/documentation/programming-guide#additional-outputs">multiple outputs</a>, you should also create the side inputs and the output tags.</li>
   <li>Call <code class="highlighter-rouge">DoFnTester.processBundle</code> to process the main inputs.</li>
   <li>Use JUnit’s <code class="highlighter-rouge">Assert.assertThat</code> method to ensure the test outputs returned from <code class="highlighter-rouge">processBundle</code> match your expected values.</li>
 </ol>
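As a rough sketch of those four steps, a JUnit test using DoFnTester might look something like the following; the upper-casing DoFn and the test class are hypothetical examples, not taken from the Beam codebase:

    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.DoFnTester;
    import org.hamcrest.CoreMatchers;
    import org.junit.Assert;
    import org.junit.Test;

    public class UpperCaseFnTest {
      // Hypothetical DoFn under test: upper-cases each input string.
      static class UpperCaseFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(c.element().toUpperCase());
        }
      }

      @Test
      public void testUpperCaseFn() throws Exception {
        DoFnTester<String, String> fnTester = DoFnTester.of(new UpperCaseFn());  // step 1
        List<String> output = fnTester.processBundle("hello", "beam");           // steps 2 and 3
        Assert.assertThat(output, CoreMatchers.hasItems("HELLO", "BEAM"));       // step 4
      }
    }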
@@ -239,7 +239,7 @@
 </code></pre>
 </div>
 
-<p>See the <code class="highlighter-rouge">ParDo</code> documentation on <a href="/documentation/programming-guide/#transforms-sideio">side inputs</a> for more information.</p>
+<p>See the <code class="highlighter-rouge">ParDo</code> documentation on <a href="/documentation/programming-guide/#side-inputs">side inputs</a> for more information.</p>
 
 <h4 id="additional-outputs">Additional Outputs</h4>
 
@@ -264,7 +264,7 @@ for the <code class="highlighter-rouge">DoFnTester</code> as follows:</p>
 </code></pre>
 </div>
 
-<p>See the <code class="highlighter-rouge">ParDo</code> documentation on <a href="/documentation/programming-guide/#transforms-outputs">additional outputs</a> for more information.</p>
+<p>See the <code class="highlighter-rouge">ParDo</code> documentation on <a href="/documentation/programming-guide/#additional-outputs">additional outputs</a> for more information.</p>
 
 <h3 id="processing-test-inputs-and-checking-results">Processing Test Inputs and Checking Results</h3>
 
@@ -321,7 +321,7 @@ for the <code class="highlighter-rouge">DoFnTester</code> as follows:</p>
 
 <h3 id="using-the-create-transform">Using the Create Transform</h3>
 
-<p>You can use the <code class="highlighter-rouge">Create</code> transform to create a <code class="highlighter-rouge">PCollection</code> out of a standard in-memory collection class, such as Java <code class="highlighter-rouge">List</code>. See <a href="/documentation/programming-guide/#pcollection">Creating a PCollection</a> for more information.</p>
+<p>You can use the <code class="highlighter-rouge">Create</code> transform to create a <code class="highlighter-rouge">PCollection</code> out of a standard in-memory collection class, such as Java <code class="highlighter-rouge">List</code>. See <a href="/documentation/programming-guide/#creating-a-pcollection">Creating a PCollection</a> for more information.</p>
 
 <h3 id="passert">PAssert</h3>
 <p><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/testing/PAssert.html">PAssert</a> is a class included in the Beam Java SDK that is an assertion on the contents of a <code class="highlighter-rouge">PCollection</code>. You can use <code class="highlighter-rouge">PAssert</code> to verify that a <code class="highlighter-rouge">PCollection</code> contains a specific set of expected elements.</p>
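Putting the two together, a minimal sketch of a test that builds a PCollection with Create and checks its contents with PAssert might look like this; the test class and sample data are made up for illustration:

    import java.util.Arrays;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.junit.Rule;
    import org.junit.Test;

    public class CreateAndPAssertTest {
      @Rule
      public final transient TestPipeline p = TestPipeline.create();

      @Test
      public void testPCollectionContents() {
        // Build a PCollection from an in-memory List using Create ...
        PCollection<String> input = p.apply(Create.of(Arrays.asList("to", "be", "or", "not")));

        // ... and assert on its full contents with PAssert.
        PAssert.that(input).containsInAnyOrder("to", "be", "or", "not");

        p.run().waitUntilFinish();
      }
    }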
diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html
index 8a9ab93..6e109ac 100644
--- a/content/documentation/programming-guide/index.html
+++ b/content/documentation/programming-guide/index.html
@@ -145,7 +145,13 @@
     <div class="body__contained">
       <h1 id="apache-beam-programming-guide">Apache Beam Programming Guide</h1>
 
-<p>The <strong>Beam Programming Guide</strong> is intended for Beam users who want to use the Beam SDKs to create data processing pipelines. It provides guidance for using the Beam SDK classes to build and test your pipeline. It is not intended as an exhaustive reference, but as a language-agnostic, high-level guide to programmatically building your Beam pipeline. As the programming guide is filled out, the text will include code samples in multiple languages to help illustrate how to implement Beam concepts in your pipelines.</p>
+<p>The <strong>Beam Programming Guide</strong> is intended for Beam users who want to use the
+Beam SDKs to create data processing pipelines. It provides guidance for using
+the Beam SDK classes to build and test your pipeline. It is not intended as an
+exhaustive reference, but as a language-agnostic, high-level guide to
+programmatically building your Beam pipeline. As the programming guide is filled
+out, the text will include code samples in multiple languages to help illustrate
+how to implement Beam concepts in your pipelines.</p>
 
 <nav class="language-switcher">
   <strong>Adapt for:</strong>
@@ -155,77 +161,247 @@
   </ul>
 </nav>
 
-<h2 id="contents">Contents</h2>
-
-<ul>
-  <li><a href="#overview">Overview</a></li>
-  <li><a href="#pipeline">Creating the Pipeline</a>
-    <ul>
-      <li><a href="#options">Configuring Pipeline Options</a></li>
+<p><strong>Table of Contents:</strong></p>
+<ul id="markdown-toc">
+  <li><a href="#overview" id="markdown-toc-overview">1. Overview</a></li>
+  <li><a href="#creating-a-pipeline" id="markdown-toc-creating-a-pipeline">2. Creating a pipeline</a>    <ul>
+      <li><a href="#configuring-pipeline-options" id="markdown-toc-configuring-pipeline-options">2.1. Configuring pipeline options</a>        <ul>
+          <li><a href="#setting-pipelineoptions-from-command-line-arguments" id="markdown-toc-setting-pipelineoptions-from-command-line-arguments">2.1.1. Setting PipelineOptions from command-line arguments</a></li>
+          <li><a href="#creating-custom-options" id="markdown-toc-creating-custom-options">2.1.2. Creating custom options</a></li>
+        </ul>
+      </li>
+    </ul>
+  </li>
+  <li><a href="#pcollections" id="markdown-toc-pcollections">3. PCollections</a>    <ul>
+      <li><a href="#creating-a-pcollection" id="markdown-toc-creating-a-pcollection">3.1. Creating a PCollection</a>        <ul>
+          <li><a href="#reading-from-an-external-source" id="markdown-toc-reading-from-an-external-source">3.1.1. Reading from an external source</a></li>
+          <li><a href="#creating-a-pcollection-from-in-memory-data" id="markdown-toc-creating-a-pcollection-from-in-memory-data">3.1.2. Creating a PCollection from in-memory data</a></li>
+        </ul>
+      </li>
+      <li><a href="#pcollection-characteristics" id="markdown-toc-pcollection-characteristics">3.2. PCollection characteristics</a>        <ul>
+          <li><a href="#element-type" id="markdown-toc-element-type">3.2.1. Element type</a></li>
+          <li><a href="#immutability" id="markdown-toc-immutability">3.2.2. Immutability</a></li>
+          <li><a href="#random-access" id="markdown-toc-random-access">3.2.3. Random access</a></li>
+          <li><a href="#size-and-boundedness" id="markdown-toc-size-and-boundedness">3.2.4. Size and boundedness</a></li>
+          <li><a href="#element-timestamps" id="markdown-toc-element-timestamps">3.2.5. Element timestamps</a></li>
+        </ul>
+      </li>
+    </ul>
+  </li>
+  <li><a href="#transforms" id="markdown-toc-transforms">4. Transforms</a>    <ul>
+      <li><a href="#applying-transforms" id="markdown-toc-applying-transforms">4.1. Applying transforms</a></li>
+      <li><a href="#core-beam-transforms" id="markdown-toc-core-beam-transforms">4.2. Core Beam transforms</a>        <ul>
+          <li><a href="#pardo" id="markdown-toc-pardo">4.2.1. ParDo</a>            <ul>
+              <li><a href="#applying-pardo" id="markdown-toc-applying-pardo">4.2.1.1. Applying ParDo</a></li>
+              <li><a href="#creating-a-dofn" id="markdown-toc-creating-a-dofn">4.2.1.2. Creating a DoFn</a></li>
+              <li><a href="#lightweight-dofns-and-other-abstractions" id="markdown-toc-lightweight-dofns-and-other-abstractions">4.2.1.3. Lightweight DoFns and other abstractions</a></li>
+            </ul>
+          </li>
+          <li><a href="#groupbykey" id="markdown-toc-groupbykey">4.2.2. GroupByKey</a></li>
+          <li><a href="#cogroupbykey" id="markdown-toc-cogroupbykey">4.2.3. CoGroupByKey</a></li>
+          <li><a href="#combine" id="markdown-toc-combine">4.2.4. Combine</a>            <ul>
+              <li><a href="#simple-combinations-using-simple-functions" id="markdown-toc-simple-combinations-using-simple-functions">4.2.4.1. Simple combinations using simple functions</a></li>
+              <li><a href="#advanced-combinations-using-combinefn" id="markdown-toc-advanced-combinations-using-combinefn">4.2.4.2. Advanced combinations using CombineFn</a></li>
+              <li><a href="#combining-a-pcollection-into-a-single-value" id="markdown-toc-combining-a-pcollection-into-a-single-value">4.2.4.3. Combining a PCollection into a single value</a></li>
+              <li><a href="#combine-and-global-windowing" id="markdown-toc-combine-and-global-windowing">4.2.4.4. Combine and global windowing</a></li>
+              <li><a href="#combine-and-non-global-windowing" id="markdown-toc-combine-and-non-global-windowing">4.2.4.5. Combine and non-global windowing</a></li>
+              <li><a href="#combining-values-in-a-keyed-pcollection" id="markdown-toc-combining-values-in-a-keyed-pcollection">4.2.4.6. Combining values in a keyed PCollection</a></li>
+            </ul>
+          </li>
+          <li><a href="#flatten" id="markdown-toc-flatten">4.2.5. Flatten</a>            <ul>
+              <li><a href="#data-encoding-in-merged-collections" id="markdown-toc-data-encoding-in-merged-collections">4.2.5.1. Data encoding in merged collections</a></li>
+              <li><a href="#merging-windowed-collections" id="markdown-toc-merging-windowed-collections">4.2.5.2. Merging windowed collections</a></li>
+            </ul>
+          </li>
+          <li><a href="#partition" id="markdown-toc-partition">4.2.6. Partition</a></li>
+        </ul>
+      </li>
+      <li><a href="#requirements-for-writing-user-code-for-beam-transforms" id="markdown-toc-requirements-for-writing-user-code-for-beam-transforms">4.3. Requirements for writing user code for Beam transforms</a>        <ul>
+          <li><a href="#serializability" id="markdown-toc-serializability">4.3.1. Serializability</a></li>
+          <li><a href="#thread-compatibility" id="markdown-toc-thread-compatibility">4.3.2. Thread-compatibility</a></li>
+          <li><a href="#idempotence" id="markdown-toc-idempotence">4.3.3. Idempotence</a></li>
+        </ul>
+      </li>
+      <li><a href="#side-inputs" id="markdown-toc-side-inputs">4.4. Side inputs</a>        <ul>
+          <li><a href="#passing-side-inputs-to-pardo" id="markdown-toc-passing-side-inputs-to-pardo">4.4.1. Passing side inputs to ParDo</a></li>
+          <li><a href="#side-inputs-and-windowing" id="markdown-toc-side-inputs-and-windowing">4.4.2. Side inputs and windowing</a></li>
+        </ul>
+      </li>
+      <li><a href="#additional-outputs" id="markdown-toc-additional-outputs">4.5. Additional outputs</a>        <ul>
+          <li><a href="#tags-for-multiple-outputs" id="markdown-toc-tags-for-multiple-outputs">4.5.1. Tags for multiple outputs</a></li>
+          <li><a href="#emitting-to-multiple-outputs-in-your-dofn" id="markdown-toc-emitting-to-multiple-outputs-in-your-dofn">4.5.2. Emitting to multiple outputs in your DoFn</a></li>
+        </ul>
+      </li>
+      <li><a href="#composite-transforms" id="markdown-toc-composite-transforms">4.6. Composite transforms</a>        <ul>
+          <li><a href="#an-example-composite-transform" id="markdown-toc-an-example-composite-transform">4.6.1. An example composite transform</a></li>
+          <li><a href="#creating-a-composite-transform" id="markdown-toc-creating-a-composite-transform">4.6.2. Creating a composite transform</a></li>
+          <li><a href="#ptransform-style-guide" id="markdown-toc-ptransform-style-guide">4.6.3. PTransform Style Guide</a></li>
+        </ul>
+      </li>
+    </ul>
+  </li>
+  <li><a href="#pipeline-io" id="markdown-toc-pipeline-io">5. Pipeline I/O</a>    <ul>
+      <li><a href="#reading-input-data" id="markdown-toc-reading-input-data">5.1. Reading input data</a></li>
+      <li><a href="#writing-output-data" id="markdown-toc-writing-output-data">5.2. Writing output data</a></li>
+      <li><a href="#file-based-input-and-output-data" id="markdown-toc-file-based-input-and-output-data">5.3. File-based input and output data</a>        <ul>
+          <li><a href="#reading-from-multiple-locations" id="markdown-toc-reading-from-multiple-locations">5.3.1. Reading from multiple locations</a></li>
+          <li><a href="#writing-to-multiple-output-files" id="markdown-toc-writing-to-multiple-output-files">5.3.2. Writing to multiple output files</a></li>
+        </ul>
+      </li>
+      <li><a href="#beam-provided-io-transforms" id="markdown-toc-beam-provided-io-transforms">5.4. Beam-provided I/O transforms</a></li>
+    </ul>
+  </li>
+  <li><a href="#data-encoding-and-type-safety" id="markdown-toc-data-encoding-and-type-safety">6. Data encoding and type safety</a>    <ul>
+      <li><a href="#specifying-coders" id="markdown-toc-specifying-coders">6.1. Specifying coders</a></li>
+      <li><a href="#default-coders-and-the-coderregistry" id="markdown-toc-default-coders-and-the-coderregistry">6.2. Default coders and the CoderRegistry</a>        <ul>
+          <li><a href="#looking-up-a-default-coder" id="markdown-toc-looking-up-a-default-coder">6.2.1. Looking up a default coder</a></li>
+          <li><a href="#setting-the-default-coder-for-a-type" id="markdown-toc-setting-the-default-coder-for-a-type">6.2.2. Setting the default coder for a type</a></li>
+          <li><a href="#annotating-a-custom-data-type-with-a-default-coder" id="markdown-toc-annotating-a-custom-data-type-with-a-default-coder">6.2.3. Annotating a custom data type with a default coder</a></li>
+        </ul>
+      </li>
     </ul>
   </li>
-  <li><a href="#pcollection">Working with PCollections</a>
-    <ul>
-      <li><a href="#pccreate">Creating a PCollection</a></li>
-      <li><a href="#pccharacteristics">PCollection Characteristics</a></li>
+  <li><a href="#windowing" id="markdown-toc-windowing">7. Windowing</a>    <ul>
+      <li><a href="#windowing-basics" id="markdown-toc-windowing-basics">7.1. Windowing basics</a>        <ul>
+          <li><a href="#windowing-constraints" id="markdown-toc-windowing-constraints">7.1.1. Windowing constraints</a></li>
+          <li><a href="#using-windowing-with-bounded-pcollections" id="markdown-toc-using-windowing-with-bounded-pcollections">7.1.2. Using windowing with bounded PCollections</a></li>
+        </ul>
+      </li>
+      <li><a href="#provided-windowing-functions" id="markdown-toc-provided-windowing-functions">7.2. Provided windowing functions</a>        <ul>
+          <li><a href="#fixed-time-windows" id="markdown-toc-fixed-time-windows">7.2.1. Fixed time windows</a></li>
+          <li><a href="#sliding-time-windows" id="markdown-toc-sliding-time-windows">7.2.2. Sliding time windows</a></li>
+          <li><a href="#session-windows" id="markdown-toc-session-windows">7.2.3. Session windows</a></li>
+          <li><a href="#the-single-global-window" id="markdown-toc-the-single-global-window">7.2.4. The single global window</a></li>
+        </ul>
+      </li>
+      <li><a href="#setting-your-pcollections-windowing-function" id="markdown-toc-setting-your-pcollections-windowing-function">7.3. Setting your PCollection’s windowing function</a>        <ul>
+          <li><a href="#fixed-time-windows-1" id="markdown-toc-fixed-time-windows-1">7.3.1. Fixed-time windows</a></li>
+          <li><a href="#sliding-time-windows-1" id="markdown-toc-sliding-time-windows-1">7.3.2. Sliding time windows</a></li>
+          <li><a href="#session-windows-1" id="markdown-toc-session-windows-1">7.3.3. Session windows</a></li>
+          <li><a href="#single-global-window" id="markdown-toc-single-global-window">7.3.4. Single global window</a></li>
+        </ul>
+      </li>
+      <li><a href="#watermarks-and-late-data" id="markdown-toc-watermarks-and-late-data">7.4. Watermarks and late data</a>        <ul>
+          <li><a href="#managing-late-data" id="markdown-toc-managing-late-data">7.4.1. Managing late data</a></li>
+        </ul>
+      </li>
+      <li><a href="#adding-timestamps-to-a-pcollections-elements" id="markdown-toc-adding-timestamps-to-a-pcollections-elements">7.5. Adding timestamps to a PCollection’s elements</a></li>
     </ul>
   </li>
-  <li><a href="#transforms">Applying Transforms</a>
-    <ul>
-      <li><a href="#transforms-pardo">Using ParDo</a></li>
-      <li><a href="#transforms-gbk">Using GroupByKey</a></li>
-      <li><a href="#transforms-combine">Using Combine</a></li>
-      <li><a href="#transforms-flatten-partition">Using Flatten and Partition</a></li>
-      <li><a href="#transforms-usercodereqs">General Requirements for Writing User Code for Beam Transforms</a></li>
-      <li><a href="#transforms-sideio">Side Inputs</a></li>
-      <li><a href="#transforms-outputs">Additional Outputs</a></li>
+  <li><a href="#triggers" id="markdown-toc-triggers">8. Triggers</a>    <ul>
+      <li><a href="#event-time-triggers" id="markdown-toc-event-time-triggers">8.1. Event time triggers</a>        <ul>
+          <li><a href="#the-default-trigger" id="markdown-toc-the-default-trigger">8.1.1. The default trigger</a></li>
+        </ul>
+      </li>
+      <li><a href="#processing-time-triggers" id="markdown-toc-processing-time-triggers">8.2. Processing time triggers</a></li>
+      <li><a href="#data-driven-triggers" id="markdown-toc-data-driven-triggers">8.3. Data-driven triggers</a></li>
+      <li><a href="#setting-a-trigger" id="markdown-toc-setting-a-trigger">8.4. Setting a trigger</a>        <ul>
+          <li><a href="#window-accumulation-modes" id="markdown-toc-window-accumulation-modes">8.4.1. Window accumulation modes</a>            <ul>
+              <li><a href="#accumulating-mode" id="markdown-toc-accumulating-mode">8.4.1.1. Accumulating mode</a></li>
+              <li><a href="#discarding-mode" id="markdown-toc-discarding-mode">8.4.1.2. Discarding mode</a></li>
+            </ul>
+          </li>
+          <li><a href="#handling-late-data" id="markdown-toc-handling-late-data">8.4.2. Handling late data</a></li>
+        </ul>
+      </li>
+      <li><a href="#composite-triggers" id="markdown-toc-composite-triggers">8.5. Composite triggers</a>        <ul>
+          <li><a href="#composite-trigger-types" id="markdown-toc-composite-trigger-types">8.5.1. Composite trigger types</a></li>
+          <li><a href="#composition-with-afterwatermarkpastendofwindow" id="markdown-toc-composition-with-afterwatermarkpastendofwindow">8.5.2. Composition with AfterWatermark.pastEndOfWindow</a></li>
+          <li><a href="#other-composite-triggers" id="markdown-toc-other-composite-triggers">8.5.3. Other composite triggers</a></li>
+        </ul>
+      </li>
     </ul>
   </li>
-  <li><a href="#transforms-composite">Composite Transforms</a></li>
-  <li><a href="#io">Pipeline I/O</a></li>
-  <li><a href="#coders">Data Encoding and Type Safety</a></li>
-  <li><a href="#windowing">Working with Windowing</a></li>
-  <li><a href="#triggers">Working with Triggers</a></li>
 </ul>
 
-<h2 id="a-nameoverviewaoverview"><a name="overview"></a>Overview</h2>
+<h2 id="overview">1. Overview</h2>
 
-<p>To use Beam, you need to first create a driver program using the classes in one of the Beam SDKs. Your driver program <em>defines</em> your pipeline, including all of the inputs, transforms, and outputs; it also sets execution options for your pipeline (typically passed in using command-line options). These include the Pipeline Runner, which, in turn, determines what back-end your pipeline will run on.</p>
+<p>To use Beam, you need to first create a driver program using the classes in one
+of the Beam SDKs. Your driver program <em>defines</em> your pipeline, including all of
+the inputs, transforms, and outputs; it also sets execution options for your
+pipeline (typically passed in using command-line options). These include the
+Pipeline Runner, which, in turn, determines what back-end your pipeline will run
+on.</p>
 
-<p>The Beam SDKs provide a number of abstractions that simplify the mechanics of large-scale distributed data processing. The same Beam abstractions work with both batch and streaming data sources. When you create your Beam pipeline, you can think about your data processing task in terms of these abstractions. They include:</p>
+<p>The Beam SDKs provide a number of abstractions that simplify the mechanics of
+large-scale distributed data processing. The same Beam abstractions work with
+both batch and streaming data sources. When you create your Beam pipeline, you
+can think about your data processing task in terms of these abstractions. They
+include:</p>
 
 <ul>
   <li>
-    <p><code class="highlighter-rouge">Pipeline</code>: A <code class="highlighter-rouge">Pipeline</code> encapsulates your entire data processing task, from start to finish. This includes reading input data, transforming that data, and writing output data. All Beam driver programs must create a <code class="highlighter-rouge">Pipeline</code>. When you create the <code class="highlighter-rouge">Pipeline</code>, you must also specify the execution options that tell the <code class="highlighter-rouge">Pipeline</code> where and how to run.</p>
+    <p><code class="highlighter-rouge">Pipeline</code>: A <code class="highlighter-rouge">Pipeline</code> encapsulates your entire data processing task, from
+start to finish. This includes reading input data, transforming that data, and
+writing output data. All Beam driver programs must create a <code class="highlighter-rouge">Pipeline</code>. When
+you create the <code class="highlighter-rouge">Pipeline</code>, you must also specify the execution options that
+tell the <code class="highlighter-rouge">Pipeline</code> where and how to run.</p>
   </li>
   <li>
-    <p><code class="highlighter-rouge">PCollection</code>: A <code class="highlighter-rouge">PCollection</code> represents a distributed data set that your Beam pipeline operates on. The data set can be <em>bounded</em>, meaning it comes from a fixed source like a file, or <em>unbounded</em>, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial <code class="highlighter-rouge">PCollection</code> by reading data from an external data source, but you can also create a <code class="highlighter-rouge">PCollection</code> from in-memory data within your driver program. From there, <code class="highlighter-rouge">PCollection</code>s are the inputs and outputs for each step in your pipeline.</p>
+    <p><code class="highlighter-rouge">PCollection</code>: A <code class="highlighter-rouge">PCollection</code> represents a distributed data set that your
+Beam pipeline operates on. The data set can be <em>bounded</em>, meaning it comes
+from a fixed source like a file, or <em>unbounded</em>, meaning it comes from a
+continuously updating source via a subscription or other mechanism. Your
+pipeline typically creates an initial <code class="highlighter-rouge">PCollection</code> by reading data from an
+external data source, but you can also create a <code class="highlighter-rouge">PCollection</code> from in-memory
+data within your driver program. From there, <code class="highlighter-rouge">PCollection</code>s are the inputs and
+outputs for each step in your pipeline.</p>
   </li>
   <li>
-    <p><code class="highlighter-rouge">Transform</code>: A <code class="highlighter-rouge">Transform</code> represents a data processing operation, or a step, in your pipeline. Every <code class="highlighter-rouge">Transform</code> takes one or more <code class="highlighter-rouge">PCollection</code> objects as input, performs a processing function that you provide on the elements of that <code class="highlighter-rouge">PCollection</code>, and produces one or more output <code class="highlighter-rouge">PCollection</code> objects.</p>
+    <p><code class="highlighter-rouge">Transform</code>: A <code class="highlighter-rouge">Transform</code> represents a data processing operation, or a step,
+in your pipeline. Every <code class="highlighter-rouge">Transform</code> takes one or more <code class="highlighter-rouge">PCollection</code> objects as
+input, performs a processing function that you provide on the elements of that
+<code class="highlighter-rouge">PCollection</code>, and produces one or more output <code class="highlighter-rouge">PCollection</code> objects.</p>
   </li>
   <li>
-    <p>I/O <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code>: Beam provides <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code> APIs to represent reading and writing data, respectively. <code class="highlighter-rouge">Source</code> encapsulates the code necessary to read data into your Beam pipeline from some external source, such as cloud file storage or a subscription to a streaming data source. <code class="highlighter-rouge">Sink</code> likewise encapsulates the code necessary to write the elements of a <code class="highlighter-rouge">PCollection</code> to an external data sink.</p>
+    <p>I/O <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code>: Beam provides <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code> APIs to represent
+reading and writing data, respectively. <code class="highlighter-rouge">Source</code> encapsulates the code
+necessary to read data into your Beam pipeline from some external source, such
+as cloud file storage or a subscription to a streaming data source. <code class="highlighter-rouge">Sink</code>
+likewise encapsulates the code necessary to write the elements of a
+<code class="highlighter-rouge">PCollection</code> to an external data sink.</p>
   </li>
 </ul>
 
 <p>A typical Beam driver program works as follows:</p>
 
 <ul>
-  <li>Create a <code class="highlighter-rouge">Pipeline</code> object and set the pipeline execution options, including the Pipeline Runner.</li>
-  <li>Create an initial <code class="highlighter-rouge">PCollection</code> for pipeline data, either using the <code class="highlighter-rouge">Source</code> API to read data from an external source, or using a <code class="highlighter-rouge">Create</code> transform to build a <code class="highlighter-rouge">PCollection</code> from in-memory data.</li>
-  <li>Apply <strong>Transforms</strong> to each <code class="highlighter-rouge">PCollection</code>. Transforms can change, filter, group, analyze, or otherwise process the elements in a <code class="highlighter-rouge">PCollection</code>. A transform creates a new output <code class="highlighter-rouge">PCollection</code> <em>without consuming the input collection</em>. A typical pipeline applies subsequent transforms to the each new output <code class="highlighter-rouge">PCollection</code> in turn until processing is complete.</li>
-  <li>Output the final, transformed <code class="highlighter-rouge">PCollection</code>(s), typically using the <code class="highlighter-rouge">Sink</code> API to write data to an external source.</li>
+  <li>Create a <code class="highlighter-rouge">Pipeline</code> object and set the pipeline execution options, including
+the Pipeline Runner.</li>
+  <li>Create an initial <code class="highlighter-rouge">PCollection</code> for pipeline data, either using the <code class="highlighter-rouge">Source</code>
+API to read data from an external source, or using a <code class="highlighter-rouge">Create</code> transform to
+build a <code class="highlighter-rouge">PCollection</code> from in-memory data.</li>
+  <li>Apply <strong>Transforms</strong> to each <code class="highlighter-rouge">PCollection</code>. Transforms can change, filter,
+group, analyze, or otherwise process the elements in a <code class="highlighter-rouge">PCollection</code>. A
+transform creates a new output <code class="highlighter-rouge">PCollection</code> <em>without consuming the input
+collection</em>. A typical pipeline applies subsequent transforms to the each new
+output <code class="highlighter-rouge">PCollection</code> in turn until processing is complete.</li>
+  <li>Output the final, transformed <code class="highlighter-rouge">PCollection</code>(s), typically using the <code class="highlighter-rouge">Sink</code> API
+to write data to an external source.</li>
   <li><strong>Run</strong> the pipeline using the designated Pipeline Runner.</li>
 </ul>
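Put together, a driver program following the steps above might look roughly like the Java sketch below; it substitutes an in-memory Create source and a trivial length-computing transform purely for illustration, and omits the final write step:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.PCollection;

    public class MinimalDriver {
      public static void main(String[] args) {
        // 1. Create a Pipeline and set its execution options, including the runner.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // 2. Create an initial PCollection, here from in-memory data for brevity.
        PCollection<String> words = p.apply(Create.of("hello", "beam"));

        // 3. Apply a transform; the input collection is not consumed.
        PCollection<Integer> lengths = words.apply(
            MapElements.via(new SimpleFunction<String, Integer>() {
              @Override
              public Integer apply(String word) {
                return word.length();
              }
            }));

        // 4. A real pipeline would write the final PCollection(s) out here using
        //    one of the I/O transforms.

        // 5. Run the pipeline on the designated runner.
        p.run().waitUntilFinish();
      }
    }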
 
-<p>When you run your Beam driver program, the Pipeline Runner that you designate constructs a <strong>workflow graph</strong> of your pipeline based on the <code class="highlighter-rouge">PCollection</code> objects you’ve created and transforms that you’ve applied. That graph is then executed using the appropriate distributed processing back-end, becoming an asynchronous “job” (or equivalent) on that back-end.</p>
+<p>When you run your Beam driver program, the Pipeline Runner that you designate
+constructs a <strong>workflow graph</strong> of your pipeline based on the <code class="highlighter-rouge">PCollection</code>
+objects you’ve created and transforms that you’ve applied. That graph is then
+executed using the appropriate distributed processing back-end, becoming an
+asynchronous “job” (or equivalent) on that back-end.</p>
 
-<h2 id="a-namepipelineacreating-the-pipeline"><a name="pipeline"></a>Creating the pipeline</h2>
+<h2 id="creating-a-pipeline">2. Creating a pipeline</h2>
 
-<p>The <code class="highlighter-rouge">Pipeline</code> abstraction encapsulates all the data and steps in your data processing task. Your Beam driver program typically starts by constructing a <span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/Pipeline.html">Pipeline</a></span><span class="language-py"><a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py">Pipeline</a></span> object, and then using that object as the basis for creating the pipeline’s data sets as <code class="highlighter-rouge">PCollection</code>s and its operations as <code class="highlighter-rouge">Transform</code>s.</p>
+<p>The <code class="highlighter-rouge">Pipeline</code> abstraction encapsulates all the data and steps in your data
+processing task. Your Beam driver program typically starts by constructing a
+<span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/Pipeline.html">Pipeline</a></span>
+<span class="language-py"><a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline.py">Pipeline</a></span>
+object, and then using that object as the basis for creating the pipeline’s data
+sets as <code class="highlighter-rouge">PCollection</code>s and its operations as <code class="highlighter-rouge">Transform</code>s.</p>
 
-<p>To use Beam, your driver program must first create an instance of the Beam SDK class <code class="highlighter-rouge">Pipeline</code> (typically in the <code class="highlighter-rouge">main()</code> function). When you create your <code class="highlighter-rouge">Pipeline</code>, you’ll also need to set some <strong>configuration options</strong>. You can set your pipeline’s configuration options programatically, but it’s often easier to set the options ahead of time (or read them from the command line) and pass them to the <code class="highlighter-rouge">Pipeline</code> object when you create the object.</p>
+<p>To use Beam, your driver program must first create an instance of the Beam SDK
+class <code class="highlighter-rouge">Pipeline</code> (typically in the <code class="highlighter-rouge">main()</code> function). When you create your
+<code class="highlighter-rouge">Pipeline</code>, you’ll also need to set some <strong>configuration options</strong>. You can set
+your pipeline’s configuration options programmatically, but it’s often easier to
+set the options ahead of time (or read them from the command line) and pass them
+to the <code class="highlighter-rouge">Pipeline</code> object when you create the object.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Start by defining the options for the pipeline.</span>
 <span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">create</span><span class="o">();</span>
@@ -234,7 +410,6 @@
 <span class="n">Pipeline</span> <span class="n">p</span> <span class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">options</span><span class="o">);</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="kn">import</span> <span class="nn">apache_beam</span> <span class="kn">as</span> <span class="nn">beam</span>
 <span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="kn">import</span> <span class="n">PipelineOptions</span>
 
@@ -243,22 +418,30 @@
 </code></pre>
 </div>
 
-<h3 id="a-nameoptionsaconfiguring-pipeline-options"><a name="options"></a>Configuring Pipeline Options</h3>
+<h3 id="configuring-pipeline-options">2.1. Configuring pipeline options</h3>
 
-<p>Use the pipeline options to configure different aspects of your pipeline, such as the pipeline runner that will execute your pipeline and any runner-specific configuration required by the chosen runner. Your pipeline options will potentially include information such as your project ID or a location for storing files.</p>
+<p>Use the pipeline options to configure different aspects of your pipeline, such
+as the pipeline runner that will execute your pipeline and any runner-specific
+configuration required by the chosen runner. Your pipeline options will
+potentially include information such as your project ID or a location for
+storing files.</p>
 
-<p>When you run the pipeline on a runner of your choice, a copy of the PipelineOptions will be available to your code. For example, you can read PipelineOptions from a DoFn’s Context.</p>
+<p>When you run the pipeline on a runner of your choice, a copy of the
+PipelineOptions will be available to your code. For example, you can read
+PipelineOptions from a DoFn’s Context.</p>
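For example, a DoFn might read a custom option at execution time roughly as follows; this sketch assumes the MyOptions interface defined in section 2.1.2 below, and the DoFn itself is a hypothetical illustration:

    static class TagWithOptionFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // The runner makes a copy of the PipelineOptions available to the DoFn's context.
        MyOptions opts = c.getPipelineOptions().as(MyOptions.class);
        c.output(opts.getMyCustomOption() + ": " + c.element());
      }
    }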
 
-<h4 id="setting-pipelineoptions-from-command-line-arguments">Setting PipelineOptions from Command-Line Arguments</h4>
+<h4 id="setting-pipelineoptions-from-command-line-arguments">2.1.1. Setting PipelineOptions from command-line arguments</h4>
 
-<p>While you can configure your pipeline by creating a <code class="highlighter-rouge">PipelineOptions</code> object and setting the fields directly, the Beam SDKs include a command-line parser that you can use to set fields in <code class="highlighter-rouge">PipelineOptions</code> using command-line arguments.</p>
+<p>While you can configure your pipeline by creating a <code class="highlighter-rouge">PipelineOptions</code> object and
+setting the fields directly, the Beam SDKs include a command-line parser that
+you can use to set fields in <code class="highlighter-rouge">PipelineOptions</code> using command-line arguments.</p>
 
-<p>To read options from the command-line, construct your <code class="highlighter-rouge">PipelineOptions</code> object as demonstrated in the following example code:</p>
+<p>To read options from the command-line, construct your <code class="highlighter-rouge">PipelineOptions</code> object
+as demonstrated in the following example code:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">MyOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span class="n">args</span><span class="o">).</span><span class="na">withValidation</span><span class="o">().</span><span class="na">create</span><span class="o">();</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="kn">import</span> <span class="nn">apache_beam</span> <span class="kn">as</span> <span class="nn">beam</span>
 <span class="kn">from</span> <span class="nn">apache_beam.options.pipeline_options</span> <span class="kn">import</span> <span class="n">PipelineOptions</span>
 
@@ -274,18 +457,24 @@
 </div>
 
 <blockquote>
-  <p><strong>Note:</strong> Appending the method <code class="highlighter-rouge">.withValidation</code> will check for required command-line arguments and validate argument values.</p>
+  <p><strong>Note:</strong> Appending the method <code class="highlighter-rouge">.withValidation</code> will check for required
+command-line arguments and validate argument values.</p>
 </blockquote>
 
-<p>Building your <code class="highlighter-rouge">PipelineOptions</code> this way lets you specify any of the options as a command-line argument.</p>
+<p>Building your <code class="highlighter-rouge">PipelineOptions</code> this way lets you specify any of the options as
+a command-line argument.</p>
 
 <blockquote>
-  <p><strong>Note:</strong> The <a href="/get-started/wordcount-example">WordCount example pipeline</a> demonstrates how to set pipeline options at runtime by using command-line options.</p>
+  <p><strong>Note:</strong> The <a href="/get-started/wordcount-example">WordCount example pipeline</a>
+demonstrates how to set pipeline options at runtime by using command-line
+options.</p>
 </blockquote>
 
-<h4 id="creating-custom-options">Creating Custom Options</h4>
+<h4 id="creating-custom-options">2.1.2. Creating custom options</h4>
 
-<p>You can add your own custom options in addition to the standard <code class="highlighter-rouge">PipelineOptions</code>. To add your own options, define an interface with getter and setter methods for each option, as in the following example:</p>
+<p>You can add your own custom options in addition to the standard
+<code class="highlighter-rouge">PipelineOptions</code>. To add your own options, define an interface with getter and
+setter methods for each option, as in the following example:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MyOptions</span> <span class="kd">extends</span> <span class="n">PipelineOptions</span> <span class="o">{</span>
     <span class="n">String</span> <span class="nf">getMyCustomOption</span><span class="o">();</span>
@@ -293,7 +482,6 @@
   <span class="o">}</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="k">class</span> <span class="nc">MyOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
 
   <span class="nd">@classmethod</span>
@@ -304,7 +492,8 @@
 </code></pre>
 </div>
 
-<p>You can also specify a description, which appears when a user passes <code class="highlighter-rouge">--help</code> as a command-line argument, and a default value.</p>
+<p>You can also specify a description, which appears when a user passes <code class="highlighter-rouge">--help</code> as
+a command-line argument, and a default value.</p>
 
 <p>You set the description and default value using annotations, as follows:</p>
 
@@ -316,7 +505,6 @@
   <span class="o">}</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="k">class</span> <span class="nc">MyOptions</span><span class="p">(</span><span class="n">PipelineOptions</span><span class="p">):</span>
 
   <span class="nd">@classmethod</span>
@@ -331,9 +519,15 @@
 </code></pre>
 </div>
 
-<p class="language-java">It’s recommended that you register your interface with <code class="highlighter-rouge">PipelineOptionsFactory</code> and then pass the interface when creating the <code class="highlighter-rouge">PipelineOptions</code> object. When you register your interface with <code class="highlighter-rouge">PipelineOptionsFactory</code>, the <code class="highlighter-rouge">--help</code> can find your custom options interface and add it to the output of the <code class="highli [...]
+<p class="language-java">It’s recommended that you register your interface with <code class="highlighter-rouge">PipelineOptionsFactory</code>
+and then pass the interface when creating the <code class="highlighter-rouge">PipelineOptions</code> object. When you
+register your interface with <code class="highlighter-rouge">PipelineOptionsFactory</code>, <code class="highlighter-rouge">--help</code> can find
+your custom options interface and add it to the output of the <code class="highlighter-rouge">--help</code> command.
+<code class="highlighter-rouge">PipelineOptionsFactory</code> will also validate that your custom options are
+compatible with all other registered options.</p>
 
-<p class="language-java">The following example code shows how to register your custom options interface with <code class="highlighter-rouge">PipelineOptionsFactory</code>:</p>
+<p class="language-java">The following example code shows how to register your custom options interface
+with <code class="highlighter-rouge">PipelineOptionsFactory</code>:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">register</span><span class="o">(</span><span class="n">MyOptions</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
 <span class="n">MyOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span class="n">args</span><span class="o">)</span>
@@ -344,21 +538,45 @@
 
 <p>Now your pipeline can accept <code class="highlighter-rouge">--myCustomOption=value</code> as a command-line argument.</p>
 
-<h2 id="a-namepcollectionaworking-with-pcollections"><a name="pcollection"></a>Working with PCollections</h2>
-
-<p>The <span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/values/PCollection.html">PCollection</a></span><span class="language-py"><code class="highlighter-rouge">PCollection</code></span> abstraction represents a potentially distributed, multi-element data set. You can think of a <code class="highlighter-rouge">PCollection</code> as “pipeline” data; Beam transforms use <code class="highlighter-rouge">PCollection</code> objects as inputs and outputs. As such, if you want to work with data in your pipeline, it must be in the form of a <code class="highlighter-rouge">PCollection</code>.</p>
-
-<p>After you’ve created your <code class="highlighter-rouge">Pipeline</code>, you’ll need to begin by creating at least one <code class="highlighter-rouge">PCollection</code> in some form. The <code class="highlighter-rouge">PCollection</code> you create serves as the input for the first operation in your pipeline.</p>
-
-<h3 id="a-namepccreateacreating-a-pcollection"><a name="pccreate"></a>Creating a PCollection</h3>
-
-<p>You create a <code class="highlighter-rouge">PCollection</code> by either reading data from an external source using Beam’s <a href="#io">Source API</a>, or you can create a <code class="highlighter-rouge">PCollection</code> of data stored in an in-memory collection class in your driver program. The former is typically how a production pipeline would ingest data; Beam’s Source APIs contain adapters to help you read from external sources like large cloud-based files, databases, or subscription services. The latter is primarily useful for testing and debugging purposes.</p>
-
-<h4 id="reading-from-an-external-source">Reading from an external source</h4>
-
-<p>To read from an external source, you use one of the <a href="#io">Beam-provided I/O adapters</a>. The adapters vary in their exact usage, but all of them from some external data source and return a <code class="highlighter-rouge">PCollection</code> whose elements represent the data records in that source.</p>
-
-<p>Each data source adapter has a <code class="highlighter-rouge">Read</code> transform; to read, you must apply that transform to the <code class="highlighter-rouge">Pipeline</code> object itself. <span class="language-java"><code class="highlighter-rouge">TextIO.Read</code></span><span class="language-py"><code class="highlighter-rouge">io.TextFileSource</code></span>, for example, reads from an external text file and returns a <code class="highlighter-rouge">PCollection</code> whose elements are of type <code class="highlighter-rouge">String</code>, each <code class="highlighter-rouge">String</code> represents one line from the text file. Here’s how you would apply <span class="language-java"><code class="highlighter-rouge">TextIO.Read</code></span><span class="language-py"><code class="highlighter-rouge">io.TextFileSource</code></span> to your <code class="highlighter-rouge">Pipeline</code> to create a <code class="highlighter-rouge">PCollection</code>:</p>
+<h2 id="pcollections">3. PCollections</h2>
+
+<p>The <span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/values/PCollection.html">PCollection</a></span>
+<span class="language-py"><code class="highlighter-rouge">PCollection</code></span> abstraction represents a
+potentially distributed, multi-element data set. You can think of a
+<code class="highlighter-rouge">PCollection</code> as “pipeline” data; Beam transforms use <code class="highlighter-rouge">PCollection</code> objects as
+inputs and outputs. As such, if you want to work with data in your pipeline, it
+must be in the form of a <code class="highlighter-rouge">PCollection</code>.</p>
+
+<p>After you’ve created your <code class="highlighter-rouge">Pipeline</code>, you’ll need to begin by creating at least
+one <code class="highlighter-rouge">PCollection</code> in some form. The <code class="highlighter-rouge">PCollection</code> you create serves as the input
+for the first operation in your pipeline.</p>
+
+<h3 id="creating-a-pcollection">3.1. Creating a PCollection</h3>
+
+<p>You create a <code class="highlighter-rouge">PCollection</code> by either reading data from an external source using
+Beam’s <a href="#pipeline-io">Source API</a>, or you can create a <code class="highlighter-rouge">PCollection</code> of data
+stored in an in-memory collection class in your driver program. The former is
+typically how a production pipeline would ingest data; Beam’s Source APIs
+contain adapters to help you read from external sources like large cloud-based
+files, databases, or subscription services. The latter is primarily useful for
+testing and debugging purposes.</p>
+
+<h4 id="reading-from-an-external-source">3.1.1. Reading from an external source</h4>
+
+<p>To read from an external source, you use one of the <a href="#pipeline-io">Beam-provided I/O
+adapters</a>. The adapters vary in their exact usage, but all of them
+read from some external data source and return a <code class="highlighter-rouge">PCollection</code> whose elements
+represent the data records in that source.</p>
+
+<p>Each data source adapter has a <code class="highlighter-rouge">Read</code> transform; to read, you must apply that
+transform to the <code class="highlighter-rouge">Pipeline</code> object itself.
+<span class="language-java"><code class="highlighter-rouge">TextIO.Read</code></span>
+<span class="language-py"><code class="highlighter-rouge">io.TextFileSource</code></span>, for example, reads from an
+external text file and returns a <code class="highlighter-rouge">PCollection</code> whose elements are of type
+<code class="highlighter-rouge">String</code>; each <code class="highlighter-rouge">String</code> represents one line from the text file. Here’s how you
+would apply <span class="language-java"><code class="highlighter-rouge">TextIO.Read</code></span>
+<span class="language-py"><code class="highlighter-rouge">io.TextFileSource</code></span> to your <code class="highlighter-rouge">Pipeline</code> to create
+a <code class="highlighter-rouge">PCollection</code>:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
     <span class="c1">// Create the pipeline.</span>
@@ -372,23 +590,30 @@
 <span class="o">}</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">lines</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s">'ReadMyFile'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">ReadFromText</span><span class="p">(</span><span class="s">'gs://some/inputData.txt'</span><span class="p">)</span>
 
 </code></pre>
 </div>
 
-<p>See the <a href="#io">section on I/O</a> to learn more about how to read from the various data sources supported by the Beam SDK.</p>
+<p>See the <a href="#pipeline-io">section on I/O</a> to learn more about how to read from the
+various data sources supported by the Beam SDK.</p>
 
-<h4 id="creating-a-pcollection-from-in-memory-data">Creating a PCollection from in-memory data</h4>
+<h4 id="creating-a-pcollection-from-in-memory-data">3.1.2. Creating a PCollection from in-memory data</h4>
 
-<p class="language-java">To create a <code class="highlighter-rouge">PCollection</code> from an in-memory Java <code class="highlighter-rouge">Collection</code>, you use the Beam-provided <code class="highlighter-rouge">Create</code> transform. Much like a data adapter’s <code class="highlighter-rouge">Read</code>, you apply <code class="highlighter-rouge">Create</code> directly to your <code class="highlighter-rouge">Pipeline</code> object itself.</p>
+<p class="language-java">To create a <code class="highlighter-rouge">PCollection</code> from an in-memory Java <code class="highlighter-rouge">Collection</code>, you use the
+Beam-provided <code class="highlighter-rouge">Create</code> transform. Much like a data adapter’s <code class="highlighter-rouge">Read</code>, you apply
+<code class="highlighter-rouge">Create</code> directly to your <code class="highlighter-rouge">Pipeline</code> object itself.</p>
 
-<p class="language-java">As parameters, <code class="highlighter-rouge">Create</code> accepts the Java <code class="highlighter-rouge">Collection</code> and a <code class="highlighter-rouge">Coder</code> object. The <code class="highlighter-rouge">Coder</code> specifies how the elements in the <code class="highlighter-rouge">Collection</code> should be <a href="#pcelementtype">encoded</a>.</p>
+<p class="language-java">As parameters, <code class="highlighter-rouge">Create</code> accepts the Java <code class="highlighter-rouge">Collection</code> and a <code class="highlighter-rouge">Coder</code> object. The
+<code class="highlighter-rouge">Coder</code> specifies how the elements in the <code class="highlighter-rouge">Collection</code> should be
+<a href="#element-type">encoded</a>.</p>
 
-<p class="language-py">To create a <code class="highlighter-rouge">PCollection</code> from an in-memory <code class="highlighter-rouge">list</code>, you use the Beam-provided <code class="highlighter-rouge">Create</code> transform. Apply this transform directly to your <code class="highlighter-rouge">Pipeline</code> object itself.</p>
+<p class="language-py">To create a <code class="highlighter-rouge">PCollection</code> from an in-memory <code class="highlighter-rouge">list</code>, you use the Beam-provided
+<code class="highlighter-rouge">Create</code> transform. Apply this transform directly to your <code class="highlighter-rouge">Pipeline</code> object
+itself.</p>
 
-<p>The following example code shows how to create a <code class="highlighter-rouge">PCollection</code> from an in-memory <span class="language-java"><code class="highlighter-rouge">List</code></span><span class="language-py"><code class="highlighter-rouge">list</code></span>:</p>
+<p>The following example code shows how to create a <code class="highlighter-rouge">PCollection</code> from an in-memory
+<span class="language-java"><code class="highlighter-rouge">List</code></span><span class="language-py"><code class="highlighter-rouge">list</code></span>:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
     <span class="c1">// Create a Java Collection, in this case a List of Strings.</span>
@@ -408,7 +633,6 @@
 <span class="o">}</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="k">with</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span class="n">options</span><span class="o">=</span><span class="n">pipeline_options</span><span class="p">)</span> <span class="k">as</span> <span class="n">p</span><span class="p">:</span>
 
   <span class="n">lines</span> <span class="o">=</span> <span class="p">(</span><span class="n">p</span>
@@ -421,70 +645,142 @@
 </code></pre>
 </div>
 
-<h3 id="a-namepccharacteristicsapcollection-characteristics"><a name="pccharacteristics"></a>PCollection characteristics</h3>
+<h3 id="pcollection-characteristics">3.2. PCollection characteristics</h3>
 
-<p>A <code class="highlighter-rouge">PCollection</code> is owned by the specific <code class="highlighter-rouge">Pipeline</code> object for which it is created; multiple pipelines cannot share a <code class="highlighter-rouge">PCollection</code>. In some respects, a <code class="highlighter-rouge">PCollection</code> functions like a collection class. However, a <code class="highlighter-rouge">PCollection</code> can differ in a few key ways:</p>
+<p>A <code class="highlighter-rouge">PCollection</code> is owned by the specific <code class="highlighter-rouge">Pipeline</code> object for which it is
+created; multiple pipelines cannot share a <code class="highlighter-rouge">PCollection</code>. In some respects, a
+<code class="highlighter-rouge">PCollection</code> functions like a collection class. However, a <code class="highlighter-rouge">PCollection</code> can
+differ in a few key ways:</p>
 
-<h4 id="a-namepcelementtypeaelement-type"><a name="pcelementtype"></a>Element type</h4>
+<h4 id="element-type">3.2.1. Element type</h4>
 
-<p>The elements of a <code class="highlighter-rouge">PCollection</code> may be of any type, but must all be of the same type. However, to support distributed processing, Beam needs to be able to encode each individual element as a byte string (so elements can be passed around to distributed workers). The Beam SDKs provide a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for specifying custom encodings as needed.</p>
+<p>The elements of a <code class="highlighter-rouge">PCollection</code> may be of any type, but must all be of the same
+type. However, to support distributed processing, Beam needs to be able to
+encode each individual element as a byte string (so elements can be passed
+around to distributed workers). The Beam SDKs provide a data encoding mechanism
+that includes built-in encoding for commonly-used types as well as support for
+specifying custom encodings as needed.</p>
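+
+<p class="language-java">As a minimal sketch of this mechanism in the Java SDK (assuming a
+<code class="highlighter-rouge">Pipeline</code> object named <code class="highlighter-rouge">pipeline</code>), you can explicitly attach a built-in
+<code class="highlighter-rouge">Coder</code> when creating a <code class="highlighter-rouge">PCollection</code> from in-memory data:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Attach a built-in Coder (StringUtf8Coder) to the Create transform so that
+// Beam knows how to encode each String element as a byte string.
+PCollection&lt;String&gt; greetings =
+    pipeline.apply(Create.of("hello", "world")
+        .withCoder(StringUtf8Coder.of()));
+</code></pre>
+</div>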
 
-<h4 id="a-namepcimmutabilityaimmutability"><a name="pcimmutability"></a>Immutability</h4>
+<h4 id="immutability">3.2.2. Immutability</h4>
 
-<p>A <code class="highlighter-rouge">PCollection</code> is immutable. Once created, you cannot add, remove, or change individual elements. A Beam Transform might process each element of a <code class="highlighter-rouge">PCollection</code> and generate new pipeline data (as a new <code class="highlighter-rouge">PCollection</code>), <em>but it does not consume or modify the original input collection</em>.</p>
+<p>A <code class="highlighter-rouge">PCollection</code> is immutable. Once created, you cannot add, remove, or change
+individual elements. A Beam Transform might process each element of a
+<code class="highlighter-rouge">PCollection</code> and generate new pipeline data (as a new <code class="highlighter-rouge">PCollection</code>), <em>but it
+does not consume or modify the original input collection</em>.</p>
 
-<h4 id="a-namepcrandomaccessarandom-access"><a name="pcrandomaccess"></a>Random access</h4>
+<h4 id="random-access">3.2.3. Random access</h4>
 
-<p>A <code class="highlighter-rouge">PCollection</code> does not support random access to individual elements. Instead, Beam Transforms consider every element in a <code class="highlighter-rouge">PCollection</code> individually.</p>
+<p>A <code class="highlighter-rouge">PCollection</code> does not support random access to individual elements. Instead,
+Beam Transforms consider every element in a <code class="highlighter-rouge">PCollection</code> individually.</p>
 
-<h4 id="a-namepcsizeboundasize-and-boundedness"><a name="pcsizebound"></a>Size and boundedness</h4>
+<h4 id="size-and-boundedness">3.2.4. Size and boundedness</h4>
 
-<p>A <code class="highlighter-rouge">PCollection</code> is a large, immutable “bag” of elements. There is no upper limit on how many elements a <code class="highlighter-rouge">PCollection</code> can contain; any given <code class="highlighter-rouge">PCollection</code> might fit in memory on a single machine, or it might represent a very large distributed data set backed by a persistent data store.</p>
+<p>A <code class="highlighter-rouge">PCollection</code> is a large, immutable “bag” of elements. There is no upper limit
+on how many elements a <code class="highlighter-rouge">PCollection</code> can contain; any given <code class="highlighter-rouge">PCollection</code> might
+fit in memory on a single machine, or it might represent a very large
+distributed data set backed by a persistent data store.</p>
 
-<p>A <code class="highlighter-rouge">PCollection</code> can be either <strong>bounded</strong> or <strong>unbounded</strong> in size. A <strong>bounded</strong> <code class="highlighter-rouge">PCollection</code> represents a data set of a known, fixed size, while an <strong>unbounded</strong> <code class="highlighter-rouge">PCollection</code> represents a data set of unlimited size. Whether a <code class="highlighter-rouge">PCollection</code> is bounded or unbounded depends on the source [...]
+<p>A <code class="highlighter-rouge">PCollection</code> can be either <strong>bounded</strong> or <strong>unbounded</strong> in size. A
+<strong>bounded</strong> <code class="highlighter-rouge">PCollection</code> represents a data set of a known, fixed size, while an
+<strong>unbounded</strong> <code class="highlighter-rouge">PCollection</code> represents a data set of unlimited size. Whether a
+<code class="highlighter-rouge">PCollection</code> is bounded or unbounded depends on the source of the data set that
+it represents. Reading from a batch data source, such as a file or a database,
+creates a bounded <code class="highlighter-rouge">PCollection</code>. Reading from a streaming or
+continuously updating data source, such as Pub/Sub or Kafka, creates an unbounded
+<code class="highlighter-rouge">PCollection</code> (unless you explicitly tell it not to).</p>
 
-<p>The bounded (or unbounded) nature of your <code class="highlighter-rouge">PCollection</code> affects how Beam processes your data. A bounded <code class="highlighter-rouge">PCollection</code> can be processed using a batch job, which might read the entire data set once, and perform processing in a job of finite length. An unbounded <code class="highlighter-rouge">PCollection</code> must be processed using a streaming job that runs continuously, as the entire collection can never be av [...]
+<p>The bounded (or unbounded) nature of your <code class="highlighter-rouge">PCollection</code> affects how Beam
+processes your data. A bounded <code class="highlighter-rouge">PCollection</code> can be processed using a batch job,
+which might read the entire data set once, and perform processing in a job of
+finite length. An unbounded <code class="highlighter-rouge">PCollection</code> must be processed using a streaming
+job that runs continuously, as the entire collection can never be available for
+processing at any one time.</p>
 
-<p>When performing an operation that groups elements in an unbounded <code class="highlighter-rouge">PCollection</code>, Beam requires a concept called <strong>windowing</strong> to divide a continuously updating data set into logical windows of finite size.  Beam processes each window as a bundle, and processing continues as the data set is generated. These logical windows are determined by some characteristic associated with a data element, such as a <strong>timestamp</strong>.</p>
+<p>When performing an operation that groups elements in an unbounded <code class="highlighter-rouge">PCollection</code>,
+Beam requires a concept called <strong>windowing</strong> to divide a continuously updating
+data set into logical windows of finite size.  Beam processes each window as a
+bundle, and processing continues as the data set is generated. These logical
+windows are determined by some characteristic associated with a data element,
+such as a <strong>timestamp</strong>.</p>
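+
+<p class="language-java">Windowing is covered in detail later in this guide. As a minimal sketch in the
+Java SDK (assuming an unbounded input named <code class="highlighter-rouge">items</code> and fixed one-minute
+windows), applying a windowing function looks like this:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Divide the collection into fixed one-minute windows based on each
+// element's timestamp.
+PCollection&lt;String&gt; windowedItems =
+    items.apply(Window.&lt;String&gt;into(
+        FixedWindows.of(Duration.standardMinutes(1))));
+</code></pre>
+</div>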
 
-<h4 id="a-namepctimestampsaelement-timestamps"><a name="pctimestamps"></a>Element timestamps</h4>
+<h4 id="element-timestamps">3.2.5. Element timestamps</h4>
 
-<p>Each element in a <code class="highlighter-rouge">PCollection</code> has an associated intrinsic <strong>timestamp</strong>. The timestamp for each element is initially assigned by the <a href="#io">Source</a> that creates the <code class="highlighter-rouge">PCollection</code>. Sources that create an unbounded <code class="highlighter-rouge">PCollection</code> often assign each new element a timestamp that corresponds to when the element was read or added.</p>
+<p>Each element in a <code class="highlighter-rouge">PCollection</code> has an associated intrinsic <strong>timestamp</strong>. The
+timestamp for each element is initially assigned by the <a href="#pipeline-io">Source</a>
+that creates the <code class="highlighter-rouge">PCollection</code>. Sources that create an unbounded <code class="highlighter-rouge">PCollection</code>
+often assign each new element a timestamp that corresponds to when the element
+was read or added.</p>
 
 <blockquote>
-  <p><strong>Note</strong>: Sources that create a bounded <code class="highlighter-rouge">PCollection</code> for a fixed data set also automatically assign timestamps, but the most common behavior is to assign every element the same timestamp (<code class="highlighter-rouge">Long.MIN_VALUE</code>).</p>
+  <p><strong>Note</strong>: Sources that create a bounded <code class="highlighter-rouge">PCollection</code> for a fixed data set
+also automatically assign timestamps, but the most common behavior is to
+assign every element the same timestamp (<code class="highlighter-rouge">Long.MIN_VALUE</code>).</p>
 </blockquote>
 
-<p>Timestamps are useful for a <code class="highlighter-rouge">PCollection</code> that contains elements with an inherent notion of time. If your pipeline is reading a stream of events, like Tweets or other social media messages, each element might use the time the event was posted as the element timestamp.</p>
-
-<p>You can manually assign timestamps to the elements of a <code class="highlighter-rouge">PCollection</code> if the source doesn’t do it for you. You’ll want to do this if the elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (such as a “time” field in a server log entry). Beam has <a href="#transforms">Transforms</a> that take a <code class="highlighter-rouge">PCollection</code> as input and output an identical <code class="highl [...]
-
-<h2 id="a-nametransformsaapplying-transforms"><a name="transforms"></a>Applying transforms</h2>
-
-<p>In the Beam SDKs, <strong>transforms</strong> are the operations in your pipeline. A transform takes a <code class="highlighter-rouge">PCollection</code> (or more than one <code class="highlighter-rouge">PCollection</code>) as input, performs an operation that you specify on each element in that collection, and produces a new output <code class="highlighter-rouge">PCollection</code>. To invoke a transform, you must <strong>apply</strong> it to the input <code class="highlighter-rouge" [...]
-
-<p>The Beam SDKs contain a number of different transforms that you can apply to your pipeline’s <code class="highlighter-rouge">PCollection</code>s. These include general-purpose core transforms, such as <a href="/documentation/programming-guide/#transforms-pardo">ParDo</a> or <a href="/documentation/programming-guide/#transforms-combine">Combine</a>. There are also pre-written <a href="/documentation/programming-guide/#transforms-composite">composite transforms</a> included in the SDKs, [...]
-
-<p>Each transform in the Beam SDKs has a generic <code class="highlighter-rouge">apply</code> method <span class="language-py">(or pipe operator <code class="highlighter-rouge">|</code>)</span>. Invoking multiple Beam transforms is similar to <em>method chaining</em>, but with one slight difference: You apply the transform to the input <code class="highlighter-rouge">PCollection</code>, passing the transform itself as an argument, and the operation returns the output <code class="highlig [...]
+<p>Timestamps are useful for a <code class="highlighter-rouge">PCollection</code> that contains elements with an
+inherent notion of time. If your pipeline is reading a stream of events, like
+Tweets or other social media messages, each element might use the time the event
+was posted as the element timestamp.</p>
+
+<p>You can manually assign timestamps to the elements of a <code class="highlighter-rouge">PCollection</code> if the
+source doesn’t do it for you. You’ll want to do this if the elements have an
+inherent timestamp, but the timestamp is somewhere in the structure of the
+element itself (such as a “time” field in a server log entry). Beam has
+<a href="#transforms">Transforms</a> that take a <code class="highlighter-rouge">PCollection</code> as input and output an
+identical <code class="highlighter-rouge">PCollection</code> with timestamps attached; see <a href="#adding-timestamps-to-a-pcollections-elements">Assigning
+Timestamps</a> for more information
+about how to do so.</p>
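+
+<p class="language-java">One way to sketch this in the Java SDK is the <code class="highlighter-rouge">WithTimestamps</code> transform (the
+<code class="highlighter-rouge">LogEntry</code> type and its <code class="highlighter-rouge">getTimestampMillis()</code> accessor are hypothetical):</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Assign each element a timestamp taken from a field of the element itself.
+PCollection&lt;LogEntry&gt; entries = ...;
+PCollection&lt;LogEntry&gt; stampedEntries =
+    entries.apply(WithTimestamps.of(
+        (LogEntry entry) -&gt; new Instant(entry.getTimestampMillis())));
+</code></pre>
+</div>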
+
+<h2 id="transforms">4. Transforms</h2>
+
+<p>Transforms are the operations in your pipeline, and provide a generic
+processing framework. You provide processing logic in the form of a function
+object (colloquially referred to as “user code”), and your user code is applied
+to each element of an input <code class="highlighter-rouge">PCollection</code> (or more than one <code class="highlighter-rouge">PCollection</code>).
+Depending on the pipeline runner and back-end that you choose, many different
+workers across a cluster may execute instances of your user code in parallel.
+The user code running on each worker generates the output elements that are
+ultimately added to the final output <code class="highlighter-rouge">PCollection</code> that the transform produces.</p>
+
+<p>The Beam SDKs contain a number of different transforms that you can apply to
+your pipeline’s <code class="highlighter-rouge">PCollection</code>s. These include general-purpose core transforms,
+such as <a href="#pardo">ParDo</a> or <a href="#combine">Combine</a>. There are also pre-written
+<a href="#composite-transforms">composite transforms</a> included in the SDKs, which
+combine one or more of the core transforms in a useful processing pattern, such
+as counting or combining elements in a collection. You can also define your own
+more complex composite transforms to fit your pipeline’s exact use case.</p>
+
+<h3 id="applying-transforms">4.1. Applying transforms</h3>
+
+<p>To invoke a transform, you must <strong>apply</strong> it to the input <code class="highlighter-rouge">PCollection</code>. Each
+transform in the Beam SDKs has a generic <code class="highlighter-rouge">apply</code> method <span class="language-py">(or pipe operator <code class="highlighter-rouge">|</code>)</span>.
+Invoking multiple Beam transforms is similar to <em>method chaining</em>, but with one
+slight difference: You apply the transform to the input <code class="highlighter-rouge">PCollection</code>, passing
+the transform itself as an argument, and the operation returns the output
+<code class="highlighter-rouge">PCollection</code>. This takes the general form:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="o">[</span><span class="n">Output</span> <span class="n">PCollection</span><span class="o">]</span> <span class="o">=</span> <span class="o">[</span><span class="n">Input</span> <span class="n">PCollection</span><span class="o">].</span><span class="na">apply</span><span class="o">([</span><span class="n">Transform</span><span class="o">])</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="p">[</span><span class="n">Output</span> <span class="n">PCollection</span><span class="p">]</span> <span class="o">=</span> <span class="p">[</span><span class="n">Input</span> <span class="n">PCollection</span><span class="p">]</span> <span class="o">|</span> <span class="p">[</span><span class="n">Transform</span><span class="p">]</span>
 </code></pre>
 </div>
 
-<p>Because Beam uses a generic <code class="highlighter-rouge">apply</code> method for <code class="highlighter-rouge">PCollection</code>, you can both chain transforms sequentially and also apply transforms that contain other transforms nested within (called <a href="#transforms-composite">composite transforms</a> in the Beam SDKs).</p>
+<p>Because Beam uses a generic <code class="highlighter-rouge">apply</code> method for <code class="highlighter-rouge">PCollection</code>, you can both chain
+transforms sequentially and also apply transforms that contain other transforms
+nested within (called <a href="#composite-transforms">composite transforms</a> in the Beam
+SDKs).</p>
 
-<p>How you apply your pipeline’s transforms determines the structure of your pipeline. The best way to think of your pipeline is as a directed acyclic graph, where the nodes are <code class="highlighter-rouge">PCollection</code>s and the edges are transforms. For example, you can chain transforms to create a sequential pipeline, like this one:</p>
+<p>How you apply your pipeline’s transforms determines the structure of your
+pipeline. The best way to think of your pipeline is as a directed acyclic graph,
+where the nodes are <code class="highlighter-rouge">PCollection</code>s and the edges are transforms. For example,
+you can chain transforms to create a sequential pipeline, like this one:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="o">[</span><span class="n">Final</span> <span class="n">Output</span> <span class="n">PCollection</span><span class="o">]</span> <span class="o">=</span> <span class="o">[</span><span class="n">Initial</span> <span class="n">Input</span> <span class="n">PCollection</span><span class="o">].</span><span class="na">apply</span><span class="o">([</span><span class="n">First</span> <span class="n">Transform [...]
 <span class="o">.</span><span class="na">apply</span><span class="o">([</span><span class="n">Second</span> <span class="n">Transform</span><span class="o">])</span>
 <span class="o">.</span><span class="na">apply</span><span class="o">([</span><span class="n">Third</span> <span class="n">Transform</span><span class="o">])</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="p">[</span><span class="n">Final</span> <span class="n">Output</span> <span class="n">PCollection</span><span class="p">]</span> <span class="o">=</span> <span class="p">([</span><span class="n">Initial</span> <span class="n">Input</span> <span class="n">PCollection</span><span class="p">]</span> <span class="o">|</span> <span class="p">[</span><span class="n">First</span> <span class="n">Transform</span [...]
               <span class="o">|</span> <span class="p">[</span><span class="n">Second</span> <span class="n">Transform</span><span class="p">]</span>
               <span class="o">|</span> <span class="p">[</span><span class="n">Third</span> <span class="n">Transform</span><span class="p">])</span>
@@ -495,13 +791,15 @@
 
 <p>[Sequential Graph Graphic]</p>
 
-<p>However, note that a transform <em>does not consume or otherwise alter</em> the input collection–remember that a <code class="highlighter-rouge">PCollection</code> is immutable by definition. This means that you can apply multiple transforms to the same input <code class="highlighter-rouge">PCollection</code> to create a branching pipeline, like so:</p>
+<p>However, note that a transform <em>does not consume or otherwise alter</em> the input
+collection–remember that a <code class="highlighter-rouge">PCollection</code> is immutable by definition. This means
+that you can apply multiple transforms to the same input <code class="highlighter-rouge">PCollection</code> to create
+a branching pipeline, like so:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="o">[</span><span class="n">Output</span> <span class="n">PCollection</span> <span class="mi">1</span><span class="o">]</span> <span class="o">=</span> <span class="o">[</span><span class="n">Input</span> <span class="n">PCollection</span><span class="o">].</span><span class="na">apply</span><span class="o">([</span><span class="n">Transform</span> <span class="mi">1</span><span class="o">])</span>
 <span class="o">[</span><span class="n">Output</span> <span class="n">PCollection</span> <span class="mi">2</span><span class="o">]</span> <span class="o">=</span> <span class="o">[</span><span class="n">Input</span> <span class="n">PCollection</span><span class="o">].</span><span class="na">apply</span><span class="o">([</span><span class="n">Transform</span> <span class="mi">2</span><span class="o">])</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="p">[</span><span class="n">Output</span> <span class="n">PCollection</span> <span class="mi">1</span><span class="p">]</span> <span class="o">=</span> <span class="p">[</span><span class="n">Input</span> <span class="n">PCollection</span><span class="p">]</span> <span class="o">|</span> <span class="p">[</span><span class="n">Transform</span> <span class="mi">1</span><span class="p">]</span>
 <span class="p">[</span><span class="n">Output</span> <span class="n">PCollection</span> <span class="mi">2</span><span class="p">]</span> <span class="o">=</span> <span class="p">[</span><span class="n">Input</span> <span class="n">PCollection</span><span class="p">]</span> <span class="o">|</span> <span class="p">[</span><span class="n">Transform</span> <span class="mi">2</span><span class="p">]</span>
 </code></pre>
@@ -511,47 +809,72 @@
 
 <p>[Branching Graph Graphic]</p>
 
-<p>You can also build your own <a href="#transforms-composite">composite transforms</a> that nest multiple sub-steps inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in a lot of different places.</p>
-
-<h3 id="transforms-in-the-beam-sdk">Transforms in the Beam SDK</h3>
+<p>You can also build your own <a href="#composite-transforms">composite transforms</a> that
+nest multiple sub-steps inside a single, larger transform. Composite transforms
+are particularly useful for building a reusable sequence of simple steps that
+get used in a lot of different places.</p>
 
-<p>The transforms in the Beam SDKs provide a generic <strong>processing framework</strong>, where you provide processing logic in the form of a function object (colloquially referred to as “user code”). The user code gets applied to the elements of the input <code class="highlighter-rouge">PCollection</code>. Instances of your user code might then be executed in parallel by many different workers across a cluster, depending on the pipeline runner and back-end that you choose to execute y [...]
+<h3 id="core-beam-transforms">4.2. Core Beam transforms</h3>
 
-<h3 id="core-beam-transforms">Core Beam transforms</h3>
-
-<p>Beam provides the following transforms, each of which represents a different processing paradigm:</p>
+<p>Beam provides the following core transforms, each of which represents a different
+processing paradigm:</p>
 
 <ul>
   <li><code class="highlighter-rouge">ParDo</code></li>
   <li><code class="highlighter-rouge">GroupByKey</code></li>
+  <li><code class="highlighter-rouge">CoGroupByKey</code></li>
   <li><code class="highlighter-rouge">Combine</code></li>
-  <li><code class="highlighter-rouge">Flatten</code> and <code class="highlighter-rouge">Partition</code></li>
+  <li><code class="highlighter-rouge">Flatten</code></li>
+  <li><code class="highlighter-rouge">Partition</code></li>
 </ul>
 
-<h4 id="a-nametransforms-pardoapardo"><a name="transforms-pardo"></a>ParDo</h4>
+<h4 id="pardo">4.2.1. ParDo</h4>
 
-<p><code class="highlighter-rouge">ParDo</code> is a Beam transform for generic parallel processing. The <code class="highlighter-rouge">ParDo</code> processing paradigm is similar to the “Map” phase of a Map/Shuffle/Reduce-style algorithm: a <code class="highlighter-rouge">ParDo</code> transform considers each element in the input <code class="highlighter-rouge">PCollection</code>, performs some processing function (your user code) on that element, and emits zero, one, or multiple eleme [...]
+<p><code class="highlighter-rouge">ParDo</code> is a Beam transform for generic parallel processing. The <code class="highlighter-rouge">ParDo</code>
+processing paradigm is similar to the “Map” phase of a Map/Shuffle/Reduce-style
+algorithm: a <code class="highlighter-rouge">ParDo</code> transform considers each element in the input
+<code class="highlighter-rouge">PCollection</code>, performs some processing function (your user code) on that
+element, and emits zero, one, or multiple elements to an output <code class="highlighter-rouge">PCollection</code>.</p>
 
 <p><code class="highlighter-rouge">ParDo</code> is useful for a variety of common data processing operations, including:</p>
 
 <ul>
-  <li><strong>Filtering a data set.</strong> You can use <code class="highlighter-rouge">ParDo</code> to consider each element in a <code class="highlighter-rouge">PCollection</code> and either output that element to a new collection, or discard it.</li>
-  <li><strong>Formatting or type-converting each element in a data set.</strong> If your input <code class="highlighter-rouge">PCollection</code> contains elements that are of a different type or format than you want, you can use <code class="highlighter-rouge">ParDo</code> to perform a conversion on each element and output the result to a new <code class="highlighter-rouge">PCollection</code>.</li>
-  <li><strong>Extracting parts of each element in a data set.</strong> If you have a <code class="highlighter-rouge">PCollection</code> of records with multiple fields, for example, you can use a <code class="highlighter-rouge">ParDo</code> to parse out just the fields you want to consider into a new <code class="highlighter-rouge">PCollection</code>.</li>
-  <li><strong>Performing computations on each element in a data set.</strong> You can use <code class="highlighter-rouge">ParDo</code> to perform simple or complex computations on every element, or certain elements, of a <code class="highlighter-rouge">PCollection</code> and output the results as a new <code class="highlighter-rouge">PCollection</code>.</li>
+  <li><strong>Filtering a data set.</strong> You can use <code class="highlighter-rouge">ParDo</code> to consider each element in a
+<code class="highlighter-rouge">PCollection</code> and either output that element to a new collection, or discard
+it.</li>
+  <li><strong>Formatting or type-converting each element in a data set.</strong> If your input
+<code class="highlighter-rouge">PCollection</code> contains elements that are of a different type or format than
+you want, you can use <code class="highlighter-rouge">ParDo</code> to perform a conversion on each element and
+output the result to a new <code class="highlighter-rouge">PCollection</code>.</li>
+  <li><strong>Extracting parts of each element in a data set.</strong> If you have a
+<code class="highlighter-rouge">PCollection</code> of records with multiple fields, for example, you can use a
+<code class="highlighter-rouge">ParDo</code> to parse out just the fields you want to consider into a new
+<code class="highlighter-rouge">PCollection</code>.</li>
+  <li><strong>Performing computations on each element in a data set.</strong> You can use <code class="highlighter-rouge">ParDo</code>
+to perform simple or complex computations on every element, or certain
+elements, of a <code class="highlighter-rouge">PCollection</code> and output the results as a new <code class="highlighter-rouge">PCollection</code>.</li>
 </ul>
 
-<p>In such roles, <code class="highlighter-rouge">ParDo</code> is a common intermediate step in a pipeline. You might use it to extract certain fields from a set of raw input records, or convert raw input into a different format; you might also use <code class="highlighter-rouge">ParDo</code> to convert processed data into a format suitable for output, like database table rows or printable strings.</p>
+<p>In such roles, <code class="highlighter-rouge">ParDo</code> is a common intermediate step in a pipeline. You might
+use it to extract certain fields from a set of raw input records, or convert raw
+input into a different format; you might also use <code class="highlighter-rouge">ParDo</code> to convert processed
+data into a format suitable for output, like database table rows or printable
+strings.</p>
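+
+<p class="language-java">As an illustrative sketch of these use cases in the Java SDK (the
+<code class="highlighter-rouge">ExtractWordsFn</code> name is ours, and <code class="highlighter-rouge">lines</code> is assumed to be the <code class="highlighter-rouge">PCollection</code>
+read earlier), here is a <code class="highlighter-rouge">DoFn</code> that emits zero or more output elements per input
+element by splitting each line into words:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Split each input line into words, skipping empty strings; each word is
+// emitted as a separate output element.
+static class ExtractWordsFn extends DoFn&lt;String, String&gt; {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    for (String word : c.element().split("[^\\p{L}]+")) {
+      if (!word.isEmpty()) {
+        c.output(word);
+      }
+    }
+  }
+}
+
+PCollection&lt;String&gt; extractedWords =
+    lines.apply(ParDo.of(new ExtractWordsFn()));
+</code></pre>
+</div>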
 
-<p>When you apply a <code class="highlighter-rouge">ParDo</code> transform, you’ll need to provide user code in the form of a <code class="highlighter-rouge">DoFn</code> object. <code class="highlighter-rouge">DoFn</code> is a Beam SDK class that defines a distributed processing function.</p>
+<p>When you apply a <code class="highlighter-rouge">ParDo</code> transform, you’ll need to provide user code in the form
+of a <code class="highlighter-rouge">DoFn</code> object. <code class="highlighter-rouge">DoFn</code> is a Beam SDK class that defines a distributed
+processing function.</p>
 
 <blockquote>
-  <p>When you create a subclass of <code class="highlighter-rouge">DoFn</code>, note that your subclass should adhere to the <a href="#transforms-usercodereqs">General Requirements for Writing User Code for Beam Transforms</a>.</p>
+  <p>When you create a subclass of <code class="highlighter-rouge">DoFn</code>, note that your subclass should adhere to
+the <a href="#requirements-for-writing-user-code-for-beam-transforms">Requirements for writing user code for Beam transforms</a>.</p>
 </blockquote>
 
-<h5 id="applying-pardo">Applying ParDo</h5>
+<h5 id="applying-pardo">4.2.1.1. Applying ParDo</h5>
 
-<p>Like all Beam transforms, you apply <code class="highlighter-rouge">ParDo</code> by calling the <code class="highlighter-rouge">apply</code> method on the input <code class="highlighter-rouge">PCollection</code> and passing <code class="highlighter-rouge">ParDo</code> as an argument, as shown in the following example code:</p>
+<p>Like all Beam transforms, you apply <code class="highlighter-rouge">ParDo</code> by calling the <code class="highlighter-rouge">apply</code> method on the
+input <code class="highlighter-rouge">PCollection</code> and passing <code class="highlighter-rouge">ParDo</code> as an argument, as shown in the
+following example code:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// The input PCollection of Strings.</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...;</span>
@@ -566,7 +889,6 @@
                                             <span class="c1">// we define above.</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># The input PCollection of Strings.</span>
 <span class="n">words</span> <span class="o">=</span> <span class="o">...</span>
 
@@ -580,25 +902,48 @@
 </code></pre>
 </div>
 
-<p>In the example, our input <code class="highlighter-rouge">PCollection</code> contains <code class="highlighter-rouge">String</code> values. We apply a <code class="highlighter-rouge">ParDo</code> transform that specifies a function (<code class="highlighter-rouge">ComputeWordLengthFn</code>) to compute the length of each string, and outputs the result to a new <code class="highlighter-rouge">PCollection</code> of <code class="highlighter-rouge">Integer</code> values that stores the le [...]
+<p>In the example, our input <code class="highlighter-rouge">PCollection</code> contains <code class="highlighter-rouge">String</code> values. We apply a
+<code class="highlighter-rouge">ParDo</code> transform that specifies a function (<code class="highlighter-rouge">ComputeWordLengthFn</code>) to compute
+the length of each string and outputs the result to a new <code class="highlighter-rouge">PCollection</code> of
+<code class="highlighter-rouge">Integer</code> values that stores the length of each word.</p>
 
-<h5 id="creating-a-dofn">Creating a DoFn</h5>
+<h5 id="creating-a-dofn">4.2.1.2. Creating a DoFn</h5>
 
-<p>The <code class="highlighter-rouge">DoFn</code> object that you pass to <code class="highlighter-rouge">ParDo</code> contains the processing logic that gets applied to the elements in the input collection. When you use Beam, often the most important pieces of code you’ll write are these <code class="highlighter-rouge">DoFn</code>s–they’re what define your pipeline’s exact data processing tasks.</p>
+<p>The <code class="highlighter-rouge">DoFn</code> object that you pass to <code class="highlighter-rouge">ParDo</code> contains the processing logic that
+gets applied to the elements in the input collection. When you use Beam, often
+the most important pieces of code you’ll write are these <code class="highlighter-rouge">DoFn</code>s–they’re what
+define your pipeline’s exact data processing tasks.</p>
 
 <blockquote>
-  <p><strong>Note:</strong> When you create your <code class="highlighter-rouge">DoFn</code>, be mindful of the <a href="#transforms-usercodereqs">General Requirements for Writing User Code for Beam Transforms</a> and ensure that your code follows them.</p>
+  <p><strong>Note:</strong> When you create your <code class="highlighter-rouge">DoFn</code>, be mindful of the <a href="#requirements-for-writing-user-code-for-beam-transforms">Requirements
+for writing user code for Beam transforms</a>
+and ensure that your code follows them.</p>
 </blockquote>
 
-<p class="language-java">A <code class="highlighter-rouge">DoFn</code> processes one element at a time from the input <code class="highlighter-rouge">PCollection</code>. When you create a subclass of <code class="highlighter-rouge">DoFn</code>, you’ll need to provide type parameters that match the types of the input and output elements. If your <code class="highlighter-rouge">DoFn</code> processes incoming <code class="highlighter-rouge">String</code> elements and produces <code class="h [...]
+<p class="language-java">A <code class="highlighter-rouge">DoFn</code> processes one element at a time from the input <code class="highlighter-rouge">PCollection</code>. When you
+create a subclass of <code class="highlighter-rouge">DoFn</code>, you’ll need to provide type parameters that match
+the types of the input and output elements. If your <code class="highlighter-rouge">DoFn</code> processes incoming
+<code class="highlighter-rouge">String</code> elements and produces <code class="highlighter-rouge">Integer</code> elements for the output collection
+(like our previous example, <code class="highlighter-rouge">ComputeWordLengthFn</code>), your class declaration would
+look like this:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">static</span> <span class="kd">class</span> <span class="nc">ComputeWordLengthFn</span> <span class="kd">extends</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
 </code></pre>
 </div>
 
-<p class="language-java">Inside your <code class="highlighter-rouge">DoFn</code> subclass, you’ll write a method annotated with <code class="highlighter-rouge">@ProcessElement</code> where you provide the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your <code class="highlighter-rouge">@ProcessElement</code> method should accept an object of type <code class="highlighter-rouge">ProcessContext</code> [...]
+<p class="language-java">Inside your <code class="highlighter-rouge">DoFn</code> subclass, you’ll write a method annotated with
+<code class="highlighter-rouge">@ProcessElement</code> where you provide the actual processing logic. You don’t need
+to manually extract the elements from the input collection; the Beam SDKs handle
+that for you. Your <code class="highlighter-rouge">@ProcessElement</code> method should accept an object of type
+<code class="highlighter-rouge">ProcessContext</code>. The <code class="highlighter-rouge">ProcessContext</code> object gives you access to an input
+element and a method for emitting an output element:</p>
 
-<p class="language-py">Inside your <code class="highlighter-rouge">DoFn</code> subclass, you’ll write a method <code class="highlighter-rouge">process</code> where you provide the actual processing logic. You don’t need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your <code class="highlighter-rouge">process</code> method should accept an object of type <code class="highlighter-rouge">element</code>. This is the input element and output i [...]
+<p class="language-py">Inside your <code class="highlighter-rouge">DoFn</code> subclass, you’ll write a method <code class="highlighter-rouge">process</code> where you provide
+the actual processing logic. You don’t need to manually extract the elements
+from the input collection; the Beam SDKs handle that for you. Your <code class="highlighter-rouge">process</code>
+method should accept an argument <code class="highlighter-rouge">element</code>, which is the input element. Output is
+emitted by using a <code class="highlighter-rouge">yield</code> or <code class="highlighter-rouge">return</code> statement inside the <code class="highlighter-rouge">process</code>
+method.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">static</span> <span class="kd">class</span> <span class="nc">ComputeWordLengthFn</span> <span class="kd">extends</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="o">{</span>
   <span class="nd">@ProcessElement</span>
@@ -611,7 +956,6 @@
 <span class="o">}</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="k">class</span> <span class="nc">ComputeWordLengthFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span>
   <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span>
     <span class="k">return</span> <span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="p">)]</span>
@@ -620,23 +964,42 @@
 </div>
 
 <blockquote class="language-java">
-  <p><strong>Note:</strong> If the elements in your input <code class="highlighter-rouge">PCollection</code> are key/value pairs, you can access the key or value by using <code class="highlighter-rouge">ProcessContext.element().getKey()</code> or <code class="highlighter-rouge">ProcessContext.element().getValue()</code>, respectively.</p>
+  <p><strong>Note:</strong> If the elements in your input <code class="highlighter-rouge">PCollection</code> are key/value pairs, you
+can access the key or value by using <code class="highlighter-rouge">ProcessContext.element().getKey()</code> or
+<code class="highlighter-rouge">ProcessContext.element().getValue()</code>, respectively.</p>
 </blockquote>
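+
+<p class="language-java">For example, a minimal sketch (the <code class="highlighter-rouge">FormatKvFn</code> name is illustrative) of a
+<code class="highlighter-rouge">DoFn</code> that reads the key and value of each incoming pair:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Format each KV&lt;String, Integer&gt; element as a single printable String.
+static class FormatKvFn extends DoFn&lt;KV&lt;String, Integer&gt;, String&gt; {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    String key = c.element().getKey();
+    Integer value = c.element().getValue();
+    c.output(key + ": " + value);
+  }
+}
+</code></pre>
+</div>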
 
-<p>A given <code class="highlighter-rouge">DoFn</code> instance generally gets invoked one or more times to process some arbitrary bundle of elements. However, Beam doesn’t guarantee an exact number of invocations; it may be invoked multiple times on a given worker node to account for failures and retries. As such, you can cache information across multiple calls to your processing method, but if you do so, make sure the implementation <strong>does not depend on the number of invocations< [...]
+<p>A given <code class="highlighter-rouge">DoFn</code> instance generally gets invoked one or more times to process some
+arbitrary bundle of elements. However, Beam doesn’t guarantee an exact number of
+invocations; it may be invoked multiple times on a given worker node to account
+for failures and retries. As such, you can cache information across multiple
+calls to your processing method, but if you do so, make sure the implementation
+<strong>does not depend on the number of invocations</strong>.</p>
 
-<p>In your processing method, you’ll also need to meet some immutability requirements to ensure that Beam and the processing back-end can safely serialize and cache the values in your pipeline. Your method should meet the following requirements:</p>
+<p>In your processing method, you’ll also need to meet some immutability
+requirements to ensure that Beam and the processing back-end can safely
+serialize and cache the values in your pipeline. Your method should meet the
+following requirements:</p>
 
 <ul class="language-java">
-  <li>You should not in any way modify an element returned by <code class="highlighter-rouge">ProcessContext.element()</code> or <code class="highlighter-rouge">ProcessContext.sideInput()</code> (the incoming elements from the input collection).</li>
-  <li>Once you output a value using <code class="highlighter-rouge">ProcessContext.output()</code> or <code class="highlighter-rouge">ProcessContext.sideOutput()</code>, you should not modify that value in any way.</li>
+  <li>You should not in any way modify an element returned by
+<code class="highlighter-rouge">ProcessContext.element()</code> or <code class="highlighter-rouge">ProcessContext.sideInput()</code> (the incoming
+elements from the input collection).</li>
+  <li>Once you output a value using <code class="highlighter-rouge">ProcessContext.output()</code> or
+<code class="highlighter-rouge">ProcessContext.sideOutput()</code>, you should not modify that value in any way.</li>
 </ul>
 
-<h5 id="lightweight-dofns-and-other-abstractions">Lightweight DoFns and other abstractions</h5>
+<h5 id="lightweight-dofns-and-other-abstractions">4.2.1.3. Lightweight DoFns and other abstractions</h5>
 
-<p>If your function is relatively straightforward, you can simplify your use of <code class="highlighter-rouge">ParDo</code> by providing a lightweight <code class="highlighter-rouge">DoFn</code> in-line, as <span class="language-java">an anonymous inner class instance</span><span class="language-py">a lambda function</span>.</p>
+<p>If your function is relatively straightforward, you can simplify your use of
+<code class="highlighter-rouge">ParDo</code> by providing a lightweight <code class="highlighter-rouge">DoFn</code> in-line, as
+<span class="language-java">an anonymous inner class instance</span>
+<span class="language-py">a lambda function</span>.</p>
 
-<p>Here’s the previous example, <code class="highlighter-rouge">ParDo</code> with <code class="highlighter-rouge">ComputeLengthWordsFn</code>, with the <code class="highlighter-rouge">DoFn</code> specified as <span class="language-java">an anonymous inner class instance</span><span class="language-py">a lambda function</span>:</p>
+<p>Here’s the previous example, <code class="highlighter-rouge">ParDo</code> with <code class="highlighter-rouge">ComputeWordLengthFn</code>, with the
+<code class="highlighter-rouge">DoFn</code> specified as
+<span class="language-java">an anonymous inner class instance</span>
+<span class="language-py">a lambda function</span>:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// The input PCollection.</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...;</span>
@@ -663,9 +1026,15 @@
 </code></pre>
 </div>
 
-<p>If your <code class="highlighter-rouge">ParDo</code> performs a one-to-one mapping of input elements to output elements–that is, for each input element, it applies a function that produces <em>exactly one</em> output element, you can use the higher-level <span class="language-java"><code class="highlighter-rouge">MapElements</code></span><span class="language-py"><code class="highlighter-rouge">Map</code></span> transform. <span class="language-java"><code class="highlighter-rouge">Ma [...]
+<p>If your <code class="highlighter-rouge">ParDo</code> performs a one-to-one mapping of input elements to output
+elements–that is, for each input element, it applies a function that produces
+<em>exactly one</em> output element, you can use the higher-level
+<span class="language-java"><code class="highlighter-rouge">MapElements</code></span><span class="language-py"><code class="highlighter-rouge">Map</code></span>
+transform. <span class="language-java"><code class="highlighter-rouge">MapElements</code> can accept an anonymous
+Java 8 lambda function for additional brevity.</span></p>
 
-<p>Here’s the previous example using <span class="language-java"><code class="highlighter-rouge">MapElements</code></span><span class="language-py"><code class="highlighter-rouge">Map</code></span>:</p>
+<p>Here’s the previous example using <span class="language-java"><code class="highlighter-rouge">MapElements</code></span>
+<span class="language-py"><code class="highlighter-rouge">Map</code></span>:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// The input PCollection.</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...;</span>
@@ -688,18 +1057,35 @@
 </div>
 
 <blockquote class="language-java">
-  <p><strong>Note:</strong> You can use Java 8 lambda functions with several other Beam transforms, including <code class="highlighter-rouge">Filter</code>, <code class="highlighter-rouge">FlatMapElements</code>, and <code class="highlighter-rouge">Partition</code>.</p>
+  <p><strong>Note:</strong> You can use Java 8 lambda functions with several other Beam
+transforms, including <code class="highlighter-rouge">Filter</code>, <code class="highlighter-rouge">FlatMapElements</code>, and <code class="highlighter-rouge">Partition</code>.</p>
 </blockquote>
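+
+<p class="language-java">For instance, a hedged sketch of <code class="highlighter-rouge">Filter</code> with a Java 8 lambda (reusing the
+<code class="highlighter-rouge">words</code> collection from the earlier examples):</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Keep only the non-empty words.
+PCollection&lt;String&gt; nonEmptyWords =
+    words.apply(Filter.by((String word) -&gt; !word.isEmpty()));
+</code></pre>
+</div>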
 
-<h4 id="a-nametransforms-gbkausing-groupbykey"><a name="transforms-gbk"></a>Using GroupByKey</h4>
+<h4 id="groupbykey">4.2.2. GroupByKey</h4>
 
-<p><code class="highlighter-rouge">GroupByKey</code> is a Beam transform for processing collections of key/value pairs. It’s a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to <code class="highlighter-rouge">GroupByKey</code> is a collection of key/value pairs that represents a <em>multimap</em>, where the collection contains multiple pairs that have the same key, but different values. Given such a collection, you use <cod [...]
+<p><code class="highlighter-rouge">GroupByKey</code> is a Beam transform for processing collections of key/value pairs.
+It’s a parallel reduction operation, analogous to the Shuffle phase of a
+Map/Shuffle/Reduce-style algorithm. The input to <code class="highlighter-rouge">GroupByKey</code> is a collection of
+key/value pairs that represents a <em>multimap</em>, where the collection contains
+multiple pairs that have the same key, but different values. Given such a
+collection, you use <code class="highlighter-rouge">GroupByKey</code> to collect all of the values associated with
+each unique key.</p>
 
-<p><code class="highlighter-rouge">GroupByKey</code> is a good way to aggregate data that has something in common. For example, if you have a collection that stores records of customer orders, you might want to group together all the orders from the same postal code (wherein the “key” of the key/value pair is the postal code field, and the “value” is the remainder of the record).</p>
+<p><code class="highlighter-rouge">GroupByKey</code> is a good way to aggregate data that has something in common. For
+example, if you have a collection that stores records of customer orders, you
+might want to group together all the orders from the same postal code (wherein
+the “key” of the key/value pair is the postal code field, and the “value” is the
+remainder of the record).</p>
 
-<p>Let’s examine the mechanics of <code class="highlighter-rouge">GroupByKey</code> with a simple example case, where our data set consists of words from a text file and the line number on which they appear. We want to group together all the line numbers (values) that share the same word (key), letting us see all the places in the text where a particular word appears.</p>
+<p>Let’s examine the mechanics of <code class="highlighter-rouge">GroupByKey</code> with a simple example case, where
+our data set consists of words from a text file and the line number on which
+they appear. We want to group together all the line numbers (values) that share
+the same word (key), letting us see all the places in the text where a
+particular word appears.</p>
 
-<p>Our input is a <code class="highlighter-rouge">PCollection</code> of key/value pairs where each word is a key, and the value is a line number in the file where the word appears. Here’s a list of the key/value pairs in the input collection:</p>
+<p>Our input is a <code class="highlighter-rouge">PCollection</code> of key/value pairs where each word is a key, and
+the value is a line number in the file where the word appears. Here’s a list of
+the key/value pairs in the input collection:</p>
 
 <div class="highlighter-rouge"><pre class="highlight"><code>cat, 1
 dog, 5
@@ -715,7 +1101,10 @@ and, 6
 </code></pre>
 </div>
 
-<p><code class="highlighter-rouge">GroupByKey</code> gathers up all the values with the same key and outputs a new pair consisting of the unique key and a collection of all of the values that were associated with that key in the input collection. If we apply <code class="highlighter-rouge">GroupByKey</code> to our input collection above, the output collection would look like this:</p>
+<p><code class="highlighter-rouge">GroupByKey</code> gathers up all the values with the same key and outputs a new pair
+consisting of the unique key and a collection of all of the values that were
+associated with that key in the input collection. If we apply <code class="highlighter-rouge">GroupByKey</code> to
+our input collection above, the output collection would look like this:</p>
 
 <div class="highlighter-rouge"><pre class="highlight"><code>cat, [1,5,9]
 dog, [5,2]
@@ -726,11 +1115,15 @@ tree, [2]
 </code></pre>
 </div>
 
-<p>Thus, <code class="highlighter-rouge">GroupByKey</code> represents a transform from a multimap (multiple keys to individual values) to a uni-map (unique keys to collections of values).</p>
+<p>Thus, <code class="highlighter-rouge">GroupByKey</code> represents a transform from a multimap (multiple keys to
+individual values) to a uni-map (unique keys to collections of values).</p>
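+
+<p class="language-java">Expressed in the Java SDK, a minimal sketch of this example (assuming the input
+is a <code class="highlighter-rouge">PCollection&lt;KV&lt;String, Integer&gt;&gt;</code> named <code class="highlighter-rouge">wordsAndLines</code>):</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Collect all line numbers (values) associated with each word (key).
+PCollection&lt;KV&lt;String, Integer&gt;&gt; wordsAndLines = ...;
+PCollection&lt;KV&lt;String, Iterable&lt;Integer&gt;&gt;&gt; groupedWords =
+    wordsAndLines.apply(GroupByKey.&lt;String, Integer&gt;create());
+</code></pre>
+</div>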
 
-<h5 id="joins-with-cogroupbykey"><strong>Joins with CoGroupByKey</strong></h5>
+<h4 id="cogroupbykey">4.2.3. CoGroupByKey</h4>
 
-<p><code class="highlighter-rouge">CoGroupByKey</code> joins two or more key/value <code class="highlighter-rouge">PCollection</code>s that have the same key type, and then emits a collection of <code class="highlighter-rouge">KV&lt;K, CoGbkResult&gt;</code> pairs. <a href="/documentation/pipelines/design-your-pipeline/#multiple-sources">Design Your Pipeline</a> shows an example pipeline that uses a join.</p>
+<p><code class="highlighter-rouge">CoGroupByKey</code> joins two or more key/value <code class="highlighter-rouge">PCollection</code>s that have the same key
+type, and then emits a collection of <code class="highlighter-rouge">KV&lt;K, CoGbkResult&gt;</code> pairs. <a href="/documentation/pipelines/design-your-pipeline/#multiple-sources">Design Your
+Pipeline</a>
+shows an example pipeline that uses a join.</p>
 
 <p>Given the input collections below:</p>
 <div class="highlighter-rouge"><pre class="highlight"><code>// collection 1
@@ -747,7 +1140,11 @@ guest, order4
 </code></pre>
 </div>
 
-<p><code class="highlighter-rouge">CoGroupByKey</code> gathers up the values with the same key from all <code class="highlighter-rouge">PCollection</code>s, and outputs a new pair consisting of the unique key and an object <code class="highlighter-rouge">CoGbkResult</code> containing all values that were associated with that key. If you apply <code class="highlighter-rouge">CoGroupByKey</code> to the input collections above, the output collection would look like this:</p>
+<p><code class="highlighter-rouge">CoGroupByKey</code> gathers up the values with the same key from all <code class="highlighter-rouge">PCollection</code>s,
+and outputs a new pair consisting of the unique key and an object <code class="highlighter-rouge">CoGbkResult</code>
+containing all values that were associated with that key. If you apply
+<code class="highlighter-rouge">CoGroupByKey</code> to the input collections above, the output collection would look
+like this:</p>
 <div class="highlighter-rouge"><pre class="highlight"><code>user1, [[address1], [order1, order2]]
 user2, [[address2], [order3]]
 user3, [[address3], []]
@@ -757,18 +1154,36 @@ guest, [[], [order4]]
 </div>
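+
+<p>A result like the one above can be produced by tagging each input collection
+and applying <code class="highlighter-rouge">CoGroupByKey</code>. The following Java sketch is illustrative only;
+the collection and tag names are hypothetical:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Two keyed collections sharing the String key type: user/address and user/order pairs.
+PCollection&lt;KV&lt;String, String&gt;&gt; addresses = ...;
+PCollection&lt;KV&lt;String, String&gt;&gt; orders = ...;
+
+// Each input is identified by its own TupleTag.
+final TupleTag&lt;String&gt; addressTag = new TupleTag&lt;String&gt;(){};
+final TupleTag&lt;String&gt; orderTag = new TupleTag&lt;String&gt;(){};
+
+// CoGroupByKey emits one KV&lt;String, CoGbkResult&gt; per key; the CoGbkResult
+// holds the values from each tagged input for that key.
+PCollection&lt;KV&lt;String, CoGbkResult&gt;&gt; joined =
+    KeyedPCollectionTuple.of(addressTag, addresses)
+        .and(orderTag, orders)
+        .apply(CoGroupByKey.&lt;String&gt;create());
+</code></pre>
+</div>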
 
 <blockquote>
-  <p><strong>A Note on Key/Value Pairs:</strong> Beam represents key/value pairs slightly differently depending on the language and SDK you’re using. In the Beam SDK for Java, you represent a key/value pair with an object of type <code class="highlighter-rouge">KV&lt;K, V&gt;</code>. In Python, you represent key/value pairs with 2-tuples.</p>
+  <p><strong>A Note on Key/Value Pairs:</strong> Beam represents key/value pairs slightly
+differently depending on the language and SDK you’re using. In the Beam SDK
+for Java, you represent a key/value pair with an object of type <code class="highlighter-rouge">KV&lt;K, V&gt;</code>. In
+Python, you represent key/value pairs with 2-tuples.</p>
 </blockquote>
 
-<h4 id="a-nametransforms-combineausing-combine"><a name="transforms-combine"></a>Using Combine</h4>
+<h4 id="combine">4.2.4. Combine</h4>
 
-<p><span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/transforms/Combine.html"><code class="highlighter-rouge">Combine</code></a></span><span class="language-py"><a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py"><code class="highlighter-rouge">Combine</code></a></span> is a Beam transform for combining collections of elements or values in your data. <code class="highlighter-rouge">Combine</co [...]
+<p><span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/transforms/Combine.html"><code class="highlighter-rouge">Combine</code></a></span>
+<span class="language-py"><a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py"><code class="highlighter-rouge">Combine</code></a></span>
+is a Beam transform for combining collections of elements or values in your
+data. <code class="highlighter-rouge">Combine</code> has variants that work on entire <code class="highlighter-rouge">PCollection</code>s, and some that
+combine the values for each key in <code class="highlighter-rouge">PCollection</code>s of key/value pairs.</p>
 
-<p>When you apply a <code class="highlighter-rouge">Combine</code> transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial [...]
+<p>When you apply a <code class="highlighter-rouge">Combine</code> transform, you must provide the function that
+contains the logic for combining the elements or values. The combining function
+should be commutative and associative, as the function is not necessarily
+invoked exactly once on all values with a given key. Because the input data
+(including the value collection) may be distributed across multiple workers, the
+combining function might be called multiple times to perform partial combining
+on subsets of the value collection. The Beam SDK also provides some pre-built
+combine functions for common numeric combination operations such as sum, min,
+and max.</p>
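+
+<p>For example, the pre-built combines can be applied without writing any
+combining logic yourself. This is an illustrative Java sketch; the collection
+names <code class="highlighter-rouge">pc</code> and <code class="highlighter-rouge">sales</code> are hypothetical:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Pre-built global combine over a PCollection&lt;Integer&gt;.
+PCollection&lt;Integer&gt; pc = ...;
+PCollection&lt;Integer&gt; max = pc.apply(Max.integersGlobally());
+
+// Pre-built per-key combine over a keyed PCollection.
+PCollection&lt;KV&lt;String, Integer&gt;&gt; sales = ...;
+PCollection&lt;KV&lt;String, Integer&gt;&gt; totalPerKey = sales.apply(Sum.&lt;String&gt;integersPerKey());
+</code></pre>
+</div>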
 
-<p>Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of <code class="highlighter-rouge">CombineFn</code> that has an accumulation type distinct from the input/output type.</p>
+<p>Simple combine operations, such as sums, can usually be implemented as a simple
+function. More complex combination operations might require you to create a
+subclass of <code class="highlighter-rouge">CombineFn</code> that has an accumulation type distinct from the
+input/output type.</p>
 
-<h5 id="simple-combinations-using-simple-functions"><strong>Simple combinations using simple functions</strong></h5>
+<h5 id="simple-combinations-using-simple-functions">4.2.4.1. Simple combinations using simple functions</h5>
 
 <p>The following example code shows a simple combine function.</p>
 
@@ -795,28 +1210,46 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h5 id="advanced-combinations-using-combinefn"><strong>Advanced combinations using CombineFn</strong></h5>
+<h5 id="advanced-combinations-using-combinefn">4.2.4.2. Advanced combinations using CombineFn</h5>
 
-<p>For more complex combine functions, you can define a subclass of <code class="highlighter-rouge">CombineFn</code>. You should use <code class="highlighter-rouge">CombineFn</code> if the combine function requires a more sophisticated accumulator, must perform additional pre- or post-processing, might change the output type, or takes the key into account.</p>
+<p>For more complex combine functions, you can define a subclass of <code class="highlighter-rouge">CombineFn</code>.
+You should use <code class="highlighter-rouge">CombineFn</code> if the combine function requires a more sophisticated
+accumulator, must perform additional pre- or post-processing, might change the
+output type, or takes the key into account.</p>
 
-<p>A general combining operation consists of four operations. When you create a subclass of <code class="highlighter-rouge">CombineFn</code>, you must provide four operations by overriding the corresponding methods:</p>
+<p>A general combining operation consists of four operations. When you create a
+subclass of <code class="highlighter-rouge">CombineFn</code>, you must provide four operations by overriding the
+corresponding methods:</p>
 
 <ol>
   <li>
-    <p><strong>Create Accumulator</strong> creates a new “local” accumulator. In the example case, taking a mean average, a local accumulator tracks the running sum of values (the numerator value for our final average division) and the number of values summed so far (the denominator value). It may be called any number of times in a distributed fashion.</p>
+    <p><strong>Create Accumulator</strong> creates a new “local” accumulator. In the example
+case, taking a mean average, a local accumulator tracks the running sum of
+values (the numerator value for our final average division) and the number of
+values summed so far (the denominator value). It may be called any number of
+times in a distributed fashion.</p>
   </li>
   <li>
-    <p><strong>Add Input</strong> adds an input element to an accumulator, returning the accumulator value. In our example, it would update the sum and increment the count. It may also be invoked in parallel.</p>
+    <p><strong>Add Input</strong> adds an input element to an accumulator, returning the
+accumulator value. In our example, it would update the sum and increment the
+count. It may also be invoked in parallel.</p>
   </li>
   <li>
-    <p><strong>Merge Accumulators</strong> merges several accumulators into a single accumulator; this is how data in multiple accumulators is combined before the final calculation. In the case of the mean average computation, the accumulators representing each portion of the division are merged together. It may be called again on its outputs any number of times.</p>
+    <p><strong>Merge Accumulators</strong> merges several accumulators into a single accumulator;
+this is how data in multiple accumulators is combined before the final
+calculation. In the case of the mean average computation, the accumulators
+representing each portion of the division are merged together. It may be
+called again on its outputs any number of times.</p>
   </li>
   <li>
-    <p><strong>Extract Output</strong> performs the final computation. In the case of computing a mean average, this means dividing the combined sum of all the values by the number of values summed. It is called once on the final, merged accumulator.</p>
+    <p><strong>Extract Output</strong> performs the final computation. In the case of computing a
+mean average, this means dividing the combined sum of all the values by the
+number of values summed. It is called once on the final, merged accumulator.</p>
   </li>
 </ol>
 
-<p>The following example code shows how to define a <code class="highlighter-rouge">CombineFn</code> that computes a mean average:</p>
+<p>The following example code shows how to define a <code class="highlighter-rouge">CombineFn</code> that computes a
+mean average:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">public</span> <span class="kd">class</span> <span class="nc">AverageFn</span> <span class="kd">extends</span> <span class="n">CombineFn</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">AverageFn</span><span class="o">.</span><span class="na">Accum</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span> <span clas [...]
   <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Accum</span> <span class="o">{</span>
@@ -851,7 +1284,6 @@ guest, [[], [order4]]
 <span class="o">}</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">pc</span> <span class="o">=</span> <span class="o">...</span>
 <span class="k">class</span> <span class="nc">AverageFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">CombineFn</span><span class="p">):</span>
   <span class="k">def</span> <span class="nf">create_accumulator</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
@@ -869,57 +1301,80 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<p>If you are combining a <code class="highlighter-rouge">PCollection</code> of key-value pairs, <a href="#transforms-combine-per-key">per-key combining</a> is often enough. If you need the combining strategy to change based on the key (for example, MIN for some users and MAX for other users), you can define a <code class="highlighter-rouge">KeyedCombineFn</code> to access the key within the combining strategy.</p>
+<p>If you are combining a <code class="highlighter-rouge">PCollection</code> of key-value pairs, <a href="#combining-values-in-a-keyed-pcollection">per-key
+combining</a> is often enough. If
+you need the combining strategy to change based on the key (for example, MIN for
+some users and MAX for other users), you can define a <code class="highlighter-rouge">KeyedCombineFn</code> to access
+the key within the combining strategy.</p>
 
-<h5 id="combining-a-pcollection-into-a-single-value"><strong>Combining a PCollection into a single value</strong></h5>
+<h5 id="combining-a-pcollection-into-a-single-value">4.2.4.3. Combining a PCollection into a single value</h5>
 
-<p>Use the global combine to transform all of the elements in a given <code class="highlighter-rouge">PCollection</code> into a single value, represented in your pipeline as a new <code class="highlighter-rouge">PCollection</code> containing one element. The following example code shows how to apply the Beam provided sum combine function to produce a single sum value for a <code class="highlighter-rouge">PCollection</code> of integers.</p>
+<p>Use the global combine to transform all of the elements in a given <code class="highlighter-rouge">PCollection</code>
+into a single value, represented in your pipeline as a new <code class="highlighter-rouge">PCollection</code>
+containing one element. The following example code shows how to apply the Beam
+provided sum combine function to produce a single sum value for a <code class="highlighter-rouge">PCollection</code>
+of integers.</p>
 
-<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Sum.SumIntegerFn() combines the elements in the input PCollection.</span>
-<span class="c1">// The resulting PCollection, called sum, contains one value: the sum of all the elements in the input PCollection.</span>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Sum.SumIntegerFn() combines the elements in the input PCollection. The resulting PCollection, called sum,</span>
+<span class="c1">// contains one value: the sum of all the elements in the input PCollection.</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">sum</span> <span class="o">=</span> <span class="n">pc</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
    <span class="n">Combine</span><span class="o">.</span><span class="na">globally</span><span class="o">(</span><span class="k">new</span> <span class="n">Sum</span><span class="o">.</span><span class="na">SumIntegerFn</span><span class="o">()));</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># sum combines the elements in the input PCollection.</span>
-<span class="c"># The resulting PCollection, called result, contains one value: the sum of all the elements in the input PCollection.</span>
+<span class="c"># The resulting PCollection, called result, contains one value: the sum of all</span>
+<span class="c"># the elements in the input PCollection.</span>
 <span class="n">pc</span> <span class="o">=</span> <span class="o">...</span>
 <span class="n">average</span> <span class="o">=</span> <span class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="n">AverageFn</span><span class="p">())</span>
 </code></pre>
 </div>
 
-<h5 id="global-windowing">Global windowing:</h5>
+<h5 id="combine-and-global-windowing">4.2.4.4. Combine and global windowing</h5>
 
-<p>If your input <code class="highlighter-rouge">PCollection</code> uses the default global windowing, the default behavior is to return a <code class="highlighter-rouge">PCollection</code> containing one item. That item’s value comes from the accumulator in the combine function that you specified when applying <code class="highlighter-rouge">Combine</code>. For example, the Beam provided sum combine function returns a zero value (the sum of an empty input), while the max combine functio [...]
+<p>If your input <code class="highlighter-rouge">PCollection</code> uses the default global windowing, the default
+behavior is to return a <code class="highlighter-rouge">PCollection</code> containing one item. That item’s value
+comes from the accumulator in the combine function that you specified when
+applying <code class="highlighter-rouge">Combine</code>. For example, the Beam provided sum combine function returns
+a zero value (the sum of an empty input), while the max combine function returns
+a maximal or infinite value.</p>
 
-<p>To have <code class="highlighter-rouge">Combine</code> instead return an empty <code class="highlighter-rouge">PCollection</code> if the input is empty, specify <code class="highlighter-rouge">.withoutDefaults</code> when you apply your <code class="highlighter-rouge">Combine</code> transform, as in the following code example:</p>
+<p>To have <code class="highlighter-rouge">Combine</code> instead return an empty <code class="highlighter-rouge">PCollection</code> if the input is empty,
+specify <code class="highlighter-rouge">.withoutDefaults</code> when you apply your <code class="highlighter-rouge">Combine</code> transform, as in the
+following code example:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">sum</span> <span class="o">=</span> <span class="n">pc</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
   <span class="n">Combine</span><span class="o">.</span><span class="na">globally</span><span class="o">(</span><span class="k">new</span> <span class="n">Sum</span><span class="o">.</span><span class="na">SumIntegerFn</span><span class="o">()).</span><span class="na">withoutDefaults</span><span class="o">());</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">pc</span> <span class="o">=</span> <span class="o">...</span>
 <span class="nb">sum</span> <span class="o">=</span> <span class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="nb">sum</span><span class="p">)</span><span class="o">.</span><span class="n">without_defaults</span><span class="p">()</span>
 </code></pre>
 </div>
 
-<h5 id="non-global-windowing">Non-global windowing:</h5>
+<h5 id="combine-and-non-global-windowing">4.2.4.5. Combine and non-global windowing</h5>
 
-<p>If your <code class="highlighter-rouge">PCollection</code> uses any non-global windowing function, Beam does not provide the default behavior. You must specify one of the following options when applying <code class="highlighter-rouge">Combine</code>:</p>
+<p>If your <code class="highlighter-rouge">PCollection</code> uses any non-global windowing function, Beam does not
+provide the default behavior. You must specify one of the following options when
+applying <code class="highlighter-rouge">Combine</code>:</p>
 
 <ul>
-  <li>Specify <code class="highlighter-rouge">.withoutDefaults</code>, where windows that are empty in the input <code class="highlighter-rouge">PCollection</code> will likewise be empty in the output collection.</li>
-  <li>Specify <code class="highlighter-rouge">.asSingletonView</code>, in which the output is immediately converted to a <code class="highlighter-rouge">PCollectionView</code>, which will provide a default value for each empty window when used as a side input. You’ll generally only need to use this option if the result of your pipeline’s <code class="highlighter-rouge">Combine</code> is to be used as a side input later in the pipeline.</li>
+  <li>Specify <code class="highlighter-rouge">.withoutDefaults</code>, where windows that are empty in the input
+<code class="highlighter-rouge">PCollection</code> will likewise be empty in the output collection.</li>
+  <li>Specify <code class="highlighter-rouge">.asSingletonView</code>, in which the output is immediately converted to a
+<code class="highlighter-rouge">PCollectionView</code>, which will provide a default value for each empty window
+when used as a side input. You’ll generally only need to use this option if
+the result of your pipeline’s <code class="highlighter-rouge">Combine</code> is to be used as a side input later in
+the pipeline, as in the sketch following this list.</li>
 </ul>
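+
+<p>As an illustration of the second option, the following Java sketch (reusing the
+sum combine shown earlier) produces a <code class="highlighter-rouge">PCollectionView</code> that can later be passed
+to a <code class="highlighter-rouge">ParDo</code> as a side input; the name <code class="highlighter-rouge">sumView</code> is hypothetical:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>PCollection&lt;Integer&gt; pc = ...;
+// asSingletonView yields a PCollectionView&lt;Integer&gt; whose value in an empty
+// window falls back to the combine function's default (zero for a sum).
+PCollectionView&lt;Integer&gt; sumView = pc.apply(
+  Combine.globally(new Sum.SumIntegerFn()).asSingletonView());
+</code></pre>
+</div>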
 
-<h5 id="a-nametransforms-combine-per-keyacombining-values-in-a-key-grouped-collection"><a name="transforms-combine-per-key"></a><strong>Combining values in a key-grouped collection</strong></h5>
-
-<p>After creating a key-grouped collection (for example, by using a <code class="highlighter-rouge">GroupByKey</code> transform) a common pattern is to combine the collection of values associated with each key into a single, merged value. Drawing on the previous example from <code class="highlighter-rouge">GroupByKey</code>, a key-grouped <code class="highlighter-rouge">PCollection</code> called <code class="highlighter-rouge">groupedWords</code> looks like this:</p>
+<h5 id="combining-values-in-a-keyed-pcollection">4.2.4.6. Combining values in a keyed PCollection</h5>
 
+<p>After creating a keyed PCollection (for example, by using a <code class="highlighter-rouge">GroupByKey</code>
+transform), a common pattern is to combine the collection of values associated
+with each key into a single, merged value. Drawing on the previous example from
+<code class="highlighter-rouge">GroupByKey</code>, a key-grouped <code class="highlighter-rouge">PCollection</code> called <code class="highlighter-rouge">groupedWords</code> looks like this:</p>
 <div class="highlighter-rouge"><pre class="highlight"><code>  cat, [1,5,9]
   dog, [5,2]
   and, [1,2,6]
@@ -929,7 +1384,14 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<p>In the above <code class="highlighter-rouge">PCollection</code>, each element has a string key (for example, “cat”) and an iterable of integers for its value (in the first element, containing [1, 5, 9]). If our pipeline’s next processing step combines the values (rather than considering them individually), you can combine the iterable of integers to create a single, merged value to be paired with each key. This pattern of a <code class="highlighter-rouge">GroupByKey</code> followed by [...]
+<p>In the above <code class="highlighter-rouge">PCollection</code>, each element has a string key (for example, “cat”)
+and an iterable of integers for its value (in the first element, containing [1,
+5, 9]). If our pipeline’s next processing step combines the values (rather than
+considering them individually), you can combine the iterable of integers to
+create a single, merged value to be paired with each key. This pattern of a
+<code class="highlighter-rouge">GroupByKey</code> followed by merging the collection of values is equivalent to
+Beam’s Combine PerKey transform. The combine function you supply to Combine
+PerKey must be an associative reduction function or a subclass of <code class="highlighter-rouge">CombineFn</code>.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// PCollection is grouped by key and the Double values associated with each key are combined into a Double.</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">salesRecords</span> <span class="o">=</span> <span class="o">...;</span>
@@ -937,17 +1399,16 @@ guest, [[], [order4]]
   <span class="n">salesRecords</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Combine</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span><span class="n">perKey</span><span class="o">(</span>
     <span class="k">new</span> <span class="n">Sum</span><span class="o">.</span><span class="na">SumDoubleFn</span><span class="o">()));</span>
 
-<span class="c1">// The combined value is of a different type than the original collection of values per key.</span>
-<span class="c1">// PCollection has keys of type String and values of type Integer, and the combined value is a Double.</span>
-
+<span class="c1">// The combined value is of a different type than the original collection of values per key. PCollection has</span>
+<span class="c1">// keys of type String and values of type Integer, and the combined value is a Double.</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">playerAccuracy</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;&gt;</span> <span class="n">avgAccuracyPerPlayer</span> <span class="o">=</span>
   <span class="n">playerAccuracy</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Combine</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Double</span><span class="o">&gt;</span><span class="n">perKey</span><span class="o">(</span>
     <span class="k">new</span> <span class="nf">MeanInts</span><span class="o">())));</span>
 </code></pre>
 </div>
-
-<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># PCollection is grouped by key and the numeric values associated with each key are averaged into a float.</span>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># PCollection is grouped by key and the numeric values associated with each key</span>
+<span class="c"># are averaged into a float.</span>
 <span class="n">player_accuracies</span> <span class="o">=</span> <span class="o">...</span>
 <span class="n">avg_accuracy_per_player</span> <span class="o">=</span> <span class="p">(</span><span class="n">player_accuracies</span>
                            <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombinePerKey</span><span class="p">(</span>
@@ -956,13 +1417,16 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h4 id="a-nametransforms-flatten-partitionausing-flatten-and-partition"><a name="transforms-flatten-partition"></a>Using Flatten and Partition</h4>
+<h4 id="flatten">4.2.5. Flatten</h4>
 
-<p><span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/transforms/Flatten.html"><code class="highlighter-rouge">Flatten</code></a></span><span class="language-py"><a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py"><code class="highlighter-rouge">Flatten</code></a></span> and <span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/transforms/Partitio [...]
+<p><span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/transforms/Flatten.html"><code class="highlighter-rouge">Flatten</code></a></span>
+<span class="language-py"><a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py"><code class="highlighter-rouge">Flatten</code></a></span> and
+is a Beam transform for <code class="highlighter-rouge">PCollection</code> objects that store the same data type.
+<code class="highlighter-rouge">Flatten</code> merges multiple <code class="highlighter-rouge">PCollection</code> objects into a single logical
+<code class="highlighter-rouge">PCollection</code>.</p>
 
-<h5 id="flatten"><strong>Flatten</strong></h5>
-
-<p>The following example shows how to apply a <code class="highlighter-rouge">Flatten</code> transform to merge multiple <code class="highlighter-rouge">PCollection</code> objects.</p>
+<p>The following example shows how to apply a <code class="highlighter-rouge">Flatten</code> transform to merge multiple
+<code class="highlighter-rouge">PCollection</code> objects.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Flatten takes a PCollectionList of PCollection objects of a given type.</span>
 <span class="c1">// Returns a single PCollection that contains all of the elements in the PCollection objects in that list.</span>
@@ -976,7 +1440,7 @@ guest, [[], [order4]]
 </div>
 
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Flatten takes a tuple of PCollection objects.</span>
-<span class="c"># Returns a single PCollection that contains all of the elements in the </span>
+<span class="c"># Returns a single PCollection that contains all of the elements in the</span>
 <span class="n">merged</span> <span class="o">=</span> <span class="p">(</span>
     <span class="p">(</span><span class="n">pcoll1</span><span class="p">,</span> <span class="n">pcoll2</span><span class="p">,</span> <span class="n">pcoll3</span><span class="p">)</span>
     <span class="c"># A list of tuples can be "piped" directly into a Flatten transform.</span>
@@ -985,25 +1449,48 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h5 id="data-encoding-in-merged-collections">Data encoding in merged collections:</h5>
+<h5 id="data-encoding-in-merged-collections">4.2.5.1. Data encoding in merged collections</h5>
+
+<p>By default, the coder for the output <code class="highlighter-rouge">PCollection</code> is the same as the coder for
+the first <code class="highlighter-rouge">PCollection</code> in the input <code class="highlighter-rouge">PCollectionList</code>. However, the input
+<code class="highlighter-rouge">PCollection</code> objects can each use different coders, as long as they all contain
+the same data type in your chosen language.</p>
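+
+<p>If you want the merged collection to use a particular coder rather than the
+inherited default, you can set it explicitly. A minimal Java sketch, assuming a
+hypothetical flattened <code class="highlighter-rouge">PCollection&lt;String&gt;</code> named <code class="highlighter-rouge">merged</code>:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Explicitly set the coder on the merged collection instead of relying on the
+// coder of the first input PCollection.
+merged.setCoder(StringUtf8Coder.of());
+</code></pre>
+</div>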
 
-<p>By default, the coder for the output <code class="highlighter-rouge">PCollection</code> is the same as the coder for the first <code class="highlighter-rouge">PCollection</code> in the input <code class="highlighter-rouge">PCollectionList</code>. However, the input <code class="highlighter-rouge">PCollection</code> objects can each use different coders, as long as they all contain the same data type in your chosen language.</p>
+<h5 id="merging-windowed-collections">4.2.5.2. Merging windowed collections</h5>
 
-<h5 id="merging-windowed-collections">Merging windowed collections:</h5>
+<p>When using <code class="highlighter-rouge">Flatten</code> to merge <code class="highlighter-rouge">PCollection</code> objects that have a windowing
+strategy applied, all of the <code class="highlighter-rouge">PCollection</code> objects you want to merge must use a
+compatible windowing strategy and window sizing. For example, all the
+collections you’re merging must use (hypothetically) identical 5-minute
+fixed windows or 4-minute sliding windows starting every 30 seconds.</p>
 
-<p>When using <code class="highlighter-rouge">Flatten</code> to merge <code class="highlighter-rouge">PCollection</code> objects that have a windowing strategy applied, all of the <code class="highlighter-rouge">PCollection</code> objects you want to merge must use a compatible windowing strategy and window sizing. For example, all the collections you’re merging must all use (hypothetically) identical 5-minute fixed windows or 4-minute sliding windows starting every 30 seconds.</p>
+<p>If your pipeline attempts to use <code class="highlighter-rouge">Flatten</code> to merge <code class="highlighter-rouge">PCollection</code> objects with
+incompatible windows, Beam generates an <code class="highlighter-rouge">IllegalStateException</code> error when your
+pipeline is constructed.</p>
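+
+<p>One way to satisfy this requirement is to apply the same windowing transform to
+each input before flattening. The following Java sketch is illustrative only;
+the inputs <code class="highlighter-rouge">pc1</code> and <code class="highlighter-rouge">pc2</code> are hypothetical:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// Apply identical 5-minute fixed windows to each input so their windowing
+// strategies are compatible before the Flatten.
+PCollection&lt;String&gt; windowed1 =
+    pc1.apply(Window.&lt;String&gt;into(FixedWindows.of(Duration.standardMinutes(5))));
+PCollection&lt;String&gt; windowed2 =
+    pc2.apply(Window.&lt;String&gt;into(FixedWindows.of(Duration.standardMinutes(5))));
+
+PCollection&lt;String&gt; merged = PCollectionList.of(windowed1).and(windowed2)
+    .apply(Flatten.&lt;String&gt;pCollections());
+</code></pre>
+</div>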
 
-<p>If your pipeline attempts to use <code class="highlighter-rouge">Flatten</code> to merge <code class="highlighter-rouge">PCollection</code> objects with incompatible windows, Beam generates an <code class="highlighter-rouge">IllegalStateException</code> error when your pipeline is constructed.</p>
+<h4 id="partition">4.2.6. Partition</h4>
 
-<h5 id="partition"><strong>Partition</strong></h5>
+<p><span class="language-java"><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/transforms/Partition.html"><code class="highlighter-rouge">Partition</code></a></span>
+<span class="language-py"><a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py"><code class="highlighter-rouge">Partition</code></a></span>
+is a Beam transform for <code class="highlighter-rouge">PCollection</code> objects that store the same data
+type. <code class="highlighter-rouge">Partition</code> splits a single <code class="highlighter-rouge">PCollection</code> into a fixed number of smaller
+collections.</p>
 
-<p><code class="highlighter-rouge">Partition</code> divides the elements of a <code class="highlighter-rouge">PCollection</code> according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split up the elements of the input <code class="highlighter-rouge">PCollection</code> into each resulting partition <code class="highlighter-rouge">PCollection</code>. The number of partitions must be determined at graph construction time.  [...]
+<p><code class="highlighter-rouge">Partition</code> divides the elements of a <code class="highlighter-rouge">PCollection</code> according to a partitioning
+function that you provide. The partitioning function contains the logic that
+determines how to split up the elements of the input <code class="highlighter-rouge">PCollection</code> into each
+resulting partition <code class="highlighter-rouge">PCollection</code>. The number of partitions must be determined
+at graph construction time. You can, for example, pass the number of partitions
+as a command-line option at runtime (which will then be used to build your
+pipeline graph), but you cannot determine the number of partitions in
+mid-pipeline (based on data calculated after your pipeline graph is constructed,
+for instance).</p>
 
 <p>The following example divides a <code class="highlighter-rouge">PCollection</code> into percentile groups.</p>
 
-<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.</span>
-<span class="c1">// In this example, we define the PartitionFn in-line.</span>
-<span class="c1">// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.</span>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the</span>
+<span class="c1">// partitioning function. In this example, we define the PartitionFn in-line. Returns a PCollectionList</span>
+<span class="c1">// containing each of the resulting partitions as individual PCollection objects.</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Student</span><span class="o">&gt;</span> <span class="n">students</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="c1">// Split students up into 10 partitions, by percentile:</span>
 <span class="n">PCollectionList</span><span class="o">&lt;</span><span class="n">Student</span><span class="o">&gt;</span> <span class="n">studentsByPercentile</span> <span class="o">=</span>
@@ -1017,7 +1504,6 @@ guest, [[], [order4]]
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Student</span><span class="o">&gt;</span> <span class="n">fortiethPercentile</span> <span class="o">=</span> <span class="n">studentsByPercentile</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="mi">4</span><span class="o">);</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Provide an int value with the desired number of result partitions, and a partitioning function (partition_fn in this example).</span>
 <span class="c"># Returns a tuple of PCollection objects containing each of the resulting partitions as individual PCollection objects.</span>
 <span class="n">students</span> <span class="o">=</span> <span class="o">...</span>
@@ -1033,52 +1519,91 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h4 id="a-nametransforms-usercodereqsageneral-requirements-for-writing-user-code-for-beam-transforms"><a name="transforms-usercodereqs"></a>General Requirements for writing user code for Beam transforms</h4>
+<h3 id="requirements-for-writing-user-code-for-beam-transforms">4.3. Requirements for writing user code for Beam transforms</h3>
 
-<p>When you build user code for a Beam transform, you should keep in mind the distributed nature of execution. For example, there might be many copies of your function running on a lot of different machines in parallel, and those copies function independently, without communicating or sharing state with any of the other copies. Depending on the Pipeline Runner and processing back-end you choose for your pipeline, each copy of your user code function may be retried or run multiple times.  [...]
+<p>When you build user code for a Beam transform, you should keep in mind the
+distributed nature of execution. For example, there might be many copies of your
+function running on a lot of different machines in parallel, and those copies
+function independently, without communicating or sharing state with any of the
+other copies. Depending on the Pipeline Runner and processing back-end you
+choose for your pipeline, each copy of your user code function may be retried or
+run multiple times. As such, you should be cautious about including things like
+state dependency in your user code.</p>
 
 <p>In general, your user code must fulfill at least these requirements:</p>
 
 <ul>
   <li>Your function object must be <strong>serializable</strong>.</li>
-  <li>Your function object must be <strong>thread-compatible</strong>, and be aware that <em>the Beam SDKs are not thread-safe</em>.</li>
+  <li>Your function object must be <strong>thread-compatible</strong>, and be aware that <em>the
+Beam SDKs are not thread-safe</em>.</li>
 </ul>
 
 <p>In addition, it’s recommended that you make your function object <strong>idempotent</strong>.</p>
 
 <blockquote>
-  <p><strong>Note:</strong> These requirements apply to subclasses of <code class="highlighter-rouge">DoFn</code> (a function object used with the <a href="#transforms-pardo">ParDo</a> transform), <code class="highlighter-rouge">CombineFn</code> (a function object used with the <a href="#transforms-combine">Combine</a> transform), and <code class="highlighter-rouge">WindowFn</code> (a function object used with the <a href="#windowing">Window</a> transform).</p>
+  <p><strong>Note:</strong> These requirements apply to subclasses of <code class="highlighter-rouge">DoFn</code> (a function object
+used with the <a href="#pardo">ParDo</a> transform), <code class="highlighter-rouge">CombineFn</code> (a function object used
+with the <a href="#combine">Combine</a> transform), and <code class="highlighter-rouge">WindowFn</code> (a function object
+used with the <a href="#windowing">Window</a> transform).</p>
 </blockquote>
 
-<h5 id="serializability">Serializability</h5>
+<h4 id="serializability">4.3.1. Serializability</h4>
 
-<p>Any function object you provide to a transform must be <strong>fully serializable</strong>. This is because a copy of the function needs to be serialized and transmitted to a remote worker in your processing cluster. The base classes for user code, such as <code class="highlighter-rouge">DoFn</code>, <code class="highlighter-rouge">CombineFn</code>, and <code class="highlighter-rouge">WindowFn</code>, already implement <code class="highlighter-rouge">Serializable</code>; however, your [...]
+<p>Any function object you provide to a transform must be <strong>fully serializable</strong>.
+This is because a copy of the function needs to be serialized and transmitted to
+a remote worker in your processing cluster. The base classes for user code, such
+as <code class="highlighter-rouge">DoFn</code>, <code class="highlighter-rouge">CombineFn</code>, and <code class="highlighter-rouge">WindowFn</code>, already implement <code class="highlighter-rouge">Serializable</code>;
+however, your subclass must not add any non-serializable members.</p>
 
 <p>Some other serializability factors you should keep in mind are:</p>
 
 <ul>
-  <li>Transient fields in your function object are <em>not</em> transmitted to worker instances, because they are not automatically serialized.</li>
+  <li>Transient fields in your function object are <em>not</em> transmitted to worker
+instances, because they are not automatically serialized.</li>
   <li>Avoid loading a field with a large amount of data before serialization.</li>
   <li>Individual instances of your function object cannot share data.</li>
   <li>Mutating a function object after it gets applied will have no effect.</li>
-  <li>Take care when declaring your function object inline by using an anonymous inner class instance. In a non-static context, your inner class instance will implicitly contain a pointer to the enclosing class and that class’ state. That enclosing class will also be serialized, and thus the same considerations that apply to the function object itself also apply to this outer class.</li>
+  <li>Take care when declaring your function object inline by using an anonymous
+inner class instance. In a non-static context, your inner class instance will
+implicitly contain a pointer to the enclosing class and that class’ state.
+That enclosing class will also be serialized, and thus the same considerations
+that apply to the function object itself also apply to this outer class (see
+the sketch after this list).</li>
 </ul>
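+
+<p>One common way to avoid that last pitfall is to declare the function object as
+a static nested (or top-level) class, so it does not capture a reference to the
+enclosing instance. This is only an illustrative sketch:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// A static nested DoFn holds no implicit reference to its enclosing class, so
+// only the DoFn itself needs to be serializable.
+static class ToUpperCaseFn extends DoFn&lt;String, String&gt; {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    c.output(c.element().toUpperCase());
+  }
+}
+</code></pre>
+</div>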
 
-<h5 id="thread-compatibility">Thread-compatibility</h5>
+<h4 id="thread-compatibility">4.3.2. Thread-compatibility</h4>
 
-<p>Your function object should be thread-compatible. Each instance of your function object is accessed by a single thread on a worker instance, unless you explicitly create your own threads. Note, however, that <strong>the Beam SDKs are not thread-safe</strong>. If you create your own threads in your user code, you must provide your own synchronization. Note that static members in your function object are not passed to worker instances and that multiple instances of your function may be  [...]
+<p>Your function object should be thread-compatible. Each instance of your function
+object is accessed by a single thread on a worker instance, unless you
+explicitly create your own threads. Note, however, that <strong>the Beam SDKs are not
+thread-safe</strong>. If you create your own threads in your user code, you must
+provide your own synchronization. Note that static members in your function
+object are not passed to worker instances and that multiple instances of your
+function may be accessed from different threads.</p>
 
-<h5 id="idempotence">Idempotence</h5>
+<h4 id="idempotence">4.3.3. Idempotence</h4>
 
-<p>It’s recommended that you make your function object idempotent–that is, that it can be repeated or retried as often as necessary without causing unintended side effects. The Beam model provides no guarantees as to the number of times your user code might be invoked or retried; as such, keeping your function object idempotent keeps your pipeline’s output deterministic, and your transforms’ behavior more predictable and easier to debug.</p>
+<p>It’s recommended that you make your function object idempotent; that is, it
+can be repeated or retried as often as necessary without causing unintended side
+effects. The Beam model provides no guarantees as to the number of times your
+user code might be invoked or retried; as such, keeping your function object
+idempotent keeps your pipeline’s output deterministic, and your transforms’
+behavior more predictable and easier to debug.</p>
 
-<h4 id="a-nametransforms-sideioaside-inputs"><a name="transforms-sideio"></a>Side Inputs</h4>
+<h3 id="side-inputs">4.4. Side inputs</h3>
 
-<p>In addition to the main input <code class="highlighter-rouge">PCollection</code>, you can provide additional inputs to a <code class="highlighter-rouge">ParDo</code> transform in the form of side inputs. A side input is an additional input that your <code class="highlighter-rouge">DoFn</code> can access each time it processes an element in the input <code class="highlighter-rouge">PCollection</code>. When you specify a side input, you create a view of some other data that can be read  [...]
+<p>In addition to the main input <code class="highlighter-rouge">PCollection</code>, you can provide additional inputs
+to a <code class="highlighter-rouge">ParDo</code> transform in the form of side inputs. A side input is an additional
+input that your <code class="highlighter-rouge">DoFn</code> can access each time it processes an element in the input
+<code class="highlighter-rouge">PCollection</code>. When you specify a side input, you create a view of some other
+data that can be read from within the <code class="highlighter-rouge">ParDo</code> transform’s <code class="highlighter-rouge">DoFn</code> while processing
+each element.</p>
 
-<p>Side inputs are useful if your <code class="highlighter-rouge">ParDo</code> needs to inject additional data when processing each element in the input <code class="highlighter-rouge">PCollection</code>, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.</p>
+<p>Side inputs are useful if your <code class="highlighter-rouge">ParDo</code> needs to inject additional data when
+processing each element in the input <code class="highlighter-rouge">PCollection</code>, but the additional data
+needs to be determined at runtime (and not hard-coded). Such values might be
+determined by the input data, or depend on a different branch of your pipeline.</p>
 
-<h5 id="passing-side-inputs-to-pardo">Passing side inputs to ParDo:</h5>
+<h4 id="passing-side-inputs-to-pardo">4.4.1. Passing side inputs to ParDo</h4>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="c1">// Pass side inputs to your ParDo transform by invoking .withSideInputs.</span>
   <span class="c1">// Inside your DoFn, access the side input by using the method DoFn.ProcessContext.sideInput.</span>
@@ -1110,11 +1635,11 @@ guest, [[], [order4]]
   <span class="o">);</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Side inputs are available as extra arguments in the DoFn's process method or Map / FlatMap's callable.</span>
-<span class="c"># Optional, positional, and keyword arguments are all supported. Deferred arguments are unwrapped into their actual values.</span>
-<span class="c"># For example, using pvalue.AsIter(pcoll) at pipeline construction time results in an iterable of the actual elements of pcoll being passed into each process invocation.</span>
-<span class="c"># In this example, side inputs are passed to a FlatMap transform as extra arguments and consumed by filter_using_length.</span>
+<span class="c"># Optional, positional, and keyword arguments are all supported. Deferred arguments are unwrapped into their</span>
+<span class="c"># actual values. For example, using pvalue.AsIteor(pcoll) at pipeline construction time results in an iterable</span>
+<span class="c"># of the actual elements of pcoll being passed into each process invocation. In this example, side inputs are</span>
+<span class="c"># passed to a FlatMap transform as extra arguments and consumed by filter_using_length.</span>
 <span class="n">words</span> <span class="o">=</span> <span class="o">...</span>
 <span class="c"># Callable takes additional arguments.</span>
 <span class="k">def</span> <span class="nf">filter_using_length</span><span class="p">(</span><span class="n">word</span><span class="p">,</span> <span class="n">lower_bound</span><span class="p">,</span> <span class="n">upper_bound</span><span class="o">=</span><span class="nb">float</span><span class="p">(</span><span class="s">'inf'</span><span class="p">)):</span>
@@ -1155,27 +1680,49 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h5 id="side-inputs-and-windowing">Side inputs and windowing:</h5>
+<h4 id="side-inputs-and-windowing">4.4.2. Side inputs and windowing</h4>
 
-<p>A windowed <code class="highlighter-rouge">PCollection</code> may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a <code class="highlighter-rouge">PCollectionView</code> of a windowed <code class="highlighter-rouge">PCollection</code>, the <code class="highlighter-rouge">PCollectionView</code> represents a single entity per window (one singleton per window, one list per window, etc.).</p>
+<p>A windowed <code class="highlighter-rouge">PCollection</code> may be infinite and thus cannot be compressed into a
+single value (or single collection class). When you create a <code class="highlighter-rouge">PCollectionView</code>
+of a windowed <code class="highlighter-rouge">PCollection</code>, the <code class="highlighter-rouge">PCollectionView</code> represents a single entity
+per window (one singleton per window, one list per window, etc.).</p>
 
-<p>Beam uses the window(s) for the main input element to look up the appropriate window for the side input element. Beam projects the main input element’s window into the side input’s window set, and then uses the side input from the resulting window. If the main input and side inputs have identical windows, the projection provides the exact corresponding window. However, if the inputs have different windows, Beam uses the projection to choose the most appropriate side input window.</p>
+<p>Beam uses the window(s) for the main input element to look up the appropriate
+window for the side input element. Beam projects the main input element’s window
+into the side input’s window set, and then uses the side input from the
+resulting window. If the main input and side inputs have identical windows, the
+projection provides the exact corresponding window. However, if the inputs have
+different windows, Beam uses the projection to choose the most appropriate side
+input window.</p>
 
-<p>For example, if the main input is windowed using fixed-time windows of one minute, and the side input is windowed using fixed-time windows of one hour, Beam projects the main input window against the side input window set and selects the side input value from the appropriate hour-long side input window.</p>
+<p>For example, if the main input is windowed using fixed-time windows of one
+minute, and the side input is windowed using fixed-time windows of one hour,
+Beam projects the main input window against the side input window set and
+selects the side input value from the appropriate hour-long side input window.</p>
 
-<p>If the main input element exists in more than one window, then <code class="highlighter-rouge">processElement</code> gets called multiple times, once for each window. Each call to <code class="highlighter-rouge">processElement</code> projects the “current” window for the main input element, and thus might provide a different view of the side input each time.</p>
+<p>If the main input element exists in more than one window, then <code class="highlighter-rouge">processElement</code>
+gets called multiple times, once for each window. Each call to <code class="highlighter-rouge">processElement</code>
+projects the “current” window for the main input element, and thus might provide
+a different view of the side input each time.</p>
 
-<p>If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.</p>
+<p>If the side input has multiple trigger firings, Beam uses the value from the
+latest trigger firing. This is particularly useful if you use a side input with
+a single global window and specify a trigger.</p>
 
-<h4 id="a-nametransforms-outputsaadditional-outputs"><a name="transforms-outputs"></a>Additional Outputs</h4>
+<h3 id="additional-outputs">4.5. Additional outputs</h3>
 
-<p>While <code class="highlighter-rouge">ParDo</code> always produces a main output <code class="highlighter-rouge">PCollection</code> (as the return value from <code class="highlighter-rouge">apply</code>), you can also have your <code class="highlighter-rouge">ParDo</code> produce any number of additional output <code class="highlighter-rouge">PCollection</code>s. If you choose to have multiple outputs, your <code class="highlighter-rouge">ParDo</code> returns all of the output <code c [...]
+<p>While <code class="highlighter-rouge">ParDo</code> always produces a main output <code class="highlighter-rouge">PCollection</code> (as the return value
+from <code class="highlighter-rouge">apply</code>), you can also have your <code class="highlighter-rouge">ParDo</code> produce any number of additional
+output <code class="highlighter-rouge">PCollection</code>s. If you choose to have multiple outputs, your <code class="highlighter-rouge">ParDo</code>
+returns all of the output <code class="highlighter-rouge">PCollection</code>s (including the main output) bundled
+together.</p>
 
-<h5 id="tags-for-muitiple-outputs">Tags for muitiple outputs:</h5>
+<h4 id="tags-for-multiple-outputs">4.5.1. Tags for multiple outputs</h4>
 
-<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// To emit elements to multiple output PCollections, create a TupleTag object to identify each collection that your ParDo produces.</span>
-<span class="c1">// For example, if your ParDo produces three output PCollections (the main output and two additional outputs), you must create three TupleTags.</span>
-<span class="c1">// The following example code shows how to create TupleTags for a ParDo with three output PCollections.</span>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// To emit elements to multiple output PCollections, create a TupleTag object to identify each collection</span>
+<span class="c1">// that your ParDo produces. For example, if your ParDo produces three output PCollections (the main output</span>
+<span class="c1">// and two additional outputs), you must create three TupleTags. The following example code shows how to</span>
+<span class="c1">// create TupleTags for a ParDo with three output PCollections.</span>
 
   <span class="c1">// Input PCollection to our ParDo.</span>
   <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...;</span>
@@ -1200,10 +1747,11 @@ guest, [[], [order4]]
       <span class="k">new</span> <span class="n">TupleTag</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(){};</span>
 
 <span class="c1">// Passing Output Tags to ParDo:</span>
-<span class="c1">// After you specify the TupleTags for each of your ParDo outputs, pass the tags to your ParDo by invoking .withOutputTags.</span>
-<span class="c1">// You pass the tag for the main output first, and then the tags for any additional outputs in a TupleTagList.</span>
-<span class="c1">// Building on our previous example, we pass the three TupleTags for our three output PCollections</span>
-<span class="c1">// to our ParDo. Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple.</span>
+<span class="c1">// After you specify the TupleTags for each of your ParDo outputs, pass the tags to your ParDo by invoking</span>
+<span class="c1">// .withOutputTags. You pass the tag for the main output first, and then the tags for any additional outputs</span>
+<span class="c1">// in a TupleTagList. Building on our previous example, we pass the three TupleTags for our three output</span>
+<span class="c1">// PCollections to our ParDo. Note that all of the outputs (including the main output PCollection) are</span>
+<span class="c1">// bundled into the returned PCollectionTuple.</span>
 
   <span class="n">PCollectionTuple</span> <span class="n">results</span> <span class="o">=</span>
       <span class="n">words</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">ParDo</span>
@@ -1219,9 +1767,10 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># To emit elements to multiple output PCollections, invoke with_outputs() on the ParDo, and specify the expected tags for the outputs.</span>
-<span class="c"># with_outputs() returns a DoOutputsTuple object. Tags specified in with_outputs are attributes on the returned DoOutputsTuple object.</span>
-<span class="c"># The tags give access to the corresponding output PCollections.</span>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># To emit elements to multiple output PCollections, invoke with_outputs() on the ParDo, and specify the</span>
+<span class="c"># expected tags for the outputs. with_outputs() returns a DoOutputsTuple object. Tags specified in</span>
+<span class="c"># with_outputs are attributes on the returned DoOutputsTuple object. The tags give access to the</span>
+<span class="c"># corresponding output PCollections.</span>
 
 <span class="n">results</span> <span class="o">=</span> <span class="p">(</span><span class="n">words</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">ProcessWords</span><span class="p">(),</span> <span class="n">cutoff_length</span><span class="o">=</span><span class="mi">2</span><span class="p">,</span> <span class="n">marker</span><span class="o">=</span><span class="s">'x'</span><spa [...]
            <span class="o">.</span><span class="n">with_outputs</span><span class="p">(</span><span class="s">'above_cutoff_lengths'</span><span class="p">,</span> <span class="s">'marked strings'</span><span class="p">,</span>
@@ -1231,7 +1780,8 @@ guest, [[], [order4]]
 <span class="n">marked</span> <span class="o">=</span> <span class="n">results</span><span class="p">[</span><span class="s">'marked strings'</span><span class="p">]</span>  <span class="c"># indexing works as well</span>
 
 
-<span class="c"># The result is also iterable, ordered in the same order that the tags were passed to with_outputs(), the main tag (if specified) first.</span>
+<span class="c"># The result is also iterable, ordered in the same order that the tags were passed to with_outputs(),</span>
+<span class="c"># the main tag (if specified) first.</span>
 
 <span class="n">below</span><span class="p">,</span> <span class="n">above</span><span class="p">,</span> <span class="n">marked</span> <span class="o">=</span> <span class="p">(</span><span class="n">words</span>
                         <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span>
@@ -1242,7 +1792,7 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h5 id="emitting-to-multiple-outputs-in-your-dofn">Emitting to multiple outputs in your DoFn:</h5>
+<h4 id="emitting-to-multiple-outputs-in-your-dofn">4.5.2. Emitting to multiple outputs in your DoFn</h4>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Inside your ParDo's DoFn, you can emit an element to a specific output PCollection by passing in the</span>
 <span class="c1">// appropriate TupleTag when you call ProcessContext.output.</span>
@@ -1303,30 +1853,45 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h2 id="a-nametransforms-compositeacomposite-transforms"><a name="transforms-composite"></a>Composite Transforms</h2>
+<h3 id="composite-transforms">4.6. Composite transforms</h3>
 
-<p>Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one <code class="highlighter-rouge">ParDo</code>, <code class="highlighter-rouge">Combine</code>, <code class="highlighter-rouge">GroupByKey</code>, or even other composite transforms). These transforms are called composite transforms. Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.</p>
+<p>Transforms can have a nested structure, where a complex transform performs
+multiple simpler transforms (such as more than one <code class="highlighter-rouge">ParDo</code>, <code class="highlighter-rouge">Combine</code>,
+<code class="highlighter-rouge">GroupByKey</code>, or even other composite transforms). These transforms are called
+composite transforms. Nesting multiple transforms inside a single composite
+transform can make your code more modular and easier to understand.</p>
 
-<p>The Beam SDK comes packed with many useful composite transforms. See the API reference pages for a list of transforms:</p>
+<p>The Beam SDK comes packed with many useful composite transforms. See the API
+reference pages for a list of transforms:</p>
 <ul>
   <li><a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/transforms/package-summary.html">Pre-written Beam transforms for Java</a></li>
   <li><a href="/documentation/sdks/pydoc/2.1.0/apache_beam.transforms.html">Pre-written Beam transforms for Python</a></li>
 </ul>
 
-<h3 id="an-example-of-a-composite-transform">An example of a composite transform</h3>
+<h4 id="an-example-composite-transform">4.6.1. An example composite transform</h4>
 
-<p>The <code class="highlighter-rouge">CountWords</code> transform in the <a href="/get-started/wordcount-example/">WordCount example program</a> is an example of a composite transform. <code class="highlighter-rouge">CountWords</code> is a <code class="highlighter-rouge">PTransform</code> subclass that consists of multiple nested transforms.</p>
+<p>The <code class="highlighter-rouge">CountWords</code> transform in the <a href="/get-started/wordcount-example/">WordCount example program</a>
+is an example of a composite transform. <code class="highlighter-rouge">CountWords</code> is a <code class="highlighter-rouge">PTransform</code> subclass
+that consists of multiple nested transforms.</p>
 
-<p>In its <code class="highlighter-rouge">expand</code> method, the <code class="highlighter-rouge">CountWords</code> transform applies the following transform operations:</p>
+<p>In its <code class="highlighter-rouge">expand</code> method, the <code class="highlighter-rouge">CountWords</code> transform applies the following
+transform operations:</p>
 
 <ol>
-  <li>It applies a <code class="highlighter-rouge">ParDo</code> on the input <code class="highlighter-rouge">PCollection</code> of text lines, producing an output <code class="highlighter-rouge">PCollection</code> of individual words.</li>
-  <li>It applies the Beam SDK library transform <code class="highlighter-rouge">Count</code> on the <code class="highlighter-rouge">PCollection</code> of words, producing a <code class="highlighter-rouge">PCollection</code> of key/value pairs. Each key represents a word in the text, and each value represents the number of times that word appeared in the original data.</li>
+  <li>It applies a <code class="highlighter-rouge">ParDo</code> on the input <code class="highlighter-rouge">PCollection</code> of text lines, producing
+an output <code class="highlighter-rouge">PCollection</code> of individual words.</li>
+  <li>It applies the Beam SDK library transform <code class="highlighter-rouge">Count</code> on the <code class="highlighter-rouge">PCollection</code> of
+words, producing a <code class="highlighter-rouge">PCollection</code> of key/value pairs. Each key represents a
+word in the text, and each value represents the number of times that word
+appeared in the original data.</li>
 </ol>
 
-<p>Note that this is also an example of nested composite transforms, as <code class="highlighter-rouge">Count</code> is, by itself, a composite transform.</p>
+<p>Note that this is also an example of nested composite transforms, as <code class="highlighter-rouge">Count</code>
+is, by itself, a composite transform.</p>
 
-<p>Your composite transform’s parameters and return value must match the initial input type and final return type for the entire transform, even if the transform’s intermediate data changes type multiple times.</p>
+<p>Your composite transform’s parameters and return value must match the initial
+input type and final return type for the entire transform, even if the
+transform’s intermediate data changes type multiple times.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">CountWords</span> <span class="kd">extends</span> <span class="n">PTransform</span><span class="o">&lt;</span><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span>
       <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;&gt;</span> <span class="o">{</span>
@@ -1361,13 +1926,20 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h3 id="creating-a-composite-transform">Creating a composite transform</h3>
+<h4 id="creating-a-composite-transform">4.6.2. Creating a composite transform</h4>
 
-<p>To create your own composite transform, create a subclass of the <code class="highlighter-rouge">PTransform</code> class and override the <code class="highlighter-rouge">expand</code> method to specify the actual processing logic. You can then use this transform just as you would a built-in transform from the Beam SDK.</p>
+<p>To create your own composite transform, create a subclass of the <code class="highlighter-rouge">PTransform</code>
+class and override the <code class="highlighter-rouge">expand</code> method to specify the actual processing logic.
+You can then use this transform just as you would a built-in transform from the
+Beam SDK.</p>
 
-<p class="language-java">For the <code class="highlighter-rouge">PTransform</code> class type parameters, you pass the <code class="highlighter-rouge">PCollection</code> types that your transform takes as input, and produces as output. To take multiple <code class="highlighter-rouge">PCollection</code>s as input, or produce multiple <code class="highlighter-rouge">PCollection</code>s as output, use one of the multi-collection types for the relevant type parameter.</p>
+<p class="language-java">For the <code class="highlighter-rouge">PTransform</code> class type parameters, you pass the <code class="highlighter-rouge">PCollection</code> types
+that your transform takes as input, and produces as output. To take multiple
+<code class="highlighter-rouge">PCollection</code>s as input, or produce multiple <code class="highlighter-rouge">PCollection</code>s as output, use one
+of the multi-collection types for the relevant type parameter.</p>
 
-<p>The following code sample shows how to declare a <code class="highlighter-rouge">PTransform</code> that accepts a <code class="highlighter-rouge">PCollection</code> of <code class="highlighter-rouge">String</code>s for input, and outputs a <code class="highlighter-rouge">PCollection</code> of <code class="highlighter-rouge">Integer</code>s:</p>
+<p>The following code sample shows how to declare a <code class="highlighter-rouge">PTransform</code> that accepts a
+<code class="highlighter-rouge">PCollection</code> of <code class="highlighter-rouge">String</code>s for input, and outputs a <code class="highlighter-rouge">PCollection</code> of <code class="highlighter-rouge">Integer</code>s:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="kd">static</span> <span class="kd">class</span> <span class="nc">ComputeWordLengths</span>
     <span class="kd">extends</span> <span class="n">PTransform</span><span class="o">&lt;</span><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
@@ -1383,11 +1955,14 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h4 id="overriding-the-expand-method">Overriding the expand method</h4>
+<p>Within your <code class="highlighter-rouge">PTransform</code> subclass, you’ll need to override the <code class="highlighter-rouge">expand</code> method.
+The <code class="highlighter-rouge">expand</code> method is where you add the processing logic for the <code class="highlighter-rouge">PTransform</code>.
+Your override of <code class="highlighter-rouge">expand</code> must accept the appropriate type of input
+<code class="highlighter-rouge">PCollection</code> as a parameter, and specify the output <code class="highlighter-rouge">PCollection</code> as the return
+value.</p>
 
-<p>Within your <code class="highlighter-rouge">PTransform</code> subclass, you’ll need to override the <code class="highlighter-rouge">expand</code> method. The <code class="highlighter-rouge">expand</code> method is where you add the processing logic for the <code class="highlighter-rouge">PTransform</code>. Your override of <code class="highlighter-rouge">expand</code> must accept the appropriate type of input <code class="highlighter-rouge">PCollection</code> as a parameter, and speci [...]
-
-<p>The following code sample shows how to override <code class="highlighter-rouge">expand</code> for the <code class="highlighter-rouge">ComputeWordLengths</code> class declared in the previous example:</p>
+<p>The following code sample shows how to override <code class="highlighter-rouge">expand</code> for the
+<code class="highlighter-rouge">ComputeWordLengths</code> class declared in the previous example:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="kd">static</span> <span class="kd">class</span> <span class="nc">ComputeWordLengths</span>
       <span class="kd">extends</span> <span class="n">PTransform</span><span class="o">&lt;</span><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;,</span> <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
@@ -1407,23 +1982,41 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<p>As long as you override the <code class="highlighter-rouge">expand</code> method in your <code class="highlighter-rouge">PTransform</code> subclass to accept the appropriate input <code class="highlighter-rouge">PCollection</code>(s) and return the corresponding output <code class="highlighter-rouge">PCollection</code>(s), you can include as many transforms as you want. These transforms can include core transforms, composite transforms, or the transforms included in the Beam SDK libra [...]
-
-<p><strong>Note:</strong> The <code class="highlighter-rouge">expand</code> method of a <code class="highlighter-rouge">PTransform</code> is not meant to be invoked directly by the user of a transform. Instead, you should call the <code class="highlighter-rouge">apply</code> method on the <code class="highlighter-rouge">PCollection</code> itself, with the transform as an argument. This allows transforms to be nested within the structure of your pipeline.</p>
+<p>As long as you override the <code class="highlighter-rouge">expand</code> method in your <code class="highlighter-rouge">PTransform</code> subclass to
+accept the appropriate input <code class="highlighter-rouge">PCollection</code>(s) and return the corresponding
+output <code class="highlighter-rouge">PCollection</code>(s), you can include as many transforms as you want. These
+transforms can include core transforms, composite transforms, or the transforms
+included in the Beam SDK libraries.</p>
 
-<h4 id="ptransform-style-guide">PTransform Style Guide</h4>
+<p><strong>Note:</strong> The <code class="highlighter-rouge">expand</code> method of a <code class="highlighter-rouge">PTransform</code> is not meant to be invoked
+directly by the user of a transform. Instead, you should call the <code class="highlighter-rouge">apply</code> method
+on the <code class="highlighter-rouge">PCollection</code> itself, with the transform as an argument. This allows
+transforms to be nested within the structure of your pipeline.</p>
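+
+<p class="language-java">For example, the following sketch applies the <code class="highlighter-rouge">ComputeWordLengths</code>
+composite transform declared above the same way you would apply any built-in
+transform. This is a minimal illustration; <code class="highlighter-rouge">words</code> is an assumed
+<code class="highlighter-rouge">PCollection</code> of <code class="highlighter-rouge">String</code>s.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// A minimal sketch: apply the composite transform by calling apply on the
+// PCollection itself. `words` is an assumed PCollection&lt;String&gt;.
+PCollection&lt;Integer&gt; wordLengths = words.apply(new ComputeWordLengths());
+</code></pre>
+</div>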
 
-<p>When you create a new <code class="highlighter-rouge">PTransform</code>, be sure to read the <a href="/contribute/ptransform-style-guide/">PTransform Style Guide</a>. The guide contains additional helpful information such as style guidelines, logging and testing guidance, and language-specific considerations.</p>
+<h4 id="ptransform-style-guide">4.6.3. PTransform Style Guide</h4>
 
-<h2 id="a-nameioapipeline-io"><a name="io"></a>Pipeline I/O</h2>
+<p>The <a href="/contribute/ptransform-style-guide/">PTransform Style Guide</a>
+contains additional information not included here, such as style guidelines,
+logging and testing guidance, and language-specific considerations. The guide
+is a useful starting point when you want to write new composite PTransforms.</p>
 
-<p>When you create a pipeline, you often need to read data from some external source, such as a file in external data sink or a database. Likewise, you may want your pipeline to output its result data to a similar external data sink. Beam provides read and write transforms for a <a href="/documentation/io/built-in/">number of common data storage types</a>. If you want your pipeline to read from or write to a data storage format that isn’t supported by the built-in transforms, you can <a  [...]
+<h2 id="pipeline-io">5. Pipeline I/O</h2>
 
-<h3 id="reading-input-data">Reading input data</h3>
+<p>When you create a pipeline, you often need to read data from some external
+source, such as a file in an external data store or a database. Likewise, you may
+want your pipeline to output its result data to a similar external data sink.
+Beam provides read and write transforms for a <a href="/documentation/io/built-in/">number of common data storage
+types</a>. If you want your pipeline
+to read from or write to a data storage format that isn’t supported by the
+built-in transforms, you can <a href="/documentation/io/io-toc/">implement your own read and write
+transforms</a>.</p>
 
-<p>Read transforms read data from an external source and return a <code class="highlighter-rouge">PCollection</code> representation of the data for use by your pipeline. You can use a read transform at any point while constructing your pipeline to create a new <code class="highlighter-rouge">PCollection</code>, though it will be most common at the start of your pipeline.</p>
+<h3 id="reading-input-data">5.1. Reading input data</h3>
 
-<h4 id="using-a-read-transform">Using a read transform:</h4>
+<p>Read transforms read data from an external source and return a <code class="highlighter-rouge">PCollection</code>
+representation of the data for use by your pipeline. You can use a read
+transform at any point while constructing your pipeline to create a new
+<code class="highlighter-rouge">PCollection</code>, though it will be most common at the start of your pipeline.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">lines</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">from</span><span clas [...]
 </code></pre>
@@ -1433,11 +2026,12 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h3 id="writing-output-data">Writing output data</h3>
-
-<p>Write transforms write the data in a <code class="highlighter-rouge">PCollection</code> to an external data source. You will most often use write transforms at the end of your pipeline to output your pipeline’s final results. However, you can use a write transform to output a <code class="highlighter-rouge">PCollection</code>’s data at any point in your pipeline.</p>
+<h3 id="writing-output-data">5.2. Writing output data</h3>
 
-<h4 id="using-a-write-transform">Using a Write transform:</h4>
+<p>Write transforms write the data in a <code class="highlighter-rouge">PCollection</code> to an external data source.
+You will most often use write transforms at the end of your pipeline to output
+your pipeline’s final results. However, you can use a write transform to output
+a <code class="highlighter-rouge">PCollection</code>’s data at any point in your pipeline.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">output</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span class="na">write</span><span class="o">().</span><span class="na">to</span><span class="o">(</span><span class="s">"gs://some/outputData"</span><span class="o">));</span>
 </code></pre>
@@ -1447,11 +2041,15 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h3 id="file-based-input-and-output-data">File-based input and output data</h3>
+<h3 id="file-based-input-and-output-data">5.3. File-based input and output data</h3>
 
-<h4 id="reading-from-multiple-locations">Reading from multiple locations:</h4>
+<h4 id="reading-from-multiple-locations">5.3.1. Reading from multiple locations</h4>
 
-<p>Many read transforms support reading from multiple input files matching a glob operator you provide. Note that glob operators are filesystem-specific and obey filesystem-specific consistency models. The following TextIO example uses a glob operator (*) to read all matching input files that have prefix “input-“ and the suffix “.csv” in the given location:</p>
+<p>Many read transforms support reading from multiple input files matching a glob
+operator you provide. Note that glob operators are filesystem-specific and obey
+filesystem-specific consistency models. The following TextIO example uses a glob
+operator (*) to read all matching input files that have the prefix “input-” and the
+suffix “.csv” in the given location:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">p</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="err">“</span><span class="n">ReadFromText</span><span class="err">”</span><span class="o">,</span>
     <span class="n">TextIO</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">from</span><span class="o">(</span><span class="s">"protocol://my_bucket/path/to/input-*.csv"</span><span class="o">);</span>
@@ -1463,13 +2061,20 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<p>To read data from disparate sources into a single <code class="highlighter-rouge">PCollection</code>, read each one independently and then use the <a href="#transforms-flatten-partition">Flatten</a> transform to create a single <code class="highlighter-rouge">PCollection</code>.</p>
+<p>To read data from disparate sources into a single <code class="highlighter-rouge">PCollection</code>, read each one
+independently and then use the <a href="#flatten">Flatten</a> transform to create a single
+<code class="highlighter-rouge">PCollection</code>.</p>
 
-<h4 id="writing-to-multiple-output-files">Writing to multiple output files:</h4>
+<h4 id="writing-to-multiple-output-files">5.3.2. Writing to multiple output files</h4>
 
-<p>For file-based output data, write transforms write to multiple output files by default. When you pass an output file name to a write transform, the file name is used as the prefix for all output files that the write transform produces. You can append a suffix to each output file by specifying a suffix.</p>
+<p>For file-based output data, write transforms write to multiple output files by
+default. When you pass an output file name to a write transform, the file name
+is used as the prefix for all output files that the write transform produces.
+You can append a suffix to each output file by specifying a suffix.</p>
 
-<p>The following write transform example writes multiple output files to a location. Each file has the prefix “numbers”, a numeric tag, and the suffix “.csv”.</p>
+<p>The following write transform example writes multiple output files to a
+location. Each file has the prefix “numbers”, a numeric tag, and the suffix
+“.csv”.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">records</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"WriteToText"</span><span class="o">,</span>
     <span class="n">TextIO</span><span class="o">.</span><span class="na">write</span><span class="o">().</span><span class="na">to</span><span class="o">(</span><span class="s">"protocol://my_bucket/path/to/numbers"</span><span class="o">)</span>
@@ -1483,53 +2088,115 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h3 id="beam-provided-io-transforms">Beam-provided I/O Transforms</h3>
-<p>See the  <a href="/documentation/io/built-in/">Beam-provided I/O Transforms</a> page for a list of the currently available I/O transforms.</p>
+<h3 id="beam-provided-io-transforms">5.4. Beam-provided I/O transforms</h3>
 
-<h2 id="a-namecodersadata-encoding-and-type-safety"><a name="coders"></a>Data encoding and type safety</h2>
+<p>See the <a href="/documentation/io/built-in/">Beam-provided I/O Transforms</a>
+page for a list of the currently available I/O transforms.</p>
 
-<p>When Beam runners execute your pipeline, they often need to materialize the intermediate data in your <code class="highlighter-rouge">PCollection</code>s, which requires converting elements to and from byte strings. The Beam SDKs use objects called <code class="highlighter-rouge">Coder</code>s to describe how the elements of a given <code class="highlighter-rouge">PCollection</code> may be encoded and decoded.</p>
+<h2 id="data-encoding-and-type-safety">6. Data encoding and type safety</h2>
+
+<p>When Beam runners execute your pipeline, they often need to materialize the
+intermediate data in your <code class="highlighter-rouge">PCollection</code>s, which requires converting elements to
+and from byte strings. The Beam SDKs use objects called <code class="highlighter-rouge">Coder</code>s to describe how
+the elements of a given <code class="highlighter-rouge">PCollection</code> may be encoded and decoded.</p>
 
 <blockquote>
-  <p>Note that coders are unrelated to parsing or formatting data when interacting with external data sources or sinks. Such parsing or formatting should typically be done explicitly, using transforms such as <code class="highlighter-rouge">ParDo</code> or <code class="highlighter-rouge">MapElements</code>.</p>
+  <p>Note that coders are unrelated to parsing or formatting data when interacting
+with external data sources or sinks. Such parsing or formatting should
+typically be done explicitly, using transforms such as <code class="highlighter-rouge">ParDo</code> or
+<code class="highlighter-rouge">MapElements</code>.</p>
 </blockquote>
 
-<p class="language-java">In the Beam SDK for Java, the type <code class="highlighter-rouge">Coder</code> provides the methods required for encoding and decoding data. The SDK for Java provides a number of Coder subclasses that work with a variety of standard Java types, such as Integer, Long, Double, StringUtf8 and more. You can find all of the available Coder subclasses in the <a href="https://github.com/apache/beam/tree/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders">Co [...]
+<p class="language-java">In the Beam SDK for Java, the type <code class="highlighter-rouge">Coder</code> provides the methods required for
+encoding and decoding data. The SDK for Java provides a number of Coder
+subclasses that work with a variety of standard Java types, such as Integer,
+Long, Double, StringUtf8 and more. You can find all of the available Coder
+subclasses in the <a href="https://github.com/apache/beam/tree/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders">Coder package</a>.</p>
 
-<p class="language-py">In the Beam SDK for Python, the type <code class="highlighter-rouge">Coder</code> provides the methods required for encoding and decoding data. The SDK for Python provides a number of Coder subclasses that work with a variety of standard Python types, such as primitive types, Tuple, Iterable, StringUtf8 and more. You can find all of the available Coder subclasses in the <a href="https://github.com/apache/beam/tree/master/sdks/python/apache_beam/coders">apache_beam. [...]
+<p class="language-py">In the Beam SDK for Python, the type <code class="highlighter-rouge">Coder</code> provides the methods required for
+encoding and decoding data. The SDK for Python provides a number of Coder
+subclasses that work with a variety of standard Python types, such as primitive
+types, Tuple, Iterable, StringUtf8 and more. You can find all of the available
+Coder subclasses in the
+<a href="https://github.com/apache/beam/tree/master/sdks/python/apache_beam/coders">apache_beam.coders</a>
+package.</p>
 
 <blockquote>
-  <p>Note that coders do not necessarily have a 1:1 relationship with types. For example, the Integer type can have multiple valid coders, and input and output data can use different Integer coders. A transform might have Integer-typed input data that uses BigEndianIntegerCoder, and Integer-typed output data that uses VarIntCoder.</p>
+  <p>Note that coders do not necessarily have a 1:1 relationship with types. For
+example, the Integer type can have multiple valid coders, and input and output
+data can use different Integer coders. A transform might have Integer-typed
+input data that uses BigEndianIntegerCoder, and Integer-typed output data that
+uses VarIntCoder.</p>
 </blockquote>
 
-<h3 id="specifying-coders">Specifying coders</h3>
-<p>The Beam SDKs require a coder for every <code class="highlighter-rouge">PCollection</code> in your pipeline. In most cases, the Beam SDK is able to automatically infer a <code class="highlighter-rouge">Coder</code> for a <code class="highlighter-rouge">PCollection</code> based on its element type or the transform that produces it, however, in some cases the pipeline author will need to specify a <code class="highlighter-rouge">Coder</code> explicitly, or develop a <code class="highlig [...]
-
-<p class="language-java">You can explicitly set the coder for an existing <code class="highlighter-rouge">PCollection</code> by using the method <code class="highlighter-rouge">PCollection.setCoder</code>. Note that you cannot call <code class="highlighter-rouge">setCoder</code> on a <code class="highlighter-rouge">PCollection</code> that has been finalized (e.g. by calling <code class="highlighter-rouge">.apply</code> on it).</p>
-
-<p class="language-java">You can get the coder for an existing <code class="highlighter-rouge">PCollection</code> by using the method <code class="highlighter-rouge">getCoder</code>. This method will fail with an <code class="highlighter-rouge">IllegalStateException</code> if a coder has not been set and cannot be inferred for the given <code class="highlighter-rouge">PCollection</code>.</p>
-
-<p>Beam SDKs use a variety of mechanisms when attempting to automatically infer the <code class="highlighter-rouge">Coder</code> for a <code class="highlighter-rouge">PCollection</code>.</p>
-
-<p class="language-java">Each pipeline object has a <code class="highlighter-rouge">CoderRegistry</code>. The <code class="highlighter-rouge">CoderRegistry</code> represents a mapping of Java types to the default coders that the pipeline should use for <code class="highlighter-rouge">PCollection</code>s of each type.</p>
-
-<p class="language-py">The Beam SDK for Python has a <code class="highlighter-rouge">CoderRegistry</code> that represents a mapping of Python types to the default coder that should be used for <code class="highlighter-rouge">PCollection</code>s of each type.</p>
-
-<p class="language-java">By default, the Beam SDK for Java automatically infers the <code class="highlighter-rouge">Coder</code> for the elements of a <code class="highlighter-rouge">PCollection</code> produced by a <code class="highlighter-rouge">PTransform</code> using the type parameter from the transform’s function object, such as <code class="highlighter-rouge">DoFn</code>. In the case of <code class="highlighter-rouge">ParDo</code>, for example, a <code class="highlighter-rouge">Do [...]
-
-<p class="language-py">By default, the Beam SDK for Python automatically infers the <code class="highlighter-rouge">Coder</code> for the elements of an output <code class="highlighter-rouge">PCollection</code> using the typehints from the transform’s function object, such as <code class="highlighter-rouge">DoFn</code>. In the case of <code class="highlighter-rouge">ParDo</code>, for example a <code class="highlighter-rouge">DoFn</code> with the typehints <code class="highlighter-rouge">@ [...]
+<h3 id="specifying-coders">6.1. Specifying coders</h3>
+
+<p>The Beam SDKs require a coder for every <code class="highlighter-rouge">PCollection</code> in your pipeline. In most
+cases, the Beam SDK is able to automatically infer a <code class="highlighter-rouge">Coder</code> for a <code class="highlighter-rouge">PCollection</code>
+based on its element type or the transform that produces it. However, in some
+cases the pipeline author will need to specify a <code class="highlighter-rouge">Coder</code> explicitly, or develop
+a <code class="highlighter-rouge">Coder</code> for their custom type.</p>
+
+<p class="language-java">You can explicitly set the coder for an existing <code class="highlighter-rouge">PCollection</code> by using the
+method <code class="highlighter-rouge">PCollection.setCoder</code>. Note that you cannot call <code class="highlighter-rouge">setCoder</code> on a
+<code class="highlighter-rouge">PCollection</code> that has been finalized (e.g. by calling <code class="highlighter-rouge">.apply</code> on it).</p>
+
+<p class="language-java">You can get the coder for an existing <code class="highlighter-rouge">PCollection</code> by using the method
+<code class="highlighter-rouge">getCoder</code>. This method will fail with an <code class="highlighter-rouge">IllegalStateException</code> if a coder has
+not been set and cannot be inferred for the given <code class="highlighter-rouge">PCollection</code>.</p>
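+
+<p class="language-java">For example (a minimal sketch; <code class="highlighter-rouge">lines</code> is an assumed <code class="highlighter-rouge">PCollection</code> of
+<code class="highlighter-rouge">String</code>s that has not yet been finalized):</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// A minimal sketch: explicitly set the coder on a not-yet-finalized PCollection,
+// then read the coder back. `lines` is an assumed PCollection&lt;String&gt;.
+lines.setCoder(StringUtf8Coder.of());
+Coder&lt;String&gt; coder = lines.getCoder();
+</code></pre>
+</div>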
+
+<p>Beam SDKs use a variety of mechanisms when attempting to automatically infer the
+<code class="highlighter-rouge">Coder</code> for a <code class="highlighter-rouge">PCollection</code>.</p>
+
+<p class="language-java">Each pipeline object has a <code class="highlighter-rouge">CoderRegistry</code>. The <code class="highlighter-rouge">CoderRegistry</code> represents a
+mapping of Java types to the default coders that the pipeline should use for
+<code class="highlighter-rouge">PCollection</code>s of each type.</p>
+
+<p class="language-py">The Beam SDK for Python has a <code class="highlighter-rouge">CoderRegistry</code> that represents a mapping of
+Python types to the default coder that should be used for <code class="highlighter-rouge">PCollection</code>s of each
+type.</p>
+
+<p class="language-java">By default, the Beam SDK for Java automatically infers the <code class="highlighter-rouge">Coder</code> for the
+elements of a <code class="highlighter-rouge">PCollection</code> produced by a <code class="highlighter-rouge">PTransform</code> using the type parameter
+from the transform’s function object, such as <code class="highlighter-rouge">DoFn</code>. In the case of <code class="highlighter-rouge">ParDo</code>,
+for example, a <code class="highlighter-rouge">DoFn&lt;Integer, String&gt;</code> function object accepts an input element
+of type <code class="highlighter-rouge">Integer</code> and produces an output element of type <code class="highlighter-rouge">String</code>. In such a
+case, the SDK for Java will automatically infer the default <code class="highlighter-rouge">Coder</code> for the
+output <code class="highlighter-rouge">PCollection&lt;String&gt;</code> (in the default pipeline <code class="highlighter-rouge">CoderRegistry</code>, this is
+<code class="highlighter-rouge">StringUtf8Coder</code>).</p>
+
+<p class="language-py">By default, the Beam SDK for Python automatically infers the <code class="highlighter-rouge">Coder</code> for the
+elements of an output <code class="highlighter-rouge">PCollection</code> using the typehints from the transform’s
+function object, such as <code class="highlighter-rouge">DoFn</code>. In the case of <code class="highlighter-rouge">ParDo</code>, for example a <code class="highlighter-rouge">DoFn</code>
+with the typehints <code class="highlighter-rouge">@beam.typehints.with_input_types(int)</code> and
+<code class="highlighter-rouge">@beam.typehints.with_output_types(str)</code> accepts an input element of type int
+and produces an output element of type str. In such a case, the Beam SDK for
+Python will automatically infer the default <code class="highlighter-rouge">Coder</code> for the output <code class="highlighter-rouge">PCollection</code>
+(in the default pipeline <code class="highlighter-rouge">CoderRegistry</code>, this is <code class="highlighter-rouge">BytesCoder</code>).</p>
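+
+<p class="language-java">For example, the following sketch shows a <code class="highlighter-rouge">DoFn&lt;Integer, String&gt;</code> whose
+output coder is inferred from its type parameters. This is a minimal
+illustration; <code class="highlighter-rouge">numbers</code> is an assumed <code class="highlighter-rouge">PCollection&lt;Integer&gt;</code>.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// A minimal sketch: the coder for the output PCollection&lt;String&gt; is inferred from
+// the DoFn's type parameters. `numbers` is an assumed PCollection&lt;Integer&gt;.
+PCollection&lt;String&gt; asText = numbers.apply(
+    ParDo.of(new DoFn&lt;Integer, String&gt;() {
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        c.output(String.valueOf(c.element()));
+      }
+    }));
+</code></pre>
+</div>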
 
 <blockquote>
-  <p>NOTE: If you create your <code class="highlighter-rouge">PCollection</code> from in-memory data by using the <code class="highlighter-rouge">Create</code> transform, you cannot rely on coder inference and default coders. <code class="highlighter-rouge">Create</code> does not have access to any typing information for its arguments, and may not be able to infer a coder if the argument list contains a value whose exact run-time class doesn’t have a default coder registered.</p>
+  <p>NOTE: If you create your <code class="highlighter-rouge">PCollection</code> from in-memory data by using the
+<code class="highlighter-rouge">Create</code> transform, you cannot rely on coder inference and default coders.
+<code class="highlighter-rouge">Create</code> does not have access to any typing information for its arguments, and
+may not be able to infer a coder if the argument list contains a value whose
+exact run-time class doesn’t have a default coder registered.</p>
 </blockquote>
 
-<p class="language-java">When using <code class="highlighter-rouge">Create</code>, the simplest way to ensure that you have the correct coder is by invoking <code class="highlighter-rouge">withCoder</code> when you apply the <code class="highlighter-rouge">Create</code> transform.</p>
+<p class="language-java">When using <code class="highlighter-rouge">Create</code>, the simplest way to ensure that you have the correct coder
+is by invoking <code class="highlighter-rouge">withCoder</code> when you apply the <code class="highlighter-rouge">Create</code> transform.</p>
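+
+<p class="language-java">For example (a minimal sketch; <code class="highlighter-rouge">p</code> is an assumed existing <code class="highlighter-rouge">Pipeline</code>):</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>// A minimal sketch: explicitly specify the coder when creating a PCollection
+// from in-memory data instead of relying on coder inference.
+PCollection&lt;Integer&gt; numbers =
+    p.apply(Create.of(1, 2, 3).withCoder(VarIntCoder.of()));
+</code></pre>
+</div>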
 
-<h4 id="default-coders-and-the-coderregistry">Default coders and the CoderRegistry</h4>
+<h3 id="default-coders-and-the-coderregistry">6.2. Default coders and the CoderRegistry</h3>
 
-<p>Each Pipeline object has a <code class="highlighter-rouge">CoderRegistry</code> object, which maps language types to the default coder the pipeline should use for those types. You can use the <code class="highlighter-rouge">CoderRegistry</code> yourself to look up the default coder for a given type, or to register a new default coder for a given type.</p>
+<p>Each Pipeline object has a <code class="highlighter-rouge">CoderRegistry</code> object, which maps language types to
+the default coder the pipeline should use for those types. You can use the
+<code class="highlighter-rouge">CoderRegistry</code> yourself to look up the default coder for a given type, or to
+register a new default coder for a given type.</p>
 
-<p><code class="highlighter-rouge">CoderRegistry</code> contains a default mapping of coders to standard <span class="language-java">Java</span> <span class="language-py">Python</span> types for any pipeline you create using the Beam SDK for <span class="language-java">Java</span> <span class="language-py">Python</span>. The following table shows the standard mapping:</p>
+<p><code class="highlighter-rouge">CoderRegistry</code> contains a default mapping of coders to standard
+<span class="language-java">Java</span><span class="language-py">Python</span>
+types for any pipeline you create using the Beam SDK for
+<span class="language-java">Java</span><span class="language-py">Python</span>.
+The following table shows the standard mapping:</p>
 
 <table class="language-java">
   <thead>
@@ -1625,17 +2292,36 @@ guest, [[], [order4]]
   </tbody>
 </table>
 
-<h5 id="looking-up-a-default-coder">Looking up a default coder</h5>
-
-<p class="language-java">You can use the method <code class="highlighter-rouge">CoderRegistry.getDefaultCoder</code> to determine the default Coder for a Java type. You can access the <code class="highlighter-rouge">CoderRegistry</code> for a given pipeline by using the method <code class="highlighter-rouge">Pipeline.getCoderRegistry</code>. This allows you to determine (or set) the default Coder for a Java type on a per-pipeline basis: i.e. “for this pipeline, verify that Integer values [...]
-
-<p class="language-py">You can use the method <code class="highlighter-rouge">CoderRegistry.get_coder</code> to determine the default Coder for a Python type. You can use <code class="highlighter-rouge">coders.registry</code> to access the <code class="highlighter-rouge">CoderRegistry</code>. This allows you to determine (or set) the default Coder for a Python type.</p>
-
-<h5 id="setting-the-default-coder-for-a-type">Setting the default coder for a type</h5>
-
-<p>To set the default Coder for a <span class="language-java">Java</span> <span class="language-py">Python</span> type for a particular pipeline, you obtain and modify the pipeline’s <code class="highlighter-rouge">CoderRegistry</code>. You use the method <span class="language-java"><code class="highlighter-rouge">Pipeline.getCoderRegistry</code></span> <span class="language-py"><code class="highlighter-rouge">coders.registry</code></span> to get the <code class="highlighter-rouge">Coder [...]
-
-<p>The following example code demonstrates how to set a default Coder, in this case <code class="highlighter-rouge">BigEndianIntegerCoder</code>, for <span class="language-java">Integer</span> <span class="language-py">int</span> values for a pipeline.</p>
+<h4 id="looking-up-a-default-coder">6.2.1. Looking up a default coder</h4>
+
+<p class="language-java">You can use the method <code class="highlighter-rouge">CoderRegistry.getDefaultCoder</code> to determine the default
+Coder for a Java type. You can access the <code class="highlighter-rouge">CoderRegistry</code> for a given pipeline
+by using the method <code class="highlighter-rouge">Pipeline.getCoderRegistry</code>. This allows you to determine
+(or set) the default Coder for a Java type on a per-pipeline basis: i.e. “for
+this pipeline, verify that Integer values are encoded using
+<code class="highlighter-rouge">BigEndianIntegerCoder</code>.”</p>
+
+<p class="language-py">You can use the method <code class="highlighter-rouge">CoderRegistry.get_coder</code> to determine the default Coder
+for a Python type. You can use <code class="highlighter-rouge">coders.registry</code> to access the <code class="highlighter-rouge">CoderRegistry</code>.
+This allows you to determine (or set) the default Coder for a Python type.</p>
+
+<h4 id="setting-the-default-coder-for-a-type">6.2.2. Setting the default coder for a type</h4>
+
+<p>To set the default Coder for a
+<span class="language-java">Java</span><span class="language-py">Python</span>
+type for a particular pipeline, you obtain and modify the pipeline’s
+<code class="highlighter-rouge">CoderRegistry</code>. You use the method
+<span class="language-java"><code class="highlighter-rouge">Pipeline.getCoderRegistry</code></span>
+<span class="language-py"><code class="highlighter-rouge">coders.registry</code></span>
+to get the <code class="highlighter-rouge">CoderRegistry</code> object, and then use the method
+<span class="language-java"><code class="highlighter-rouge">CoderRegistry.registerCoder</code></span>
+<span class="language-py"><code class="highlighter-rouge">CoderRegistry.register_coder</code></span>
+to register a new <code class="highlighter-rouge">Coder</code> for the target type.</p>
+
+<p>The following example code demonstrates how to set a default Coder, in this case
+<code class="highlighter-rouge">BigEndianIntegerCoder</code>, for
+<span class="language-java">Integer</span><span class="language-py">int</span>
+values for a pipeline.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">create</span><span class="o">();</span>
 <span class="n">Pipeline</span> <span class="n">p</span> <span class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">options</span><span class="o">);</span>
@@ -1649,9 +2335,12 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<h5 id="annotating-a-custom-data-type-with-a-default-coder">Annotating a custom data type with a default coder</h5>
+<h4 id="annotating-a-custom-data-type-with-a-default-coder">6.2.3. Annotating a custom data type with a default coder</h4>
 
-<p class="language-java">If your pipeline program defines a custom data type, you can use the <code class="highlighter-rouge">@DefaultCoder</code> annotation to specify the coder to use with that type. For example, let’s say you have a custom data type for which you want to use <code class="highlighter-rouge">SerializableCoder</code>. You can use the <code class="highlighter-rouge">@DefaultCoder</code> annotation as follows:</p>
+<p class="language-java">If your pipeline program defines a custom data type, you can use the
+<code class="highlighter-rouge">@DefaultCoder</code> annotation to specify the coder to use with that type. For
+example, let’s say you have a custom data type for which you want to use
+<code class="highlighter-rouge">SerializableCoder</code>. You can use the <code class="highlighter-rouge">@DefaultCoder</code> annotation as follows:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="nd">@DefaultCoder</span><span class="o">(</span><span class="n">AvroCoder</span><span class="o">.</span><span class="na">class</span><span class="o">)</span>
 <span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomDataType</span> <span class="o">{</span>
@@ -1660,7 +2349,9 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<p class="language-java">If you’ve created a custom coder to match your data type, and you want to use the <code class="highlighter-rouge">@DefaultCoder</code> annotation, your coder class must implement a static <code class="highlighter-rouge">Coder.of(Class&lt;T&gt;)</code> factory method.</p>
+<p class="language-java">If you’ve created a custom coder to match your data type, and you want to use
+the <code class="highlighter-rouge">@DefaultCoder</code> annotation, your coder class must implement a static
+<code class="highlighter-rouge">Coder.of(Class&lt;T&gt;)</code> factory method.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomCoder</span> <span class="kd">implements</span> <span class="n">Coder</span> <span class="o">{</span>
   <span class="kd">public</span> <span class="kd">static</span> <span class="n">Coder</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="nf">of</span><span class="o">(</span><span class="n">Class</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">clazz</span><span class="o">)</span> <span class="o">{...}</span>
@@ -1674,55 +2365,111 @@ guest, [[], [order4]]
 </code></pre>
 </div>
 
-<p class="language-py">The Beam SDK for Python does not support annotating data types with a default coder. If you would like to set a default coder, use the method described in the previous section, <em>Setting the default coder for a type</em>.</p>
+<p class="language-py">The Beam SDK for Python does not support annotating data types with a default
+coder. If you would like to set a default coder, use the method described in the
+previous section, <em>Setting the default coder for a type</em>.</p>
 
-<h2 id="a-namewindowingaworking-with-windowing"><a name="windowing"></a>Working with windowing</h2>
+<h2 id="windowing">7. Windowing</h2>
 
-<p>Windowing subdivides a <code class="highlighter-rouge">PCollection</code> according to the timestamps of its individual elements. Transforms that aggregate multiple elements, such as <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>, work implicitly on a per-window basis — they process each <code class="highlighter-rouge">PCollection</code> as a succession of multiple, finite windows, though the entire collection itself may be of unbo [...]
+<p>Windowing subdivides a <code class="highlighter-rouge">PCollection</code> according to the timestamps of its
+individual elements. Transforms that aggregate multiple elements, such as
+<code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>, work implicitly on a per-window basis — they process
+each <code class="highlighter-rouge">PCollection</code> as a succession of multiple, finite windows, though the
+entire collection itself may be of unbounded size.</p>
 
-<p>A related concept, called <strong>triggers</strong>, determines when to emit the results of aggregation as unbounded data arrives. You can use triggers to refine the windowing strategy for your <code class="highlighter-rouge">PCollection</code>. Triggers allow you to deal with late-arriving data or to provide early results. See the <a href="#triggers">triggers</a> section for more information.</p>
+<p>A related concept, called <strong>triggers</strong>, determines when to emit the results of
+aggregation as unbounded data arrives. You can use triggers to refine the
+windowing strategy for your <code class="highlighter-rouge">PCollection</code>. Triggers allow you to deal with
+late-arriving data or to provide early results. See the <a href="#triggers">triggers</a>
+section for more information.</p>
 
-<h3 id="windowing-basics">Windowing basics</h3>
+<h3 id="windowing-basics">7.1. Windowing basics</h3>
 
-<p>Some Beam transforms, such as <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>, group multiple elements by a common key. Ordinarily, that grouping operation groups all of the elements that have the same key within the entire data set. With an unbounded data set, it is impossible to collect all of the elements, since new elements are constantly being added and may be infinitely many (e.g. streaming data). If you are working with unbou [...]
+<p>Some Beam transforms, such as <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>, group multiple
+elements by a common key. Ordinarily, that grouping operation groups all of the
+elements that have the same key within the entire data set. With an unbounded
+data set, it is impossible to collect all of the elements, since new elements
+are constantly being added and may be infinitely many (e.g. streaming data). If
+you are working with unbounded <code class="highlighter-rouge">PCollection</code>s, windowing is especially useful.</p>
 
-<p>In the Beam model, any <code class="highlighter-rouge">PCollection</code> (including unbounded <code class="highlighter-rouge">PCollection</code>s) can be subdivided into logical windows. Each element in a <code class="highlighter-rouge">PCollection</code> is assigned to one or more windows according to the <code class="highlighter-rouge">PCollection</code>’s windowing function, and each individual window contains a finite number of elements. Grouping transforms then consider each <co [...]
+<p>In the Beam model, any <code class="highlighter-rouge">PCollection</code> (including unbounded <code class="highlighter-rouge">PCollection</code>s) can be
+subdivided into logical windows. Each element in a <code class="highlighter-rouge">PCollection</code> is assigned to
+one or more windows according to the <code class="highlighter-rouge">PCollection</code>’s windowing function, and
+each individual window contains a finite number of elements. Grouping transforms
+then consider each <code class="highlighter-rouge">PCollection</code>’s elements on a per-window basis. <code class="highlighter-rouge">GroupByKey</code>,
+for example, implicitly groups the elements of a <code class="highlighter-rouge">PCollection</code> by <em>key and
+window</em>.</p>
 
-<p><strong>Caution:</strong> Beam’s default windowing behavior is to assign all elements of a <code class="highlighter-rouge">PCollection</code> to a single, global window and discard late data, <em>even for unbounded <code class="highlighter-rouge">PCollection</code>s</em>. Before you use a grouping transform such as <code class="highlighter-rouge">GroupByKey</code> on an unbounded <code class="highlighter-rouge">PCollection</code>, you must do at least one of the following:</p>
+<p><strong>Caution:</strong> Beam’s default windowing behavior is to assign all elements of a
+<code class="highlighter-rouge">PCollection</code> to a single, global window and discard late data, <em>even for
+unbounded <code class="highlighter-rouge">PCollection</code>s</em>. Before you use a grouping transform such as
+<code class="highlighter-rouge">GroupByKey</code> on an unbounded <code class="highlighter-rouge">PCollection</code>, you must do at least one of the
+following:</p>
 <ul>
-  <li>Set a non-global windowing function. See <a href="#setwindowingfunction">Setting your PCollection’s windowing function</a>.</li>
-  <li>Set a non-default <a href="#triggers">trigger</a>. This allows the global window to emit results under other conditions, since the default windowing behavior (waiting for all data to arrive) will never occur.</li>
+  <li>Set a non-global windowing function. See <a href="#setting-your-pcollections-windowing-function">Setting your PCollection’s
+windowing function</a>.</li>
+  <li>Set a non-default <a href="#triggers">trigger</a>. This allows the global window to emit
+results under other conditions, since the default windowing behavior (waiting
+for all data to arrive) will never occur.</li>
 </ul>
 
-<p>If you don’t set a non-global windowing function or a non-default trigger for your unbounded <code class="highlighter-rouge">PCollection</code> and subsequently use a grouping transform such as <code class="highlighter-rouge">GroupByKey</code> or <code class="highlighter-rouge">Combine</code>, your pipeline will generate an error upon construction and your job will fail.</p>
+<p>If you don’t set a non-global windowing function or a non-default trigger for
+your unbounded <code class="highlighter-rouge">PCollection</code> and subsequently use a grouping transform such as
+<code class="highlighter-rouge">GroupByKey</code> or <code class="highlighter-rouge">Combine</code>, your pipeline will generate an error upon
+construction and your job will fail.</p>
 
-<h4 id="windowing-constraints">Windowing constraints</h4>
+<h4 id="windowing-constraints">7.1.1. Windowing constraints</h4>
 
-<p>After you set the windowing function for a <code class="highlighter-rouge">PCollection</code>, the elements’ windows are used the next time you apply a grouping transform to that <code class="highlighter-rouge">PCollection</code>. Window grouping occurs on an as-needed basis. If you set a windowing function using the <code class="highlighter-rouge">Window</code> transform, each element is assigned to a window, but the windows are not considered until <code class="highlighter-rouge">Gr [...]
-Consider the example pipeline in the figure below:</p>
+<p>After you set the windowing function for a <code class="highlighter-rouge">PCollection</code>, the elements’ windows
+are used the next time you apply a grouping transform to that <code class="highlighter-rouge">PCollection</code>.
+Window grouping occurs on an as-needed basis. If you set a windowing function
+using the <code class="highlighter-rouge">Window</code> transform, each element is assigned to a window, but the
+windows are not considered until <code class="highlighter-rouge">GroupByKey</code> or <code class="highlighter-rouge">Combine</code> aggregates across a
+window and key. This can have different effects on your pipeline.  Consider the
+example pipeline in the figure below:</p>
 
 <p><img src="/images/windowing-pipeline-unbounded.png" alt="Diagram of pipeline applying windowing" title="Pipeline applying windowing" /></p>
 
 <p><strong>Figure:</strong> Pipeline applying windowing</p>
 
-<p>In the above pipeline, we create an unbounded <code class="highlighter-rouge">PCollection</code> by reading a set of key/value pairs using <code class="highlighter-rouge">KafkaIO</code>, and then apply a windowing function to that collection using the <code class="highlighter-rouge">Window</code> transform. We then apply a <code class="highlighter-rouge">ParDo</code> to the the collection, and then later group the result of that <code class="highlighter-rouge">ParDo</code> using <code [...]
-Subsequent transforms, however, are applied to the result of the <code class="highlighter-rouge">GroupByKey</code> – data is grouped by both key and window.</p>
+<p>In the above pipeline, we create an unbounded <code class="highlighter-rouge">PCollection</code> by reading a set of
+key/value pairs using <code class="highlighter-rouge">KafkaIO</code>, and then apply a windowing function to that
+collection using the <code class="highlighter-rouge">Window</code> transform. We then apply a <code class="highlighter-rouge">ParDo</code> to the
+collection, and then later group the result of that <code class="highlighter-rouge">ParDo</code> using <code class="highlighter-rouge">GroupByKey</code>.
+The windowing function has no effect on the <code class="highlighter-rouge">ParDo</code> transform, because the
+windows are not actually used until they’re needed for the <code class="highlighter-rouge">GroupByKey</code>.
+Subsequent transforms, however, are applied to the result of the <code class="highlighter-rouge">GroupByKey</code> –
+data is grouped by both key and window.</p>
 
-<h4 id="using-windowing-with-bounded-pcollections">Using windowing with bounded PCollections</h4>
+<h4 id="using-windowing-with-bounded-pcollections">7.1.2. Using windowing with bounded PCollections</h4>
 
-<p>You can use windowing with fixed-size data sets in <strong>bounded</strong> <code class="highlighter-rouge">PCollection</code>s. However, note that windowing considers only the implicit timestamps attached to each element of a <code class="highlighter-rouge">PCollection</code>, and data sources that create fixed data sets (such as <code class="highlighter-rouge">TextIO</code>) assign the same timestamp to every element. This means that all the elements are by default part of a single, [...]
+<p>You can use windowing with fixed-size data sets in <strong>bounded</strong> <code class="highlighter-rouge">PCollection</code>s.
+However, note that windowing considers only the implicit timestamps attached to
+each element of a <code class="highlighter-rouge">PCollection</code>, and data sources that create fixed data sets
+(such as <code class="highlighter-rouge">TextIO</code>) assign the same timestamp to every element. This means that
+all the elements are by default part of a single, global window.</p>
 
-<p>To use windowing with fixed data sets, you can assign your own timestamps to each element. To assign timestamps to elements, use a <code class="highlighter-rouge">ParDo</code> transform with a <code class="highlighter-rouge">DoFn</code> that outputs each element with a new timestamp (for example, the <a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/transforms/WithTimestamps.html">WithTimestamps</a> transform in the Beam SDK for Java).</p>
+<p>To use windowing with fixed data sets, you can assign your own timestamps to
+each element. To assign timestamps to elements, use a <code class="highlighter-rouge">ParDo</code> transform with a
+<code class="highlighter-rouge">DoFn</code> that outputs each element with a new timestamp (for example, the
+<a href="/documentation/sdks/javadoc/2.1.0/index.html?org/apache/beam/sdk/transforms/WithTimestamps.html">WithTimestamps</a>
+transform in the Beam SDK for Java).</p>
 
-<p>To illustrate how windowing with a bounded <code class="highlighter-rouge">PCollection</code> can affect how your pipeline processes data, consider the following pipeline:</p>
+<p>To illustrate how windowing with a bounded <code class="highlighter-rouge">PCollection</code> can affect how your
+pipeline processes data, consider the following pipeline:</p>
 
 <p><img src="/images/unwindowed-pipeline-bounded.png" alt="Diagram of GroupByKey and ParDo without windowing, on a bounded collection" title="GroupByKey and ParDo without windowing, on a bounded collection" /></p>
 
 <p><strong>Figure:</strong> <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">ParDo</code> without windowing, on a bounded collection.</p>
 
-<p>In the above pipeline, we create a bounded <code class="highlighter-rouge">PCollection</code> by reading a set of key/value pairs using <code class="highlighter-rouge">TextIO</code>. We then group the collection using <code class="highlighter-rouge">GroupByKey</code>, and apply a <code class="highlighter-rouge">ParDo</code> transform to the grouped <code class="highlighter-rouge">PCollection</code>. In this example, the <code class="highlighter-rouge">GroupByKey</code> creates a colle [...]
+<p>In the above pipeline, we create a bounded <code class="highlighter-rouge">PCollection</code> by reading a set of
+key/value pairs using <code class="highlighter-rouge">TextIO</code>. We then group the collection using <code class="highlighter-rouge">GroupByKey</code>,
+and apply a <code class="highlighter-rouge">ParDo</code> transform to the grouped <code class="highlighter-rouge">PCollection</code>. In this example, the
+<code class="highlighter-rouge">GroupByKey</code> creates a collection of unique keys, and then <code class="highlighter-rouge">ParDo</code> gets applied
+exactly once per key.</p>
 
-<p>Note that even if you don’t set a windowing function, there is still a window – all elements in your <code class="highlighter-rouge">PCollection</code> are assigned to a single global window.</p>
+<p>Note that even if you don’t set a windowing function, there is still a window –
+all elements in your <code class="highlighter-rouge">PCollection</code> are assigned to a single global window.</p>
 
 <p>Now, consider the same pipeline, but using a windowing function:</p>
 
@@ -1730,11 +2477,17 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 
 <p><strong>Figure:</strong> <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">ParDo</code> with windowing, on a bounded collection.</p>
 
-<p>As before, the pipeline creates a bounded <code class="highlighter-rouge">PCollection</code> of key/value pairs. We then set a <a href="#setwindowingfunction">windowing function</a> for that <code class="highlighter-rouge">PCollection</code>. The <code class="highlighter-rouge">GroupByKey</code> transform groups the elements of the <code class="highlighter-rouge">PCollection</code> by both key and window, based on the windowing function. The subsequent <code class="highlighter-rouge"> [...]
+<p>As before, the pipeline creates a bounded <code class="highlighter-rouge">PCollection</code> of key/value pairs. We
+then set a <a href="#setting-your-pcollections-windowing-function">windowing function</a>
+for that <code class="highlighter-rouge">PCollection</code>.  The <code class="highlighter-rouge">GroupByKey</code> transform groups the elements of the
+<code class="highlighter-rouge">PCollection</code> by both key and window, based on the windowing function. The
+subsequent <code class="highlighter-rouge">ParDo</code> transform gets applied multiple times per key, once for each
+window.</p>
 
-<h3 id="windowing-functions">Windowing functions</h3>
+<h3 id="provided-windowing-functions">7.2. Provided windowing functions</h3>
 
-<p>You can define different kinds of windows to divide the elements of your <code class="highlighter-rouge">PCollection</code>. Beam provides several windowing functions, including:</p>
+<p>You can define different kinds of windows to divide the elements of your
+<code class="highlighter-rouge">PCollection</code>. Beam provides several windowing functions, including:</p>
 
 <ul>
   <li>Fixed Time Windows</li>
@@ -1744,53 +2497,98 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
   <li>Calendar-based Windows (not supported by the Beam SDK for Python)</li>
 </ul>
 
-<p>Note that each element can logically belong to more than one window, depending on the windowing function you use. Sliding time windowing, for example, creates overlapping windows wherein a single element can be assigned to multiple windows.</p>
+<p>You can also define your own <code class="highlighter-rouge">WindowFn</code> if you have a more complex need.</p>
 
-<h4 id="fixed-time-windows">Fixed time windows</h4>
+<p>Note that each element can logically belong to more than one window, depending
+on the windowing function you use. Sliding time windowing, for example, creates
+overlapping windows wherein a single element can be assigned to multiple
+windows.</p>
 
-<p>The simplest form of windowing is using <strong>fixed time windows</strong>: given a timestamped <code class="highlighter-rouge">PCollection</code> which might be continuously updating, each window might capture (for example) all elements with timestamps that fall into a five minute interval.</p>
+<h4 id="fixed-time-windows">7.2.1. Fixed time windows</h4>
 
-<p>A fixed time window represents a consistent duration, non overlapping time interval in the data stream. Consider windows with a five-minute duration: all of the elements in your unbounded <code class="highlighter-rouge">PCollection</code> with timestamp values from 0:00:00 up to (but not including) 0:05:00 belong to the first window, elements with timestamp values from 0:05:00 up to (but not including) 0:10:00 belong to the second window, and so on.</p>
+<p>The simplest form of windowing is using <strong>fixed time windows</strong>: given a
+timestamped <code class="highlighter-rouge">PCollection</code> which might be continuously updating, each window
+might capture (for example) all elements with timestamps that fall into a five
+minute interval.</p>
+
+<p>A fixed time window represents a consistent-duration, non-overlapping time
+interval in the data stream. Consider windows with a five-minute duration: all
+of the elements in your unbounded <code class="highlighter-rouge">PCollection</code> with timestamp values from
+0:00:00 up to (but not including) 0:05:00 belong to the first window, elements
+with timestamp values from 0:05:00 up to (but not including) 0:10:00 belong to
+the second window, and so on.</p>
 
 <p><img src="/images/fixed-time-windows.png" alt="Diagram of fixed time windows, 30s in duration" title="Fixed time windows, 30s in duration" /></p>
 
 <p><strong>Figure:</strong> Fixed time windows, 30s in duration.</p>
 
-<h4 id="sliding-time-windows">Sliding time windows</h4>
+<h4 id="sliding-time-windows">7.2.2. Sliding time windows</h4>
 
-<p>A <strong>sliding time window</strong> also represents time intervals in the data stream; however, sliding time windows can overlap. For example, each window might capture five minutes worth of data, but a new window starts every ten seconds. The frequency with which sliding windows begin is called the <em>period</em>. Therefore, our example would have a window <em>duration</em> of five minutes and a <em>period</em> of ten seconds.</p>
+<p>A <strong>sliding time window</strong> also represents time intervals in the data stream;
+however, sliding time windows can overlap. For example, each window might
+capture five minutes worth of data, but a new window starts every ten seconds.
+The frequency with which sliding windows begin is called the <em>period</em>.
+Therefore, our example would have a window <em>duration</em> of five minutes and a
+<em>period</em> of ten seconds.</p>
 
-<p>Because multiple windows overlap, most elements in a data set will belong to more than one window. This kind of windowing is useful for taking running averages of data; using sliding time windows, you can compute a running average of the past five minutes’ worth of data, updated every ten seconds, in our example.</p>
+<p>Because multiple windows overlap, most elements in a data set will belong to
+more than one window. This kind of windowing is useful for taking running
+averages of data; using sliding time windows, you can compute a running average
+of the past five minutes’ worth of data, updated every ten seconds, in our
+example.</p>
 
 <p><img src="/images/sliding-time-windows.png" alt="Diagram of sliding time windows, with 1 minute window duration and 30s window period" title="Sliding time windows, with 1 minute window duration and 30s window period" /></p>
 
-<p><strong>Figure:</strong> Sliding time windows, with 1 minute window duration and 30s window period.</p>
+<p><strong>Figure:</strong> Sliding time windows, with 1 minute window duration and 30s window
+period.</p>
 
-<h4 id="session-windows">Session windows</h4>
+<h4 id="session-windows">7.2.3. Session windows</h4>
 
-<p>A <strong>session window</strong> function defines windows that contain elements that are within a certain gap duration of another element. Session windowing applies on a per-key basis and is useful for data that is irregularly distributed with respect to time. For example, a data stream representing user mouse activity may have long periods of idle time interspersed with high concentrations of clicks. If data arrives after the minimum specified gap duration time, this initiates the s [...]
+<p>A <strong>session window</strong> function defines windows that contain elements that are
+within a certain gap duration of another element. Session windowing applies on a
+per-key basis and is useful for data that is irregularly distributed with
+respect to time. For example, a data stream representing user mouse activity may
+have long periods of idle time interspersed with high concentrations of clicks.
+If data arrives after the specified minimum gap duration, it begins a new
+window.</p>
 
 <p><img src="/images/session-windows.png" alt="Diagram of session windows with a minimum gap duration" title="Session windows, with a minimum gap duration" /></p>
 
-<p><strong>Figure:</strong> Session windows, with a minimum gap duration. Note how each data key has different windows, according to its data distribution.</p>
+<p><strong>Figure:</strong> Session windows, with a minimum gap duration. Note how each data key
+has different windows, according to its data distribution.</p>
 
-<h4 id="single-global-window">Single global window</h4>
+<h4 id="the-single-global-window">7.2.4. The single global window</h4>
 
-<p>By default, all data in a <code class="highlighter-rouge">PCollection</code> is assigned to a single global window, and late data is discarded. If your data set is of a fixed size, you can use the global window default for your <code class="highlighter-rouge">PCollection</code>.</p>
+<p>By default, all data in a <code class="highlighter-rouge">PCollection</code> is assigned to the single global window,
+and late data is discarded. If your data set is of a fixed size, you can use the
+global window default for your <code class="highlighter-rouge">PCollection</code>.</p>
 
-<p>You can use a single global window if you are working with an unbounded data set (e.g. from a streaming data source) but use caution when applying aggregating transforms such as <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>. A single global window with a default trigger generally requires the entire data set to be available before processing, which is not possible with continuously updating data. To perform aggregations on an unbo [...]
+<p>You can use the single global window if you are working with an unbounded data set
+(e.g. from a streaming data source), but use caution when applying aggregating
+transforms such as <code class="highlighter-rouge">GroupByKey</code> and <code class="highlighter-rouge">Combine</code>. The single global window with a
+default trigger generally requires the entire data set to be available before
+processing, which is not possible with continuously updating data. To perform
+aggregations on an unbounded <code class="highlighter-rouge">PCollection</code> that uses global windowing, you
+should specify a non-default trigger for that <code class="highlighter-rouge">PCollection</code>.</p>
 
-<h3 id="a-namesetwindowingfunctionasetting-your-pcollections-windowing-function"><a name="setwindowingfunction"></a>Setting your PCollection’s windowing function</h3>
+<h3 id="setting-your-pcollections-windowing-function">7.3. Setting your PCollection’s windowing function</h3>
 
-<p>You can set the windowing function for a <code class="highlighter-rouge">PCollection</code> by applying the <code class="highlighter-rouge">Window</code> transform. When you apply the <code class="highlighter-rouge">Window</code> transform, you must provide a <code class="highlighter-rouge">WindowFn</code>. The <code class="highlighter-rouge">WindowFn</code> determines the windowing function your <code class="highlighter-rouge">PCollection</code> will use for subsequent grouping trans [...]
+<p>You can set the windowing function for a <code class="highlighter-rouge">PCollection</code> by applying the <code class="highlighter-rouge">Window</code>
+transform. When you apply the <code class="highlighter-rouge">Window</code> transform, you must provide a <code class="highlighter-rouge">WindowFn</code>.
+The <code class="highlighter-rouge">WindowFn</code> determines the windowing function your <code class="highlighter-rouge">PCollection</code> will use for
+subsequent grouping transforms, such as a fixed or sliding time window.</p>
 
-<p>Beam provides pre-defined <code class="highlighter-rouge">WindownFn</code>s for the basic windowing functions described here. You can also define your own <code class="highlighter-rouge">WindowFn</code> if you have a more complex need.</p>
+<p>When you set a windowing function, you may also want to set a trigger for your
+<code class="highlighter-rouge">PCollection</code>. The trigger determines when each individual window is aggregated
+and emitted, and helps refine how the windowing function performs with respect
+to late data and computing early results. See the <a href="#triggers">triggers</a> section
+for more information.</p>
 
-<p>When you set a windowing function, you may also want to set a trigger for your <code class="highlighter-rouge">PCollection</code>. The trigger determines when each individual window is aggregated and emitted, and helps refine how the windowing function performs with respect to late data and computing early results. See the <a href="#triggers">triggers</a> section for more information.</p>
+<h4 id="fixed-time-windows-1">7.3.1. Fixed-time windows</h4>
 
-<h4 id="setting-fixed-time-windows">Setting fixed-time windows</h4>
+<p>The following example code shows how to apply <code class="highlighter-rouge">Window</code> to divide a <code class="highlighter-rouge">PCollection</code>
+into fixed windows, each one minute in length:</p>
 
-<p>The following example code shows how to apply <code class="highlighter-rouge">Window</code> to divide a <code class="highlighter-rouge">PCollection</code> into fixed windows, each one minute in length:</p>
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>    <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> <span class="o">=</span> <span class="o">...;</span>
     <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">fixed_windowed_items</span> <span class="o">=</span> <span class="n">items</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
         <span class="n">Window</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">))));</span>
@@ -1803,9 +2601,12 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 </code></pre>
 </div>
 
-<h4 id="setting-sliding-time-windows">Setting sliding time windows</h4>
+<h4 id="sliding-time-windows-1">7.3.2. Sliding time windows</h4>
+
+<p>The following example code shows how to apply <code class="highlighter-rouge">Window</code> to divide a <code class="highlighter-rouge">PCollection</code>
+into sliding time windows. Each window is 30 minutes in length, and a new window
+begins every five seconds:</p>
 
-<p>The following example code shows how to apply <code class="highlighter-rouge">Window</code> to divide a <code class="highlighter-rouge">PCollection</code> into sliding time windows. Each window is 30 minutes in length, and a new window begins every five seconds:</p>
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>    <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> <span class="o">=</span> <span class="o">...;</span>
     <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">sliding_windowed_items</span> <span class="o">=</span> <span class="n">items</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
         <span class="n">Window</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">into</span><span class="o">(</span><span class="n">SlidingWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">)).</span><span class="na">every</span><span clas [...]
@@ -1818,9 +2619,12 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 </code></pre>
 </div>
 
-<h4 id="setting-session-windows">Setting session windows</h4>
+<h4 id="session-windows-1">7.3.3. Session windows</h4>
+
+<p>The following example code shows how to apply <code class="highlighter-rouge">Window</code> to divide a <code class="highlighter-rouge">PCollection</code>
+into session windows, where each session must be separated by a time gap of at
+least 10 minutes:</p>
 
-<p>The following example code shows how to apply <code class="highlighter-rouge">Window</code> to divide a <code class="highlighter-rouge">PCollection</code> into session windows, where each session must be separated by a time gap of at least 10 minutes:</p>
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>    <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> <span class="o">=</span> <span class="o">...;</span>
     <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">session_windowed_items</span> <span class="o">=</span> <span class="n">items</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
         <span class="n">Window</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">into</span><span class="o">(</span><span class="n">Sessions</span><span class="o">.</span><span class="na">withGapDuration</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">10</span><span class="o">))));</span>
@@ -1833,11 +2637,14 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 </code></pre>
 </div>
 
-<p>Note that the sessions are per-key — each key in the collection will have its own session groupings depending on the data distribution.</p>
+<p>Note that the sessions are per-key — each key in the collection will have its
+own session groupings depending on the data distribution.</p>
 
-<h4 id="setting-a-single-global-window">Setting a single global window</h4>
+<h4 id="single-global-window">7.3.4. Single global window</h4>
 
-<p>If your <code class="highlighter-rouge">PCollection</code> is bounded (the size is fixed), you can assign all the elements to a single global window. The following example code shows how to set a single global window for a <code class="highlighter-rouge">PCollection</code>:</p>
+<p>If your <code class="highlighter-rouge">PCollection</code> is bounded (the size is fixed), you can assign all the
+elements to a single global window. The following example code shows how to set
+a single global window for a <code class="highlighter-rouge">PCollection</code>:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>    <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> <span class="o">=</span> <span class="o">...;</span>
     <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">batch_items</span> <span class="o">=</span> <span class="n">items</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
@@ -1851,27 +2658,57 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 </code></pre>
 </div>
 
-<h3 id="a-namewatermarks-late-dataawatermarks-and-late-data"><a name="watermarks-late-data"></a>Watermarks and late data</h3>
+<h3 id="watermarks-and-late-data">7.4. Watermarks and late data</h3>
 
-<p>In any data processing system, there is a certain amount of lag between the time a data event occurs (the “event time”, determined by the timestamp on the data element itself) and the time the actual data element gets processed at any stage in your pipeline (the “processing time”, determined by the clock on the system processing the element). In addition, there are no guarantees that data events will appear in your pipeline in the same order that they were generated.</p>
+<p>In any data processing system, there is a certain amount of lag between the time
+a data event occurs (the “event time”, determined by the timestamp on the data
+element itself) and the time the actual data element gets processed at any stage
+in your pipeline (the “processing time”, determined by the clock on the system
+processing the element). In addition, there are no guarantees that data events
+will appear in your pipeline in the same order that they were generated.</p>
 
-<p>For example, let’s say we have a <code class="highlighter-rouge">PCollection</code> that’s using fixed-time windowing, with windows that are five minutes long. For each window, Beam must collect all the data with an <em>event time</em> timestamp in the given window range (between 0:00 and 4:59 in the first window, for instance). Data with timestamps outside that range (data from 5:00 or later) belong to a different window.</p>
+<p>For example, let’s say we have a <code class="highlighter-rouge">PCollection</code> that’s using fixed-time
+windowing, with windows that are five minutes long. For each window, Beam must
+collect all the data with an <em>event time</em> timestamp in the given window range
+(between 0:00 and 4:59 in the first window, for instance). Data with timestamps
+outside that range (data from 5:00 or later) belong to a different window.</p>
 
-<p>However, data isn’t always guaranteed to arrive in a pipeline in time order, or to always arrive at predictable intervals. Beam tracks a <em>watermark</em>, which is the system’s notion of when all data in a certain window can be expected to have arrived in the pipeline. Data that arrives with a timestamp after the watermark is considered <strong>late data</strong>.</p>
+<p>However, data isn’t always guaranteed to arrive in a pipeline in time order, or
+to always arrive at predictable intervals. Beam tracks a <em>watermark</em>, which is
+the system’s notion of when all data in a certain window can be expected to have
+arrived in the pipeline. Data that arrives with a timestamp after the watermark
+is considered <strong>late data</strong>.</p>
 
-<p>From our example, suppose we have a simple watermark that assumes approximately 30s of lag time between the data timestamps (the event time) and the time the data appears in the pipeline (the processing time), then Beam would close the first window at 5:30. If a data record arrives at 5:34, but with a timestamp that would put it in the 0:00-4:59 window (say, 3:38), then that record is late data.</p>
+<p>Continuing our example, suppose we have a simple watermark that assumes
+approximately 30s of lag time between the data timestamps (the event time) and
+the time the data appears in the pipeline (the processing time). In that case,
+Beam would close the first window at 5:30. If a data record arrives at 5:34,
+but with a timestamp
+that would put it in the 0:00-4:59 window (say, 3:38), then that record is late
+data.</p>
 
-<p>Note: For simplicity, we’ve assumed that we’re using a very straightforward watermark that estimates the lag time. In practice, your <code class="highlighter-rouge">PCollection</code>’s data source determines the watermark, and watermarks can be more precise or complex.</p>
+<p>Note: For simplicity, we’ve assumed that we’re using a very straightforward
+watermark that estimates the lag time. In practice, your <code class="highlighter-rouge">PCollection</code>’s data
+source determines the watermark, and watermarks can be more precise or complex.</p>
 
-<p>Beam’s default windowing configuration tries to determines when all data has arrived (based on the type of data source) and then advances the watermark past the end of the window. This default configuration does <em>not</em> allow late data. <a href="#triggers">Triggers</a> allow you to modify and refine the windowing strategy for a <code class="highlighter-rouge">PCollection</code>. You can use triggers to decide when each individual window aggregates and reports its results, includi [...]
+<p>Beam’s default windowing configuration tries to determine when all data has
+arrived (based on the type of data source) and then advances the watermark past
+the end of the window. This default configuration does <em>not</em> allow late data.
+<a href="#triggers">Triggers</a> allow you to modify and refine the windowing strategy for
+a <code class="highlighter-rouge">PCollection</code>. You can use triggers to decide when each individual window
+aggregates and reports its results, including how the window emits late
+elements.</p>
 
-<h4 id="managing-late-data">Managing late data</h4>
+<h4 id="managing-late-data">7.4.1. Managing late data</h4>
 
 <blockquote>
   <p><strong>Note:</strong> Managing late data is not supported in the Beam SDK for Python.</p>
 </blockquote>
 
-<p>You can allow late data by invoking the <code class="highlighter-rouge">.withAllowedLateness</code> operation when you set your <code class="highlighter-rouge">PCollection</code>’s windowing strategy. The following code example demonstrates a windowing strategy that will allow late data up to two days after the end of a window.</p>
+<p>You can allow late data by invoking the <code class="highlighter-rouge">.withAllowedLateness</code> operation when
+you set your <code class="highlighter-rouge">PCollection</code>’s windowing strategy. The following code example
+demonstrates a windowing strategy that will allow late data up to two days after
+the end of a window.</p>
+
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>    <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">items</span> <span class="o">=</span> <span class="o">...;</span>
     <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">fixed_windowed_items</span> <span class="o">=</span> <span class="n">items</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span>
         <span class="n">Window</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">)))</span>
@@ -1879,17 +2716,31 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 </code></pre>
 </div>
 
-<p>When you set <code class="highlighter-rouge">.withAllowedLateness</code> on a <code class="highlighter-rouge">PCollection</code>, that allowed lateness propagates forward to any subsequent <code class="highlighter-rouge">PCollection</code> derived from the first <code class="highlighter-rouge">PCollection</code> you applied allowed lateness to. If you want to change the allowed lateness later in your pipeline, you must do so explictly by applying <code class="highlighter-rouge">Window [...]
+<p>When you set <code class="highlighter-rouge">.withAllowedLateness</code> on a <code class="highlighter-rouge">PCollection</code>, that allowed lateness
+propagates forward to any subsequent <code class="highlighter-rouge">PCollection</code> derived from the first
+<code class="highlighter-rouge">PCollection</code> you applied allowed lateness to. If you want to change the allowed
+lateness later in your pipeline, you must do so explicitly by applying
+<code class="highlighter-rouge">Window.configure().withAllowedLateness()</code>.</p>
 
-<h3 id="adding-timestamps-to-a-pcollections-elements">Adding timestamps to a PCollection’s elements</h3>
+<h3 id="adding-timestamps-to-a-pcollections-elements">7.5. Adding timestamps to a PCollection’s elements</h3>
 
-<p>An unbounded source provides a timestamp for each element. Depending on your unbounded source, you may need to configure how the timestamp is extracted from the raw data stream.</p>
+<p>An unbounded source provides a timestamp for each element. Depending on your
+unbounded source, you may need to configure how the timestamp is extracted from
+the raw data stream.</p>
 
-<p>However, bounded sources (such as a file from <code class="highlighter-rouge">TextIO</code>) do not provide timestamps. If you need timestamps, you must add them to your <code class="highlighter-rouge">PCollection</code>’s elements.</p>
+<p>However, bounded sources (such as a file from <code class="highlighter-rouge">TextIO</code>) do not provide
+timestamps. If you need timestamps, you must add them to your <code class="highlighter-rouge">PCollection</code>’s
+elements.</p>
 
-<p>You can assign new timestamps to the elements of a <code class="highlighter-rouge">PCollection</code> by applying a <a href="#transforms-pardo">ParDo</a> transform that outputs new elements with timestamps that you set.</p>
+<p>You can assign new timestamps to the elements of a <code class="highlighter-rouge">PCollection</code> by applying a
+<a href="#pardo">ParDo</a> transform that outputs new elements with timestamps that you
+set.</p>
 
-<p>An example might be if your pipeline reads log records from an input file, and each log record includes a timestamp field; since your pipeline reads the records in from a file, the file source doesn’t assign timestamps automatically. You can parse the timestamp field from each record and use a <code class="highlighter-rouge">ParDo</code> transform with a <code class="highlighter-rouge">DoFn</code> to attach the timestamps to each element in your <code class="highlighter-rouge">PCollec [...]
+<p>For example, your pipeline might read log records from an input file in which
+each log record includes a timestamp field. Because your pipeline reads the
+records in from a file, the file source doesn’t assign timestamps automatically.
+You can parse the timestamp field from each record and use a <code class="highlighter-rouge">ParDo</code> transform
+with a <code class="highlighter-rouge">DoFn</code> to attach the timestamps to each element in your <code class="highlighter-rouge">PCollection</code>.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>      <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">LogEntry</span><span class="o">&gt;</span> <span class="n">unstampedLogs</span> <span class="o">=</span> <span class="o">...;</span>
       <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">LogEntry</span><span class="o">&gt;</span> <span class="n">stampedLogs</span> <span class="o">=</span>
@@ -1919,49 +2770,90 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 </code></pre>
 </div>
 
-<h2 id="a-nametriggersaworking-with-triggers"><a name="triggers"></a>Working with triggers</h2>
+<h2 id="triggers">8. Triggers</h2>
 
 <blockquote>
-  <p><strong>NOTE:</strong> This content applies only to the Beam SDK for Java. The Beam SDK for Python does not support triggers.</p>
+  <p><strong>NOTE:</strong> This content applies only to the Beam SDK for Java. The Beam SDK for
+Python does not support triggers.</p>
 </blockquote>
 
-<p>When collecting and grouping data into windows, Beam uses <strong>triggers</strong> to determine when to emit the aggregated results of each window (referred to as a <em>pane</em>). If you use Beam’s default windowing configuration and <a href="#default-trigger">default trigger</a>, Beam outputs the aggregated result when it <a href="#watermarks-late-data">estimates all data has arrived</a>, and discards all subsequent data for that window.</p>
+<p>When collecting and grouping data into windows, Beam uses <strong>triggers</strong> to
+determine when to emit the aggregated results of each window (referred to as a
+<em>pane</em>). If you use Beam’s default windowing configuration and <a href="#the-default-trigger">default
+trigger</a>, Beam outputs the aggregated result when it
+<a href="#watermarks-and-late-data">estimates all data has arrived</a>, and discards all
+subsequent data for that window.</p>
 
-<p>You can set triggers for your <code class="highlighter-rouge">PCollection</code>s to change this default behavior. Beam provides a number of pre-built triggers that you can set:</p>
+<p>You can set triggers for your <code class="highlighter-rouge">PCollection</code>s to change this default behavior.
+Beam provides a number of pre-built triggers that you can set:</p>
 
 <ul>
-  <li><strong>Event time triggers</strong>. These triggers operate on the event time, as indicated by the timestamp on each data element. Beam’s default trigger is event time-based.</li>
-  <li><strong>Processing time triggers</strong>. These triggers operate on the processing time – the time when the data element is processed at any given stage in the pipeline.</li>
-  <li><strong>Data-driven triggers</strong>. These triggers operate by examining the data as it arrives in each window, and firing when that data meets a certain property. Currently, data-driven triggers only support firing after a certain number of data elements.</li>
-  <li><strong>Composite triggers</strong>. These triggers combine multiple triggers in various ways.</li>
+  <li><strong>Event time triggers</strong>. These triggers operate on the event time, as
+indicated by the timestamp on each data element. Beam’s default trigger is
+event time-based.</li>
+  <li><strong>Processing time triggers</strong>. These triggers operate on the processing time
+– the time when the data element is processed at any given stage in the
+pipeline.</li>
+  <li><strong>Data-driven triggers</strong>. These triggers operate by examining the data as it
+arrives in each window, and firing when that data meets a certain property.
+Currently, data-driven triggers only support firing after a certain number
+of data elements.</li>
+  <li><strong>Composite triggers</strong>. These triggers combine multiple triggers in various
+ways.</li>
 </ul>
 
-<p>At a high level, triggers provide two additional capabilities compared to simply outputting at the end of a window:</p>
+<p>At a high level, triggers provide two additional capabilities compared to simply
+outputting at the end of a window:</p>
 
 <ul>
-  <li>Triggers allow Beam to emit early results, before all the data in a given window has arrived. For example, emitting after a certain amouint of time elapses, or after a certain number of elements arrives.</li>
-  <li>Triggers allow processing of late data by triggering after the event time watermark passes the end of the window.</li>
+  <li>Triggers allow Beam to emit early results, before all the data in a given
+window has arrived. For example, a trigger can fire after a certain amount of
+time elapses, or after a certain number of elements arrive.</li>
+  <li>Triggers allow processing of late data by triggering after the event time
+watermark passes the end of the window.</li>
 </ul>
 
-<p>These capabilities allow you to control the flow of your data and balance between different factors depending on your use case:</p>
+<p>These capabilities allow you to control the flow of your data and balance
+between different factors depending on your use case:</p>
 
 <ul>
-  <li><strong>Completeness:</strong> How important is it to have all of your data before you compute your result?</li>
-  <li><strong>Latency:</strong> How long do you want to wait for data? For example, do you wait until you think you have all data? Do you process data as it arrives?</li>
-  <li><strong>Cost:</strong> How much compute power/money are you willing to spend to lower the latency?</li>
+  <li><strong>Completeness:</strong> How important is it to have all of your data before you
+compute your result?</li>
+  <li><strong>Latency:</strong> How long do you want to wait for data? For example, do you wait
+until you think you have all data? Do you process data as it arrives?</li>
+  <li><strong>Cost:</strong> How much compute power/money are you willing to spend to lower the
+latency?</li>
 </ul>
 
-<p>For example, a system that requires time-sensitive updates might use a strict time-based trigger that emits a window every <em>N</em> seconds, valuing promptness over data completeness. A system that values data completeness more than the exact timing of results might choose to use Beam’s default trigger, which fires at the end of the window.</p>
+<p>For example, a system that requires time-sensitive updates might use a strict
+time-based trigger that emits a window every <em>N</em> seconds, valuing promptness
+over data completeness. A system that values data completeness more than the
+exact timing of results might choose to use Beam’s default trigger, which fires
+at the end of the window.</p>
 
-<p>You can also set a trigger for an unbounded <code class="highlighter-rouge">PCollection</code> that uses a <a href="#windowing">single global window for its windowing function</a>. This can be useful when you want your pipeline to provide periodic updates on an unbounded data set — for example, a running average of all data provided to the present time, updated every N seconds or every N elements.</p>
+<p>You can also set a trigger for an unbounded <code class="highlighter-rouge">PCollection</code> that uses a <a href="#windowing">single
+global window for its windowing function</a>. This can be useful when
+you want your pipeline to provide periodic updates on an unbounded data set —
+for example, a running average of all data provided to the present time, updated
+every N seconds or every N elements.</p>
 
-<h4 id="event-time-triggers">Event Time Triggers</h4>
+<h3 id="event-time-triggers">8.1. Event time triggers</h3>
 
-<p>The <code class="highlighter-rouge">AfterWatermark</code> trigger operates on <em>event time</em>. The <code class="highlighter-rouge">AfterWatermark</code> trigger emits the contents of a window after the <a href="#watermarks-late-data">watermark</a> passes the end of the window, based on the timestamps attached to the data elements. The watermark is a global progress metric, and is Beam’s notion of input completeness within your pipeline at any given point. <code class="highlighter- [...]
+<p>The <code class="highlighter-rouge">AfterWatermark</code> trigger operates on <em>event time</em>. The <code class="highlighter-rouge">AfterWatermark</code>
+trigger emits the contents of a window after the
+<a href="#watermarks-and-late-data">watermark</a> passes the end of the window, based on the
+timestamps attached to the data elements. The watermark is a global progress
+metric, and is Beam’s notion of input completeness within your pipeline at any
+given point. <code class="highlighter-rouge">AfterWatermark.pastEndOfWindow()</code> <em>only</em> fires when the watermark
+passes the end of the window.</p>
 
-<p>In addition, you can use <code class="highlighter-rouge">.withEarlyFirings(trigger)</code> and <code class="highlighter-rouge">.withLateFirings(trigger)</code> to configure triggers that fire if your pipeline receives data before or after the end of the window.</p>
+<p>In addition, you can use <code class="highlighter-rouge">.withEarlyFirings(trigger)</code> and
+<code class="highlighter-rouge">.withLateFirings(trigger)</code> to configure triggers that fire if your pipeline
+receives data before or after the end of the window.</p>
+
+<p>The following example shows a billing scenario, and uses both early and late
+firings:</p>
 
-<p>The following example shows a billing scenario, and uses both early and late firings:</p>
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="c1">// Create a bill at the end of the month.</span>
   <span class="n">AfterWatermark</span><span class="o">.</span><span class="na">pastEndOfWindow</span><span class="o">()</span>
       <span class="c1">// During the month, get near real-time estimates.</span>
@@ -1973,34 +2865,56 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
       <span class="o">.</span><span class="na">withLateFirings</span><span class="o">(</span><span class="n">AfterPane</span><span class="o">.</span><span class="na">elementCountAtLeast</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code>  <span class="c"># The Beam SDK for Python does not support triggers.</span>
 </code></pre>
 </div>
 
-<h5 id="a-namedefault-triggeradefault-trigger"><a name="default-trigger"></a><strong>Default Trigger</strong></h5>
+<h4 id="the-default-trigger">8.1.1. The default trigger</h4>
 
-<p>The default trigger for a <code class="highlighter-rouge">PCollection</code> is based on event time, and emits the results of the window when the Beam’s watermark passes the end of the window, and then fires each time late data arrives.</p>
+<p>The default trigger for a <code class="highlighter-rouge">PCollection</code> is based on event time, and emits the
+results of the window when Beam’s watermark passes the end of the window,
+and then fires each time late data arrives.</p>
 
-<p>However, if you are using both the default windowing configuration and the default trigger, the default trigger emits exactly once, and late data is discarded. This is because the default windowing configuration has an allowed lateness value of 0. See the Handling Late Data section for information about modifying this behavior.</p>
+<p>However, if you are using both the default windowing configuration and the
+default trigger, the default trigger emits exactly once, and late data is
+discarded. This is because the default windowing configuration has an allowed
+lateness value of 0. See the <a href="#managing-late-data">Managing late data</a>
+section for information about modifying this behavior.</p>
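+
+<p>For illustration only, the default trigger behaves roughly like an explicitly
+configured repeating watermark trigger. A sketch with an illustrative one-minute
+window:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  PCollection&lt;String&gt; pc = ...;
+  // Roughly the default behavior, written out explicitly.
+  pc.apply(Window.&lt;String&gt;into(FixedWindows.of(Duration.standardMinutes(1)))
+      .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
+      .withAllowedLateness(Duration.ZERO)
+      .discardingFiredPanes());
+</code></pre>
+</div>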
 
-<h4 id="processing-time-triggers">Processing Time Triggers</h4>
+<h3 id="processing-time-triggers">8.2. Processing time triggers</h3>
 
-<p>The <code class="highlighter-rouge">AfterProcessingTime</code> trigger operates on <em>processing time</em>. For example, the <code class="highlighter-rouge">AfterProcessingTime.pastFirstElementInPane() </code> trigger emits a window after a certain amount of processing time has passed since data was received. The processing time is determined by the system clock, rather than the data element’s timestamp.</p>
+<p>The <code class="highlighter-rouge">AfterProcessingTime</code> trigger operates on <em>processing time</em>. For example,
+the <code class="highlighter-rouge">AfterProcessingTime.pastFirstElementInPane() </code> trigger emits a window after
+a certain amount of processing time has passed since data was received. The
+processing time is determined by the system clock, rather than the data
+element’s timestamp.</p>
 
-<p>The <code class="highlighter-rouge">AfterProcessingTime</code> trigger is useful for triggering early results from a window, particularly a window with a large time frame such as a single global window.</p>
+<p>The <code class="highlighter-rouge">AfterProcessingTime</code> trigger is useful for triggering early results from a
+window, particularly a window with a large time frame such as a single global
+window.</p>
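+
+<p>For example, the following sketch builds a trigger that fires one minute of
+processing time after the first element in a pane is received (the delay value
+is illustrative):</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  // Fires one minute (in processing time) after the first element in the pane.
+  AfterProcessingTime.pastFirstElementInPane()
+      .plusDelayOf(Duration.standardMinutes(1))
+</code></pre>
+</div>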
 
-<h4 id="data-driven-triggers">Data-Driven Triggers</h4>
+<h3 id="data-driven-triggers">8.3. Data-driven triggers</h3>
 
-<p>Beam provides one data-driven trigger, <code class="highlighter-rouge">AfterPane.elementCountAtLeast()</code>. This trigger works on an element count; it fires after the current pane has collected at least <em>N</em> elements. This allows a window to emit early results (before all the data has accumulated), which can be particularly useful if you are using a single global window.</p>
+<p>Beam provides one data-driven trigger, <code class="highlighter-rouge">AfterPane.elementCountAtLeast()</code>. This
+trigger works on an element count; it fires after the current pane has collected
+at least <em>N</em> elements. This allows a window to emit early results (before all
+the data has accumulated), which can be particularly useful if you are using a
+single global window.</p>
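+
+<p>For example, the following sketch emits early results from the single global
+window whenever the current pane has collected at least 100 elements (the count
+and element type are illustrative):</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  PCollection&lt;String&gt; pc = ...;
+  pc.apply(Window.&lt;String&gt;into(new GlobalWindows())
+      // Fire a pane each time at least 100 new elements have arrived.
+      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
+      .withAllowedLateness(Duration.ZERO)
+      .discardingFiredPanes());
+</code></pre>
+</div>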
 
-<p>It is important to note that if, for example, you use <code class="highlighter-rouge">.elementCountAtLeast(50)</code> and only 32 elements arrive, those 32 elements sit around forever. If the 32 elements are important to you, consider using <a href="#composite-triggers">composite triggers</a> to combine multiple conditions. This allows you to specify multiple firing conditions such as “fire either when I receive 50 elements, or every 1 second”.</p>
+<p>It is important to note that if, for example, you use <code class="highlighter-rouge">.elementCountAtLeast(50)</code>
+and only 32 elements arrive, those 32 elements sit around forever. If the 32
+elements are important to you, consider using <a href="#composite-triggers">composite
+triggers</a> to combine multiple conditions. This allows you
+to specify multiple firing conditions such as “fire either when I receive 50
+elements, or every 1 second”.</p>
 
-<h3 id="setting-a-trigger">Setting a Trigger</h3>
+<h3 id="setting-a-trigger">8.4. Setting a trigger</h3>
 
-<p>When you set a windowing function for a <code class="highlighter-rouge">PCollection</code> by using the <code class="highlighter-rouge">Window</code> transform, you can also specify a trigger.</p>
+<p>When you set a windowing function for a <code class="highlighter-rouge">PCollection</code> by using the <code class="highlighter-rouge">Window</code>
+transform, you can also specify a trigger.</p>
 
-<p>You set the trigger(s) for a <code class="highlighter-rouge">PCollection</code> by invoking the method <code class="highlighter-rouge">.triggering()</code> on the result of your <code class="highlighter-rouge">Window.into()</code> transform, as follows:</p>
+<p>You set the trigger(s) for a <code class="highlighter-rouge">PCollection</code> by invoking the method
+<code class="highlighter-rouge">.triggering()</code> on the result of your <code class="highlighter-rouge">Window.into()</code> transform, as follows:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...;</span>
   <span class="n">pc</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Window</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span clas [...]
@@ -2009,33 +2923,50 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
                                <span class="o">.</span><span class="na">discardingFiredPanes</span><span class="o">());</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code>  <span class="c"># The Beam SDK for Python does not support triggers.</span>
 </code></pre>
 </div>
 
-<p>This code sample sets a time-based trigger for a <code class="highlighter-rouge">PCollection</code>, which emits results one minute after the first element in that window has been processed. The last line in the code sample, <code class="highlighter-rouge">.discardingFiredPanes()</code>, is the window’s <strong>accumulation mode</strong>.</p>
+<p>This code sample sets a time-based trigger for a <code class="highlighter-rouge">PCollection</code>, which emits
+results one minute after the first element in that window has been processed.
+The last line in the code sample, <code class="highlighter-rouge">.discardingFiredPanes()</code>, is the window’s
+<strong>accumulation mode</strong>.</p>
 
-<h4 id="a-namewindow-accumulation-modesawindow-accumulation-modes"><a name="window-accumulation-modes"></a>Window Accumulation Modes</h4>
+<h4 id="window-accumulation-modes">8.4.1. Window accumulation modes</h4>
 
-<p>When you specify a trigger, you must also set the the window’s <strong>accumulation mode</strong>. When a trigger fires, it emits the current contents of the window as a pane. Since a trigger can fire multiple times, the accumulation mode determines whether the system <em>accumulates</em> the window panes as the trigger fires, or <em>discards</em> them.</p>
+<p>When you specify a trigger, you must also set the window’s <strong>accumulation
+mode</strong>. When a trigger fires, it emits the current contents of the window as a
+pane. Since a trigger can fire multiple times, the accumulation mode determines
+whether the system <em>accumulates</em> the window panes as the trigger fires, or
+<em>discards</em> them.</p>
 
-<p>To set a window to accumulate the panes that are produced when the trigger fires, invoke<code class="highlighter-rouge">.accumulatingFiredPanes()</code> when you set the trigger. To set a window to discard fired panes, invoke <code class="highlighter-rouge">.discardingFiredPanes()</code>.</p>
+<p>To set a window to accumulate the panes that are produced when the trigger
+fires, invoke <code class="highlighter-rouge">.accumulatingFiredPanes()</code> when you set the trigger. To set a
+window to discard fired panes, invoke <code class="highlighter-rouge">.discardingFiredPanes()</code>.</p>
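+
+<p>As a minimal sketch in Java (assuming a <code class="highlighter-rouge">PCollection&lt;String&gt;</code> named
+<code class="highlighter-rouge">pc</code> and 10-minute fixed windows), the two modes differ only in the final
+call on the <code class="highlighter-rouge">Window</code> transform:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  // Accumulating mode: each firing emits everything seen so far in the window.
+  pc.apply(Window.&lt;String&gt;into(FixedWindows.of(Duration.standardMinutes(10)))
+      .triggering(AfterProcessingTime.pastFirstElementInPane()
+          .plusDelayOf(Duration.standardMinutes(1)))
+      .withAllowedLateness(Duration.ZERO)
+      .accumulatingFiredPanes());
+
+  // Discarding mode: each firing emits only the elements that arrived since
+  // the previous firing.
+  pc.apply(Window.&lt;String&gt;into(FixedWindows.of(Duration.standardMinutes(10)))
+      .triggering(AfterProcessingTime.pastFirstElementInPane()
+          .plusDelayOf(Duration.standardMinutes(1)))
+      .withAllowedLateness(Duration.ZERO)
+      .discardingFiredPanes());
+</code></pre>
+</div>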
 
-<p>Let’s look an example that uses a <code class="highlighter-rouge">PCollection</code> with fixed-time windowing and a data-based trigger. This is something you might do if, for example, each window represented a ten-minute running average, but you wanted to display the current value of the average in a UI more frequently than every ten minutes. We’ll assume the following conditions:</p>
+<p>Let’s look at an example that uses a <code class="highlighter-rouge">PCollection</code> with fixed-time windowing
+and a data-driven trigger. This is something you might do if, for example, each
+window represented a ten-minute running average, but you wanted to display the
+current value of the average in a UI more frequently than every ten minutes.
+We’ll assume the following conditions (a code sketch follows the list):</p>
 
 <ul>
   <li>The <code class="highlighter-rouge">PCollection</code> uses 10-minute fixed-time windows.</li>
-  <li>The <code class="highlighter-rouge">PCollection</code> has a repeating trigger that fires every time 3 elements arrive.</li>
+  <li>The <code class="highlighter-rouge">PCollection</code> has a repeating trigger that fires every time 3 elements
+arrive.</li>
 </ul>
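+
+<p>As a hedged sketch (not the exact example pipeline), that configuration could
+be expressed in Java roughly as follows, assuming a <code class="highlighter-rouge">PCollection&lt;Integer&gt;</code>
+of values named <code class="highlighter-rouge">scores</code>:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  // 10-minute fixed windows with a repeating trigger that fires after every
+  // 3 elements. Swap the last call for .discardingFiredPanes() to compare the
+  // two accumulation modes illustrated below.
+  scores.apply(Window.&lt;Integer&gt;into(FixedWindows.of(Duration.standardMinutes(10)))
+      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(3)))
+      .withAllowedLateness(Duration.ZERO)
+      .accumulatingFiredPanes());
+</code></pre>
+</div>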
 
-<p>The following diagram shows data events for key X as they arrive in the PCollection and are assigned to windows. To keep the diagram a bit simpler, we’ll assume that the events all arrive in the pipeline in order.</p>
+<p>The following diagram shows data events for key X as they arrive in the
+PCollection and are assigned to windows. To keep the diagram a bit simpler,
+we’ll assume that the events all arrive in the pipeline in order.</p>
 
 <p><img src="/images/trigger-accumulation.png" alt="Diagram of data events for accumulating mode example" title="Data events for accumulating mode example" /></p>
 
-<h5 id="accumulating-mode">Accumulating Mode</h5>
+<h5 id="accumulating-mode">8.4.1.1. Accumulating mode</h5>
 
-<p>If our trigger is set to <code class="highlighter-rouge">.accumulatingFiredPanes</code>, the trigger emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:</p>
+<p>If our trigger is set to <code class="highlighter-rouge">.accumulatingFiredPanes</code>, the trigger emits the
+following values each time it fires. Keep in mind that the trigger fires every
+time three elements arrive:</p>
 
 <div class="highlighter-rouge"><pre class="highlight"><code>  First trigger firing:  [5, 8, 3]
   Second trigger firing: [5, 8, 3, 15, 19, 23]
@@ -2043,9 +2974,10 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 </code></pre>
 </div>
 
-<h5 id="discarding-mode">Discarding Mode</h5>
+<h5 id="discarding-mode">8.4.1.2. Discarding mode</h5>
 
-<p>If our trigger is set to <code class="highlighter-rouge">.discardingFiredPanes</code>, the trigger emits the following values on each firing:</p>
+<p>If our trigger is set to <code class="highlighter-rouge">.discardingFiredPanes</code>, the trigger emits the
+following values on each firing:</p>
 
 <div class="highlighter-rouge"><pre class="highlight"><code>  First trigger firing:  [5, 8, 3]
   Second trigger firing:           [15, 19, 23]
@@ -2053,11 +2985,16 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
 </code></pre>
 </div>
 
-<h4 id="handling-late-data">Handling Late Data</h4>
+<h4 id="handling-late-data">8.4.2. Handling late data</h4>
 
-<p>If you want your pipeline to process data that arrives after the watermark passes the end of the window, you can apply an <em>allowed lateness</em> when you set your windowing configuration. This gives your trigger the opportunity to react to the late data. If allowed lateness is set, the default trigger will emit new results immediately whenever late data arrives.</p>
+<p>If you want your pipeline to process data that arrives after the watermark
+passes the end of the window, you can apply an <em>allowed lateness</em> when you set
+your windowing configuration. This gives your trigger the opportunity to react
+to the late data. If allowed lateness is set, the default trigger will emit new
+results immediately whenever late data arrives.</p>
 
-<p>You set the allowed lateness by using <code class="highlighter-rouge">.withAllowedLateness()</code> when you set your windowing function:</p>
+<p>You set the allowed lateness by using <code class="highlighter-rouge">.withAllowedLateness()</code> when you set your
+windowing function:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...;</span>
   <span class="n">pc</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Window</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="n">into</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span clas [...]
@@ -2066,45 +3003,69 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
                               <span class="o">.</span><span class="na">withAllowedLateness</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">30</span><span class="o">));</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code>  <span class="c"># The Beam SDK for Python does not support triggers.</span>
 </code></pre>
 </div>
 
-<p>This allowed lateness propagates to all <code class="highlighter-rouge">PCollection</code>s derived as a result of applying transforms to the original <code class="highlighter-rouge">PCollection</code>. If you want to change the allowed lateness later in your pipeline, you can apply <code class="highlighter-rouge">Window.configure().withAllowedLateness()</code> again, explicitly.</p>
+<p>This allowed lateness propagates to all <code class="highlighter-rouge">PCollection</code>s derived as a result of
+applying transforms to the original <code class="highlighter-rouge">PCollection</code>. If you want to change the
+allowed lateness later in your pipeline, you can apply
+<code class="highlighter-rouge">Window.configure().withAllowedLateness()</code> again, explicitly.</p>
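+
+<p>For instance, a minimal Java sketch (assuming a downstream
+<code class="highlighter-rouge">PCollection&lt;String&gt;</code> named <code class="highlighter-rouge">derived</code> whose windowing strategy was set
+upstream) might adjust the lateness like this:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  // Adjust only the allowed lateness; the window function and trigger set
+  // upstream are left unchanged.
+  derived.apply(Window.&lt;String&gt;configure()
+      .withAllowedLateness(Duration.standardMinutes(60)));
+</code></pre>
+</div>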
 
-<h3 id="a-namecomposite-triggersacomposite-triggers"><a name="composite-triggers"></a>Composite Triggers</h3>
+<h3 id="composite-triggers">8.5. Composite triggers</h3>
 
-<p>You can combine multiple triggers to form <strong>composite triggers</strong>, and can specify a trigger to emit results repeatedly, at most once, or under other custom conditions.</p>
+<p>You can combine multiple triggers to form <strong>composite triggers</strong>, and can
+specify a trigger to emit results repeatedly, at most once, or under other
+custom conditions.</p>
 
-<h4 id="composite-trigger-types">Composite Trigger Types</h4>
+<h4 id="composite-trigger-types">8.5.1. Composite trigger types</h4>
 
 <p>Beam includes the following composite triggers:</p>
 
 <ul>
-  <li>You can add additional early firings or late firings to <code class="highlighter-rouge">AfterWatermark.pastEndOfWindow</code> via <code class="highlighter-rouge">.withEarlyFirings</code> and <code class="highlighter-rouge">.withLateFirings</code>.</li>
-  <li><code class="highlighter-rouge">Repeatedly.forever</code> specifies a trigger that executes forever. Any time the trigger’s conditions are met, it causes a window to emit results and then resets and starts over. It can be useful to combine <code class="highlighter-rouge">Repeatedly.forever</code> with <code class="highlighter-rouge">.orFinally</code> to specify a condition that causes the repeating trigger to stop.</li>
-  <li><code class="highlighter-rouge">AfterEach.inOrder</code> combines multiple triggers to fire in a specific sequence. Each time a trigger in the sequence emits a window, the sequence advances to the next trigger.</li>
-  <li><code class="highlighter-rouge">AfterFirst</code> takes multiple triggers and emits the first time <em>any</em> of its argument triggers is satisfied. This is equivalent to a logical OR operation for multiple triggers.</li>
-  <li><code class="highlighter-rouge">AfterAll</code> takes multiple triggers and emits when <em>all</em> of its argument triggers are satisfied. This is equivalent to a logical AND operation for multiple triggers.</li>
-  <li><code class="highlighter-rouge">orFinally</code> can serve as a final condition to cause any trigger to fire one final time and never fire again.</li>
+  <li>You can add additional early firings or late firings to
+<code class="highlighter-rouge">AfterWatermark.pastEndOfWindow</code> via <code class="highlighter-rouge">.withEarlyFirings</code> and
+<code class="highlighter-rouge">.withLateFirings</code>.</li>
+  <li><code class="highlighter-rouge">Repeatedly.forever</code> specifies a trigger that executes forever. Any time the
+trigger’s conditions are met, it causes a window to emit results and then
+resets and starts over. It can be useful to combine <code class="highlighter-rouge">Repeatedly.forever</code>
+with <code class="highlighter-rouge">.orFinally</code> to specify a condition that causes the repeating trigger
+to stop (see the sketch after this list).</li>
+  <li><code class="highlighter-rouge">AfterEach.inOrder</code> combines multiple triggers to fire in a specific
+sequence. Each time a trigger in the sequence emits a window, the sequence
+advances to the next trigger.</li>
+  <li><code class="highlighter-rouge">AfterFirst</code> takes multiple triggers and emits the first time <em>any</em> of its
+argument triggers is satisfied. This is equivalent to a logical OR operation
+for multiple triggers.</li>
+  <li><code class="highlighter-rouge">AfterAll</code> takes multiple triggers and emits when <em>all</em> of its argument
+triggers are satisfied. This is equivalent to a logical AND operation for
+multiple triggers.</li>
+  <li><code class="highlighter-rouge">orFinally</code> can serve as a final condition to cause any trigger to fire one
+final time and never fire again.</li>
 </ul>
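+
+<p>As an illustrative sketch only (the trigger expression below is not tied to a
+particular pipeline), combining <code class="highlighter-rouge">Repeatedly.forever</code> with <code class="highlighter-rouge">.orFinally</code> might
+look like this:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  // Fire each time 10 elements arrive, but stop for good once the watermark
+  // passes the end of the window.
+  Repeatedly.forever(AfterPane.elementCountAtLeast(10))
+      .orFinally(AfterWatermark.pastEndOfWindow())
+</code></pre>
+</div>
+
+<p>You would pass such a composite trigger to <code class="highlighter-rouge">.triggering()</code> on a <code class="highlighter-rouge">Window</code>
+transform, as in the earlier examples.</p>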
 
-<h4 id="composition-with-afterwatermarkpastendofwindow">Composition with AfterWatermark.pastEndOfWindow</h4>
+<h4 id="composition-with-afterwatermarkpastendofwindow">8.5.2. Composition with AfterWatermark.pastEndOfWindow</h4>
 
-<p>Some of the most useful composite triggers fire a single time when Beam estimates that all the data has arrived (i.e. when the watermark passes the end of the window) combined with either, or both, of the following:</p>
+<p>Some of the most useful composite triggers fire a single time when Beam
+estimates that all the data has arrived (i.e. when the watermark passes the end
+of the window) combined with either, or both, of the following:</p>
 
 <ul>
-  <li>Speculative firings that precede the watermark passing the end of the window to allow faster processing of partial results.</li>
-  <li>Late firings that happen after the watermark passes the end of the window, to allow for handling late-arriving data</li>
+  <li>Speculative firings that precede the watermark passing the end of the window
+to allow faster processing of partial results.</li>
+  <li>Late firings that happen after the watermark passes the end of the window,
+to allow for handling late-arriving data.</li>
 </ul>
 
-<p>You can express this pattern using <code class="highlighter-rouge">AfterWatermark.pastEndOfWindow</code>. For example, the following example trigger code fires on the following conditions:</p>
+<p>You can express this pattern using <code class="highlighter-rouge">AfterWatermark.pastEndOfWindow</code>. For
+example, the following trigger code fires on these conditions:</p>
 
 <ul>
-  <li>On Beam’s estimate that all the data has arrived (the watermark passes the end of the window)</li>
+  <li>On Beam’s estimate that all the data has arrived (the watermark passes the
+end of the window)</li>
   <li>Any time late data arrives, after a ten-minute delay</li>
-  <li>After two days, we assume no more data of interest will arrive, and the trigger stops executing</li>
+  <li>After two days, we assume no more data of interest will arrive, and the
+trigger stops executing</li>
 </ul>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Window</span>
@@ -2117,21 +3078,21 @@ Subsequent transforms, however, are applied to the result of the <code class="hi
       <span class="o">.</span><span class="na">withAllowedLateness</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardDays</span><span class="o">(</span><span class="mi">2</span><span class="o">)));</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code>  <span class="c"># The Beam SDK for Python does not support triggers.</span>
 </code></pre>
 </div>
 
-<h4 id="other-composite-triggers">Other Composite Triggers</h4>
+<h4 id="other-composite-triggers">8.5.3. Other composite triggers</h4>
 
-<p>You can also build other sorts of composite triggers. The following example code shows a simple composite trigger that fires whenever the pane has at least 100 elements, or after a minute.</p>
+<p>You can also build other sorts of composite triggers. The following example code
+shows a simple composite trigger that fires whenever the pane has at least 100
+elements, or after a minute.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code>  <span class="n">Repeatedly</span><span class="o">.</span><span class="na">forever</span><span class="o">(</span><span class="n">AfterFirst</span><span class="o">.</span><span class="na">of</span><span class="o">(</span>
       <span class="n">AfterPane</span><span class="o">.</span><span class="na">elementCountAtLeast</span><span class="o">(</span><span class="mi">100</span><span class="o">),</span>
       <span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">().</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardMinutes</span><span class="o">(</span><span class="mi">1</span><span class="o">))))</span>
 </code></pre>
 </div>
-
 <div class="language-py highlighter-rouge"><pre class="highlight"><code>  <span class="c"># The Beam SDK for Python does not support triggers.</span>
 </code></pre>
 </div>
diff --git a/content/documentation/sdks/python-custom-io/index.html b/content/documentation/sdks/python-custom-io/index.html
index e3a4892..33d48c7 100644
--- a/content/documentation/sdks/python-custom-io/index.html
+++ b/content/documentation/sdks/python-custom-io/index.html
@@ -154,7 +154,7 @@
 
 <h2 id="why-create-a-new-source-or-sink">Why Create a New Source or Sink</h2>
 
-<p>You’ll need to create a new source or sink if you want your pipeline to read data from (or write data to) a storage system for which the Beam SDK for Python does not provide <a href="/documentation/programming-guide/#io">native support</a>.</p>
+<p>You’ll need to create a new source or sink if you want your pipeline to read data from (or write data to) a storage system for which the Beam SDK for Python does not provide <a href="/documentation/programming-guide/#pipeline-io">native support</a>.</p>
 
 <p>In simple cases, you may not need to create a new source or sink. For example, if you need to read data from an SQL database using an arbitrary query, none of the advanced Source API features would benefit you. Likewise, if you’d like to write data to a third-party API via a protocol that lacks deduplication support, the Sink API wouldn’t benefit you. In such cases it makes more sense to use a <code class="highlighter-rouge">ParDo</code>.</p>
 
diff --git a/content/get-started/mobile-gaming-example/index.html b/content/get-started/mobile-gaming-example/index.html
index dbea94c..70ad2e5 100644
--- a/content/get-started/mobile-gaming-example/index.html
+++ b/content/get-started/mobile-gaming-example/index.html
@@ -219,7 +219,7 @@
 
 <p>Because some of our example pipelines use data files (like logs from the game server) as input, the event timestamp for each game might be embedded in the data–that is, it’s a field in each data record. Those pipelines need to parse the event timestamp from each data record after reading it from the input file.</p>
 
-<p>For pipelines that read unbounded game data from an unbounded source, the data source sets the intrinsic <a href="/documentation/programming-guide/#pctimestamps">timestamp</a> for each PCollection element to the appropriate event time.</p>
+<p>For pipelines that read unbounded game data from an unbounded source, the data source sets the intrinsic <a href="/documentation/programming-guide/#element-timestamps">timestamp</a> for each PCollection element to the appropriate event time.</p>
 
 <p>The Mobile Gaming example pipelines vary in complexity, from simple batch analysis to more complex pipelines that can perform real-time analysis and abuse detection. This section walks you through each example and demonstrates how to use Beam features like windowing and triggers to expand your pipeline’s capabilities.</p>
 
@@ -264,7 +264,7 @@
 
 <p>This example uses batch processing, and the diagram’s Y axis represents processing time: the pipeline processes events lower on the Y-axis first, and events higher up the axis later. The diagram’s X axis represents the event time for each game event, as denoted by that event’s timestamp. Note that the individual events in the diagram are not processed by the pipeline in the same order as they occurred (according to their timestamps).</p>
 
-<p>After reading the score events from the input file, the pipeline groups all of those user/score pairs together and sums the score values into one total value per unique user. <code class="highlighter-rouge">UserScore</code> encapsulates the core logic for that step as the <a href="/documentation/programming-guide/#transforms-composite">user-defined composite transform</a> <code class="highlighter-rouge">ExtractAndSumScore</code>:</p>
+<p>After reading the score events from the input file, the pipeline groups all of those user/score pairs together and sums the score values into one total value per unique user. <code class="highlighter-rouge">UserScore</code> encapsulates the core logic for that step as the <a href="/documentation/programming-guide/#composite-transforms">user-defined composite transform</a> <code class="highlighter-rouge">ExtractAndSumScore</code>:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">ExtractAndSumScore</span>
     <span class="kd">extends</span> <span class="n">PTransform</span><span class="o">&lt;</span><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">GameActionInfo</span><span class="o">&gt;,</span> <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;&gt;</span> <span class="o">{</span>
@@ -425,7 +425,7 @@
   <p><strong>Note:</strong> As is shown in the diagram above, using windowing produces an <em>independent total for every interval</em> (in this case, each hour). <code class="highlighter-rouge">HourlyTeamScore</code> doesn’t provide a running total for the entire data set at each hour–it provides the total score for all the events that occurred <em>only within that hour</em>.</p>
 </blockquote>
 
-<p>Beam’s windowing feature uses the <a href="/documentation/programming-guide/#pctimestamps">intrinsic timestamp information</a> attached to each element of a <code class="highlighter-rouge">PCollection</code>. Because we want our pipeline to window based on <em>event time</em>, we <strong>must first extract the timestamp</strong> that’s embedded in each data record apply it to the corresponding element in the <code class="highlighter-rouge">PCollection</code> of score data. Then, the p [...]
+<p>Beam’s windowing feature uses the <a href="/documentation/programming-guide/#element-timestamps">intrinsic timestamp information</a> attached to each element of a <code class="highlighter-rouge">PCollection</code>. Because we want our pipeline to window based on <em>event time</em>, we <strong>must first extract the timestamp</strong> that’s embedded in each data record and apply it to the corresponding element in the <code class="highlighter-rouge">PCollection</code> of score data. Then, [...]
 
 <p class="language-java"><code class="highlighter-rouge">HourlyTeamScore</code> uses the <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java">WithTimestamps</a> and <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java">Window</a> transforms to perform these operations.</p>
 
@@ -864,7 +864,7 @@
 
 <p>To determine whether or not a score is “abnormally” high, <code class="highlighter-rouge">GameStats</code> calculates the average of every score in that fixed-time window, and then checks each individual score against the average score multiplied by an arbitrary weight factor (in our case, 2.5). Thus, any score more than 2.5 times the average is deemed to be the product of spam. The <code class="highlighter-rouge">GameStats</code> pipeline tracks a list of “spam” users and filte [...]
 
-<p>Since the average depends on the pipeline data, we need to calculate it, and then use that calculated data in a subsequent <code class="highlighter-rouge">ParDo</code> transform that filters scores that exceed the weighted value. To do this, we can pass the calculated average to as a <a href="/documentation/programming-guide/#transforms-sideio">side input</a> to the filtering <code class="highlighter-rouge">ParDo</code>.</p>
+<p>Since the average depends on the pipeline data, we need to calculate it, and then use that calculated data in a subsequent <code class="highlighter-rouge">ParDo</code> transform that filters scores that exceed the weighted value. To do this, we can pass the calculated average as a <a href="/documentation/programming-guide/#side-inputs">side input</a> to the filtering <code class="highlighter-rouge">ParDo</code>.</p>
 
 <p>The following code example shows the composite transform that handles abuse detection. The transform uses the <code class="highlighter-rouge">Sum.integersPerKey</code> transform to sum all scores per user, and then the <code class="highlighter-rouge">Mean.globally</code> transform to determine the average score for all users. Once that’s been calculated (as a <code class="highlighter-rouge">PCollectionView</code> singleton), we can pass it to the filtering <code class="highlighter-rou [...]
 
diff --git a/content/get-started/wordcount-example/index.html b/content/get-started/wordcount-example/index.html
index 5d6ea39..a0845dc 100644
--- a/content/get-started/wordcount-example/index.html
+++ b/content/get-started/wordcount-example/index.html
@@ -366,7 +366,7 @@ executed, associated with that particular pipeline.</p>
 <p>The Minimal WordCount pipeline contains several transforms to read data into the
 pipeline, manipulate or otherwise transform the data, and write out the results.
 Transforms can consist of an individual operation, or can contain multiple
-nested transforms (which is a <a href="/documentation/programming-guide#transforms-composite">composite transform</a>).</p>
+nested transforms (which is a <a href="/documentation/programming-guide#composite-transforms">composite transform</a>).</p>
 
 <p>Each transform takes some kind of input data and produces some output data. The
 input and output data is often represented by the SDK class <code class="highlighter-rouge">PCollection</code>.
@@ -395,7 +395,7 @@ data stored in a publicly accessible Google Cloud Storage bucket (“gs://”).<
     </div>
   </li>
   <li>
-    <p>A <a href="/documentation/programming-guide/#transforms-pardo">ParDo</a>
+    <p>A <a href="/documentation/programming-guide/#pardo">ParDo</a>
 transform that invokes a <code class="highlighter-rouge">DoFn</code> (defined in-line as an anonymous class) on
 each element that tokenizes the text lines into individual words. The input
 for this transform is the <code class="highlighter-rouge">PCollection</code> of text lines generated by the
@@ -1057,7 +1057,7 @@ file:</p>
 
 <h3 id="adding-timestamps-to-data">Adding timestamps to data</h3>
 
-<p>Each element in a <code class="highlighter-rouge">PCollection</code> has an associated <a href="/documentation/programming-guide#pctimestamps">timestamp</a>.
+<p>Each element in a <code class="highlighter-rouge">PCollection</code> has an associated <a href="/documentation/programming-guide#element-timestamps">timestamp</a>.
 The timestamp for each element is initially assigned by the source that creates
 the <code class="highlighter-rouge">PCollection</code>. Some sources that create unbounded PCollections can assign
 each new element a timestamp that corresponds to when the element was read or

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.
