beam-commits mailing list archives

From da...@apache.org
Subject [2/3] beam-site git commit: Regenerate website
Date Fri, 12 May 2017 23:18:43 GMT
Regenerate website


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/12121557
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/12121557
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/12121557

Branch: refs/heads/asf-site
Commit: 12121557b155ec7d0aea865afa5c6f2801217d56
Parents: 9fb214f
Author: Davor Bonaci <davor@google.com>
Authored: Fri May 12 16:17:49 2017 -0700
Committer: Davor Bonaci <davor@google.com>
Committed: Fri May 12 16:17:49 2017 -0700

----------------------------------------------------------------------
 .../documentation/programming-guide/index.html  |  2 +-
 .../sdks/python-custom-io/index.html            |  2 +-
 .../get-started/wordcount-example/index.html    | 80 +++++++++++---------
 3 files changed, 47 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/12121557/content/documentation/programming-guide/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html
index 1e6e9a7..3b3cb14 100644
--- a/content/documentation/programming-guide/index.html
+++ b/content/documentation/programming-guide/index.html
@@ -1939,7 +1939,7 @@ Subsequent transforms, however, are applied to the result of the <code
class="hi
     <span class="n">unix_timestamp</span> <span class="o">=</span>
<span class="n">extract_timestamp_from_log_entry</span><span class="p">(</span><span
class="n">element</span><span class="p">)</span>
     <span class="c"># Wrap and emit the current entry and new timestamp in a</span>
     <span class="c"># TimestampedValue.</span>
-    <span class="k">yield</span> <span class="n">beam</span><span
class="o">.</span><span class="n">TimestampedValue</span><span class="p">(</span><span
class="n">element</span><span class="p">,</span> <span class="n">unix_timestamp</span><span
class="p">)</span>
+    <span class="k">yield</span> <span class="n">beam</span><span
class="o">.</span><span class="n">window</span><span class="o">.</span><span
class="n">TimestampedValue</span><span class="p">(</span><span class="n">element</span><span
class="p">,</span> <span class="n">unix_timestamp</span><span class="p">)</span>
 
 <span class="n">timestamped_items</span> <span class="o">=</span>
<span class="n">items</span> <span class="o">|</span> <span class="s">'timestamp'</span>
<span class="o">&gt;&gt;</span> <span class="n">beam</span><span
class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span
class="n">AddTimestampDoFn</span><span class="p">())</span>
 
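The Java SDK expresses the same idea with outputWithTimestamp rather than
yielding a wrapped value. A minimal sketch of the Java-SDK analogue of the
Python fix above; extractTimestampFromLogEntry is a hypothetical helper:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Instant;

    static class AddTimestampDoFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Hypothetical stand-in for parsing a timestamp out of a log entry.
        Instant timestamp = extractTimestampFromLogEntry(c.element());
        // Emit the current entry with the new timestamp attached.
        c.outputWithTimestamp(c.element(), timestamp);
      }

      private static Instant extractTimestampFromLogEntry(String entry) {
        return Instant.now();
      }
    }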

http://git-wip-us.apache.org/repos/asf/beam-site/blob/12121557/content/documentation/sdks/python-custom-io/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/sdks/python-custom-io/index.html b/content/documentation/sdks/python-custom-io/index.html
index 629ef0f..fb2646f 100644
--- a/content/documentation/sdks/python-custom-io/index.html
+++ b/content/documentation/sdks/python-custom-io/index.html
@@ -464,7 +464,7 @@ numbers = p | 'ProduceNumbers' &gt;&gt; beam.io.Read(CountingSource(count))
 
 <h4 id="filesink">FileSink</h4>
 
-<p>If your data source uses files, you can derive your <code class="highlighter-rouge">Sink</code>
and <code class="highlighter-rouge">Writer</code> classes from the <code class="highlighter-rouge">FileSink</code>
and <code class="highlighter-rouge">FileSinkWriter</code> classes, which can be
found in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/fileio.py">fileio.py</a>
module. These classes implement code common sinks that interact with files, including:</p>
+<p>If your data source uses files, you can derive your <code class="highlighter-rouge">Sink</code>
and <code class="highlighter-rouge">Writer</code> classes from the <code class="highlighter-rouge">FileBasedSink</code>
and <code class="highlighter-rouge">FileBasedSinkWriter</code> classes, which
can be found in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsink.py">filebasedsink.py</a>
module. These classes implement code common to sinks that interact with files, including:</p>
 
 <ul>
   <li>Setting file headers and footers</li>

http://git-wip-us.apache.org/repos/asf/beam-site/blob/12121557/content/get-started/wordcount-example/index.html
----------------------------------------------------------------------
diff --git a/content/get-started/wordcount-example/index.html b/content/get-started/wordcount-example/index.html
index 5cc32f3..333cfb9 100644
--- a/content/get-started/wordcount-example/index.html
+++ b/content/get-started/wordcount-example/index.html
@@ -172,6 +172,7 @@
           <li><a href="#dataflow-runner" id="markdown-toc-dataflow-runner">Dataflow
Runner</a></li>
           <li><a href="#apache-spark-runner" id="markdown-toc-apache-spark-runner">Apache
Spark Runner</a></li>
           <li><a href="#apache-flink-runner" id="markdown-toc-apache-flink-runner">Apache
Flink Runner</a></li>
+          <li><a href="#apache-apex-runner" id="markdown-toc-apache-apex-runner">Apache
Apex Runner</a></li>
         </ul>
       </li>
       <li><a href="#testing-your-pipeline-via-passert" id="markdown-toc-testing-your-pipeline-via-passert">Testing
your Pipeline via PAssert</a></li>
@@ -228,7 +229,7 @@
 
 <h3 id="creating-the-pipeline">Creating the Pipeline</h3>
 
-<p>The first step in creating a Beam pipeline is to create a <code class="highlighter-rouge">PipelineOptions
object</code>. This object lets us set various options for our pipeline, such as the
pipeline runner that will execute our pipeline and any runner-specific configuration required
by the chosen runner. In this example we set these options programmatically, but more often
command-line arguments are used to set <code class="highlighter-rouge">PipelineOptions</code>.</p>
+<p>The first step in creating a Beam pipeline is to create a <code class="highlighter-rouge">PipelineOptions</code>
object. This object lets us set various options for our pipeline, such as the pipeline runner
that will execute our pipeline and any runner-specific configuration required by the chosen
runner. In this example we set these options programmatically, but more often command-line
arguments are used to set <code class="highlighter-rouge">PipelineOptions</code>.</p>
 
 <p>You can specify a runner for executing your pipeline, such as the <code class="highlighter-rouge">DataflowRunner</code>
or <code class="highlighter-rouge">SparkRunner</code>. If you omit specifying
a runner, as in this example, your pipeline will be executed locally using the <code class="highlighter-rouge">DirectRunner</code>.
In the next sections, we will specify the pipeline’s runner.</p>
 
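A minimal sketch of the command-line variant mentioned above, parsing args
into PipelineOptions with PipelineOptionsFactory; no runner is set, so this
would fall back to the DirectRunner:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public static void main(String[] args) {
      // Parse --name=value command-line arguments into PipelineOptions.
      PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
      // No runner specified, so the pipeline executes locally on the DirectRunner.
      Pipeline p = Pipeline.create(options);
    }
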
@@ -273,7 +274,7 @@
 
 <p>The Minimal WordCount pipeline contains several transforms to read data into the
pipeline, manipulate or otherwise transform the data, and write out the results. Each transform
represents an operation in the pipeline.</p>
 
-<p>Each transform takes some kind of input (data or otherwise), and produces some output
data. The input and output data is represented by the SDK class <code class="highlighter-rouge">PCollection</code>.
<code class="highlighter-rouge">PCollection</code> is a special class, provided
by the Beam SDK, that you can use to represent a data set of virtually any size, including
infinite data sets.</p>
+<p>Each transform takes some kind of input (data or otherwise), and produces some output
data. The input and output data is represented by the SDK class <code class="highlighter-rouge">PCollection</code>.
<code class="highlighter-rouge">PCollection</code> is a special class, provided
by the Beam SDK, that you can use to represent a data set of virtually any size, including
unbounded data sets.</p>
 
 <p><img src="/images/wordcount-pipeline.png" alt="Word Count pipeline diagram" />
 Figure 1: The pipeline data flow.</p>
@@ -282,7 +283,7 @@ Figure 1: The pipeline data flow.</p>
 
 <ol>
   <li>
-    <p>A text file <code class="highlighter-rouge">Read</code> transform
is applied to the Pipeline object itself, and produces a <code class="highlighter-rouge">PCollection</code>
as output. Each element in the output PCollection represents one line of text from the input
file. This example happens to use input data stored in a publicly accessible Google Cloud
Storage bucket (“gs://”).</p>
+    <p>A text file <code class="highlighter-rouge">Read</code> transform
is applied to the Pipeline object itself, and produces a <code class="highlighter-rouge">PCollection</code>
as output. Each element in the output PCollection represents one line of text from the input
file. This example uses input data stored in a publicly accessible Google Cloud Storage bucket
(“gs://”).</p>
 
     <div class="language-java highlighter-rouge"><pre class="highlight"><code><span
class="n">p</span><span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span
class="na">Read</span><span class="o">.</span><span class="na">from</span><span
class="o">(</span><span class="s">"gs://apache-beam-samples/shakespeare/*"</span><span
class="o">))</span>
 </code></pre>
@@ -299,7 +300,9 @@ Figure 1: The pipeline data flow.</p>
     <div class="language-java highlighter-rouge"><pre class="highlight"><code><span
class="o">.</span><span class="na">apply</span><span class="o">(</span><span
class="s">"ExtractWords"</span><span class="o">,</span> <span class="n">ParDo</span><span
class="o">.</span><span class="na">of</span><span class="o">(</span><span
class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span
class="n">String</span><span class="o">,</span> <span class="n">String</span><span
class="o">&gt;()</span> <span class="o">{</span>
     <span class="nd">@ProcessElement</span>
     <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span>
<span class="n">c</span><span class="o">)</span> <span class="o">{</span>
-        <span class="k">for</span> <span class="o">(</span><span
class="n">String</span> <span class="n">word</span> <span class="o">:</span>
<span class="n">c</span><span class="o">.</span><span class="na">element</span><span
class="o">().</span><span class="na">split</span><span class="o">(</span><span
class="s">"[^a-zA-Z']+"</span><span class="o">))</span> <span class="o">{</span>
+        <span class="c1">// \p{L} denotes the category of Unicode letters,</span>
+        <span class="c1">// so this pattern will match on everything that is not a
letter.</span>
+        <span class="k">for</span> <span class="o">(</span><span
class="n">String</span> <span class="n">word</span> <span class="o">:</span>
<span class="n">c</span><span class="o">.</span><span class="na">element</span><span
class="o">().</span><span class="na">split</span><span class="o">(</span><span
class="s">"[^\\p{L}]+"</span><span class="o">))</span> <span class="o">{</span>
             <span class="k">if</span> <span class="o">(!</span><span
class="n">word</span><span class="o">.</span><span class="na">isEmpty</span><span
class="o">())</span> <span class="o">{</span>
                 <span class="n">c</span><span class="o">.</span><span
class="na">output</span><span class="o">(</span><span class="n">word</span><span
class="o">);</span>
             <span class="o">}</span>
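
A self-contained sketch (plain Java, no Beam needed) of why this pattern
change matters; the sample string is illustrative:

    import java.util.Arrays;

    public class SplitDemo {
      public static void main(String[] args) {
        String line = "Fähre über den Fluss";
        // The ASCII-only pattern splits inside non-ASCII letters such as ä and ü:
        // [F, hre, ber, den, Fluss]
        System.out.println(Arrays.toString(line.split("[^a-zA-Z']+")));
        // \p{L} matches any Unicode letter, so accented words survive intact:
        // [Fähre, über, den, Fluss]
        System.out.println(Arrays.toString(line.split("[^\\p{L}]+")));
      }
    }
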
@@ -330,7 +333,7 @@ Figure 1: The pipeline data flow.</p>
   <li>
     <p>The next transform formats each of the key/value pairs of unique words and occurrence
counts into a printable string suitable for writing to an output file.</p>
 
-    <p>The map transform is a higher-level composite transform that encapsulates a
simple <code class="highlighter-rouge">ParDo</code>; for each element in the input
<code class="highlighter-rouge">PCollection</code>, the map transform applies
a function that produces exactly one output element.</p>
+    <p>The map transform is a higher-level composite transform that encapsulates a
simple <code class="highlighter-rouge">ParDo</code>. For each element in the input
<code class="highlighter-rouge">PCollection</code>, the map transform applies
a function that produces exactly one output element.</p>
 
     <div class="language-java highlighter-rouge"><pre class="highlight"><code><span
class="o">.</span><span class="na">apply</span><span class="o">(</span><span
class="s">"FormatResults"</span><span class="o">,</span> <span class="n">MapElements</span><span
class="o">.</span><span class="na">via</span><span class="o">(</span><span
class="k">new</span> <span class="n">SimpleFunction</span><span class="o">&lt;</span><span
class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span
class="o">,</span> <span class="n">Long</span><span class="o">&gt;,</span>
<span class="n">String</span><span class="o">&gt;()</span> <span
class="o">{</span>
     <span class="nd">@Override</span>
@@ -526,14 +529,6 @@ Figure 1: The pipeline data flow.</p>
 
 <p>Each runner may choose to handle logs in its own way.</p>
 
-<h4 id="direct-runner">Direct Runner</h4>
-
-<p>If you execute your pipeline using <code class="highlighter-rouge">DirectRunner</code>,
it will print the log messages directly to your local console.</p>
-
-<h4 id="dataflow-runner">Dataflow Runner</h4>
-
-<p>If you execute your pipeline using <code class="highlighter-rouge">DataflowRunner</code>,
you can use Google Cloud Logging. Google Cloud Logging (currently in beta) aggregates the
logs from all of your Dataflow job’s workers to a single location in the Google Cloud Platform
Console. You can use Cloud Logging to search and access the logs from all of the Compute Engine
instances that Dataflow has spun up to complete your Dataflow job. You can add logging statements
into your pipeline’s <code class="highlighter-rouge">DoFn</code> instances that
will appear in Cloud Logging as your pipeline runs.</p>
-
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span
class="c1">// This example uses .trace and .debug:</span>
 
 <span class="kd">public</span> <span class="kd">class</span> <span
class="nc">DebuggingWordCount</span> <span class="o">{</span>
@@ -592,7 +587,15 @@ Figure 1: The pipeline data flow.</p>
 </code></pre>
 </div>
 
-<p>If you execute your pipeline using <code class="highlighter-rouge">DataflowRunner</code>,
you can control the worker log levels. Dataflow workers that execute user code are configured
to log to Cloud Logging by default at “INFO” log level and higher. You can override log
levels for specific logging namespaces by specifying: <code class="highlighter-rouge">--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}</code>.
For example, by specifying <code class="highlighter-rouge">--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}</code>
when executing this pipeline using the Dataflow service, Cloud Logging would contain only
“DEBUG” or higher level logs for the package in addition to the default “INFO” or
higher level logs.</p>
+<h4 id="direct-runner">Direct Runner</h4>
+
+<p>If you execute your pipeline using <code class="highlighter-rouge">DirectRunner</code>,
it will print the log messages directly to your local console.</p>
+
+<h4 id="dataflow-runner">Dataflow Runner</h4>
+
+<p>If you execute your pipeline using <code class="highlighter-rouge">DataflowRunner</code>,
you can use Stackdriver Logging. Stackdriver Logging aggregates the logs from all of your
Dataflow job’s workers to a single location in the Google Cloud Platform Console. You can
use Stackdriver Logging to search and access the logs from all of the workers that Dataflow
has spun up to complete your Dataflow job. Logging statements in your pipeline’s <code
class="highlighter-rouge">DoFn</code> instances will appear in Stackdriver Logging
as your pipeline runs.</p>
+
+<p>If you execute your pipeline using <code class="highlighter-rouge">DataflowRunner</code>,
you can control the worker log levels. Dataflow workers that execute user code are configured
to log to Stackdriver Logging by default at “INFO” log level and higher. You can override
log levels for specific logging namespaces by specifying: <code class="highlighter-rouge">--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}</code>.
For example, by specifying <code class="highlighter-rouge">--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}</code>
when executing this pipeline using the Dataflow service, Stackdriver Logging would contain
only “DEBUG” or higher level logs for the package in addition to the default “INFO”
or higher level logs.</p>
 
 <p>The default Dataflow worker logging configuration can be overridden by specifying
<code class="highlighter-rouge">--defaultWorkerLogLevel=&lt;one of TRACE, DEBUG,
INFO, WARN, ERROR&gt;</code>. For example, by specifying <code class="highlighter-rouge">--defaultWorkerLogLevel=DEBUG</code>
when executing this pipeline with the Dataflow service, Cloud Logging would contain all “DEBUG”
or higher level logs. Note that changing the default worker log level to TRACE or DEBUG will
significantly increase the amount of logs output.</p>
 
@@ -608,11 +611,17 @@ Figure 1: The pipeline data flow.</p>
   <p><strong>Note:</strong> This section is yet to be added. There is an
open issue for this (<a href="https://issues.apache.org/jira/browse/BEAM-791">BEAM-791</a>).</p>
 </blockquote>
 
+<h4 id="apache-apex-runner">Apache Apex Runner</h4>
+
+<blockquote>
+  <p><strong>Note:</strong> This section is yet to be added. There is an
open issue for this (<a href="https://issues.apache.org/jira/browse/BEAM-2285">BEAM-2285</a>).</p>
+</blockquote>
+
 <h3 id="testing-your-pipeline-via-passert">Testing your Pipeline via PAssert</h3>
 
-<p><code class="highlighter-rouge">PAssert</code> is a set of convenient
<code class="highlighter-rouge">PTransform</code>s in the style of Hamcrest’s
collection matchers that can be used when writing Pipeline level tests to validate the contents
of PCollections. <code class="highlighter-rouge">PAssert</code> is best used in
unit tests with small data sets, but is demonstrated here as a teaching tool.</p>
+<p><code class="highlighter-rouge">PAssert</code> is a set of convenient
PTransforms in the style of Hamcrest’s collection matchers that can be used when writing
Pipeline level tests to validate the contents of PCollections. <code class="highlighter-rouge">PAssert</code>
is best used in unit tests with small data sets, but is demonstrated here as a teaching tool.</p>
 
-<p>Below, we verify that the set of filtered words matches our expected counts. Note
that <code class="highlighter-rouge">PAssert</code> does not provide any output,
and that successful completion of the pipeline implies that the expectations were met. See
<a href="https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java">DebuggingWordCountTest</a>
for an example unit test.</p>
+<p>Below, we verify that the set of filtered words matches our expected counts. Note
that <code class="highlighter-rouge">PAssert</code> does not produce any output,
and the pipeline will only succeed if all of the expectations are met. See <a href="https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java">DebuggingWordCountTest</a>
for an example unit test.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span
class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span>
<span class="nf">main</span><span class="o">(</span><span class="n">String</span><span
class="o">[]</span> <span class="n">args</span><span class="o">)</span>
<span class="o">{</span>
   <span class="o">...</span>
@@ -646,7 +655,7 @@ Figure 1: The pipeline data flow.</p>
 
 <h3 id="unbounded-and-bounded-pipeline-input-modes">Unbounded and bounded pipeline
input modes</h3>
 
-<p>Beam allows you to create a single pipeline that can handle both bounded and unbounded
types of input. If the input is unbounded, then all <code class="highlighter-rouge">PCollections</code>
of the pipeline will be unbounded as well. The same goes for bounded input. If your input
has a fixed number of elements, it’s considered a ‘bounded’ data set. If your input
is continuously updating, then it’s considered ‘unbounded’.</p>
+<p>Beam allows you to create a single pipeline that can handle both bounded and unbounded
types of input. If the input is unbounded, then all PCollections of the pipeline will be unbounded
as well. The same goes for bounded input. If your input has a fixed number of elements, it’s
considered a ‘bounded’ data set. If your input is continuously updating, then it’s considered
‘unbounded’.</p>
 
 <p>Recall that the input for this example is a set of Shakespeare’s texts, finite
data. Therefore, this example reads bounded data from a text file:</p>
 
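A minimal sketch of the bounded case, matching the TextIO style used
elsewhere on this page; the sample path is the same public bucket:

    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.values.PCollection;

    // A finite set of files: every PCollection derived from this read
    // is bounded as well.
    PCollection<String> lines =
        p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"));
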
@@ -679,20 +688,24 @@ Figure 1: The pipeline data flow.</p>
 <p>Below is the code for <code class="highlighter-rouge">AddTimestampFn</code>,
a <code class="highlighter-rouge">DoFn</code> invoked by <code class="highlighter-rouge">ParDo</code>,
that sets the timestamp of the data element given the element itself. For example, if the
elements were log lines, this <code class="highlighter-rouge">ParDo</code> could
parse the time out of the log string and set it as the element’s timestamp. There are no
timestamps inherent in the works of Shakespeare, so in this case we’ve made up random timestamps
just to illustrate the concept. Each line of the input text will get a random associated timestamp
sometime in a 2-hour period.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span
class="kd">static</span> <span class="kd">class</span> <span class="nc">AddTimestampFn</span>
<span class="kd">extends</span> <span class="n">DoFn</span><span
class="o">&lt;</span><span class="n">String</span><span class="o">,</span>
<span class="n">String</span><span class="o">&gt;</span> <span
class="o">{</span>
-  <span class="kd">private</span> <span class="kd">static</span>
<span class="kd">final</span> <span class="n">Duration</span> <span
class="n">RAND_RANGE</span> <span class="o">=</span> <span class="n">Duration</span><span
class="o">.</span><span class="na">standardHours</span><span class="o">(</span><span
class="mi">2</span><span class="o">);</span>
   <span class="kd">private</span> <span class="kd">final</span> <span
class="n">Instant</span> <span class="n">minTimestamp</span><span
class="o">;</span>
+  <span class="kd">private</span> <span class="kd">final</span> <span
class="n">Instant</span> <span class="n">maxTimestamp</span><span
class="o">;</span>
 
-  <span class="n">AddTimestampFn</span><span class="o">()</span>
<span class="o">{</span>
-    <span class="k">this</span><span class="o">.</span><span class="na">minTimestamp</span>
<span class="o">=</span> <span class="k">new</span> <span class="n">Instant</span><span
class="o">(</span><span class="n">System</span><span class="o">.</span><span
class="na">currentTimeMillis</span><span class="o">());</span>
+  <span class="n">AddTimestampFn</span><span class="o">(</span><span
class="n">Instant</span> <span class="n">minTimestamp</span><span
class="o">,</span> <span class="n">Instant</span> <span class="n">maxTimestamp</span><span
class="o">)</span> <span class="o">{</span>
+    <span class="k">this</span><span class="o">.</span><span class="na">minTimestamp</span>
<span class="o">=</span> <span class="n">minTimestamp</span><span
class="o">;</span>
+    <span class="k">this</span><span class="o">.</span><span class="na">maxTimestamp</span>
<span class="o">=</span> <span class="n">maxTimestamp</span><span
class="o">;</span>
   <span class="o">}</span>
 
   <span class="nd">@ProcessElement</span>
   <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span>
<span class="n">c</span><span class="o">)</span> <span class="o">{</span>
-    <span class="c1">// Generate a timestamp that falls somewhere in the past two hours.</span>
-    <span class="kt">long</span> <span class="n">randMillis</span>
<span class="o">=</span> <span class="o">(</span><span class="kt">long</span><span
class="o">)</span> <span class="o">(</span><span class="n">Math</span><span
class="o">.</span><span class="na">random</span><span class="o">()</span>
<span class="o">*</span> <span class="n">RAND_RANGE</span><span
class="o">.</span><span class="na">getMillis</span><span class="o">());</span>
-    <span class="n">Instant</span> <span class="n">randomTimestamp</span>
<span class="o">=</span> <span class="n">minTimestamp</span><span
class="o">.</span><span class="na">plus</span><span class="o">(</span><span
class="n">randMillis</span><span class="o">);</span>
-
-    <span class="c1">// Set the data element with that timestamp.</span>
+    <span class="n">Instant</span> <span class="n">randomTimestamp</span>
<span class="o">=</span>
+      <span class="k">new</span> <span class="nf">Instant</span><span
class="o">(</span>
+          <span class="n">ThreadLocalRandom</span><span class="o">.</span><span
class="na">current</span><span class="o">()</span>
+          <span class="o">.</span><span class="na">nextLong</span><span
class="o">(</span><span class="n">minTimestamp</span><span class="o">.</span><span
class="na">getMillis</span><span class="o">(),</span> <span class="n">maxTimestamp</span><span
class="o">.</span><span class="na">getMillis</span><span class="o">()));</span>
+
+    <span class="cm">/**
+     * Concept #2: Set the data element with that timestamp.
+     */</span>
     <span class="n">c</span><span class="o">.</span><span class="na">outputWithTimestamp</span><span
class="o">(</span><span class="n">c</span><span class="o">.</span><span
class="na">element</span><span class="o">(),</span> <span class="k">new</span>
<span class="n">Instant</span><span class="o">(</span><span class="n">randomTimestamp</span><span
class="o">));</span>
   <span class="o">}</span>
 <span class="o">}</span>
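
A usage sketch for the revised constructor, assuming a two-hour stamping
range anchored at the current time and an input PCollection named input:

    import org.joda.time.Duration;
    import org.joda.time.Instant;

    Instant minTimestamp = new Instant(System.currentTimeMillis());
    Instant maxTimestamp = minTimestamp.plus(Duration.standardHours(2));
    // Stamp each line with a random timestamp between min and max.
    PCollection<String> timestamped =
        input.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
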
@@ -705,9 +718,9 @@ Figure 1: The pipeline data flow.</p>
 
 <h3 id="windowing">Windowing</h3>
 
-<p>Beam uses a concept called <strong>Windowing</strong> to subdivide a
<code class="highlighter-rouge">PCollection</code> according to the timestamps
of its individual elements. <code class="highlighter-rouge">PTransforms</code>
that aggregate multiple elements, process each <code class="highlighter-rouge">PCollection</code>
as a succession of multiple, finite windows, even though the entire collection itself may
be of infinite size (unbounded).</p>
+<p>Beam uses a concept called <strong>Windowing</strong> to subdivide a
<code class="highlighter-rouge">PCollection</code> according to the timestamps
of its individual elements. PTransforms that aggregate multiple elements process each <code
class="highlighter-rouge">PCollection</code> as a succession of multiple, finite
windows, even though the entire collection itself may be of infinite size (unbounded).</p>
 
-<p>The <code class="highlighter-rouge">WindowingWordCount</code> example
applies fixed-time windowing, wherein each window represents a fixed time interval. The fixed
window size for this example defaults to 1 minute (you can change this with a command-line
option).</p>
+<p>The <code class="highlighter-rouge">WindowedWordCount</code> example
applies fixed-time windowing, wherein each window represents a fixed time interval. The fixed
window size for this example defaults to 1 minute (you can change this with a command-line
option).</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span
class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span
class="o">&gt;</span> <span class="n">windowedWords</span> <span
class="o">=</span> <span class="n">input</span>
   <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span
class="n">Window</span><span class="o">.&lt;</span><span class="n">String</span><span
class="o">&gt;</span><span class="n">into</span><span class="o">(</span>
@@ -721,7 +734,7 @@ Figure 1: The pipeline data flow.</p>
 
 <h3 id="reusing-ptransforms-over-windowed-pcollections">Reusing PTransforms over windowed
PCollections</h3>
 
-<p>You can reuse existing <code class="highlighter-rouge">PTransform</code>s,
that were created for manipulating simple <code class="highlighter-rouge">PCollection</code>s,
over windowed <code class="highlighter-rouge">PCollection</code>s as well.</p>
+<p>You can reuse existing PTransforms that were created for manipulating simple PCollections
over windowed PCollections as well.</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span
class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span
class="o">&lt;</span><span class="n">String</span><span class="o">,</span>
<span class="n">Long</span><span class="o">&gt;&gt;</span>
<span class="n">wordCounts</span> <span class="o">=</span> <span
class="n">windowedWords</span><span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="k">new</span> <span class="n">WordCount</span><span
class="o">.</span><span class="na">CountWords</span><span class="o">());</span>
 </code></pre>
@@ -733,16 +746,13 @@ Figure 1: The pipeline data flow.</p>
 
 <h3 id="write-results-to-an-unbounded-sink">Write Results to an Unbounded Sink</h3>
 
-<p>Since our input is unbounded, the same is true of our output <code class="highlighter-rouge">PCollection</code>.
We need to make sure that we choose an appropriate, unbounded sink. Some output sinks support
only bounded output, such as a text file. Google Cloud BigQuery is an output source that supports
both bounded and unbounded input.</p>
+<p>When our input is unbounded, the same is true of our output <code class="highlighter-rouge">PCollection</code>.
We need to make sure that we choose an appropriate, unbounded sink. Some output sinks support
only bounded output, while others support both bounded and unbounded outputs. By using a <code
class="highlighter-rouge">FilenamePolicy</code>, we can use <code class="highlighter-rouge">TextIO</code>
to write files that are partitioned by windows. We use a composite <code class="highlighter-rouge">PTransform</code>
that uses such a policy internally to write a single sharded file per window.</p>
 
 <p>In this example, we stream the results to a BigQuery table. The results are then
formatted for a BigQuery table, and then written to BigQuery using BigQueryIO.Write.</p>
 
-<div class="language-java highlighter-rouge"><pre class="highlight"><code><span
class="n">wordCounts</span><span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">ParDo</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span class="k">new</span>
<span class="n">FormatAsTableRowFn</span><span class="o">()))</span>
-    <span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">BigQueryIO</span><span class="o">.</span><span
class="na">Write</span>
-      <span class="o">.</span><span class="na">to</span><span
class="o">(</span><span class="n">getTableReference</span><span class="o">(</span><span
class="n">options</span><span class="o">))</span>
-      <span class="o">.</span><span class="na">withSchema</span><span
class="o">(</span><span class="n">getSchema</span><span class="o">())</span>
-      <span class="o">.</span><span class="na">withCreateDisposition</span><span
class="o">(</span><span class="n">BigQueryIO</span><span class="o">.</span><span
class="na">Write</span><span class="o">.</span><span class="na">CreateDisposition</span><span
class="o">.</span><span class="na">CREATE_IF_NEEDED</span><span class="o">)</span>
-      <span class="o">.</span><span class="na">withWriteDisposition</span><span
class="o">(</span><span class="n">BigQueryIO</span><span class="o">.</span><span
class="na">Write</span><span class="o">.</span><span class="na">WriteDisposition</span><span
class="o">.</span><span class="na">WRITE_APPEND</span><span class="o">));</span>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>
 <span class="n">wordCounts</span>
+      <span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">MapElements</span><span class="o">.</span><span
class="na">via</span><span class="o">(</span><span class="k">new</span>
<span class="n">WordCount</span><span class="o">.</span><span class="na">FormatAsTextFn</span><span
class="o">()))</span>
+      <span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="k">new</span> <span class="n">WriteOneFilePerWindow</span><span
class="o">(</span><span class="n">output</span><span class="o">,</span>
<span class="n">options</span><span class="o">.</span><span class="na">getNumShards</span><span
class="o">()));</span>
 </code></pre>
 </div>
 

