beam-commits mailing list archives

From da...@apache.org
Subject [4/5] beam-site git commit: Regenerate website
Date Mon, 13 Feb 2017 21:31:46 GMT
Regenerate website


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/2dd05932
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/2dd05932
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/2dd05932

Branch: refs/heads/asf-site
Commit: 2dd059328e13788c7e58a6be5e2098ea5af82336
Parents: 4c43991
Author: Davor Bonaci <davor@google.com>
Authored: Mon Feb 13 13:31:19 2017 -0800
Committer: Davor Bonaci <davor@google.com>
Committed: Mon Feb 13 13:31:19 2017 -0800

----------------------------------------------------------------------
 .../blog/2017/02/13/stateful-processing.html    | 751 +++++++++++++++++++
 content/blog/index.html                         |  21 +
 content/feed.xml                                | 593 ++++++++++++++-
 .../blog/stateful-processing/assign-indices.png | Bin 0 -> 29308 bytes
 .../blog/stateful-processing/combinefn.png      | Bin 0 -> 18138 bytes
 .../stateful-processing/combiner-lifting.png    | Bin 0 -> 37974 bytes
 .../blog/stateful-processing/pardo-and-gbk.png  | Bin 0 -> 27227 bytes
 .../blog/stateful-processing/pipeline.png       | Bin 0 -> 14308 bytes
 .../images/blog/stateful-processing/plaid.png   | Bin 0 -> 46216 bytes
 .../blog/stateful-processing/stateful-dofn.png  | Bin 0 -> 22229 bytes
 .../blog/stateful-processing/stateful-pardo.png | Bin 0 -> 18035 bytes
 content/index.html                              |   4 +-
 12 files changed, 1348 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/2dd05932/content/blog/2017/02/13/stateful-processing.html
----------------------------------------------------------------------
diff --git a/content/blog/2017/02/13/stateful-processing.html b/content/blog/2017/02/13/stateful-processing.html
new file mode 100644
index 0000000..dcf077a
--- /dev/null
+++ b/content/blog/2017/02/13/stateful-processing.html
@@ -0,0 +1,751 @@
+<!DOCTYPE html>
+<html lang="en">
+
+  <head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1">
+
+  <title>Stateful processing with Apache Beam</title>
+  <meta name="description" content="Beam lets you process unbounded, out-of-order, global-scale data with portable high-level pipelines. Stateful processing is a new feature of the Beam model tha...">
+
+  <link rel="stylesheet" href="/styles/site.css">
+  <link rel="stylesheet" href="/css/theme.css">
+  <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script>
+  <script src="/js/bootstrap.min.js"></script>
+  <script src="/js/language-switch.js"></script>
+  <link rel="canonical" href="https://beam.apache.org/blog/2017/02/13/stateful-processing.html" data-proofer-ignore>
+  <link rel="alternate" type="application/rss+xml" title="Apache Beam" href="https://beam.apache.org/feed.xml">
+  <script>
+    (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+    (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
+    m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+    })(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+
+    ga('create', 'UA-73650088-1', 'auto');
+    ga('send', 'pageview');
+
+  </script>
+  <link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico">
+</head>
+
+
+  <body role="document">
+
+    <nav class="navbar navbar-default navbar-fixed-top">
+  <div class="container">
+    <div class="navbar-header">
+      <a href="/" class="navbar-brand" >
+        <img alt="Brand" style="height: 25px" src="/images/beam_logo_navbar.png">
+      </a>
+      <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar">
+        <span class="sr-only">Toggle navigation</span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+      </button>
+    </div>
+    <div id="navbar" class="navbar-collapse collapse">
+      <ul class="nav navbar-nav">
+        <li class="dropdown">
+		  <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Get Started <span class="caret"></span></a>
+		  <ul class="dropdown-menu">
+			  <li><a href="/get-started/beam-overview/">Beam Overview</a></li>
+        <li><a href="/get-started/quickstart-java/">Quickstart - Java</a></li>
+        <li><a href="/get-started/quickstart-py/">Quickstart - Python</a></li>
+			  <li role="separator" class="divider"></li>
+			  <li class="dropdown-header">Example Walkthroughs</li>
+			  <li><a href="/get-started/wordcount-example/">WordCount</a></li>
+			  <li><a href="/get-started/mobile-gaming-example/">Mobile Gaming</a></li>
+              <li role="separator" class="divider"></li>
+              <li class="dropdown-header">Resources</li>
+              <li><a href="/get-started/downloads">Downloads</a></li>
+              <li><a href="/get-started/support">Support</a></li>
+		  </ul>
+	    </li>
+        <li class="dropdown">
+		  <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Documentation <span class="caret"></span></a>
+		  <ul class="dropdown-menu">
+			  <li><a href="/documentation">Using the Documentation</a></li>
+			  <li role="separator" class="divider"></li>
+			  <li class="dropdown-header">Beam Concepts</li>
+			  <li><a href="/documentation/programming-guide/">Programming Guide</a></li>
+			  <li><a href="/documentation/resources/">Additional Resources</a></li>
+			  <li role="separator" class="divider"></li>
+              <li class="dropdown-header">Pipeline Fundamentals</li>
+              <li><a href="/documentation/pipelines/design-your-pipeline/">Design Your Pipeline</a></li>
+              <li><a href="/documentation/pipelines/create-your-pipeline/">Create Your Pipeline</a></li>
+              <li><a href="/documentation/pipelines/test-your-pipeline/">Test Your Pipeline</a></li>
+              <li role="separator" class="divider"></li>
+			  <li class="dropdown-header">SDKs</li>
+			  <li><a href="/documentation/sdks/java/">Java SDK</a></li>
+			  <li><a href="/documentation/sdks/javadoc/0.5.0/" target="_blank">Java SDK API Reference <img src="/images/external-link-icon.png"
+                 width="14" height="14"
+                 alt="External link."></a>
+        </li>
+        <li><a href="/documentation/sdks/python/">Python SDK</a></li>
+			  <li role="separator" class="divider"></li>
+			  <li class="dropdown-header">Runners</li>
+			  <li><a href="/documentation/runners/capability-matrix/">Capability Matrix</a></li>
+			  <li><a href="/documentation/runners/direct/">Direct Runner</a></li>
+			  <li><a href="/documentation/runners/apex/">Apache Apex Runner</a></li>
+			  <li><a href="/documentation/runners/flink/">Apache Flink Runner</a></li>
+			  <li><a href="/documentation/runners/spark/">Apache Spark Runner</a></li>
+			  <li><a href="/documentation/runners/dataflow/">Cloud Dataflow Runner</a></li>
+		  </ul>
+	    </li>
+        <li class="dropdown">
+		  <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Contribute <span class="caret"></span></a>
+		  <ul class="dropdown-menu">
+			  <li><a href="/contribute">Get Started Contributing</a></li>
+        <li role="separator" class="divider"></li>
+        <li class="dropdown-header">Guides</li>
+			  <li><a href="/contribute/contribution-guide/">Contribution Guide</a></li>
+        <li><a href="/contribute/testing/">Testing Guide</a></li>
+        <li><a href="/contribute/release-guide/">Release Guide</a></li>
+        <li><a href="/contribute/ptransform-style-guide/">PTransform Style Guide</a></li>
+        <li role="separator" class="divider"></li>
+        <li class="dropdown-header">Technical References</li>
+        <li><a href="/contribute/design-principles/">Design Principles</a></li>
+			  <li><a href="/contribute/work-in-progress/">Ongoing Projects</a></li>
+        <li><a href="/contribute/source-repository/">Source Repository</a></li>      
+        <li role="separator" class="divider"></li>
+			  <li class="dropdown-header">Promotion</li>
+        <li><a href="/contribute/presentation-materials/">Presentation Materials</a></li>
+        <li><a href="/contribute/logos/">Logos and Design</a></li>
+        <li role="separator" class="divider"></li>
+        <li><a href="/contribute/maturity-model/">Maturity Model</a></li>
+        <li><a href="/contribute/team/">Team</a></li>
+		  </ul>
+	    </li>
+
+        <li><a href="/blog">Blog</a></li>
+      </ul>
+      <ul class="nav navbar-nav navbar-right">
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><img src="https://www.apache.org/foundation/press/kit/feather_small.png" alt="Apache Logo" style="height:24px;">Apache Software Foundation<span class="caret"></span></a>
+          <ul class="dropdown-menu dropdown-menu-right">
+            <li><a href="http://www.apache.org/">ASF Homepage</a></li>
+            <li><a href="http://www.apache.org/licenses/">License</a></li>
+            <li><a href="http://www.apache.org/security/">Security</a></li>
+            <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a></li>
+            <li><a href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li>
+            <li><a href="https://www.apache.org/foundation/policies/conduct">Code of Conduct</a></li>
+          </ul>
+        </li>
+      </ul>
+    </div><!--/.nav-collapse -->
+  </div>
+</nav>
+
+
+<link rel="stylesheet" href="">
+
+
+    <div class="container" role="main">
+
+      <div class="row">
+        
+
+<article class="post" itemscope itemtype="http://schema.org/BlogPosting">
+
+  <header class="post-header">
+    <h1 class="post-title" itemprop="name headline">Stateful processing with Apache Beam</h1>
+    <p class="post-meta"><time datetime="2017-02-13T00:00:01-08:00" itemprop="datePublished">Feb 13, 2017</time> •  Kenneth Knowles [<a href="https://twitter.com/KennKnowles">@KennKnowles</a>]
+</p>
+  </header>
+
+  <div class="post-content" itemprop="articleBody">
+    <p>Beam lets you process unbounded, out-of-order, global-scale data with portable
+high-level pipelines. Stateful processing is a new feature of the Beam model
+that expands the capabilities of Beam, unlocking new use cases and new
+efficiencies. In this post, I will guide you through stateful processing in
+Beam: how it works, how it fits in with the other features of the Beam model,
+what you might use it for, and what it looks like in code.</p>
+
+<!--more-->
+
+<blockquote>
+  <p><strong>Warning: new features ahead!</strong> This is a very new aspect of the Beam
+model. Runners are still adding support. You can try it out today on multiple
+runners, but do check the <a href="/documentation/runners/capability-matrix/">runner capability
+matrix</a> for
+the current status in each runner.</p>
+</blockquote>
+
+<p>First, a quick recap: In Beam, a big data processing <em>pipeline</em> is a directed,
+acyclic graph of parallel operations called <em><code class="highlighter-rouge">PTransforms</code></em> processing data
+from <em><code class="highlighter-rouge">PCollections</code></em>. I’ll expand on that by walking through this illustration:</p>
+
+<p><img class="center-block" src="/images/blog/stateful-processing/pipeline.png" alt="A Beam Pipeline - PTransforms are boxes - PCollections are arrows" width="300" /></p>
+
+<p>The boxes are <code class="highlighter-rouge">PTransforms</code> and the edges represent the data in <code class="highlighter-rouge">PCollections</code>
+flowing from one <code class="highlighter-rouge">PTransform</code> to the next. A <code class="highlighter-rouge">PCollection</code> may be <em>bounded</em> (which
+means it is finite and you know it) or <em>unbounded</em> (which means you don’t know if
+it is finite or not - basically, it is like an incoming stream of data that may
+or may not ever terminate). The cylinders are the data sources and sinks at the
+edges of your pipeline, such as bounded collections of log files or unbounded
+data streaming over a Kafka topic. This blog post isn’t about sources or sinks,
+but about what happens in between - your data processing.</p>
+
+<p>There are two main building blocks for processing your data in Beam: <em><code class="highlighter-rouge">ParDo</code></em>,
+for performing an operation in parallel across all elements, and <em><code class="highlighter-rouge">GroupByKey</code></em>
+(and the closely related <code class="highlighter-rouge">CombinePerKey</code> that I will talk about quite soon)
+for aggregating elements to which you have assigned the same key. In the
+picture below (featured in many of our presentations), the color indicates the
+key of the element. Thus the <code class="highlighter-rouge">GroupByKey</code>/<code class="highlighter-rouge">CombinePerKey</code> transform gathers all the
+green squares to produce a single output element.</p>
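+<p>To make the distinction concrete, here is a minimal plain-Java model of the two building blocks. It does not use Beam at all: <code class="highlighter-rouge">ParDoVsGroupByKey</code> and its methods are hypothetical names chosen for illustration, the string key stands in for the color in the picture, and summing stands in for an arbitrary combine operation.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of ParDo vs. GroupByKey/CombinePerKey (no Beam dependency).
final class ParDoVsGroupByKey {

  // A keyed element; the key plays the role of the color in the picture.
  static final class Element {
    final String key;
    final int value;

    Element(String key, int value) {
      this.key = key;
      this.value = value;
    }
  }

  // ParDo-like: apply a function to each element independently of all others.
  static List<Element> parDo(List<Element> input, Function<Element, Element> fn) {
    return input.stream().map(fn).collect(Collectors.toList());
  }

  // GroupByKey-like: gather every value that shares a key (sorted by key for stable output).
  static Map<String, List<Integer>> groupByKey(List<Element> input) {
    return input.stream().collect(Collectors.groupingBy(
        (Element e) -> e.key,
        TreeMap::new,
        Collectors.mapping((Element e) -> e.value, Collectors.toList())));
  }

  // CombinePerKey-like: reduce each key's values to a single output, here a sum.
  static Map<String, Integer> combinePerKey(List<Element> input) {
    return input.stream().collect(Collectors.groupingBy(
        (Element e) -> e.key,
        TreeMap::new,
        Collectors.summingInt((Element e) -> e.value)));
  }

  public static void main(String[] args) {
    List<Element> input = Arrays.asList(
        new Element("green", 1), new Element("red", 2), new Element("green", 3));
    List<Element> doubled = parDo(input, e -> new Element(e.key, e.value * 2));
    System.out.println(groupByKey(doubled));    // {green=[2, 6], red=[4]}
    System.out.println(combinePerKey(doubled)); // {green=8, red=4}
  }
}
```

+<p>In this model, <code class="highlighter-rouge">parDo</code> looks at one element at a time, while <code class="highlighter-rouge">groupByKey</code> and <code class="highlighter-rouge">combinePerKey</code> need every element that shares a key before they can produce output.</p>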
+
+<p><img class="center-block" src="/images/blog/stateful-processing/pardo-and-gbk.png" alt="ParDo and GroupByKey/CombinePerKey: Elementwise versus aggregating computations" width="400" /></p>
+
+<p>But not all use cases are easily expressed as pipelines of simple <code class="highlighter-rouge">ParDo</code>/<code class="highlighter-rouge">Map</code> and
+<code class="highlighter-rouge">GroupByKey</code>/<code class="highlighter-rouge">CombinePerKey</code> transforms. The topic of this blog post is a new
+extension to the Beam programming model: <strong>per-element operation augmented with
+mutable state</strong>.</p>
+
+<p><img class="center-block" src="/images/blog/stateful-processing/stateful-pardo.png" alt="Stateful ParDo - sequential per-key processing with persistent state" width="300" /></p>
+
+<p>In the illustration above, ParDo now has a bit of durable, consistent state on
+the side, which can be read and written during the processing of each element.
+The state is partitioned by key, so it is drawn as having disjoint sections for
+each color. It is also partitioned per window, but I thought plaid 
+<img src="/images/blog/stateful-processing/plaid.png" alt="A plaid storage cylinder" width="20" /> 
+would be a bit much  :-). I’ll talk about
+why state is partitioned this way a bit later, via my first example.</p>
+
+<p>For the rest of this post, I will describe this new feature of Beam in detail -
+how it works at a high level, how it differs from existing features, how to
+make sure it is still massively scalable. After that introduction at the model
+level, I’ll walk through a simple example of how you use it in the Beam Java
+SDK.</p>
+
+<h2 id="how-does-stateful-processing-in-beam-work">How does stateful processing in Beam work?</h2>
+
+<p>The processing logic of your <code class="highlighter-rouge">ParDo</code> transform is expressed through the <code class="highlighter-rouge">DoFn</code>
+that it applies to each element. Without stateful augmentations, a <code class="highlighter-rouge">DoFn</code> is a
+mostly-pure function from each input to zero or more outputs, corresponding to the
+Mapper in a MapReduce. With state, a <code class="highlighter-rouge">DoFn</code> has the ability to access
+persistent mutable state while processing each input element. Consider this
+illustration:</p>
+
+<p><img class="center-block" src="/images/blog/stateful-processing/stateful-dofn.png" alt="Stateful DoFn - the runner controls input but the DoFn controls storage and output" width="300" /></p>
+
+<p>The first thing to note is that all the data - the little squares, circles, and
+triangles - are red. This is to illustrate that stateful processing occurs in
+the context of a single key - all of the elements are key-value pairs with the
+same key. Calls from your chosen Beam runner to the <code class="highlighter-rouge">DoFn</code> are colored in
+yellow, while calls from the <code class="highlighter-rouge">DoFn</code> to the runner are in purple:</p>
+
+<ul>
+  <li>The runner invokes the <code class="highlighter-rouge">DoFn</code>’s <code class="highlighter-rouge">@ProcessElement</code> method on each element for a
+key+window.</li>
+  <li>The <code class="highlighter-rouge">DoFn</code> reads and writes state - the curved arrows to/from the storage on
+the side.</li>
+  <li>The <code class="highlighter-rouge">DoFn</code> emits output (or side output) to the runner as usual via
+<code class="highlighter-rouge">ProcessContext.output</code> (or <code class="highlighter-rouge">ProcessContext.sideOutput</code> for side outputs).</li>
+</ul>
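+<p>The division of labor above can be sketched as a small, hypothetical Java model (not Beam’s API and not any runner’s implementation): the “runner” loop owns the input order and keeps a separate state map per key, while the user function reads its state, emits output, and writes back what changed. A running total per key is only an example, and windows are omitted for brevity.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of how a runner might drive a stateful DoFn; not Beam's API.
final class StatefulDoFnModel {

  // User logic: reads/writes its own state cells and decides what to output.
  interface StatefulFn {
    void processElement(String key, int value, Map<String, Integer> state, List<String> output);
  }

  // Example user logic: emit a running total per key.
  static final StatefulFn RUNNING_TOTAL = (key, value, state, output) -> {
    int total = state.getOrDefault("total", 0) + value; // read state
    output.add(key + ":" + total);                      // emit output
    state.put("total", total);                          // write state
  };

  // The "runner": controls input order and keeps a separate state map per key.
  static List<String> run(List<Map.Entry<String, Integer>> input, StatefulFn fn) {
    Map<String, Map<String, Integer>> statePerKey = new HashMap<>();
    List<String> output = new ArrayList<>();
    for (Map.Entry<String, Integer> element : input) {
      Map<String, Integer> state =
          statePerKey.computeIfAbsent(element.getKey(), k -> new HashMap<>());
      fn.processElement(element.getKey(), element.getValue(), state, output);
    }
    return output;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> input =
        List.of(Map.entry("red", 1), Map.entry("blue", 5), Map.entry("red", 2));
    System.out.println(run(input, RUNNING_TOTAL)); // [red:1, blue:5, red:3]
  }
}
```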
+
+<p>At this very high level, it is pretty intuitive: In your programming
+experience, you have probably at some point written a loop over elements that
+updates some mutable variables while performing other actions. The interesting
+question is how this fits into the Beam model: How does it relate to
+other features? How does it scale, since state implies some synchronization?
+When should it be used versus other features?</p>
+
+<h2 id="how-does-stateful-processing-fit-into-the-beam-model">How does stateful processing fit into the Beam model?</h2>
+
+<p>To see where stateful processing fits in the Beam model, consider another
+way that you can keep some “state” while processing many elements: <code class="highlighter-rouge">CombineFn</code>. In
+Beam, you can write <code class="highlighter-rouge">Combine.perKey(CombineFn)</code> in Java (or <code class="highlighter-rouge">CombinePerKey(CombineFn)</code> in Python) to apply an
+associative, commutative accumulating operation across all the elements with a
+common key (and window).</p>
+
+<p>Here is a diagram illustrating the basics of a <code class="highlighter-rouge">CombineFn</code>, the simplest way
+that a runner might invoke it on a per-key basis to build an accumulator and
+extract an output from the final accumulator:</p>
+
+<p><img class="center-block" src="/images/blog/stateful-processing/combinefn.png" alt="CombineFn - the runner controls input, storage, and output" width="300" /></p>
+
+<p>As with the illustration of stateful <code class="highlighter-rouge">DoFn</code>, all the data is colored red, since
+this is the processing of Combine for a single key. The illustrated method
+calls are colored yellow, since they are all controlled by the runner:</p>
+
+<ul>
+  <li>The runner invokes <code class="highlighter-rouge">addInput</code> on each element to add it to the current accumulator.</li>
+  <li>The runner persists the accumulator when it chooses.</li>
+  <li>The runner calls <code class="highlighter-rouge">extractOutput</code> when ready to emit an output element.</li>
+</ul>
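+<p>For comparison, here is a standalone Java sketch that uses the same four method names as a Beam <code class="highlighter-rouge">CombineFn</code>. The interface is hypothetical, not Beam’s class, and <code class="highlighter-rouge">runForOneKey</code> is just one simple way a runner might drive it for a single key. The user code only says <em>how</em> to accumulate; when each call happens is up to the caller.</p>

```java
import java.util.List;

// Standalone sketch with the same method names as a Beam CombineFn; not Beam's class.
final class CombineFnSketch {

  interface CombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    AccumT mergeAccumulators(Iterable<AccumT> accumulators);
    OutputT extractOutput(AccumT accumulator);
  }

  // Mean of integers; the accumulator is {sum, count}.
  static final class MeanFn implements CombineFn<Integer, long[], Double> {
    @Override public long[] createAccumulator() { return new long[] {0, 0}; }

    @Override public long[] addInput(long[] acc, Integer input) {
      acc[0] += input;
      acc[1] += 1;
      return acc;
    }

    @Override public long[] mergeAccumulators(Iterable<long[]> accs) {
      long[] merged = createAccumulator();
      for (long[] a : accs) {
        merged[0] += a[0];
        merged[1] += a[1];
      }
      return merged;
    }

    @Override public Double extractOutput(long[] acc) {
      return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }
  }

  // What a runner might do for one key: create, add every input, extract.
  // The user code never decides when these calls happen.
  static <I, A, O> O runForOneKey(CombineFn<I, A, O> fn, List<I> inputs) {
    A acc = fn.createAccumulator();
    for (I input : inputs) {
      acc = fn.addInput(acc, input);
    }
    return fn.extractOutput(acc);
  }

  public static void main(String[] args) {
    System.out.println(runForOneKey(new MeanFn(), List.of(1, 2, 3, 4))); // 2.5
  }
}
```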
+
+<p>At this point, the diagram for <code class="highlighter-rouge">CombineFn</code> looks a whole lot like the diagram
+for stateful <code class="highlighter-rouge">DoFn</code>. In practice, the flow of data is, indeed, quite similar.
+But there are important differences, even so:</p>
+
+<ul>
+  <li>The runner controls all invocations and storage here. You do not decide when
+or how state is persisted, when an accumulator is discarded (based on
+triggering) or when output is extracted from an accumulator.</li>
+  <li>You can only have one piece of state - the accumulator. In a stateful DoFn
+you can read only what you need to know and write only what has changed.</li>
+  <li>You don’t have the extended features of <code class="highlighter-rouge">DoFn</code>, such as multiple outputs per
+input or side outputs. (These could be simulated by a sufficiently complex
+accumulator, but it would not be natural or efficient. Some other features of
+<code class="highlighter-rouge">DoFn</code>, such as side inputs and access to the window, make perfect sense for
+<code class="highlighter-rouge">CombineFn</code>.)</li>
+</ul>
+
+<p>But the main thing that <code class="highlighter-rouge">CombineFn</code> allows a runner to do is to
+<code class="highlighter-rouge">mergeAccumulators</code>, the concrete expression of the <code class="highlighter-rouge">CombineFn</code>’s associativity.
+This unlocks some huge optimizations: the runner can invoke multiple instances
+of a <code class="highlighter-rouge">CombineFn</code> on a number of inputs and later combine them in a classic
+divide-and-conquer architecture, as in this picture:</p>
+
+<p><img class="center-block" src="/images/blog/stateful-processing/combiner-lifting.png" alt="Divide-and-conquer aggregation with a CombineFn" width="600" /></p>
+
+<p>The contract of a <code class="highlighter-rouge">CombineFn</code> is that the result should be exactly the same
+whether or not the runner actually combines partial results this way, or even builds
+more complex trees with hot-key fanout, etc.</p>
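+<p>The contract can be checked with a small plain-Java sketch of this divide-and-conquer combining (sometimes called combiner lifting): compute partial {sum, count} accumulators over chunks of the input, as separate workers might, then merge them. The names are hypothetical and the chunking is deliberately simple; the point is only that any split yields the same mean as one sequential pass.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of divide-and-conquer combining: partial accumulators, then merge.
final class CombinerLifting {

  // Accumulator for a mean: running sum and count (immutable for clarity).
  static final class SumCount {
    final long sum;
    final long count;

    SumCount(long sum, long count) {
      this.sum = sum;
      this.count = count;
    }

    SumCount add(int x) { return new SumCount(sum + x, count + 1); }

    SumCount merge(SumCount other) { return new SumCount(sum + other.sum, count + other.count); }

    double mean() { return count == 0 ? 0.0 : (double) sum / count; }
  }

  // One accumulator over all inputs, in order.
  static double sequential(List<Integer> inputs) {
    SumCount acc = new SumCount(0, 0);
    for (int x : inputs) acc = acc.add(x);
    return acc.mean();
  }

  // Split the inputs into at most `parts` chunks (parts >= 1), accumulate each chunk
  // independently, as separate workers might, then merge the partial accumulators.
  static double lifted(List<Integer> inputs, int parts) {
    int chunk = Math.max(1, (inputs.size() + parts - 1) / parts);
    List<SumCount> partials = new ArrayList<>();
    for (int start = 0; start < inputs.size(); start += chunk) {
      SumCount acc = new SumCount(0, 0);
      for (int x : inputs.subList(start, Math.min(start + chunk, inputs.size()))) acc = acc.add(x);
      partials.add(acc);
    }
    SumCount merged = new SumCount(0, 0);
    for (SumCount partial : partials) merged = merged.merge(partial);
    return merged.mean();
  }

  public static void main(String[] args) {
    List<Integer> inputs = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    System.out.println(sequential(inputs) + " " + lifted(inputs, 3)); // 5.5 5.5
  }
}
```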
+
+<p>This merge operation is not (necessarily) provided by a stateful <code class="highlighter-rouge">DoFn</code>: the
+runner cannot freely branch its execution and recombine the states. Note that
+the input elements are still received in an arbitrary order, so the <code class="highlighter-rouge">DoFn</code> should
+be insensitive to ordering and bundling, but that does not mean the output must be
+exactly equal. (Fun and easy fact: if the outputs are actually always equal,
+then the <code class="highlighter-rouge">DoFn</code> is an associative and commutative operator.)</p>
+
+<p>So now you can see how a stateful <code class="highlighter-rouge">DoFn</code> differs from <code class="highlighter-rouge">CombineFn</code>, but I want to
+step back and extrapolate this to a high-level picture of how state in Beam
+relates to using other features to achieve the same or similar goals: In a lot
+of cases, what stateful processing represents is a chance to “get under the
+hood” of the highly abstract mostly-deterministic functional paradigm of Beam
+and do potentially-nondeterministic imperative-style programming that is hard
+to express any other way.</p>
+
+<h2 id="example-arbitrary-but-consistent-index-assignment">Example: arbitrary-but-consistent index assignment</h2>
+
+<p>Suppose that you want to give an index to every incoming element for a
+key-and-window. You don’t care what the indices are, just as long as they are
+unique and consistent. Before diving into the code for how to do this in a Beam
+SDK, I’ll go over this example from the level of the model. In pictures, you
+want to write a transform that maps input to output like this:</p>
+
+<p><img class="center-block" src="/images/blog/stateful-processing/assign-indices.png" alt="Assigning arbitrary but unique indices to each element" width="100" /></p>
+
+<p>The order of the elements A, B, C, D, E is arbitrary, hence their assigned
+indices are arbitrary, but downstream transforms just need to be OK with this.
+There is no associativity or commutativity as far as the actual values are
+concerned. The order-insensitivity of this transform only extends to the point
+of ensuring the necessary properties of the output: no duplicated indices, no
+gaps, and every element gets an index.</p>
+
+<p>Conceptually expressing this as a stateful loop is as trivial as you can
+imagine: The state you should store is the next index.</p>
+
+<ul>
+  <li>As an element comes in, output it along with the next index.</li>
+  <li>Increment the index.</li>
+</ul>
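+<p>The loop above is short enough to write out directly. This is a plain-Java sketch for a single key+window, not Beam code, and its names are hypothetical. Feeding the same elements in two different orders gives different indices, yet both outputs keep the properties that matter: every element gets an index, with no duplicates and no gaps.</p>

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the index-assigning loop for ONE key+window; not Beam code.
final class AssignIndices {

  // The only state is the next index to hand out.
  static List<Map.Entry<String, Integer>> assign(List<String> arrivalOrder) {
    List<Map.Entry<String, Integer>> output = new ArrayList<>();
    int next = 0;
    for (String element : arrivalOrder) {
      output.add(Map.entry(element, next)); // output the element with the next index
      next++;                               // increment the index
    }
    return output;
  }

  // Every element gets an index, no index repeats, and indices are exactly 0..n-1.
  static boolean hasRequiredProperties(List<Map.Entry<String, Integer>> output, int n) {
    if (output.size() != n) return false;
    Set<Integer> seen = new HashSet<>();
    for (Map.Entry<String, Integer> e : output) {
      if (!seen.add(e.getValue())) return false;                // duplicated index
      if (e.getValue() < 0 || e.getValue() >= n) return false;  // would leave a gap
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(assign(List.of("A", "B", "C", "D", "E")));
    System.out.println(assign(List.of("E", "D", "C", "B", "A")));
  }
}
```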
+
+<p>This presents a good opportunity to talk about big data and parallelism,
+because the algorithm in those bullet points is not parallelizable at all! If
+you wanted to apply this logic over an entire <code class="highlighter-rouge">PCollection</code>, you would have to
+process each element of the <code class="highlighter-rouge">PCollection</code> one at a time, which is obviously a
+bad idea. State in Beam is tightly scoped so that most of the time a stateful
+<code class="highlighter-rouge">ParDo</code> transform should still be possible for a runner to execute in parallel,
+though you still have to be thoughtful about it.</p>
+
+<p>A state cell in Beam is scoped to a key+window pair. When your DoFn reads or
+writes state by the name of <code class="highlighter-rouge">"index"</code>, it is actually accessing a mutable cell
+specified by <code class="highlighter-rouge">"index"</code> <em>along with</em> the key and window currently being
+processed.  So, when thinking about a state cell, it may be helpful to consider
+the full state of your transform as a table, where the rows are named according
+to names you use in your program, like <code class="highlighter-rouge">"index"</code>, and the columns are
+key+window pairs, like this:</p>
+
+<table class="table">
+  <thead>
+    <tr>
+      <th> </th>
+      <th>(key, window)<sub>1</sub></th>
+      <th>(key, window)<sub>2</sub></th>
+      <th>(key, window)<sub>3</sub></th>
+      <th>…</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><code class="highlighter-rouge">"index"</code></td>
+      <td><code class="highlighter-rouge">3</code></td>
+      <td><code class="highlighter-rouge">7</code></td>
+      <td><code class="highlighter-rouge">15</code></td>
+      <td>…</td>
+    </tr>
+    <tr>
+      <td><code class="highlighter-rouge">"fizzOrBuzz?"</code></td>
+      <td><code class="highlighter-rouge">"fizz"</code></td>
+      <td><code class="highlighter-rouge">"7"</code></td>
+      <td><code class="highlighter-rouge">"fizzbuzz"</code></td>
+      <td>…</td>
+    </tr>
+    <tr>
+      <td>…</td>
+      <td>…</td>
+      <td>…</td>
+      <td>…</td>
+      <td>…</td>
+    </tr>
+  </tbody>
+</table>
+
+<p>(if you have a superb spatial sense, feel free to imagine this as a cube where
+keys and windows are independent dimensions)</p>
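+<p>One way to picture this table in code is a map from a (key, window) column to a map of named cells. The sketch below is a hypothetical in-memory model for illustration only; windows are plain strings here, whereas a runner stores and manages state durably on its own.</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical in-memory model of the state "table": one column per (key, window).
final class StateTable {

  // Identifies one column. Windows are plain strings here, which is a simplification.
  static final class KeyWindow {
    final String key;
    final String window;

    KeyWindow(String key, String window) {
      this.key = key;
      this.window = window;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof KeyWindow)) return false;
      KeyWindow other = (KeyWindow) o;
      return key.equals(other.key) && window.equals(other.window);
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, window);
    }
  }

  // column (key+window) -> row (state name such as "index") -> cell value
  private final Map<KeyWindow, Map<String, Object>> cells = new HashMap<>();

  Object read(String key, String window, String stateName) {
    Map<String, Object> column = cells.get(new KeyWindow(key, window));
    return column == null ? null : column.get(stateName); // null means "never written"
  }

  void write(String key, String window, String stateName, Object value) {
    cells.computeIfAbsent(new KeyWindow(key, window), kw -> new HashMap<>())
        .put(stateName, value);
  }

  public static void main(String[] args) {
    StateTable table = new StateTable();
    table.write("user1", "window1", "index", 3);
    System.out.println(table.read("user1", "window1", "index")); // 3
    System.out.println(table.read("user2", "window1", "index")); // null
  }
}
```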
+
+<p>You can provide the opportunity for parallelism by making sure that table has
+enough columns, either via many keys in few windows - for example, a globally
+windowed stateful computation keyed by user ID - or via many windows over few
+keys - for example, a fixed windowed stateful computation over a global key.
+Caveat: all Beam runners today parallelize only over the key.</p>
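+<p>The following hypothetical plain-Java sketch (not how any particular runner works) shows the shape of parallelism over keys: elements are routed to their key, different keys are processed concurrently via a parallel stream, and each key’s elements are processed one at a time so its counter is never touched by two threads. Windows are omitted for brevity.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical sketch: parallel across keys, sequential within a key.
final class ParallelAcrossKeys {

  static Map<String, List<String>> indexPerKey(List<Map.Entry<String, String>> input) {
    // Route every element to its key.
    Map<String, List<String>> valuesByKey = input.stream().collect(
        Collectors.groupingBy(Map.Entry::getKey,
            Collectors.mapping(Map.Entry::getValue, Collectors.toList())));

    Map<String, List<String>> result = new ConcurrentHashMap<>();
    // Different keys may be processed concurrently...
    valuesByKey.entrySet().parallelStream().forEach(group -> {
      // ...but each key's elements are handled one at a time, so this key's
      // state (the `next` counter) is never accessed concurrently.
      int next = 0;
      List<String> indexed = new ArrayList<>();
      for (String value : group.getValue()) {
        indexed.add(value + "#" + next++);
      }
      result.put(group.getKey(), indexed);
    });
    return result;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> input =
        List.of(Map.entry("a", "x"), Map.entry("b", "y"), Map.entry("a", "z"));
    System.out.println(indexPerKey(input));
  }
}
```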
+
+<p>Most often your mental model of state can be focused on only a single column of
+the table, a single key+window pair. Cross-column interactions do not occur
+directly, by design.</p>
+
+<h2 id="state-in-beams-java-sdk">State in Beam’s Java SDK</h2>
+
+<p>Now that I have talked a bit about stateful processing in the Beam model and
+worked through an abstract example, I’d like to show you what it looks like to
+write stateful processing code using Beam’s Java SDK.  Here is the code for a
+stateful <code class="highlighter-rouge">DoFn</code> that assigns an arbitrary-but-consistent index to each element
+on a per key-and-window basis:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">MyKey</span><span class="o">,</span> <span class="n">MyValue</span><span class="o">&gt;,</span> <span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">KV</span><span class="o">&lt;</span><span class="n">MyKey</span><span class="o">,</span> <span class="n">MyValue</span><span class="o">&gt;&gt;&gt;()</span> <span class="o">{</span>
+
+  <span class="c1">// A state cell holding a single Integer per key+window</span>
+  <span class="nd">@StateId</span><span class="o">(</span><span class="s">"index"</span><span class="o">)</span>
+  <span class="kd">private</span> <span class="kd">final</span> <span class="n">StateSpec</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">,</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">indexSpec</span> <span class="o">=</span> 
+      <span class="n">StateSpecs</span><span class="o">.</span><span class="na">value</span><span class="o">(</span><span class="n">VarIntCoder</span><span class="o">.</span><span class="na">of</span><span class="o">());</span>
+
+  <span class="nd">@ProcessElement</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span>
+      <span class="n">ProcessContext</span> <span class="n">context</span><span class="o">,</span>
+      <span class="nd">@StateId</span><span class="o">(</span><span class="s">"index"</span><span class="o">)</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">index</span><span class="o">)</span> <span class="o">{</span>
+    <span class="kt">int</span> <span class="n">current</span> <span class="o">=</span> <span class="n">firstNonNull</span><span class="o">(</span><span class="n">index</span><span class="o">.</span><span class="na">read</span><span class="o">(),</span> <span class="mi">0</span><span class="o">);</span>
+    <span class="n">context</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">current</span><span class="o">,</span> <span class="n">context</span><span class="o">.</span><span class="na">element</span><span class="o">()));</span>
+    <span class="n">index</span><span class="o">.</span><span class="na">write</span><span class="o">(</span><span class="n">current</span><span class="o">+</span><span class="mi">1</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># State and timers are not yet supported in Beam's Python SDK.</span>
+<span class="c"># Watch this space!</span>
+</code></pre>
+</div>
+
+<p>Let’s dissect this:</p>
+
+<ul>
+  <li>The first thing to look at is the presence of a couple of <code class="highlighter-rouge">@StateId("index")</code>
+annotations. This calls out that you are using a mutable state cell named
+“index” in this <code class="highlighter-rouge">DoFn</code>. The Beam Java SDK, and from there your chosen runner,
+will also note these annotations and use them to wire up your DoFn correctly.</li>
+  <li>The first <code class="highlighter-rouge">@StateId("index")</code> is annotated on a field of type <code class="highlighter-rouge">StateSpec</code> (for
+“state specification”). This declares and configures the state cell. The
+type parameter <code class="highlighter-rouge">ValueState</code> describes the kind of state you can get out of this
+cell - <code class="highlighter-rouge">ValueState</code> stores just a single value. Note that the spec itself is not
+a usable state cell - you need the runner to provide that during pipeline
+execution.</li>
+  <li>To fully specify a <code class="highlighter-rouge">ValueState</code> cell, you need to provide the coder
+that the runner will use (as necessary) to serialize the value
+you will be storing. This is the invocation <code class="highlighter-rouge">StateSpecs.value(VarIntCoder.of())</code>.</li>
+  <li>The second <code class="highlighter-rouge">@StateId("index")</code> annotation is on a parameter to your
+<code class="highlighter-rouge">@ProcessElement</code> method. This indicates access to the ValueState cell that
+was specified earlier.</li>
+  <li>The state is accessed in the simplest way: <code class="highlighter-rouge">read()</code> to read it, and
+<code class="highlighter-rouge">write(newvalue)</code> to write it.</li>
+  <li>The other features of <code class="highlighter-rouge">DoFn</code> are available in the usual way - such as
+<code class="highlighter-rouge">context.output(...)</code>. You can also use side inputs, side outputs, gain access
+to the window, etc.</li>
+</ul>
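+<p>To make the read/write semantics concrete outside of Beam, here is a plain-Java sketch. <code class="highlighter-rouge">ValueCell</code> and <code class="highlighter-rouge">ToyRunner</code> are hypothetical stand-ins invented for illustration, not Beam classes. The toy runner keeps one cell per key in a <code class="highlighter-rouge">HashMap</code>, which is roughly the scoping a real runner provides per key and window. The sketch treats a never-written cell as returning <code class="highlighter-rouge">null</code> and defaults it to 0.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a runner-provided ValueState cell (not the Beam API).
interface ValueCell<T> {
  T read();            // null if nothing has been written yet
  void write(T value);
}

// Toy "runner" that scopes one independent index cell to each key.
class ToyRunner {
  private final Map<String, Integer> storage = new HashMap<>();

  private ValueCell<Integer> indexCellFor(String key) {
    return new ValueCell<Integer>() {
      @Override public Integer read() { return storage.get(key); }
      @Override public void write(Integer value) { storage.put(key, value); }
    };
  }

  // Roughly mirrors the index-assigning DoFn: read the cell, emit, then write current + 1.
  String process(String key, String value) {
    ValueCell<Integer> index = indexCellFor(key);
    Integer stored = index.read();
    int current = (stored == null) ? 0 : stored;
    index.write(current + 1);
    return key + "/" + value + "#" + current;
  }
}
```

+<p>Calling <code class="highlighter-rouge">process("a", "x")</code>, <code class="highlighter-rouge">process("b", "y")</code>, then <code class="highlighter-rouge">process("a", "z")</code> yields <code class="highlighter-rouge">a/x#0</code>, <code class="highlighter-rouge">b/y#0</code>, <code class="highlighter-rouge">a/z#1</code>: indices are dense per key, not global.</p>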
+
+<p>A few notes on how the SDK and runners see this DoFn:</p>
+
+<ul>
+  <li>Your state cells are all explicitly declared so a Beam SDK or runner can
+reason about them, for example to clear them out when a window expires.</li>
+  <li>If you declare a state cell and then use it with the wrong type, the Beam
+Java SDK will catch that error for you.</li>
+  <li>If you declare two state cells with the same ID, the SDK will catch that,
+too.</li>
+  <li>The runner knows that this is a stateful <code class="highlighter-rouge">DoFn</code> and may run it quite
+differently, for example with additional data shuffling and synchronization in
+order to avoid concurrent access to state cells.</li>
+</ul>
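+<p>The first three notes can be illustrated with a toy registry. <code class="highlighter-rouge">StateDeclarations</code> is hypothetical and far simpler than the analysis the Beam SDK actually performs on a <code class="highlighter-rouge">DoFn</code>; it only shows why explicit, up-front declarations make duplicate IDs and type mismatches detectable before any data flows, and why every cell can be enumerated, for example to clear it.</p>

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical toy registry; not how the Beam SDK is implemented.
class StateDeclarations {
  private final Map<String, Class<?>> declared = new HashMap<>();

  // Like a @StateId field: registers an id with the type of value it holds.
  void declare(String id, Class<?> valueType) {
    if (declared.containsKey(id)) {
      throw new IllegalArgumentException("Duplicate state id: " + id);
    }
    declared.put(id, valueType);
  }

  // Like a @StateId parameter: the requested type must match the declaration.
  void checkAccess(String id, Class<?> requestedType) {
    Class<?> declaredType = declared.get(id);
    if (declaredType == null) {
      throw new IllegalArgumentException("Undeclared state id: " + id);
    }
    if (!declaredType.equals(requestedType)) {
      throw new IllegalArgumentException("State id " + id + " holds "
          + declaredType.getSimpleName() + ", not " + requestedType.getSimpleName());
    }
  }

  // Every cell is known up front, so a runner could enumerate them, e.g. to clear them.
  Set<String> allIds() {
    return declared.keySet();
  }
}
```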
+
+<p>Let’s look at one more example of how to use this API, this time a bit more real-world.</p>
+
+<h2 id="example-anomaly-detection">Example: anomaly detection</h2>
+
+<p>Suppose you are feeding a stream of actions by your users into some complex
+model to predict some quantitative expression of the sorts of actions they
+take, for example to detect fraudulent activity. You will build up the model
+from events, and also compare incoming events against the latest model to
+determine if something has changed.</p>
+
+<p>If you try to express the building of your model as a <code class="highlighter-rouge">CombineFn</code>, you may have
+trouble with <code class="highlighter-rouge">mergeAccumulators</code>. Assuming you could express that, it might
+look something like this:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">class</span> <span class="nc">ModelFromEventsFn</span> <span class="kd">extends</span> <span class="n">CombineFn</span><span class="o">&lt;</span><span class="n">Event</span><span class="o">,</span> <span class="n">Model</span><span class="o">,</span> <span class="n">Model</span><span class="o">&gt;</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Model</span> <span class="nf">createAccumulator</span><span class="o">()</span> <span class="o">{</span>
+      <span class="k">return</span> <span class="n">Model</span><span class="o">.</span><span class="na">empty</span><span class="o">();</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Model</span> <span class="nf">addInput</span><span class="o">(</span><span class="n">Model</span> <span class="n">accumulator</span><span class="o">,</span> <span class="n">Event</span> <span class="n">input</span><span class="o">)</span> <span class="o">{</span>
+      <span class="k">return</span> <span class="n">accumulator</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="n">input</span><span class="o">);</span> <span class="c1">// this is encouraged to mutate, for efficiency</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Model</span> <span class="nf">mergeAccumulators</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Model</span><span class="o">&gt;</span> <span class="n">accumulators</span><span class="o">)</span> <span class="o">{</span>
+      <span class="c1">// ?? can you write this ??</span>
+      <span class="k">throw</span> <span class="k">new</span> <span class="n">UnsupportedOperationException</span><span class="o">(</span><span class="s">"no obvious way to merge partial models"</span><span class="o">);</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Model</span> <span class="nf">extractOutput</span><span class="o">(</span><span class="n">Model</span> <span class="n">accumulator</span><span class="o">)</span> <span class="o">{</span>
+      <span class="k">return</span> <span class="n">accumulator</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># State and timers are not yet supported in Beam's Python SDK.</span>
+<span class="c"># Watch this space!</span>
+</code></pre>
+</div>
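+<p>One reason <code class="highlighter-rouge">mergeAccumulators</code> can be hard to write: a <code class="highlighter-rouge">CombineFn</code> is expected to be associative and commutative, so partial accumulators may be merged in any grouping and order, yet many models depend on the order of events. A hypothetical exponentially weighted moving average (not the post's <code class="highlighter-rouge">Model</code> class) shows the problem: feeding the same values in a different order gives a different result, and a merge step receives only partial accumulators, with no record of how their events interleaved.</p>

```java
// Hypothetical order-sensitive model: an exponentially weighted moving average.
final class EwmaModel {
  private final double alpha;
  private double value;
  private boolean initialized;

  EwmaModel(double alpha) {
    this.alpha = alpha;
  }

  // Mutate-and-return, in the style of addInput above.
  EwmaModel update(double x) {
    value = initialized ? alpha * x + (1 - alpha) * value : x;
    initialized = true;
    return this;
  }

  double value() {
    return value;
  }

  // Folds the values in the given order.
  static double over(double alpha, double... xs) {
    EwmaModel model = new EwmaModel(alpha);
    for (double x : xs) {
      model.update(x);
    }
    return model.value();
  }
}
```

+<p>With <code class="highlighter-rouge">alpha = 0.5</code>, folding 1, 2, 3 gives 2.25 while folding 3, 2, 1 gives 1.75.</p>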
+
+<p>Now you have a way to compute the model of a particular user for a window as
+<code class="highlighter-rouge">Combine.perKey(new ModelFromEventsFn())</code>. How would you apply this model to
+the same stream of events from which it is calculated? A standard way to
+take the result of a <code class="highlighter-rouge">Combine</code> transform and use it while processing the
+elements of a <code class="highlighter-rouge">PCollection</code> is to read it as a side input to a <code class="highlighter-rouge">ParDo</code>
+transform. So you could side input the model and check the stream of events
+against it, outputting the prediction, like so:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Event</span><span class="o">&gt;&gt;</span> <span class="n">events</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="kd">final</span> <span class="n">PCollectionView</span><span class="o">&lt;</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Model</span><span class="o">&gt;&gt;</span> <span class="n">userModels</span> <span class="o">=</span> <span class="n">events</span>
+    <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Combine</span><span class="o">.</span><span class="na">perKey</span><span class="o">(</span><span class="k">new</span> <span class="n">ModelFromEventsFn</span><span class="o">()))</span>
+    <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">View</span><span class="o">.</span><span class="na">asMap</span><span class="o">());</span>
+
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Prediction</span><span class="o">&gt;&gt;</span> <span class="n">predictions</span> <span class="o">=</span> <span class="n">events</span>
+    <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Event</span><span class="o">&gt;,</span> <span class="n">KV</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Prediction</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
+
+      <span class="nd">@ProcessElement</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">ProcessContext</span> <span class="n">ctx</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">UserId</span> <span class="n">userId</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getKey</span><span class="o">();</span>
+        <span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getValue</span><span class="o">();</span>
+
+        <span class="n">Model</span> <span class="n">model</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">sideInput</span><span class="o">(</span><span class="n">userModels</span><span class="o">).</span><span class="na">get</span><span class="o">(</span><span class="n">userId</span><span class="o">);</span>
+
+        <span class="c1">// Perhaps some logic around when to output a new prediction …</span>
+        <span class="n">ctx</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">userId</span><span class="o">,</span> <span class="n">model</span><span class="o">.</span><span class="na">prediction</span><span class="o">(</span><span class="n">event</span><span class="o">)));</span>
+      <span class="o">}</span>
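+    <span class="o">}).</span><span class="na">withSideInputs</span><span class="o">(</span><span class="n">userModels</span><span class="o">));</span>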
+</code></pre>
+</div>
+
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># State and timers are not yet supported in Beam's Python SDK.</span>
+<span class="c"># Watch this space!</span>
+</code></pre>
+</div>
+
+<p>In this pipeline, there is just one model emitted by the <code class="highlighter-rouge">Combine.perKey(...)</code>
+per user, per window, which is then prepared for side input by the <code class="highlighter-rouge">View.asMap()</code>
+transform. The processing of the <code class="highlighter-rouge">ParDo</code> over events will block until that side
+input is ready, buffering events, and will then check each event against the
+model. This is a high latency, high completeness solution: The model takes into
+account all user behavior in the window, but there can be no output until the
+window is complete.</p>
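+<p>The following batch simulation sketches the two phases for a single window in plain Java. The per-user mean standing in for <code class="highlighter-rouge">Model</code>, the deviation standing in for <code class="highlighter-rouge">Prediction</code>, and all class names are hypothetical. Every event, including the earliest one, is scored against a model built from the whole window: completeness gained at the price of latency.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type: a user id and a numeric value.
final class UserEvent {
  final String user;
  final double value;

  UserEvent(String user, double value) {
    this.user = user;
    this.value = value;
  }
}

class SideInputWindowSim {
  // Phase 1 (like Combine.perKey then View.asMap): needs every event in the window.
  static Map<String, Double> buildModels(List<UserEvent> window) {
    Map<String, double[]> sumAndCount = new HashMap<>();
    for (UserEvent e : window) {
      double[] acc = sumAndCount.computeIfAbsent(e.user, u -> new double[2]);
      acc[0] += e.value;
      acc[1] += 1;
    }
    Map<String, Double> means = new HashMap<>();
    sumAndCount.forEach((user, acc) -> means.put(user, acc[0] / acc[1]));
    return means;
  }

  // Phase 2 (like the ParDo reading the side input): starts only after phase 1 finishes.
  static List<Double> predict(List<UserEvent> window) {
    Map<String, Double> models = buildModels(window);
    List<Double> deviations = new ArrayList<>();
    for (UserEvent e : window) {
      deviations.add(e.value - models.get(e.user));
    }
    return deviations;
  }
}
```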
+
+<p>Suppose you want to get some results earlier, or don’t even have any
+natural windowing, but just want continuous analysis with the “model so far”,
+even though your model may not be as complete. How can you control the updates
+to the model against which you are checking your events? Triggers are the
+generic Beam feature for managing completeness versus latency tradeoffs. So here
+is the same pipeline with an added trigger that outputs a new model one second
+after input arrives:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Event</span><span class="o">&gt;&gt;</span> <span class="n">events</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="n">PCollectionView</span><span class="o">&lt;</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Model</span><span class="o">&gt;&gt;</span> <span class="n">userModels</span> <span class="o">=</span> <span class="n">events</span>
+
+    <span class="c1">// A tradeoff between latency and cost</span>
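+    <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Window</span><span class="o">.&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Event</span><span class="o">&gt;&gt;</span><span class="n">triggering</span><span class="o">(</span>
+        <span class="n">Repeatedly</span><span class="o">.</span><span class="na">forever</span><span class="o">(</span><span class="n">AfterProcessingTime</span><span class="o">.</span><span class="na">pastFirstElementInPane</span><span class="o">()</span>
+            <span class="o">.</span><span class="na">plusDelayOf</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardSeconds</span><span class="o">(</span><span class="mi">1</span><span class="o">))))</span>
+        <span class="o">.</span><span class="na">accumulatingFiredPanes</span><span class="o">())</span>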
+
+    <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Combine</span><span class="o">.</span><span class="na">perKey</span><span class="o">(</span><span class="k">new</span> <span class="n">ModelFromEventsFn</span><span class="o">()))</span>
+    <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">View</span><span class="o">.</span><span class="na">asMap</span><span class="o">());</span>
+</code></pre>
+</div>
+
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># State and timers are not yet supported in Beam's Python SDK.</span>
+<span class="c"># Watch this space!</span>
+</code></pre>
+</div>
+
+<p>This is often a pretty nice tradeoff between latency and cost: If a huge flood
+of events comes in a second, then you will only emit one new model, so you
+won’t be flooded with model outputs that you cannot even use before they are
+obsolete. In practice, the new model may not be present on the side input
+channel until many more seconds have passed, due to caches and processing
+delays preparing the side input. Many events (maybe an entire batch of
+activity) will have passed through the <code class="highlighter-rouge">ParDo</code> and had their predictions
+calculated according to the prior model. If the runner gave a tight enough
+bound on cache expirations and you used a more aggressive trigger, you might be
+able to improve latency at additional cost.</p>
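+<p>A toy simulation makes the "one new model per burst" behavior concrete. <code class="highlighter-rouge">ProcessingTimeTriggerSim</code> is hypothetical: it models only a repeated trigger that fires a fixed delay after the first element of each pane, and it ignores windows, watermarks, and the extra delay before a new model reaches the side input. An element arriving exactly at the firing time is counted in the closing pane, an arbitrary convention of this sketch.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a repeated "delay after first element in pane" trigger.
class ProcessingTimeTriggerSim {
  // Given sorted arrival times (ms), returns the times at which a new model is emitted.
  static List<Long> firings(long[] arrivalMillis, long delayMillis) {
    List<Long> fires = new ArrayList<>();
    Long pendingFire = null; // firing time of the currently open pane, if any
    for (long t : arrivalMillis) {
      if (pendingFire != null && t > pendingFire) {
        fires.add(pendingFire); // the pane fired before this element arrived
        pendingFire = null;
      }
      if (pendingFire == null) {
        pendingFire = t + delayMillis; // first element of a new pane starts the delay
      }
    }
    if (pendingFire != null) {
      fires.add(pendingFire); // flush the last open pane
    }
    return fires;
  }
}
```

+<p>With arrivals at 0, 100, 200, 900, and 5000 ms and a 1000 ms delay, it reports firings at 1000 and 6000 ms: the burst of four events produces a single model.</p>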
+
+<p>But there is another cost to consider: you are outputting many uninteresting
+outputs from the <code class="highlighter-rouge">ParDo</code> that will be processed downstream. If the
+“interestingness” of the output is only well-defined relative to the prior
+output, then you cannot use a <code class="highlighter-rouge">Filter</code> transform to reduce data volume downstream.</p>
+
+<p>Stateful processing lets you address both the latency problem of side inputs
+and the cost problem of excessive uninteresting output. Here is the code, using
+only features I have already introduced:</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="k">new</span> <span class="n">DoFn</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Event</span><span class="o">&gt;,</span> <span class="n">KV</span><span class="o">&lt;</span><span class="n">UserId</span><span class="o">,</span> <span class="n">Prediction</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
+
+  <span class="nd">@StateId</span><span class="o">(</span><span class="s">"model"</span><span class="o">)</span>
+  <span class="kd">private</span> <span class="kd">final</span> <span class="n">StateSpec</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">,</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Model</span><span class="o">&gt;&gt;</span> <span class="n">modelSpec</span> <span class="o">=</span>
+      <span class="n">StateSpecs</span><span class="o">.</span><span class="na">value</span><span class="o">(</span><span class="n">Model</span><span class="o">.</span><span class="na">coder</span><span class="o">());</span>
+
+  <span class="nd">@StateId</span><span class="o">(</span><span class="s">"previousPrediction"</span><span class="o">)</span>
+  <span class="kd">private</span> <span class="kd">final</span> <span class="n">StateSpec</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">,</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Prediction</span><span class="o">&gt;&gt;</span> <span class="n">previousPredictionSpec</span> <span class="o">=</span>
+      <span class="n">StateSpecs</span><span class="o">.</span><span class="na">value</span><span class="o">(</span><span class="n">Prediction</span><span class="o">.</span><span class="na">coder</span><span class="o">());</span>
+
+  <span class="nd">@ProcessElement</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span>
+      <span class="n">ProcessContext</span> <span class="n">c</span><span class="o">,</span>
+      <span class="nd">@StateId</span><span class="o">(</span><span class="s">"previousPrediction"</span><span class="o">)</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Prediction</span><span class="o">&gt;</span> <span class="n">previousPredictionState</span><span class="o">,</span>
+      <span class="nd">@StateId</span><span class="o">(</span><span class="s">"model"</span><span class="o">)</span> <span class="n">ValueState</span><span class="o">&lt;</span><span class="n">Model</span><span class="o">&gt;</span> <span class="n">modelState</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">UserId</span> <span class="n">userId</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getKey</span><span class="o">();</span>
+    <span class="n">Event</span> <span class="n">event</span> <span class="o">=</span> <span class="n">c</span><span class="o">.</span><span class="na">element</span><span class="o">().</span><span class="na">getValue</span><span class="o">();</span>
+
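+    <span class="n">Model</span> <span class="n">model</span> <span class="o">=</span> <span class="n">modelState</span><span class="o">.</span><span class="na">read</span><span class="o">();</span>
+    <span class="k">if</span> <span class="o">(</span><span class="n">model</span> <span class="o">==</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span> <span class="c1">// no events seen yet for this key and window</span>
+      <span class="n">model</span> <span class="o">=</span> <span class="n">Model</span><span class="o">.</span><span class="na">empty</span><span class="o">();</span>
+    <span class="o">}</span>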
+    <span class="n">Prediction</span> <span class="n">previousPrediction</span> <span class="o">=</span> <span class="n">previousPredictionState</span><span class="o">.</span><span class="na">read</span><span class="o">();</span>
+    <span class="n">Prediction</span> <span class="n">newPrediction</span> <span class="o">=</span> <span class="n">model</span><span class="o">.</span><span class="na">prediction</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
+    <span class="n">model</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">event</span><span class="o">);</span>
+    <span class="n">modelState</span><span class="o">.</span><span class="na">write</span><span class="o">(</span><span class="n">model</span><span class="o">);</span>
+    <span class="k">if</span> <span class="o">(</span><span class="n">previousPrediction</span> <span class="o">==</span> <span class="kc">null</span> 
+        <span class="o">||</span> <span class="n">shouldOutputNewPrediction</span><span class="o">(</span><span class="n">previousPrediction</span><span class="o">,</span> <span class="n">newPrediction</span><span class="o">))</span> <span class="o">{</span>
+      <span class="n">c</span><span class="o">.</span><span class="na">output</span><span class="o">(</span><span class="n">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">userId</span><span class="o">,</span> <span class="n">newPrediction</span><span class="o">));</span>
+      <span class="n">previousPredictionState</span><span class="o">.</span><span class="na">write</span><span class="o">(</span><span class="n">newPrediction</span><span class="o">);</span>
+    <span class="o">}</span>
+  <span class="o">}</span>
+<span class="o">};</span>
+</code></pre>
+</div>
+
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># State and timers are not yet supported in Beam's Python SDK.</span>
+<span class="c"># Watch this space!</span>
+</code></pre>
+</div>
+
+<p>Let’s walk through it:</p>
+
+<ul>
+  <li>You have two state cells declared, <code class="highlighter-rouge">@StateId("model")</code> to hold the current
+state of the model for a user and <code class="highlighter-rouge">@StateId("previousPrediction")</code> to hold
+the prediction output previously.</li>
+  <li>Access to the two state cells by annotation in the <code class="highlighter-rouge">@ProcessElement</code> method
+is as before.</li>
+  <li>You read the current model via <code class="highlighter-rouge">modelState.read()</code>. Because state is also
+per-key-and-window, this is a model just for the UserId of the Event
+currently being processed.</li>
+  <li>You derive a new prediction <code class="highlighter-rouge">model.prediction(event)</code> and compare it against
+the last one you output, accessed via <code class="highlighter-rouge">previousPredictionState.read()</code>.</li>
+  <li>You then update the model via <code class="highlighter-rouge">model.add(event)</code> and write it via
+<code class="highlighter-rouge">modelState.write(...)</code>. It is perfectly fine to mutate the value you pulled
+out of state as long as you also remember to write the mutated value, in the
+same way you are encouraged to mutate <code class="highlighter-rouge">CombineFn</code> accumulators.</li>
+  <li>If the prediction has changed a significant amount since the last time you
+output, you emit it via <code class="highlighter-rouge">context.output(...)</code> and save the prediction using
+<code class="highlighter-rouge">previousPredictionState.write(...)</code>. Here the decision is relative to the
+prior prediction output, not the last one computed - realistically you might
+have some complex conditions here.</li>
+</ul>
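+<p>To see the control flow without a runner, here is a plain-Java sketch of the same per-key logic. The two maps play the role of the <code class="highlighter-rouge">"model"</code> and <code class="highlighter-rouge">"previousPrediction"</code> cells and <code class="highlighter-rouge">put</code> plays the role of <code class="highlighter-rouge">write(...)</code>; a real runner would also scope them per window and persist them. The running-mean model, the NORMAL/ANOMALY prediction, and the "output only when the label changes" rule standing in for <code class="highlighter-rouge">shouldOutputNewPrediction</code> are all hypothetical.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the stateful DoFn's per-key logic (hypothetical names).
class StatefulAnomalySim {
  // Hypothetical model: running mean of a user's event values.
  static final class RunningMean {
    private long count;
    private double sum;

    void add(double x) {
      count++;
      sum += x;
    }

    // Hypothetical prediction: flag a value farther than `tolerance` from the mean so far.
    String prediction(double x, double tolerance) {
      if (count == 0) {
        return "NORMAL"; // nothing to compare against yet
      }
      return Math.abs(x - sum / count) > tolerance ? "ANOMALY" : "NORMAL";
    }
  }

  // Stand-ins for the "model" and "previousPrediction" state cells, keyed by user.
  private final Map<String, RunningMean> modelState = new HashMap<>();
  private final Map<String, String> previousPredictionState = new HashMap<>();
  private final double tolerance;

  StatefulAnomalySim(double tolerance) {
    this.tolerance = tolerance;
  }

  // Processes one (userId, event) element; returns the emitted output, or null if suppressed.
  String processElement(String userId, double event) {
    RunningMean model = modelState.get(userId);                       // modelState.read()
    if (model == null) {
      model = new RunningMean();
    }
    String previousPrediction = previousPredictionState.get(userId); // previousPredictionState.read()
    String newPrediction = model.prediction(event, tolerance);
    model.add(event);
    modelState.put(userId, model);                                   // modelState.write(model)
    if (previousPrediction == null || !previousPrediction.equals(newPrediction)) {
      previousPredictionState.put(userId, newPrediction);            // previousPredictionState.write(...)
      return userId + ":" + newPrediction;
    }
    return null;
  }
}
```

+<p>With a tolerance of 5, the events 10 and 11 for one user emit a single <code class="highlighter-rouge">NORMAL</code>, and a following 50 emits <code class="highlighter-rouge">ANOMALY</code>; the unchanged second prediction is suppressed because the decision is relative to the previous output.</p>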
+
+<p>Most of the above is just talking through Java! But before you go out and
+convert all of your pipelines to use stateful processing, I want to go over
+some considerations as to whether it is a good fit for your use case.</p>
+
+<h2 id="performance-considerations">Performance considerations</h2>
+
+<p>To decide whether to use per-key-and-window state, you need to consider how it
+executes. You can dig into how a particular runner manages state, but there are
+some general things to keep in mind:</p>
+
+<ul>
+  <li>Partitioning per-key-and-window: perhaps the most important thing to
+consider is that the runner may have to shuffle your data to colocate all
+the data for a particular key+window. If the data is already shuffled
+correctly, the runner may take advantage of this.</li>
+  <li>Synchronization overhead: the API is designed so the runner takes care of
+concurrency control, but this means that the runner cannot parallelize
+processing of elements for a particular key+window even when it would otherwise
+be advantageous.</li>
+  <li>Storage and fault tolerance of state: since state is per-key-and-window, the
+more keys and windows you expect to process simultaneously, the more storage
+you will incur. Because state benefits from all the fault tolerance /
+consistency properties of your other data in Beam, it also adds to the cost of
+committing the results of processing.</li>
+  <li>Expiration of state: also since state is per-window, the runner can reclaim
+the resources when a window expires (when the watermark exceeds its allowed
+lateness) but this could mean that the runner is tracking an additional timer
+per key and window to cause reclamation code to execute.</li>
+</ul>
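+<p>The storage and expiration points can be sketched with a toy per-key-and-window store. <code class="highlighter-rouge">ExpiringStateStore</code> is hypothetical: windows are identified only by their end timestamp, times are plain milliseconds, and for brevity it scans all windows whenever the watermark advances instead of tracking a timer per key and window as described above.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy store illustrating per-key-and-window storage growth and expiry.
class ExpiringStateStore {
  private final long allowedLatenessMillis;
  // windowEnd -> (key -> value): one state cell per key and window.
  private final Map<Long, Map<String, Long>> cells = new HashMap<>();

  ExpiringStateStore(long allowedLatenessMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  void write(String key, long windowEnd, long value) {
    cells.computeIfAbsent(windowEnd, w -> new HashMap<>()).put(key, value);
  }

  Long read(String key, long windowEnd) {
    Map<String, Long> window = cells.get(windowEnd);
    return window == null ? null : window.get(key);
  }

  // Storage grows with the number of live (key, window) pairs.
  int liveCells() {
    int n = 0;
    for (Map<String, Long> window : cells.values()) {
      n += window.size();
    }
    return n;
  }

  // Once the watermark passes windowEnd + allowed lateness, that window's state is reclaimed.
  void advanceWatermark(long watermark) {
    cells.keySet().removeIf(windowEnd -> watermark > windowEnd + allowedLatenessMillis);
  }
}
```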
+
+<h2 id="go-use-it">Go use it!</h2>
+
+<p>If you are new to Beam, I hope you are now interested in seeing if Beam with
+stateful processing addresses your use case.  If you are already using Beam, I
+hope this new addition to the model unlocks new use cases for you.  Do check
+the <a href="/documentation/runners/capability-matrix/">capability
+matrix</a> to
+see the level of support for this new model feature on your favorite
+backend(s).</p>
+
+<p>And please do join the community at
+<a href="/get-started/support">user@beam.apache.org</a>. We’d love to
+hear from you.</p>
+
+  </div>
+
+</article>
+
+      </div>
+
+
+    <hr>
+  <div class="row">
+      <div class="col-xs-12">
+          <footer>
+              <p class="text-center">
+                &copy; Copyright
+                <a href="http://www.apache.org">The Apache Software Foundation</a>,
+                2017. All Rights Reserved.
+              </p>
+              <p class="text-center">
+                <a href="/privacy_policy">Privacy Policy</a> |
+                <a href="/feed.xml">RSS Feed</a>
+              </p>
+          </footer>
+      </div>
+  </div>
+  <!-- container div end -->
+</div>
+
+
+  </body>
+
+</html>

http://git-wip-us.apache.org/repos/asf/beam-site/blob/2dd05932/content/blog/index.html
----------------------------------------------------------------------
diff --git a/content/blog/index.html b/content/blog/index.html
index 7bced60..f737da2 100644
--- a/content/blog/index.html
+++ b/content/blog/index.html
@@ -151,6 +151,27 @@
 <p>This is the blog for the Apache Beam project. This blog contains news and updates
 for the project.</p>
 
+<h3 id="a-classpost-link-hrefblog20170213stateful-processinghtmlstateful-processing-with-apache-beama"><a class="post-link" href="/blog/2017/02/13/stateful-processing.html">Stateful processing with Apache Beam</a></h3>
+<p><i>Feb 13, 2017 •  Kenneth Knowles [<a href="https://twitter.com/KennKnowles">@KennKnowles</a>]
+</i></p>
+
+<p>Beam lets you process unbounded, out-of-order, global-scale data with portable
+high-level pipelines. Stateful processing is a new feature of the Beam model
+that expands the capabilities of Beam, unlocking new use cases and new
+efficiencies. In this post, I will guide you through stateful processing in
+Beam: how it works, how it fits in with the other features of the Beam model,
+what you might use it for, and what it looks like in code.</p>
+
+<!-- Render a "read more" button if the post is longer than the excerpt -->
+
+<p>
+<a class="btn btn-default btn-sm" href="/blog/2017/02/13/stateful-processing.html" role="button">
+Read more&nbsp;<span class="glyphicon glyphicon-menu-right" aria-hidden="true"></span>
+</a>
+</p>
+
+<hr />
+
 <h3 id="a-classpost-link-hrefblog20170201graduation-media-recaphtmlmedia-recap-of-the-apache-beam-graduationa"><a class="post-link" href="/blog/2017/02/01/graduation-media-recap.html">Media recap of the Apache Beam graduation</a></h3>
 <p><i>Feb 1, 2017 •  Davor Bonaci [<a href="https://twitter.com/BonaciDavor">@BonaciDavor</a>]
 </i></p>

