flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [08/50] flink git commit: [FLINK-3195] [examples] Consolidate batch examples into one project, unify batch and streaming examples under one parent project
Date Thu, 14 Jan 2016 16:16:05 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
new file mode 100644
index 0000000..838834b
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.socket;
+
+import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount;
+import org.apache.flink.streaming.util.SocketProgramITCaseBase;
+
+public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
new file mode 100644
index 0000000..7850082
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/twitter/TwitterStreamITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.twitter;
+
+import org.apache.flink.streaming.examples.twitter.TwitterStream;
+import org.apache.flink.streaming.examples.twitter.util.TwitterStreamData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class TwitterStreamITCase extends StreamingProgramTestBase {
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(TwitterStreamData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TwitterStream.main(new String[]{resultPath});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
new file mode 100644
index 0000000..7f46be9
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/SessionWindowingITCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.windowing;
+
+import org.apache.flink.streaming.examples.windowing.SessionWindowing;
+import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class SessionWindowingITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		SessionWindowing.main(new String[]{resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
new file mode 100644
index 0000000..37812c9
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/TopSpeedWindowingExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.windowing;
+
+import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
+import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+	
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		setParallelism(1); //needed to ensure total ordering for windows
+		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TopSpeedWindowing.main(new String[]{textPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
new file mode 100644
index 0000000..e7cce60
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/windowing/WindowWordCountITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.windowing;
+
+import org.apache.flink.streaming.examples.windowing.WindowWordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WindowWordCountITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+	protected String windowSize = "250";
+	protected String slideSize = "150";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// since the parallel tokenizers might have different speed
+		// the exact output can not be checked just whether it is well-formed
+		// checks that the result lines look like e.g. (faust, 2)
+		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowWordCount.main(new String[]{textPath, resultPath, windowSize, slideSize});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
new file mode 100644
index 0000000..6e3c213
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/PojoExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.wordcount;
+
+import org.apache.flink.streaming.examples.wordcount.PojoExample;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class PojoExampleITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		PojoExample.main(new String[]{textPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
new file mode 100644
index 0000000..fcf568e
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/wordcount/WordCountITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.wordcount;
+
+import org.apache.flink.streaming.examples.wordcount.WordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WordCountITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WordCount.main(new String[]{textPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
new file mode 100644
index 0000000..08ce890
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.join;
+
+import org.apache.flink.streaming.scala.examples.join.WindowJoin;
+import org.apache.flink.streaming.examples.join.util.WindowJoinData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class WindowJoinITCase extends StreamingProgramTestBase {
+
+	protected String gradesPath;
+	protected String salariesPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
+		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// since the two sides of the join might have different speed
+		// the exact output can not be checked just whether it is well-formed
+		// checks that the result lines look like e.g. Person(bob, 2, 2015)
+		checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowJoin.main(new String[]{gradesPath, salariesPath, resultPath});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
new file mode 100644
index 0000000..b3629ad
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.socket;
+
+import org.apache.flink.streaming.scala.examples.socket.SocketTextStreamWordCount;
+import org.apache.flink.streaming.util.SocketProgramITCaseBase;
+
+public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
new file mode 100644
index 0000000..ef4e47f
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/windowing/TopSpeedWindowingExampleITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.windowing;
+
+import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing;
+import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class TopSpeedWindowingExampleITCase extends StreamingProgramTestBase {
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		setParallelism(1); //needed to ensure total ordering for windows
+		textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath);
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		TopSpeedWindowing.main(new String[]{textPath, resultPath});
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml
deleted file mode 100644
index ae5a0fd..0000000
--- a/flink-examples/flink-java-examples/pom.xml
+++ /dev/null
@@ -1,330 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-examples</artifactId>
-		<version>1.0-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-java-examples</artifactId>
-	<name>flink-java-examples</name>
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-	</dependencies>
-	
-	
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-				
-					<!-- KMeans -->
-					<execution>
-						<id>KMeans</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-
-						<configuration>
-							<classifier>KMeans</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.clustering.KMeans</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>**/java/clustering/KMeans.class</include>
-								<include>**/java/clustering/KMeans$*.class</include>
-								<include>**/java/clustering/util/KMeansDataGenerator.class</include>
-								<include>**/java/clustering/util/KMeansData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- Transitive Closure -->
-					<execution>
-						<id>TransitiveClosure</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>TransitiveClosure</classifier>
-				
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.graph.TransitiveClosureNaive</program-class>
-								</manifestEntries>
-							</archive>
-				
-							<includes>
-								<include>**/java/graph/TransitiveClosureNaive.class</include>
-								<include>**/java/graph/TransitiveClosureNaive$*.class</include>
-								<include>**/java/graph/util/ConnectedComponentsData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- Connected Components -->
-					<execution>
-						<id>ConnectedComponents</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>ConnectedComponents</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.graph.ConnectedComponents</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>**/java/graph/ConnectedComponents.class</include>
-								<include>**/java/graph/ConnectedComponents$*.class</include>
-								<include>**/java/graph/util/ConnectedComponentsData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-					
-					<!-- EnumTriangles Basic -->
-					<execution>
-						<id>EnumTrianglesBasic</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>EnumTrianglesBasic</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.graph.EnumTrianglesBasic</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>**/java/graph/EnumTrianglesBasic.class</include>
-								<include>**/java/graph/EnumTrianglesBasic$*.class</include>
-								<include>**/java/graph/util/EnumTrianglesDataTypes.class</include>
-								<include>**/java/graph/util/EnumTrianglesDataTypes$*.class</include>
-								<include>**/java/graph/util/EnumTrianglesData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-					
-					<!-- EnumTriangles Opt -->
-					<execution>
-						<id>EnumTrianglesOpt</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>EnumTrianglesOpt</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.graph.EnumTrianglesOpt</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>**/java/graph/EnumTrianglesOpt.class</include>
-								<include>**/java/graph/EnumTrianglesOpt$*.class</include>
-								<include>**/java/graph/util/EnumTrianglesDataTypes.class</include>
-								<include>**/java/graph/util/EnumTrianglesDataTypes$*.class</include>
-								<include>**/java/graph/util/EnumTrianglesData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-					
-					<!-- PageRank Basic-->
-					<execution>
-						<id>PageRankBasic</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>PageRankBasic</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.graph.PageRankBasic</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>**/java/graph/PageRankBasic.class</include>
-								<include>**/java/graph/PageRankBasic$*.class</include>
-								<include>**/java/graph/util/PageRankData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- WebLogAnalysis -->
-					<execution>
-						<id>WebLogAnalysis</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>WebLogAnalysis</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.relational.WebLogAnalysis</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>**/java/relational/WebLogAnalysis.class</include>
-								<include>**/java/relational/WebLogAnalysis$*.class</include>
-								<include>**/java/relational/util/WebLogData.class</include>
-								<include>**/java/relational/util/WebLogDataGenerator.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- WordCount -->
-					<execution>
-						<id>WordCount</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>WordCount</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.wordcount.WordCount</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>**/java/wordcount/WordCount.class</include>
-								<include>**/java/wordcount/WordCount$*.class</include>
-								<include>**/java/wordcount/util/WordCountData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- WordCountPOJO -->
-					<execution>
-						<id>WordCountPOJO</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>WordCountPOJO</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.wordcount.PojoExample</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>**/java/wordcount/PojoExample.class</include>
-								<include>**/java/wordcount/PojoExample$*.class</include>
-								<include>**/java/wordcount/util/WordCountData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- Distributed Copy -->
-					<execution>
-						<id>DistCp</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>DistCp</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.examples.java.distcp.DistCp</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>**/java/distcp/*</include>
-							</includes>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!--simplify the name of example JARs for build-target/examples -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-antrun-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<execution>
-						<id>rename</id>
-						<phase>package</phase>
-						<goals>
-							<goal>run</goal>
-						</goals>
-						<configuration> 
-							<target>
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-KMeans.jar" tofile="${project.basedir}/target/KMeans.jar" />
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-ConnectedComponents.jar" tofile="${project.basedir}/target/ConnectedComponents.jar" />
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-EnumTrianglesBasic.jar" tofile="${project.basedir}/target/EnumTrianglesBasic.jar" />
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-EnumTrianglesOpt.jar" tofile="${project.basedir}/target/EnumTrianglesOpt.jar" />
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-PageRankBasic.jar" tofile="${project.basedir}/target/PageRankBasic.jar" />
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-TransitiveClosure.jar" tofile="${project.basedir}/target/TransitiveClosure.jar" />
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCountPOJO.jar" tofile="${project.basedir}/target/WordCountPOJO.jar" />
-								<copy file="${project.basedir}/target/flink-java-examples-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
-							</target>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
deleted file mode 100644
index 2db6f65..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.clustering;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.java.clustering.util.KMeansData;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-
-/**
- * This example implements a basic K-Means clustering algorithm.
- * 
- * <p>
- * K-Means is an iterative clustering algorithm and works as follows:<br>
- * K-Means is given a set of data points to be clustered and an initial set of <i>K</i> cluster centers.
- * In each iteration, the algorithm computes the distance of each data point to each cluster center.
- * Each point is assigned to the cluster center which is closest to it.
- * Subsequently, each cluster center is moved to the center (<i>mean</i>) of all points that have been assigned to it.
- * The moved cluster centers are fed into the next iteration. 
- * The algorithm terminates after a fixed number of iterations (as in this implementation) 
- * or if cluster centers do not (significantly) move in an iteration.<br>
- * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/K-means_clustering">K-Means Clustering algorithm</a>.
- * 
- * <p>
- * This implementation works on two-dimensional data points. <br>
- * It computes an assignment of data points to cluster centers, i.e., 
- * each data point is annotated with the id of the final cluster (center) it belongs to.
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Data points are represented as two double values separated by a blank character.
- * Data points are separated by newline characters.<br>
- * For example <code>"1.2 2.3\n5.3 7.2\n"</code> gives two data points (x=1.2, y=2.3) and (x=5.3, y=7.2).
- * <li>Cluster centers are represented by an integer id and a point value.<br>
- * For example <code>"1 6.2 3.2\n2 2.9 5.7\n"</code> gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7).
- * </ul>
- * 
- * <p>
- * Usage: <code>KMeans &lt;points path&gt; &lt;centers path&gt; &lt;result path&gt; &lt;num iterations&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link KMeansData} and 10 iterations. 
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Bulk iterations
- * <li>Broadcast variables in bulk iterations
- * <li>Custom Java objects (PoJos)
- * </ul>
- */
-@SuppressWarnings("serial")
-public class KMeans {
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-	
-		// set up execution environment
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// get input data
-		DataSet<Point> points = getPointDataSet(env);
-		DataSet<Centroid> centroids = getCentroidDataSet(env);
-		
-		// set number of bulk iterations for KMeans algorithm
-		IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
-		
-		DataSet<Centroid> newCentroids = points
-			// compute closest centroid for each point
-			.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
-			// count and sum point coordinates for each centroid
-			.map(new CountAppender())
-			.groupBy(0).reduce(new CentroidAccumulator())
-			// compute new centroids from point counts and coordinate sums
-			.map(new CentroidAverager());
-		
-		// feed new centroids back into next iteration
-		DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
-		
-		DataSet<Tuple2<Integer, Point>> clusteredPoints = points
-				// assign points to final clusters
-				.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
-		
-		// emit result
-		if (fileOutput) {
-			clusteredPoints.writeAsCsv(outputPath, "\n", " ");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute("KMeans Example");
-		}
-		else {
-			clusteredPoints.print();
-		}
-	}
-	
-	// *************************************************************************
-	//     DATA TYPES
-	// *************************************************************************
-	
-	/**
-	 * A simple two-dimensional point.
-	 */
-	public static class Point implements Serializable {
-		
-		public double x, y;
-		
-		public Point() {}
-
-		public Point(double x, double y) {
-			this.x = x;
-			this.y = y;
-		}
-		
-		public Point add(Point other) {
-			x += other.x;
-			y += other.y;
-			return this;
-		}
-		
-		public Point div(long val) {
-			x /= val;
-			y /= val;
-			return this;
-		}
-		
-		public double euclideanDistance(Point other) {
-			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
-		}
-		
-		public void clear() {
-			x = y = 0.0;
-		}
-		
-		@Override
-		public String toString() {
-			return x + " " + y;
-		}
-	}
-	
-	/**
-	 * A simple two-dimensional centroid, basically a point with an ID. 
-	 */
-	public static class Centroid extends Point {
-		
-		public int id;
-		
-		public Centroid() {}
-		
-		public Centroid(int id, double x, double y) {
-			super(x,y);
-			this.id = id;
-		}
-		
-		public Centroid(int id, Point p) {
-			super(p.x, p.y);
-			this.id = id;
-		}
-		
-		@Override
-		public String toString() {
-			return id + " " + super.toString();
-		}
-	}
-	
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-	
-	/** Converts a {@code Tuple2<Double,Double>} into a Point. */
-	@ForwardedFields("0->x; 1->y")
-	public static final class TuplePointConverter implements MapFunction<Tuple2<Double, Double>, Point> {
-
-		@Override
-		public Point map(Tuple2<Double, Double> t) throws Exception {
-			return new Point(t.f0, t.f1);
-		}
-	}
-	
-	/** Converts a {@code Tuple3<Integer, Double,Double>} into a Centroid. */
-	@ForwardedFields("0->id; 1->x; 2->y")
-	public static final class TupleCentroidConverter implements MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
-
-		@Override
-		public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
-			return new Centroid(t.f0, t.f1, t.f2);
-		}
-	}
-	
-	/** Determines the closest cluster center for a data point. */
-	@ForwardedFields("*->1")
-	public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<Integer, Point>> {
-		private Collection<Centroid> centroids;
-
-		/** Reads the centroid values from a broadcast variable into a collection. */
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
-		}
-		
-		@Override
-		public Tuple2<Integer, Point> map(Point p) throws Exception {
-			
-			double minDistance = Double.MAX_VALUE;
-			int closestCentroidId = -1;
-			
-			// check all cluster centers
-			for (Centroid centroid : centroids) {
-				// compute distance
-				double distance = p.euclideanDistance(centroid);
-				
-				// update nearest cluster if necessary 
-				if (distance < minDistance) {
-					minDistance = distance;
-					closestCentroidId = centroid.id;
-				}
-			}
-
-			// emit a new record with the center id and the data point.
-			return new Tuple2<Integer, Point>(closestCentroidId, p);
-		}
-	}
-	
-	/** Appends a count variable to the tuple. */
-	@ForwardedFields("f0;f1")
-	public static final class CountAppender implements MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
-
-		@Override
-		public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) {
-			return new Tuple3<Integer, Point, Long>(t.f0, t.f1, 1L);
-		} 
-	}
-	
-	/** Sums and counts point coordinates. */
-	@ForwardedFields("0")
-	public static final class CentroidAccumulator implements ReduceFunction<Tuple3<Integer, Point, Long>> {
-
-		@Override
-		public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
-			return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
-		}
-	}
-	
-	/** Computes new centroid from coordinate sum and count of points. */
-	@ForwardedFields("0->id")
-	public static final class CentroidAverager implements MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
-
-		@Override
-		public Centroid map(Tuple3<Integer, Point, Long> value) {
-			return new Centroid(value.f0, value.f1.div(value.f2));
-		}
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean fileOutput = false;
-	private static String pointsPath = null;
-	private static String centersPath = null;
-	private static String outputPath = null;
-	private static int numIterations = 10;
-	
-	private static boolean parseParameters(String[] programArguments) {
-		
-		if(programArguments.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(programArguments.length == 4) {
-				pointsPath = programArguments[0];
-				centersPath = programArguments[1];
-				outputPath = programArguments[2];
-				numIterations = Integer.parseInt(programArguments[3]);
-			} else {
-				System.err.println("Usage: KMeans <points path> <centers path> <result path> <num iterations>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing K-Means example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  We provide a data generator to create synthetic input files for this program.");
-			System.out.println("  Usage: KMeans <points path> <centers path> <result path> <num iterations>");
-		}
-		return true;
-	}
-	
-	private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			// read points from CSV file
-			return env.readCsvFile(pointsPath)
-						.fieldDelimiter(" ")
-						.includeFields(true, true)
-						.types(Double.class, Double.class)
-						.map(new TuplePointConverter());
-		} else {
-			return KMeansData.getDefaultPointDataSet(env);
-		}
-	}
-	
-	private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env.readCsvFile(centersPath)
-						.fieldDelimiter(" ")
-						.includeFields(true, true, true)
-						.types(Integer.class, Double.class, Double.class)
-						.map(new TupleCentroidConverter());
-		} else {
-			return KMeansData.getDefaultCentroidDataSet(env);
-		}
-	}
-		
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
deleted file mode 100644
index 91a4c23..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansData.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.clustering.util;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.examples.java.clustering.KMeans.Centroid;
-import org.apache.flink.examples.java.clustering.KMeans.Point;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Provides the default data sets used for the K-Means example program.
- * The default data sets are used, if no parameters are given to the program.
- *
- */
-public class KMeansData {
-
-	// We have the data as object arrays so that we can also generate Scala Data Sources from it.
-	public static final Object[][] CENTROIDS = new Object[][] {
-		new Object[] {1, -31.85, -44.77},
-		new Object[]{2, 35.16, 17.46},
-		new Object[]{3, -5.16, 21.93},
-		new Object[]{4, -24.06, 6.81}
-	};
-
-	public static final Object[][] POINTS = new Object[][] {
-		new Object[] {-14.22, -48.01},
-		new Object[] {-22.78, 37.10},
-		new Object[] {56.18, -42.99},
-		new Object[] {35.04, 50.29},
-		new Object[] {-9.53, -46.26},
-		new Object[] {-34.35, 48.25},
-		new Object[] {55.82, -57.49},
-		new Object[] {21.03, 54.64},
-		new Object[] {-13.63, -42.26},
-		new Object[] {-36.57, 32.63},
-		new Object[] {50.65, -52.40},
-		new Object[] {24.48, 34.04},
-		new Object[] {-2.69, -36.02},
-		new Object[] {-38.80, 36.58},
-		new Object[] {24.00, -53.74},
-		new Object[] {32.41, 24.96},
-		new Object[] {-4.32, -56.92},
-		new Object[] {-22.68, 29.42},
-		new Object[] {59.02, -39.56},
-		new Object[] {24.47, 45.07},
-		new Object[] {5.23, -41.20},
-		new Object[] {-23.00, 38.15},
-		new Object[] {44.55, -51.50},
-		new Object[] {14.62, 59.06},
-		new Object[] {7.41, -56.05},
-		new Object[] {-26.63, 28.97},
-		new Object[] {47.37, -44.72},
-		new Object[] {29.07, 51.06},
-		new Object[] {0.59, -31.89},
-		new Object[] {-39.09, 20.78},
-		new Object[] {42.97, -48.98},
-		new Object[] {34.36, 49.08},
-		new Object[] {-21.91, -49.01},
-		new Object[] {-46.68, 46.04},
-		new Object[] {48.52, -43.67},
-		new Object[] {30.05, 49.25},
-		new Object[] {4.03, -43.56},
-		new Object[] {-37.85, 41.72},
-		new Object[] {38.24, -48.32},
-		new Object[] {20.83, 57.85}
-	};
-
-	public static DataSet<Centroid> getDefaultCentroidDataSet(ExecutionEnvironment env) {
-		List<Centroid> centroidList = new LinkedList<Centroid>();
-		for (Object[] centroid : CENTROIDS) {
-			centroidList.add(
-					new Centroid((Integer) centroid[0], (Double) centroid[1], (Double) centroid[2]));
-		}
-		return env.fromCollection(centroidList);
-	}
-	
-	public static DataSet<Point> getDefaultPointDataSet(ExecutionEnvironment env) {
-		List<Point> pointList = new LinkedList<Point>();
-		for (Object[] point : POINTS) {
-			pointList.add(new Point((Double) point[0], (Double) point[1]));
-		}
-		return env.fromCollection(pointList);
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
deleted file mode 100644
index 8f48d0a..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.clustering.util;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.Locale;
-import java.util.Random;
-
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.examples.java.clustering.KMeans;
-
-/**
- * Generates data for the {@link KMeans} example program.
- */
-public class KMeansDataGenerator {
-	
-	static {
-		Locale.setDefault(Locale.US);
-	}
-	
-	private static final String CENTERS_FILE = "centers";
-	private static final String POINTS_FILE = "points";
-	private static final long DEFAULT_SEED = 4650285087650871364L;
-	private static final double DEFAULT_VALUE_RANGE = 100.0;
-	private static final double RELATIVE_STDDEV = 0.08;
-	private static final int DIMENSIONALITY = 2;
-	private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
-	private static final char DELIMITER = ' ';
-
-	/**
-	 * Main method to generate data for the {@link KMeans} example program.
-	 * <p>
-	 * The generator creates to files:
-	 * <ul>
-	 * <li><code>&lt; output-path &gt;/points</code> for the data points
-	 * <li><code>&lt; output-path &gt;/centers</code> for the cluster centers
-	 * </ul> 
-	 * 
-	 * @param args 
-	 * <ol>
-	 * <li>Int: Number of data points
-	 * <li>Int: Number of cluster centers
-	 * <li><b>Optional</b> String: Output path, default value is {tmp.dir}
-	 * <li><b>Optional</b> Double: Standard deviation of data points
-	 * <li><b>Optional</b> Double: Value range of cluster centers
-	 * <li><b>Optional</b> Long: Random seed
-	 * </ol>
-	 *
-	 * @throws IOException
-	 */
-	public static void main(String[] args) throws IOException {
-
-		// check parameter count
-		if (args.length < 2) {
-			System.out.println("KMeansDataGenerator -points <num> -k <num clusters> [-output <output-path>] [-stddev <relative stddev>] [-range <centroid range>] [-seed <seed>]");
-			System.exit(1);
-		}
-
-		// parse parameters
-
-		final ParameterTool params = ParameterTool.fromArgs(args);
-		final int numDataPoints = params.getInt("points");
-		final int k = params.getInt("k");
-		final String outDir = params.get("output", System.getProperty("java.io.tmpdir"));
-		final double stddev = params.getDouble("stddev", RELATIVE_STDDEV);
-		final double range = params.getDouble("range", DEFAULT_VALUE_RANGE);
-		final long firstSeed = params.getLong("seed", DEFAULT_SEED);
-
-		
-		final double absoluteStdDev = stddev * range;
-		final Random random = new Random(firstSeed);
-		
-		// the means around which data points are distributed
-		final double[][] means = uniformRandomCenters(random, k, DIMENSIONALITY, range);
-		
-		// write the points out
-		BufferedWriter pointsOut = null;
-		try {
-			pointsOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+POINTS_FILE)));
-			StringBuilder buffer = new StringBuilder();
-			
-			double[] point = new double[DIMENSIONALITY];
-			int nextCentroid = 0;
-			
-			for (int i = 1; i <= numDataPoints; i++) {
-				// generate a point for the current centroid
-				double[] centroid = means[nextCentroid];
-				for (int d = 0; d < DIMENSIONALITY; d++) {
-					point[d] = (random.nextGaussian() * absoluteStdDev) + centroid[d];
-				}
-				writePoint(point, buffer, pointsOut);
-				nextCentroid = (nextCentroid + 1) % k;
-			}
-		}
-		finally {
-			if (pointsOut != null) {
-				pointsOut.close();
-			}
-		}
-		
-		// write the uniformly distributed centers to a file
-		BufferedWriter centersOut = null;
-		try {
-			centersOut = new BufferedWriter(new FileWriter(new File(outDir+"/"+CENTERS_FILE)));
-			StringBuilder buffer = new StringBuilder();
-			
-			double[][] centers = uniformRandomCenters(random, k, DIMENSIONALITY, range);
-			
-			for (int i = 0; i < k; i++) {
-				writeCenter(i + 1, centers[i], buffer, centersOut);
-			}
-		}
-		finally {
-			if (centersOut != null) {
-				centersOut.close();
-			}
-		}
-		
-		System.out.println("Wrote "+numDataPoints+" data points to "+outDir+"/"+POINTS_FILE);
-		System.out.println("Wrote "+k+" cluster centers to "+outDir+"/"+CENTERS_FILE);
-	}
-	
-	private static double[][] uniformRandomCenters(Random rnd, int num, int dimensionality, double range) {
-		final double halfRange = range / 2;
-		final double[][] points = new double[num][dimensionality];
-		
-		for (int i = 0; i < num; i++) {
-			for (int dim = 0; dim < dimensionality; dim ++) {
-				points[i][dim] = (rnd.nextDouble() * range) - halfRange;
-			}
-		}
-		return points;
-	}
-	
-	private static void writePoint(double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
-		buffer.setLength(0);
-		
-		// write coordinates
-		for (int j = 0; j < coordinates.length; j++) {
-			buffer.append(FORMAT.format(coordinates[j]));
-			if(j < coordinates.length - 1) {
-				buffer.append(DELIMITER);
-			}
-		}
-		
-		out.write(buffer.toString());
-		out.newLine();
-	}
-	
-	private static void writeCenter(long id, double[] coordinates, StringBuilder buffer, BufferedWriter out) throws IOException {
-		buffer.setLength(0);
-		
-		// write id
-		buffer.append(id);
-		buffer.append(DELIMITER);
-
-		// write coordinates
-		for (int j = 0; j < coordinates.length; j++) {
-			buffer.append(FORMAT.format(coordinates[j]));
-			if(j < coordinates.length - 1) {
-				buffer.append(DELIMITER);
-			}
-		}
-		
-		out.write(buffer.toString());
-		out.newLine();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
deleted file mode 100644
index 8e87892..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/DistCp.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.distcp;
-
-import org.apache.commons.io.IOUtils;
-
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.LocalEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.operators.FlatMapOperator;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.Collector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A main class of the Flink distcp utility.
- * It's a simple reimplementation of Hadoop distcp
- * (see <a href="http://hadoop.apache.org/docs/r1.2.1/distcp.html">http://hadoop.apache.org/docs/r1.2.1/distcp.html</a>)
- * with a dynamic input format
- * Note that this tool does not deal with retriability. Additionally, empty directories are not copied over.
- * <p>
- * When running locally, local file systems paths can be used.
- * However, in a distributed environment HDFS paths must be provided both as input and output.
- */
-public class DistCp {
-	
-	private static final Logger LOGGER = LoggerFactory.getLogger(DistCp.class);
-	public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
-	public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
-
-	public static void main(String[] args) throws Exception {
-		if (args.length != 3) {
-			printHelp();
-			return;
-		}
-
-		final Path sourcePath = new Path(args[0]);
-		final Path targetPath = new Path(args[1]);
-		int parallelism = Integer.valueOf(args[2], 10);
-
-		// set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		checkInputParams(env, sourcePath, targetPath, parallelism);
-		env.setParallelism(parallelism);
-
-		long startTime = System.currentTimeMillis();
-		LOGGER.info("Initializing copy tasks");
-		List<FileCopyTask> tasks = getCopyTasks(sourcePath);
-		LOGGER.info("Copy task initialization took " + (System.currentTimeMillis() - startTime) + "ms");
-
-		DataSet<FileCopyTask> inputTasks = new DataSource<>(env,
-				new FileCopyTaskInputFormat(tasks),
-				new GenericTypeInfo<>(FileCopyTask.class), "fileCopyTasks");
-
-
-		FlatMapOperator<FileCopyTask, Object> res = inputTasks.flatMap(new RichFlatMapFunction<FileCopyTask, Object>() {
-			
-			private LongCounter fileCounter;
-			private LongCounter bytesCounter;
-
-			@Override
-			public void open(Configuration parameters) throws Exception {
-				bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
-				fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
-			}
-
-			@Override
-			public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
-				LOGGER.info("Processing task: " + task);
-				Path outPath = new Path(targetPath, task.getRelativePath());
-
-				FileSystem targetFs = targetPath.getFileSystem();
-				// creating parent folders in case of a local FS
-				if (!targetFs.isDistributedFS()) {
-					//dealing with cases like file:///tmp or just /tmp
-					File outFile = outPath.toUri().isAbsolute() ? new File(outPath.toUri()) : new File(outPath.toString());
-					File parentFile = outFile.getParentFile();
-					if (!parentFile.mkdirs() && !parentFile.exists()) {
-						throw new RuntimeException("Cannot create local file system directories: " + parentFile);
-					}
-				}
-				FSDataOutputStream outputStream = null;
-				FSDataInputStream inputStream = null;
-				try {
-					outputStream = targetFs.create(outPath, true);
-					inputStream = task.getPath().getFileSystem().open(task.getPath());
-					int bytes = IOUtils.copy(inputStream, outputStream);
-					bytesCounter.add(bytes);
-				} finally {
-					IOUtils.closeQuietly(inputStream);
-					IOUtils.closeQuietly(outputStream);
-				}
-				fileCounter.add(1l);
-			}
-		});
-
-		// no data sinks are needed, therefore just printing an empty result
-		res.print();
-
-		Map<String, Object> accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults();
-		LOGGER.info("== COUNTERS ==");
-		for (Map.Entry<String, Object> e : accumulators.entrySet()) {
-			LOGGER.info(e.getKey() + ": " + e.getValue());
-		}
-	}
-
-
-	// -----------------------------------------------------------------------------------------
-	// HELPER METHODS
-	// -----------------------------------------------------------------------------------------
-
-	private static void checkInputParams(ExecutionEnvironment env, Path sourcePath, Path targetPath, int parallelism) throws IOException {
-		if (parallelism <= 0) {
-			throw new IllegalArgumentException("Parallelism should be greater than 0");
-		}
-
-		boolean isLocal = env instanceof LocalEnvironment;
-		if (!isLocal &&
-				!(sourcePath.getFileSystem().isDistributedFS() && targetPath.getFileSystem().isDistributedFS())) {
-			throw new IllegalArgumentException("In a distributed mode only HDFS input/output paths are supported");
-		}
-	}
-
-	private static void printHelp() {
-		System.err.println("Usage: <input_path> <output_path> <level_of_parallelism>");
-	}
-
-	private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws IOException {
-		List<FileCopyTask> tasks = new ArrayList<>();
-		getCopyTasks(sourcePath, "", tasks);
-		return tasks;
-	}
-
-	private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
-		FileStatus[] res = p.getFileSystem().listStatus(p);
-		if (res == null) {
-			return;
-		}
-		for (FileStatus fs : res) {
-			if (fs.isDir()) {
-				getCopyTasks(fs.getPath(), rel + fs.getPath().getName() + "/", tasks);
-			} else {
-				Path cp = fs.getPath();
-				tasks.add(new FileCopyTask(cp, rel + cp.getName()));
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
deleted file mode 100644
index 7f38a8b..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTask.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.distcp;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.core.fs.Path;
-
-import java.io.Serializable;
-
-/**
- * A Java POJO that represents a task for copying a single file
- */
-public class FileCopyTask implements Serializable {
-	
-	private static final long serialVersionUID = -8760082278978316032L;
-	
-	private final Path path;
-	private final String relativePath;
-
-	public FileCopyTask(Path path, String relativePath) {
-		if (StringUtils.isEmpty(relativePath)) {
-			throw new IllegalArgumentException("Relative path should not be empty for: " + path);
-		}
-		this.path = path;
-		this.relativePath = relativePath;
-	}
-
-	public Path getPath() {
-		return path;
-	}
-
-	public String getRelativePath() {
-		return relativePath;
-	}
-
-	@Override
-	public String toString() {
-		return "FileCopyTask{" +
-				"path=" + path +
-				", relativePath='" + relativePath + '\'' +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
deleted file mode 100644
index 6137e12..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputFormat.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.distcp;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * An implementation of an input format that dynamically assigns {@code FileCopyTask} to the mappers
- * that have finished previously assigned tasks
- */
-public class FileCopyTaskInputFormat implements InputFormat<FileCopyTask, FileCopyTaskInputSplit> {
-
-	private static final long serialVersionUID = -644394866425221151L;
-	
-	private static final Logger LOGGER = LoggerFactory.getLogger(FileCopyTaskInputFormat.class);
-	
-
-	private final List<FileCopyTask> tasks;
-
-	public FileCopyTaskInputFormat(List<FileCopyTask> tasks) {
-		this.tasks = tasks;
-	}
-
-	private class FileCopyTaskAssigner implements InputSplitAssigner {
-		private Queue<FileCopyTaskInputSplit> splits;
-
-		public FileCopyTaskAssigner(FileCopyTaskInputSplit[] inputSplits) {
-			splits = new LinkedList<>(Arrays.asList(inputSplits));
-		}
-
-		@Override
-		public InputSplit getNextInputSplit(String host, int taskId) {
-			LOGGER.info("Getting copy task for task: " + taskId);
-			return splits.poll();
-		}
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-		//no op
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-		return null;
-	}
-
-	@Override
-	public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-		FileCopyTaskInputSplit[] splits = new FileCopyTaskInputSplit[tasks.size()];
-		int i = 0;
-		for (FileCopyTask t : tasks) {
-			splits[i] = new FileCopyTaskInputSplit(t, i);
-			i++;
-		}
-		return splits;
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(FileCopyTaskInputSplit[] inputSplits) {
-		return new FileCopyTaskAssigner(inputSplits);
-	}
-
-	private FileCopyTaskInputSplit curInputSplit = null;
-
-	@Override
-	public void open(FileCopyTaskInputSplit split) throws IOException {
-		curInputSplit = split;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return curInputSplit == null;
-	}
-
-	@Override
-	public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException {
-		FileCopyTask toReturn = curInputSplit.getTask();
-		curInputSplit = null;
-		return toReturn;
-	}
-
-	@Override
-	public void close() throws IOException {
-		//no op
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
deleted file mode 100644
index 33943b6..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/distcp/FileCopyTaskInputSplit.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.distcp;
-
-import org.apache.flink.core.io.InputSplit;
-
-/**
- * Implementation of {@code InputSplit} for copying files
- */
-public class FileCopyTaskInputSplit implements InputSplit {
-	
-	private static final long serialVersionUID = -7621656017747660450L;
-	
-	private final FileCopyTask task;
-	private final int splitNumber;
-
-	public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber) {
-		this.task = task;
-		this.splitNumber = splitNumber;
-	}
-
-	public FileCopyTask getTask() {
-		return task;
-	}
-
-	@Override
-	public int getSplitNumber() {
-		return splitNumber;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0e1d635/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
deleted file mode 100644
index 535bf9d..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/ConnectedComponents.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.examples.java.graph;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-
-/**
- * An implementation of the connected components algorithm, using a delta iteration.
- * 
- * <p>
- * Initially, the algorithm assigns each vertex an unique ID. In each step, a vertex picks the minimum of its own ID and its
- * neighbors' IDs, as its new ID and tells its neighbors about its new ID. After the algorithm has completed, all vertices in the
- * same component will have the same ID.
- * 
- * <p>
- * A vertex whose component ID did not change needs not propagate its information in the next step. Because of that,
- * the algorithm is easily expressible via a delta iteration. We here model the solution set as the vertices with
- * their current component ids, and the workset as the changed vertices. Because we see all vertices initially as
- * changed, the initial workset and the initial solution set are identical. Also, the delta to the solution set
- * is consequently also the next workset.<br>
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Vertices represented as IDs and separated by new-line characters.<br> 
- * For example <code>"1\n2\n12\n42\n63"</code> gives five vertices (1), (2), (12), (42), and (63).
- * <li>Edges are represented as pairs for vertex IDs which are separated by space 
- * characters. Edges are separated by new-line characters.<br>
- * For example <code>"1 2\n2 12\n1 12\n42 63"</code> gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
- * </ul>
- * 
- * <p>
- * Usage: <code>ConnectedComponents &lt;vertices path&gt; &lt;edges path&gt; &lt;result path&gt; &lt;max number of iterations&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link ConnectedComponentsData} and 10 iterations. 
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>Delta Iterations
- * <li>Generic-typed Functions 
- * </ul>
- */
-@SuppressWarnings("serial")
-public class ConnectedComponents implements ProgramDescription {
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String... args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-		
-		// set up execution environment
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// read vertex and edge data
-		DataSet<Long> vertices = getVertexDataSet(env);
-		DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
-		
-		// assign the initial components (equal to the vertex id)
-		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
-				
-		// open a delta iteration
-		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
-				verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
-		
-		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
-		DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
-				.groupBy(0).aggregate(Aggregations.MIN, 1)
-				.join(iteration.getSolutionSet()).where(0).equalTo(0)
-				.with(new ComponentIdFilter());
-
-		// close the delta iteration (delta and new workset are identical)
-		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
-		
-		// emit result
-		if (fileOutput) {
-			result.writeAsCsv(outputPath, "\n", " ");
-			// execute program
-			env.execute("Connected Components Example");
-		} else {
-			result.print();
-		}
-	}
-	
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-	
-	/**
-	 * Function that turns a value into a 2-tuple where both fields are that value.
-	 */
-	@ForwardedFields("*->f0")
-	public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
-		
-		@Override
-		public Tuple2<T, T> map(T vertex) {
-			return new Tuple2<T, T>(vertex, vertex);
-		}
-	}
-	
-	/**
-	 * Undirected edges by emitting for each input edge the input edges itself and an inverted version.
-	 */
-	public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-		Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
-		
-		@Override
-		public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
-			invertedEdge.f0 = edge.f1;
-			invertedEdge.f1 = edge.f0;
-			out.collect(edge);
-			out.collect(invertedEdge);
-		}
-	}
-	
-	/**
-	 * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
-	 * a vertex is associated with, with a (Source-Vertex-ID, Target-VertexID) edge. The function
-	 * produces a (Target-vertex-ID, Component-ID) pair.
-	 */
-	@ForwardedFieldsFirst("f1->f1")
-	@ForwardedFieldsSecond("f1->f0")
-	public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
-			return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
-		}
-	}
-	
-
-
-	@ForwardedFieldsFirst("*")
-	public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-		@Override
-		public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
-			if (candidate.f1 < old.f1) {
-				out.collect(candidate);
-			}
-		}
-	}
-
-
-
-	@Override
-	public String getDescription() {
-		return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>";
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean fileOutput = false;
-	private static String verticesPath = null;
-	private static String edgesPath = null;
-	private static String outputPath = null;
-	private static int maxIterations = 10;
-	
-	private static boolean parseParameters(String[] programArguments) {
-		
-		if(programArguments.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(programArguments.length == 4) {
-				verticesPath = programArguments[0];
-				edgesPath = programArguments[1];
-				outputPath = programArguments[2];
-				maxIterations = Integer.parseInt(programArguments[3]);
-			} else {
-				System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing Connected Components example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>");
-		}
-		return true;
-	}
-	
-	private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) {
-		
-		if(fileOutput) {
-			return env.readCsvFile(verticesPath).types(Long.class)
-						.map(
-								new MapFunction<Tuple1<Long>, Long>() {
-									public Long map(Tuple1<Long> value) { return value.f0; }
-								});
-		} else {
-			return ConnectedComponentsData.getDefaultVertexDataSet(env);
-		}
-	}
-	
-	private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
-		
-		if(fileOutput) {
-			return env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class);
-		} else {
-			return ConnectedComponentsData.getDefaultEdgeDataSet(env);
-		}
-	}
-	
-	
-}


Mime
View raw message