flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [30/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:42:08 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
deleted file mode 100644
index 3661726..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ /dev/null
@@ -1,518 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-parent</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-streaming-examples</artifactId>
-	<name>flink-streaming-examples</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-connectors</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- get default data from flink-java-examples package -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-dependency-plugin</artifactId>
-				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
-				<executions>
-					<execution>
-						<id>unpack</id>
-						<phase>prepare-package</phase>
-						<goals>
-							<goal>unpack</goal>
-						</goals>
-						<configuration>
-							<artifactItems>
-								<artifactItem>
-									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-java-examples</artifactId>
-									<version>${project.version}</version>
-									<type>jar</type>
-									<overWrite>false</overWrite>
-									<outputDirectory>${project.build.directory}/classes</outputDirectory>
-									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
-								</artifactItem>
-							</artifactItems>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- self-contained jars for each example -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				
-				<executions>
-
-					<!-- Iteration -->
-					<execution>
-						<id>Iteration</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>Iteration</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/iteration/*.class</include>			
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- IncrementalLearning -->
-					<execution>
-						<id>IncrementalLearning</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>IncrementalLearning</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/ml/*.class</include>			
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- Twitter -->
-					<execution>
-						<id>Twitter</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>Twitter</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/twitter/*.class</include>
-								<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>						
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- WindowJoin -->
-					<execution>
-						<id>WindowJoin</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>WindowJoin</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/join/*.class</include>			
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- WordCountPOJO -->
-					<execution>
-						<id>WordCountPOJO</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>WordCountPOJO</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
-								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>			
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- WordCount -->
-					<execution>
-						<id>WordCount</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>WordCount</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
-								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>				
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- SocketTextStreamWordCount -->
-					<execution>
-						<id>SocketTextStreamWordCount</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>SocketTextStreamWordCount</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/WordCount$Tokenizer.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- DeltaExract -->
-					<execution>
-						<id>DeltaExract</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>DeltaExract</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.windowing.DeltaExtractExample</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample.class</include>
-								<include>org/apache/flink/streaming/examples/windowing/DeltaExtractExample$*.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- MultiplePolicies -->
-					<execution>
-						<id>MultiplePolicies</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>MultiplePolicies</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.windowing.MultiplePoliciesExample</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample.class</include>
-								<include>org/apache/flink/streaming/examples/windowing/MultiplePoliciesExample$*.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- SlidingExample -->
-					<execution>
-						<id>SlidingExample</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>SlidingExample</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.windowing.SlidingExample</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/windowing/SlidingExample.class</include>
-								<include>org/apache/flink/streaming/examples/windowing/SlidingExample$*.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- TimeWindowing -->
-					<execution>
-						<id>TimeWindowing</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>TimeWindowing</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.windowing.TimeWindowingExample</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample.class</include>
-								<include>org/apache/flink/streaming/examples/windowing/TimeWindowingExample$*.class</include>
-							</includes>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.1.4</version>
-				<executions>
-					<!-- Run scala compiler in the process-resources phase, so that dependencies on
-						scala classes can be resolved later in the (Java) compile phase -->
-					<execution>
-						<id>scala-compile-first</id>
-						<phase>process-resources</phase>
-						<goals>
-							<goal>compile</goal>
-						</goals>
-					</execution>
- 
-					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
-						 scala classes can be resolved later in the (Java) test-compile phase -->
-					<execution>
-						<id>scala-test-compile</id>
-						<phase>process-test-resources</phase>
-						<goals>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<jvmArgs>
-						<jvmArg>-Xms128m</jvmArg>
-						<jvmArg>-Xmx512m</jvmArg>
-					</jvmArgs>
-					<compilerPlugins>
-					   <compilerPlugin>
-						   <groupId>org.scalamacros</groupId>
-						   <artifactId>paradise_${scala.version}</artifactId>
-						   <version>${scala.macros.version}</version>
-					   </compilerPlugin>
-				   </compilerPlugins>
-				</configuration>
-			</plugin>
-			
-			<!-- Eclipse Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-
-			<!-- Adding scala source directories to build path -->
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<plugin>
-				<groupId>org.scalastyle</groupId>
-				<artifactId>scalastyle-maven-plugin</artifactId>
-				<version>0.5.0</version>
-				<executions>
-					<execution>
-						<goals>
-							<goal>check</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<verbose>false</verbose>
-					<failOnViolation>true</failOnViolation>
-					<includeTestSourceDirectory>true</includeTestSourceDirectory>
-					<failOnWarning>false</failOnWarning>
-					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
-					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
-					<configLocation>${project.basedir}/../../../tools/maven/scalastyle-config.xml</configLocation>
-					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
-					<outputEncoding>UTF-8</outputEncoding>
-				</configuration>
-			</plugin>
-
-		</plugins>
-		
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-dependency-plugin</artifactId>
-										<versionRange>[2.9,)</versionRange>
-										<goals>
-											<goal>unpack</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-		
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
deleted file mode 100644
index 998e818..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.iteration;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.IterativeDataStream;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Example illustrating iterations in Flink streaming.
- * 
- * <p>
- * The program sums up random numbers and counts additions it performs to reach
- * a specific threshold in an iterative streaming fashion.
- * </p>
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>streaming iterations,
- * <li>buffer timeout to reduce latency,
- * <li>directed outputs.
- * </ul>
- */
-public class IterateExample {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up input for the stream of (0,0) pairs
-		List<Tuple2<Double, Integer>> input = new ArrayList<Tuple2<Double, Integer>>();
-		for (int i = 0; i < 1000; i++) {
-			input.add(new Tuple2<Double, Integer>(0., 0));
-		}
-
-		// obtain execution environment and set the buffer timeout to 1 ms so that
-		// the output buffers are flushed almost continuously (low latency)
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
-				.setBufferTimeout(1);
-
-		// create an iterative data stream from the input with 5 second timeout
-		IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
-				.iterate(5000);
-
-		// apply the step function to add new random value to the tuple and to
-		// increment the counter and split the output with the output selector
-		SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).split(new MySelector());
-
-		// close the iteration by selecting the tuples that were directed to the
-		// 'iterate' channel in the output selector
-		it.closeWith(step.select("iterate"));
-
-		// to produce the final output select the tuples directed to the
-		// 'output' channel, then project them to the desired second field
-
-		DataStream<Tuple1<Integer>> numbers = step.select("output").project(1).types(Integer.class);
-
-		// emit result
-		if (fileOutput) {
-			numbers.writeAsText(outputPath, 1);
-		} else {
-			numbers.print();
-		}
-
-		// execute the program
-		env.execute("Streaming Iteration Example");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Iteration step function which takes an input (Double, Integer) and
-	 * produces an output (Double + random, Integer + 1).
-	 */
-	public static class Step extends
-			RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>> {
-		private static final long serialVersionUID = 1L;
-		private transient Random rnd;
-
-		public void open(Configuration parameters) {
-			rnd = new Random();
-		}
-
-		@Override
-		public Tuple2<Double, Integer> map(Tuple2<Double, Integer> value) throws Exception {
-			return new Tuple2<Double, Integer>(value.f0 + rnd.nextDouble(), value.f1 + 1);
-		}
-	}
-
-	/**
-	 * OutputSelector testing which tuple needs to be iterated again.
-	 */
-	public static class MySelector implements OutputSelector<Tuple2<Double, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Iterable<String> select(Tuple2<Double, Integer> value) {
-			List<String> output = new ArrayList<String>();
-			if (value.f0 > 100) {
-				output.add("output");
-			} else {
-				output.add("iterate");
-			}
-			return output;
-		}
-
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: IterateExample <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing IterateExample with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: IterateExample <result path>");
-		}
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
deleted file mode 100644
index dcfed50..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.source.RichSourceFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Example illustrating join over sliding windows of streams in Flink.
- * 
- * <p>
- * This example will join two streams with a sliding window: one which emits
- * grades and one which emits salaries of people.
- * </p>
- * 
- * <p>
- * This example shows how to:
- * <ul>
- * <li>do windowed joins,
- * <li>use tuple data types,
- * <li>write a simple streaming program.
- */
-public class WindowJoin {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// obtain execution environment
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// connect to the data sources for grades and salaries
-		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource());
-		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource());
-
-		// apply a temporal join over the two streams based on the names, over
-		// one-second windows
-		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
-						.join(salaries)
-						.onWindow(1, TimeUnit.SECONDS)
-						.where(0)
-						.equalTo(0)
-						.with(new MyJoinFunction());
-		
-		// emit result
-		if (fileOutput) {
-			joinedStream.writeAsText(outputPath, 1);
-		} else {
-			joinedStream.print();
-		}
-
-		// execute program
-		env.execute("Windowed Join Example");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private final static String[] names = { "tom", "jerry", "alice", "bob", "john", "grace" };
-	private final static int GRADE_COUNT = 5;
-	private final static int SALARY_MAX = 10000;
-	private final static int SLEEP_TIME = 10;
-
-	/**
-	 * Continuously emit tuples with random names and integers (grades).
-	 */
-	public static class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private Random rand;
-		private Tuple2<String, Integer> outTuple;
-
-		public GradeSource() {
-			rand = new Random();
-			outTuple = new Tuple2<String, Integer>();
-		}
-
-		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-			while (true) {
-				outTuple.f0 = names[rand.nextInt(names.length)];
-				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
-				out.collect(outTuple);
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-			}
-		}
-	}
-
-	/**
-	 * Continuously emit tuples with random names and integers (salaries).
-	 */
-	public static class SalarySource extends RichSourceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private transient Random rand;
-		private transient Tuple2<String, Integer> outTuple;
-
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			rand = new Random();
-			outTuple = new Tuple2<String, Integer>();
-		}
-
-		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-			while (true) {
-				outTuple.f0 = names[rand.nextInt(names.length)];
-				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
-				out.collect(outTuple);
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-			}
-		}
-	}
-
-	public static class MyJoinFunction
-			implements
-			JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private Tuple3<String, Integer, Integer> joined = new Tuple3<String, Integer, Integer>();
-
-		@Override
-		public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first,
-				Tuple2<String, Integer> second) throws Exception {
-			joined.f0 = first.f0;
-			joined.f1 = first.f1;
-			joined.f2 = second.f1;
-			return joined;
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (args.length == 1) {
-				outputPath = args[0];
-			} else {
-				System.err.println("Usage: WindowJoin <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WindowJoin with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: WindowJoin <result path>");
-		}
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
deleted file mode 100755
index 375c86d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.ml;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.util.Collector;
-
-/**
- * Skeleton for an incremental machine learning algorithm. A pre-computed
- * model is updated as new training data arrives, while the job provides
- * predictions for a second stream of new input data.
- * 
- * <p>
- * This may serve as a base of a number of algorithms, e.g. updating an
- * incremental Alternating Least Squares model while also providing the
- * predictions.
- * </p>
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Connected streams
- * <li>CoFunctions
- * <li>Windowed group reduce
- * </ul>
- */
-public class IncrementalLearningSkeleton {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	/**
-	 * Assembles and runs the streaming job: a partial model is built from
-	 * windows of training data and connected to the stream of new data, and
-	 * the output of the {@link Predictor} is written to a file or printed.
-	 *
-	 * @param args
-	 *            optionally one argument, the path of the result file
-	 * @throws Exception
-	 *             propagated from the job execution
-	 */
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// build a new partial model for every 5000 ms window of training data
-		TrainingDataSource trainingSource = new TrainingDataSource();
-		DataStream<Double[]> model = env.addSource(trainingSource)
-				.window(Time.of(5000, TimeUnit.MILLISECONDS))
-				.reduceGroup(new PartialModelBuilder());
-
-		// feed the new data and the model updates into one predictor
-		NewDataSource newDataSource = new NewDataSource();
-		DataStream<Integer> prediction = env.addSource(newDataSource)
-				.connect(model)
-				.map(new Predictor());
-
-		// emit result: print unless a result path was given
-		if (!fileOutput) {
-			prediction.print();
-		} else {
-			prediction.writeAsText(outputPath, 1);
-		}
-
-		// execute program
-		env.execute("Streaming Incremental Learning");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Feeds new data for prediction. By default it is implemented as constantly
-	 * emitting the Integer 1 in a loop.
-	 */
-	public static class NewDataSource implements SourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-		private static final int NEW_DATA_SLEEP_TIME = 1000;
-
-		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
-			while (true) {
-				collector.collect(getNewData());
-			}
-		}
-
-		private Integer getNewData() throws InterruptedException {
-			Thread.sleep(NEW_DATA_SLEEP_TIME);
-			return 1;
-		}
-	}
-
-	/**
-	 * Feeds new training data for the partial model builder. By default it is
-	 * implemented as constantly emitting the Integer 1 in a loop.
-	 */
-	public static class TrainingDataSource implements SourceFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-		private static final int TRAINING_DATA_SLEEP_TIME = 10;
-
-		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
-			while (true) {
-				collector.collect(getTrainingData());
-			}
-
-		}
-
-		private Integer getTrainingData() throws InterruptedException {
-			Thread.sleep(TRAINING_DATA_SLEEP_TIME);
-			return 1;
-
-		}
-	}
-
-	/**
-	 * Builds up-to-date partial models on new training data.
-	 */
-	public static class PartialModelBuilder implements GroupReduceFunction<Integer, Double[]> {
-		private static final long serialVersionUID = 1L;
-
-		protected Double[] buildPartialModel(Iterable<Integer> values) {
-			return new Double[] { 1. };
-		}
-
-		@Override
-		public void reduce(Iterable<Integer> values, Collector<Double[]> out) throws Exception {
-			out.collect(buildPartialModel(values));
-		}
-	}
-
-	/**
-	 * Creates prediction using the model produced in batch-processing and the
-	 * up-to-date partial model.
-	 * 
-	 * <p>
-	 * By defaults emits the Integer 0 for every prediction and the Integer 1
-	 * for every model update.
-	 * </p>
-	 */
-	public static class Predictor implements CoMapFunction<Integer, Double[], Integer> {
-		private static final long serialVersionUID = 1L;
-
-		Double[] batchModel = null;
-		Double[] partialModel = null;
-
-		@Override
-		public Integer map1(Integer value) {
-			// Return prediction
-			return predict(value);
-		}
-
-		@Override
-		public Integer map2(Double[] value) {
-			// Update model
-			partialModel = value;
-			batchModel = getBatchModel();
-			return 1;
-		}
-
-		// pulls model built with batch-job on the old training data
-		protected Double[] getBatchModel() {
-			return new Double[] { 0. };
-		}
-
-		// performs prediction using the two models
-		protected Integer predict(Integer inTuple) {
-			return 0;
-		}
-
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String outputPath;
-
-	/**
-	 * Interprets the command line. Without arguments file output stays
-	 * disabled and a hint is printed; a single argument is stored as the
-	 * result path.
-	 *
-	 * @param args
-	 *            command-line arguments of the program
-	 * @return {@code true} if zero or one argument was given, {@code false}
-	 *         if more than one argument was given
-	 */
-	private static boolean parseParameters(String[] args) {
-
-		switch (args.length) {
-		case 0:
-			System.out.println("Executing IncrementalLearningSkeleton with generated data.");
-			System.out.println("  Provide parameter to write to file.");
-			System.out.println("  Usage: IncrementalLearningSkeleton <result path>");
-			return true;
-		case 1:
-			fileOutput = true;
-			outputPath = args[0];
-			return true;
-		default:
-			// the flag is raised for any non-empty argument list, valid or not
-			fileOutput = true;
-			System.err.println("Usage: IncrementalLearningSkeleton <result path>");
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
deleted file mode 100644
index e9b60f4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.socket;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
-
-/**
- * This example shows an implementation of WordCount with data from a text
- * socket. To run the example make sure that the service providing the text data
- * is already up and running.
- * 
- * <p>
- * To start an example socket text stream on your local machine run netcat from
- * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
- * port number.
- * 
- * 
- * <p>
- * Usage:
- * <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port&gt; [&lt;output path&gt;]</code>
- * <br>
- * 
- * <p>
- * This example shows how to:
- * <ul>
- * <li>use StreamExecutionEnvironment.socketTextStream
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions.
- * </ul>
- * 
- * @see <a href="http://www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
- */
-public class SocketTextStreamWordCount {
-	/**
-	 * Reads lines from a text socket, sums up the occurrences per word and
-	 * writes the result to a file or prints it.
-	 *
-	 * @param args
-	 *            hostname, port and optionally the output path
-	 * @throws Exception
-	 *             propagated from the job execution
-	 */
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.getExecutionEnvironment();
-
-		// get input data
-		DataStream<String> text = env.socketTextStream(hostName, port);
-
-		// split up the lines in pairs (2-tuples) containing: (word,1), then
-		// group by the tuple field "0" and sum up tuple field "1"
-		DataStream<Tuple2<String, Integer>> counts = text
-				.flatMap(new Tokenizer())
-				.groupBy(0)
-				.sum(1);
-
-		// emit result: print unless an output path was given
-		if (!fileOutput) {
-			counts.print();
-		} else {
-			counts.writeAsText(outputPath, 1);
-		}
-
-		// execute program
-		env.execute("WordCount from SocketTextStream Example");
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String hostName;
-	private static int port;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		// parse input arguments
-		if (args.length == 3) {
-			fileOutput = true;
-			hostName = args[0];
-			port = Integer.valueOf(args[1]);
-			outputPath = args[2];
-		} else if (args.length == 2) {
-			hostName = args[0];
-			port = Integer.valueOf(args[1]);
-		} else {
-			System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
-			return false;
-		}
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
deleted file mode 100644
index 1901475..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterStream.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.twitter;
-
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
-import org.apache.flink.util.Collector;
-import org.apache.sling.commons.json.JSONException;
-
-/**
- * Implements the "TwitterStream" program that tracks the most frequently
- * used word in JSON-encoded tweets in a streaming fashion.
- * 
- * <p>
- * The input is a JSON text file with lines separated by newline characters.
- * 
- * <p>
- * Usage: <code>TwitterStream &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link TwitterStreamData}.
- * 
- * <p>
- * This example shows how to:
- * <ul>
- * <li>acquire external data,
- * <li>use in-line defined functions,
- * <li>handle flattened stream inputs.
- * </ul>
- * 
- */
-public class TwitterStream {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	/**
-	 * Assembles and runs the job: English tweets are split into words, the
-	 * occurrences per word are summed up and the words reaching the maximum
-	 * occurrence are written to a file or printed.
-	 *
-	 * @param args
-	 *            either no arguments, or the text path and the result path
-	 * @throws Exception
-	 *             propagated from the job execution
-	 */
-	public static void main(String[] args) throws Exception {
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setBufferTimeout(1000);
-
-		// get input data
-		DataStream<String> streamSource = getTextDataStream(env);
-
-		// in-line defined function returning (word, 1)
-		MapFunction<String, Tuple2<String, Integer>> pairWithOne = new MapFunction<String, Tuple2<String, Integer>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Tuple2<String, Integer> map(String value) throws Exception {
-				return new Tuple2<String, Integer>(value, 1);
-			}
-		};
-
-		DataStream<Tuple2<String, Integer>> tweets = streamSource
-				// selecting English tweets and splitting to words
-				.flatMap(new SelectEnglishAndTokenizeFlatMap())
-				.map(pairWithOne)
-				// group by words and sum their occurrence
-				.groupBy(0).sum(1)
-				// select word with maximum occurrence
-				.flatMap(new SelectMaxOccurence());
-
-		// emit result: print unless a result path was given
-		if (!fileOutput) {
-			tweets.print();
-		} else {
-			tweets.writeAsText(outputPath, 1);
-		}
-
-		// execute program
-		env.execute("Twitter Streaming Example");
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * Extracts the words of English tweets.
-	 *
-	 * <p>
-	 * Each input is the JSON text of a tweet. If its "lang" field equals "en",
-	 * the "text" field is split with a {@link StringTokenizer} and every
-	 * non-empty word is emitted as a plain String. Inputs for which reading a
-	 * field raises a {@link JSONException} are skipped.
-	 * </p>
-	 */
-	public static class SelectEnglishAndTokenizeFlatMap extends JSONParseFlatMap<String, String> {
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * Emits the words of the tweet if its language is English.
-		 *
-		 * @param value
-		 *            JSON text of one tweet
-		 * @param out
-		 *            collector receiving the individual words
-		 */
-		@Override
-		public void flatMap(String value, Collector<String> out) throws Exception {
-			try {
-				// constant-first comparison cannot fail on a null language
-				if ("en".equals(getString(value, "lang"))) {
-					// message of tweet
-					StringTokenizer tokenizer = new StringTokenizer(getString(value, "text"));
-
-					// split the message
-					while (tokenizer.hasMoreTokens()) {
-						String result = tokenizer.nextToken().replaceAll("\\s*", "");
-
-						// replaceAll never returns null, so only emptiness is checked
-						if (!result.isEmpty()) {
-							out.collect(result);
-						}
-					}
-				}
-			} catch (JSONException ignored) {
-				// Deliberately best effort: an input whose fields cannot be
-				// read is dropped instead of failing the whole job.
-			}
-		}
-	}
-
-	/**
-	 * User-defined FlatMapFunction that forwards a (word, occurrence) pair
-	 * only if its occurrence is at least as high as the highest occurrence
-	 * seen so far, and then remembers that occurrence as the new maximum.
-	 */
-	public static class SelectMaxOccurence implements
-			FlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		// highest occurrence forwarded so far
-		private Integer maximum = 0;
-
-		@Override
-		public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
-			if (value.f1 < maximum) {
-				// below the current maximum: nothing to emit
-				return;
-			}
-
-			out.collect(value);
-			maximum = value.f1;
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String textPath;
-	private static String outputPath;
-
-	/**
-	 * Interprets the command line. Without arguments the built-in default
-	 * data is used; otherwise exactly two arguments are required, the path of
-	 * the input text file and the path of the result file.
-	 *
-	 * <p>
-	 * The settings are stored only once the arguments are known to be valid.
-	 * Both usage messages name the same two arguments, matching what the
-	 * program actually requires.
-	 * </p>
-	 *
-	 * @param args
-	 *            command-line arguments of the program
-	 * @return {@code true} if the arguments were accepted, {@code false} if
-	 *         the usage message was printed
-	 */
-	private static boolean parseParameters(String[] args) {
-		if (args.length == 0) {
-			System.out.println("Executing TwitterStream example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  USAGE: TwitterStream <text path> <result path>");
-			return true;
-		}
-
-		if (args.length != 2) {
-			System.err.println("USAGE:\nTwitterStream <text path> <result path>");
-			return false;
-		}
-
-		// parse input arguments
-		textPath = args[0];
-		outputPath = args[1];
-		fileOutput = true;
-		return true;
-	}
-
-	/**
-	 * Creates the input stream of the job. The {@code fileOutput} flag also
-	 * selects the input: when it is set the text file at {@code textPath} is
-	 * read, otherwise the built-in sample texts are used.
-	 *
-	 * @param env
-	 *            environment the source is created in
-	 * @return stream of input lines
-	 */
-	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
-		if (!fileOutput) {
-			// get default test text data
-			return env.fromElements(TwitterStreamData.TEXTS);
-		}
-
-		// read the text file from given input path
-		return env.readTextFile(textPath);
-	}
-}


Mime
View raw message