flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [10/50] flink git commit: [FLINK-3195] [examples] Consolidate batch examples into one project, unify batch and streaming examples under on parent project
Date Thu, 14 Jan 2016 16:16:07 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
new file mode 100644
index 0000000..f6da4bf
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.relational
+
+import org.apache.flink.api.scala._
+
+import org.apache.flink.api.java.aggregation.Aggregations
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to work with named fields by using Scala case classes.
+ * The original query can be found at
+ * [[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf]]
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT 
+ *      l_orderkey, 
+ *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ *      o_orderdate, 
+ *      o_shippriority 
+ * FROM customer, 
+ *      orders, 
+ *      lineitem 
+ * WHERE
+ *      c_mktsegment = '[SEGMENT]' 
+ *      AND c_custkey = o_custkey
+ *      AND l_orderkey = o_orderkey
+ *      AND o_orderdate < date '[DATE]'
+ *      AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ *      l_orderkey, 
+ *      o_orderdate, 
+ *      o_shippriority;
+ * }}}
+ *
+ * Compared to the original TPC-H query this version does not sort the result by revenue
+ * and orderdate.
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator 
+ * as generated by the TPC-H data generator which is available at 
+ * [[http://www.tpc.org/tpch/]].
+ *
+ * Usage: 
+ * {{{
+ * TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *  
+ * This example shows how to use:
+ *  - case classes and case class field addressing
+ *  - built-in aggregation functions
+ * 
+ */
+object TPCHQuery3 {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    // set filter date
+    val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
+    val date = dateFormat.parse("1995-03-12")
+    
+    // get execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // read and filter lineitems by shipDate
+    val lineitems = getLineitemDataSet(env).filter( l => dateFormat.parse(l.shipDate).after(date) )
+    // read and filter customers by market segment
+    val customers = getCustomerDataSet(env).filter( c => c.mktSegment.equals("AUTOMOBILE"))
+    // read orders
+    val orders = getOrdersDataSet(env)
+
+                      // filter orders by order date
+    val items = orders.filter( o => dateFormat.parse(o.orderDate).before(date) )
+                      // filter orders by joining with customers
+                      .join(customers).where("custId").equalTo("custId").apply( (o,c) => o )
+                      // join with lineitems 
+                      .join(lineitems).where("orderId").equalTo("orderId")
+                                      .apply( (o,l) => 
+                                        new ShippedItem( o.orderId,
+                                                         l.extdPrice * (1.0 - l.discount),
+                                                         o.orderDate,
+                                                         o.shipPrio ) )
+
+    // group by order and aggregate revenue
+    val result = items.groupBy("orderId", "orderDate", "shipPrio")
+                      .aggregate(Aggregations.SUM, "revenue")
+
+    // emit result
+    result.writeAsCsv(outputPath, "\n", "|")
+    
+    // execute program
+    env.execute("Scala TPCH Query 3 Example")
+  }
+  
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+  
+  case class Lineitem(orderId: Long, extdPrice: Double, discount: Double, shipDate: String)
+  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+  case class Customer(custId: Long, mktSegment: String)
+  case class ShippedItem(orderId: Long, revenue: Double, orderDate: String, shipPrio: Long)
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+  
+  private var lineitemPath: String = null
+  private var customerPath: String = null
+  private var ordersPath: String = null
+  private var outputPath: String = null
+
+  private def parseParameters(args: Array[String]): Boolean = { // true iff args are usable
+    if (args.length == 4) { // expected order: lineitem, customer, orders, result path
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else { // any other argument count: print the usage text to stderr and signal failure
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+          " Due to legal restrictions, we can not ship generated data.\n" +
+          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
+                             "<orders-csv path> <result path>")
+      false
+    }
+  }
+  
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+    env.readCsvFile[Lineitem](
+        lineitemPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 5, 6, 10) )
+  }
+
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+    env.readCsvFile[Customer](
+        customerPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 6) )
+  }
+  
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+    env.readCsvFile[Order](
+        ordersPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 1, 4, 7) )
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
new file mode 100644
index 0000000..5c2587f
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.relational
+
+import org.apache.flink.api.scala._
+import org.apache.flink.examples.java.relational.util.WebLogData
+import org.apache.flink.util.Collector
+
+/**
+ * This program processes web logs and relational data.
+ * It implements the following relational query:
+ *
+ * {{{
+ * SELECT
+ *       r.pageURL,
+ *       r.pageRank,
+ *       r.avgDuration
+ * FROM documents d JOIN rankings r
+ *                  ON d.url = r.url
+ * WHERE CONTAINS(d.text, [keywords])
+ *       AND r.rank > [rank]
+ *       AND NOT EXISTS
+ *           (
+ *              SELECT * FROM Visits v
+ *              WHERE v.destUrl = d.url
+ *                    AND v.visitDate < [date]
+ *           );
+ * }}}
+ *
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator.
+ * The tables referenced in the query can be generated using the
+ * [[org.apache.flink.examples.java.relational.util.WebLogDataGenerator]] and
+ * have the following schemas
+ *
+ * {{{
+ * CREATE TABLE Documents (
+ *                url VARCHAR(100) PRIMARY KEY,
+ *                contents TEXT );
+ *
+ * CREATE TABLE Rankings (
+ *                pageRank INT,
+ *                pageURL VARCHAR(100) PRIMARY KEY,
+ *                avgDuration INT );
+ *
+ * CREATE TABLE Visits (
+ *                sourceIP VARCHAR(16),
+ *                destURL VARCHAR(100),
+ *                visitDate DATE,
+ *                adRevenue FLOAT,
+ *                userAgent VARCHAR(64),
+ *                countryCode VARCHAR(3),
+ *                languageCode VARCHAR(6),
+ *                searchWord VARCHAR(32),
+ *                duration INT );
+ * }}}
+ *
+ *
+ * Usage
+ * {{{
+ *   WebLogAnalysis <documents path> <ranks path> <visits path> <result path>
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.examples.java.relational.util.WebLogData]].
+ *
+ * This example shows how to use:
+ *
+ *  - tuple data types
+ *  - projection and join projection
+ *  - the CoGroup transformation for an anti-join
+ *
+ */
+object WebLogAnalysis {
+
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val documents = getDocumentsDataSet(env)
+    val ranks = getRanksDataSet(env)
+    val visits = getVisitsDataSet(env)
+
+    val filteredDocs = documents
+      .filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations "))
+
+    val filteredRanks = ranks
+      .filter(rank => rank._1 > 40)
+
+    val filteredVisits = visits
+      .filter(visit => visit._2.substring(0, 4).toInt == 2007)
+
+    val joinDocsRanks = filteredDocs.join(filteredRanks).where(0).equalTo(1) {
+      (doc, rank) => rank
+    }.withForwardedFieldsSecond("*")
+
+    val result = joinDocsRanks.coGroup(filteredVisits).where(1).equalTo(0) {
+      (ranks, visits, out: Collector[(Int, String, Int)]) =>
+        if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
+    }.withForwardedFieldsFirst("*")
+
+
+
+
+    // emit result
+    if (fileOutput) {
+      result.writeAsCsv(outputPath, "\n", "|")
+      env.execute("Scala WebLogAnalysis Example")
+    } else {
+      result.print()
+    }
+
+  }
+
+  private var fileOutput: Boolean = false
+  private var documentsPath: String = null
+  private var ranksPath: String = null
+  private var visitsPath: String = null
+  private var outputPath: String = null
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 4) {
+        documentsPath = args(0)
+        ranksPath = args(1)
+        visitsPath = args(2)
+        outputPath = args(3)
+      }
+      else {
+        System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " +
+          "<result path>")
+        return false
+      }
+    }
+    else {
+      System.out.println("Executing WebLog Analysis example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("  We provide a data generator to create synthetic input files for this " +
+        "program.")
+      System.out.println("  Usage: WebLogAnalysis <documents path> <ranks path> <visits path> " +
+        "<result path>")
+    }
+    true
+  }
+
+  private def getDocumentsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = {
+    if (fileOutput) {
+      env.readCsvFile[(String, String)](
+        documentsPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 1))
+    }
+    else {
+      val documents = WebLogData.DOCUMENTS map {
+        case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
+      }
+      env.fromCollection(documents)
+    }
+  }
+
+  private def getRanksDataSet(env: ExecutionEnvironment): DataSet[(Int, String, Int)] = {
+    if (fileOutput) {
+      env.readCsvFile[(Int, String, Int)](
+        ranksPath,
+        fieldDelimiter = "|",
+        includedFields = Array(0, 1, 2))
+    }
+    else {
+      val ranks = WebLogData.RANKS map {
+        case Array(x, y, z) => (x.asInstanceOf[Int], y.asInstanceOf[String], z.asInstanceOf[Int])
+      }
+      env.fromCollection(ranks)
+    }
+  }
+
+  private def getVisitsDataSet(env: ExecutionEnvironment): DataSet[(String, String)] = {
+    if (fileOutput) {
+      env.readCsvFile[(String, String)](
+        visitsPath,
+        fieldDelimiter = "|",
+        includedFields = Array(1, 2))
+    }
+    else {
+      val visits = WebLogData.VISITS map {
+        case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
+      }
+      env.fromCollection(visits)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
new file mode 100644
index 0000000..68092b3
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala.wordcount
+
+import org.apache.flink.api.scala._
+import org.apache.flink.examples.java.wordcount.util.WordCountData
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over text files. 
+ *
+ * The input is a plain text file with lines separated by newline characters.
+ *
+ * Usage:
+ * {{{
+ *   WordCount <text path> <result path>
+ * }}}
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[org.apache.flink.examples.java.wordcount.util.WordCountData]]
+ *
+ * This example shows how to:
+ *
+ *   - write a simple Flink program.
+ *   - use Tuple data types.
+ *   - write and use user-defined functions.
+ *
+ */
+object WordCount {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val text = getTextDataSet(env)
+
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    if (fileOutput) {
+      counts.writeAsCsv(outputPath, "\n", " ")
+      env.execute("Scala WordCount Example")
+    } else {
+      counts.print()
+    }
+
+  }
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 2) {
+        textPath = args(0)
+        outputPath = args(1)
+        true
+      } else {
+        System.err.println("Usage: WordCount <text path> <result path>")
+        false
+      }
+    } else {
+      System.out.println("Executing WordCount example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from a file.")
+      System.out.println("  Usage: WordCount <text path> <result path>")
+      true
+    }
+  }
+
+  private def getTextDataSet(env: ExecutionEnvironment): DataSet[String] = {
+    if (fileOutput) {
+      env.readTextFile(textPath)
+    }
+    else {
+      env.fromCollection(WordCountData.WORDS)
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var textPath: String = null
+  private var outputPath: String = null
+}
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
new file mode 100644
index 0000000..30f2343
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -0,0 +1,559 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-examples</artifactId>
+		<version>1.0-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-examples-streaming</artifactId>
+	<name>flink-examples-streaming</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-examples-batch</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-twitter</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+
+			<!-- Scala Code Style -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+			
+			<!-- get default data from flink-examples-batch package -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<execution>
+						<id>unpack</id>
+						<phase>prepare-package</phase>
+						<goals>
+							<goal>unpack</goal>
+						</goals>
+						<configuration>
+							<artifactItems>
+								<!-- For WordCount example data -->
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-examples-batch</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+								</artifactItem>
+								<!-- For JSON utilities -->
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-twitter</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<includes>org/apache/flink/streaming/connectors/json/*</includes>
+								</artifactItem>
+							</artifactItems>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- self-contained jars for each example -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<!-- Default Execution -->
+					<execution>
+						<id>default</id>
+						<phase>package</phase>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+					
+					<!-- Iteration -->
+					<execution>
+						<id>Iteration</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>Iteration</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/iteration/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- IncrementalLearning -->
+					<execution>
+						<id>IncrementalLearning</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>IncrementalLearning</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/ml/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- Twitter -->
+					<execution>
+						<id>Twitter</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>Twitter</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/twitter/*.class</include>
+								<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
+								<include>org/apache/flink/streaming/connectors/json/*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WindowJoin -->
+					<execution>
+						<id>WindowJoin</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WindowJoin</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/join/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCountPOJO -->
+					<execution>
+						<id>WordCountPOJO</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WordCountPOJO</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCount -->
+					<execution>
+						<id>WordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>				
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WindowWordCount -->
+					<execution>
+						<id>WindowWordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WindowWordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.WindowWordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/WindowWordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- SocketTextStreamWordCount -->
+					<execution>
+						<id>SocketTextStreamWordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>SocketTextStreamWordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- TopSpeedWindowing -->
+					<execution>
+						<id>TopSpeedWindowing</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>TopSpeedWindowing</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.TopSpeedWindowing</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- SessionWindowing -->
+					<execution>
+						<id>SessionWindowing</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>SessionWindowing</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.SessionWindowing</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/SessionWindowing.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/SessionWindowing$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+				</executions>
+			</plugin>
+
+
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+ 
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+			
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!--simplify the name of example JARs for build-target/examples -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration>
+							<target>
+								<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-IncrementalLearning.jar" tofile="${project.basedir}/target/IncrementalLearning.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-Iteration.jar" tofile="${project.basedir}/target/Iteration.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-SessionWindowing.jar" tofile="${project.basedir}/target/SessionWindowing.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-SocketTextStreamWordCount.jar" tofile="${project.basedir}/target/SocketTextStreamWordCount.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-TopSpeedWindowing.jar" tofile="${project.basedir}/target/TopSpeedWindowing.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-Twitter.jar" tofile="${project.basedir}/target/Twitter.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-WindowJoin.jar" tofile="${project.basedir}/target/WindowJoin.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-WindowWordCount.jar" tofile="${project.basedir}/target/WindowWordCount.jar" />
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-dependency-plugin</artifactId>
+										<versionRange>[2.9,)</versionRange>
+										<goals>
+											<goal>unpack</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
new file mode 100644
index 0000000..b6e1a61
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.iteration;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Example illustrating iterations in Flink streaming.
+ * <p> The program sums up random numbers and counts additions
+ * it performs to reach a specific threshold in an iterative streaming fashion. </p>
+ *
+ * <p>
+ * This example shows how to use: <ul> <li>streaming iterations, <li>buffer timeout to enhance latency, <li>directed
+ * outputs. </ul>
+ */
+public class IterateExample {
+
+	private static final int BOUND = 100;
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up input for the stream of integer pairs
+
+		// obtain execution environment and set setBufferTimeout to 1 to enable
+		// continuous flushing of the output buffers (lowest latency)
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+				.setBufferTimeout(1);
+
+		// create input stream of integer pairs
+		DataStream<Tuple2<Integer, Integer>> inputStream;
+		if (fileInput) {
+			inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
+		} else {
+			inputStream = env.addSource(new RandomFibonacciSource());
+		}
+
+		// create an iterative data stream from the input with 5 second timeout
+		IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
+				.iterate(5000);
+
+		// apply the step function to get the next Fibonacci number
+		// increment the counter and split the output with the output selector
+		SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
+				.split(new MySelector());
+
+		// close the iteration by selecting the tuples that were directed to the
+		// 'iterate' channel in the output selector
+		it.closeWith(step.select("iterate"));
+
+		// to produce the final output select the tuples directed to the
+		// 'output' channel, then map each of them back to its original input pair
+		// together with its iteration counter
+		DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
+				.map(new OutputMap());
+
+		// emit results
+		if (fileOutput) {
+			numbers.writeAsText(outputPath, 1);
+		} else {
+			numbers.print();
+		}
+
+		// execute the program
+		env.execute("Streaming Iteration Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Generate up to BOUND random integer pairs, each value drawn from the range 1 to BOUND/2 - 1
+	 */
+	private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rnd = new Random();
+
+		private volatile boolean isRunning = true;
+		private int counter = 0;
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+
+			while (isRunning && counter < BOUND) {
+				int first = rnd.nextInt(BOUND / 2 - 1) + 1;
+				int second = rnd.nextInt(BOUND / 2 - 1) + 1;
+
+				ctx.collect(new Tuple2<>(first, second));
+				counter++;
+				Thread.sleep(50L);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Parse a text line of the form "(first,second)" into a pair of integers
+	 */
+	private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Integer> map(String value) throws Exception {
+			String record = value.substring(1, value.length() - 1);
+			String[] splitted = record.split(",");
+			return new Tuple2<>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
+		}
+	}
+
+	/**
+	 * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple.
+	 * A counter is attached to the tuple and incremented in every iteration step.
+	 */
+	public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer,
+			Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
+				Exception {
+			return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0);
+		}
+	}
+
+	/**
+	 * Iteration step: maps (f2, f3) to (f3, f2 + f3) and adds one to the counter f4, without modifying the input
+	 */
+	public static class Step implements
+			MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
+					Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
+				Integer> value) throws Exception {
+			return new Tuple5<>(value.f0, value.f1, value.f3, value.f2 + value.f3, value.f4 + 1);
+		}
+	}
+
+	/**
+	 * OutputSelector testing which tuple needs to be iterated again.
+	 */
+	public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
+			List<String> output = new ArrayList<>();
+			if (value.f2 < BOUND && value.f3 < BOUND) {
+				output.add("iterate");
+			} else {
+				output.add("output");
+			}
+			return output;
+		}
+	}
+
+	/**
+	 * Giving back the input pair and the counter
+	 */
+	public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
+			Tuple2<Tuple2<Integer, Integer>, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer>
+				value) throws
+				Exception {
+			return new Tuple2<>(new Tuple2<>(value.f0, value.f1), value.f4);
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInput = false;
+	private static boolean fileOutput = false;
+	private static String inputPath;
+	private static String outputPath;
+
+	/** Reads the optional input/result paths from args; returns false, after printing the usage, on a bad count. */
+	private static boolean parseParameters(String[] args) {
+		if (args.length > 0) {
+			// parse input arguments
+			if (args.length == 1) {
+				fileOutput = true;
+				outputPath = args[0];
+			} else if (args.length == 2) {
+				fileInput = true;
+				inputPath = args[0];
+				fileOutput = true;
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: IterateExample <result path> or IterateExample <input path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing IterateExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: IterateExample <result path>");
+		}
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
new file mode 100644
index 0000000..0077459
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.iteration.util;
+
+public class IterateExampleData {
+	public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + "(11,15)\n" + "(17,39)\n" + "(24,41)\n" +
+			"(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + "(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" +
+			"(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + "(7,9)";
+
+	public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + "((3,16),5)\n" + "((1,1),10)\n" +
+			"((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + "((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" +
+			"((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + "((21,5),5)\n" +
+			"((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + "((14,36),3)";
+
+	private IterateExampleData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
new file mode 100644
index 0000000..3355f1c
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Example illustrating a join over tumbling windows of streams in Flink.
+ *
+ * <p>
+ * This example will join two streams over tumbling windows. One which emits grades and one which
+ * emits salaries of people. The input format for both sources has an additional timestamp
+ * as field 0. This is used to do event-time windowing. The timestamps must be
+ * monotonically increasing.
+ *
+ * This example shows how to:
+ * <ul>
+ *   <li>do windowed joins,
+ *   <li>use tuple data types,
+ *   <li>write a simple streaming program.
+ * </ul>
+ */
+public class WindowJoin {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		// connect to the data sources for grades and salaries
+		Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);
+		DataStream<Tuple3<Long, String, Integer>> grades = input.f0;
+		DataStream<Tuple3<Long, String, Integer>> salaries = input.f1;
+
+		// extract the timestamps
+		grades = grades.assignTimestamps(new MyTimestampExtractor());
+		salaries = salaries.assignTimestamps(new MyTimestampExtractor());
+
+		// apply a temporal join over the two streams based on the names over
+		// tumbling windows of 5 milliseconds
+		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
+				.join(salaries)
+				.where(new NameKeySelector())
+				.equalTo(new NameKeySelector())
+				.window(TumblingTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
+				.apply(new MyJoinFunction());
+
+		// emit result
+		if (fileOutput) {
+			joinedStream.writeAsText(outputPath, 1);
+		} else {
+			joinedStream.print();
+		}
+
+		// execute program
+		env.execute("Windowed Join Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private final static String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"};
+	private final static int GRADE_COUNT = 5;
+	private final static int SALARY_MAX = 10000;
+	private final static int SLEEP_TIME = 10;
+
+	/**
+	 * Emit up to 100 tuples with random names and integers (grades), sleeping a random time between them.
+	 */
+	public static class GradeSource implements SourceFunction<Tuple3<Long, String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rand;
+		private Tuple3<Long, String, Integer> outTuple;
+		private volatile boolean isRunning = true;
+		private int counter;
+
+		public GradeSource() {
+			rand = new Random();
+			outTuple = new Tuple3<>();
+		}
+
+		@Override
+		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
+			while (isRunning && counter < 100) {
+				outTuple.f0 = System.currentTimeMillis();
+				outTuple.f1 = names[rand.nextInt(names.length)];
+				outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1;
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+				counter++;
+				ctx.collect(outTuple);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Emit up to 100 tuples with random names and integers (salaries), sleeping a random time between them.
+	 */
+	public static class SalarySource extends RichSourceFunction<Tuple3<Long, String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private transient Random rand;
+		private transient Tuple3<Long, String, Integer> outTuple;
+		private volatile boolean isRunning;
+		private int counter;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			rand = new Random();
+			outTuple = new Tuple3<>();
+			isRunning = true;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
+			while (isRunning && counter < 100) {
+				outTuple.f0 = System.currentTimeMillis();
+				outTuple.f1 = names[rand.nextInt(names.length)];
+				outTuple.f2 = rand.nextInt(SALARY_MAX) + 1;
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+				counter++;
+				ctx.collect(outTuple);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	public static class MySourceMap extends RichMapFunction<String, Tuple3<Long, String, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private String[] record;
+
+		public MySourceMap() {
+			record = new String[2];
+		}
+
+		@Override
+		public Tuple3<Long, String, Integer> map(String line) throws Exception {
+			record = line.substring(1, line.length() - 1).split(",");
+			return new Tuple3<>(Long.parseLong(record[0]), record[1], Integer.parseInt(record[2]));
+		}
+	}
+
+	public static class MyJoinFunction
+			implements
+			JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private Tuple3<String, Integer, Integer> joined = new Tuple3<>();
+
+		@Override
+		public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first,
+				Tuple3<Long, String, Integer> second) throws Exception {
+			joined.f0 = first.f1;
+			joined.f1 = first.f2;
+			joined.f2 = second.f2;
+			return joined;
+		}
+	}
+
+	private static class MyTimestampExtractor implements TimestampExtractor<Tuple3<Long, String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+			return element.f0;
+		}
+
+		@Override
+		public long extractWatermark(Tuple3<Long, String, Integer> element, long currentTimestamp) {
+			return element.f0 - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			return Long.MIN_VALUE;
+		}
+	}
+
+	private static class NameKeySelector implements KeySelector<Tuple3<Long, String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
+			return value.f1;
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInput = false;
+	private static boolean fileOutput = false;
+
+	private static String gradesPath;
+	private static String salariesPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			if (args.length == 1) {
+				fileOutput = true;
+				outputPath = args[0];
+			} else if (args.length == 3) {
+				fileInput = true;
+				fileOutput = true;
+				gradesPath = args[0];
+				salariesPath = args[1];
+				outputPath = args[2];
+			} else {
+				System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 2> " +
+						"<result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WindowJoin with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: WindowJoin <result path>");
+		}
+		return true;
+	}
+
+	private static Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> getInputStreams(
+			StreamExecutionEnvironment env) {
+
+		DataStream<Tuple3<Long, String, Integer>> grades;
+		DataStream<Tuple3<Long, String, Integer>> salaries;
+
+		if (fileInput) {
+			grades = env.readTextFile(gradesPath).map(new MySourceMap());
+			salaries = env.readTextFile(salariesPath).map(new MySourceMap());
+		} else {
+			grades = env.addSource(new GradeSource());
+			salaries = env.addSource(new SalarySource());
+		}
+
+		return Tuple2.of(grades, salaries);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
new file mode 100644
index 0000000..15c1280
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join.util;
+
+public class WindowJoinData {
+
+	public static final String GRADES_INPUT = "(0,john,5)\n" + "(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" +
+			"(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + "(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" +
+			"(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + "(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" +
+			"(3, tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + "(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" +
+			"(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + "(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" +
+			"(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + "(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" +
+			"(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + "(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" +
+			"(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + "(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" +
+			"(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + "(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" +
+			"(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + "(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" +
+			"(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + "(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" +
+			"(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + "(11,grace,1)\n" + "(11,jerry,1)\n" + "(11,jerry,4)\n" +
+			"(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + "(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" +
+			"(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + "(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" +
+			"(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + "(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)";
+
+	public static final String SALARIES_INPUT = "(0,john,6469)\n" + "(0,jerry,6760)\n" + "(0,jerry,8069)\n" +
+			"(1,tom,3662)\n" + "(1,grace,8427)\n" + "(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" +
+			"(2,grace,7622)\n" + "(2,jerry,7441)\n" + "(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" +
+			"(3,tom,3849)\n" + "(3,grace,1865)\n" + "(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" +
+			"(4,john,2477)\n" + "(4,grace,3561)\n" + "(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" +
+			"(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" + "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" +
+			"(6,jerry,1412)\n" + "(6,grace,6515)\n" + "(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" +
+			"(7,bob,9896)\n" + "(7,grace,7368)\n" + "(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" +
+			"(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" + "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" +
+			"(9,alice,102)\n" + "(9,alice,4931)\n" + "(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" +
+			"(10,bob,609)\n" + "(10,alice,5997)\n" + "(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" +
+			"(11,grace,2578)\n" + "(11,jerry,9704)\n" + "(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" +
+			"(12,bob,6213)\n" + "(12,alice,7525)\n" + "(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" +
+			"(13,bob,3720)\n" + "(13,bob,7612)\n" + "(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" +
+			"(14,jerry,5994)\n" + "(14,grace,928)\n" + "(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" +
+			"(15,bob,8302)\n" + "(15,john,4981)\n" + "(15,tom,1781)\n" + "(15,grace,1379)\n" + "(15,jerry,3700)\n" +
+			"(16,jerry,3584)\n" + "(16,jerry,2038)\n" + "(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" +
+			"(17,tom,3648)\n" + "(17,alice,2533)\n" + "(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" +
+			"(18,jerry,2138)\n" + "(18,alice,7503)\n" + "(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" +
+			"(19,grace,2977)\n" + "(19,grace,889)\n" + "(19,john,1338)";
+
+	private WindowJoinData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
new file mode 100644
index 0000000..32cf430
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.ml;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Skeleton for incremental machine learning algorithm consisting of a
+ * pre-computed model, which gets updated for the new inputs and new input data
+ * for which the job provides predictions.
+ *
+ * <p>
+ * This may serve as a base of a number of algorithms, e.g. updating an
+ * incremental Alternating Least Squares model while also providing the
+ * predictions.
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ *   <li>Connected streams
+ *   <li>CoFunctions
+ *   <li>Tuple data types
+ * </ul>
+ */
+public class IncrementalLearningSkeleton {
+
+	private static DataStream<Integer> trainingData = null;
+	private static DataStream<Integer> newData = null;
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	/**
+	 * Builds and runs the streaming job: a model stream is derived from the
+	 * training data, connected with the new data, and the resulting predictions
+	 * are either printed or written to the configured output path.
+	 *
+	 * @param args optional single argument: the result path
+	 */
+	public static void main(String[] args) throws Exception {
+
+		// bail out on a malformed command line; the usage text has already been printed
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		trainingData = env.addSource(new FiniteTrainingDataSource());
+		newData = env.addSource(new FiniteNewDataSource());
+
+		// build one partial model per 5000 ms event-time window of training data
+		DataStream<Double[]> model = trainingData
+				.assignTimestamps(new LinearTimestamp())
+				.timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+				.apply(new PartialModelBuilder());
+
+		// feed both the new data and the model updates into the predictor
+		DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
+
+		// emit result
+		if (!fileOutput) {
+			prediction.print();
+		} else {
+			prediction.writeAsText(outputPath, 1);
+		}
+
+		// execute program
+		env.execute("Streaming Incremental Learning");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Feeds the data for which predictions are made. By default it waits 15 ms
+	 * and then emits the Integer 1 fifty times, sleeping 5 ms before each
+	 * element. Emission stops early once {@link #cancel()} has been called.
+	 */
+	public static class FiniteNewDataSource implements SourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		// number of elements produced so far
+		private int counter;
+
+		// volatile so that a cancel() issued from another thread is seen by the run() loop
+		private volatile boolean isRunning = true;
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			Thread.sleep(15);
+			while (isRunning && counter < 50) {
+				ctx.collect(getNewData());
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// makes the loop in run() terminate before the next element is produced
+			isRunning = false;
+		}
+
+		/**
+		 * Produces the next element after a 5 ms pause and advances the counter.
+		 */
+		private Integer getNewData() throws InterruptedException {
+			Thread.sleep(5);
+			counter++;
+			return 1;
+		}
+	}
+
+	/**
+	 * Feeds new training data for the partial model builder. By default it
+	 * emits the Integer 1 8200 times without pausing. Emission stops early once
+	 * {@link #cancel()} has been called.
+	 */
+	public static class FiniteTrainingDataSource implements SourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		// number of elements produced so far
+		private int counter = 0;
+
+		// volatile so that a cancel() issued from another thread is seen by the run() loop
+		private volatile boolean isRunning = true;
+
+		@Override
+		public void run(SourceContext<Integer> collector) throws Exception {
+			while (isRunning && counter < 8200) {
+				collector.collect(getTrainingData());
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// makes the loop in run() terminate before the next element is produced
+			isRunning = false;
+		}
+
+		/**
+		 * Produces the next training element and advances the counter.
+		 */
+		private Integer getTrainingData() throws InterruptedException {
+			counter++;
+			return 1;
+		}
+	}
+
+	/**
+	 * Timestamp extractor that ignores the element and hands out synthetic
+	 * timestamps which grow by 10 with every extracted element.
+	 */
+	public static class LinearTimestamp implements TimestampExtractor<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		// most recently assigned timestamp; starts at 0 and advances in steps of 10
+		private long counter = 0L;
+
+		@Override
+		public long extractTimestamp(Integer element, long currentTimestamp) {
+			counter = counter + 10L;
+			return counter;
+		}
+
+		@Override
+		public long extractWatermark(Integer element, long currentTimestamp) {
+			// one below the most recently assigned timestamp
+			final long latestTimestamp = counter;
+			return latestTimestamp - 1;
+		}
+
+		@Override
+		public long getCurrentWatermark() {
+			// always the smallest possible long value
+			return Long.MIN_VALUE;
+		}
+	}
+
+	/**
+	 * Window function that turns the training data of each window into one
+	 * partial model and emits it.
+	 */
+	public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double[]> out) throws Exception {
+			final Double[] partialModel = buildPartialModel(values);
+			out.collect(partialModel);
+		}
+
+		/**
+		 * Placeholder model computation: returns a single-element model holding
+		 * 1.0 regardless of the given values.
+		 */
+		protected Double[] buildPartialModel(Iterable<Integer> values) {
+			return new Double[] {1.0};
+		}
+	}
+
+	/**
+	 * Creates predictions using the model produced in batch-processing and the
+	 * up-to-date partial model.
+	 * <p>
+	 * By default emits the Integer 0 for every element of the new data and the
+	 * Integer 1 for every model update.
+	 * </p>
+	 */
+	public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
+		private static final long serialVersionUID = 1L;
+
+		// both models stay null until the first model update arrives via map2
+		Double[] batchModel = null;
+		Double[] partialModel = null;
+
+		/**
+		 * Handles an element of the new data by returning its prediction.
+		 */
+		@Override
+		public Integer map1(Integer value) {
+			final Integer prediction = predict(value);
+			return prediction;
+		}
+
+		/**
+		 * Handles a model update: stores the partial model, refreshes the batch
+		 * model and returns 1.
+		 */
+		@Override
+		public Integer map2(Double[] value) {
+			partialModel = value;
+			batchModel = getBatchModel();
+			return 1;
+		}
+
+		/**
+		 * Supplies the batch model; this placeholder returns a single-element
+		 * model holding 0.0.
+		 */
+		protected Double[] getBatchModel() {
+			return new Double[] {0.0};
+		}
+
+		/**
+		 * Computes the prediction for one element; this placeholder always
+		 * returns 0.
+		 */
+		protected Integer predict(Integer inTuple) {
+			return 0;
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	/**
+	 * Interprets the command line: no argument keeps printing to stdout, exactly
+	 * one argument is taken as the result path for file output.
+	 *
+	 * @return false if more than one argument was given, true otherwise
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		// no arguments: keep the defaults and explain the optional parameter
+		if (args.length == 0) {
+			System.out.println("Executing IncrementalLearningSkeleton with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: IncrementalLearningSkeleton <result path>");
+			return true;
+		}
+
+		// any argument switches to file output, even when the argument count is rejected below
+		fileOutput = true;
+		if (args.length != 1) {
+			System.err.println("Usage: IncrementalLearningSkeleton <result path>");
+			return false;
+		}
+
+		outputPath = args[0];
+		return true;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
new file mode 100644
index 0000000..8a6cd88
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.ml.util;
+
+/**
+ * Holder for the constant {@link #RESULTS} string.
+ * NOTE(review): presumably the expected output of the IncrementalLearningSkeleton
+ * example, used for comparison in tests - confirm against the callers.
+ */
+public class IncrementalLearningSkeletonData {
+
+	// newline-terminated lines: a run of "1" lines followed by a run of "0" lines
+	public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
+			"1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+			"0\n" + "0\n" + "0\n" + "0\n";
+
+	// private constructor: the class only carries a constant and is not meant to be instantiated
+	private IncrementalLearningSkeletonData() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
new file mode 100644
index 0000000..cecabdd
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.socket;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
+
+/**
+ * This example shows an implementation of WordCount with data from a text
+ * socket. To run the example make sure that the service providing the text data
+ * is already up and running.
+ * <p>
+ * To start an example socket text stream on your local machine run netcat from
+ * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
+ * port number.
+ * </p>
+ * <p>
+ * Usage:
+ * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
+ * </p>
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use StreamExecutionEnvironment.socketTextStream
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions.
+ * </ul>
+ *
+ * @see <a href="http://www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
+ */
+public class SocketTextStreamWordCount {
+
+	/**
+	 * Reads lines from a text socket, counts the words and either prints the
+	 * running counts or writes them to the given output path.
+	 *
+	 * @param args hostname, port and an optional output path
+	 */
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = env.socketTextStream(hostName, port, '\n', 0);
+
+		DataStream<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.keyBy(0)
+						.sum(1);
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath, 1);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount from SocketTextStream Example");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static final String USAGE = "Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]";
+
+	private static boolean fileOutput = false;
+	private static String hostName;
+	private static int port;
+	private static String outputPath;
+
+	/**
+	 * Interprets the command line, which must consist of a hostname, a numeric
+	 * port and optionally an output path.
+	 *
+	 * @return false (after printing the usage text to stderr) if the number of
+	 *         arguments is wrong or the port is not a valid integer, true otherwise
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length != 2 && args.length != 3) {
+			System.err.println(USAGE);
+			return false;
+		}
+
+		// report a malformed port like any other usage error instead of
+		// letting the NumberFormatException escape from main
+		final int parsedPort;
+		try {
+			parsedPort = Integer.parseInt(args[1]);
+		} catch (NumberFormatException e) {
+			System.err.println("Invalid port: " + args[1]);
+			System.err.println(USAGE);
+			return false;
+		}
+
+		hostName = args[0];
+		port = parsedPort;
+
+		// a third argument switches from printing to file output
+		if (args.length == 3) {
+			fileOutput = true;
+			outputPath = args[2];
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
new file mode 100644
index 0000000..d26dc42
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.streaming.connectors.twitter.TwitterSource;
+import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+
+import java.util.StringTokenizer;
+
+/**
+ * Implements the "TwitterStream" program that computes a most used word
+ * occurrence over JSON files in a streaming fashion.
+ * <p>
+ * The input is a JSON text file with lines separated by newline characters.
+ * </p>
+ * <p>
+ * Usage: <code>TwitterStream [&lt;pathToPropertiesFile&gt;] &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link TwitterStreamData}.
+ * </p>
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>acquire external data,
+ * <li>use in-line defined functions,
+ * <li>handle flattened stream inputs.
+ * </ul>
+ */
+public class TwitterStream {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	/**
+	 * Builds and runs the job: the tweet stream is turned into running word
+	 * counts which are either printed or written to the configured output path.
+	 *
+	 * @param args optional properties-file path followed by the result path
+	 */
+	public static void main(String[] args) throws Exception {
+		// stop right away if the command line was rejected
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// tweets come either from the Twitter source or from the built-in default data
+		DataStream<String> rawTweets = getTextDataStream(env);
+
+		// split the selected tweets to (word, 1), then key by the word and sum the occurrences
+		DataStream<Tuple2<String, Integer>> wordCounts = rawTweets
+				.flatMap(new SelectEnglishAndTokenizeFlatMap())
+				.keyBy(0)
+				.sum(1);
+
+		// emit result
+		if (!fileOutput) {
+			wordCounts.print();
+		} else {
+			wordCounts.writeAsText(outputPath);
+		}
+
+		// execute program
+		env.execute("Twitter Streaming Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Selects English tweets and splits their text into words.
+	 * <p>
+	 * For every incoming JSON string whose "user.lang" field equals "en", the
+	 * "text" field is tokenized and each non-empty, lower-cased token is
+	 * emitted as a pair in the form of "(word,1)" ({@code Tuple2<String,
+	 * Integer>}).
+	 */
+	public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Emits one (word,1) pair per word of an English tweet. Input that
+		 * raises a JSONException is skipped without emitting an error.
+		 */
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
+			try {
+				// tweets in any other language produce no output
+				if (!getString(value, "user.lang").equals("en")) {
+					return;
+				}
+
+				// message of tweet
+				StringTokenizer words = new StringTokenizer(getString(value, "text"));
+
+				while (words.hasMoreTokens()) {
+					String word = words.nextToken().replaceAll("\\s*", "").toLowerCase();
+
+					if (!word.isEmpty()) {
+						out.collect(new Tuple2<>(word, 1));
+					}
+				}
+			} catch (JSONException e) {
+				// the JSON was not parsed correctly; this element is dropped on purpose
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInput = false;
+	private static boolean fileOutput = false;
+	private static String propertiesPath;
+	private static String outputPath;
+
+	/**
+	 * Interprets the command line: no argument runs on the built-in data and
+	 * prints, one argument is the result path, two arguments are the
+	 * properties-file path followed by the result path.
+	 *
+	 * @return false if more than two arguments were given, true otherwise
+	 */
+	private static boolean parseParameters(String[] args) {
+		switch (args.length) {
+			case 0:
+				System.out.println("Executing TwitterStream example with built-in default data.");
+				System.out.println("  Provide parameters to read input data from a file.");
+				System.out.println("  USAGE: TwitterStream [<pathToPropertiesFile>] <result path>");
+				return true;
+
+			case 1:
+				// result path only: default input, file output
+				fileOutput = true;
+				outputPath = args[0];
+				return true;
+
+			case 2:
+				// properties file and result path
+				fileOutput = true;
+				fileInput = true;
+				propertiesPath = args[0];
+				outputPath = args[1];
+				return true;
+
+			default:
+				// file output is flagged for any non-empty argument list, including a rejected one
+				fileOutput = true;
+				System.err.println("USAGE:\nTwitterStream [<pathToPropertiesFile>] <result path>");
+				return false;
+		}
+	}
+
+	/**
+	 * Creates the input stream: a TwitterSource configured with the given
+	 * properties path if one was passed on the command line, otherwise the
+	 * default texts from TwitterStreamData.
+	 */
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (!fileInput) {
+			// get default test text data
+			return env.fromElements(TwitterStreamData.TEXTS);
+		}
+
+		// create the Twitter source from the properties file given on the command line
+		return env.addSource(new TwitterSource(propertiesPath));
+	}
+}


Mime
View raw message