storm-commits mailing list archives

From nathanm...@apache.org
Subject svn commit: r1597454 [7/9] - in /incubator/storm/site: ./ publish/ publish/about/ publish/documentation/
Date Sun, 25 May 2014 17:47:13 GMT
Modified: incubator/storm/site/publish/documentation/Trident-API-Overview.html
URL: http://svn.apache.org/viewvc/incubator/storm/site/publish/documentation/Trident-API-Overview.html?rev=1597454&r1=1597453&r2=1597454&view=diff
==============================================================================
--- incubator/storm/site/publish/documentation/Trident-API-Overview.html (original)
+++ incubator/storm/site/publish/documentation/Trident-API-Overview.html Sun May 25 17:47:12 2014
@@ -65,267 +65,281 @@
   </ul>
 </div>
 <div id="aboutcontent">
-<h1>Trident API overview</h1>
+<h1 id="trident-api-overview">Trident API overview</h1>
 
-<p>The core data model in Trident is the "Stream", processed as a series of batches. A stream is partitioned among the nodes in the cluster, and operations applied to a stream are applied in parallel across each partition.</p>
+<p>The core data model in Trident is the &#8220;Stream&#8221;, processed as a series of batches. A stream is partitioned among the nodes in the cluster, and operations applied to a stream are applied in parallel across each partition.</p>
 
 <p>There are five kinds of operations in Trident:</p>
 
 <ol>
-<li>Operations that apply locally to each partition and cause no network transfer</li>
-<li>Repartitioning operations that repartition a stream but otherwise don't change the contents (involves network transfer)</li>
-<li>Aggregation operations that do network transfer as part of the operation</li>
-<li>Operations on grouped streams</li>
-<li>Merges and joins</li>
+  <li>Operations that apply locally to each partition and cause no network transfer</li>
+  <li>Repartitioning operations that repartition a stream but otherwise don&#8217;t change the contents (involves network transfer)</li>
+  <li>Aggregation operations that do network transfer as part of the operation</li>
+  <li>Operations on grouped streams</li>
+  <li>Merges and joins</li>
 </ol>
 
-
-<h2>Partition-local operations</h2>
+<h2 id="partition-local-operations">Partition-local operations</h2>
 
 <p>Partition-local operations involve no network transfer and are applied to each batch partition independently.</p>
 
-<h3>Functions</h3>
+<h3 id="functions">Functions</h3>
 
 <p>A function takes in a set of input fields and emits zero or more tuples as output. The fields of the output tuple are appended to the original input tuple in the stream. If a function emits no tuples, the original input tuple is filtered out. Otherwise, the input tuple is duplicated for each output tuple. Suppose you have this function:</p>
 
-<pre><code class="java">public class MyFunction extends BaseFunction {
+<p><code>java
+public class MyFunction extends BaseFunction {
     public void execute(TridentTuple tuple, TridentCollector collector) {
         for(int i=0; i &lt; tuple.getInteger(0); i++) {
             collector.emit(new Values(i));
         }
     }
 }
-</code></pre>
+</code></p>
 
-<p>Now suppose you have a stream in the variable "mystream" with the fields ["a", "b", "c"] with the following tuples:</p>
+<p>Now suppose you have a stream in the variable &#8220;mystream&#8221; with the fields [&#8220;a&#8221;, &#8220;b&#8221;, &#8220;c&#8221;] with the following tuples:</p>
 
-<pre><code>[1, 2, 3]
+<p><code>
+[1, 2, 3]
 [4, 1, 6]
 [3, 0, 8]
-</code></pre>
+</code></p>
 
 <p>If you run this code:</p>
 
-<pre><code class="java">mystream.each(new Fields("b"), new MyFunction(), new Fields("d"))
-</code></pre>
+<p><code>java
+mystream.each(new Fields("b"), new MyFunction(), new Fields("d"))
+</code></p>
 
-<p>The resulting tuples would have fields ["a", "b", "c", "d"] and look like this:</p>
+<p>The resulting tuples would have fields [&#8220;a&#8221;, &#8220;b&#8221;, &#8220;c&#8221;, &#8220;d&#8221;] and look like this:</p>
 
-<pre><code>[1, 2, 3, 0]
+<p><code>
+[1, 2, 3, 0]
 [1, 2, 3, 1]
 [4, 1, 6, 0]
-</code></pre>
+</code></p>
 
-<h3>Filters</h3>
+<h3 id="filters">Filters</h3>
 
 <p>Filters take in a tuple as input and decide whether to keep that tuple. Suppose you had this filter:</p>
 
-<pre><code class="java">public class MyFilter extends BaseFilter {
+<p><code>java
+public class MyFilter extends BaseFilter {
     public boolean isKeep(TridentTuple tuple) {
         return tuple.getInteger(0) == 1 &amp;&amp; tuple.getInteger(1) == 2;
     }
 }
-</code></pre>
+</code></p>
 
-<p>Now suppose you had these tuples with fields ["a", "b", "c"]:</p>
+<p>Now suppose you had these tuples with fields [&#8220;a&#8221;, &#8220;b&#8221;, &#8220;c&#8221;]:</p>
 
-<pre><code>[1, 2, 3]
+<p><code>
+[1, 2, 3]
 [2, 1, 1]
 [2, 3, 4]
-</code></pre>
+</code></p>
 
 <p>If you ran this code:</p>
 
-<pre><code class="java">mystream.each(new Fields("b", "a"), new MyFilter())
-</code></pre>
+<p><code>java
+mystream.each(new Fields("b", "a"), new MyFilter())
+</code></p>
 
 <p>The resulting tuples would be:</p>
 
-<pre><code>[2, 1, 1]
-</code></pre>
+<p><code>
+[2, 1, 1]
+</code></p>
 
-<h3>partitionAggregate</h3>
+<h3 id="partitionaggregate">partitionAggregate</h3>
 
 <p>partitionAggregate runs a function on each partition of a batch of tuples. Unlike functions, the tuples emitted by partitionAggregate replace the input tuples given to it. Consider this example:</p>
 
-<pre><code class="java">mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
-</code></pre>
+<p><code>java
+mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
+</code></p>
 
-<p>Suppose the input stream contained fields ["a", "b"] and the following partitions of tuples:</p>
+<p>Suppose the input stream contained fields [&#8220;a&#8221;, &#8220;b&#8221;] and the following partitions of tuples:</p>
 
-<pre><code>Partition 0:
-["a", 1]
-["b", 2]
-
-Partition 1:
-["a", 3]
-["c", 8]
-
-Partition 2:
-["e", 1]
-["d", 9]
-["d", 10]
-</code></pre>
+<p>```
+Partition 0:
+[&#8220;a&#8221;, 1]
+[&#8220;b&#8221;, 2]</p>
+
+<p>Partition 1:
+[&#8220;a&#8221;, 3]
+[&#8220;c&#8221;, 8]</p>
+
+<p>Partition 2:
+[&#8220;e&#8221;, 1]
+[&#8220;d&#8221;, 9]
+[&#8220;d&#8221;, 10]
+```</p>
 
-<p>Then the output stream of that code would contain these tuples with one field called "sum":</p>
+<p>Then the output stream of that code would contain these tuples with one field called &#8220;sum&#8221;:</p>
 
-<pre><code>Partition 0:
-[3]
+<p>```
+Partition 0:
+[3]</p>
 
-Partition 1:
-[11]
+<p>Partition 1:
+[11]</p>
 
-Partition 2:
+<p>Partition 2:
 [20]
-</code></pre>
+```</p>
 
 <p>There are three different interfaces for defining aggregators: CombinerAggregator, ReducerAggregator, and Aggregator.</p>
 
-<p>Here's the interface for CombinerAggregator:</p>
+<p>Here&#8217;s the interface for CombinerAggregator:</p>
 
-<pre><code class="java">public interface CombinerAggregator&lt;T&gt; extends Serializable {
+<p><code>java
+public interface CombinerAggregator&lt;T&gt; extends Serializable {
     T init(TridentTuple tuple);
     T combine(T val1, T val2);
     T zero();
 }
-</code></pre>
+</code></p>
 
-<p>A CombinerAggregator returns a single tuple with a single field as output. CombinerAggregators run the init function on each input tuple and use the combine function to combine values until there's only one value left. If there's no tuples in the partition, the CombinerAggregator emits the output of the zero function. For example, here's the implementation of Count:</p>
+<p>A CombinerAggregator returns a single tuple with a single field as output. CombinerAggregators run the init function on each input tuple and use the combine function to combine values until there&#8217;s only one value left. If there&#8217;s no tuples in the partition, the CombinerAggregator emits the output of the zero function. For example, here&#8217;s the implementation of Count:</p>
 
-<pre><code class="java">public class Count implements CombinerAggregator&lt;Long&gt; {
+<p>```java
+public class Count implements CombinerAggregator<long> {
     public Long init(TridentTuple tuple) {
         return 1L;
-    }
-
-    public Long combine(Long val1, Long val2) {
-        return val1 + val2;
-    }
+    }</long></p>
 
-    public Long zero() {
-        return 0L;
-    }
+<pre><code>public Long combine(Long val1, Long val2) {
+    return val1 + val2;
 }
+
+public Long zero() {
+    return 0L;
+} } ```
 </code></pre>
 
 <p>The benefits of CombinerAggregators are seen when you use them with the aggregate method instead of partitionAggregate. In that case, Trident automatically optimizes the computation by doing partial aggregations before transferring tuples over the network.</p>
 
 <p>A ReducerAggregator has the following interface:</p>
 
-<pre><code class="java">public interface ReducerAggregator&lt;T&gt; extends Serializable {
+<p><code>java
+public interface ReducerAggregator&lt;T&gt; extends Serializable {
     T init();
     T reduce(T curr, TridentTuple tuple);
 }
-</code></pre>
+</code></p>
 
-<p>A ReducerAggregator produces an initial value with init, and then it iterates on that value for each input tuple to produce a single tuple with a single value as output. For example, here's how you would define Count as a ReducerAggregator:</p>
+<p>A ReducerAggregator produces an initial value with init, and then it iterates on that value for each input tuple to produce a single tuple with a single value as output. For example, here&#8217;s how you would define Count as a ReducerAggregator:</p>
 
-<pre><code class="java">public class Count implements ReducerAggregator&lt;Long&gt; {
+<p>```java
+public class Count implements ReducerAggregator<long> {
     public Long init() {
         return 0L;
-    }
+    }</long></p>
 
-    public Long reduce(Long curr, TridentTuple tuple) {
-        return curr + 1;
-    }
-}
+<pre><code>public Long reduce(Long curr, TridentTuple tuple) {
+    return curr + 1;
+} } ```
 </code></pre>
 
-<p>ReducerAggregator can also be used with persistentAggregate, as you'll see later.</p>
+<p>ReducerAggregator can also be used with persistentAggregate, as you&#8217;ll see later.</p>
 
 <p>The most general interface for performing aggregations is Aggregator, which looks like this:</p>
 
-<pre><code class="java">public interface Aggregator&lt;T&gt; extends Operation {
+<p><code>java
+public interface Aggregator&lt;T&gt; extends Operation {
     T init(Object batchId, TridentCollector collector);
     void aggregate(T state, TridentTuple tuple, TridentCollector collector);
     void complete(T state, TridentCollector collector);
 }
-</code></pre>
+</code></p>
 
 <p>Aggregators can emit any number of tuples with any number of fields. They can emit tuples at any point during execution. Aggregators execute in the following way:</p>
 
 <ol>
-<li>The init method is called before processing the batch. The return value of init is an Object that will represent the state of the aggregation and will be passed into the aggregate and complete methods.</li>
-<li>The aggregate method is called for each input tuple in the batch partition. This method can update the state and optionally emit tuples.</li>
-<li>The complete method is called when all tuples for the batch partition have been processed by aggregate.</li>
+  <li>The init method is called before processing the batch. The return value of init is an Object that will represent the state of the aggregation and will be passed into the aggregate and complete methods.</li>
+  <li>The aggregate method is called for each input tuple in the batch partition. This method can update the state and optionally emit tuples.</li>
+  <li>The complete method is called when all tuples for the batch partition have been processed by aggregate. </li>
 </ol>
 
+<p>Here&#8217;s how you would implement Count as an Aggregator:</p>
 
-<p>Here's how you would implement Count as an Aggregator:</p>
-
-<pre><code class="java">public class CountAgg extends BaseAggregator&lt;CountState&gt; {
+<p>```java
+public class CountAgg extends BaseAggregator<countstate> {
     static class CountState {
         long count = 0;
-    }
+    }</countstate></p>
 
-    public CountState init(Object batchId, TridentCollector collector) {
-        return new CountState();
-    }
-
-    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
-        state.count+=1;
-    }
+<pre><code>public CountState init(Object batchId, TridentCollector collector) {
+    return new CountState();
+}
 
-    public void complete(CountState state, TridentCollector collector) {
-        collector.emit(new Values(state.count));
-    }
+public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
+    state.count+=1;
 }
+
+public void complete(CountState state, TridentCollector collector) {
+    collector.emit(new Values(state.count));
+} } ```
 </code></pre>
 
 <p>Sometimes you want to execute multiple aggregators at the same time. This is called chaining and can be accomplished like this:</p>
 
-<pre><code class="java">mystream.chainedAgg()
+<p><code>java
+mystream.chainedAgg()
         .partitionAggregate(new Count(), new Fields("count"))
         .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
         .chainEnd()
-</code></pre>
+</code></p>
 
-<p>This code will run the Count and Sum aggregators on each partition. The output will contain a single tuple with the fields ["count", "sum"].</p>
+<p>This code will run the Count and Sum aggregators on each partition. The output will contain a single tuple with the fields [&#8220;count&#8221;, &#8220;sum&#8221;].</p>
 
-<h3>stateQuery and partitionPersist</h3>
+<h3 id="statequery-and-partitionpersist">stateQuery and partitionPersist</h3>
 
 <p>stateQuery and partitionPersist query and update sources of state, respectively. You can read about how to use them on <a href="Trident-state.html">Trident state doc</a>.</p>
 
-<h3>projection</h3>
+<h3 id="projection">projection</h3>
 
-<p>The projection method on Stream keeps only the fields specified in the operation. If you had a Stream with fields ["a", "b", "c", "d"] and you ran this code:</p>
+<p>The projection method on Stream keeps only the fields specified in the operation. If you had a Stream with fields [&#8220;a&#8221;, &#8220;b&#8221;, &#8220;c&#8221;, &#8220;d&#8221;] and you ran this code:</p>
 
-<pre><code class="java">mystream.project(new Fields("b", "d"))
-</code></pre>
+<p><code>java
+mystream.project(new Fields("b", "d"))
+</code></p>
 
-<p>The output stream would contain only the fields ["b", "d"].</p>
+<p>The output stream would contain only the fields [&#8220;b&#8221;, &#8220;d&#8221;].</p>
 
-<h2>Repartitioning operations</h2>
+<h2 id="repartitioning-operations">Repartitioning operations</h2>
 
 <p>Repartitioning operations run a function to change how the tuples are partitioned across tasks. The number of partitions can also change as a result of repartitioning (for example, if the parallelism hint is greater after repartitioning). Repartitioning requires network transfer. Here are the repartitioning functions:</p>
 
 <ol>
-<li>shuffle: Use random round robin algorithm to evenly redistribute tuples across all target partitions</li>
-<li>broadcast: Every tuple is replicated to all target partitions. This can be useful during DRPC – for example, if you need to do a stateQuery on every partition of data.</li>
-<li>partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.</li>
-<li>global: All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.</li>
-<li>batchGlobal: All tuples in the batch are sent to the same partition. Different batches in the stream may go to different partitions.</li>
-<li>partition: This method takes in a custom partitioning function that implements backtype.storm.grouping.CustomStreamGrouping</li>
+  <li>shuffle: Use random round robin algorithm to evenly redistribute tuples across all target partitions</li>
+  <li>broadcast: Every tuple is replicated to all target partitions. This can be useful during DRPC – for example, if you need to do a stateQuery on every partition of data.</li>
+  <li>partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.</li>
+  <li>global: All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.</li>
+  <li>batchGlobal: All tuples in the batch are sent to the same partition. Different batches in the stream may go to different partitions. </li>
+  <li>partition: This method takes in a custom partitioning function that implements backtype.storm.grouping.CustomStreamGrouping</li>
 </ol>
 
-
-<h2>Aggregation operations</h2>
+<h2 id="aggregation-operations">Aggregation operations</h2>
 
 <p>Trident has aggregate and persistentAggregate methods for doing aggregations on Streams. aggregate is run on each batch of the stream in isolation, while persistentAggregate will aggregate all tuples across all batches in the stream and store the result in a source of state.</p>
 
-<p>Running aggregate on a Stream does a global aggregation. When you use a ReducerAggregator or an Aggregator, the stream is first repartitioned into a single partition, and then the aggregation function is run on that partition. When you use a CombinerAggregator, on the other hand, first Trident will compute partial aggregations of each partition, then repartition to a single partition, and then finish the aggregation after the network transfer. CombinerAggregators are far more efficient and should be used when possible.</p>
+<p>Running aggregate on a Stream does a global aggregation. When you use a ReducerAggregator or an Aggregator, the stream is first repartitioned into a single partition, and then the aggregation function is run on that partition. When you use a CombinerAggregator, on the other hand, first Trident will compute partial aggregations of each partition, then repartition to a single partition, and then finish the aggregation after the network transfer. CombinerAggregators are far more efficient and should be used when possible.</p>
 
-<p>Here's an example of using aggregate to get a global count for a batch:</p>
+<p>Here&#8217;s an example of using aggregate to get a global count for a batch:</p>
 
-<pre><code class="java">mystream.aggregate(new Count(), new Fields("count"))
-</code></pre>
+<p><code>java
+mystream.aggregate(new Count(), new Fields("count"))
+</code></p>
 
 <p>Like partitionAggregate, aggregators for aggregate can be chained. However, if you chain a CombinerAggregator with a non-CombinerAggregator, Trident is unable to do the partial aggregation optimization.</p>
 
 <p>You can read more about how to use persistentAggregate in the <a href="https://github.com/apache/incubator-storm/wiki/Trident-state">Trident state doc</a>.</p>
 
-<h2>Operations on grouped streams</h2>
+<h2 id="operations-on-grouped-streams">Operations on grouped streams</h2>
 
-<p>The groupBy operation repartitions the stream by doing a partitionBy on the specified fields, and then within each partition groups tuples together whose group fields are equal. For example, here's an illustration of a groupBy operation:</p>
+<p>The groupBy operation repartitions the stream by doing a partitionBy on the specified fields, and then within each partition groups tuples together whose group fields are equal. For example, here&#8217;s an illustration of a groupBy operation:</p>
 
 <p><img src="images/grouping.png" alt="Grouping" /></p>
 
@@ -333,35 +347,36 @@ Partition 2:
 
 <p>Like regular streams, aggregators on grouped streams can be chained.</p>
 
-<h2>Merges and joins</h2>
+<h2 id="merges-and-joins">Merges and joins</h2>
 
 <p>The last part of the API is combining different streams together. The simplest way to combine streams is to merge them into one stream. You can do that with the TridentTopology#merge method, like so:</p>
 
-<pre><code class="java">topology.merge(stream1, stream2, stream3);
-</code></pre>
+<p><code>java
+topology.merge(stream1, stream2, stream3);
+</code></p>
 
 <p>Trident will name the output fields of the new, merged stream as the output fields of the first stream.</p>
 
-<p>Another way to combine streams is with a join. Now, a standard join, like the kind from SQL, requires finite input, so it doesn't make sense with infinite streams. Joins in Trident only apply within each small batch that comes off of the spout.</p>
+<p>Another way to combine streams is with a join. Now, a standard join, like the kind from SQL, requires finite input, so it doesn&#8217;t make sense with infinite streams. Joins in Trident only apply within each small batch that comes off of the spout.</p>
 
-<p>Here's an example join between a stream containing fields ["key", "val1", "val2"] and another stream containing ["x", "val1"]:</p>
+<p>Here&#8217;s an example join between a stream containing fields [&#8220;key&#8221;, &#8220;val1&#8221;, &#8220;val2&#8221;] and another stream containing [&#8220;x&#8221;, &#8220;val1&#8221;]:</p>
 
-<pre><code class="java">topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
-</code></pre>
+<p><code>java
+topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
+</code></p>
 
-<p>This joins stream1 and stream2 together using "key" and "x" as the join fields for each respective stream. Then, Trident requires that all the output fields of the new stream be named, since the input streams could have overlapping field names. The tuples emitted from the join will contain:</p>
+<p>This joins stream1 and stream2 together using &#8220;key&#8221; and &#8220;x&#8221; as the join fields for each respective stream. Then, Trident requires that all the output fields of the new stream be named, since the input streams could have overlapping field names. The tuples emitted from the join will contain:</p>
 
 <ol>
-<li>First, the list of join fields. In this case, "key" corresponds to "key" from stream1 and "x" from stream2.</li>
-<li>Next, a list of all non-join fields from all streams, in order of how the streams were passed to the join method. In this case, "a" and "b" correspond to "val1" and "val2" from stream1, and "c" corresponds to "val1" from stream2.</li>
+  <li>First, the list of join fields. In this case, &#8220;key&#8221; corresponds to &#8220;key&#8221; from stream1 and &#8220;x&#8221; from stream2.</li>
+  <li>Next, a list of all non-join fields from all streams, in order of how the streams were passed to the join method. In this case, &#8220;a&#8221; and &#8220;b&#8221; correspond to &#8220;val1&#8221; and &#8220;val2&#8221; from stream1, and &#8220;c&#8221; corresponds to &#8220;val1&#8221; from stream2.</li>
 </ol>
 
-
 <p>When a join happens between streams originating from different spouts, those spouts will be synchronized with how they emit batches. That is, a batch of processing will include tuples from each spout.</p>
 
-<p>You might be wondering – how do you do something like a "windowed join", where tuples from one side of the join are joined against the last hour of tuples from the other side of the join?</p>
+<p>You might be wondering – how do you do something like a &#8220;windowed join&#8221;, where tuples from one side of the join are joined against the last hour of tuples from the other side of the join?</p>
 
-<p>To do this, you would make use of partitionPersist and stateQuery. The last hour of tuples from one side of the join would be stored and rotated in a source of state, keyed by the join field. Then the stateQuery would do lookups by the join field to perform the "join".</p>
+<p>To do this, you would make use of partitionPersist and stateQuery. The last hour of tuples from one side of the join would be stored and rotated in a source of state, keyed by the join field. Then the stateQuery would do lookups by the join field to perform the &#8220;join&#8221;.</p>
 
 </div>
 </div>

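Note on the CombinerAggregator text in the Trident-API-Overview.html diff above: the partial-aggregation optimization it describes can be sketched in plain Java. This is only an illustration under stated assumptions, not Trident's implementation. The `Combiner` interface is a hypothetical stand-in for CombinerAggregator that takes a plain long instead of a TridentTuple (so it runs without Storm on the classpath), and folding from `zero()` is a simplification chosen for the sketch. The partitions are the "b" values from the partitionAggregate example.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Trident's CombinerAggregator. It takes a plain
// long rather than a TridentTuple so the sketch has no Storm dependency.
interface Combiner<T> {
    T init(long value);
    T combine(T val1, T val2);
    T zero();
}

// Sum expressed with the three combiner methods.
class SumCombiner implements Combiner<Long> {
    public Long init(long value) { return value; }
    public Long combine(Long val1, Long val2) { return val1 + val2; }
    public Long zero() { return 0L; }
}

class CombinerSketch {
    // Partial aggregation of one partition. An empty partition yields zero().
    static <T> T aggregatePartition(Combiner<T> agg, List<Long> partition) {
        T result = agg.zero();
        for (long value : partition) {
            result = agg.combine(result, agg.init(value));
        }
        return result;
    }

    // Global aggregation: only one partial value per partition would need to
    // cross the network before the final combine.
    static <T> T aggregateGlobal(Combiner<T> agg, List<List<Long>> partitions) {
        T result = agg.zero();
        for (List<Long> partition : partitions) {
            result = agg.combine(result, aggregatePartition(agg, partition));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Long>> partitions = Arrays.asList(
            Arrays.asList(1L, 2L),
            Arrays.asList(3L, 8L),
            Arrays.asList(1L, 9L, 10L));
        Combiner<Long> sum = new SumCombiner();
        for (List<Long> partition : partitions) {
            System.out.println(aggregatePartition(sum, partition)); // 3, 11, 20
        }
        System.out.println(aggregateGlobal(sum, partitions)); // 34
    }
}
```

The per-partition results match the partitionAggregate output in the documentation ([3], [11], [20]). The global step combines three values instead of seven tuples, which is the saving the text attributes to CombinerAggregators.
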
Modified: incubator/storm/site/publish/documentation/Trident-spouts.html
URL: http://svn.apache.org/viewvc/incubator/storm/site/publish/documentation/Trident-spouts.html?rev=1597454&r1=1597453&r2=1597454&view=diff
==============================================================================
--- incubator/storm/site/publish/documentation/Trident-spouts.html (original)
+++ incubator/storm/site/publish/documentation/Trident-spouts.html Sun May 25 17:47:12 2014
@@ -65,7 +65,7 @@
   </ul>
 </div>
 <div id="aboutcontent">
-<h1>Trident spouts</h1>
+<h1 id="trident-spouts">Trident spouts</h1>
 
 <p>Like in the vanilla Storm API, spouts are the source of streams in a Trident topology. On top of the vanilla Storm spouts, Trident exposes additional APIs for more sophisticated spouts.</p>
 
@@ -73,40 +73,40 @@
 
 <p>Regular Storm spouts will be non-transactional spouts in a Trident topology. To use a regular Storm IRichSpout, create the stream like this in a TridentTopology:</p>
 
-<pre><code class="java">TridentTopology topology = new TridentTopology();
+<p><code>java
+TridentTopology topology = new TridentTopology();
 topology.newStream("myspoutid", new MyRichSpout());
-</code></pre>
+</code></p>
 
 <p>All spouts in a Trident topology are required to be given a unique identifier for the stream – this identifier must be unique across all topologies run on the cluster. Trident will use this identifier to store metadata about what the spout has consumed in Zookeeper, including the txid and any metadata associated with the spout.</p>
 
 <p>You can configure the Zookeeper storage of spout metadata via the following configuration options:</p>
 
 <ol>
-<li><code>transactional.zookeeper.servers</code>: A list of Zookeeper hostnames</li>
-<li><code>transactional.zookeeper.port</code>: The port of the Zookeeper cluster</li>
-<li><code>transactional.zookeeper.root</code>: The root dir in Zookeeper where metadata is stored. Metadata will be stored at the path <root path>/<spout id></li>
+  <li><code>transactional.zookeeper.servers</code>: A list of Zookeeper hostnames </li>
+  <li><code>transactional.zookeeper.port</code>: The port of the Zookeeper cluster</li>
+  <li><code>transactional.zookeeper.root</code>: The root dir in Zookeeper where metadata is stored. Metadata will be stored at the path &lt;root path&gt;/&lt;spout id&gt;</li>
 </ol>
 
+<h2 id="pipelining">Pipelining</h2>
 
-<h2>Pipelining</h2>
+<p>By default, Trident processes a single batch at a time, waiting for the batch to succeed or fail before trying another batch. You can get significantly higher throughput – and lower latency of processing of each batch – by pipelining the batches. You configure the maximum amount of batches to be processed simultaneously with the &#8220;topology.max.spout.pending&#8221; property. </p>
 
-<p>By default, Trident processes a single batch at a time, waiting for the batch to succeed or fail before trying another batch. You can get significantly higher throughput – and lower latency of processing of each batch – by pipelining the batches. You configure the maximum amount of batches to be processed simultaneously with the "topology.max.spout.pending" property.</p>
+<p>Even while processing multiple batches simultaneously, Trident will order any state updates taking place in the topology among batches. For example, suppose you&#8217;re doing a global count aggregation into a database. The idea is that while you&#8217;re updating the count in the database for batch 1, you can still be computing the partial counts for batches 2 through 10. Trident won&#8217;t move on to the state updates for batch 2 until the state updates for batch 1 have succeeded. This is essential for achieving exactly-once processing semantics, as outlined in the <a href="Trident-state.html">Trident state doc</a>.</p>
 
-<p>Even while processing multiple batches simultaneously, Trident will order any state updates taking place in the topology among batches. For example, suppose you're doing a global count aggregation into a database. The idea is that while you're updating the count in the database for batch 1, you can still be computing the partial counts for batches 2 through 10. Trident won't move on to the state updates for batch 2 until the state updates for batch 1 have succeeded. This is essential for achieving exactly-once processing semantics, as outlined in the <a href="Trident-state.html">Trident state doc</a>.</p>
-
-<h2>Trident spout types</h2>
+<h2 id="trident-spout-types">Trident spout types</h2>
 
 <p>The following spout APIs are available:</p>
 
 <ol>
-<li><a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java">ITridentSpout</a>: The most general API that can support transactional or opaque transactional semantics. Generally you'll use one of the partitioned flavors of this API rather than this one directly.</li>
-<li><a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java">IBatchSpout</a>: A non-transactional spout that emits batches of tuples at a time</li>
-<li><a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java">IPartitionedTridentSpout</a>: A transactional spout that reads from a partitioned data source (like a cluster of Kafka servers)</li>
-<li><a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java">IOpaquePartitionedTridentSpout</a>: An opaque transactional spout that reads from a partitioned data source</li>
+  <li><a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/ITridentSpout.java">ITridentSpout</a>: The most general API that can support transactional or opaque transactional semantics. Generally you&#8217;ll use one of the partitioned flavors of this API rather than this one directly.</li>
+  <li><a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/IBatchSpout.java">IBatchSpout</a>: A non-transactional spout that emits batches of tuples at a time</li>
+  <li><a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/IPartitionedTridentSpout.java">IPartitionedTridentSpout</a>: A transactional spout that reads from a partitioned data source (like a cluster of Kafka servers)</li>
+  <li><a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java">IOpaquePartitionedTridentSpout</a>: An opaque transactional spout that reads from a partitioned data source</li>
 </ol>
 
+<p>And, as mentioned at the beginning of this tutorial, you can use regular IRichSpouts as well.</p>
 
-<p>And, as mentioned at the beginning of this tutorial, you can use regular IRichSpouts as well.</p>
 
 </div>
 </div>

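Note on the Pipelining text in the Trident-spouts.html diff above: the ordering guarantee it describes (partial results for several batches computed concurrently, state updates applied strictly in batch order) can be sketched in plain Java. This is a rough illustration under stated assumptions, not how Trident schedules batches. The class name `PipeliningSketch` and the `maxPending` parameter are hypothetical, and a fixed-size thread pool only loosely mimics the role of "topology.max.spout.pending".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PipeliningSketch {
    // Returns the stored count after each batch's state update, in commit order.
    static List<Long> run(List<List<String>> batches, int maxPending) {
        // At most maxPending partial counts are computed at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(maxPending);
        try {
            // Start computing the partial count of every batch concurrently.
            List<CompletableFuture<Long>> partials = new ArrayList<>();
            for (List<String> batch : batches) {
                CompletableFuture<Long> partial =
                    CompletableFuture.supplyAsync(() -> (long) batch.size(), pool);
                partials.add(partial);
            }

            // Apply state updates strictly in batch order: the update for
            // batch n+1 waits until the update for batch n has been applied.
            long stored = 0;
            List<Long> history = new ArrayList<>();
            for (CompletableFuture<Long> partial : partials) {
                stored += partial.join();
                history.add(stored);
            }
            return history;
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the commit loop walks the futures in submission order, the stored count evolves the same way no matter which partial count finishes first.
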
Modified: incubator/storm/site/publish/documentation/Trident-state.html
URL: http://svn.apache.org/viewvc/incubator/storm/site/publish/documentation/Trident-state.html?rev=1597454&r1=1597453&r2=1597454&view=diff
==============================================================================
--- incubator/storm/site/publish/documentation/Trident-state.html (original)
+++ incubator/storm/site/publish/documentation/Trident-state.html Sun May 25 17:47:12 2014
@@ -65,117 +65,120 @@
   </ul>
 </div>
 <div id="aboutcontent">
-<h1>State in Trident</h1>
+<h1 id="state-in-trident">State in Trident</h1>
 
-<p>Trident has first-class abstractions for reading from and writing to stateful sources. The state can either be internal to the topology – e.g., kept in-memory and backed by HDFS – or externally stored in a database like Memcached or Cassandra. There's no difference in the Trident API for either case.</p>
+<p>Trident has first-class abstractions for reading from and writing to stateful sources. The state can either be internal to the topology – e.g., kept in-memory and backed by HDFS – or externally stored in a database like Memcached or Cassandra. There&#8217;s no difference in the Trident API for either case.</p>
 
 <p>Trident manages state in a fault-tolerant way so that state updates are idempotent in the face of retries and failures. This lets you reason about Trident topologies as if each message were processed exactly-once.</p>
 
-<p>There's various levels of fault-tolerance possible when doing state updates. Before getting to those, let's look at an example that illustrates the tricks necessary to achieve exactly-once semantics. Suppose that you're doing a count aggregation of your stream and want to store the running count in a database. Now suppose you store in the database a single value representing the count, and every time you process a new tuple you increment the count.</p>
+<p>There are various levels of fault-tolerance possible when doing state updates. Before getting to those, let&#8217;s look at an example that illustrates the tricks necessary to achieve exactly-once semantics. Suppose that you&#8217;re doing a count aggregation of your stream and want to store the running count in a database. Now suppose you store in the database a single value representing the count, and every time you process a new tuple you increment the count.</p>
 
-<p>When failures occur, tuples will be replayed. This brings up a problem when doing state updates (or anything with side effects) – you have no idea if you've ever successfully updated the state based on this tuple before. Perhaps you never processed the tuple before, in which case you should increment the count. Perhaps you've processed the tuple and successfully incremented the count, but the tuple failed processing in another step. In this case, you should not increment the count. Or perhaps you saw the tuple before but got an error when updating the database. In this case, you <em>should</em> update the database.</p>
+<p>When failures occur, tuples will be replayed. This brings up a problem when doing state updates (or anything with side effects) – you have no idea if you&#8217;ve ever successfully updated the state based on this tuple before. Perhaps you never processed the tuple before, in which case you should increment the count. Perhaps you&#8217;ve processed the tuple and successfully incremented the count, but the tuple failed processing in another step. In this case, you should not increment the count. Or perhaps you saw the tuple before but got an error when updating the database. In this case, you <em>should</em> update the database.</p>
 
 <p>By just storing the count in the database, you have no idea whether or not this tuple has been processed before. So you need more information in order to make the right decision. Trident provides the following semantics which are sufficient for achieving exactly-once processing semantics:</p>
 
 <ol>
-<li>Tuples are processed as small batches (see <a href="Trident-tutorial.html">the tutorial</a>)</li>
-<li>Each batch of tuples is given a unique id called the "transaction id" (txid). If the batch is replayed, it is given the exact same txid.</li>
-<li>State updates are ordered among batches. That is, the state updates for batch 3 won't be applied until the state updates for batch 2 have succeeded.</li>
+  <li>Tuples are processed as small batches (see <a href="Trident-tutorial.html">the tutorial</a>)</li>
+  <li>Each batch of tuples is given a unique id called the &#8220;transaction id&#8221; (txid). If the batch is replayed, it is given the exact same txid.</li>
+  <li>State updates are ordered among batches. That is, the state updates for batch 3 won&#8217;t be applied until the state updates for batch 2 have succeeded.</li>
 </ol>
 
+<p>With these primitives, your State implementation can detect whether or not the batch of tuples has been processed before and take the appropriate action to update the state in a consistent way. The action you take depends on the exact semantics provided by your input spouts as to what&#8217;s in each batch. There are three kinds of spouts possible with respect to fault-tolerance: &#8220;non-transactional&#8221;, &#8220;transactional&#8221;, and &#8220;opaque transactional&#8221;. Likewise, there are three kinds of state possible with respect to fault-tolerance: &#8220;non-transactional&#8221;, &#8220;transactional&#8221;, and &#8220;opaque transactional&#8221;. Let&#8217;s take a look at each spout type and see what kind of fault-tolerance you can achieve with each.</p>
 
-<p>With these primitives, your State implementation can detect whether or not the batch of tuples has been processed before and take the appropriate action to update the state in a consistent way. The action you take depends on the exact semantics provided by your input spouts as to what's in each batch. There's three kinds of spouts possible with respect to fault-tolerance: "non-transactional", "transactional", and "opaque transactional". Likewise, there's three kinds of state possible with respect to fault-tolerance: "non-transactional", "transactional", and "opaque transactional". Let's take a look at each spout type and see what kind of fault-tolerance you can achieve with each.</p>
+<h2 id="transactional-spouts">Transactional spouts</h2>
 
-<h2>Transactional spouts</h2>
-
-<p>Remember, Trident processes tuples as small batches with each batch being given a unique transaction id. The properties of spouts vary according to the guarantees they can provide as to what's in each batch. A transactional spout has the following properties:</p>
+<p>Remember, Trident processes tuples as small batches with each batch being given a unique transaction id. The properties of spouts vary according to the guarantees they can provide as to what&#8217;s in each batch. A transactional spout has the following properties:</p>
 
 <ol>
-<li>Batches for a given txid are always the same. Replays of batches for a txid will exact same set of tuples as the first time that batch was emitted for that txid.</li>
-<li>There's no overlap between batches of tuples (tuples are in one batch or another, never multiple).</li>
-<li>Every tuple is in a batch (no tuples are skipped)</li>
+  <li>Batches for a given txid are always the same. Replays of batches for a txid will emit the exact same set of tuples as the first time that batch was emitted for that txid.</li>
+  <li>There&#8217;s no overlap between batches of tuples (tuples are in one batch or another, never multiple).</li>
+  <li>Every tuple is in a batch (no tuples are skipped)</li>
 </ol>
 
-
 <p>This is a pretty easy type of spout to understand: the stream is divided into fixed batches that never change. storm-contrib has <a href="https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java">an implementation of a transactional spout</a> for Kafka.</p>
 
-<p>You might be wondering – why wouldn't you just always use a transactional spout? They're simple and easy to understand. One reason you might not use one is because they're not necessarily very fault-tolerant. For example, the way TransactionalTridentKafkaSpout works is the batch for a txid will contain tuples from all the Kafka partitions for a topic. Once a batch has been emitted, any time that batch is re-emitted in the future the exact same set of tuples must be emitted to meet the semantics of transactional spouts. Now suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch fails to process, and at the same time one of the Kafka nodes goes down. You're now incapable of replaying the same batch as you did before (since the node is down and some partitions for the topic are not unavailable), and processing will halt.</p>
+<p>You might be wondering – why wouldn&#8217;t you just always use a transactional spout? They&#8217;re simple and easy to understand. One reason you might not use one is because they&#8217;re not necessarily very fault-tolerant. For example, the way TransactionalTridentKafkaSpout works is that the batch for a txid will contain tuples from all the Kafka partitions for a topic. Once a batch has been emitted, any time that batch is re-emitted in the future the exact same set of tuples must be emitted to meet the semantics of transactional spouts. Now suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch fails to process, and at the same time one of the Kafka nodes goes down. You&#8217;re now incapable of replaying the same batch as you did before (since the node is down and some partitions for the topic are unavailable), and processing will halt. </p>
 
-<p>This is why "opaque transactional" spouts exist – they are fault-tolerant to losing source nodes while still allowing you to achieve exactly-once processing semantics. We'll cover those spouts in the next section though.</p>
+<p>This is why &#8220;opaque transactional&#8221; spouts exist – they are fault-tolerant to losing source nodes while still allowing you to achieve exactly-once processing semantics. We&#8217;ll cover those spouts in the next section though.</p>
 
 <p>(One side note – once Kafka supports replication, it will be possible to have transactional spouts that are fault-tolerant to node failure, but that feature does not exist yet.)</p>
 
-<p>Before we get to "opaque transactional" spouts, let's look at how you would design a State implementation that has exactly-once semantics for transactional spouts. This State type is called a "transactional state" and takes advantage of the fact that any given txid is always associated with the exact same set of tuples.</p>
+<p>Before we get to &#8220;opaque transactional&#8221; spouts, let&#8217;s look at how you would design a State implementation that has exactly-once semantics for transactional spouts. This State type is called a &#8220;transactional state&#8221; and takes advantage of the fact that any given txid is always associated with the exact same set of tuples.</p>
 
-<p>Suppose your topology computes word count and you want to store the word counts in a key/value database. The key will be the word, and the value will contain the count. You've already seen that storing just the count as the value isn't sufficient to know whether you've processed a batch of tuples before. Instead, what you can do is store the transaction id with the count in the database as an atomic value. Then, when updating the count, you can just compare the transaction id in the database with the transaction id for the current batch. If they're the same, you skip the update – because of the strong ordering, you know for sure that the value in the database incorporates the current batch. If they're different, you increment the count. This logic works because the batch for a txid never changes, and Trident ensures that state updates are ordered among batches.</p>
+<p>Suppose your topology computes word count and you want to store the word counts in a key/value database. The key will be the word, and the value will contain the count. You&#8217;ve already seen that storing just the count as the value isn&#8217;t sufficient to know whether you&#8217;ve processed a batch of tuples before. Instead, what you can do is store the transaction id with the count in the database as an atomic value. Then, when updating the count, you can just compare the transaction id in the database with the transaction id for the current batch. If they&#8217;re the same, you skip the update – because of the strong ordering, you know for sure that the value in the database incorporates the current batch. If they&#8217;re different, you increment the count. This logic works because the batch for a txid never changes, and Trident ensures that state updates are ordered among batches.</p>
 
 <p>Consider this example of why it works. Suppose you are processing txid 3 which consists of the following batch of tuples:</p>
 
-<pre><code>["man"]
+<p><code>
+["man"]
 ["man"]
 ["dog"]
-</code></pre>
+</code></p>
 
 <p>Suppose the database currently holds the following key/value pairs:</p>
 
-<pre><code>man =&gt; [count=3, txid=1]
+<p><code>
+man =&gt; [count=3, txid=1]
 dog =&gt; [count=4, txid=3]
 apple =&gt; [count=10, txid=2]
-</code></pre>
+</code></p>
 
-<p>The txid associated with "man" is txid 1. Since the current txid is 3, you know for sure that this batch of tuples is not represented in that count. So you can go ahead and increment the count by 2 and update the txid. On the other hand, the txid for "dog" is the same as the current txid. So you know for sure that the increment from the current batch is already represented in the database for the "dog" key. So you can skip the update. After completing updates, the database looks like this:</p>
+<p>The txid associated with &#8220;man&#8221; is txid 1. Since the current txid is 3, you know for sure that this batch of tuples is not represented in that count. So you can go ahead and increment the count by 2 and update the txid. On the other hand, the txid for &#8220;dog&#8221; is the same as the current txid. So you know for sure that the increment from the current batch is already represented in the database for the &#8220;dog&#8221; key. So you can skip the update. After completing updates, the database looks like this:</p>
 
-<pre><code>man =&gt; [count=5, txid=3]
+<p><code>
+man =&gt; [count=5, txid=3]
 dog =&gt; [count=4, txid=3]
 apple =&gt; [count=10, txid=2]
-</code></pre>
+</code></p>
 
-<p>Let's now look at opaque transactional spouts and how to design states for that type of spout.</p>
+<p>Let&#8217;s now look at opaque transactional spouts and how to design states for that type of spout.</p>
 
-<h2>Opaque transactional spouts</h2>
+<h2 id="opaque-transactional-spouts">Opaque transactional spouts</h2>
 
 <p>As described before, an opaque transactional spout cannot guarantee that the batch of tuples for a txid remains constant. An opaque transactional spout has the following property:</p>
 
 <ol>
-<li>Every tuple is <em>successfully</em> processed in exactly one batch. However, it's possible for a tuple to fail to process in one batch and then succeed to process in a later batch.</li>
+  <li>Every tuple is <em>successfully</em> processed in exactly one batch. However, it&#8217;s possible for a tuple to fail to process in one batch and then be processed successfully in a later batch.</li>
 </ol>
 
+<p><a href="https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java">OpaqueTridentKafkaSpout</a> is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it&#8217;s time for OpaqueTridentKafkaSpout to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches.</p>
 
-<p><a href="https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java">OpaqueTridentKafkaSpout</a> is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it's time for OpaqueTridentKafkaSpout to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches.</p>
-
-<p>With opaque transactional spouts, it's no longer possible to use the trick of skipping state updates if the transaction id in the database is the same as the transaction id for the current batch. This is because the batch may have changed between state updates.</p>
+<p>With opaque transactional spouts, it&#8217;s no longer possible to use the trick of skipping state updates if the transaction id in the database is the same as the transaction id for the current batch. This is because the batch may have changed between state updates.</p>
 
-<p>What you can do is store more state in the database. Rather than store a value and transaction id in the database, you instead store a value, transaction id, and the previous value in the database. Let's again use the example of storing a count in the database. Suppose the partial count for your batch is "2" and it's time to apply a state update. Suppose the value in the database looks like this:</p>
+<p>What you can do is store more state in the database. Rather than store a value and transaction id in the database, you instead store a value, transaction id, and the previous value in the database. Let&#8217;s again use the example of storing a count in the database. Suppose the partial count for your batch is &#8220;2&#8221; and it&#8217;s time to apply a state update. Suppose the value in the database looks like this:</p>
 
-<pre><code>{ value = 4,
+<p><code>
+{ value = 4,
   prevValue = 1,
   txid = 2
 }
-</code></pre>
+</code></p>
 
-<p>Suppose your current txid is 3, different than what's in the database. In this case, you set "prevValue" equal to "value", increment "value" by your partial count, and update the txid. The new database value will look like this:</p>
+<p>Suppose your current txid is 3, different than what&#8217;s in the database. In this case, you set &#8220;prevValue&#8221; equal to &#8220;value&#8221;, increment &#8220;value&#8221; by your partial count, and update the txid. The new database value will look like this:</p>
 
-<pre><code>{ value = 6,
+<p><code>
+{ value = 6,
   prevValue = 4,
   txid = 3
 }
-</code></pre>
+</code></p>
 
-<p>Now suppose your current txid is 2, equal to what's in the database. Now you know that the "value" in the database contains an update from a previous batch for your current txid, but that batch may have been different so you have to ignore it. What you do in this case is increment "prevValue" by your partial count to compute the new "value". You then set the value in the database to this:</p>
+<p>Now suppose your current txid is 2, equal to what&#8217;s in the database. Now you know that the &#8220;value&#8221; in the database contains an update from a previous batch for your current txid, but that batch may have been different so you have to ignore it. What you do in this case is increment &#8220;prevValue&#8221; by your partial count to compute the new &#8220;value&#8221;. You then set the value in the database to this:</p>
 
-<pre><code>{ value = 3,
+<p><code>
+{ value = 3,
   prevValue = 1,
   txid = 2
 }
-</code></pre>
+</code></p>
 
 <p>This works because of the strong ordering of batches provided by Trident. Once Trident moves onto a new batch for state updates, it will never go back to a previous batch. And since opaque transactional spouts guarantee no overlap between batches – that each tuple is successfully processed by one batch – you can safely update based on the previous value.</p>
 
-<h2>Non-transactional spouts</h2>
+<h2 id="non-transactional-spouts">Non-transactional spouts</h2>
 
-<p>Non-transactional spouts don't provide any guarantees about what's in each batch. So it might have at-most-once processing, in which case tuples are not retried after failed batches. Or it might have at-least-once processing, where tuples can be processed successfully by multiple batches. There's no way to achieve exactly-once semantics for this kind of spout.</p>
+<p>Non-transactional spouts don&#8217;t provide any guarantees about what&#8217;s in each batch. Such a spout might have at-most-once processing, in which case tuples are not retried after failed batches. Or it might have at-least-once processing, where tuples can be processed successfully by multiple batches. There&#8217;s no way to achieve exactly-once semantics for this kind of spout.</p>
 
-<h2>Summary of spout and state types</h2>
+<h2 id="summary-of-spout-and-state-types">Summary of spout and state types</h2>
 
 <p>This diagram shows which combinations of spouts / states enable exactly-once messaging semantics:</p>
 
@@ -185,127 +188,132 @@ apple =&gt; [count=10, txid=2]
 
 <p>The state and spout types you choose are a tradeoff between fault-tolerance and storage costs, and ultimately your application requirements will determine which combination is right for you.</p>
 
-<h2>State APIs</h2>
+<h2 id="state-apis">State APIs</h2>
 
-<p>You've seen the intricacies of what it takes to achieve exactly-once semantics. The nice thing about Trident is that it internalizes all the fault-tolerance logic within the State – as a user you don't have to deal with comparing txids, storing multiple values in the database, or anything like that. You can write code like this:</p>
+<p>You&#8217;ve seen the intricacies of what it takes to achieve exactly-once semantics. The nice thing about Trident is that it internalizes all the fault-tolerance logic within the State – as a user you don&#8217;t have to deal with comparing txids, storing multiple values in the database, or anything like that. You can write code like this:</p>
 
-<pre><code class="java">TridentTopology topology = new TridentTopology();        
+<p><code>java
+TridentTopology topology = new TridentTopology();        
 TridentState wordCounts =
       topology.newStream("spout1", spout)
         .each(new Fields("sentence"), new Split(), new Fields("word"))
         .groupBy(new Fields("word"))
         .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))                
         .parallelismHint(6);
-</code></pre>
+</code></p>
 
 <p>All the logic necessary to manage opaque transactional state is internalized in the MemcachedState.opaque call. Additionally, updates are automatically batched to minimize roundtrips to the database.</p>
 
 <p>The base State interface just has two methods:</p>
 
-<pre><code class="java">public interface State {
+<p><code>java
+public interface State {
     void beginCommit(Long txid); // can be null for things like partitionPersist occurring off a DRPC stream
     void commit(Long txid);
 }
-</code></pre>
+</code></p>
 
-<p>You're told when a state update is beginning, when a state update is ending, and you're given the txid in each case. Trident assumes nothing about how your state works, what kind of methods there are to update it, and what kind of methods there are to read from it.</p>
+<p>You&#8217;re told when a state update is beginning, when a state update is ending, and you&#8217;re given the txid in each case. Trident assumes nothing about how your state works, what kind of methods there are to update it, and what kind of methods there are to read from it.</p>
 
 <p>Suppose you have a home-grown database that contains user location information and you want to be able to access it from Trident. Your State implementation would have methods for getting and setting user information:</p>
 
-<pre><code class="java">public class LocationDB implements State {
-    public void beginCommit(Long txid) {    
-    }
-
-    public void commit(Long txid) {    
-    }
+<p>```java
+public class LocationDB implements State {
+    public void beginCommit(Long txid) {
+    }</p>
 
-    public void setLocation(long userId, String location) {
-      // code to access database and set location
-    }
+<pre><code>public void commit(Long txid) {    
+}
 
-    public String getLocation(long userId) {
-      // code to get location from database
-    }
+public void setLocation(long userId, String location) {
+  // code to access database and set location
 }
+
+public String getLocation(long userId) {
+  // code to get location from database
+} } ```
 </code></pre>
 
 <p>You then provide Trident a StateFactory that can create instances of your State object within Trident tasks. The StateFactory for your LocationDB might look something like this:</p>
 
-<pre><code class="java">public class LocationDBFactory implements StateFactory {
+<p><code>java
+public class LocationDBFactory implements StateFactory {
    public State makeState(Map conf, int partitionIndex, int numPartitions) {
       return new LocationDB();
    } 
 }
-</code></pre>
+</code></p>
 
-<p>Trident provides the QueryFunction interface for writing Trident operations that query a source of state, and the StateUpdater interface for writing Trident operations that update a source of state. For example, let's write an operation "QueryLocation" that queries the LocationDB for the locations of users. Let's start off with how you would use it in a topology. Let's say this topology consumes an input stream of userids:</p>
+<p>Trident provides the QueryFunction interface for writing Trident operations that query a source of state, and the StateUpdater interface for writing Trident operations that update a source of state. For example, let&#8217;s write an operation &#8220;QueryLocation&#8221; that queries the LocationDB for the locations of users. Let&#8217;s start off with how you would use it in a topology. Let&#8217;s say this topology consumes an input stream of userids:</p>
 
-<pre><code class="java">TridentTopology topology = new TridentTopology();
+<p><code>java
+TridentTopology topology = new TridentTopology();
 TridentState locations = topology.newStaticState(new LocationDBFactory());
 topology.newStream("myspout", spout)
         .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"))
-</code></pre>
+</code></p>
 
-<p>Now let's take a look at what the implementation of QueryLocation would look like:</p>
+<p>Now let&#8217;s take a look at what the implementation of QueryLocation would look like:</p>
 
-<pre><code class="java">public class QueryLocation extends BaseQueryFunction&lt;LocationDB, String&gt; {
-    public List&lt;String&gt; batchRetrieve(LocationDB state, List&lt;TridentTuple&gt; inputs) {
-        List&lt;String&gt; ret = new ArrayList();
+<p>```java
+public class QueryLocation extends BaseQueryFunction&lt;LocationDB, String&gt; {
+    public List&lt;String&gt; batchRetrieve(LocationDB state, List&lt;TridentTuple&gt; inputs) {
+        List&lt;String&gt; ret = new ArrayList();
         for(TridentTuple input: inputs) {
             ret.add(state.getLocation(input.getLong(0)));
         }
         return ret;
-    }
+    }</p>
 
-    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
-        collector.emit(new Values(location));
-    }    
-}
+<pre><code>public void execute(TridentTuple tuple, String location, TridentCollector collector) {
+    collector.emit(new Values(location));
+}     } ```
 </code></pre>
 
-<p>QueryFunction's execute in two steps. First, Trident collects a batch of reads together and passes them to batchRetrieve. In this case, batchRetrieve will receive multiple user ids. batchRetrieve is expected to return a list of results that's the same size as the list of input tuples. The first element of the result list corresponds to the result for the first input tuple, the second is the result for the second input tuple, and so on.</p>
-
-<p>You can see that this code doesn't take advantage of the batching that Trident does, since it just queries the LocationDB one at a time. So a better way to write the LocationDB would be like this:</p>
+<p>QueryFunctions execute in two steps. First, Trident collects a batch of reads together and passes them to batchRetrieve. In this case, batchRetrieve will receive multiple user ids. batchRetrieve is expected to return a list of results that&#8217;s the same size as the list of input tuples. The first element of the result list corresponds to the result for the first input tuple, the second is the result for the second input tuple, and so on. Second, Trident calls execute once for each input tuple along with its corresponding result, and execute emits the output.</p>
 
-<pre><code class="java">public class LocationDB implements State {
-    public void beginCommit(Long txid) {    
-    }
+<p>You can see that this code doesn&#8217;t take advantage of the batching that Trident does, since it just queries the LocationDB one user at a time. So a better way to write the LocationDB would be like this:</p>
 
-    public void commit(Long txid) {    
-    }
+<p>```java
+public class LocationDB implements State {
+    public void beginCommit(Long txid) {
+    }</p>
 
-    public void setLocationsBulk(List&lt;Long&gt; userIds, List&lt;String&gt; locations) {
-      // set locations in bulk
-    }
+<pre><code>public void commit(Long txid) {    
+}
 
-    public List&lt;String&gt; bulkGetLocations(List&lt;Long&gt; userIds) {
-      // get locations in bulk
-    }
+public void setLocationsBulk(List&lt;Long&gt; userIds, List&lt;String&gt; locations) {
+  // set locations in bulk
 }
+
+public List&lt;String&gt; bulkGetLocations(List&lt;Long&gt; userIds) {
+  // get locations in bulk
+} } ```
 </code></pre>
 
 <p>Then, you can write the QueryLocation function like this:</p>
 
-<pre><code class="java">public class QueryLocation extends BaseQueryFunction&lt;LocationDB, String&gt; {
-    public List&lt;String&gt; batchRetrieve(LocationDB state, List&lt;TridentTuple&gt; inputs) {
-        List&lt;Long&gt; userIds = new ArrayList&lt;Long&gt;();
+<p>```java
+public class QueryLocation extends BaseQueryFunction&lt;LocationDB, String&gt; {
+    public List&lt;String&gt; batchRetrieve(LocationDB state, List&lt;TridentTuple&gt; inputs) {
+        List&lt;Long&gt; userIds = new ArrayList&lt;Long&gt;();
         for(TridentTuple input: inputs) {
             userIds.add(input.getLong(0));
         }
         return state.bulkGetLocations(userIds);
-    }
+    }</p>
 
-    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
-        collector.emit(new Values(location));
-    }    
-}
+<pre><code>public void execute(TridentTuple tuple, String location, TridentCollector collector) {
+    collector.emit(new Values(location));
+}     } ```
 </code></pre>
 
-<p>This code will be much more efficient by reducing roundtrips to the database.</p>
+<p>This code will be much more efficient by reducing roundtrips to the database. </p>
 
-<p>To update state, you make use of the StateUpdater interface. Here's a StateUpdater that updates a LocationDB with new location information:</p>
+<p>To update state, you make use of the StateUpdater interface. Here&#8217;s a StateUpdater that updates a LocationDB with new location information:</p>
 
-<pre><code class="java">public class LocationUpdater extends BaseStateUpdater&lt;LocationDB&gt; {
+<p><code>java
+public class LocationUpdater extends BaseStateUpdater&lt;LocationDB&gt; {
     public void updateState(LocationDB state, List&lt;TridentTuple&gt; tuples, TridentCollector collector) {
         List&lt;Long&gt; ids = new ArrayList&lt;Long&gt;();
         List&lt;String&gt; locations = new ArrayList&lt;String&gt;();
@@ -316,65 +324,70 @@ topology.newStream("myspout", spout)
         state.setLocationsBulk(ids, locations);
     }
 }
-</code></pre>
+</code></p>
 
-<p>Here's how you would use this operation in a Trident topology:</p>
+<p>Here&#8217;s how you would use this operation in a Trident topology:</p>
 
-<pre><code class="java">TridentTopology topology = new TridentTopology();
+<p><code>java
+TridentTopology topology = new TridentTopology();
 TridentState locations = 
     topology.newStream("locations", locationsSpout)
         .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater())
-</code></pre>
+</code></p>
 
-<p>The partitionPersist operation updates a source of state. The StateUpdater receives the State and a batch of tuples with updates to that State. This code just grabs the userids and locations from the input tuples and does a bulk set into the State.</p>
+<p>The partitionPersist operation updates a source of state. The StateUpdater receives the State and a batch of tuples with updates to that State. This code just grabs the userids and locations from the input tuples and does a bulk set into the State. </p>
 
-<p>partitionPersist returns a TridentState object representing the location db being updated by the Trident topology. You could then use this state in stateQuery operations elsewhere in the topology.</p>
+<p>partitionPersist returns a TridentState object representing the location db being updated by the Trident topology. You could then use this state in stateQuery operations elsewhere in the topology. </p>
 
-<p>You can also see that StateUpdaters are given a TridentCollector. Tuples emitted to this collector go to the "new values stream". In this case, there's nothing interesting to emit to that stream, but if you were doing something like updating counts in a database, you could emit the updated counts to that stream. You can then get access to the new values stream for further processing via the TridentState#newValuesStream method.</p>
+<p>You can also see that StateUpdaters are given a TridentCollector. Tuples emitted to this collector go to the &#8220;new values stream&#8221;. In this case, there&#8217;s nothing interesting to emit to that stream, but if you were doing something like updating counts in a database, you could emit the updated counts to that stream. You can then get access to the new values stream for further processing via the TridentState#newValuesStream method.</p>
 
-<h2>persistentAggregate</h2>
+<h2 id="persistentaggregate">persistentAggregate</h2>
 
-<p>Trident has another method for updating States called persistentAggregate. You've seen this used in the streaming word count example, shown again below:</p>
+<p>Trident has another method for updating States called persistentAggregate. You&#8217;ve seen this used in the streaming word count example, shown again below:</p>
 
-<pre><code class="java">TridentTopology topology = new TridentTopology();        
+<p><code>java
+TridentTopology topology = new TridentTopology();        
 TridentState wordCounts =
       topology.newStream("spout1", spout)
         .each(new Fields("sentence"), new Split(), new Fields("word"))
         .groupBy(new Fields("word"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
-</code></pre>
+</code></p>
 
-<p>persistentAggregate is an additional abstraction built on top of partitionPersist that knows how to take a Trident aggregator and use it to apply updates to the source of state. In this case, since this is a grouped stream, Trident expects the state you provide to implement the "MapState" interface. The grouping fields will be the keys in the state, and the aggregation result will be the values in the state. The "MapState" interface looks like this:</p>
+<p>persistentAggregate is an additional abstraction built on top of partitionPersist that knows how to take a Trident aggregator and use it to apply updates to the source of state. In this case, since this is a grouped stream, Trident expects the state you provide to implement the &#8220;MapState&#8221; interface. The grouping fields will be the keys in the state, and the aggregation result will be the values in the state. The &#8220;MapState&#8221; interface looks like this:</p>
 
-<pre><code class="java">public interface MapState&lt;T&gt; extends State {
+<p><code>java
+public interface MapState&lt;T&gt; extends State {
     List&lt;T&gt; multiGet(List&lt;List&lt;Object&gt;&gt; keys);
     List&lt;T&gt; multiUpdate(List&lt;List&lt;Object&gt;&gt; keys, List&lt;ValueUpdater&gt; updaters);
     void multiPut(List&lt;List&lt;Object&gt;&gt; keys, List&lt;T&gt; vals);
 }
-</code></pre>
+</code></p>
 
-<p>When you do aggregations on non-grouped streams (a global aggregation), Trident expects your State object to implement the "Snapshottable" interface:</p>
+<p>When you do aggregations on non-grouped streams (a global aggregation), Trident expects your State object to implement the &#8220;Snapshottable&#8221; interface:</p>
 
-<pre><code class="java">public interface Snapshottable&lt;T&gt; extends State {
+<p><code>java
+public interface Snapshottable&lt;T&gt; extends State {
     T get();
     T update(ValueUpdater updater);
     void set(T o);
 }
-</code></pre>
+</code></p>
 
 <p><a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/testing/MemoryMapState.java">MemoryMapState</a> and <a href="https://github.com/nathanmarz/trident-memcached/blob/master/src/jvm/trident/memcached/MemcachedState.java">MemcachedState</a> each implement both of these interfaces.</p>
 
-<h2>Implementing Map States</h2>
+<h2 id="implementing-map-states">Implementing Map States</h2>
 
-<p>Trident makes it easy to implement MapState's, doing almost all the work for you. The OpaqueMap, TransactionalMap, and NonTransactionalMap classes implement all the logic for doing the respective fault-tolerance logic. You simply provide these classes with an IBackingMap implementation that knows how to do multiGets and multiPuts of the respective key/values. IBackingMap looks like this:</p>
+<p>Trident makes it easy to implement MapState&#8217;s, doing almost all the work for you. The OpaqueMap, TransactionalMap, and NonTransactionalMap classes implement all the logic for doing the respective fault-tolerance logic. You simply provide these classes with an IBackingMap implementation that knows how to do multiGets and multiPuts of the respective key/values. IBackingMap looks like this:</p>
 
-<pre><code class="java">public interface IBackingMap&lt;T&gt; {
+<p><code>java
+public interface IBackingMap&lt;T&gt; {
     List&lt;T&gt; multiGet(List&lt;List&lt;Object&gt;&gt; keys); 
     void multiPut(List&lt;List&lt;Object&gt;&gt; keys, List&lt;T&gt; vals); 
 }
-</code></pre>
+</code></p>
 
-<p>OpaqueMap's will call multiPut with <a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/OpaqueValue.java">OpaqueValue</a>'s for the vals, TransactionalMap's will give <a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/TransactionalValue.java">TransactionalValue</a>'s for the vals, and NonTransactionalMaps will just pass the objects from the topology through.</p>
+<p>OpaqueMap&#8217;s will call multiPut with <a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/OpaqueValue.java">OpaqueValue</a>&#8217;s for the vals, TransactionalMap&#8217;s will give <a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/TransactionalValue.java">TransactionalValue</a>&#8217;s for the vals, and NonTransactionalMaps will just pass the objects from the topology through.</p>
 
 <p>Trident also provides the <a href="https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/storm/trident/state/map/CachedMap.java">CachedMap</a> class to do automatic LRU caching of map key/vals.</p>
 
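Reader's note on the IBackingMap passage in the diff above: the following is a minimal sketch, not code from this commit. It redeclares the interface locally, exactly as quoted in the page, so that it compiles without Storm on the classpath. The class name InMemoryBackingMap is hypothetical. Returning null for a missing key is an assumption of this sketch, and a real implementation would batch the reads and writes against a database rather than a HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local copy of the interface quoted in the documentation above,
// declared here only so the sketch is self-contained.
interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

// Hypothetical in-memory implementation (name and behaviour are assumptions).
class InMemoryBackingMap<T> implements IBackingMap<T> {
    // Each key is a list of grouping-field values; List equality is by content.
    private final Map<List<Object>, T> store = new HashMap<List<Object>, T>();

    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        List<T> result = new ArrayList<T>(keys.size());
        for (List<Object> key : keys) {
            // Assumption: a key that was never written yields null.
            result.add(store.get(key));
        }
        return result;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        if (keys.size() != vals.size()) {
            throw new IllegalArgumentException("keys and vals must have the same length");
        }
        for (int i = 0; i < keys.size(); i++) {
            // Copy the key so later mutation by the caller cannot corrupt the map.
            store.put(new ArrayList<Object>(keys.get(i)), vals.get(i));
        }
    }
}
```

Results come back in the same order as the requested keys, which is what lets a wrapper such as OpaqueMap or TransactionalMap pair each returned value with its key.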


