apex-commits mailing list archives

From sas...@apache.org
Subject [21/22] incubator-apex-site git commit: APEXCORE-382 Adding docs/ to asf-site branch
Date Mon, 14 Mar 2016 23:08:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-apex-site/blob/5e417add/docs/apex-3.3/application_development/index.html
----------------------------------------------------------------------
diff --git a/docs/apex-3.3/application_development/index.html b/docs/apex-3.3/application_development/index.html
new file mode 100644
index 0000000..5297074
--- /dev/null
+++ b/docs/apex-3.3/application_development/index.html
@@ -0,0 +1,2550 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Applications - Apache Apex Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../favicon.ico">
+  
+
+  
+  <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Applications";
+    var mkdocs_page_input_path = "application_development.md";
+    var mkdocs_page_url = "/application_development/";
+  </script>
+  
+  <script src="../js/jquery-2.1.1.min.js"></script>
+  <script src="../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../js/highlight.pack.js"></script>
+  <script src="../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href=".." class="icon icon-home"> Apache Apex Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../search.html" method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="..">Apache Apex</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Development</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../apex_development_setup/">Development Setup</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Applications</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#application-developer-guide">Application Developer Guide</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#running-a-test-application">Running A Test Application</a></li>
+                
+                    <li><a class="toctree-l4" href="#test-application-yahoo-finance-quotes">Test Application: Yahoo! Finance Quotes</a></li>
+                
+                    <li><a class="toctree-l4" href="#running-a-test-application_1">Running a Test Application</a></li>
+                
+                    <li><a class="toctree-l4" href="#local-mode">Local Mode</a></li>
+                
+                    <li><a class="toctree-l4" href="#hadoop-cluster">Hadoop Cluster</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#apache-apex-platform-overview">Apache Apex Platform Overview</a></li>
+                
+                    <li><a class="toctree-l4" href="#streaming-computational-model">Streaming Computational Model</a></li>
+                
+                    <li><a class="toctree-l4" href="#streaming-application-manager-stram">Streaming Application Manager (STRAM)</a></li>
+                
+                    <li><a class="toctree-l4" href="#hadoop-components">Hadoop Components</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#developing-an-application">Developing An Application</a></li>
+                
+                    <li><a class="toctree-l4" href="#development-process">Development Process</a></li>
+                
+                    <li><a class="toctree-l4" href="#application-api">Application API</a></li>
+                
+                    <li><a class="toctree-l4" href="#operators">Operators</a></li>
+                
+                    <li><a class="toctree-l4" href="#streams">Streams</a></li>
+                
+                    <li><a class="toctree-l4" href="#validating-an-application">Validating an Application</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#multi-tenancy-and-security">Multi-Tenancy and Security</a></li>
+                
+                    <li><a class="toctree-l4" href="#security">Security</a></li>
+                
+                    <li><a class="toctree-l4" href="#resource-limits">Resource Limits</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#scalability-and-partitioning">Scalability and Partitioning</a></li>
+                
+                    <li><a class="toctree-l4" href="#partitioning">Partitioning</a></li>
+                
+                    <li><a class="toctree-l4" href="#nxm-partitions">NxM Partitions</a></li>
+                
+                    <li><a class="toctree-l4" href="#parallel">Parallel</a></li>
+                
+                    <li><a class="toctree-l4" href="#parallel-partitions-with-streams-modes">Parallel Partitions with Streams Modes</a></li>
+                
+                    <li><a class="toctree-l4" href="#skew-balancing-partition">Skew Balancing Partition</a></li>
+                
+                    <li><a class="toctree-l4" href="#skew-unifier-partition">Skew Unifier Partition</a></li>
+                
+                    <li><a class="toctree-l4" href="#cascading-unifier">Cascading Unifier</a></li>
+                
+                    <li><a class="toctree-l4" href="#sla">SLA</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#fault-tolerance">Fault Tolerance</a></li>
+                
+                    <li><a class="toctree-l4" href="#state-of-the-application">State of the Application</a></li>
+                
+                    <li><a class="toctree-l4" href="#checkpointing">Checkpointing</a></li>
+                
+                    <li><a class="toctree-l4" href="#recovery-mechanisms_1">Recovery Mechanisms</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#dynamic-application-modifications">Dynamic Application Modifications</a></li>
+                
+            
+                <li class="toctree-l3"><a href="#demos">Demos</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../application_packages/">Packages</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../operator_development/">Operators</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../autometrics/">AutoMetric API</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operations</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../dtcli/">dtCli</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="..">Apache Apex Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Development &raquo;</li>
+        
+      
+    
+    <li>Applications</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="application-developer-guide">Application Developer Guide</h1>
+<p>The Apex platform is designed to process massive amounts of
+real-time events natively in Hadoop.  It runs as a YARN (Hadoop 2.x)
+application and leverages Hadoop as a distributed operating
+system.  All the basic distributed operating system capabilities of
+Hadoop, such as resource management (YARN), the distributed file system (HDFS),
+multi-tenancy, security, fault tolerance, and scalability, are supported natively
+in all Apex applications.  The platform handles all the details of application
+execution, including dynamic scaling, state checkpointing and recovery, and event
+processing guarantees, allowing you to focus on writing your application logic without
+mixing operational and functional concerns.</p>
+<p>Building a streaming application on the platform is designed to be
+easy and intuitive.  The application is represented as a Directed
+Acyclic Graph (DAG) of computation units called <em>Operators</em> interconnected
+by data-flow edges called <em>Streams</em>. Operators process input
+streams and produce output streams. A library of common operators is
+provided to enable quick application development.  If the desired
+processing is not available in the Operator Library, you can
+write a custom operator; see the <a href="../operator_development/">Operator Development Guide</a>.</p>
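+<p>The following plain-Java sketch gives a rough picture of the DAG structure described above. Operators are named vertices, and streams are edges from one source to one or more sinks. The sketch rejects any graph that contains a cycle. <code>MiniDag</code> and its methods are hypothetical names invented for this example. This is not the Apex <code>DAG</code> API, which the code later in this document uses.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of an application DAG (NOT the Apex DAG API):
// operators are named vertices, streams are edges from one source to one or more sinks.
class MiniDag {
  private final Set<String> operators = new LinkedHashSet<>();
  private final Map<String, List<String>> downstream = new HashMap<>();

  void addOperator(String name) {
    operators.add(name);
    downstream.putIfAbsent(name, new ArrayList<>());
  }

  // A stream has exactly one source operator and one or more sink operators.
  void addStream(String source, String... sinks) {
    requireKnown(source);
    for (String sink : sinks) {
      requireKnown(sink);
      downstream.get(source).add(sink);
    }
  }

  private void requireKnown(String name) {
    if (!operators.contains(name)) {
      throw new IllegalArgumentException("unknown operator: " + name);
    }
  }

  // Kahn's algorithm: returns operators in an order where every stream flows
  // forward, or throws if the graph contains a cycle (i.e. is not a DAG).
  List<String> topologicalOrder() {
    Map<String, Integer> inDegree = new HashMap<>();
    for (String op : operators) {
      inDegree.put(op, 0);
    }
    for (List<String> sinks : downstream.values()) {
      for (String sink : sinks) {
        inDegree.merge(sink, 1, Integer::sum);
      }
    }
    Deque<String> ready = new ArrayDeque<>();
    for (String op : operators) {
      if (inDegree.get(op) == 0) {
        ready.add(op);
      }
    }
    List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      String op = ready.poll();
      order.add(op);
      for (String sink : downstream.get(op)) {
        if (inDegree.merge(sink, -1, Integer::sum) == 0) {
          ready.add(sink);
        }
      }
    }
    if (order.size() != operators.size()) {
      throw new IllegalStateException("streams form a cycle; not a DAG");
    }
    return order;
  }
}
```

+<p>Kahn's algorithm fits this purpose because a single pass both produces a valid processing order and detects cycles. The Pi demo's four operators are wired in a chain, so they come out in the order rand, rrhm, picalc, console.</p>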
+<h1 id="running-a-test-application">Running A Test Application</h1>
+<p>If you are using the Apex platform for the first time,
+it can be informative to launch an existing application and watch it run.
+One of the simplest examples in the <a href="https://github.com/apache/incubator-apex-malhar">Apex-Malhar repository</a> is the Pi demo application,
+which estimates the value of PI using random numbers.  After <a href="../apex_development_setup/">setting up your development environment</a>,
+you can launch the Pi demo as follows:</p>
+<ol>
+<li>Open the Apex Malhar project in your IDE (for example Eclipse, IntelliJ, or NetBeans)</li>
+<li>Navigate to <code>demos/pi/src/test/java/com/datatorrent/demos/ApplicationTest.java</code></li>
+<li>Run <code>ApplicationTest.java</code> as a test</li>
+<li>View the output in the system console</li>
+</ol>
+<p>Congratulations, you just ran your first real-time streaming demo :)
+This demo is very simple and has four operators. The first operator
+emits random integers between 0 and 30,000. The second operator receives
+these integers and, each time it has received two values, emits a hashmap
+with <code>x</code> and <code>y</code> values. The third operator takes each pair, computes
+<code>x*x + y*y</code>, and counts how many of the computed values are less than or equal to
+30,000<sup>2</sup>, that is, how many points fall inside the quarter circle of radius 30,000.
+Assuming this count is N and M pairs have been received, PI is estimated as 4 &times; N / M,
+because the quarter circle covers PI/4 of the square from which the points are drawn.
+The last operator writes the result to the system console.
+Here is the code snippet for the PI application; it populates the
+DAG. Do not worry about what each line does; we will cover these
+concepts later in this document.</p>
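+<pre><code class="java">// Upper bound for the random integers; also the radius of the quarter circle
+int maxValue = 30000;
+
+// Generates random numbers
+RandomEventGenerator rand = dag.addOperator(&quot;rand&quot;, new RandomEventGenerator());
+rand.setMinvalue(0);
+rand.setMaxvalue(maxValue);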
+
+// Generates a round robin HashMap of &quot;x&quot; and &quot;y&quot;
+RoundRobinHashMap&lt;String,Object&gt; rrhm = dag.addOperator(&quot;rrhm&quot;, new RoundRobinHashMap&lt;String, Object&gt;());
+rrhm.setKeys(new String[] { &quot;x&quot;, &quot;y&quot; });
+
+// Calculates pi from x and y
+JavaScriptOperator calc = dag.addOperator(&quot;picalc&quot;, new JavaScriptOperator());
+calc.setPassThru(false);
+calc.put(&quot;i&quot;,0);
+calc.put(&quot;count&quot;,0);
+calc.addSetupScript(&quot;function pi() { if (x*x+y*y &lt;= &quot;+maxValue*maxValue+&quot;) { i++; } count++; return i / count * 4; }&quot;);
+calc.setInvoke(&quot;pi&quot;);
+dag.addStream(&quot;rand_rrhm&quot;, rand.integer_data, rrhm.data);
+dag.addStream(&quot;rrhm_calc&quot;, rrhm.map, calc.inBindings);
+
+// puts results on system console
+ConsoleOutputOperator console = dag.addOperator(&quot;console&quot;, new ConsoleOutputOperator());
+dag.addStream(&quot;rand_console&quot;,calc.result, console.input);
+</code></pre>
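+<p>To make the arithmetic concrete, here is a plain-Java sketch of what the pipeline computes, with no Apex classes involved. It draws integer pairs in [0, 30000] and counts the pairs with <code>x*x + y*y</code> at most 30000<sup>2</sup>. It then multiplies the fraction inside by 4. The class and method names are hypothetical, and a seeded <code>java.util.Random</code> stands in for RandomEventGenerator.</p>

```java
import java.util.Random;

// Plain-Java sketch of the Pi demo's arithmetic (no Apex classes involved).
class PiEstimate {
  // Draws `pairs` integer points (x, y) with 0 <= x, y <= maxValue and returns
  // 4 * (points inside the quarter circle of radius maxValue) / pairs.
  static double estimate(int pairs, int maxValue, long seed) {
    Random random = new Random(seed);      // stands in for RandomEventGenerator
    long limit = (long) maxValue * maxValue;
    long inside = 0;
    for (int n = 0; n < pairs; n++) {
      // two consecutive integers become one (x, y) pair, like RoundRobinHashMap
      long x = random.nextInt(maxValue + 1);
      long y = random.nextInt(maxValue + 1);
      // the picalc step; long arithmetic avoids overflow for larger maxValue
      if (x * x + y * y <= limit) {
        inside++;
      }
    }
    return 4.0 * inside / pairs;
  }
}
```

+<p>With one million pairs the estimate typically lands within a few thousandths of PI. The error shrinks roughly with the square root of the number of pairs.</p>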
+
+<p>You can review the other demos to see what they do. The examples
+in the Demos project cover various features of the platform, and we
+strongly encourage you to read them to familiarize yourself with the
+platform. The rest of this document covers the
+details you need to develop and run streaming applications with
+Malhar.</p>
+<h2 id="test-application-yahoo-finance-quotes">Test Application: Yahoo! Finance Quotes</h2>
+<p>The PI application is only meant to
+get you started; it is a basic application and does not fully illustrate
+the features of the platform. To describe the platform's concepts, we
+will use the test application shown in Figure 1. The application
+downloads tick data from <a href="http://finance.yahoo.com">Yahoo! Finance</a> and computes the
+following for four tickers, namely <a href="http://finance.yahoo.com/q?s=IBM">IBM</a>,
+<a href="http://finance.yahoo.com/q?s=GOOG">GOOG</a>, <a href="http://finance.yahoo.com/q?s=AAPL">AAPL</a>, and <a href="http://finance.yahoo.com/q?s=YHOO">YHOO</a>.</p>
+<ol>
+<li>Quote: Consisting of last trade price, last trade time, and
+    total volume for the day</li>
+<li>Per-minute chart data: Highest trade price, lowest trade
+    price, and volume during that minute</li>
+<li>Simple Moving Average: average trade price over 5 minutes</li>
+</ol>
+<p>Total volume must include all trade volume for the day, so
+any data loss would produce wrong results. Charting data needs
+all the trades in the same minute to go to the same slot, after which
+aggregation starts afresh for the next minute; again, data loss would produce wrong results. The
+aggregation for charting data is done over 1 minute. Simple moving
+average computes the average price over a 5-minute sliding window; it
+too would produce wrong results if there were data loss. Figure 1 shows
+the application with no partitioning.</p>
+<p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image00.png" /></p>
+<p>The operator StockTickInput is
+the input operator that reads live data from Yahoo! Finance once per
+interval (user configurable in milliseconds), and emits the price, the
+incremental volume, and the last trade time of each stock symbol, thus
+emulating real ticks from the exchange.  It uses the Yahoo! Finance
+CSV web service interface.  For example:</p>
+<pre><code>$ GET 'http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&amp;f=sl1vt1'
+&quot;IBM&quot;,203.966,1513041,&quot;1:43pm&quot;
+&quot;GOOG&quot;,762.68,1879741,&quot;1:43pm&quot;
+&quot;AAPL&quot;,444.3385,11738366,&quot;1:43pm&quot;
+&quot;YHOO&quot;,19.3681,14707163,&quot;1:43pm&quot;
+</code></pre>
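+<p>Here is a minimal sketch of turning one line of this response into the fields that StockTickInput emits. It assumes the column order requested by <code>f=sl1vt1</code>: symbol, last trade price, volume, and last trade time. It also assumes that no field contains an embedded comma. The demo itself uses opencsv's <code>CSVReader</code>; <code>QuoteRow</code> is a hypothetical helper for illustration only.</p>

```java
// Hypothetical helper: parses one line of the quotes.csv response shown above.
// Assumes the f=sl1vt1 column order and no commas inside quoted fields.
class QuoteRow {
  final String symbol;
  final double price;
  final long volume;
  final String time;

  QuoteRow(String symbol, double price, long volume, String time) {
    this.symbol = symbol;
    this.price = price;
    this.volume = volume;
    this.time = time;
  }

  static QuoteRow parse(String line) {
    String[] fields = line.split(",");
    if (fields.length != 4) {
      throw new IllegalArgumentException("expected 4 columns but got " + fields.length + ": " + line);
    }
    return new QuoteRow(
        unquote(fields[0]),
        Double.parseDouble(fields[1].trim()),
        Long.parseLong(fields[2].trim()),
        unquote(fields[3]));
  }

  // Strips the double quotes that the service puts around text fields.
  private static String unquote(String field) {
    String f = field.trim();
    if (f.length() >= 2 && f.startsWith("\"") && f.endsWith("\"")) {
      return f.substring(1, f.length() - 1);
    }
    return f;
  }
}
```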
+
+<p>Among all the operators in Figure 1, StockTickInput is the only
+operator that requires custom code, because it contains a custom mechanism
+to get the input data.  The other operators are used unchanged from the
+Malhar library.</p>
+<p>Here is the class implementation for StockTickInput:</p>
+<pre><code class="java">package com.datatorrent.demos.yahoofinance;
+
+import au.com.bytecode.opencsv.CSVReader;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.*;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.cookie.CookiePolicy;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.DefaultHttpParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This operator sends price, volume and time into separate ports and calculates incremental volume.
+ */
+public class StockTickInput implements InputOperator
+{
+  private static final Logger logger = LoggerFactory.getLogger(StockTickInput.class);
+  /**
+   * Interval, in milliseconds, to wait between successive reads from the server.
+   */
+  public int readIntervalMillis = 500;
+  /**
+   * The URL of the web service resource for the GET request.
+   */
+  private String url;
+  public String[] symbols;
+  private transient HttpClient client;
+  private transient GetMethod method;
+  private HashMap&lt;String, Long&gt; lastVolume = new HashMap&lt;String, Long&gt;();
+  private boolean outputEvenIfZeroVolume = false;
+  /**
+   * The output port to emit price.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort&lt;KeyValPair&lt;String, Double&gt;&gt; price = new DefaultOutputPort&lt;KeyValPair&lt;String, Double&gt;&gt;();
+  /**
+   * The output port to emit incremental volume.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort&lt;KeyValPair&lt;String, Long&gt;&gt; volume = new DefaultOutputPort&lt;KeyValPair&lt;String, Long&gt;&gt;();
+  /**
+   * The output port to emit last traded time.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort&lt;KeyValPair&lt;String, String&gt;&gt; time = new DefaultOutputPort&lt;KeyValPair&lt;String, String&gt;&gt;();
+
+  /**
+   * Prepare URL from symbols and parameters. URL will be something like: http://download.finance.yahoo.com/d/quotes.csv?s=IBM,GOOG,AAPL,YHOO&amp;f=sl1vt1
+   *
+   * @return the URL
+   */
+  private String prepareURL()
+  {
+    String str = &quot;http://download.finance.yahoo.com/d/quotes.csv?s=&quot;;
+    for (int i = 0; i &lt; symbols.length; i++) {
+      if (i != 0) {
+        str += &quot;,&quot;;
+      }
+      str += symbols[i];
+    }
+    str += &quot;&amp;f=sl1vt1&amp;e=.csv&quot;;
+    return str;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    url = prepareURL();
+    client = new HttpClient();
+    method = new GetMethod(url);
+    DefaultHttpParams.getDefaultParams().setParameter(&quot;http.protocol.cookie-policy&quot;, CookiePolicy.BROWSER_COMPATIBILITY);
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    try {
+      int statusCode = client.executeMethod(method);
+      if (statusCode != HttpStatus.SC_OK) {
+        logger.error(&quot;Method failed: &quot; + method.getStatusLine());
+      }
+      else {
+        InputStream istream = method.getResponseBodyAsStream();
+        // Process response
+        InputStreamReader isr = new InputStreamReader(istream);
+        CSVReader reader = new CSVReader(isr);
+        List&lt;String[]&gt; myEntries = reader.readAll();
+        for (String[] stringArr: myEntries) {
+          ArrayList&lt;String&gt; tuple = new ArrayList&lt;String&gt;(Arrays.asList(stringArr));
+          if (tuple.size() != 4) {
+            // Skip a malformed row instead of dropping the rest of the response
+            continue;
+          }
+          // input csv is &lt;Symbol&gt;,&lt;Price&gt;,&lt;Volume&gt;,&lt;Time&gt;
+          String symbol = tuple.get(0);
+          double currentPrice = Double.valueOf(tuple.get(1));
+          long currentVolume = Long.valueOf(tuple.get(2));
+          String timeStamp = tuple.get(3);
+          long vol = currentVolume;
+          // Sends total volume in first tick, and incremental volume afterwards.
+          if (lastVolume.containsKey(symbol)) {
+            vol -= lastVolume.get(symbol);
+          }
+
+          if (vol &gt; 0 || outputEvenIfZeroVolume) {
+            price.emit(new KeyValPair&lt;String, Double&gt;(symbol, currentPrice));
+            volume.emit(new KeyValPair&lt;String, Long&gt;(symbol, vol));
+            time.emit(new KeyValPair&lt;String, String&gt;(symbol, timeStamp));
+            lastVolume.put(symbol, currentVolume);
+          }
+        }
+      }
+      Thread.sleep(readIntervalMillis);
+    }
+    catch (InterruptedException ex) {
+      // Restore the interrupt status so the container can stop the operator
+      Thread.currentThread().interrupt();
+      logger.debug(ex.toString());
+    }
+    catch (IOException ex) {
+      logger.debug(ex.toString());
+    }
+    finally {
+      // Release the connection back to HttpClient after each request
+      method.releaseConnection();
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+  }
+
+  public void setOutputEvenIfZeroVolume(boolean outputEvenIfZeroVolume)
+  {
+    this.outputEvenIfZeroVolume = outputEvenIfZeroVolume;
+  }
+
+}
+</code></pre>
+
+<p>The operator has three output ports that emit the price of the
+stock, the volume of the stock, and the last trade time of the stock,
+declared as the public member variables <code>price</code>, <code>volume</code>, and <code>time</code>.  Each tuple on the
+<code>price</code> output port is a key-value
+pair with the stock symbol as the key and the price as the value.
+Each tuple on the <code>volume</code> output
+port is a key-value pair with the stock symbol as the key and the
+incremental volume as the value.  Each tuple on the <code>time</code> output port is a key-value pair with the
+stock symbol as the key and the last trade time as the
+value.</p>
+<p>Important: Since operators will be
+serialized (for example, when they are checkpointed), all input and output ports need to be declared <code>transient</code>.
+Ports are stateless and should not be serialized.</p>
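+<p>A small sketch shows the effect of <code>transient</code>. Standard Java serialization stands in here for whatever serializer the platform uses to checkpoint operators; this substitution is an assumption made for illustration. <code>FakePort</code> and <code>TickOperatorState</code> are hypothetical classes. After a serialize/deserialize round trip, the state map survives, while the transient port comes back as <code>null</code>.</p>

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

// Hypothetical stand-in for an output port; deliberately NOT Serializable.
class FakePort {
  void emit(Object tuple) {
    // a real port would hand the tuple to a stream
  }
}

// Operator-like class: its state is serialized, its transient port is skipped.
class TickOperatorState implements Serializable {
  private static final long serialVersionUID = 1L;

  // state that must survive a checkpoint/restore cycle
  HashMap<String, Long> lastVolume = new HashMap<>();

  // without `transient`, writeObject would throw NotSerializableException
  transient FakePort volume = new FakePort();

  // Serializes and then deserializes the object, roughly like a checkpoint followed by a restore.
  static TickOperatorState roundTrip(TickOperatorState op) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(op);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (TickOperatorState) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }
}
```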
+<p>The method <code>setup(OperatorContext)</code>
+contains the code that is necessary for setting up the HTTP
+client for querying Yahoo! Finance.</p>
+<p>The method <code>emitTuples()</code> contains
+the code that reads from Yahoo! Finance and emits the data to the
+output ports of the operator.  The engine calls <code>emitTuples()</code> one or more times
+within each application window, for as long as time remains in the
+window.</p>
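+<p>The calling pattern can be sketched as follows. <code>SimpleInputOperator</code> and <code>WindowDriver</code> are hypothetical types that only mirror the shape of the window callbacks; the real engine loop is more involved. The clock is injected so that the behavior is deterministic.</p>

```java
import java.util.function.LongSupplier;

// Hypothetical interface mirroring the window callbacks of an input operator.
interface SimpleInputOperator {
  void beginWindow(long windowId);
  void emitTuples();
  void endWindow();
}

// Hypothetical driver: within one window it calls emitTuples() at least once
// and keeps calling it until the window's time budget has been used up.
class WindowDriver {
  private final LongSupplier clockMillis; // injected so tests can control time
  private final long windowMillis;

  WindowDriver(LongSupplier clockMillis, long windowMillis) {
    this.clockMillis = clockMillis;
    this.windowMillis = windowMillis;
  }

  void runWindow(SimpleInputOperator op, long windowId) {
    long deadline = clockMillis.getAsLong() + windowMillis;
    op.beginWindow(windowId);
    do {
      op.emitTuples();
    } while (clockMillis.getAsLong() < deadline);
    op.endWindow();
  }
}
```

+<p>In this simplified model, an operator that sleeps 200 ms inside each call gets five calls per 1000 ms window. These are the <code>readIntervalMillis</code> and streaming window values used by the demo application below. HTTP latency would reduce that number in practice.</p>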
+<p>Yahoo! Finance reports the cumulative volume for the day, whereas a real tick
+stream carries the volume of each trade.  To emulate the tick input stream, we subtract
+the previously reported volume from the current volume to obtain the incremental
+volume for each tick.</p>
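+<p>The subtraction can be sketched in isolation. <code>VolumeDelta</code> is a hypothetical helper built on the same idea as the <code>lastVolume</code> map in StockTickInput. Unlike the operator above, it always records the latest cumulative volume, even when the increment is zero.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: converts cumulative daily volume into per-tick increments.
class VolumeDelta {
  private final Map<String, Long> lastVolume = new HashMap<>();

  // First reading for a symbol passes through unchanged; later readings are
  // reduced by the previously seen cumulative volume for that symbol.
  long next(String symbol, long cumulativeVolume) {
    Long previous = lastVolume.put(symbol, cumulativeVolume);
    return previous == null ? cumulativeVolume : cumulativeVolume - previous;
  }
}
```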
+<p>The operator
+DailyVolume: This operator
+reads from the input port, which contains the incremental volume tuples
+from StockTickInput, and
+aggregates the data to provide the cumulative volume.  It uses the
+library class <code>SumKeyVal&lt;K,V&gt;</code> provided in the math package; in this case,
+<code>SumKeyVal&lt;String,Long&gt;</code>, where K is the stock symbol and V is the
+aggregated volume, with <code>cumulative</code>
+set to true. (If <code>cumulative</code> were set to false, SumKeyVal would
+provide the sum for each application window only.)  Malhar provides a number
+of built-in operators for simple operations like this so that
+application developers do not have to write them; more examples
+follow. This operator assumes that the application restarts before the
+market opens every day.</p>
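+<p>The following sketch shows what the <code>cumulative</code> flag changes. It uses a hypothetical <code>KeyedSum</code> class rather than Malhar's SumKeyVal. With cumulative set to true, the totals carry over from one application window to the next. With it set to false, they restart in every window.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical keyed sum illustrating the effect of a `cumulative` flag.
class KeyedSum {
  private final boolean cumulative;
  private final Map<String, Long> sums = new HashMap<>();

  KeyedSum(boolean cumulative) {
    this.cumulative = cumulative;
  }

  void process(String key, long value) {
    sums.merge(key, value, Long::sum);
  }

  // Called at the end of each application window; returns what would be emitted.
  Map<String, Long> endWindow() {
    Map<String, Long> emitted = new HashMap<>(sums);
    if (!cumulative) {
      sums.clear(); // per-window mode: start the next window from zero
    }
    return emitted;
  }
}
```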
+<p>The operator Quote:
+This operator has three input ports, which are price (from
+StockTickInput), daily_vol (from
+DailyVolume), and time (from
+StockTickInput).  This operator
+consolidates the three data items and emits the consolidated
+data.  It uses the class <code>ConsolidatorKeyVal&lt;K&gt;</code> from the
+stream package.</p>
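+<p>The consolidation step can be pictured with a hypothetical <code>KeyedConsolidator</code>. This is not Malhar's ConsolidatorKeyVal, whose exact output type is not shown here. Port indices 0, 1, and 2 stand for price, daily_vol, and time. At the end of the window, the latest value from each port is emitted as one list per stock symbol.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consolidator: merges values that arrive on several input
// ports for the same key into one record per key at the end of a window.
class KeyedConsolidator {
  private final int portCount;
  private final Map<String, Object[]> pending = new TreeMap<>();

  KeyedConsolidator(int portCount) {
    this.portCount = portCount;
  }

  // Remembers the latest value seen on input port `port` for `key`.
  void process(int port, String key, Object value) {
    pending.computeIfAbsent(key, k -> new Object[portCount])[port] = value;
  }

  // Emits one list per key (slots with no input stay null) and starts over.
  Map<String, List<Object>> endWindow() {
    Map<String, List<Object>> emitted = new TreeMap<>();
    for (Map.Entry<String, Object[]> entry : pending.entrySet()) {
      emitted.put(entry.getKey(), Arrays.asList(entry.getValue()));
    }
    pending.clear();
    return emitted;
  }
}
```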
+<p>The operator HighLow: This operator reads from the input port,
+which contains the price tuples from StockTickInput, and provides the high and the
+low price within the application window.  It uses the library class
+<code>RangeKeyVal&lt;K,V&gt;</code> provided
+in the math package; in this case,
+<code>RangeKeyVal&lt;String,Double&gt;</code>.</p>
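+<p>A per-key high/low can be tracked with a running maximum and minimum that is reset at the end of each application window. <code>KeyedRange</code> is a hypothetical illustration of that idea, not the RangeKeyVal implementation.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key range tracker for one application window.
class KeyedRange {
  private final Map<String, double[]> ranges = new HashMap<>(); // key -> {high, low}

  void process(String key, double price) {
    double[] r = ranges.get(key);
    if (r == null) {
      ranges.put(key, new double[] {price, price});
    } else {
      r[0] = Math.max(r[0], price);
      r[1] = Math.min(r[1], price);
    }
  }

  // Returns {high, low} for each key seen in the window, then resets.
  Map<String, double[]> endWindow() {
    Map<String, double[]> emitted = new HashMap<>(ranges);
    ranges.clear();
    return emitted;
  }
}
```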
+<p>The operator MinuteVolume:
+This operator reads from the input port, which contains the
+volume tuples from StockTickInput,
+and aggregates the data to provide the sum of the volume within one
+minute.  Like the operator DailyVolume, this operator also uses
+<code>SumKeyVal&lt;String,Long&gt;</code>, but
+with <code>cumulative</code> set to false.  Its
+application window is set to one minute; we explain how to set this
+later.</p>
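+<p>The sketch below illustrates what a one-minute application window means for MinuteVolume. It assumes 1-second streaming windows, groups them 60 at a time, and emits one sum per application window. <code>AppWindowSum</code> is a hypothetical helper, not engine code.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: given the volume seen in each streaming window, returns
// the sum an operator with APPLICATION_WINDOW_COUNT = appWindowCount would
// emit at the end of each application window.
class AppWindowSum {
  static List<Long> sumPerAppWindow(long[] perStreamingWindow, int appWindowCount) {
    List<Long> emitted = new ArrayList<>();
    long sum = 0;
    for (int i = 0; i < perStreamingWindow.length; i++) {
      sum += perStreamingWindow[i];
      // endWindow() happens only after every appWindowCount-th streaming window
      if ((i + 1) % appWindowCount == 0) {
        emitted.add(sum);
        sum = 0;
      }
    }
    // a trailing, incomplete application window has not ended, so nothing is emitted for it
    return emitted;
  }
}
```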
+<p>The operator Chart:
+This operator is very similar to the operator Quote, except that it takes inputs from
+HighLow and MinuteVolume and outputs the consolidated tuples
+to the output port.</p>
+<p>The operator PriceSMA:
+SMA stands for Simple Moving Average. This operator reads from the
+input port, which contains the price tuples from StockTickInput, and
+provides the moving average price of the stock.  It uses
+<code>SimpleMovingAverage&lt;String,Double&gt;</code>, which is provided in the
+multiwindow package.
+SimpleMovingAverage keeps track of the data of the previous N
+application windows in a sliding manner.  For each end window event, it
+provides the average of the data in those application windows.</p>
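+<p>A moving average over the last N windows can be sketched by keeping a sum and a count for each closed window. Once more than N windows are retained, the oldest one is dropped. <code>WindowedSma</code> is hypothetical and tracks a single key. Whether Malhar's SimpleMovingAverage weights tuples in exactly the same way is not stated here.</p>

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical single-key simple moving average over the last `windowSize`
// windows. Each window contributes its sum and count, so every tuple in the
// retained windows carries equal weight.
class WindowedSma {
  private final int windowSize;
  private final Deque<double[]> closedWindows = new ArrayDeque<>(); // {sum, count}
  private double sum;
  private long count;

  WindowedSma(int windowSize) {
    this.windowSize = windowSize;
  }

  void process(double value) {
    sum += value;
    count++;
  }

  // Closes the current window, slides out the oldest one if needed,
  // and returns the average over the retained windows (NaN if empty).
  double endWindow() {
    closedWindows.addLast(new double[] {sum, count});
    sum = 0;
    count = 0;
    if (closedWindows.size() > windowSize) {
      closedWindows.removeFirst();
    }
    double total = 0;
    double n = 0;
    for (double[] w : closedWindows) {
      total += w[0];
      n += w[1];
    }
    return n == 0 ? Double.NaN : total / n;
  }
}
```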
+<p>The operator Console:
+This operator simply writes its input tuples to the console
+(stdout).  In this example, there are four console operators, which connect to the output
+of Quote, Chart, PriceSMA and VolumeSMA.  In
+practice, they should be replaced by operators that use the data to
+produce visualization artifacts like charts.</p>
+<p>Connecting the operators together and constructing the
+DAG: Now that we know the
+operators used, we will create the DAG, set the streaming window size,
+instantiate the operators, and connect the operators together by adding
+streams that connect the output ports with the input ports among those
+operators.  This code is in the file YahooFinanceApplication.java. Refer to Figure 1
+again for the graphical representation of the DAG.  The last method in
+the code, <code>populateDAG()</code>,
+does all of this.  The other methods only set up the individual
+operators.</p>
+<pre><code class="java">package com.datatorrent.demos.yahoofinance;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.math.RangeKeyVal;
+import com.datatorrent.lib.math.SumKeyVal;
+import com.datatorrent.lib.multiwindow.SimpleMovingAverage;
+import com.datatorrent.lib.stream.ConsolidatorKeyVal;
+import com.datatorrent.lib.util.HighLow;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Yahoo! Finance application demo. &lt;p&gt;
+ *
+ * Get Yahoo finance feed and calculate minute price range, minute volume, simple moving average of 5 minutes.
+ */
+public class Application implements StreamingApplication
+{
+  private int streamingWindowSizeMilliSeconds = 1000; // 1 second (default is 500ms)
+  private int appWindowCountMinute = 60;   // 1 minute
+  private int appWindowCountSMA = 5 * 60;  // 5 minute
+
+  /**
+   * Get actual Yahoo finance ticks of symbol, last price, total daily volume, and last traded time.
+   */
+  public StockTickInput getStockTickInputOperator(String name, DAG dag)
+  {
+    StockTickInput oper = dag.addOperator(name, StockTickInput.class);
+    oper.readIntervalMillis = 200;
+    return oper;
+  }
+
+  /**
+   * This sends total daily volume by adding volumes from each ticks.
+   */
+  public SumKeyVal&lt;String, Long&gt; getDailyVolumeOperator(String name, DAG dag)
+  {
+    SumKeyVal&lt;String, Long&gt; oper = dag.addOperator(name, new SumKeyVal&lt;String, Long&gt;());
+    oper.setType(Long.class);
+    oper.setCumulative(true);
+    return oper;
+  }
+
+  /**
+   * Get aggregated volume of 1 minute and send at the end window of 1 minute.
+   */
+  public SumKeyVal&lt;String, Long&gt; getMinuteVolumeOperator(String name, DAG dag, int appWindowCount)
+  {
+    SumKeyVal&lt;String, Long&gt; oper = dag.addOperator(name, new SumKeyVal&lt;String, Long&gt;());
+    oper.setType(Long.class);
+    oper.setEmitOnlyWhenChanged(true);
+    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);
+    return oper;
+  }
+
+  /**
+   * Get High-low range for 1 minute.
+   */
+  public RangeKeyVal&lt;String, Double&gt; getHighLowOperator(String name, DAG dag, int appWindowCount)
+  {
+    RangeKeyVal&lt;String, Double&gt; oper = dag.addOperator(name, new RangeKeyVal&lt;String, Double&gt;());
+    dag.getOperatorMeta(name).getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT,appWindowCount);
+    oper.setType(Double.class);
+    return oper;
+  }
+
+  /**
+   * Quote (Merge price, daily volume, time)
+   */
+  public ConsolidatorKeyVal&lt;String,Double,Long,String,?,?&gt; getQuoteOperator(String name, DAG dag)
+  {
+    ConsolidatorKeyVal&lt;String,Double,Long,String,?,?&gt; oper = dag.addOperator(name, new ConsolidatorKeyVal&lt;String,Double,Long,String,Object,Object&gt;());
+    return oper;
+  }
+
+  /**
+   * Chart (Merge minute volume and minute high-low)
+   */
+  public ConsolidatorKeyVal&lt;String,HighLow,Long,?,?,?&gt; getChartOperator(String name, DAG dag)
+  {
+    ConsolidatorKeyVal&lt;String,HighLow,Long,?,?,?&gt; oper = dag.addOperator(name, new ConsolidatorKeyVal&lt;String,HighLow,Long,Object,Object,Object&gt;());
+    return oper;
+  }
+
+  /**
+   * Get simple moving average of price.
+   */
+  public SimpleMovingAverage&lt;String, Double&gt; getPriceSimpleMovingAverageOperator(String name, DAG dag, int appWindowCount)
+  {
+    SimpleMovingAverage&lt;String, Double&gt; oper = dag.addOperator(name, new SimpleMovingAverage&lt;String, Double&gt;());
+    oper.setWindowSize(appWindowCount);
+    oper.setType(Double.class);
+    return oper;
+  }
+
+  /**
+   * Get console for output.
+   */
+  public InputPort&lt;Object&gt; getConsole(String name, /*String nodeName,*/ DAG dag, String prefix)
+  {
+    ConsoleOutputOperator oper = dag.addOperator(name, ConsoleOutputOperator.class);
+    oper.setStringFormat(prefix + &quot;: %s&quot;);
+    return oper.input;
+  }
+
+  /**
+   * Create Yahoo Finance Application DAG.
+   */
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS,streamingWindowSizeMilliSeconds);
+
+    StockTickInput tick = getStockTickInputOperator(&quot;StockTickInput&quot;, dag);
+    SumKeyVal&lt;String, Long&gt; dailyVolume = getDailyVolumeOperator(&quot;DailyVolume&quot;, dag);
+    ConsolidatorKeyVal&lt;String,Double,Long,String,?,?&gt; quoteOperator = getQuoteOperator(&quot;Quote&quot;, dag);
+
+    RangeKeyVal&lt;String, Double&gt; highlow = getHighLowOperator(&quot;HighLow&quot;, dag, appWindowCountMinute);
+    SumKeyVal&lt;String, Long&gt; minuteVolume = getMinuteVolumeOperator(&quot;MinuteVolume&quot;, dag, appWindowCountMinute);
+    ConsolidatorKeyVal&lt;String,HighLow,Long,?,?,?&gt; chartOperator = getChartOperator(&quot;Chart&quot;, dag);
+
+    SimpleMovingAverage&lt;String, Double&gt; priceSMA = getPriceSimpleMovingAverageOperator(&quot;PriceSMA&quot;, dag, appWindowCountSMA);
+    DefaultPartitionCodec&lt;String, Double&gt; codec = new DefaultPartitionCodec&lt;String, Double&gt;();
+    dag.setInputPortAttribute(highlow.data, PortContext.STREAM_CODEC, codec);
+    dag.setInputPortAttribute(priceSMA.data, PortContext.STREAM_CODEC, codec);
+    dag.addStream(&quot;price&quot;, tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
+    dag.addStream(&quot;vol&quot;, tick.volume, dailyVolume.data, minuteVolume.data);
+    dag.addStream(&quot;time&quot;, tick.time, quoteOperator.in3);
+    dag.addStream(&quot;daily_vol&quot;, dailyVolume.sum, quoteOperator.in2);
+
+    dag.addStream(&quot;quote_data&quot;, quoteOperator.out, getConsole(&quot;quoteConsole&quot;, dag, &quot;QUOTE&quot;));
+
+    dag.addStream(&quot;high_low&quot;, highlow.range, chartOperator.in1);
+    dag.addStream(&quot;vol_1min&quot;, minuteVolume.sum, chartOperator.in2);
+    dag.addStream(&quot;chart_data&quot;, chartOperator.out, getConsole(&quot;chartConsole&quot;, dag, &quot;CHART&quot;));
+
+    dag.addStream(&quot;sma_price&quot;, priceSMA.doubleSMA, getConsole(&quot;priceSMAConsole&quot;, dag, &quot;Price SMA&quot;));
+  }
+
+}
+</code></pre>
+
+<p>Note that we also set a user-specific sliding window for SMA that
+keeps track of the previous N data points. Do not confuse this with the
+attribute APPLICATION_WINDOW_COUNT.</p>
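+<p>To make the distinction concrete, here is a minimal, hypothetical sketch in plain Java of a sliding window measured in data points. The class name <code>SlidingAverage</code> is invented for illustration; it is not the Malhar <code>SimpleMovingAverage</code> operator, and APPLICATION_WINDOW_COUNT, by contrast, counts streaming windows rather than data points.</p>

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration only: a sliding window over the last N data points.
// It is not the Malhar SimpleMovingAverage operator.
class SlidingAverage {
  private final int size;                          // N: number of data points kept
  private final Deque<Double> points = new ArrayDeque<>();
  private double sum;                              // running sum of the points in the window

  SlidingAverage(int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    this.size = size;
  }

  /** Adds a data point, drops the oldest one once more than N are held, and returns the average. */
  double add(double value) {
    points.addLast(value);
    sum += value;
    if (points.size() > size) {
      sum -= points.removeFirst();
    }
    return sum / points.size();
  }

  public static void main(String[] args) {
    SlidingAverage sma = new SlidingAverage(3);
    for (double price : new double[] {10, 20, 30, 40}) {
      System.out.println(sma.add(price));   // 10.0, 15.0, 20.0, 30.0
    }
  }
}
```

+<p>Once the window is full, each new point evicts the oldest one, so the average always covers exactly the last N points.</p>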
+<p>In the rest of this chapter we will run through the process of
+running this application. We assume that you are familiar with the details
+of your Hadoop infrastructure. For installation
+details please refer to the <a href="http://docs.datatorrent.com/installation/">Installation Guide</a>.</p>
+<h2 id="running-a-test-application_1">Running a Test Application</h2>
+<p>We will now describe how to run the Yahoo! Finance
+application described above in different modes
+(local mode, single node on Hadoop, and multi-node on Hadoop).</p>
+<p>The platform runs streaming applications under the control of a
+light-weight Streaming Application Manager (STRAM). Each application has
+its own instance of STRAM. STRAM launches the application,
+continually monitors and analyzes it at run time, and takes action
+such as load scaling or outage recovery as needed. We will discuss
+STRAM in more detail in the next chapter.</p>
+<p>The instructions below assume that the platform was installed in a
+directory &lt;INSTALL_DIR&gt; and the command line interface (CLI) will
+be used to launch the demo application. An application can be run in
+local mode (in IDE or from command line) or on a Hadoop cluster.</p>
+<p>To start dtCli, run:</p>
+<pre><code>&lt;INSTALL_DIR&gt;/bin/dtcli
+</code></pre>
+<p>The command line prompt appears. To start the application in local mode (the actual version number in the file name may differ), run:</p>
+<pre><code>dt&gt; launch -local &lt;INSTALL_DIR&gt;/yahoo-finance-demo-3.2.0-SNAPSHOT.apa
+</code></pre>
+<p>To terminate the application in local mode, press Ctrl-C.</p>
+<p>To run the application on the Hadoop cluster, launch it without the <code>-local</code> option (the actual version
+number in the file name may differ):</p>
+<pre><code>dt&gt; launch &lt;INSTALL_DIR&gt;/yahoo-finance-demo-3.2.0-SNAPSHOT.apa
+</code></pre>
+<p>To stop the application running in Hadoop, terminate it in the dtCli:</p>
+<pre><code>dt&gt; kill-app
+</code></pre>
+<p>Executing the application in either mode involves the same
+top-level steps: STRAM (Streaming Application Manager) validates
+the application (DAG), translates the logical plan into the physical plan,
+and then launches the execution engine. The mode determines the
+resources needed and how they are used.</p>
+<h2 id="local-mode">Local Mode</h2>
+<p>In local mode, the application runs as a single process with multiple threads. Although a
+few Hadoop classes are needed, there is no dependency on a Hadoop
+cluster or Hadoop services. The local file system is used in place of
+HDFS. This mode allows a quick run of an application in a single-process
+sandbox, and hence is best suited for debugging and analyzing the
+application logic. This mode is recommended while developing the
+application and can be used for running applications within the IDE for
+functional testing purposes. Due to limited resources and lack of
+scalability, an application running in this single-process mode is more
+likely to encounter throughput bottlenecks. A distributed cluster is
+recommended for benchmarking and production testing.</p>
+<h2 id="hadoop-cluster">Hadoop Cluster</h2>
+<p>In this section we discuss various Hadoop cluster setups.</p>
+<h3 id="single-node-cluster">Single Node Cluster</h3>
+<p>In a single node Hadoop cluster all services are deployed on a
+single server (a developer can use his/her development machine as a
+single node cluster). The platform does not distinguish between a single
+or multi-node setup and behaves exactly the same in both cases.</p>
+<p>In this mode, the resource manager, name node, data node, and node
+manager occupy one process each. This is an example of running a
+streaming application as a multi-process application on the same server.
+With the prevalence of fast, multi-core systems, this mode is effective for
+debugging, fine tuning, and generic analysis before submitting the job
+to a larger Hadoop cluster. In this mode, execution uses the Hadoop
+services and hence is likely to identify issues that are related to the
+Hadoop environment (such issues will not be uncovered in local mode).
+The throughput will obviously not be as high as on a multi-node Hadoop
+cluster. Additionally, since each container (i.e. Java process) requires
+a significant amount of memory, you will be able to run a much smaller
+number of containers than on a multi-node cluster.</p>
+<h3 id="multi-node-cluster">Multi-Node Cluster</h3>
+<p>In a multi-node Hadoop cluster all the services of Hadoop are
+typically distributed across multiple nodes in a production or
+production-level test environment. Upon launch the application is
+submitted to the Hadoop cluster and executes as a multi-process application on multiple nodes.</p>
+<p>Before you start deploying, testing and troubleshooting your
+application on a cluster, you should ensure that Hadoop (version 2.2.0
+or later) is properly installed and
+you have basic skills for working with it.</p>
+<hr />
+<h1 id="apache-apex-platform-overview">Apache Apex Platform Overview</h1>
+<h2 id="streaming-computational-model">Streaming Computational Model</h2>
+<p>In this chapter, we describe the basics of the real-time streaming platform and its computational model.</p>
+<p>The platform is designed to enable completely asynchronous real-time computations that are as non-blocking as possible and have
+minimal overhead.</p>
+<p>Applications running in the platform are represented by a Directed
+Acyclic Graph (DAG) made up of operators and streams. All computations
+are done in memory on arrival of
+the input data, with an option to save the output to disk (HDFS) in a
+non-blocking way. The data that flows between operators consists of
+atomic data elements. Each data element along with its type definition
+(henceforth called schema) is
+called a tuple. An application is a
+design of the flow of these tuples to and from
+the appropriate compute units to enable the computation of the final
+desired results. A message queue (henceforth called
+buffer server) manages tuples streaming
+between compute units in different processes. This server keeps track of
+all consumers, publishers, and partitions, and enables replay. More
+information is given in a later section.</p>
+<p>The streaming application is monitored by a decision making entity
+called STRAM (streaming application
+manager). STRAM is designed to be a lightweight
+controller that has minimal but sufficient interaction with the
+application. This is done via periodic heartbeats. The
+STRAM does the initial launch and periodically analyzes the system
+metrics to decide if any run time action needs to be taken.</p>
+<p>A fundamental building block for the streaming platform
+is the concept of breaking up a stream into equal finite time slices
+called streaming windows. Each window contains the ordered
+set of tuples in that time slice. A typical duration of a window is 500
+ms, but can be configured per application (the Yahoo! Finance
+application configures this value in the properties.xml file to be 1000 ms = 1 s). Each
+window is preceded by a begin_window event and is terminated by an
+end_window event, and is assigned
+a unique window ID. Even though the platform performs computations at
+the tuple level, bookkeeping is done at the window boundary, making the
+computations within a window an atomic event in the platform.  We can
+think of each window as an atomic
+micro-batch of tuples, to be processed together as one
+atomic operation (See Figure 2).  </p>
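+<p>The slicing of a stream into streaming windows can be sketched in plain Java as follows. This is a hypothetical illustration: it assumes each tuple carries a non-decreasing arrival time and uses <code>arrivalTime / windowMillis</code> as the window ID, an assumption made only for this sketch rather than the platform's window ID format. Empty windows still receive begin_window and end_window events.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: slice a stream into fixed-duration streaming windows.
// Assumes arrival times are non-decreasing; windowId = time / windowMillis is an
// assumption made for this sketch, not the platform's window ID format.
class WindowSlicer {
  static List<String> slice(long[] arrivalMillis, String[] tuples, long windowMillis) {
    if (arrivalMillis.length != tuples.length) {
      throw new IllegalArgumentException("one arrival time per tuple is required");
    }
    List<String> events = new ArrayList<>();
    if (tuples.length == 0) {
      return events;
    }
    long current = arrivalMillis[0] / windowMillis;
    events.add("begin_window " + current);
    for (int i = 0; i < tuples.length; i++) {
      long target = arrivalMillis[i] / windowMillis;   // time slice this tuple falls into
      while (current < target) {                        // close and open windows, even empty ones
        events.add("end_window " + current);
        current++;
        events.add("begin_window " + current);
      }
      events.add("tuple " + tuples[i]);
    }
    events.add("end_window " + current);
    return events;
  }

  public static void main(String[] args) {
    long[] arrivals = {0, 100, 600, 1700};
    String[] tuples = {"a", "b", "c", "d"};
    for (String event : slice(arrivals, tuples, 500)) {
      System.out.println(event);
    }
  }
}
```

+<p>With 500 ms windows, tuples a and b share window 0, c falls in window 1, window 2 is empty, and d falls in window 3.</p>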
+<p>This atomic batching allows the platform to avoid the very steep
+per tuple bookkeeping cost and instead has a manageable per batch
+bookkeeping cost. This translates to higher throughput, lower recovery
+time, and higher scalability. Later in this document we illustrate how
+the atomic micro-batch concept allows more efficient optimization
+algorithms.</p>
+<p>The platform also has in-built support for
+application windows.  An application window is part of the
+application specification, and can be a small or large multiple of the
+streaming window.  An example from our Yahoo! Finance test application
+is the moving average, calculated over a sliding application window of 5
+minutes which equates to 300 (= 5 * 60) streaming windows.</p>
+<p>Note that these two window concepts are distinct.  A streaming
+window is an abstraction of many tuples into a higher atomic event for
+easier management.  An application window is a group of consecutive
+streaming windows used for data aggregation (e.g. sum, average, maximum,
+minimum) on a per operator level.</p>
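+<p>As a rough illustration, the hypothetical sketch below groups consecutive streaming windows into application windows and sums the values seen in each. It is plain Java, not a platform operator; the moving average in the Yahoo! Finance application slides, whereas this sketch uses non-overlapping (tumbling) application windows for simplicity. The helper <code>windowsFor</code> reproduces the 5 * 60 = 300 arithmetic for a 1 s streaming window.</p>

```java
// Hypothetical illustration: an operator-level sum over application windows made of
// a fixed number of consecutive streaming windows (tumbling, i.e. non-overlapping).
class ApplicationWindowSum {
  private final int appWindowCount;   // streaming windows per application window
  private int windowsSeen;            // streaming windows completed in the current application window
  private long sum;

  ApplicationWindowSum(int appWindowCount) {
    if (appWindowCount <= 0) {
      throw new IllegalArgumentException("appWindowCount must be positive");
    }
    this.appWindowCount = appWindowCount;
  }

  /** Streaming windows needed to cover an application window of the given length. */
  static long windowsFor(long appWindowMillis, long streamingWindowMillis) {
    return appWindowMillis / streamingWindowMillis;
  }

  /** Called for each tuple. */
  void process(long value) {
    sum += value;
  }

  /** Called at every streaming window boundary; returns the sum when an application window closes, otherwise null. */
  Long endStreamingWindow() {
    windowsSeen++;
    if (windowsSeen < appWindowCount) {
      return null;
    }
    long result = sum;
    windowsSeen = 0;
    sum = 0;
    return result;
  }

  public static void main(String[] args) {
    // 5 minutes of 1 s streaming windows, as in the Yahoo! Finance example
    System.out.println(windowsFor(5 * 60 * 1000L, 1000L));   // 300
  }
}
```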
+<p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image02.png" /></p>
+<p>Alongside the platform, a set of
+predefined, benchmarked standard library operator templates is provided
+for ease of use and rapid development of applications. These
+operators are open sourced to the Apache Software Foundation under the
+project name “Malhar” as part of our efforts to foster community
+innovation. Some of these operators can be used in a DAG as is, while others
+have properties that can be set to specify the
+desired computation. Those interested in details should refer to the
+<a href="https://github.com/apache/incubator-apex-malhar">Apex-Malhar operator library</a>.</p>
+<p>The platform is a Hadoop YARN native
+application. It runs in a Hadoop cluster just like any
+other YARN application (MapReduce etc.) and is designed to seamlessly
+integrate with the rest of the Hadoop technology stack. It leverages Hadoop as
+much as possible and relies on it as its distributed operating system.
+Hadoop dependencies include resource management, compute/memory/network
+allocation, HDFS, security, fault tolerance, monitoring, metrics,
+multi-tenancy, logging etc. Hadoop classes/concepts are reused as much
+as possible.  The aim is to enable enterprises
+to leverage their existing Hadoop infrastructure for real time streaming
+applications. The platform is designed to scale with big
+data applications and scale with Hadoop.</p>
+<p>A streaming application is an asynchronous execution of
+computations across distributed nodes. All computations are done in
+parallel on a distributed cluster. The computation model is designed to
+do as many parallel computations as possible in a non-blocking fashion.
+Monitoring of the entire application is done at (streaming)
+window boundaries, with a streaming window as an atomic entity. A window
+completion is a quantum of work done. There is no assumption that an
+operator can be interrupted at precisely a particular tuple or window.</p>
+<p>An operator itself also
+cannot assume or predict the exact time a tuple that it emitted would
+get consumed by downstream operators. The operator processes the tuples
+it gets and simply emits new tuples based on its business logic. The
+only guarantee it has is that the upstream operators are processing
+either the current or some later window, and the downstream operator is
+processing either the current or some earlier window. The completion of
+a window (i.e. propagation of the end_window event through an operator) in any
+operator guarantees that all upstream operators have finished processing
+this window. Thus, the end_window event is blocking on an operator
+with multiple inputs, and is a synchronization point in the DAG. The
+begin_window event does not have
+any such restriction; a single begin_window event from any upstream operator
+triggers the operator to start processing tuples.</p>
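+<p>The window-boundary rule for an operator with several input ports can be modeled as in the hypothetical sketch below; it is not platform code. It assumes that all events for one window ID arrive before any event for the next ID, and shows that the first begin_window opens the window while end_window fires only once every input port has delivered it.</p>

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical model of window boundaries for an operator with several input ports.
// Assumes all events for one window ID arrive before any event for the next ID.
class MultiInputWindowGate {
  private final int inputPorts;
  private final BitSet endedPorts = new BitSet();   // ports that delivered end_window
  private long openWindow = -1;                      // -1: no window is open
  final List<String> log = new ArrayList<>();

  MultiInputWindowGate(int inputPorts) {
    this.inputPorts = inputPorts;
  }

  /** The first begin_window for a window ID, from any port, opens the window. */
  void beginWindow(int port, long windowId) {
    if (openWindow != windowId) {
      openWindow = windowId;
      endedPorts.clear();
      log.add("begin_window " + windowId);
    }
  }

  /** The operator's own end_window fires only after every input port has ended the window. */
  void endWindow(int port, long windowId) {
    endedPorts.set(port);
    if (endedPorts.cardinality() == inputPorts) {
      log.add("end_window " + windowId);
      openWindow = -1;
      endedPorts.clear();
    }
  }

  public static void main(String[] args) {
    MultiInputWindowGate gate = new MultiInputWindowGate(2);
    gate.beginWindow(0, 7);
    gate.beginWindow(1, 7);   // window already open: no second begin
    gate.endWindow(0, 7);     // still waiting for port 1
    gate.endWindow(1, 7);     // both ports done: end_window fires
    System.out.println(gate.log);   // [begin_window 7, end_window 7]
  }
}
```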
+<h2 id="streaming-application-manager-stram">Streaming Application Manager (STRAM)</h2>
+<p>Streaming Application Manager (STRAM) is the Hadoop YARN native
+application master. STRAM is the first process that is activated upon
+application launch and orchestrates the streaming application on the
+platform. STRAM is a lightweight controller process. The
+responsibilities of STRAM include</p>
+<ol>
+<li>
+<p>Running the Application</p>
+<ul>
+<li>Read the logical plan of the application (DAG) submitted by the client</li>
+<li>Validate the logical plan</li>
+<li>Translate the logical plan into a physical plan, where certain operators may be partitioned (i.e. replicated) to multiple operators for handling load.</li>
+<li>Request resources (Hadoop containers) from Resource Manager,
+    per physical plan</li>
+<li>Based on acquired resources and application attributes, create
+    an execution plan by partitioning the DAG into fragments,
+    each assigned to different containers.</li>
+<li>Execute the application by deploying each fragment to
+    its container. Containers then start stream processing and run
+    autonomously, processing one streaming window after another. Each
+    container is represented as an instance of the StreamingContainer class, which updates
+    STRAM via the heartbeat protocol and processes directions received
+    from STRAM.</li>
+</ul>
+</li>
+<li>
+<p>Continually monitoring the application via heartbeats from each StreamingContainer</p>
+</li>
+<li>Collecting Application System Statistics and Logs</li>
+<li>Logging all application-wide decisions taken</li>
+<li>Providing system data on the state of the application via a Web Service.</li>
+<li>
+<p>Supporting Fault Tolerance</p>
+<ol type="a">
+<li>Detecting a node outage</li>
+<li>Requesting a replacement resource from the Resource Manager
+    and scheduling state restoration for the streaming operators</li>
+<li>Saving state to Zookeeper</li>
+</ol>
+</li>
+<li>
+<p>Supporting Dynamic Partitioning: Periodically evaluating the SLA and modifying the physical plan if required
+    (logical plan does not change).</p>
+</li>
+<li>Enabling Security: Distributing security tokens for distributed components of the execution engine
+    and securing web service requests.</li>
+<li>Enabling Dynamic Modification of the DAG: In the future, we intend to allow for user-initiated
+    modification of the logical plan to allow for changes to the
+    processing logic and functionality.</li>
+</ol>
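+<p>The translation step in item 1 can be pictured with the hypothetical sketch below: each logical operator is expanded into one or more physical partitions, and every partition receives its own integer ID, numbered sequentially from 1. The operator names come from the Yahoo! Finance example, but the partition counts are invented, and the real translation involves more than this (for example, streams and container assignment).</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of logical-to-physical expansion: each logical operator
// becomes one or more partitions, and every partition gets a sequential integer ID from 1.
class PhysicalPlanSketch {
  static Map<String, List<Integer>> translate(Map<String, Integer> partitionsByOperator) {
    Map<String, List<Integer>> physical = new LinkedHashMap<>();  // keeps logical order
    int nextId = 1;
    for (Map.Entry<String, Integer> entry : partitionsByOperator.entrySet()) {
      List<Integer> ids = new ArrayList<>();
      for (int p = 0; p < entry.getValue(); p++) {
        ids.add(nextId++);
      }
      physical.put(entry.getKey(), ids);
    }
    return physical;
  }

  public static void main(String[] args) {
    Map<String, Integer> logical = new LinkedHashMap<>();
    logical.put("StockTickInput", 1);
    logical.put("HighLow", 2);   // invented partition count, for illustration
    logical.put("Chart", 1);
    System.out.println(translate(logical));   // {StockTickInput=[1], HighLow=[2, 3], Chart=[4]}
  }
}
```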
+<p>An example of the Yahoo! Finance Quote application scheduled on a
+cluster of 5 Hadoop containers (processes) is shown in Figure 3.</p>
+<p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image01.png" /></p>
+<p>An example for the translation from a logical plan to a physical
+plan and an execution plan for a subset of the application is shown in
+Figure 4.</p>
+<p><img alt="" src="../images/application_development/ApplicationDeveloperGuide.html-image04.png" /></p>
+<h2 id="hadoop-components">Hadoop Components</h2>
+<p>In this section we cover some aspects of Hadoop that your
+streaming application interacts with. This section is not meant to
+educate the reader on Hadoop, but only to acquaint the reader with
+the terms. We strongly advise readers to learn Hadoop from other
+sources.</p>
+<p>A streaming application runs as a native Hadoop 2.2 application.
+Hadoop 2.2 does not differentiate between a map-reduce job and other
+applications, and hence as far as Hadoop is concerned, the streaming
+application is just another job. This means that your application
+leverages all the bells and whistles Hadoop provides and is fully
+supported within Hadoop technology stack. The platform is responsible
+for properly integrating itself with the relevant components of Hadoop
+that exist today and those that may emerge in the future.</p>
+<p>All investments that leverage multi-tenancy (for example quotas
+and queues), security (for example Kerberos), data flow integration (for
+example copying data in-out of HDFS), monitoring, metrics collections,
+etc. will require no changes when streaming applications run on
+Hadoop.</p>
+<h3 id="yarn">YARN</h3>
+<p><a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site">YARN</a> is
+the core library of Hadoop 2.2 that is tasked with resource management
+and works as a distributed application framework. In this section we
+will walk through YARN's components. In Hadoop 2.2, the old JobTracker
+has been replaced by a combination of ResourceManager (RM) and
+ApplicationMaster (AM).</p>
+<h4 id="resource-manager-rm">Resource Manager (RM)</h4>
+<p><a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html">ResourceManager</a> (RM)
+manages all the distributed resources. It allocates and arbitrates all
+the slots and the resources (cpu, memory, network) of these slots. It
+works with per-node NodeManagers (NMs) and per-application
+ApplicationMasters (AMs). Currently memory usage is monitored by RM; in
+upcoming releases it will have CPU as well as network management. RM is
+shared by map-reduce and streaming applications. Running streaming
+applications requires no changes in the RM.</p>
+<h4 id="application-master-am">Application Master (AM)</h4>
+<p>The AM is the watchdog or monitoring process for your application
+and has the responsibility of negotiating resources with RM and
+interacting with NodeManagers to get the allocated containers started.
+The AM is the starting point of your application and is considered user
+code (not system Hadoop code). The AM itself runs in one container. All
+resource management within the application is handled by the AM. This
+is a critical feature for Hadoop 2.2, where tasks done by the JobTracker in
+Hadoop 1.0 have been distributed, allowing Hadoop 2.2 to scale much
+beyond Hadoop 1.0. STRAM is a native YARN ApplicationMaster.</p>
+<h4 id="node-managers-nm">Node Managers (NM)</h4>
+<p>There is one <a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html">NodeManager</a> (NM)
+per node in the cluster. All the containers (i.e. processes) on that
+node are monitored by the NM. It takes instructions from RM and manages
+resources of that node as per RM instructions. NM interactions are the same
+for map-reduce and for streaming applications. Running streaming
+applications requires no changes in the NM.</p>
+<h4 id="rpc-protocol">RPC Protocol</h4>
+<p>Communication among RM, AM, and NM is done via the Hadoop RPC
+protocol. Streaming applications use the same protocol to send their
+data. No changes are needed in RPC support provided by Hadoop to enable
+communication done by components of your application.</p>
+<h3 id="hdfs">HDFS</h3>
+<p>Hadoop includes a highly fault tolerant, high throughput
+distributed file system (<a href="http://hadoop.apache.org/docs/r1.0.4/hdfs_design.html">HDFS</a>).
+It runs on commodity hardware, and your streaming application will, by
+default, use it. There is no difference between files created by a
+streaming application and those created by map-reduce.</p>
+<h1 id="developing-an-application">Developing An Application</h1>
+<p>In this chapter we describe the methodology to develop an
+application using the Realtime Streaming Platform. The platform was
+designed to make it easy to build and launch sophisticated streaming
+applications with the developer having to deal only with the
+application/business logic. The platform deals with details of where to
+run what operators on which servers and how to correctly route streams
+of data among them.</p>
+<h2 id="development-process">Development Process</h2>
+<p>While the platform does not mandate a specific methodology or set
+of development tools, we have recommendations to maximize productivity
+for the different phases of application development.</p>
+<h4 id="design">Design</h4>
+<ul>
+<li>Identify common, reusable operators. Use a library
+    if possible.</li>
+<li>Identify scalability and performance requirements before
+    designing the DAG.</li>
+<li>Leverage attributes that the platform supports for scalability
+    and performance.</li>
+<li>Use operators that are benchmarked and tested so that later
+    surprises are minimized. If you have glue code, create appropriate
+    unit tests for it.</li>
+<li>Use THREAD_LOCAL locality for high throughput streams. If all
+    the operators on that stream cannot fit in one container,
+    try NODE_LOCAL locality. Both THREAD_LOCAL and
+    NODE_LOCAL streams avoid the Network Interface Card (NIC)
+    completely. The former uses intra-process communication to also avoid
+    serialization-deserialization overhead.</li>
+<li>The overall throughput and latencies are not necessarily
+    correlated to the number of operators in a simple way; the
+    relationship is more nuanced. A lot depends on how much work
+    individual operators are doing, how many are able to operate in
+    parallel, and how much data is flowing through the arcs of the DAG.
+    It is, at times, better to break a computation down into its
+    constituent simple parts and then stitch them together via streams
+    to better utilize the compute resources of the cluster. Decide, on a
+    per-application basis, where to draw the line between the complexity of each
+    operator and the number of streams. Doing multiple computations in one
+    operator does save network I/O, while operators that are too complex
+    are hard to maintain.</li>
+<li>As far as possible, avoid operators that depend on the relative order of
+    tuples across two streams; such behavior is not idempotent.</li>
+<li>Persist key information to HDFS if possible; it may be useful
+    for debugging later.</li>
+<li>Decide on an appropriate fault tolerance mechanism. If some
+    data loss is acceptable, use the at-most-once mechanism as it has
+    the fastest recovery.</li>
+</ul>
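+<p>To see why depending on the order of two streams is a problem, consider the hypothetical sketch below. Events are encoded as <code>P:price</code> or <code>V:volume</code> strings, an invented format. The first method weights each volume by the most recent price, so two interleavings of the same tuples give different results; the second only combines per-stream aggregates at the end of the window, so its result depends on the window's contents rather than on arrival order.</p>

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: events are "P:<price>" or "V:<volume>" strings from two streams.
class OrderDependence {
  /** Order-dependent: weights each volume by the most recent price seen so far. */
  static long orderDependentValue(List<String> events) {
    long lastPrice = 0;
    long total = 0;
    for (String e : events) {
      long v = Long.parseLong(e.substring(2));
      if (e.startsWith("P")) {
        lastPrice = v;
      } else {
        total += lastPrice * v;
      }
    }
    return total;
  }

  /** Order-independent within a window: combines per-stream aggregates once, at end_window. */
  static long windowedValue(List<String> events) {
    long maxPrice = 0;
    long totalVolume = 0;
    for (String e : events) {
      long v = Long.parseLong(e.substring(2));
      if (e.startsWith("P")) {
        maxPrice = Math.max(maxPrice, v);
      } else {
        totalVolume += v;
      }
    }
    return maxPrice * totalVolume;
  }

  public static void main(String[] args) {
    // The same four tuples, interleaved differently across the two streams
    List<String> a = Arrays.asList("P:10", "V:5", "P:20", "V:1");
    List<String> b = Arrays.asList("P:10", "P:20", "V:5", "V:1");
    System.out.println(orderDependentValue(a) + " vs " + orderDependentValue(b));  // 70 vs 120
    System.out.println(windowedValue(a) + " vs " + windowedValue(b));              // 120 vs 120
  }
}
```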
+<h4 id="creating-new-project">Creating New Project</h4>
+<p>Please refer to the <a href="../application_packages/">Apex Application Packages</a> for
+the basic steps for creating a new project.</p>
+<h4 id="writing-the-application-code">Writing the application code</h4>
+<p>Preferably use an IDE (Eclipse, NetBeans, etc.) that allows you to
+manage dependencies and assists with the Java coding. Specific benefits
+include ease of managing operator library jar files, individual operator
+classes, ports, and properties. The IDE will also highlight, as you type,
+issues such as type mismatches when adding streams, and help you
+rectify them.</p>
+<h4 id="testing">Testing</h4>
+<p>Write test cases with JUnit or similar test framework so that code
+is tested as it is written. For such testing, the DAG can run in local
+mode within the IDE. Doing this may involve writing mock input or output
+operators for the integration points with external systems. For example,
+instead of reading from a live data stream, the application in test mode
+can read from and write to files. This can be done with a single
+application DAG by instrumenting a test mode using settings in the
+configuration that is passed to the application's <code>populateDAG</code>
+method.</p>
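+<p>A configuration-driven test mode might look like the hypothetical sketch below. Here <code>java.util.Properties</code> stands in for the Hadoop <code>Configuration</code> passed to <code>populateDAG</code>, and the keys <code>myapp.testMode</code> and <code>myapp.testInputFile</code> are invented for illustration; in a real application the two branches would add different input operators to the same DAG.</p>

```java
import java.util.Properties;

// Hypothetical illustration of a configuration-driven test mode. java.util.Properties
// stands in for the Hadoop Configuration passed to populateDAG; the keys are invented.
class TestModeSwitch {
  interface TickSource {
    String describe();
  }

  static TickSource chooseInput(Properties conf) {
    boolean testMode = Boolean.parseBoolean(conf.getProperty("myapp.testMode", "false"));
    if (testMode) {
      String file = conf.getProperty("myapp.testInputFile", "ticks.csv");
      return () -> "file:" + file;     // mock input read from a file in local-mode tests
    }
    return () -> "live feed";          // real input in production
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    conf.setProperty("myapp.testMode", "true");
    System.out.println(chooseInput(conf).describe());   // file:ticks.csv
  }
}
```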
+<p>Good test coverage will not only eliminate basic validation errors
+such as missing port connections or property constraint violations, but
+also validate the correct processing of the data. The same tests can be
+re-run whenever the application or its dependencies change (operator
+libraries, version of the platform, etc.).</p>
+<h4 id="running-an-application">Running an application</h4>
+<p>The platform provides a command-line tool called dtcli for managing applications (launching,
+killing, viewing, etc.). This tool was already discussed briefly above
+in the section entitled Running a Test Application. It will introspect
+the jar file specified with the launch command for applications (classes
+that implement StreamingApplication) or property files that define
+applications. It will also deploy the dependency jar files from the
+application package to the cluster.</p>
+<p>Dtcli can run the application in local mode (i.e. outside a
+cluster). It is recommended to first run the application in local mode
+in the development environment before launching on the Hadoop cluster.
+This way some of the external system integration and correct
+functionality of the application can be verified in an easier-to-debug
+environment before testing distributed mode.</p>
+<p>For more details on CLI please refer to the <a href="../dtcli/">dtCli Guide</a>.</p>
+<h2 id="application-api">Application API</h2>
+<p>This section introduces the API to write a streaming application.
+The work involves connecting operators via streams to form the logical
+DAG. The steps are</p>
+<ol>
+<li>
+<p>Instantiate an application (DAG)</p>
+</li>
+<li>
+<p>(Optional) Set Attributes</p>
+<ul>
+<li>Assign application name</li>
+<li>Set any other attributes as per application requirements</li>
+</ul>
+</li>
+<li>
+<p>Create/re-use and instantiate operators</p>
+<ul>
+<li>Assign each operator a name that is unique within the application</li>
+<li>Declare schema upfront for each operator (and thereby its ports)</li>
+<li>(Optional) Set properties and attributes on the DAG as per specification</li>
+<li>Connect ports of operators via streams<ul>
+<li>Each stream connects one output port of an operator to one or more input ports of other operators.</li>
+<li>(Optional) Set attributes on the streams</li>
+</ul>
+</li>
+</ul>
+</li>
+<li>
+<p>Test the application.</p>
+</li>
+</ol>
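+<p>The rules in these steps (operator names unique within the application, and streams that connect one output port to one or more input ports) can be modeled with the small, self-contained sketch below. <code>MiniDag</code> is a hypothetical class written for illustration; it is not the Apex <code>DAG</code> API, which is shown in the next section.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the steps above; this is not the Apex DAG API.
class MiniDag {
  private final Set<String> operators = new LinkedHashSet<>();
  private final Map<String, List<String>> streams = new LinkedHashMap<>();  // name -> [source, sinks...]

  /** Operator names must be unique within the application. */
  void addOperator(String name) {
    if (!operators.add(name)) {
      throw new IllegalArgumentException("duplicate operator name: " + name);
    }
  }

  /** Connects one output port to one or more input ports; ports are written "operator.port". */
  void addStream(String name, String source, String... sinks) {
    if (streams.containsKey(name)) {
      throw new IllegalArgumentException("duplicate stream name: " + name);
    }
    if (sinks.length == 0) {
      throw new IllegalArgumentException("stream " + name + " needs at least one sink");
    }
    List<String> endpoints = new ArrayList<>();
    endpoints.add(source);
    endpoints.addAll(Arrays.asList(sinks));
    for (String port : endpoints) {
      int dot = port.indexOf('.');
      if (dot <= 0 || !operators.contains(port.substring(0, dot))) {
        throw new IllegalArgumentException("unknown operator in port " + port);
      }
    }
    streams.put(name, endpoints);
  }

  int streamCount() {
    return streams.size();
  }

  public static void main(String[] args) {
    MiniDag dag = new MiniDag();
    dag.addOperator("tick");
    dag.addOperator("console");
    dag.addStream("price", "tick.price", "console.input");
    System.out.println(dag.streamCount() + " stream(s)");   // 1 stream(s)
  }
}
```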
+<p>There are two methods to create an application, namely the Java API and
+a properties file. The Java API is for applications being developed by humans,
+and the properties file (Hadoop-like) is more suited for DAGs generated by
+tools.</p>
+<h3 id="java-api">Java API</h3>
+<p>The Java API is the most common way to create a streaming
+application. It is meant for application developers who prefer to
+leverage the features of Java, and the ease of use and enhanced
+productivity provided by IDEs like NetBeans or Eclipse. Using Java to
+specify the application provides the extra validation abilities of the Java
+compiler, such as compile-time checks for type safety at the time of
+writing the code. Later in this chapter you can read more about
+validation support in the platform.</p>
+<p>The developer specifies the streaming application by implementing
+the StreamingApplication interface, which is how platform tools (CLI etc.)
+recognize and instantiate applications. Here we show how to create a
+Yahoo! Finance application that streams the last trade price of a ticker
+and computes the high and low price in every 1 min window. Run the
+test application above to execute the
+DAG in local mode within the IDE.</p>
+<p>Let us revisit how the Yahoo! Finance test application constructs the DAG:</p>
+<pre><code class="java">public class Application implements StreamingApplication
+{
+
+  ...
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    dag.getAttributes().put(DAG.STREAMING_WINDOW_SIZE_MILLIS, streamingWindowSizeMilliSeconds);
+
+    StockTickInput tick = getStockTickInputOperator(&quot;StockTickInput&quot;, dag);
+    SumKeyVal&lt;String, Long&gt; dailyVolume = getDailyVolumeOperator(&quot;DailyVolume&quot;, dag);
+    ConsolidatorKeyVal&lt;String,Double,Long,String,?,?&gt; quoteOperator = getQuoteOperator(&quot;Quote&quot;, dag);
+
+    RangeKeyVal&lt;String, Double&gt; highlow = getHighLowOperator(&quot;HighLow&quot;, dag, appWindowCountMinute);
+    SumKeyVal&lt;String, Long&gt; minuteVolume = getMinuteVolumeOperator(&quot;MinuteVolume&quot;, dag, appWindowCountMinute);
+    ConsolidatorKeyVal&lt;String,HighLow,Long,?,?,?&gt; chartOperator = getChartOperator(&quot;Chart&quot;, dag);
+
+    SimpleMovingAverage&lt;String, Double&gt; priceSMA = getPriceSimpleMovingAverageOperator(&quot;PriceSMA&quot;, dag, appWindowCountSMA);
+
+    dag.addStream(&quot;price&quot;, tick.price, quoteOperator.in1, highlow.data, priceSMA.data);
+    dag.addStream(&quot;vol&quot;, tick.volume, dailyVolume.data, minuteVolume.data);
+    dag.addStream(&quot;time&quot;, tick.time, quoteOperator.in3);
+    dag.addStream(&quot;daily_vol&quot;, dailyVolume.sum, quoteOperator.in2);
+
+    dag.addStream(&quot;quote_data&quot;, quoteOperator.out, getConsole(&quot;quoteConsole&quot;, dag, &quot;QUOTE&quot;));
+
+    dag.addStream(&quot;high_low&quot;, highlow.range, chartOperator.in1);
+    dag.addStream(&quot;vol_1min&quot;, minuteVolume.sum, chartOperator.in2);
+    dag.addStream(&quot;chart_data&quot;, chartOperator.out, getConsole(&quot;chartConsole&quot;, dag, &quot;CHART&quot;));
+
+    dag.addStream(&quot;sma_price&quot;, priceSMA.doubleSMA, getConsole(&quot;priceSMAConsole&quot;, dag, &quot;Price SMA&quot;));
+  }
+}
+</code></pre>
+
+<h3 id="property-file-api">Property File API</h3>
+<p>The platform also supports specification of a DAG via a property
+file. The aim here is to make it easy for tools to create and run an
+application. This method of specification does not have the Java
+compiler support of compile-time checks, but since these applications
+would be created by software, they should be correct by construction.
+The syntax is derived from Hadoop properties and should be easy for
+folks who are used to creating software that integrates with
+Hadoop.</p>
+<p>Create an application (DAG): myApplication.properties</p>
+<pre><code># input operator that reads from a file
+dt.operator.inputOp.classname=com.acme.SampleInputOperator
+dt.operator.inputOp.fileName=somefile.txt
+
+# output operator that writes to the console
+dt.operator.outputOp.classname=com.acme.ConsoleOutputOperator
+
+# stream connecting both operators
+dt.stream.inputStream.source=inputOp.outputPort
+dt.stream.inputStream.sinks=outputOp.inputPort
+</code></pre>
+
+<p>The above snippet is intended to convey the basic idea of specifying
+the DAG without using Java. Operators would come from a predefined
+library and be referenced in the specification by class name and port names
+(obtained from the library provider's documentation or runtime
+introspection by tools). For those interested in details, see later
+sections and refer to the Operation and
+Installation Guide mentioned above.</p>
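+<p>As an illustration of how a tool might interpret such a file, the hypothetical sketch below uses <code>java.util.Properties</code> to collect operator class names and stream sinks from keys of the form shown above. It is not the platform's parser; in particular, treating <code>sinks</code> as a comma-separated list is an assumption.</p>

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical reader for the property-file DAG format shown above; not the platform's parser.
class PropertyDagReader {
  private static final String OP_PREFIX = "dt.operator.";
  private static final String CLASS_SUFFIX = ".classname";

  /** Maps operator name to class name for every dt.operator.NAME.classname key. */
  static Map<String, String> operatorClasses(Properties props) {
    Map<String, String> result = new TreeMap<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(OP_PREFIX) && key.endsWith(CLASS_SUFFIX)
          && key.length() > OP_PREFIX.length() + CLASS_SUFFIX.length()) {
        String name = key.substring(OP_PREFIX.length(), key.length() - CLASS_SUFFIX.length());
        result.put(name, props.getProperty(key));
      }
    }
    return result;
  }

  /** Sinks of a stream; a comma-separated list is assumed. */
  static String[] streamSinks(Properties props, String stream) {
    String sinks = props.getProperty("dt.stream." + stream + ".sinks", "").trim();
    return sinks.isEmpty() ? new String[0] : sinks.split("\\s*,\\s*");
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.load(new StringReader(
        "dt.operator.inputOp.classname=com.acme.SampleInputOperator\n"
            + "dt.operator.inputOp.fileName=somefile.txt\n"
            + "dt.operator.outputOp.classname=com.acme.ConsoleOutputOperator\n"
            + "dt.stream.inputStream.source=inputOp.outputPort\n"
            + "dt.stream.inputStream.sinks=outputOp.inputPort\n"));
    // {inputOp=com.acme.SampleInputOperator, outputOp=com.acme.ConsoleOutputOperator}
    System.out.println(operatorClasses(props));
    System.out.println(String.join(",", streamSinks(props, "inputStream")));
  }
}
```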
+<h3 id="attributes">Attributes</h3>
+<p>Attributes impact the runtime behavior of the application. They do
+not impact the functionality. An example of an attribute is application
+name. Setting it changes the application name. Another example is
+streaming window size. Setting it changes the streaming window size from
+the default value to the specified value. Users cannot add new
+attributes; they can only choose from the ones that come packaged and
+pre-supported by the platform. Details of attributes are covered in the
+Operation and Installation
+Guide.</p>
+<h2 id="operators">Operators</h2>
+<p>Operators are basic compute units.
+Operators process each incoming tuple and emit zero or more tuples on
+output ports as per the business logic. The data flow, connectivity,
+fault tolerance (node outage), etc. are taken care of by the platform. As
+an operator developer, all that is needed is to figure out what to do
+with the incoming tuple and when (and which output port) to send out a
+particular output tuple. Correctly designed operators will most likely
+get reused. Operator design needs care and foresight. For details, refer
+to the  <a href="../operator_development/">Operator Developer Guide</a>. As an application developer you need to connect operators
+in a way that it implements your business logic. You may also require
+operator customization for functionality and use attributes for
+performance/scalability etc.</p>
+<p>All operators process tuples asynchronously in a distributed
+cluster. An operator cannot assume or predict the exact time a tuple
+that it emitted will get consumed by a downstream operator. An operator
+also cannot predict the exact time when a tuple arrives from an upstream
+operator. The only guarantee is that the upstream operators are
+processing the current or a future window, i.e. the windowId of an upstream
+operator is equal to or greater than its own windowId. Conversely, the windowId
+of a downstream operator is less than or equal to its own windowId. The
+end of a window operation, i.e. the API call to endWindow on an operator,
+requires that all upstream operators have finished processing this
+window. This means that completion of processing a window propagates in
+a blocking fashion through an operator. Later sections provide more
+details on streams and data flow of tuples.</p>
+<p>Each operator has a unique name within the DAG as provided by the
+user. This is the name of the operator in the logical plan. The name of
+the operator in the physical plan is an integer assigned to it by STRAM.
+These integers use the sequence from 1 to N, where N is the total number
+of physically unique operators in the DAG. Following the same rule,
+each partitioned instance of a logical operator has its own integer as
+an id. This id, along with the Hadoop container name, uniquely identifies
+the operator in the execution plan of the DAG. The logical names and the
+physical names are required for web service support. Operators can be
+accessed via both names, and the same names are used when interacting
+with dtcli to access an operator.
+Ideally these names should be self-descriptive. For example, in Figure 1,
+the node named “Daily volume” has a physical identifier of 2.</p>
+<h3 id="operator-interface">Operator Interface</h3>
+<p>The operator interface in a DAG consists of ports, properties, and attributes.
+Operators interact with other components of the DAG via ports. Functional behavior of the operators
+can be customized via properties. Run time performance and physical
+instantiation are controlled by attributes. Ports and properties are
+fields (variables) of the Operator class/object, while attributes are
+meta information that is attached to the operator object via an
+AttributeMap. An operator must have at least one port. Properties are
+optional. Attributes are provided by the platform and always have a
+default value that enables normal functioning of operators.</p>
+<h4 id="ports">Ports</h4>
+<p>Ports are connection points by which an operator receives and
+emits tuples. They should be transient objects, instantiated in the
+operator object, that implement particular interfaces. Ports should be
+transient as they contain no state. They have a pre-defined schema and
+can only be connected to other ports with the same schema. An input port
+needs to implement the interface Operator.InputPort and the
+interface Sink. A default
+implementation of these is provided by the abstract class DefaultInputPort. An output port needs to
+implement the interface Operator.OutputPort. A default implementation
+of this is provided by the concrete class DefaultOutputPort. These two classes are a quick way to
+implement the above interfaces, but operator developers have the option
+of providing their own implementations.</p>
+<p>Here are examples of an input and an output port from the operator
+Sum.</p>
+<pre><code class="java">@InputPortFieldAnnotation(name = &quot;data&quot;)
+public final transient DefaultInputPort&lt;V&gt; data = new DefaultInputPort&lt;V&gt;() {
+  @Override
+  public void process(V tuple)
+  {
+    ...
+  }
+};
+@OutputPortFieldAnnotation(optional=true)
+public final transient DefaultOutputPort&lt;V&gt; sum = new DefaultOutputPort&lt;V&gt;(){ … };
+</code></pre>
+
+<p>The process call is in the Sink interface. A tuple is emitted on an output
+port via the emit(tuple) call. For the above example it would be
+sum.emit(t), where the type of t is the generic parameter V.</p>
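+<p>The relationship between emit on an output port and process on the connected input ports can be illustrated with a small, platform-free model. SimpleInputPort and SimpleOutputPort are hypothetical stand-ins written for this sketch, not the platform's Sink, DefaultInputPort, or DefaultOutputPort classes; in a running application the platform wires the ports together and may deliver tuples across threads, containers, or nodes.</p>
+<pre><code class="java">import java.util.ArrayList;
+import java.util.List;
+
+// Hypothetical stand-in for an input port: receives one tuple at a time
+interface SimpleInputPort&lt;T&gt;
+{
+  void process(T tuple);
+}
+
+// Hypothetical stand-in for an output port
+class SimpleOutputPort&lt;T&gt;
+{
+  private final List&lt;SimpleInputPort&lt;? super T&gt;&gt; sinks = new ArrayList&lt;&gt;();
+
+  // In a real application the platform connects ports when it builds the DAG
+  void connect(SimpleInputPort&lt;? super T&gt; sink)
+  {
+    sinks.add(sink);
+  }
+
+  // emit hands the tuple to every connected input port, in connection order
+  void emit(T tuple)
+  {
+    for (SimpleInputPort&lt;? super T&gt; sink : sinks) {
+      sink.process(tuple);
+    }
+  }
+}
+</code></pre>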
+<p>There is no limit on how many ports an operator can have. However,
+every operator must have at least one port. An operator with only one port
+is called an Input Adapter if it has no input port and an Output Adapter
+if it has no output port. These special operators are needed to get/read
+data from an outside system/source into the application, or push/write data
+into an outside system/sink. These systems could be in Hadoop or outside of
+Hadoop. These two kinds of operators are, in essence, gateways for the streaming
+application to communicate with systems outside the application.</p>
+<p>Port connectivity can be validated before the application is launched by adding
+the port field annotations shown above. By default, all ports have to be
+connected; to allow a port to go unconnected, you need to add
+“optional=true” to the annotation.</p>
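+<p>A connectivity check of this kind can be sketched with reflection: walk the operator's fields, pick the ones carrying a port annotation, and report every non-optional port that has not been connected. PortField is a hypothetical annotation standing in for InputPortFieldAnnotation and OutputPortFieldAnnotation, and PortConnectivityCheck is an illustration only, not the platform's validator.</p>
+<pre><code class="java">import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+// Hypothetical annotation standing in for the port field annotations
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+@interface PortField
+{
+  boolean optional() default false;
+}
+
+class PortConnectivityCheck
+{
+  // Returns the sorted names of non-optional port fields that are not connected
+  static List&lt;String&gt; unconnectedRequiredPorts(Class&lt;?&gt; operatorClass, Set&lt;String&gt; connectedPorts)
+  {
+    List&lt;String&gt; missing = new ArrayList&lt;&gt;();
+    for (Field field : operatorClass.getDeclaredFields()) {
+      PortField port = field.getAnnotation(PortField.class);
+      if (port != null &amp;&amp; !port.optional() &amp;&amp; !connectedPorts.contains(field.getName())) {
+        missing.add(field.getName());
+      }
+    }
+    Collections.sort(missing);
+    return missing;
+  }
+}
+</code></pre>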
+<p>Attributes can be specified for ports to affect their runtime
+behavior. An example of an attribute is parallel partition, which specifies
+a parallel computation flow per partition. It is described in detail in
+the Parallel Partitions section. Another example is queue capacity, which specifies the buffer size for the
+port. Details of attributes are covered in the Operation and Installation Guide.</p>
+<h4 id="properties">Properties</h4>
+<p>Properties are the abstractions by which functional behavior of an
+operator can be customized. They should be non-transient objects
+instantiated in the operator object. They need to be non-transient since
+they are part of the operator state and re-construction of the operator
+object from its checkpointed state must restore the operator to the
+desired state. Properties are optional, i.e. an operator may or may not
+have properties; they are part of user code and their values are not
+interpreted by the platform in any way.</p>
+<p>All non-serializable objects should be declared transient.
+Examples include sockets, session information, etc. These objects should
+be initialized in the setup call, which is invoked every time the
+operator is initialized, including when it is restored from a checkpoint.</p>
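+<p>The difference between properties and transient fields can be demonstrated with a checkpoint/restore round trip. This sketch uses standard java.io serialization purely because it honors the same transient keyword; the platform's own checkpoint serializer may differ. LineReaderOperator is a hypothetical class, and its setup method only mirrors the contract described above (it takes no context argument here).</p>
+<pre><code class="java">import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+// Hypothetical operator used only to contrast properties with transient fields
+class LineReaderOperator implements Serializable
+{
+  private static final long serialVersionUID = 1L;
+
+  String fileName = &quot;somefile.txt&quot;;  // property: part of the checkpointed state
+  transient StringBuilder connection;   // stand-in for a socket or session: never checkpointed
+
+  // Mirrors the setup() contract: (re)create transient resources on every initialization
+  void setup()
+  {
+    connection = new StringBuilder(&quot;open:&quot; + fileName);
+  }
+
+  // Simulates checkpoint + restore with java.io serialization
+  static LineReaderOperator checkpointAndRestore(LineReaderOperator op)
+  {
+    try {
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+        out.writeObject(op);
+      }
+      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+        return (LineReaderOperator) in.readObject();
+      }
+    } catch (IOException | ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}
+</code></pre>
+
+<p>After the round trip, fileName keeps its configured value while connection is null until setup runs again.</p>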
+<h4 id="attributes_1">Attributes</h4>
+<p>Attributes are values assigned to the operators that impact
+run-time behavior. This includes things like the number of partitions and the at-most-once,
+at-least-once, or exactly-once recovery modes. Attributes do
+not impact the functionality of the operator. Users can change certain
+attributes at runtime. Users cannot add attributes to operators; they
+are pre-defined by the platform. They are interpreted by the platform
+and thus cannot be defined in user-created code (unlike properties).
+Details of attributes are covered in the <a href="http://docs.datatorrent.com/configuration/">Configuration Guide</a>.</p>
+<h3 id="operator-state">Operator State</h3>
+<p>The state of an operator is defined as the data that it transfers
+from one window to a future window. Since the computing model of the
+platform is to treat windows like micro-batches, the operator state can
+be checkpointed every Nth window, or every T units of time, where T is significantly greater
+than the streaming window. When an operator is checkpointed, the entire
+object is written to HDFS. The larger the amount of state in an
+operator, the longer it takes to recover from a failure. A stateless
+operator can recover much more quickly than a stateful one. The needed
+windows are preserved by the upstream buffer server and are used to
+recompute the lost windows, and also to rebuild the buffer server in the
+current container.</p>
+<p>The distinction between Stateless and Stateful is based solely on
+the need to transfer data in the operator from one window to the next.
+The state of an operator is independent of the number of ports.</p>
+<h4 id="stateless">Stateless</h4>
+<p>A Stateless operator is defined as one that does not need to keep any data
+at the end of a window. This means that all the computations
+of a window can be derived from the tuples the operator receives
+within that window. This guarantees that the output of any window can be
+reconstructed by simply replaying the tuples that arrived in that
+window. Stateless operators are more efficient in terms of fault
+tolerance and the cost of meeting an SLA.</p>
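+<p>As a minimal illustration, assuming a hypothetical filter that emits only even numbers, the output of a stateless operator is a pure function of the tuples in one window, so replaying that window reproduces exactly the same output without any checkpointed state.</p>
+<pre><code class="java">import java.util.ArrayList;
+import java.util.List;
+
+// Hypothetical stateless operator: it keeps no fields between windows
+class EvenNumberFilter
+{
+  // Processes every tuple of one window and returns the tuples it would emit
+  static List&lt;Integer&gt; processWindow(List&lt;Integer&gt; windowTuples)
+  {
+    List&lt;Integer&gt; emitted = new ArrayList&lt;&gt;();
+    for (int tuple : windowTuples) {
+      if (tuple % 2 == 0) {
+        emitted.add(tuple);
+      }
+    }
+    return emitted;
+  }
+}
+</code></pre>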
+<h4 id="stateful">Stateful</h4>
+<p>A Stateful operator is defined as one that needs to store data
+at the end of a window for computations occurring in later
+windows; a common example is the computation of a sum of values in the
+input tuples.</p>
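+<p>The sum example can be sketched as follows. RunningSum is a hypothetical class, not a Malhar operator; its restore method simulates recovering from a checkpoint taken at the end of a window, to show why the running total has to be part of the checkpointed state.</p>
+<pre><code class="java">// Hypothetical stateful operator: the running total must survive from one window to the next
+class RunningSum
+{
+  long total;  // operator state: non-transient, so it belongs in every checkpoint
+
+  void beginWindow(long windowId)
+  {
+    // nothing to reset: the total deliberately carries over between windows
+  }
+
+  void process(long value)
+  {
+    total += value;
+  }
+
+  // Returns the value that would be emitted at the end of the window
+  long endWindow()
+  {
+    return total;
+  }
+
+  // Simulates restoring the operator from a checkpoint taken at the end of a window
+  static RunningSum restore(long checkpointedTotal)
+  {
+    RunningSum op = new RunningSum();
+    op.total = checkpointedTotal;
+    return op;
+  }
+}
+</code></pre>
+
+<p>With windows [1, 2] and [3], the operator emits 3 and then 6. Replaying the second window on an instance restored with total = 3 emits 6 again, whereas replaying it on a fresh instance would emit 3.</p>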
+<h3 id="operator-api">Operator API</h3>
+<p>The Operator API consists of methods that operator developers may
+need to override. In this section we will discuss the Operator APIs from
+the point of view of an application developer. Knowledge of how an
+operator works internally is not critical for writing an application. Those
+interested in the details should refer to the Malhar Operator Developer Guide.</p>
+<p>The APIs are available in three modes, namely Single Streaming
+Window, Sliding Application Window, and Aggregate Application Window.
+These are not mutually exclusive, i.e. an operator can use single
+streaming window as well as sliding application window. A physical
+instance of an operator is always processing tuples from a single
+window. The processing of tuples is guaranteed to be sequential, no
+matter which input port the tuples arrive on.</p>
+<p>In the later part of this section we will evaluate three common
+uses of streaming windows by applications. They have different
+characteristics and implications on optimization and recovery mechanisms
+(i.e. algorithm used to recover a node after outage) as discussed later
+in the section.</p>
+<h4 id="streaming-window">Streaming Window</h4>
+<p>A streaming window is the atomic micro-batch computation period. The API
+methods relating to a streaming window are as follows:</p>
+<pre><code class="java">public void process(T tuple);                // Called on the input port on which the tuple arrives (T is the port's tuple type)
+public void beginWindow(long windowId);      // Called at the start of the window as soon as the first begin_window tuple arrives
+public void endWindow();                     // Called at the end of the window after end_window tuples arrive on all input ports
+public void setup(OperatorContext context);  // Called once during initialization of the operator
+public void teardown();                      // Called once when the operator is being shut down
+</code></pre>
+
+<p>A tuple can be emitted in any of the three streaming run-time
+calls, namely beginWindow, process, and endWindow but not in setup or
+teardown.</p>
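+<p>The order in which these callbacks fire can be illustrated with a small single-threaded driver. WindowedOperator and LifecycleDriver are hypothetical, simplified types written for this sketch: setup takes no context here, windows are handed in as lists rather than arriving as begin_window/end_window control tuples, and in a running application the platform itself plays the role of the driver.</p>
+<pre><code class="java">import java.util.List;
+
+// Hypothetical, simplified contract mirroring the callbacks listed above
+interface WindowedOperator&lt;T&gt;
+{
+  void setup();
+  void beginWindow(long windowId);
+  void process(T tuple);
+  void endWindow();
+  void teardown();
+}
+
+// Hypothetical single-threaded driver; in a real application the platform plays this role
+class LifecycleDriver
+{
+  static &lt;T&gt; void run(WindowedOperator&lt;T&gt; operator, List&lt;List&lt;T&gt;&gt; windows, long firstWindowId)
+  {
+    operator.setup();                       // once, before any window
+    long windowId = firstWindowId;
+    for (List&lt;T&gt; window : windows) {
+      operator.beginWindow(windowId++);     // start of each streaming window
+      for (T tuple : window) {
+        operator.process(tuple);            // tuples are processed one at a time, in order
+      }
+      operator.endWindow();                 // end of the window; tuples may still be emitted here
+    }
+    operator.teardown();                    // once, at shutdown; no tuples are emitted here
+  }
+}
+</code></pre>
+
+<p>For two windows ["a", "b"] and ["c"], the call sequence is setup, beginWindow(1), process(a), process(b), endWindow, beginWindow(2), process(c), endWindow, teardown.</p>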
+<h4 id="aggregate-application-window">Aggregate Application Window</h4>
+<p>An operator with an aggregate window is stateful within the
+application window timeframe and possibly stateless at the end of that
+application window. The size of an aggregate application window is an
+operator attribute and is defined as a multiple of the streaming window
+size. The platform recognizes this attribute and optimizes the operator.
+The beginWindow and endWindow calls are not invoked for those streaming
+windows that do not align with the application window. For example, in
+the case of a streaming window of 0.5 seconds and an application window of 5
+minutes, an application window spans 600 streaming windows (5*60*2 =
+600). At the start of the sequence of these 600 atomic streaming
+windows, a beginWindow gets invoked, and at the end of these 600
+streaming windows an endWindow gets invoked. None of the intermediate
+streaming windows invoke beginWindow or endWindow. Bookkeeping,
+node recovery, stats, UI, etc. continue to work off streaming windows.
+For example, if operators are checkpointed on average every
+30th window, then the above application window would have about 20
+checkpoints.</p>
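+<p>The arithmetic in this example can be checked with a short helper. ApplicationWindowMath is hypothetical and works in milliseconds; it also rejects an application window that is not an integral multiple of the streaming window, matching the requirement stated for attribute values.</p>
+<pre><code class="java">// Hypothetical helper that reproduces the arithmetic from the paragraph above
+class ApplicationWindowMath
+{
+  // Number of streaming windows in one application window
+  static long streamingWindowsPerApplicationWindow(long applicationWindowMillis, long streamingWindowMillis)
+  {
+    if (applicationWindowMillis % streamingWindowMillis != 0) {
+      throw new IllegalArgumentException(&quot;application window must be a multiple of the streaming window&quot;);
+    }
+    return applicationWindowMillis / streamingWindowMillis;
+  }
+
+  // Approximate number of checkpoints taken during one application window
+  static long checkpointsPerApplicationWindow(long streamingWindows, long checkpointEveryNthWindow)
+  {
+    return streamingWindows / checkpointEveryNthWindow;
+  }
+}
+</code></pre>
+
+<p>With a 500 ms streaming window and a 5-minute (300,000 ms) application window, the first method returns 600, and checkpointing every 30th window gives 600 / 30 = 20 checkpoints.</p>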
+<h4 id="sliding-application-window">Sliding Application Window</h4>
+<p>A sliding window is a computation that requires the previous N
+streaming windows. After each streaming window, the Nth past window is
+dropped and the new window is added to the computation. An operator with a
+sliding window is a stateful operator at the end of any window. The sliding
+window period is an attribute and is a multiple of the streaming window. The
+platform recognizes this attribute and leverages it during bookkeeping.
+A sliding aggregate window with tolerance to data loss does not have a
+very high bookkeeping cost. The cost of all three recovery mechanisms,
+at most once (data loss tolerant),
+at least once (data loss
+intolerant), and exactly once (data
+loss intolerant and no extra computations), is the same as that of recovery
+mechanisms based on the streaming window. STRAM is not able to leverage this
+operator for any extra optimization.</p>
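+<p>A minimal sketch of a sliding sum over the last N streaming windows, assuming each call receives the already-computed sum of one streaming window. SlidingWindowSum is hypothetical; the queue of per-window sums is the state that makes such an operator stateful at the end of every window.</p>
+<pre><code class="java">import java.util.ArrayDeque;
+
+// Hypothetical sliding-window sum over the last n streaming windows
+class SlidingWindowSum
+{
+  private final int n;
+  private final ArrayDeque&lt;Long&gt; windowSums = new ArrayDeque&lt;&gt;(); // state kept across windows
+  private long total;
+
+  SlidingWindowSum(int n)
+  {
+    this.n = n;
+  }
+
+  // Called once per streaming window with that window's sum; returns the sum over the last n windows
+  long endWindow(long windowSum)
+  {
+    windowSums.addLast(windowSum);
+    total += windowSum;
+    if (windowSums.size() &gt; n) {
+      total -= windowSums.removeFirst();   // drop the oldest window once more than n are held
+    }
+    return total;
+  }
+}
+</code></pre>
+
+<p>With n = 3 and per-window sums 1, 2, 3, 4, the emitted values are 1, 3, 6, and 9 (the window with sum 1 drops out at the fourth window).</p>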
+<h3 id="single-vs-multi-input-operator">Single vs Multi-Input Operator</h3>
+<p>A single-input operator by definition has a single upstream
+operator, since there can only be one writing port for a stream. If an
+operator has a single upstream operator, then the beginWindow on the
+upstream also blocks the beginWindow of the single-input operator. For
+an operator to start processing any window, at least one upstream
+operator has to start processing that window. A multi-input operator
+reads from more than one upstream port. Such an operator would start
+processing as soon as the first begin_window event arrives. However, the
+window would not close (i.e. invoke endWindow) until all ports receive
+end_window events for that windowId. Thus the end of a window is a
+blocking event. As we saw earlier, a multi-input operator is also the
+point in the DAG where windows of all upstream operators are
+synchronized. The windows (atomic micro-batches) from a faster (or just
+ahead in processing) upstream operator are queued up until the slower
+upstream operator catches up. STRAM monitors such bottlenecks and takes
+corrective actions. The platform ensures minimal delay, i.e. processing
+starts as long as at least one upstream operator has started
+processing.</p>
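+<p>The begin/end asymmetry for a multi-input operator can be sketched as a small barrier. WindowBarrier is hypothetical and tracks a single window at a time; the real platform also tracks window ids and queues windows that arrive early from faster upstream operators.</p>
+<pre><code class="java">import java.util.HashSet;
+import java.util.Set;
+
+// Hypothetical bookkeeping for one window of a multi-input operator
+class WindowBarrier
+{
+  private final int inputPortCount;
+  private final Set&lt;Integer&gt; portsAtEndWindow = new HashSet&lt;&gt;();
+  private boolean windowOpen;
+
+  WindowBarrier(int inputPortCount)
+  {
+    this.inputPortCount = inputPortCount;
+  }
+
+  // Returns true only for the first begin_window, i.e. when beginWindow would be invoked
+  boolean onBeginWindow()
+  {
+    if (windowOpen) {
+      return false;
+    }
+    windowOpen = true;
+    return true;
+  }
+
+  // Returns true only when the last input port reports end_window, i.e. when endWindow would be invoked
+  boolean onEndWindow(int port)
+  {
+    portsAtEndWindow.add(port);
+    if (portsAtEndWindow.size() == inputPortCount) {
+      portsAtEndWindow.clear();
+      windowOpen = false;
+      return true;
+    }
+    return false;
+  }
+}
+</code></pre>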
+<h3 id="recovery-mechanisms">Recovery Mechanisms</h3>
+<p>Application developers can set any of the recovery mechanisms
+below to deal with node outage. In general, the cost of recovery depends
+on the state of the operator, while data integrity is dependent on the
+application. The mechanisms are per window as the platform treats
+windows as atomic compute units. Three recovery mechanisms are
+supported, namely:</p>
+<ul>
+<li>At-least-once: All atomic batches are processed at least once.
+    No data loss occurs.</li>
+<li>At-most-once: All atomic batches are processed at most once.
+    Data loss is possible; this is the most efficient setting.</li>
+<li>Exactly-once: All atomic batches are processed exactly once.
+    No data loss occurs; this is the least efficient setting since
+    additional work is needed to ensure proper semantics.</li>
+</ul>
+<p>At-least-once is the default. During a recovery event, the
+operator connects to the upstream buffer server and asks for windows to
+be replayed. The at-least-once and exactly-once mechanisms start from the operator's
+checkpointed state. At-most-once starts from the next begin-window
+event.</p>
+<p>Recovery mechanisms can be specified per Operator while writing
+the application as shown below.</p>
+<pre><code class="java">Operator o = dag.addOperator(&quot;operator&quot;, …);
+dag.setAttribute(o, OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE);
+</code></pre>
+
+<p>Also note that once an operator's processing mode is set to AT_MOST_ONCE,
+all the operators downstream of it have to be AT_MOST_ONCE. The client
+will give appropriate warnings or errors if that’s not the case.</p>
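+<p>The difference between the first two mechanisms can be sketched as the set of windows an operator processes after it recovers. This is a deliberately simplified model under stated assumptions: the Mode enum is hypothetical (not the platform's ProcessingMode), "the next window" for at-most-once is modeled as the one after the failed window, and exactly-once is omitted because it additionally depends on how duplicate effects are suppressed.</p>
+<pre><code class="java">import java.util.ArrayList;
+import java.util.List;
+
+// Hypothetical, simplified model of the windows an operator processes after recovering
+class RecoverySketch
+{
+  enum Mode { AT_LEAST_ONCE, AT_MOST_ONCE }   // not the platform's ProcessingMode enum
+
+  static List&lt;Long&gt; windowsAfterRecovery(Mode mode, long checkpointedWindow, long failedWindow, long lastWindow)
+  {
+    // AT_LEAST_ONCE: state is restored from the checkpoint and every later window is replayed,
+    // including windows that were already processed before the failure.
+    // AT_MOST_ONCE: nothing is replayed; processing resumes with the next window
+    // (modeled here as failedWindow + 1), so the failed window is lost.
+    long first = mode == Mode.AT_LEAST_ONCE ? checkpointedWindow + 1 : failedWindow + 1;
+    List&lt;Long&gt; windows = new ArrayList&lt;&gt;();
+    for (long w = first; w &lt;= lastWindow; w++) {
+      windows.add(w);
+    }
+    return windows;
+  }
+}
+</code></pre>
+
+<p>With a checkpoint at window 10 and a failure during window 13, at-least-once processes windows 11 through 15 again (11 and 12 twice overall), while at-most-once continues with windows 14 and 15.</p>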
+<p>Details are explained in the chapter on Fault Tolerance below.</p>
+<h2 id="streams">Streams</h2>
+<p>A stream is a connector
+(edge) abstraction, and is a fundamental building block of the platform.
+A stream consists of tuples that flow from one port (called the
+output port) to one or more ports
+on other operators (called input ports). Note a potentially
+confusing aspect of this terminology: tuples enter a stream through its
+output port and leave via one or more input ports. A stream has the
+following characteristics:</p>
+<ul>
+<li>Tuples are always delivered in the same order in which they
+    were emitted.</li>
+<li>Consists of a sequence of windows, one after another, each
+    window being a collection of in-order tuples.</li>
+<li>A stream that connects two containers passes through a
+    buffer server.</li>
+<li>All streams can be persisted (by default in HDFS).</li>
+<li>Exactly one output port writes to the stream.</li>
+<li>Can be read by one or more input ports.</li>
+<li>Connects operators within an application, not outside
+    an application.</li>
+<li>Has a unique name within an application.</li>
+<li>Has attributes which act as hints to STRAM.</li>
+<li>
+<p>Streams have five locality modes, listed below together with their
+    alternative names. Modes are hints and may be overruled (for example, due to a lack
+    of containers). They are defined as follows:</p>
+<ul>
+<li>THREAD_LOCAL: In the same thread, uses thread
+    stack (intra-thread). This mode can only be used for a downstream
+    operator which has only one input port connected; also called
+    in-line.</li>
+<li>CONTAINER_LOCAL: In the same container (intra-process); also
+    called in-container.</li>
+<li>NODE_LOCAL: In the same Hadoop node (inter-process; skips the
+    NIC); also called in-node.</li>
+<li>RACK_LOCAL: On nodes in the same rack; also called
+    in-rack.</li>
+<li>unspecified: No guarantee. Could be anywhere within the
+    cluster.</li>
+</ul>
+</li>
+</ul>
+<p>An example of a stream declaration is given below</p>
+<pre><code class="java">public void populateDAG(DAG dag, Configuration conf)
+{
+  …
+  dag.addStream(&quot;views&quot;, viewAggregate.sum, cost.data).setLocality(Locality.CONTAINER_LOCAL); // a container-local stream
+  dag.addStream(&quot;clicks&quot;, clickAggregate.sum, rev.data); // an example of unspecified locality
+}
+</code></pre>
+
+<p>The platform guarantees in-order delivery of tuples in a stream.
+STRAM views each stream as a collection of ordered windows. Since no tuple
+can exist outside a window, a replay of a stream consists of a replay of a
+set of windows. When multiple input ports read the same stream, the
+execution plan of a stream ensures that each input port is logically not
+blocked by the reading of another input port. The schema of a stream is
+the same as the schema of the tuple.</p>
+<p>In a stream, all tuples emitted by an operator in a window belong
+to that window. A replay of this window would consist of an in-order
+replay of all the tuples. Thus the tuple order within a stream is
+guaranteed. However, since an operator may receive multiple streams (for
+example, an operator with two input ports), the order of arrival of two
+tuples belonging to different streams is not guaranteed. In general, this is
+expected in an asynchronous distributed architecture. Thus the
+operator (especially one with multiple input ports) should not depend on
+the tuple order across two streams. One way to cope with this
+indeterminate order, if necessary, is to wait to get all the tuples of a
+window and emit results in the endWindow call. All operator templates
+provided as part of the Malhar operator library follow these principles.</p>
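+<p>The "wait for the whole window" approach can be sketched with a hypothetical two-input join that pairs prices with quantities by key. Because it only buffers during the window and computes in endWindow, its result does not depend on how tuples from the two ports interleave. WindowedJoin and its method names are illustrative assumptions, not a Malhar operator.</p>
+<pre><code class="java">import java.util.HashMap;
+import java.util.Map;
+
+// Hypothetical two-input operator joining prices and quantities by key. Because the arrival
+// order across the two ports is not guaranteed, both sides are buffered during the window
+// and results are only computed in endWindow.
+class WindowedJoin
+{
+  private final Map&lt;String, Long&gt; priceCents = new HashMap&lt;&gt;();
+  private final Map&lt;String, Long&gt; quantities = new HashMap&lt;&gt;();
+
+  void processPrice(String key, long cents)      // tuples from the first input port
+  {
+    priceCents.put(key, cents);
+  }
+
+  void processQuantity(String key, long quantity) // tuples from the second input port
+  {
+    quantities.put(key, quantity);
+  }
+
+  // Returns key -&gt; total cents for keys seen on both ports during this window
+  Map&lt;String, Long&gt; endWindow()
+  {
+    Map&lt;String, Long&gt; totals = new HashMap&lt;&gt;();
+    for (Map.Entry&lt;String, Long&gt; entry : quantities.entrySet()) {
+      Long cents = priceCents.get(entry.getKey());
+      if (cents != null) {
+        totals.put(entry.getKey(), cents * entry.getValue());
+      }
+    }
+    priceCents.clear();   // this sketch keeps nothing between windows
+    quantities.clear();
+    return totals;
+  }
+}
+</code></pre>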
+<p>A logical stream gets partitioned into physical streams, each
+connecting a partition to the upstream operator. If two different
+attributes are needed on the same stream, it should be split using the
+StreamDuplicator operator.</p>
+<p>Modes of the streams are critical for performance. An in-line
+stream is the most efficient as it simply delivers the tuple as-is without
+serialization-deserialization. Streams should be marked
+CONTAINER_LOCAL, especially where there is a large tuple volume
+between two operators and the volume drops significantly after the downstream one. Since the
+setLocality call merely provides a hint, STRAM may ignore it. An in-node
+stream is not as efficient as an in-line one, but it is clearly better
+than going off-node since it still avoids the potential bottleneck of
+the network card.</p>
+<p>THREAD_LOCAL and CONTAINER_LOCAL streams do not use a buffer
+server, as such a stream stays within a single process. The other modes (NODE_LOCAL, RACK_LOCAL, and unspecified) do.</p>
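+<p>The buffer-server rule can be summarized in a few lines. StreamLocality is a hypothetical enum that mirrors the locality names listed above, with UNSPECIFIED standing in for a stream that carries no locality hint; it is not the platform's own locality type.</p>
+<pre><code class="java">// Hypothetical enum mirroring the locality names listed above; UNSPECIFIED stands in for a stream
+// that carries no locality hint
+enum StreamLocality
+{
+  THREAD_LOCAL, CONTAINER_LOCAL, NODE_LOCAL, RACK_LOCAL, UNSPECIFIED;
+
+  // Streams that stay inside one process bypass the buffer server
+  boolean usesBufferServer()
+  {
+    return this != THREAD_LOCAL &amp;&amp; this != CONTAINER_LOCAL;
+  }
+}
+</code></pre>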
+<h2 id="validating-an-application">Validating an Application</h2>
+<p>The platform provides various ways of validating the application
+specification and data input. An understanding of these checks is very
+important for an application developer since it affects productivity.
+Validation of an application is done in three phases, namely:</p>
+<ol>
+<li>Compile Time: Caught during application development; this is the
+    most cost-effective phase. These checks are mainly done on declarative
+    objects and leverage the Java compiler. An example is checking that
+    the schemas specified on all ports of a stream are
+    mutually compatible.</li>
+<li>Initialization Time: When the application is being
+    initialized, before submitting to Hadoop. These checks are related
+    to the configuration/context of an application, and are done by the
+    logical DAG builder implementation. An example is checking that
+    all non-optional ports are connected to other ports.</li>
+<li>Run Time: Validations done when the application is running.
+    This is the costliest of all checks. These are checks that can only
+    be done at runtime as they involve data, for example a divide-by-zero
+    check as part of business logic.</li>
+</ol>
+<h3 id="compile-time">Compile Time</h3>
+<p>Compile time validations apply when an application is specified in
+Java code and include all checks that can be done by the Java compiler in
+the development environment (including IDEs like NetBeans or Eclipse).
+Examples include:</p>
+<ol>
+<li>Schema Validation: The tuples on ports are POJOs (plain old
+    Java objects), and the compiler checks that all the ports on a
+    stream have the same schema.</li>
+<li>Stream Check: Single Output Port and at least one Input port
+    per stream. A stream can only have one output port writer. This is
+    part of the addStream API. This
+    check ensures that developers only connect one output port to
+    a stream. The same signature also ensures that there is at least one
+    input port for a stream.</li>
+<li>Naming: Compile time checks ensure that application
+    components (operators and streams) are named.</li>
+</ol>
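+<p>The schema and stream checks rely only on Java generics. The following sketch uses hypothetical OutPort, InPort, and TypedDag types (not the platform's API) to show how a method signature alone can make the compiler require exactly one writer, at least one reader, and compatible tuple types.</p>
+<pre><code class="java">import java.util.ArrayList;
+import java.util.List;
+
+// Hypothetical typed ports: the type parameter plays the role of the port schema
+class OutPort&lt;T&gt;
+{
+}
+
+class InPort&lt;T&gt;
+{
+}
+
+class TypedDag
+{
+  final List&lt;String&gt; streams = new ArrayList&lt;&gt;();
+
+  // The signature itself demands exactly one source and at least one sink, and it only
+  // compiles when every sink accepts the source's tuple type.
+  @SafeVarargs
+  final &lt;T&gt; void addStream(String name, OutPort&lt;? extends T&gt; source,
+      InPort&lt;? super T&gt; firstSink, InPort&lt;? super T&gt;... moreSinks)
+  {
+    streams.add(name + &quot;:&quot; + (1 + moreSinks.length) + &quot; sink(s)&quot;);
+  }
+}
+</code></pre>
+
+<p>With these types, connecting an OutPort&lt;Integer&gt; source to an InPort&lt;Number&gt; sink compiles, whereas connecting an OutPort&lt;String&gt; source to an InPort&lt;Integer&gt; sink is rejected by the compiler.</p>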
+<h3 id="initializationinstantiation-time">Initialization/Instantiation Time</h3>
+<p>Initialization time validations include various checks that are
+done post compile, and before the application starts running in a
+cluster (or local mode). These are mainly configuration/contextual in
+nature. These checks are as critical to proper functionality of the
+application as the compile time validations.</p>
+<p>Examples include</p>
+<ul>
+<li>
+<p><a href="http://docs.oracle.com/javaee/6/tutorial/doc/gircz.html">JavaBeans Validation</a>:
+    Examples include</p>
+<ul>
+<li>@Max(): Value must be less than or equal to the number</li>
+<li>@Min(): Value must be greater than or equal to the
+    number</li>
+<li>@NotNull: The value of the field or property must not be
+    null</li>
+<li>@Pattern(regexp = "...."): Value must match the regular
+    expression</li>
+</ul>
+</li>
+<li>Input port connectivity: By default, every non-optional input
+    port must be connected. A port can be declared optional by using an
+    annotation: @InputPortFieldAnnotation(name = "...", optional
+    = true)</li>
+<li>Output port connectivity: Similar. The annotation here is:
+    @OutputPortFieldAnnotation(name = "...", optional = true)</li>
+<li>
+<p>Unique names in application scope: Operators and streams must have
+    unique names.</p>
+</li>
+<li>Cycles in the DAG: The DAG cannot have a cycle.</li>
+<li>Unique names in operator scope: Ports, properties, and annotations
+    must have unique names.</li>
+<li>One stream per port: A port can connect to only one stream.
+    This check applies to input as well as output ports even though an
+    output port can technically write to two streams. If you must have
+    two streams originating from a single output port, use a StreamDuplicator operator.</li>
+<li>Application Window Period: Has to be an integral multiple of the
+    streaming window period.</li>
+</ul>
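+<p>The "no cycles" rule can be illustrated with Kahn's topological sort: repeatedly release operators that have no unreleased upstream operators, and if some operators are never released, the graph contains a cycle. CycleCheck is a hypothetical helper over a plain adjacency map; it is not the platform's DAG validator.</p>
+<pre><code class="java">import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// Hypothetical cycle check using Kahn's topological sort
+class CycleCheck
+{
+  // edges maps each operator name to the names of the operators it feeds
+  static boolean hasCycle(Map&lt;String, List&lt;String&gt;&gt; edges)
+  {
+    Map&lt;String, Integer&gt; inDegree = new HashMap&lt;&gt;();
+    for (Map.Entry&lt;String, List&lt;String&gt;&gt; entry : edges.entrySet()) {
+      inDegree.putIfAbsent(entry.getKey(), 0);
+      for (String downstream : entry.getValue()) {
+        inDegree.merge(downstream, 1, Integer::sum);
+      }
+    }
+    ArrayDeque&lt;String&gt; ready = new ArrayDeque&lt;&gt;();
+    for (Map.Entry&lt;String, Integer&gt; entry : inDegree.entrySet()) {
+      if (entry.getValue() == 0) {
+        ready.add(entry.getKey());            // operators with no upstream left to wait for
+      }
+    }
+    int visited = 0;
+    while (!ready.isEmpty()) {
+      String operator = ready.poll();
+      visited++;
+      for (String downstream : edges.getOrDefault(operator, Collections.emptyList())) {
+        if (inDegree.merge(downstream, -1, Integer::sum) == 0) {
+          ready.add(downstream);
+        }
+      }
+    }
+    return visited != inDegree.size();        // operators never released sit on or behind a cycle
+  }
+}
+</code></pre>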
+<h3 id="run-time">Run Time</h3>
+<p>Run time checks are those that are done when the application is
+running. The real-time streaming platform provides rich run time error
+handling mechanisms. The checks are exclusively done by the application
+business logic, but the platform allows applications to count and audit
+these. Some of these features are in the process of development (backend
+and UI) and this section will be updated as they are developed. Upon
+completion examples will be added to demos to illustrate these.</p>
+<p>Error ports are output ports with error annotations. Since they
+are normal ports, they can be monitored and tuples counted, persisted
+and counts shown in the UI.</p>
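+<p>As a rough sketch of the error-port idea, assuming a hypothetical operator that divides two numbers, bad input is routed to a separate error output instead of failing the operator. Plain lists stand in for the two output ports here; SafeDivider is illustrative only and does not use the platform's error annotations.</p>
+<pre><code class="java">import java.util.ArrayList;
+import java.util.List;
+
+// Hypothetical operator with a normal output and an error output. In a real application both
+// would be output ports, so that error tuples can be counted, persisted, and shown in the UI.
+class SafeDivider
+{
+  final List&lt;Long&gt; quotients = new ArrayList&lt;&gt;();  // stands in for the normal output port
+  final List&lt;String&gt; errors = new ArrayList&lt;&gt;();   // stands in for the error output port
+
+  void process(long numerator, long denominator)
+  {
+    if (denominator == 0) {
+      errors.add(numerator + &quot;/0&quot;);   // route the bad tuple instead of failing the operator
+      return;
+    }
+    quotients.add(numerator / denominator);
+  }
+}
+</code></pre>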
+<hr />
+<h1 id="multi-tenancy-and-security">Multi-Tenancy and Security</h1>
+<p>Hadoop is a multi-tenant distributed operating system. Security is
+an intrinsic element of multi-tenancy, as without it a cluster cannot
+reasonably be shared among enterprise applications. Streaming
+applications follow all multi-tenancy security models used in Hadoop as
+they are native Hadoop 

<TRUNCATED>

