flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [57/78] [abbrv] flink-web git commit: Rebuild site
Date Wed, 18 Jan 2017 14:11:04 GMT
http://git-wip-us.apache.org/repos/asf/flink-web/blob/9ec0a879/content/news/2015/02/09/streaming-example.html
----------------------------------------------------------------------
diff --git a/content/news/2015/02/09/streaming-example.html b/content/news/2015/02/09/streaming-example.html
new file mode 100644
index 0000000..0b83ec4
--- /dev/null
+++ b/content/news/2015/02/09/streaming-example.html
@@ -0,0 +1,837 @@
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+    <meta charset="utf-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+    <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
+    <title>Apache Flink: Introducing Flink Streaming</title>
+    <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
+    <link rel="icon" href="/favicon.ico" type="image/x-icon">
+
+    <!-- Bootstrap -->
+    <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+    <link rel="stylesheet" href="/css/flink.css">
+    <link rel="stylesheet" href="/css/syntax.css">
+
+    <!-- Blog RSS feed -->
+    <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />
+
+    <!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
+    <!-- We need to load Jquery in the header for custom google analytics event tracking-->
+    <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.11.2/jquery.min.js"></script>
+
+    <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
+    <!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
+    <!--[if lt IE 9]>
+      <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
+      <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
+    <![endif]-->
+  </head>
+  <body>  
+    
+
+    <!-- Main content. -->
+    <div class="container">
+    <div class="row">
+
+      
+     <div id="sidebar" class="col-sm-3">
+          <!-- Top navbar. -->
+    <nav class="navbar navbar-default">
+        <!-- The logo. -->
+        <div class="navbar-header">
+          <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+            <span class="icon-bar"></span>
+          </button>
+          <div class="navbar-logo">
+            <a href="/">
+              <img alt="Apache Flink" src="/img/navbar-brand-logo.png" width="147" height="73">
+            </a>
+          </div>
+        </div><!-- /.navbar-header -->
+
+        <!-- The navigation links. -->
+        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
+          <ul class="nav navbar-nav navbar-main">
+
+            <!-- Downloads -->
+            <li class=""><a class="btn btn-info" href="/downloads.html">Download Flink</a></li>
+
+            <!-- Overview -->
+            <li><a href="/index.html">Home</a></li>
+
+            <!-- Intro -->
+            <li><a href="/introduction.html">Introduction to Flink</a></li>
+
+            <!-- Use cases -->
+            <li><a href="/usecases.html">Flink Use Cases</a></li>
+
+            <!-- Powered by -->
+            <li><a href="/poweredby.html">Powered by Flink</a></li>
+
+            <!-- Ecosystem -->
+            <li><a href="/ecosystem.html">Ecosystem</a></li>
+
+            <!-- Community -->
+            <li><a href="/community.html">Community &amp; Project Info</a></li>
+
+            <!-- Contribute -->
+            <li><a href="/how-to-contribute.html">How to Contribute</a></li>
+
+            <!-- Blog -->
+            <li class=" active hidden-md hidden-sm"><a href="/blog/"><b>Flink Blog</b></a></li>
+
+            <hr />
+
+
+
+            <!-- Documentation -->
+            <!-- <li>
+              <a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1" target="_blank">Documentation <small><span class="glyphicon glyphicon-new-window"></span></small></a>
+            </li> -->
+            <li class="dropdown">
+              <a class="dropdown-toggle" data-toggle="dropdown" href="#">Documentation
+                <span class="caret"></span></a>
+                <ul class="dropdown-menu">
+                  <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1" target="_blank">1.1 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+                  <li><a href="http://ci.apache.org/projects/flink/flink-docs-release-1.2" target="_blank">1.2 (Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+                </ul>
+              </li>
+
+            <!-- Quickstart -->
+            <li>
+              <a href="http://ci.apache.org/projects/flink/flink-docs-release-1.1/quickstart/setup_quickstart.html" target="_blank">Quickstart <small><span class="glyphicon glyphicon-new-window"></span></small></a>
+            </li>
+
+            <!-- GitHub -->
+            <li>
+              <a href="https://github.com/apache/flink" target="_blank">Flink on GitHub <small><span class="glyphicon glyphicon-new-window"></span></small></a>
+            </li>
+
+
+
+
+
+
+          </ul>
+
+
+
+          <ul class="nav navbar-nav navbar-bottom">
+          <hr />
+
+            <!-- FAQ -->
+            <li ><a href="/faq.html">Project FAQ</a></li>
+
+            <!-- Twitter -->
+            <li><a href="https://twitter.com/apacheflink" target="_blank">@ApacheFlink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+
+            <!-- Visualizer -->
+            <li class=" hidden-md hidden-sm"><a href="/visualizer/" target="_blank">Plan Visualizer <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
+
+          </ul>
+        </div><!-- /.navbar-collapse -->
+    </nav>
+
+      </div>
+      <div class="col-sm-9">
+      <div class="row-fluid">
+  <div class="col-sm-12">
+    <div class="row">
+      <h1>Introducing Flink Streaming</h1>
+
+      <article>
+        <p>09 Feb 2015</p>
+
+<p>This post is the first of a series of blog posts on Flink Streaming,
+the recent addition to Apache Flink that makes it possible to analyze
+continuous data sources in addition to static files. Flink Streaming
+uses the pipelined Flink engine to process data streams in real time
+and offers a new API including definition of flexible windows.</p>
+
+<p>In this post, we go through an example that uses the Flink Streaming
+API to compute statistics on stock market data that arrive
+continuously and combine the stock market data with Twitter streams.
+See the <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html">Streaming Programming
+Guide</a> for a
+detailed presentation of the Streaming API.</p>
+
+<p>First, we read a bunch of stock price streams and combine them into
+one stream of market data. We apply several transformations on this
+market data stream, like rolling aggregations per stock. Then we emit
+price warning alerts when the prices are rapidly changing. Moving 
+towards more advanced features, we compute rolling correlations
+between the market data streams and a Twitter stream with stock mentions.</p>
+
+<p>To run the example implementation, please use the <em>0.9-SNAPSHOT</em>
+version of Flink as a dependency. The full example code base can be 
+found <a href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala">here</a> in Scala and <a href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java">here</a> in Java7.</p>
+
+<p><a href="#top"></a></p>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="reading-from-multiple-inputs">Reading from multiple inputs</h2>
+
+<p>First, let us create the stream of stock prices:</p>
+
+<ol>
+  <li>Read a socket stream of stock prices</li>
+  <li>Parse the text in the stream to create a stream of <code>StockPrice</code> objects</li>
+  <li>Add four other sources tagged with the stock symbol.</li>
+  <li>Finally, merge the streams to create a unified stream.</li>
+</ol>
+
+<p><img alt="Reading from multiple inputs" src="/img/blog/blog_multi_input.png" width="70%" class="img-responsive center-block" /></p>
+
+<div class="codetabs">
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="o">{</span>
+
+  <span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
+
+  <span class="c1">//Read from a socket stream and map it to StockPrice objects</span>
+  <span class="k">val</span> <span class="n">socketStockStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">socketTextStream</span><span class="o">(</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="mi">9999</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span class="o">{</span>
+    <span class="k">val</span> <span class="n">split</span> <span class="k">=</span> <span class="n">x</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot;,&quot;</span><span class="o">)</span>
+    <span class="nc">StockPrice</span><span class="o">(</span><span class="n">split</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">split</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">toDouble</span><span class="o">)</span>
+  <span class="o">})</span>
+
+  <span class="c1">//Generate other stock streams</span>
+  <span class="k">val</span> <span class="nc">SPX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">)(</span><span class="mi">10</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span>
+  <span class="k">val</span> <span class="nc">FTSE_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">&quot;FTSE&quot;</span><span class="o">)(</span><span class="mi">20</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span>
+  <span class="k">val</span> <span class="nc">DJI_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">&quot;DJI&quot;</span><span class="o">)(</span><span class="mi">30</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span>
+  <span class="k">val</span> <span class="nc">BUX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">&quot;BUX&quot;</span><span class="o">)(</span><span class="mi">40</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span>
+
+  <span class="c1">//Merge all stock streams together</span>
+  <span class="k">val</span> <span class="n">stockStream</span> <span class="k">=</span> <span class="n">socketStockStream</span><span class="o">.</span><span class="n">merge</span><span class="o">(</span><span class="nc">SPX_Stream</span><span class="o">,</span> <span class="nc">FTSE_Stream</span><span class="o">,</span> 
+    <span class="nc">DJI_Stream</span><span class="o">,</span> <span class="nc">BUX_Stream</span><span class="o">)</span>
+
+  <span class="n">stockStream</span><span class="o">.</span><span class="n">print</span><span class="o">()</span>
+
+  <span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="s">&quot;Stock stream&quot;</span><span class="o">)</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="java7">
+
+    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="n">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+
+    <span class="kd">final</span> <span class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span>
+        <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
+
+    <span class="c1">//Read from a socket stream and map it to StockPrice objects</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">socketStockStream</span> <span class="o">=</span> <span class="n">env</span>
+            <span class="o">.</span><span class="na">socketTextStream</span><span class="o">(</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="mi">9999</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">StockPrice</span><span class="o">&gt;()</span> <span class="o">{</span>
+                <span class="kd">private</span> <span class="n">String</span><span class="o">[]</span> <span class="n">tokens</span><span class="o">;</span>
+
+                <span class="nd">@Override</span>
+                <span class="kd">public</span> <span class="n">StockPrice</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+                    <span class="n">tokens</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;,&quot;</span><span class="o">);</span>
+                    <span class="k">return</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">0</span><span class="o">],</span>
+                        <span class="n">Double</span><span class="o">.</span><span class="na">parseDouble</span><span class="o">(</span><span class="n">tokens</span><span class="o">[</span><span class="mi">1</span><span class="o">]));</span>
+                <span class="o">}</span>
+            <span class="o">});</span>
+
+    <span class="c1">//Generate other stock streams</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">SPX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">,</span> <span class="mi">10</span><span class="o">));</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">FTSE_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span class="mi">20</span><span class="o">));</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">DJI_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;DJI&quot;</span><span class="o">,</span> <span class="mi">30</span><span class="o">));</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">BUX_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockSource</span><span class="o">(</span><span class="s">&quot;BUX&quot;</span><span class="o">,</span> <span class="mi">40</span><span class="o">));</span>
+
+    <span class="c1">//Merge all stock streams together</span>
+    <span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">stockStream</span> <span class="o">=</span> <span class="n">socketStockStream</span>
+        <span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="n">SPX_stream</span><span class="o">,</span> <span class="n">FTSE_stream</span><span class="o">,</span> <span class="n">DJI_stream</span><span class="o">,</span> <span class="n">BUX_stream</span><span class="o">);</span>
+
+    <span class="n">stockStream</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
+
+    <span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;Stock stream&quot;</span><span class="o">);</span></code></pre></div>
+
+  </div>
+</div>
+
+<p>See
+<a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#data-sources">here</a>
+on how you can create streaming sources for Flink Streaming
+programs. Flink, of course, has support for reading in streams from
+<a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html">external
+sources</a>
+such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake
+of this example, the data streams are simply generated using the
+<code>generateStock</code> method:</p>
+
+<div class="codetabs">
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="n">symbols</span> <span class="k">=</span> <span class="nc">List</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">,</span> <span class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span class="s">&quot;DJI&quot;</span><span class="o">,</span> <span class="s">&quot;DJT&quot;</span><span class="o">,</span> <span class="s">&quot;BUX&quot;</span><span class="o">,</span> <span class="s">&quot;DAX&quot;</span><span class="o">,</span> <span class="s">&quot;GOOG&quot;</span><span class="o">)</span>
+
+<span class="k">case</span> <span class="k">class</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">price</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
+
+<span class="k">def</span> <span class="n">generateStock</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">)(</span><span class="n">sigma</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
+  <span class="k">var</span> <span class="n">price</span> <span class="k">=</span> <span class="mf">1000.</span>
+  <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">price</span> <span class="k">=</span> <span class="n">price</span> <span class="o">+</span> <span class="nc">Random</span><span class="o">.</span><span class="n">nextGaussian</span> <span class="o">*</span> <span class="n">sigma</span>
+    <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">))</span>
+    <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">))</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+  <div data-lang="java7">
+
+    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">SYMBOLS</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span>
+    <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">&quot;SPX&quot;</span><span class="o">,</span> <span class="s">&quot;FTSE&quot;</span><span class="o">,</span> <span class="s">&quot;DJI&quot;</span><span class="o">,</span> <span class="s">&quot;DJT&quot;</span><span class="o">,</span> <span class="s">&quot;BUX&quot;</span><span class="o">,</span> <span class="s">&quot;DAX&quot;</span><span class="o">,</span> <span class="s">&quot;GOOG&quot;</span><span class="o">));</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">StockPrice</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
+
+    <span class="kd">public</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span>
+    <span class="kd">public</span> <span class="n">Double</span> <span class="n">price</span><span class="o">;</span>
+
+    <span class="kd">public</span> <span class="nf">StockPrice</span><span class="o">()</span> <span class="o">{</span>
+    <span class="o">}</span>
+
+    <span class="kd">public</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Double</span> <span class="n">price</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">price</span> <span class="o">=</span> <span class="n">price</span><span class="o">;</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">String</span> <span class="nf">toString</span><span class="o">()</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="s">&quot;StockPrice{&quot;</span> <span class="o">+</span>
+                <span class="s">&quot;symbol=&#39;&quot;</span> <span class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span class="sc">&#39;\&#39;&#39;</span> <span class="o">+</span>
+                <span class="s">&quot;, count=&quot;</span> <span class="o">+</span> <span class="n">price</span> <span class="o">+</span>
+                <span class="sc">&#39;}&#39;</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">final</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">StockSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="o">{</span>
+
+    <span class="kd">private</span> <span class="n">Double</span> <span class="n">price</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">sigma</span><span class="o">;</span>
+
+    <span class="kd">public</span> <span class="nf">StockSource</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">sigma</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">sigma</span> <span class="o">=</span> <span class="n">sigma</span><span class="o">;</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span class="o">(</span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="n">price</span> <span class="o">=</span> <span class="n">DEFAULT_PRICE</span><span class="o">;</span>
+        <span class="n">Random</span> <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span>
+
+        <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">price</span> <span class="o">=</span> <span class="n">price</span> <span class="o">+</span> <span class="n">random</span><span class="o">.</span><span class="na">nextGaussian</span><span class="o">()</span> <span class="o">*</span> <span class="n">sigma</span><span class="o">;</span>
+            <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">));</span>
+            <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="n">random</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">));</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+</div>
+
+<p>To read from the text socket stream, please make sure that you have a
+socket running. For the sake of the example, executing the following
+command in a terminal does the job. You can download
+<a href="http://netcat.sourceforge.net/">netcat</a> if it is not available
+on your machine.</p>
+
+<div class="highlight"><pre><code>nc -lk 9999
+</code></pre></div>
+
+<p>If we execute the program from our IDE, we see the system log followed by the
+stock prices being generated:</p>
+
+<div class="highlight"><pre><code>INFO    Job execution switched to status RUNNING.
+INFO    Socket Stream(1/1) switched to SCHEDULED 
+INFO    Socket Stream(1/1) switched to DEPLOYING
+INFO    Custom Source(1/1) switched to SCHEDULED 
+INFO    Custom Source(1/1) switched to DEPLOYING
+…
+1&gt; StockPrice{symbol='SPX', count=1011.3405732645239}
+2&gt; StockPrice{symbol='SPX', count=1018.3381290039248}
+1&gt; StockPrice{symbol='DJI', count=1036.7454894073978}
+3&gt; StockPrice{symbol='DJI', count=1135.1170217478427}
+3&gt; StockPrice{symbol='BUX', count=1053.667523187687}
+4&gt; StockPrice{symbol='BUX', count=1036.552601487263}
+</code></pre></div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="window-aggregations">Window aggregations</h2>
+
+<p>We first compute aggregations on time-based windows of the
+data. Flink provides <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html">flexible windowing semantics</a> where windows can
+also be defined based on a count of records or any custom user-defined
+logic.</p>
+
+<p>We partition our stream into windows of 10 seconds and slide the
+window every 5 seconds. We compute three statistics every 5 seconds.
+The first is the minimum price of all stocks, the second is the
+maximum price per stock, and the third is the mean stock price
+(using a map window function). Aggregations and groupings can be
+performed on named fields of POJOs, making the code more readable.</p>
+
+<p><img alt="Basic windowing aggregations" src="/img/blog/blog_basic_window.png" width="70%" class="img-responsive center-block" /></p>
+
+<div class="codetabs">
+
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Define the desired time window</span>
+<span class="k">val</span> <span class="n">windowedStream</span> <span class="k">=</span> <span class="n">stockStream</span>
+  <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
+
+<span class="c1">//Compute some simple statistics on a rolling window</span>
+<span class="k">val</span> <span class="n">lowest</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">minBy</span><span class="o">(</span><span class="s">&quot;price&quot;</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">maxByStock</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="n">maxBy</span><span class="o">(</span><span class="s">&quot;price&quot;</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">rollingMean</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">mean</span> <span class="k">_</span><span class="o">)</span>
+
+<span class="c1">//Compute the mean of a window</span>
+<span class="k">def</span> <span class="n">mean</span><span class="o">(</span><span class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
+  <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span>
+    <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">,</span> <span class="n">ts</span><span class="o">.</span><span class="n">foldLeft</span><span class="o">(</span><span class="mi">0</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)(</span><span class="k">_</span> <span class="o">+</span> <span class="k">_</span><span class="o">.</span><span class="n">price</span><span class="o">)</span> <span class="o">/</span> <span class="n">ts</span><span class="o">.</span><span class="n">size</span><span class="o">))</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+
+  <div data-lang="java7">
+
+    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Define the desired time window</span>
+<span class="n">WindowedDataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">stockStream</span>
+    <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">))</span>
+    <span class="o">.</span><span class="na">every</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">));</span>
+
+<span class="c1">//Compute some simple statistics on a rolling window</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">lowest</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">minBy</span><span class="o">(</span><span class="s">&quot;price&quot;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">maxByStock</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">maxBy</span><span class="o">(</span><span class="s">&quot;price&quot;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">rollingMean</span> <span class="o">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowMean</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span>
+
+<span class="c1">//Compute the mean of a window</span>
+<span class="kd">public</span> <span class="kd">final</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">WindowMean</span> <span class="kd">implements</span> 
+    <span class="n">WindowMapFunction</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">,</span> <span class="n">StockPrice</span><span class="o">&gt;</span> <span class="o">{</span>
+
+    <span class="kd">private</span> <span class="n">Double</span> <span class="n">sum</span> <span class="o">=</span> <span class="mf">0.0</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">String</span> <span class="n">symbol</span> <span class="o">=</span> <span class="s">&quot;&quot;</span><span class="o">;</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> 
+        <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+
+        <span class="k">if</span> <span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+            <span class="k">for</span> <span class="o">(</span><span class="n">StockPrice</span> <span class="n">sp</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
+                <span class="n">sum</span> <span class="o">+=</span> <span class="n">sp</span><span class="o">.</span><span class="na">price</span><span class="o">;</span>
+                <span class="n">symbol</span> <span class="o">=</span> <span class="n">sp</span><span class="o">.</span><span class="na">symbol</span><span class="o">;</span>
+                <span class="n">count</span><span class="o">++;</span>
+            <span class="o">}</span>
+            <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">sum</span> <span class="o">/</span> <span class="n">count</span><span class="o">));</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+
+</div>
+
+<p>Note that to print a windowed stream, one has to flatten it first,
+thus getting rid of the windowing logic. For example, execute
+<code>maxByStock.flatten().print()</code> to print the stream of maximum prices of
+the time windows by stock. In Scala, <code>flatten()</code> is called implicitly
+when needed.</p>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="data-driven-windows">Data-driven windows</h2>
+
+<p>The most interesting event in the stream is when the price of a stock
+is changing rapidly. We can send a warning when a stock price changes
+by more than 5% since the last warning. To do that, we use a delta-based window providing a
+threshold on when the computation will be triggered, a function to
+compute the difference, and a default value with which the first record
+is compared. We also create a <code>Count</code> data type to count the warnings
+every 30 seconds.</p>
+
+<p><img alt="Data-driven windowing semantics" src="/img/blog/blog_data_driven.png" width="100%" class="img-responsive center-block" /></p>
+
+<div class="codetabs">
+
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">case</span> <span class="k">class</span> <span class="nc">Count</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
+<span class="k">val</span> <span class="n">defaultPrice</span> <span class="k">=</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> <span class="mi">1000</span><span class="o">)</span>
+
+<span class="c1">//Use delta policy to create price change warnings</span>
+<span class="k">val</span> <span class="n">priceWarnings</span> <span class="k">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Delta</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mf">0.05</span><span class="o">,</span> <span class="n">priceChange</span><span class="o">,</span> <span class="n">defaultPrice</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">sendWarning</span> <span class="k">_</span><span class="o">)</span>
+
+<span class="c1">//Count the number of warnings every half a minute</span>
+<span class="k">val</span> <span class="n">warningsPerStock</span> <span class="k">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">)</span>
+
+<span class="k">def</span> <span class="n">priceChange</span><span class="o">(</span><span class="n">p1</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">,</span> <span class="n">p2</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="o">{</span>
+  <span class="nc">Math</span><span class="o">.</span><span class="n">abs</span><span class="o">(</span><span class="n">p1</span><span class="o">.</span><span class="n">price</span> <span class="o">/</span> <span class="n">p2</span><span class="o">.</span><span class="n">price</span> <span class="o">-</span> <span class="mi">1</span><span class="o">)</span>
+<span class="o">}</span>
+
+<span class="k">def</span> <span class="n">sendWarning</span><span class="o">(</span><span class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
+  <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">)</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+
+  <div data-lang="java7">
+
+    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">Double</span> <span class="n">DEFAULT_PRICE</span> <span class="o">=</span> <span class="mi">1000</span><span class="o">.;</span>
+<span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">StockPrice</span> <span class="n">DEFAULT_STOCK_PRICE</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StockPrice</span><span class="o">(</span><span class="s">&quot;&quot;</span><span class="o">,</span> <span class="n">DEFAULT_PRICE</span><span class="o">);</span>
+
+<span class="c1">//Use delta policy to create price change warnings</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">priceWarnings</span> <span class="o">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Delta</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mf">0.05</span><span class="o">,</span> <span class="k">new</span> <span class="n">DeltaFunction</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;()</span> <span class="o">{</span>
+        <span class="nd">@Override</span>
+        <span class="kd">public</span> <span class="kt">double</span> <span class="nf">getDelta</span><span class="o">(</span><span class="n">StockPrice</span> <span class="n">oldDataPoint</span><span class="o">,</span> <span class="n">StockPrice</span> <span class="n">newDataPoint</span><span class="o">)</span> <span class="o">{</span>
+            <span class="k">return</span> <span class="n">Math</span><span class="o">.</span><span class="na">abs</span><span class="o">(</span><span class="n">oldDataPoint</span><span class="o">.</span><span class="na">price</span> <span class="o">/</span> <span class="n">newDataPoint</span><span class="o">.</span><span class="na">price</span> <span class="o">-</span> <span class="mi">1</span><span class="o">);</span>
+        <span class="o">}</span>
+    <span class="o">},</span> <span class="n">DEFAULT_STOCK_PRICE</span><span class="o">))</span>
+<span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">SendWarning</span><span class="o">()).</span><span class="na">flatten</span><span class="o">();</span>
+
+<span class="c1">//Count the number of warnings every half a minute</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Count</span><span class="o">&gt;</span> <span class="n">warningsPerStock</span> <span class="o">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Count</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span>
+    <span class="o">}</span>
+<span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">Count</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
+    <span class="kd">public</span> <span class="n">String</span> <span class="n">symbol</span><span class="o">;</span>
+    <span class="kd">public</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">;</span>
+
+    <span class="kd">public</span> <span class="nf">Count</span><span class="o">()</span> <span class="o">{</span>
+    <span class="o">}</span>
+
+    <span class="kd">public</span> <span class="nf">Count</span><span class="o">(</span><span class="n">String</span> <span class="n">symbol</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">symbol</span> <span class="o">=</span> <span class="n">symbol</span><span class="o">;</span>
+        <span class="k">this</span><span class="o">.</span><span class="na">count</span> <span class="o">=</span> <span class="n">count</span><span class="o">;</span>
+    <span class="o">}</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">String</span> <span class="nf">toString</span><span class="o">()</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="s">&quot;Count{&quot;</span> <span class="o">+</span>
+                <span class="s">&quot;symbol=&#39;&quot;</span> <span class="o">+</span> <span class="n">symbol</span> <span class="o">+</span> <span class="sc">&#39;\&#39;&#39;</span> <span class="o">+</span>
+                <span class="s">&quot;, count=&quot;</span> <span class="o">+</span> <span class="n">count</span> <span class="o">+</span>
+                <span class="sc">&#39;}&#39;</span><span class="o">;</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">SendWarning</span> <span class="kd">implements</span> <span class="n">WindowMapFunction</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">StockPrice</span><span class="o">&gt;</span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> 
+        <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+
+        <span class="k">if</span> <span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
+            <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">values</span><span class="o">.</span><span class="na">iterator</span><span class="o">().</span><span class="na">next</span><span class="o">().</span><span class="na">symbol</span><span class="o">);</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+
+</div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="combining-with-a-twitter-stream">Combining with a Twitter stream</h2>
+
+<p>Next, we will read a Twitter stream and correlate it with our stock
+price stream. Flink has support for connecting to <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/twitter.html">Twitter’s
+API</a>,
+but for the sake of this example we generate dummy tweet data.</p>
+
+<p><img alt="Social media analytics" src="/img/blog/blog_social_media.png" width="100%" class="img-responsive center-block" /></p>
+
+<div class="codetabs">
+
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Read a stream of tweets</span>
+<span class="k">val</span> <span class="n">tweetStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateTweets</span> <span class="k">_</span><span class="o">)</span>
+
+<span class="c1">//Extract the stock symbols</span>
+<span class="k">val</span> <span class="n">mentionedSymbols</span> <span class="k">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="n">tweet</span> <span class="k">=&gt;</span> <span class="n">tweet</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">toUpperCase</span><span class="o">())</span>
+  <span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">contains</span><span class="o">(</span><span class="k">_</span><span class="o">))</span>
+
+<span class="c1">//Count the extracted symbols</span>
+<span class="k">val</span> <span class="n">tweetsPerStock</span> <span class="k">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">)</span>
+
+<span class="k">def</span> <span class="n">generateTweets</span><span class="o">(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
+  <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">val</span> <span class="n">s</span> <span class="k">=</span> <span class="k">for</span> <span class="o">(</span><span class="n">i</span> <span class="k">&lt;-</span> <span class="mi">1</span> <span class="n">to</span> <span class="mi">3</span><span class="o">)</span> <span class="k">yield</span> <span class="o">(</span><span class="n">symbols</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">size</span><span class="o">)))</span>
+    <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="n">mkString</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">))</span>
+    <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">500</span><span class="o">))</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+
+  <div data-lang="java7">
+
+    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Read a stream of tweets</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">tweetStream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="k">new</span> <span class="nf">TweetSource</span><span class="o">());</span>
+
+<span class="c1">//Extract the stock symbols</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">mentionedSymbols</span> <span class="o">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="n">String</span><span class="o">[]</span> <span class="n">words</span> <span class="o">=</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">);</span>
+        <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">words</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">word</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">());</span>
+        <span class="o">}</span>
+    <span class="o">}</span>
+<span class="o">}).</span><span class="na">filter</span><span class="o">(</span><span class="k">new</span> <span class="n">FilterFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">filter</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">SYMBOLS</span><span class="o">.</span><span class="na">contains</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
+    <span class="o">}</span>
+<span class="o">});</span>
+
+<span class="c1">//Count the extracted symbols</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Count</span><span class="o">&gt;</span> <span class="n">tweetsPerStock</span> <span class="o">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">new</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Count</span><span class="o">&gt;()</span> <span class="o">{</span>
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="n">Count</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span class="nf">Count</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span>
+    <span class="o">}</span>
+<span class="o">}).</span><span class="na">groupBy</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">).</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)).</span><span class="na">sum</span><span class="o">(</span><span class="s">&quot;count&quot;</span><span class="o">).</span><span class="na">flatten</span><span class="o">();</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">TweetSource</span> <span class="kd">implements</span> <span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
+    <span class="n">Random</span> <span class="n">random</span><span class="o">;</span>
+    <span class="n">StringBuilder</span> <span class="n">stringBuilder</span><span class="o">;</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">invoke</span><span class="o">(</span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+        <span class="n">random</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Random</span><span class="o">();</span>
+        <span class="n">stringBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StringBuilder</span><span class="o">();</span>
+
+        <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">stringBuilder</span><span class="o">.</span><span class="na">setLength</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span>
+            <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="mi">3</span><span class="o">;</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
+                <span class="n">stringBuilder</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">);</span>
+                <span class="n">stringBuilder</span><span class="o">.</span><span class="na">append</span><span class="o">(</span><span class="n">SYMBOLS</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">random</span><span class="o">.</span><span class="na">nextInt</span><span class="o">(</span><span class="n">SYMBOLS</span><span class="o">.</span><span class="na">size</span><span class="o">())));</span>
+            <span class="o">}</span>
+            <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">stringBuilder</span><span class="o">.</span><span class="na">toString</span><span class="o">());</span>
+            <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="mi">500</span><span class="o">);</span>
+        <span class="o">}</span>
+
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+
+</div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="streaming-joins">Streaming joins</h2>
+
+<p>Finally, we join real-time tweets and stock prices and compute a
+rolling correlation between the number of price warnings and the
+number of mentions of a given stock in the Twitter stream. As both of
+these data streams are potentially infinite, we apply the join on a
+30-second window.</p>
+
+<p><img alt="Streaming joins" src="/img/blog/blog_stream_join.png" width="60%" class="img-responsive center-block" /></p>
+
+<div class="codetabs">
+
+  <div data-lang="scala">
+
+    <div class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">//Join warnings and parsed tweets</span>
+<span class="k">val</span> <span class="n">tweetsAndWarning</span> <span class="k">=</span> <span class="n">warningsPerStock</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">onWindow</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">where</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
+  <span class="o">.</span><span class="n">equalTo</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span> <span class="o">{</span> <span class="o">(</span><span class="n">c1</span><span class="o">,</span> <span class="n">c2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">c1</span><span class="o">.</span><span class="n">count</span><span class="o">,</span> <span class="n">c2</span><span class="o">.</span><span class="n">count</span><span class="o">)</span> <span class="o">}</span>
+
+<span class="k">val</span> <span class="n">rollingCorrelation</span> <span class="k">=</span> <span class="n">tweetsAndWarning</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span>
+  <span class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">computeCorrelation</span> <span class="k">_</span><span class="o">)</span>
+
+<span class="n">rollingCorrelation</span> <span class="n">print</span>
+
+<span class="c1">//Compute rolling correlation</span>
+<span class="k">def</span> <span class="n">computeCorrelation</span><span class="o">(</span><span class="n">input</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">Double</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span>
+  <span class="k">if</span> <span class="o">(</span><span class="n">input</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">val</span> <span class="n">var1</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_1</span><span class="o">)</span>
+    <span class="k">val</span> <span class="n">mean1</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">)</span>
+    <span class="k">val</span> <span class="n">var2</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span>
+    <span class="k">val</span> <span class="n">mean2</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">)</span>
+
+    <span class="k">val</span> <span class="n">cov</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">zip</span><span class="o">(</span><span class="n">var2</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">xy</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_1</span> <span class="o">-</span> <span class="n">mean1</span><span class="o">)</span> <span class="o">*</span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_2</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">)))</span>
+    <span class="k">val</span> <span class="n">d1</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span class="n">mean1</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span>
+    <span class="k">val</span> <span class="n">d2</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=&gt;</span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span>
+
+    <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">cov</span> <span class="o">/</span> <span class="o">(</span><span class="n">d1</span> <span class="o">*</span> <span class="n">d2</span><span class="o">))</span>
+  <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+
+  <div data-lang="java7">
+
+    <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">//Join warnings and parsed tweets</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">tweetsAndWarning</span> <span class="o">=</span> <span class="n">warningsPerStock</span>
+    <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">onWindow</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">equalTo</span><span class="o">(</span><span class="s">&quot;symbol&quot;</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="k">new</span> <span class="n">JoinFunction</span><span class="o">&lt;</span><span class="n">Count</span><span class="o">,</span> <span class="n">Count</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
+        <span class="nd">@Override</span>
+        <span class="kd">public</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="nf">join</span><span class="o">(</span><span class="n">Count</span> <span class="n">first</span><span class="o">,</span> <span class="n">Count</span> <span class="n">second</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+            <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;(</span><span class="n">first</span><span class="o">.</span><span class="na">count</span><span class="o">,</span> <span class="n">second</span><span class="o">.</span><span class="na">count</span><span class="o">);</span>
+            <span class="o">}</span>
+    <span class="o">});</span>
+
+<span class="c1">//Compute rolling correlation</span>
+<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">rollingCorrelation</span> <span class="o">=</span> <span class="n">tweetsAndWarning</span>
+    <span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="n">TimeUnit</span><span class="o">.</span><span class="na">SECONDS</span><span class="o">))</span>
+    <span class="o">.</span><span class="na">mapWindow</span><span class="o">(</span><span class="k">new</span> <span class="nf">WindowCorrelation</span><span class="o">());</span>
+
+<span class="n">rollingCorrelation</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kd">class</span> <span class="nc">WindowCorrelation</span>
+    <span class="kd">implements</span> <span class="n">WindowMapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;,</span> <span class="n">Double</span><span class="o">&gt;</span> <span class="o">{</span>
+
+    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">leftSum</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">rightSum</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Integer</span> <span class="n">count</span><span class="o">;</span>
+
+    <span class="kd">private</span> <span class="n">Double</span> <span class="n">leftMean</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Double</span> <span class="n">rightMean</span><span class="o">;</span>
+
+    <span class="kd">private</span> <span class="n">Double</span> <span class="n">cov</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Double</span> <span class="n">leftSd</span><span class="o">;</span>
+    <span class="kd">private</span> <span class="n">Double</span> <span class="n">rightSd</span><span class="o">;</span>
+
+    <span class="nd">@Override</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span class="nf">mapWindow</span><span class="o">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">values</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> 
+        <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
+
+        <span class="n">leftSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+        <span class="n">rightSum</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+        <span class="n">count</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
+
+        <span class="n">cov</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span>
+        <span class="n">leftSd</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span>
+        <span class="n">rightSd</span> <span class="o">=</span> <span class="mi">0</span><span class="o">.;</span>
+
+        <span class="c1">//compute mean for both sides, save count</span>
+        <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">leftSum</span> <span class="o">+=</span> <span class="n">pair</span><span class="o">.</span><span class="na">f0</span><span class="o">;</span>
+            <span class="n">rightSum</span> <span class="o">+=</span> <span class="n">pair</span><span class="o">.</span><span class="na">f1</span><span class="o">;</span>
+            <span class="n">count</span><span class="o">++;</span>
+        <span class="o">}</span>
+
+        <span class="n">leftMean</span> <span class="o">=</span> <span class="n">leftSum</span><span class="o">.</span><span class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
+        <span class="n">rightMean</span> <span class="o">=</span> <span class="n">rightSum</span><span class="o">.</span><span class="na">doubleValue</span><span class="o">()</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
+
+        <span class="c1">//compute covariance &amp; std. deviations</span>
+        <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">cov</span> <span class="o">+=</span> <span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f0</span> <span class="o">-</span> <span class="n">leftMean</span><span class="o">)</span> <span class="o">*</span> <span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span class="n">rightMean</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
+        <span class="o">}</span>
+
+        <span class="k">for</span> <span class="o">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">pair</span> <span class="o">:</span> <span class="n">values</span><span class="o">)</span> <span class="o">{</span>
+            <span class="n">leftSd</span> <span class="o">+=</span> <span class="n">Math</span><span class="o">.</span><span class="na">pow</span><span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f0</span> <span class="o">-</span> <span class="n">leftMean</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
+            <span class="n">rightSd</span> <span class="o">+=</span> <span class="n">Math</span><span class="o">.</span><span class="na">pow</span><span class="o">(</span><span class="n">pair</span><span class="o">.</span><span class="na">f1</span> <span class="o">-</span> <span class="n">rightMean</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span> <span class="o">/</span> <span class="n">count</span><span class="o">;</span>
+        <span class="o">}</span>
+        <span class="n">leftSd</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span class="o">(</span><span class="n">leftSd</span><span class="o">);</span>
+        <span class="n">rightSd</span> <span class="o">=</span> <span class="n">Math</span><span class="o">.</span><span class="na">sqrt</span><span class="o">(</span><span class="n">rightSd</span><span class="o">);</span>
+
+        <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">cov</span> <span class="o">/</span> <span class="o">(</span><span class="n">leftSd</span> <span class="o">*</span> <span class="n">rightSd</span><span class="o">));</span>
+    <span class="o">}</span>
+<span class="o">}</span></code></pre></div>
+
+  </div>
+
+</div>
+
+<p><a href="#top">Back to top</a></p>
+
+<h2 id="other-things-to-try">Other things to try</h2>
+
+<p>For a full feature overview please check the <a href="http://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html">Streaming Guide</a>, which describes all the available API features.
+You are very welcome to try out our features for different use cases; we are looking forward to hearing about your experiences. Feel free to <a href="http://flink.apache.org/community.html#mailing-lists">contact us</a>.</p>
+
+<h2 id="upcoming-for-streaming">Upcoming for streaming</h2>
+
+<p>There are some aspects of Flink Streaming that are subject to
+change by the next release, making this application look even nicer.</p>
+
+<p>Stay tuned for later blog posts on how Flink Streaming works
+internally, fault tolerance, and performance measurements!</p>
+
+<p><a href="#top">Back to top</a></p>
+
+      </article>
+    </div>
+
+    <div class="row">
+      <div id="disqus_thread"></div>
+      <script type="text/javascript">
+        /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
+        var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname
+
+        /* * * DON'T EDIT BELOW THIS LINE * * */
+        (function() {
+            var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
+            dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
+             (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
+        })();
+      </script>
+    </div>
+  </div>
+</div>
+      </div>
+    </div>
+
+    <hr />
+
+    <div class="row">
+      <div class="footer text-center col-sm-12">
+        <p>Copyright © 2014-2016 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p>
+        <p>Apache Flink, Apache, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.</p>
+        <p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a href="/blog/feed.xml">RSS feed</a></p>
+      </div>
+    </div>
+    </div><!-- /.container -->
+
+    <!-- Include all compiled plugins (below), or include individual files as needed -->
+    <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
+    <script src="/js/codetabs.js"></script>
+    <script src="/js/stickysidebar.js"></script>
+
+
+    <!-- Google Analytics -->
+    <script>
+      (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+      (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
+      m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+      })(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+
+      ga('create', 'UA-52545728-1', 'auto');
+      ga('send', 'pageview');
+    </script>
+  </body>
+</html>


Mime
View raw message