flink-commits mailing list archives

From rmetz...@apache.org
Subject [14/50] flink git commit: [FLINK-3195] [examples] Consolidate batch examples into one project, unify batch and streaming examples under one parent project
Date Thu, 14 Jan 2016 16:16:11 GMT
[FLINK-3195] [examples] Consolidate batch examples into one project, unify batch and streaming examples under one parent project


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0e1d635
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0e1d635
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0e1d635

Branch: refs/heads/master
Commit: d0e1d635d9e6a4ef157275099e2e787b3b18c0ed
Parents: 62938c1
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Dec 23 20:17:56 2015 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Thu Jan 14 11:18:58 2016 +0100

----------------------------------------------------------------------
 docs/apis/cli.md                                |  18 +-
 docs/apis/examples.md                           |   4 +-
 docs/setup/gce_setup.md                         |   2 +-
 docs/setup/yarn_setup.md                        |   6 +-
 flink-contrib/flink-storm-examples/pom.xml      |   6 +-
 flink-dist/pom.xml                              |   8 +-
 flink-dist/src/main/assemblies/bin.xml          |  32 +-
 flink-examples/flink-examples-batch/pom.xml     | 400 +++++++++++++
 .../flink/examples/java/clustering/KMeans.java  | 343 ++++++++++++
 .../java/clustering/util/KMeansData.java        | 104 ++++
 .../clustering/util/KMeansDataGenerator.java    | 189 +++++++
 .../flink/examples/java/distcp/DistCp.java      | 182 ++++++
 .../examples/java/distcp/FileCopyTask.java      |  59 ++
 .../java/distcp/FileCopyTaskInputFormat.java    | 116 ++++
 .../java/distcp/FileCopyTaskInputSplit.java     |  46 ++
 .../java/graph/ConnectedComponents.java         | 243 ++++++++
 .../examples/java/graph/EnumTrianglesBasic.java | 231 ++++++++
 .../examples/java/graph/EnumTrianglesOpt.java   | 356 ++++++++++++
 .../flink/examples/java/graph/PageRank.java     | 286 ++++++++++
 .../java/graph/TransitiveClosureNaive.java      | 154 +++++
 .../graph/util/ConnectedComponentsData.java     |  72 +++
 .../java/graph/util/EnumTrianglesData.java      |  58 ++
 .../java/graph/util/EnumTrianglesDataTypes.java | 116 ++++
 .../examples/java/graph/util/PageRankData.java  |  86 +++
 .../java/misc/CollectionExecutionExample.java   |  96 ++++
 .../flink/examples/java/misc/PiEstimation.java  |  99 ++++
 .../examples/java/ml/LinearRegression.java      | 316 +++++++++++
 .../java/ml/util/LinearRegressionData.java      |  71 +++
 .../ml/util/LinearRegressionDataGenerator.java  | 112 ++++
 .../relational/EmptyFieldsCountAccumulator.java | 255 +++++++++
 .../examples/java/relational/TPCHQuery10.java   | 234 ++++++++
 .../examples/java/relational/TPCHQuery3.java    | 271 +++++++++
 .../java/relational/WebLogAnalysis.java         | 325 +++++++++++
 .../java/relational/util/WebLogData.java        | 427 ++++++++++++++
 .../relational/util/WebLogDataGenerator.java    | 210 +++++++
 .../examples/java/wordcount/WordCount.java      | 148 +++++
 .../examples/java/wordcount/WordCountPojo.java  | 173 ++++++
 .../java/wordcount/util/WordCountData.java      |  72 +++
 .../src/main/resources/log4j-test.properties    |  23 +
 .../src/main/resources/log4j.properties         |  23 +
 .../src/main/resources/logback.xml              |  29 +
 .../examples/scala/clustering/KMeans.scala      | 255 +++++++++
 .../scala/graph/ConnectedComponents.scala       | 168 ++++++
 .../examples/scala/graph/DeltaPageRank.scala    | 104 ++++
 .../scala/graph/EnumTrianglesBasic.scala        | 185 ++++++
 .../examples/scala/graph/EnumTrianglesOpt.scala | 253 +++++++++
 .../examples/scala/graph/PageRankBasic.scala    | 210 +++++++
 .../scala/graph/TransitiveClosureNaive.scala    | 116 ++++
 .../examples/scala/misc/PiEstimation.scala      |  52 ++
 .../examples/scala/ml/LinearRegression.scala    | 195 +++++++
 .../examples/scala/relational/TPCHQuery10.scala | 184 ++++++
 .../examples/scala/relational/TPCHQuery3.scala  | 172 ++++++
 .../scala/relational/WebLogAnalysis.scala       | 211 +++++++
 .../examples/scala/wordcount/WordCount.scala    | 101 ++++
 flink-examples/flink-examples-streaming/pom.xml | 559 +++++++++++++++++++
 .../examples/iteration/IterateExample.java      | 246 ++++++++
 .../iteration/util/IterateExampleData.java      |  32 ++
 .../streaming/examples/join/WindowJoin.java     | 296 ++++++++++
 .../examples/join/util/WindowJoinData.java      |  61 ++
 .../ml/IncrementalLearningSkeleton.java         | 254 +++++++++
 .../util/IncrementalLearningSkeletonData.java   |  32 ++
 .../socket/SocketTextStreamWordCount.java       | 105 ++++
 .../examples/twitter/TwitterStream.java         | 164 ++++++
 .../twitter/util/TwitterStreamData.java         |  32 ++
 .../GroupedProcessingTimeWindowExample.java     | 127 +++++
 .../examples/windowing/SessionWindowing.java    | 167 ++++++
 .../examples/windowing/TopSpeedWindowing.java   | 210 +++++++
 .../examples/windowing/WindowWordCount.java     | 132 +++++
 .../windowing/util/SessionWindowingData.java    |  27 +
 .../util/TopSpeedWindowingExampleData.java      | 276 +++++++++
 .../examples/wordcount/PojoExample.java         | 186 ++++++
 .../streaming/examples/wordcount/WordCount.java | 148 +++++
 .../scala/examples/join/WindowJoin.scala        | 156 ++++++
 .../socket/SocketTextStreamWordCount.scala      |  93 +++
 .../examples/windowing/TopSpeedWindowing.scala  | 150 +++++
 .../iteration/IterateExampleITCase.java         |  45 ++
 .../join/WindowJoinITCase.java                  |  50 ++
 .../ml/IncrementalLearningSkeletonITCase.java   |  42 ++
 .../socket/SocketTextStreamWordCountITCase.java |  30 +
 .../twitter/TwitterStreamITCase.java            |  42 ++
 .../windowing/SessionWindowingITCase.java       |  42 ++
 .../TopSpeedWindowingExampleITCase.java         |  45 ++
 .../windowing/WindowWordCountITCase.java        |  50 ++
 .../wordcount/PojoExampleITCase.java            |  45 ++
 .../wordcount/WordCountITCase.java              |  45 ++
 .../join/WindowJoinITCase.java                  |  50 ++
 .../socket/SocketTextStreamWordCountITCase.java |  30 +
 .../TopSpeedWindowingExampleITCase.java         |  45 ++
 flink-examples/flink-java-examples/pom.xml      | 330 -----------
 .../flink/examples/java/clustering/KMeans.java  | 344 ------------
 .../java/clustering/util/KMeansData.java        | 105 ----
 .../clustering/util/KMeansDataGenerator.java    | 189 -------
 .../flink/examples/java/distcp/DistCp.java      | 182 ------
 .../examples/java/distcp/FileCopyTask.java      |  59 --
 .../java/distcp/FileCopyTaskInputFormat.java    | 116 ----
 .../java/distcp/FileCopyTaskInputSplit.java     |  46 --
 .../java/graph/ConnectedComponents.java         | 243 --------
 .../examples/java/graph/EnumTrianglesBasic.java | 231 --------
 .../examples/java/graph/EnumTrianglesOpt.java   | 358 ------------
 .../examples/java/graph/PageRankBasic.java      | 288 ----------
 .../java/graph/TransitiveClosureNaive.java      | 155 -----
 .../graph/util/ConnectedComponentsData.java     |  73 ---
 .../java/graph/util/EnumTrianglesData.java      |  59 --
 .../java/graph/util/EnumTrianglesDataTypes.java | 117 ----
 .../examples/java/graph/util/PageRankData.java  |  87 ---
 .../java/misc/CollectionExecutionExample.java   |  96 ----
 .../flink/examples/java/misc/PiEstimation.java  |  99 ----
 .../examples/java/ml/LinearRegression.java      | 317 -----------
 .../java/ml/util/LinearRegressionData.java      |  72 ---
 .../ml/util/LinearRegressionDataGenerator.java  | 113 ----
 .../relational/EmptyFieldsCountAccumulator.java | 254 ---------
 .../examples/java/relational/TPCHQuery10.java   | 234 --------
 .../examples/java/relational/TPCHQuery3.java    | 272 ---------
 .../java/relational/WebLogAnalysis.java         | 327 -----------
 .../java/relational/util/WebLogData.java        | 428 --------------
 .../relational/util/WebLogDataGenerator.java    | 211 -------
 .../examples/java/wordcount/PojoExample.java    | 171 ------
 .../examples/java/wordcount/WordCount.java      | 148 -----
 .../examples/java/wordcount/WordCountMeta.java  |  54 --
 .../java/wordcount/util/WordCountData.java      |  72 ---
 .../src/main/resources/log4j-test.properties    |  23 -
 .../src/main/resources/log4j.properties         |  23 -
 .../src/main/resources/logback.xml              |  29 -
 flink-examples/flink-scala-examples/pom.xml     | 487 ----------------
 .../org/apache/flink/examples/scala/Dummy.java  |  27 -
 .../examples/scala/clustering/KMeans.scala      | 255 ---------
 .../scala/graph/ConnectedComponents.scala       | 167 ------
 .../examples/scala/graph/DeltaPageRank.scala    | 104 ----
 .../scala/graph/EnumTrianglesBasic.scala        | 185 ------
 .../examples/scala/graph/EnumTrianglesOpt.scala | 253 ---------
 .../examples/scala/graph/PageRankBasic.scala    | 210 -------
 .../scala/graph/TransitiveClosureNaive.scala    | 115 ----
 .../examples/scala/misc/PiEstimation.scala      |  52 --
 .../examples/scala/ml/LinearRegression.scala    | 195 -------
 .../examples/scala/relational/TPCHQuery10.scala | 184 ------
 .../examples/scala/relational/TPCHQuery3.scala  | 172 ------
 .../scala/relational/WebLogAnalysis.scala       | 210 -------
 .../examples/scala/wordcount/WordCount.scala    | 100 ----
 flink-examples/pom.xml                          |   4 +-
 flink-staging/flink-fs-tests/pom.xml            |   2 +-
 flink-staging/flink-table/pom.xml               |   2 +-
 flink-staging/flink-tez/pom.xml                 |   2 +-
 flink-streaming-examples/pom.xml                | 535 ------------------
 .../examples/iteration/IterateExample.java      | 246 --------
 .../iteration/util/IterateExampleData.java      |  32 --
 .../streaming/examples/join/WindowJoin.java     | 296 ----------
 .../examples/join/util/WindowJoinData.java      |  61 --
 .../ml/IncrementalLearningSkeleton.java         | 254 ---------
 .../util/IncrementalLearningSkeletonData.java   |  32 --
 .../socket/SocketTextStreamWordCount.java       | 105 ----
 .../examples/twitter/TwitterStream.java         | 164 ------
 .../twitter/util/TwitterStreamData.java         |  32 --
 .../GroupedProcessingTimeWindowExample.java     | 127 -----
 .../examples/windowing/SessionWindowing.java    | 167 ------
 .../examples/windowing/TopSpeedWindowing.java   | 210 -------
 .../examples/windowing/WindowWordCount.java     | 132 -----
 .../windowing/util/SessionWindowingData.java    |  27 -
 .../util/TopSpeedWindowingExampleData.java      | 276 ---------
 .../examples/wordcount/PojoExample.java         | 186 ------
 .../streaming/examples/wordcount/WordCount.java | 148 -----
 .../scala/examples/join/WindowJoin.scala        | 156 ------
 .../socket/SocketTextStreamWordCount.scala      |  93 ---
 .../examples/windowing/TopSpeedWindowing.scala  | 150 -----
 .../iteration/IterateExampleITCase.java         |  45 --
 .../join/WindowJoinITCase.java                  |  50 --
 .../ml/IncrementalLearningSkeletonITCase.java   |  42 --
 .../socket/SocketTextStreamWordCountITCase.java |  30 -
 .../twitter/TwitterStreamITCase.java            |  42 --
 .../windowing/SessionWindowingITCase.java       |  42 --
 .../TopSpeedWindowingExampleITCase.java         |  45 --
 .../windowing/WindowWordCountITCase.java        |  50 --
 .../wordcount/PojoExampleITCase.java            |  45 --
 .../wordcount/WordCountITCase.java              |  45 --
 .../join/WindowJoinITCase.java                  |  50 --
 .../socket/SocketTextStreamWordCountITCase.java |  30 -
 .../TopSpeedWindowingExampleITCase.java         |  45 --
 flink-tests/pom.xml                             |  12 +-
 .../exampleJavaPrograms/PageRankITCase.java     |   8 +-
 .../iterations/PageRankCompilerTest.java        |  10 +-
 .../jsonplan/DumpCompiledPlanTest.java          |   4 +-
 .../optimizer/jsonplan/PreviewPlanDumpTest.java |   4 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  13 +-
 pom.xml                                         |   1 -
 183 files changed, 12215 insertions(+), 12699 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index 78ea4b6..1150123 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -46,47 +46,47 @@ The command line can be used to
 
 -   Run example program with no arguments.
 
-        ./bin/flink run ./examples/WordCount.jar
+        ./bin/flink run ./examples/batch/WordCount.jar
 
 -   Run example program with arguments for input and result files
 
-        ./bin/flink run ./examples/WordCount.jar \
+        ./bin/flink run ./examples/batch/WordCount.jar \
                                file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Run example program with parallelism 16 and arguments for input and result files
 
-        ./bin/flink run -p 16 ./examples/WordCount.jar \
+        ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
                                 file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Run example program with flink log output disabled
 
-            ./bin/flink run -q ./examples/WordCount.jar
+            ./bin/flink run -q ./examples/batch/WordCount.jar
 
 -   Run example program in detached mode
 
-            ./bin/flink run -d ./examples/WordCount.jar
+            ./bin/flink run -d ./examples/batch/WordCount.jar
 
 -   Run example program on a specific JobManager:
 
         ./bin/flink run -m myJMHost:6123 \
-                               ./examples/WordCount.jar \
+                               ./examples/batch/WordCount.jar \
                                file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Run example program with a specific class as an entry point:
 
         ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
-                               ./examples/WordCount.jar \
+                               ./examples/batch/WordCount.jar \
                                file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   Run example program using a [per-job YARN cluster]({{site.baseurl}}/setup/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
 
         ./bin/flink run -m yarn-cluster -yn 2 \
-                               ./examples/WordCount.jar \
+                               ./examples/batch/WordCount.jar \
                                hdfs:///user/hamlet.txt hdfs:///user/wordcount_out
 
 -   Display the optimized execution plan for the WordCount example program as JSON:
 
-        ./bin/flink info ./examples/WordCount.jar \
+        ./bin/flink info ./examples/batch/WordCount.jar \
                                 file:///home/user/hamlet.txt file:///home/user/wordcount_out
 
 -   List scheduled and running jobs (including their JobIDs):

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/apis/examples.md
----------------------------------------------------------------------
diff --git a/docs/apis/examples.md b/docs/apis/examples.md
index d22b436..a11ed9c 100644
--- a/docs/apis/examples.md
+++ b/docs/apis/examples.md
@@ -42,7 +42,7 @@ Each binary release of Flink contains an `examples` directory with jar files for
 To run the WordCount example, issue the following command:
 
 ~~~bash
-./bin/flink run ./examples/WordCount.jar
+./bin/flink run ./examples/batch/WordCount.jar
 ~~~
 
 The other examples can be started in a similar way.
@@ -50,7 +50,7 @@ The other examples can be started in a similar way.
 Note that many examples run without passing any arguments for them, by using built-in data. To run WordCount with real data, you have to pass the path to the data:
 
 ~~~bash
-./bin/flink run ./examples/WordCount.jar /path/to/some/text/data /path/to/result
+./bin/flink run ./examples/batch/WordCount.jar /path/to/some/text/data /path/to/result
 ~~~
 
 Note that non-local file systems require a schema prefix, such as `hdfs://`.

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/setup/gce_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/gce_setup.md b/docs/setup/gce_setup.md
index f6499dc..4f3996a 100644
--- a/docs/setup/gce_setup.md
+++ b/docs/setup/gce_setup.md
@@ -95,7 +95,7 @@ To bring up the Flink cluster on Google Compute Engine, execute:
 
     ./bdutil shell
     cd /home/hadoop/flink-install/bin
-    ./flink run ../examples/WordCount.jar gs://dataflow-samples/shakespeare/othello.txt gs://<bucket_name>/output
+    ./flink run ../examples/batch/WordCount.jar gs://dataflow-samples/shakespeare/othello.txt gs://<bucket_name>/output
 
 ## Shut down your cluster
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index a7309e4..7a00af5 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -48,7 +48,7 @@ Once the session has been started, you can submit jobs to the cluster using the
 curl -O <flink_hadoop2_download_url>
 tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
 cd flink-{{ site.version }}/
-./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/WordCount.jar
+./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
 ~~~
 
 ## Apache Flink on Hadoop YARN using a YARN Session
@@ -179,7 +179,7 @@ Use the *run* action to submit a job to YARN. The client is able to determine th
 ~~~bash
 wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
 hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
-./bin/flink run ./examples/WordCount.jar \
+./bin/flink run ./examples/batch/WordCount.jar \
         hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
 ~~~
 
@@ -205,7 +205,7 @@ Please note that the client then expects the `-yn` value to be set (number of Ta
 ***Example:***
 
 ~~~bash
-./bin/flink run -m yarn-cluster -yn 2 ./examples/WordCount.jar
+./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
 ~~~
 
 The command line options of the YARN session are also available with the `./bin/flink` tool.

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-contrib/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/pom.xml b/flink-contrib/flink-storm-examples/pom.xml
index 6f3a050..7e093ff 100644
--- a/flink-contrib/flink-storm-examples/pom.xml
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -43,7 +43,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
+			<artifactId>flink-examples-batch</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -73,7 +73,7 @@ under the License.
 
 	<build>
 		<plugins>
-			<!-- get default data from flink-java-examples package -->
+			<!-- get default data from flink-examples-batch package -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-dependency-plugin</artifactId>
@@ -89,7 +89,7 @@ under the License.
 							<artifactItems>
 								<artifactItem>
 									<groupId>org.apache.flink</groupId>
-									<artifactId>flink-java-examples</artifactId>
+									<artifactId>flink-examples-batch</artifactId>
 									<version>${project.version}</version>
 									<type>jar</type>
 									<overWrite>false</overWrite>

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index e6b2fe0..543652f 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -85,13 +85,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala-examples</artifactId>
+			<artifactId>flink-examples-batch</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 602af68..b067280 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -103,7 +103,7 @@ under the License.
 		<!-- copy *.txt files -->
 		<fileSet>
 			<directory>src/main/flink-bin/</directory>
-			<outputDirectory></outputDirectory>
+			<outputDirectory/>
 			<fileMode>0644</fileMode>
 			<includes>
 				<include>*.txt</include>
@@ -113,7 +113,7 @@ under the License.
 		<!-- copy LICENSE/NOTICE files -->
 		<fileSet>
 			<directory>../</directory>
-			<outputDirectory></outputDirectory>
+			<outputDirectory/>
 			<fileMode>0644</fileMode>
 			<includes>
 				<include>LICENSE*</include>
@@ -150,21 +150,31 @@ under the License.
 			</excludes>
 		</fileSet>
 
-		<!-- copy jar files of java examples -->
+		<!-- copy jar files of the batch examples -->
 		<fileSet>
-			<directory>../flink-examples/flink-java-examples/target</directory>
-			<outputDirectory>examples</outputDirectory>
+			<directory>../flink-examples/flink-examples-batch/target</directory>
+			<outputDirectory>examples/batch</outputDirectory>
 			<fileMode>0644</fileMode>
 			<includes>
 				<include>*.jar</include>
 			</includes>
 			<excludes>
-				<exclude>flink-java-examples*-${project.version}.jar</exclude>
-				<exclude>original-flink-java-examples*-${project.version}.jar</exclude>
-				<exclude>flink-java-examples*-${project.version}-sources.jar</exclude>
-				<exclude>flink-java-examples*-${project.version}-tests.jar</exclude>
-				<exclude>flink-java-examples*-${project.version}-javadoc.jar</exclude>
-				<exclude>flink-java-examples*-${project.version}-*.jar</exclude>
+				<exclude>flink-examples-batch*.jar</exclude>
+				<exclude>original-flink-examples-batch*.jar</exclude>
+			</excludes>
+		</fileSet>
+
+		<!-- copy jar files of the streaming examples -->
+		<fileSet>
+			<directory>../flink-examples/flink-examples-streaming/target</directory>
+			<outputDirectory>examples/streaming</outputDirectory>
+			<fileMode>0644</fileMode>
+			<includes>
+				<include>*.jar</include>
+			</includes>
+			<excludes>
+				<exclude>flink-examples-streaming*.jar</exclude>
+				<exclude>original-flink-examples-streaming*.jar</exclude>
 			</excludes>
 		</fileSet>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/pom.xml b/flink-examples/flink-examples-batch/pom.xml
new file mode 100644
index 0000000..a989ef5
--- /dev/null
+++ b/flink-examples/flink-examples-batch/pom.xml
@@ -0,0 +1,400 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-examples</artifactId>
+		<version>1.0-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-examples-batch</artifactId>
+	<name>flink-examples-batch</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+	
+	
+	<build>
+		<plugins>
+
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- Scala Code Style -->
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+			
+			<!-- create the example JAR files -->
+			
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+				
+					<!-- KMeans -->
+					<execution>
+						<id>KMeans</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+
+						<configuration>
+							<classifier>KMeans</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.clustering.KMeans</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>**/java/clustering/KMeans.class</include>
+								<include>**/java/clustering/KMeans$*.class</include>
+								<include>**/java/clustering/util/KMeansDataGenerator.class</include>
+								<include>**/java/clustering/util/KMeansData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- Transitive Closure -->
+					<execution>
+						<id>TransitiveClosure</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>TransitiveClosure</classifier>
+				
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.graph.TransitiveClosureNaive</program-class>
+								</manifestEntries>
+							</archive>
+				
+							<includes>
+								<include>**/java/graph/TransitiveClosureNaive.class</include>
+								<include>**/java/graph/TransitiveClosureNaive$*.class</include>
+								<include>**/java/graph/util/ConnectedComponentsData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- Connected Components -->
+					<execution>
+						<id>ConnectedComponents</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>ConnectedComponents</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.graph.ConnectedComponents</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>**/java/graph/ConnectedComponents.class</include>
+								<include>**/java/graph/ConnectedComponents$*.class</include>
+								<include>**/java/graph/util/ConnectedComponentsData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+					
+					<!-- EnumTriangles Basic -->
+					<execution>
+						<id>EnumerateGraphTriangles</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>EnumerateGraphTriangles</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.graph.EnumTrianglesBasic</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>**/java/graph/EnumTrianglesBasic.class</include>
+								<include>**/java/graph/EnumTrianglesBasic$*.class</include>
+								<include>**/java/graph/util/EnumTrianglesDataTypes.class</include>
+								<include>**/java/graph/util/EnumTrianglesDataTypes$*.class</include>
+								<include>**/java/graph/util/EnumTrianglesData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+					
+					<!-- PageRank -->
+					<execution>
+						<id>PageRank</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>PageRank</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.graph.PageRank</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>**/java/graph/PageRank.class</include>
+								<include>**/java/graph/PageRank$*.class</include>
+								<include>**/java/graph/util/PageRankData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WebLogAnalysis -->
+					<execution>
+						<id>WebLogAnalysis</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WebLogAnalysis</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.relational.WebLogAnalysis</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>**/java/relational/WebLogAnalysis.class</include>
+								<include>**/java/relational/WebLogAnalysis$*.class</include>
+								<include>**/java/relational/util/WebLogData.class</include>
+								<include>**/java/relational/util/WebLogDataGenerator.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCount -->
+					<execution>
+						<id>WordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.wordcount.WordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>**/java/wordcount/WordCount.class</include>
+								<include>**/java/wordcount/WordCount$*.class</include>
+								<include>**/java/wordcount/util/WordCountData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- Distributed Copy -->
+					<execution>
+						<id>DistCp</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>DistCp</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.examples.java.distcp.DistCp</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>**/java/distcp/*</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- simplify the name of example JARs for build-target/examples -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-antrun-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<execution>
+						<id>rename</id>
+						<phase>package</phase>
+						<goals>
+							<goal>run</goal>
+						</goals>
+						<configuration> 
+							<target>
+								<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-KMeans.jar" tofile="${project.basedir}/target/KMeans.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-ConnectedComponents.jar" tofile="${project.basedir}/target/ConnectedComponents.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-EnumerateGraphTriangles.jar" tofile="${project.basedir}/target/EnumerateGraphTriangles.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-PageRank.jar" tofile="${project.basedir}/target/PageRank.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-TransitiveClosure.jar" tofile="${project.basedir}/target/TransitiveClosure.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
+								<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
+							</target>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
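
Each example jar above carries a `program-class` manifest entry, which the Flink client consults to find the entry point when no `-c` option is passed. As a small illustration (hypothetical class name and jar path, not part of this patch), such an entry can be inspected with plain java.util.jar:

~~~java
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class ShowProgramClass {
    public static void main(String[] args) throws Exception {
        // Jar path is illustrative; point it at any example jar built by this module.
        try (JarFile jar = new JarFile("target/WordCount.jar")) {
            Manifest manifest = jar.getManifest();
            // "program-class" is the manifest attribute set in the pom above
            System.out.println(manifest.getMainAttributes().getValue("program-class"));
        }
    }
}
~~~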

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
new file mode 100644
index 0000000..1730e2a
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.clustering;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.java.clustering.util.KMeansData;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+
+/**
+ * This example implements a basic K-Means clustering algorithm.
+ * 
+ * <p>
+ * K-Means is an iterative clustering algorithm and works as follows:<br>
+ * K-Means is given a set of data points to be clustered and an initial set of <i>K</i> cluster centers.
+ * In each iteration, the algorithm computes the distance of each data point to each cluster center.
+ * Each point is assigned to the cluster center which is closest to it.
+ * Subsequently, each cluster center is moved to the center (<i>mean</i>) of all points that have been assigned to it.
+ * The moved cluster centers are fed into the next iteration. 
+ * The algorithm terminates after a fixed number of iterations (as in this implementation) 
+ * or if cluster centers do not (significantly) move in an iteration.<br>
+ * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/K-means_clustering">K-Means Clustering algorithm</a>.
+ * 
+ * <p>
+ * This implementation works on two-dimensional data points. <br>
+ * It computes an assignment of data points to cluster centers, i.e., 
+ * each data point is annotated with the id of the final cluster (center) it belongs to.
+ * 
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Data points are represented as two double values separated by a blank character.
+ * Data points are separated by newline characters.<br>
+ * For example <code>"1.2 2.3\n5.3 7.2\n"</code> gives two data points (x=1.2, y=2.3) and (x=5.3, y=7.2).
+ * <li>Cluster centers are represented by an integer id and a point value.<br>
+ * For example <code>"1 6.2 3.2\n2 2.9 5.7\n"</code> gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7).
+ * </ul>
+ * 
+ * <p>
+ * Usage: <code>KMeans &lt;points path&gt; &lt;centers path&gt; &lt;result path&gt; &lt;num iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.clustering.util.KMeansData} and 10 iterations. 
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Bulk iterations
+ * <li>Broadcast variables in bulk iterations
+ * <li>Custom Java objects (POJOs)
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class KMeans {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+	
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataSet<Point> points = getPointDataSet(env);
+		DataSet<Centroid> centroids = getCentroidDataSet(env);
+		
+		// set number of bulk iterations for KMeans algorithm
+		IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
+		
+		DataSet<Centroid> newCentroids = points
+			// compute closest centroid for each point
+			.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
+			// count and sum point coordinates for each centroid
+			.map(new CountAppender())
+			.groupBy(0).reduce(new CentroidAccumulator())
+			// compute new centroids from point counts and coordinate sums
+			.map(new CentroidAverager());
+		
+		// feed new centroids back into next iteration
+		DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
+		
+		DataSet<Tuple2<Integer, Point>> clusteredPoints = points
+				// assign points to final clusters
+				.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
+		
+		// emit result
+		if (fileOutput) {
+			clusteredPoints.writeAsCsv(outputPath, "\n", " ");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("KMeans Example");
+		}
+		else {
+			clusteredPoints.print();
+		}
+	}
+	
+	// *************************************************************************
+	//     DATA TYPES
+	// *************************************************************************
+	
+	/**
+	 * A simple two-dimensional point.
+	 */
+	public static class Point implements Serializable {
+		
+		public double x, y;
+		
+		public Point() {}
+
+		public Point(double x, double y) {
+			this.x = x;
+			this.y = y;
+		}
+		
+		public Point add(Point other) {
+			x += other.x;
+			y += other.y;
+			return this;
+		}
+		
+		public Point div(long val) {
+			x /= val;
+			y /= val;
+			return this;
+		}
+		
+		public double euclideanDistance(Point other) {
+			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+		}
+		
+		public void clear() {
+			x = y = 0.0;
+		}
+		
+		@Override
+		public String toString() {
+			return x + " " + y;
+		}
+	}
+	
+	/**
+	 * A simple two-dimensional centroid, basically a point with an ID. 
+	 */
+	public static class Centroid extends Point {
+		
+		public int id;
+		
+		public Centroid() {}
+		
+		public Centroid(int id, double x, double y) {
+			super(x,y);
+			this.id = id;
+		}
+		
+		public Centroid(int id, Point p) {
+			super(p.x, p.y);
+			this.id = id;
+		}
+		
+		@Override
+		public String toString() {
+			return id + " " + super.toString();
+		}
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+	
+	/** Converts a {@code Tuple2<Double,Double>} into a Point. */
+	@ForwardedFields("0->x; 1->y")
+	public static final class TuplePointConverter implements MapFunction<Tuple2<Double, Double>, Point> {
+
+		@Override
+		public Point map(Tuple2<Double, Double> t) throws Exception {
+			return new Point(t.f0, t.f1);
+		}
+	}
+	
+	/** Converts a {@code Tuple3<Integer, Double,Double>} into a Centroid. */
+	@ForwardedFields("0->id; 1->x; 2->y")
+	public static final class TupleCentroidConverter implements MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+
+		@Override
+		public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
+			return new Centroid(t.f0, t.f1, t.f2);
+		}
+	}
+	
+	/** Determines the closest cluster center for a data point. */
+	@ForwardedFields("*->1")
+	public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
+		private Collection<Centroid> centroids;
+
+		/** Reads the centroid values from a broadcast variable into a collection. */
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
+		}
+		
+		@Override
+		public Tuple2<Integer, Point> map(Point p) throws Exception {
+			
+			double minDistance = Double.MAX_VALUE;
+			int closestCentroidId = -1;
+			
+			// check all cluster centers
+			for (Centroid centroid : centroids) {
+				// compute distance
+				double distance = p.euclideanDistance(centroid);
+				
+				// update nearest cluster if necessary 
+				if (distance < minDistance) {
+					minDistance = distance;
+					closestCentroidId = centroid.id;
+				}
+			}
+
+			// emit a new record with the center id and the data point.
+			return new Tuple2<Integer, Point>(closestCentroidId, p);
+		}
+	}
+	
+	/** Appends a count variable to the tuple. */
+	@ForwardedFields("f0;f1")
+	public static final class CountAppender implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
+
+		@Override
+		public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) {
+			return new Tuple3<Integer, Point, Long>(t.f0, t.f1, 1L);
+		} 
+	}
+	
+	/** Sums and counts point coordinates. */
+	@ForwardedFields("0")
+	public static final class CentroidAccumulator implements ReduceFunction<Tuple3<Integer, Point, Long>> {
+
+		@Override
+		public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
+			return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
+		}
+	}
+	
+	/** Computes new centroid from coordinate sum and count of points. */
+	@ForwardedFields("0->id")
+	public static final class CentroidAverager implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
+
+		@Override
+		public Centroid map(Tuple3<Integer, Point, Long> value) {
+			return new Centroid(value.f0, value.f1.div(value.f2));
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String pointsPath = null;
+	private static String centersPath = null;
+	private static String outputPath = null;
+	private static int numIterations = 10;
+	
+	private static boolean parseParameters(String[] programArguments) {
+		
+		if(programArguments.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(programArguments.length == 4) {
+				pointsPath = programArguments[0];
+				centersPath = programArguments[1];
+				outputPath = programArguments[2];
+				numIterations = Integer.parseInt(programArguments[3]);
+			} else {
+				System.err.println("Usage: KMeans <points path> <centers path> <result path> <num iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing K-Means example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  We provide a data generator to create synthetic input files for this program.");
+			System.out.println("  Usage: KMeans <points path> <centers path> <result path> <num iterations>");
+		}
+		return true;
+	}
+	
+	private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read points from CSV file
+			return env.readCsvFile(pointsPath)
+						.fieldDelimiter(" ")
+						.includeFields(true, true)
+						.types(Double.class, Double.class)
+						.map(new TuplePointConverter());
+		} else {
+			return KMeansData.getDefaultPointDataSet(env);
+		}
+	}
+	
+	private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env.readCsvFile(centersPath)
+						.fieldDelimiter(" ")
+						.includeFields(true, true, true)
+						.types(Integer.class, Double.class, Double.class)
+						.map(new TupleCentroidConverter());
+		} else {
+			return KMeansData.getDefaultCentroidDataSet(env);
+		}
+	}
+}
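
The broadcast-variable pattern in SelectNearestCenter reduces, per point, to a linear scan over the current centroids. A self-contained sketch of that inner loop without the Flink APIs (class and method names here are illustrative, not part of this patch):

~~~java
import java.util.Arrays;
import java.util.List;

public class NearestCenterSketch {

    /** Mirrors the loop in KMeans.SelectNearestCenter#map; each centroid is {id, x, y}. */
    static int nearest(double px, double py, List<double[]> centroids) {
        int closestCentroidId = -1;
        double minDistance = Double.MAX_VALUE;
        for (double[] c : centroids) {
            double distance = Math.sqrt((px - c[1]) * (px - c[1]) + (py - c[2]) * (py - c[2]));
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = (int) c[0];
            }
        }
        return closestCentroidId;
    }

    public static void main(String[] args) {
        List<double[]> centroids = Arrays.asList(
                new double[] {1, -31.85, -44.77},
                new double[] {2, 35.16, 17.46});
        // the first default data point lies closest to centroid 1
        System.out.println(nearest(-14.22, -48.01, centroids));
    }
}
~~~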

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
new file mode 100644
index 0000000..e165612
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.clustering.util;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.examples.java.clustering.KMeans.Centroid;
+import org.apache.flink.examples.java.clustering.KMeans.Point;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the K-Means example program.
+ * The default data sets are used if no parameters are given to the program.
+ *
+ */
+public class KMeansData {
+
+	// We have the data as object arrays so that we can also generate Scala Data Sources from it.
+	public static final Object[][] CENTROIDS = new Object[][] {
+		new Object[] {1, -31.85, -44.77},
+		new Object[]{2, 35.16, 17.46},
+		new Object[]{3, -5.16, 21.93},
+		new Object[]{4, -24.06, 6.81}
+	};
+
+	public static final Object[][] POINTS = new Object[][] {
+		new Object[] {-14.22, -48.01},
+		new Object[] {-22.78, 37.10},
+		new Object[] {56.18, -42.99},
+		new Object[] {35.04, 50.29},
+		new Object[] {-9.53, -46.26},
+		new Object[] {-34.35, 48.25},
+		new Object[] {55.82, -57.49},
+		new Object[] {21.03, 54.64},
+		new Object[] {-13.63, -42.26},
+		new Object[] {-36.57, 32.63},
+		new Object[] {50.65, -52.40},
+		new Object[] {24.48, 34.04},
+		new Object[] {-2.69, -36.02},
+		new Object[] {-38.80, 36.58},
+		new Object[] {24.00, -53.74},
+		new Object[] {32.41, 24.96},
+		new Object[] {-4.32, -56.92},
+		new Object[] {-22.68, 29.42},
+		new Object[] {59.02, -39.56},
+		new Object[] {24.47, 45.07},
+		new Object[] {5.23, -41.20},
+		new Object[] {-23.00, 38.15},
+		new Object[] {44.55, -51.50},
+		new Object[] {14.62, 59.06},
+		new Object[] {7.41, -56.05},
+		new Object[] {-26.63, 28.97},
+		new Object[] {47.37, -44.72},
+		new Object[] {29.07, 51.06},
+		new Object[] {0.59, -31.89},
+		new Object[] {-39.09, 20.78},
+		new Object[] {42.97, -48.98},
+		new Object[] {34.36, 49.08},
+		new Object[] {-21.91, -49.01},
+		new Object[] {-46.68, 46.04},
+		new Object[] {48.52, -43.67},
+		new Object[] {30.05, 49.25},
+		new Object[] {4.03, -43.56},
+		new Object[] {-37.85, 41.72},
+		new Object[] {38.24, -48.32},
+		new Object[] {20.83, 57.85}
+	};
+
+	public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env) {
+		List<Centroid> centroidList = new LinkedList<Centroid>();
+		for (Object[] centroid : CENTROIDS) {
+			centroidList.add(
+					new Centroid((Integer) centroid[0], (Double) centroid[1], (Double) centroid[2]));
+		}
+		return env.fromCollection(centroidList);
+	}
+	
+	public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env) {
+		List<Point> pointList = new LinkedList<Point>();
+		for (Object[] point : POINTS) {
+			pointList.add(new Point((Double) point[0], (Double) point[1]));
+		}
+		return env.fromCollection(pointList);
+	}
+	
+}
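
A minimal driver sketch (hypothetical class name; assumes flink-java and this module on the classpath) that previews these default data sets, relying on the eager DataSet#print() also used by the KMeans example above:

~~~java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.examples.java.clustering.util.KMeansData;

public class KMeansDataPreview {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // print() triggers execution and writes the elements to stdout
        KMeansData.getDefaultCentroidDataSet(env).print();
        KMeansData.getDefaultPointDataSet(env).print();
    }
}
~~~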

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
new file mode 100644
index 0000000..8f48d0a
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.clustering.util;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.Locale;
+import java.util.Random;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.examples.java.clustering.KMeans;
+
+/**
+ * Generates data for the {@link KMeans} example program.
+ */
+public class KMeansDataGenerator {
+	
+	static {
+		Locale.setDefault(Locale.US);
+	}
+	
+	private static final String CENTERS_FILE = "centers";
+	private static final String POINTS_FILE = "points";
+	private static final long DEFAULT_SEED = 4650285087650871364L;
+	private static final double DEFAULT_VALUE_RANGE = 100.0;
+	private static final double RELATIVE_STDDEV = 0.08;
+	private static final int DIMENSIONALITY = 2;
+	private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
+	private static final char DELIMITER = ' ';
+
+	/**
+	 * Main method to generate data for the {@link KMeans} example program.
+	 * <p>
+	 * The generator creates two files:
+	 * <ul>
+	 * <li><code>&lt; output-path &gt;/points</code> for the data points
+	 * <li><code>&lt; output-path &gt;/centers</code> for the cluster centers
+	 * </ul> 
+	 * 
+	 * @param args
+	 * <ol>
+	 * <li><code>-points</code> Int: Number of data points
+	 * <li><code>-k</code> Int: Number of cluster centers
+	 * <li><b>Optional</b> <code>-output</code> String: Output path, defaults to the system temp directory (<code>java.io.tmpdir</code>)
+	 * <li><b>Optional</b> <code>-stddev</code> Double: Relative standard deviation of the data points
+	 * <li><b>Optional</b> <code>-range</code> Double: Value range of the cluster centers
+	 * <li><b>Optional</b> <code>-seed</code> Long: Random seed
+	 * </ol>
+	 *
+	 * @throws IOException
+	 */
+	public static void main(String[] args) throws IOException {
+
+		// check parameter count
+		if (args.length < 2) {
+			System.out.println("KMeansDataGenerator -points <num> -k <num clusters> [-output <output-path>] [-stddev <relative stddev>] [-range <centroid range>] [-seed <seed>]");
+			System.exit(1);
+		}
+
+		// parse parameters
+
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final int numDataPoints = params.getInt("points");
+		final int k = params.getInt("k");
+		final String outDir = params.get("output", System.getProperty("java.io.tmpdir"));
+		final double stddev = params.getDouble("stddev", RELATIVE_STDDEV);
+		final double range = params.getDouble("range", DEFAULT_VALUE_RANGE);
+		final long firstSeed = params.getLong("seed", DEFAULT_SEED);
+
+		
+		final double absoluteStdDev = stddev * range;
+		final Random random = new Random(firstSeed);
+		
+		// the means around which data points are distributed
+		final double[][] means = uniformRandomCenters(random, k, DIMENSIONALITY, range);
+		
+		// write the points out
+		BufferedWriter pointsOut = null;
+		try {
+			pointsOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+POINTS_FILE)));
+			StringBuilder buffer = new StringBuilder();
+			
+			double[] point = new double[DIMENSIONALITY];
+			int nextCentroid = 0;
+			
+			for (int i = 1; i <= numDataPoints; i++) {
+				// generate a point for the current centroid
+				double[] centroid = means[nextCentroid];
+				for (int d = 0; d < DIMENSIONALITY; d++) {
+					point[d] = (random.nextGaussian() * absoluteStdDev) + centroid[d];
+				}
+				writePoint(point, buffer, pointsOut);
+				nextCentroid = (nextCentroid + 1) % k;
+			}
+		}
+		finally {
+			if (pointsOut != null) {
+				pointsOut.close();
+			}
+		}
+		
+		// write the uniformly distributed centers to a file
+		BufferedWriter centersOut = null;
+		try {
+			centersOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+CENTERS_FILE)));
+			StringBuilder buffer = new StringBuilder();
+			
+			double[][] centers = uniformRandomCenters(random, k, DIMENSIONALITY, range);
+			
+			for (int i = 0; i < k; i++) {
+				writeCenter(i + 1, centers[i], buffer, centersOut);
+			}
+		}
+		finally {
+			if (centersOut != null) {
+				centersOut.close();
+			}
+		}
+		
+		System.out.println("Wrote "+numDataPoints+" data points to "+outDir+"/"+POINTS_FILE);
+		System.out.println("Wrote "+k+" cluster centers to "+outDir+"/"+CENTERS_FILE);
+	}
+	
+	private static double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
+		final double halfRange = range / 2;
+		final double[][] points = new double[num][dimensionality];
+		
+		for (int i = 0; i < num; i++) {
+			for (int dim = 0; dim < dimensionality; dim++) {
+				points[i][dim] = (rnd.nextDouble() * range) - halfRange;
+			}
+		}
+		return points;
+	}
+	
+	private static void writePoint(double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
+		buffer.setLength(0);
+		
+		// write coordinates
+		for (int j = 0; j < coordinates.length; j++) {
+			buffer.append(FORMAT.format(coordinates[j]));
+			if (j < coordinates.length - 1) {
+				buffer.append(DELIMITER);
+			}
+		}
+		
+		out.write(buffer.toString());
+		out.newLine();
+	}
+	
+	private static void writeCenter(long id, double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
+		buffer.setLength(0);
+		
+		// write id
+		buffer.append(id);
+		buffer.append(DELIMITER);
+
+		// write coordinates
+		for (int j = 0; j < coordinates.length; j++) {
+			buffer.append(FORMAT.format(coordinates[j]));
+			if (j < coordinates.length - 1) {
+				buffer.append(DELIMITER);
+			}
+		}
+		
+		out.write(buffer.toString());
+		out.newLine();
+	}
+}
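
Since main() parses its arguments through ParameterTool, the generator can be
driven programmatically as well as from the command line. A sketch under that
assumption (the wrapper class is hypothetical; flag names and output file names
are taken from the code above):

    import org.apache.flink.examples.java.clustering.util.KMeansDataGenerator;

    public class GenerateKMeansInput {
        public static void main(String[] args) throws Exception {
            // writes <java.io.tmpdir>/points and <java.io.tmpdir>/centers,
            // using the defaults above for stddev, range, and seed
            KMeansDataGenerator.main(new String[] {
                    "-points", "1000",  // number of data points
                    "-k", "5"           // number of cluster centers
            });
        }
    }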

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
new file mode 100644
index 0000000..8e87892
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.commons.io.IOUtils;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The main class of the Flink distcp utility.
+ * It is a simple reimplementation of Hadoop distcp
+ * (see <a href="http://hadoop.apache.org/docs/r1.2.1/distcp.html">http://hadoop.apache.org/docs/r1.2.1/distcp.html</a>)
+ * with a dynamic input format.
+ * Note that this tool does not retry failed copies. Additionally, empty directories are not copied.
+ * <p>
+ * When running locally, local file system paths can be used.
+ * However, in a distributed environment, HDFS paths must be provided for both input and output.
+ */
+public class DistCp {
+	
+	private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
+	public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
+	public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
+
+	public static void main(String[] args) throws Exception {
+		if (args.length != 3) {
+			printHelp();
+			return;
+		}
+
+		final Path sourcePath = new Path(args[0]);
+		final Path targetPath = new Path(args[1]);
+		int parallelism = Integer.parseInt(args[2]);
+
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		checkInputParams(env, sourcePath, targetPath, parallelism);
+		env.setParallelism(parallelism);
+
+		long startTime = System.currentTimeMillis();
+		LOGGER.info("Initializing copy tasks");
+		List<FileCopyTask> tasks = getCopyTasks(sourcePath);
+		LOGGER.info("Copy task initialization took " + (System.currentTimeMillis() - startTime) + "ms");
+
+		DataSet<FileCopyTask> inputTasks = new DataSource<>(env,
+				new FileCopyTaskInputFormat(tasks),
+				new GenericTypeInfo<>(FileCopyTask.class), "fileCopyTasks");
+
+
+		FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
+			
+			private LongCounter fileCounter;
+			private LongCounter bytesCounter;
+
+			@Override
+			public void open(Configuration parameters) throws Exception {
+				bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
+				fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
+			}
+
+			@Override
+			public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
+				LOGGER.info("Processing task: " + task);
+				Path outPath = new Path(targetPath, task.getRelativePath());
+
+				FileSystem targetFs = targetPath.getFileSystem();
+				// creating parent folders in case of a local FS
+				if (!targetFs.isDistributedFS()) {
+					//dealing with cases like file:///tmp or just /tmp
+					File outFile = outPath.toUri().isAbsolute() ? new File(outPath.toUri()) : new File(outPath.toString());
+					File parentFile = outFile.getParentFile();
+					if (!parentFile.mkdirs() && !parentFile.exists()) {
+						throw new RuntimeException("Cannot create local file system directories: " + parentFile);
+					}
+				}
+				FSDataOutputStream outputStream = null;
+				FSDataInputStream inputStream = null;
+				try {
+					outputStream = targetFs.create(outPath, true);
+					inputStream = task.getPath().getFileSystem().open(task.getPath());
+					long bytes = IOUtils.copyLarge(inputStream, outputStream); // copy() returns an int and would overflow at 2 GiB
+					bytesCounter.add(bytes);
+				} finally {
+					IOUtils.closeQuietly(inputStream);
+					IOUtils.closeQuietly(outputStream);
+				}
+				fileCounter.add(1L);
+			}
+		});
+
+		// no data sinks are needed, therefore just printing an empty result
+		res.print();
+
+		Map<String, Object> accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults();
+		LOGGER.info("== COUNTERS ==");
+		for (Map.Entry<String, Object> e : accumulators.entrySet()) {
+			LOGGER.info(e.getKey() + ": " + e.getValue());
+		}
+	}
+
+
+	// -----------------------------------------------------------------------------------------
+	// HELPER METHODS
+	// -----------------------------------------------------------------------------------------
+
+	private static void checkInputParams(ExecutionEnvironment env, Path sourcePath, Path targetPath, int parallelism) throws IOException {
+		if (parallelism <= 0) {
+			throw new IllegalArgumentException("Parallelism should be greater than 0");
+		}
+
+		boolean isLocal = env instanceof LocalEnvironment;
+		if (!isLocal &&
+				!(sourcePath.getFileSystem().isDistributedFS() && targetPath.getFileSystem().isDistributedFS())) {
+			throw new IllegalArgumentException("In a distributed mode only HDFS input/output paths are supported");
+		}
+	}
+
+	private static void printHelp() {
+		System.err.println("Usage: <input_path> <output_path> <level_of_parallelism>");
+	}
+
+	private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws IOException {
+		List<FileCopyTask> tasks = new ArrayList<>();
+		getCopyTasks(sourcePath, "", tasks);
+		return tasks;
+	}
+
+	private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
+		FileStatus[] res = p.getFileSystem().listStatus(p);
+		if (res == null) {
+			return;
+		}
+		for (FileStatus fs : res) {
+			if (fs.isDir()) {
+				getCopyTasks(fs.getPath(), rel + fs.getPath().getName() + "/", tasks);
+			} else {
+				Path cp = fs.getPath();
+				tasks.add(new FileCopyTask(cp, rel + cp.getName()));
+			}
+		}
+	}
+
+}
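
The counter handling in the flatMap above follows Flink's accumulator pattern:
register a LongCounter in open(), add to it per record, and read the merged value
from the job execution result. A minimal self-contained sketch of just that
pattern (class and counter names are illustrative):

    import org.apache.flink.api.common.accumulators.LongCounter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class AccumulatorSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3)
                .map(new RichMapFunction<Integer, Integer>() {
                    private LongCounter seen;

                    @Override
                    public void open(Configuration parameters) {
                        // registered once per parallel task; Flink merges the
                        // per-task values when the job finishes
                        seen = getRuntimeContext().getLongCounter("ELEMENTS_SEEN");
                    }

                    @Override
                    public Integer map(Integer value) {
                        seen.add(1L);
                        return value;
                    }
                })
                .print();  // print() executes the job

            Long total = env.getLastJobExecutionResult()
                    .getAccumulatorResult("ELEMENTS_SEEN");
            System.out.println("elements seen: " + total);
        }
    }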

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
new file mode 100644
index 0000000..7f38a8b
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A Java POJO that represents a task for copying a single file.
+ */
+public class FileCopyTask implements Serializable {
+	
+	private static final long serialVersionUID = -8760082278978316032L;
+	
+	private final Path path;
+	private final String relativePath;
+
+	public FileCopyTask(Path path, String relativePath) {
+		if (StringUtils.isEmpty(relativePath)) {
+			throw new IllegalArgumentException("Relative path should not be empty for: " + path);
+		}
+		this.path = path;
+		this.relativePath = relativePath;
+	}
+
+	public Path getPath() {
+		return path;
+	}
+
+	public String getRelativePath() {
+		return relativePath;
+	}
+
+	@Override
+	public String toString() {
+		return "FileCopyTask{" +
+				"path=" + path +
+				", relativePath='" + relativePath + '\'' +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
new file mode 100644
index 0000000..d6e6713
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * An implementation of an input format that dynamically assigns {@code FileCopyTask}s to
+ * mappers that have finished their previously assigned tasks.
+ */
+public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {
+
+	private static final long serialVersionUID = -644394866425221151L;
+	
+	private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
+	
+
+	private final List<FileCopyTask> tasks;
+
+	public FileCopyTaskInputFormat(List<FileCopyTask> tasks) {
+		this.tasks = tasks;
+	}
+
+	private class FileCopyTaskAssigner implements InputSplitAssigner {
+		private Queue<FileCopyTaskInputSplit> splits;
+
+		public FileCopyTaskAssigner(FileCopyTaskInputSplit[] inputSplits) {
+			splits = new LinkedList<>(Arrays.asList(inputSplits));
+		}
+
+		@Override
+		public InputSplit getNextInputSplit(String host, int taskId) {
+			LOGGER.info("Getting copy task for task: " + taskId);
+			return splits.poll();
+		}
+	}
+
+	@Override
+	public void configure(Configuration parameters) {
+		//no op
+	}
+
+	@Override
+	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+		return null; // no statistics available; the optimizer falls back to its defaults
+	}
+
+	@Override
+	public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+		FileCopyTaskInputSplit[] splits = new FileCopyTaskInputSplit[tasks.size()];
+		int i = 0;
+		for (FileCopyTask t : tasks) {
+			splits[i] = new FileCopyTaskInputSplit(t, i);
+			i++;
+		}
+		return splits;
+	}
+
+	@Override
+	public InputSplitAssigner getInputSplitAssigner(FileCopyTaskInputSplit[] inputSplits) {
+		return new FileCopyTaskAssigner(inputSplits);
+	}
+
+	private FileCopyTaskInputSplit curInputSplit = null;
+
+	@Override
+	public void open(FileCopyTaskInputSplit split) throws IOException {
+		curInputSplit = split;
+	}
+
+	@Override
+	public boolean reachedEnd() throws IOException {
+		return curInputSplit == null;
+	}
+
+	@Override
+	public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException {
+		FileCopyTask toReturn = curInputSplit.getTask();
+		curInputSplit = null;
+		return toReturn;
+	}
+
+	@Override
+	public void close() throws IOException {
+		//no op
+	}
+}
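
What makes this input format "dynamic" is the assigner: createInputSplits() builds
one split per copy task up front, and getNextInputSplit() then hands them to
whichever parallel task asks first, returning null once the queue is drained. A
small local sketch of that behavior (the class name and paths are made up):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.io.InputSplitAssigner;
    import org.apache.flink.examples.java.distcp.FileCopyTask;
    import org.apache.flink.examples.java.distcp.FileCopyTaskInputFormat;
    import org.apache.flink.examples.java.distcp.FileCopyTaskInputSplit;

    import java.util.Arrays;
    import java.util.List;

    public class AssignerSketch {
        public static void main(String[] args) throws Exception {
            List<FileCopyTask> tasks = Arrays.asList(
                    new FileCopyTask(new Path("file:///tmp/src/a.txt"), "a.txt"),
                    new FileCopyTask(new Path("file:///tmp/src/b.txt"), "b.txt"));

            FileCopyTaskInputFormat format = new FileCopyTaskInputFormat(tasks);
            FileCopyTaskInputSplit[] splits = format.createInputSplits(2);
            InputSplitAssigner assigner = format.getInputSplitAssigner(splits);

            // splits are handed out in request order; faster tasks simply
            // come back for more work
            FileCopyTaskInputSplit first =
                    (FileCopyTaskInputSplit) assigner.getNextInputSplit("localhost", 0);
            System.out.println(first.getTask());             // the a.txt task
            assigner.getNextInputSplit("localhost", 1);      // the b.txt task
            System.out.println(assigner.getNextInputSplit("localhost", 0));  // null: drained
        }
    }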

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
new file mode 100644
index 0000000..33943b6
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.java.distcp;
+
+import org.apache.flink.core.io.InputSplit;
+
+/**
+ * Implementation of {@code InputSplit} for copying files.
+ */
+public class FileCopyTaskInputSplit implements InputSplit {
+	
+	private static final long serialVersionUID = -7621656017747660450L;
+	
+	private final FileCopyTask task;
+	private final int splitNumber;
+
+	public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber) {
+		this.task = task;
+		this.splitNumber = splitNumber;
+	}
+
+	public FileCopyTask getTask() {
+		return task;
+	}
+
+	@Override
+	public int getSplitNumber() {
+		return splitNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
new file mode 100644
index 0000000..ce36504
--- /dev/null
+++ b/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.examples.java.graph;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
+import org.apache.flink.util.Collector;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * An implementation of the connected components algorithm, using a delta iteration.
+ * 
+ * <p>
+ * Initially, the algorithm assigns each vertex a unique ID. In each step, a vertex picks the minimum of its own ID and its
+ * neighbors' IDs as its new ID, and tells its neighbors about it. After the algorithm has completed, all vertices in the
+ * same component will have the same ID.
+ * 
+ * <p>
+ * A vertex whose component ID did not change need not propagate its information in the next step. Because of that,
+ * the algorithm is easily expressible via a delta iteration. We here model the solution set as the vertices with
+ * their current component ids, and the workset as the changed vertices. Because we see all vertices initially as
+ * changed, the initial workset and the initial solution set are identical. Also, the delta to the solution set
+ * is consequently also the next workset.<br>
+ * 
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Vertices represented as IDs and separated by new-line characters.<br> 
+ * For example <code>"1\n2\n12\n42\n63"</code> gives five vertices (1), (2), (12), (42), and (63).
+ * <li>Edges are represented as pairs of vertex IDs which are separated by space
+ * characters. Edges are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
+ * </ul>
+ * 
+ * <p>
+ * Usage: <code>ConnectedComponents &lt;vertices path&gt; &lt;edges path&gt; &lt;result path&gt; &lt;max number of iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.ConnectedComponentsData} and 10 iterations. 
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Delta Iterations
+ * <li>Generic-typed Functions 
+ * </ul>
+ */
+@SuppressWarnings("serial")
+public class ConnectedComponents implements ProgramDescription {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String... args) throws Exception {
+		
+		if (!parseParameters(args)) {
+			return;
+		}
+		
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// read vertex and edge data
+		DataSet<Long> vertices = getVertexDataSet(env);
+		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
+		
+		// assign the initial components (equal to the vertex id)
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
+				
+		// open a delta iteration
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+				verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
+		
+		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
+		DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
+				.groupBy(0).aggregate(Aggregations.MIN, 1)
+				.join(iteration.getSolutionSet()).where(0).equalTo(0)
+				.with(new ComponentIdFilter());
+
+		// close the delta iteration (delta and new workset are identical)
+		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
+		
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", " ");
+			// execute program
+			env.execute("Connected Components Example");
+		} else {
+			result.print();
+		}
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+	
+	/**
+	 * Function that turns a value into a 2-tuple where both fields are that value.
+	 */
+	@ForwardedFields("*->f0")
+	public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
+		
+		@Override
+		public Tuple2<T, T> map(T vertex) {
+			return new Tuple2<T, T>(vertex, vertex);
+		}
+	}
+	
+	/**
+	 * Makes edges undirected by emitting, for each input edge, the edge itself and its inverted version.
+	 */
+	public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
+		
+		@Override
+		public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
+			invertedEdge.f0 = edge.f1;
+			invertedEdge.f1 = edge.f0;
+			out.collect(edge);
+			out.collect(invertedEdge);
+		}
+	}
+	
+	/**
+	 * UDF that joins a (Vertex-ID, Component-ID) pair, representing the current component
+	 * a vertex is associated with, with a (Source-Vertex-ID, Target-Vertex-ID) edge. The function
+	 * produces a (Target-Vertex-ID, Component-ID) pair.
+	 */
+	@ForwardedFieldsFirst("f1->f1")
+	@ForwardedFieldsSecond("f1->f0")
+	public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
+			return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
+		}
+	}
+	
+
+	/**
+	 * Emits the candidate (Vertex-ID, Component-ID) pair if the candidate's
+	 * component ID is smaller than the one currently stored in the solution set.
+	 */
+	@ForwardedFieldsFirst("*")
+	public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		@Override
+		public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
+			if (candidate.f1 < old.f1) {
+				out.collect(candidate);
+			}
+		}
+	}
+
+
+
+	@Override
+	public String getDescription() {
+		return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String verticesPath = null;
+	private static String edgesPath = null;
+	private static String outputPath = null;
+	private static int maxIterations = 10;
+	
+	private static boolean parseParameters(String[] programArguments) {
+		
+		if (programArguments.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (programArguments.length == 4) {
+				verticesPath = programArguments[0];
+				edgesPath = programArguments[1];
+				outputPath = programArguments[2];
+				maxIterations = Integer.parseInt(programArguments[3]);
+			} else {
+				System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing Connected Components example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
+		}
+		return true;
+	}
+	
+	private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) {
+		
+		if (fileOutput) {
+			return env.readCsvFile(verticesPath).types(Long.class)
+						.map(
+								new MapFunction<Tuple1<Long>, Long>() {
+									public Long map(Tuple1<Long> value) { return value.f0; }
+								});
+		} else {
+			return ConnectedComponentsData.getDefaultVertexDataSet(env);
+		}
+	}
+	
+	private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
+		
+		if (fileOutput) {
+			return env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class);
+		} else {
+			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
+		}
+	}
+	
+	
+}
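
Stripped of the graph logic, the delta-iteration plumbing above comes down to a
few calls: iterateDelta() opens the iteration, getWorkset() and getSolutionSet()
are available inside the step, and closeWith() supplies the solution-set delta
plus the next workset. A minimal skeleton under those assumptions (hypothetical
class; the step is a deliberate pass-through, so the iteration runs all 10 rounds):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DeltaIteration;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class DeltaIterationSkeleton {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Long, Long>> initial =
                    env.fromElements(new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L));

            // solution set and initial workset are both `initial`, keyed on field 0
            DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                    initial.iterateDelta(initial, 10, 0);

            // a real step function would derive the changed elements from
            // iteration.getWorkset(), typically joined with iteration.getSolutionSet();
            // here the workset is passed through unchanged
            DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset();

            // first argument: solution-set delta; second: next workset. The
            // iteration stops when the workset is empty or after 10 rounds.
            DataSet<Tuple2<Long, Long>> result = iteration.closeWith(delta, delta);
            result.print();
        }
    }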

