flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [05/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:41:43 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
new file mode 100644
index 0000000..3661726
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
@@ -0,0 +1,518 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-examples</artifactId>
+	<name>flink-streaming-examples</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-connectors</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- get default data from flink-java-examples package -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<execution>
+						<id>unpack</id>
+						<phase>prepare-package</phase>
+						<goals>
+							<goal>unpack</goal>
+						</goals>
+						<configuration>
+							<artifactItems>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-java-examples</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+								</artifactItem>
+							</artifactItems>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- self-contained jars for each example -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				
+				<executions>
+
+					<!-- Iteration -->
+					<execution>
+						<id>Iteration</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>Iteration</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/iteration/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- IncrementalLearning -->
+					<execution>
+						<id>IncrementalLearning</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>IncrementalLearning</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/ml/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- Twitter -->
+					<execution>
+						<id>Twitter</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>Twitter</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/twitter/*.class</include>
+								<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>						
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WindowJoin -->
+					<execution>
+						<id>WindowJoin</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WindowJoin</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/join/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCountPOJO -->
+					<execution>
+						<id>WordCountPOJO</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WordCountPOJO</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCount -->
+					<execution>
+						<id>WordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>				
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- SocketTextStreamWordCount -->
+					<execution>
+						<id>SocketTextStreamWordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>SocketTextStreamWordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$Tokenizer.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- DeltaExract -->
+					<execution>
+						<id>DeltaExract</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>DeltaExract</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.DeltaExtractExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- MultiplePolicies -->
+					<execution>
+						<id>MultiplePolicies</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>MultiplePolicies</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.MultiplePoliciesExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- SlidingExample -->
+					<execution>
+						<id>SlidingExample</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>SlidingExample</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.SlidingExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/SlidingExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/SlidingExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- TimeWindowing -->
+					<execution>
+						<id>TimeWindowing</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>TimeWindowing</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.TimeWindowingExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+ 
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+					<compilerPlugins>
+					   <compilerPlugin>
+						   <groupId>org.scalamacros</groupId>
+						   <artifactId>paradise_${scala.version}</artifactId>
+						   <version>${scala.macros.version}</version>
+					   </compilerPlugin>
+				   </compilerPlugins>
+				</configuration>
+			</plugin>
+			
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-dependency-plugin</artifactId>
+										<versionRange>[2.9,)</versionRange>
+										<goals>
+											<goal>unpack</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+		
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
new file mode 100644
index 0000000..998e818
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.iteration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Example illustrating iterations in Flink streaming.
+ * 
+ * <p>
+ * The program sums up random numbers and counts additions it performs to reach
+ * a specific threshold in an iterative streaming fashion.
+ * </p>
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>streaming iterations,
+ * <li>buffer timeout to enhance latency,
+ * <li>directed outputs.
+ * </ul>
+ */
+public class IterateExample {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up input for the stream of (0,0) pairs
+		List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
+		for (int i = 0; i < 1000; i++) {
+			input.add(new Tuple2<Double, Integer>(0., 0));
+		}
+
+		// obtain execution environment and set setBufferTimeout(0) to enable
+		// continuous flushing of the output buffers (lowest latency)
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+				.setBufferTimeout(1);
+
+		// create an iterative data stream from the input with 5 second timeout
+		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
+				.iterate(5000);
+
+		// apply the step function to add new random value to the tuple and to
+		// increment the counter and split the output with the output selector
+		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).split(new MySelector());
+
+		// close the iteration by selecting the tuples that were directed to the
+		// 'iterate' channel in the output selector
+		it.closeWith(step.select("iterate"));
+
+		// to produce the final output select the tuples directed to the
+		// 'output' channel then project it to the desired second field
+
+		DataStream<Tuple1<Integer>> numbers = step.select("output").project(1).types(Integer.class);
+
+		// emit result
+		if (fileOutput) {
+			numbers.writeAsText(outputPath, 1);
+		} else {
+			numbers.print();
+		}
+
+		// execute the program
+		env.execute("Streaming Iteration Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Iteration step function which takes an input (Double , Integer) and
+	 * produces an output (Double + random, Integer + 1).
+	 */
+	public static class Step extends
+			RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
+		private static final long serialVersionUID = 1L;
+		private transient Random rnd;
+
+		public void open(Configuration parameters) {
+			rnd = new Random();
+		}
+
+		@Override
+		public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception {
+			return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1);
+		}
+	}
+
+	/**
+	 * OutputSelector testing which tuple needs to be iterated again.
+	 */
+	public static class MySelector implements OutputSelector<Tuple2<Double, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Iterable<String> select(Tuple2<Double, Integer> value) {
+			List<String> output = new ArrayList<String>();
+			if (value.f0 > 100) {
+				output.add("output");
+			} else {
+				output.add("iterate");
+			}
+			return output;
+		}
+
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: IterateExample <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing IterateExample with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: IterateExample <result path>");
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
new file mode 100644
index 0000000..dcfed50
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.source.RichSourceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.util.Collector;
+
+/**
+ * Example illustrating join over sliding windows of streams in Flink.
+ * 
+ * <p>
+ * his example will join two streams with a sliding window. One which emits
+ * grades and one which emits salaries of people.
+ * </p>
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>do windowed joins,
+ * <li>use tuple data types,
+ * <li>write a simple streaming program.
+ */
+public class WindowJoin {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// obtain execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// connect to the data sources for grades and salaries
+		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
+		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
+
+		// apply a temporal join over the two stream based on the names over one
+		// second windows
+		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
+						.join(salaries)
+						.onWindow(1, TimeUnit.SECONDS)
+						.where(0)
+						.equalTo(0)
+						.with(new MyJoinFunction());
+		
+		// emit result
+		if (fileOutput) {
+			joinedStream.writeAsText(outputPath, 1);
+		} else {
+			joinedStream.print();
+		}
+
+		// execute program
+		env.execute("Windowed Join Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
+	private final static int GRADE_COUNT = 5;
+	private final static int SALARY_MAX = 10000;
+	private final static int SLEEP_TIME = 10;
+
+	/**
+	 * Continuously emit tuples with random names and integers (grades).
+	 */
+	public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rand;
+		private Tuple2<String, Integer> outTuple;
+
+		public GradeSource() {
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	/**
+	 * Continuously emit tuples with random names and integers (salaries).
+	 */
+	public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private transient Random rand;
+		private transient Tuple2<String, Integer> outTuple;
+
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			rand = new Random();
+			outTuple = new Tuple2<String, Integer>();
+		}
+
+		@Override
+		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+			while (true) {
+				outTuple.f0 = names[rand.nextInt(names.length)];
+				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
+				out.collect(outTuple);
+				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
+			}
+		}
+	}
+
+	public static class MyJoinFunction
+			implements
+			JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private Tuple3<String, Integer, Integer> joined = new Tuple3<String, Integer, Integer>();
+
+		@Override
+		public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first,
+				Tuple2<String, Integer> second) throws Exception {
+			joined.f0 = first.f0;
+			joined.f1 = first.f1;
+			joined.f2 = second.f1;
+			return joined;
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: WindowJoin <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WindowJoin with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: WindowJoin <result path>");
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
new file mode 100755
index 0000000..375c86d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.ml;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.util.Collector;
+
+/**
+ * Skeleton for incremental machine learning algorithm consisting of a
+ * pre-computed model, which gets updated for the new inputs and new input data
+ * for which the job provides predictions.
+ * 
+ * <p>
+ * This may serve as a base of a number of algorithms, e.g. updating an
+ * incremental Alternating Least Squares model while also providing the
+ * predictions.
+ * </p>
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Connected streams
+ * <li>CoFunctions
+ * <li>Tuple data types
+ * </ul>
+ */
+public class IncrementalLearningSkeleton {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// build new model on every second of new data
+		DataStream<Double[]> model = env.addSource(new TrainingDataSource())
+				.window(Time.of(5000, TimeUnit.MILLISECONDS))
+				.reduceGroup(new PartialModelBuilder());
+
+		// use partial model for prediction
+		DataStream<Integer> prediction = env.addSource(new NewDataSource()).connect(model)
+				.map(new Predictor());
+
+		// emit result
+		if (fileOutput) {
+			prediction.writeAsText(outputPath, 1);
+		} else {
+			prediction.print();
+		}
+
+		// execute program
+		env.execute("Streaming Incremental Learning");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Feeds new data for prediction. By default it is implemented as constantly
+	 * emitting the Integer 1 in a loop.
+	 */
+	public static class NewDataSource implements SourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+		private static final int NEW_DATA_SLEEP_TIME = 1000;
+
+		@Override
+		public void invoke(Collector<Integer> collector) throws Exception {
+			while (true) {
+				collector.collect(getNewData());
+			}
+		}
+
+		private Integer getNewData() throws InterruptedException {
+			Thread.sleep(NEW_DATA_SLEEP_TIME);
+			return 1;
+		}
+	}
+
+	/**
+	 * Feeds new training data for the partial model builder. By default it is
+	 * implemented as constantly emitting the Integer 1 in a loop.
+	 */
+	public static class TrainingDataSource implements SourceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+		private static final int TRAINING_DATA_SLEEP_TIME = 10;
+
+		@Override
+		public void invoke(Collector<Integer> collector) throws Exception {
+			while (true) {
+				collector.collect(getTrainingData());
+			}
+
+		}
+
+		private Integer getTrainingData() throws InterruptedException {
+			Thread.sleep(TRAINING_DATA_SLEEP_TIME);
+			return 1;
+
+		}
+	}
+
+	/**
+	 * Builds up-to-date partial models on new training data.
+	 */
+	public static class PartialModelBuilder implements GroupReduceFunction<Integer, Double[]> {
+		private static final long serialVersionUID = 1L;
+
+		protected Double[] buildPartialModel(Iterable<Integer> values) {
+			return new Double[] { 1. };
+		}
+
+		@Override
+		public void reduce(Iterable<Integer> values, Collector<Double[]> out) throws Exception {
+			out.collect(buildPartialModel(values));
+		}
+	}
+
+	/**
+	 * Creates prediction using the model produced in batch-processing and the
+	 * up-to-date partial model.
+	 * 
+	 * <p>
+	 * By defaults emits the Integer 0 for every prediction and the Integer 1
+	 * for every model update.
+	 * </p>
+	 */
+	public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
+		private static final long serialVersionUID = 1L;
+
+		Double[] batchModel = null;
+		Double[] partialModel = null;
+
+		@Override
+		public Integer map1(Integer value) {
+			// Return prediction
+			return predict(value);
+		}
+
+		@Override
+		public Integer map2(Double[] value) {
+			// Update model
+			partialModel = value;
+			batchModel = getBatchModel();
+			return 1;
+		}
+
+		// pulls model built with batch-job on the old training data
+		protected Double[] getBatchModel() {
+			return new Double[] { 0. };
+		}
+
+		// performs prediction using the two models
+		protected Integer predict(Integer inTuple) {
+			return 0;
+		}
+
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 1) {
+				outputPath = args[0];
+			} else {
+				System.err.println("Usage: IncrementalLearningSkeleton <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing IncrementalLearningSkeleton with generated data.");
+			System.out.println("  Provide parameter to write to file.");
+			System.out.println("  Usage: IncrementalLearningSkeleton <result path>");
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
new file mode 100644
index 0000000..e9b60f4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.socket;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
+
+/**
+ * This example shows an implementation of WordCount with data from a text
+ * socket. To run the example make sure that the service providing the text data
+ * is already up and running.
+ * 
+ * <p>
+ * To start an example socket text stream on your local machine run netcat from
+ * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
+ * port number.
+ * 
+ * 
+ * <p>
+ * Usage:
+ * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
+ * <br>
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use StreamExecutionEnvironment.socketTextStream
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions.
+ * </ul>
+ * 
+ * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
+ */
+public class SocketTextStreamWordCount {
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = env.socketTextStream(hostName, port);
+
+		DataStream<Tuple2<String, Integer>> counts =
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		text.flatMap(new Tokenizer())
+		// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath, 1);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount from SocketTextStream Example");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String hostName;
+	private static int port;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		// parse input arguments
+		if (args.length == 3) {
+			fileOutput = true;
+			hostName = args[0];
+			port = Integer.valueOf(args[1]);
+			outputPath = args[2];
+		} else if (args.length == 2) {
+			hostName = args[0];
+			port = Integer.valueOf(args[1]);
+		} else {
+			System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
+			return false;
+		}
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
new file mode 100644
index 0000000..1901475
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.twitter;
+
+import java.util.StringTokenizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * Implements the "TwitterStream" program that computes a most used word
+ * occurrence over JSON files in a streaming fashion.
+ * 
+ * <p>
+ * The input is a JSON text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>TwitterStream &lt;text path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link TwitterStreamData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>acquire external data,
+ * <li>use in-line defined functions,
+ * <li>handle flattened stream inputs.
+ * </ul>
+ * 
+ */
+public class TwitterStream {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setBufferTimeout(1000);
+
+		// get input data
+		DataStream<String> streamSource = getTextDataStream(env);
+
+		DataStream<Tuple2<String, Integer>> tweets = streamSource
+		// selecting English tweets and splitting to words
+				.flatMap(new SelectEnglishAndTokenizeFlatMap())
+				// returning (word, 1)
+				.map(new MapFunction<String, Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<String, Integer> map(String value) throws Exception {
+						return new Tuple2<String, Integer>(value, 1);
+					}
+				})
+				// group by words and sum their occurence
+				.groupBy(0).sum(1)
+				// select word with maximum occurence
+				.flatMap(new SelectMaxOccurence());
+
+		// emit result
+		if (fileOutput) {
+			tweets.writeAsText(outputPath, 1);
+		} else {
+			tweets.print();
+		}
+
+		// execute program
+		env.execute("Twitter Streaming Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Makes sentences from English tweets.
+	 * 
+	 * <p>
+	 * Implements a string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 * </p>
+	 */
+	public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Select the language from the incoming JSON text
+		 */
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			try {
+				if (getString(value, "lang").equals("en")) {
+					// message of tweet
+					StringTokenizer tokenizer = new StringTokenizer(getString(value, "text"));
+
+					// split the message
+					while (tokenizer.hasMoreTokens()) {
+						String result = tokenizer.nextToken().replaceAll("\\s*", "");
+
+						if (result != null && !result.equals("")) {
+							out.collect(result);
+						}
+					}
+				}
+			} catch (JSONException e) {
+
+			}
+		}
+	}
+
+	/**
+	 * Implements a user-defined FlatMapFunction that checks if the current
+	 * occurence is higher than the maximum occurence. If so, returns the word
+	 * and changes the maximum.
+	 */
+	public static class SelectMaxOccurence implements
+			FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+		private Integer maximum;
+
+		public SelectMaxOccurence() {
+			this.maximum = 0;
+		}
+
+		@Override
+		public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
+			if ((Integer) value.getField(1) >= maximum) {
+				out.collect(value);
+				maximum = (Integer) value.getField(1);
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("USAGE:\nTwitterStream <pathToPropertiesFile> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing TwitterStream example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  USAGE: TwitterStream <pathToPropertiesFile>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(TwitterStreamData.TEXTS);
+		}
+	}
+}


Mime
View raw message