flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [10/10] flink git commit: [FLINK-3552] [examples] Add a properly windowed word count reading from a socket (Java + Scala)
Date Fri, 04 Mar 2016 23:25:39 GMT
[FLINK-3552] [examples] Add a properly windowed word count reading from a socket (Java + Scala)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/986d5368
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/986d5368
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/986d5368

Branch: refs/heads/master
Commit: 986d5368fdb84c44111994b667ce1fc5f9992716
Parents: 271071a
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 29 23:00:51 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Mar 4 21:05:17 2016 +0100

----------------------------------------------------------------------
 flink-examples/flink-examples-streaming/pom.xml |  41 +----
 .../socket/SocketTextStreamWordCount.java       |  88 -----------
 .../examples/socket/SocketWindowWordCount.java  | 114 ++++++++++++++
 .../socket/SocketTextStreamWordCount.scala      |  75 ---------
 .../examples/socket/SocketWindowWordCount.scala |  76 +++++++++
 .../socket/SocketTextStreamWordCountITCase.java |  33 ----
 .../socket/SocketTextStreamWordCountITCase.java |  33 ----
 .../socket/SocketWindowWordCountITCase.java     | 157 +++++++++++++++++++
 .../streaming/util/SocketProgramITCaseBase.java |  90 -----------
 9 files changed, 354 insertions(+), 353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/986d5368/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index d918a2b..d516635 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -303,51 +303,25 @@ under the License.
 						</configuration>
 					</execution>
 
-					<!-- WindowWordCount -->
+					<!-- SocketWindowWordCount -->
 					<execution>
-						<id>WindowWordCount</id>
+						<id>SocketWindowWordCount</id>
 						<phase>package</phase>
 						<goals>
 							<goal>jar</goal>
 						</goals>
 						<configuration>
-							<classifier>WindowWordCount</classifier>
+							<classifier>SocketWindowWordCount</classifier>
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.windowing.WindowWordCount</program-class>
+									<program-class>org.apache.flink.streaming.examples.socket.SocketWindowWordCount</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/streaming/examples/windowing/WindowWordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
-								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
-					<!-- SocketTextStreamWordCount -->
-					<execution>
-						<id>SocketTextStreamWordCount</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>SocketTextStreamWordCount</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+								<include>org/apache/flink/streaming/examples/socket/SocketWindowWordCount.class</include>
+								<include>org/apache/flink/streaming/examples/socket/SocketWindowWordCount$*.class</include>
 							</includes>
 						</configuration>
 					</execution>
@@ -517,12 +491,11 @@ under the License.
 								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-IncrementalLearning.jar"
tofile="${project.basedir}/target/IncrementalLearning.jar" />
 								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-Iteration.jar"
tofile="${project.basedir}/target/Iteration.jar" />
 								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SessionWindowing.jar"
tofile="${project.basedir}/target/SessionWindowing.jar" />
-								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SocketTextStreamWordCount.jar"
tofile="${project.basedir}/target/SocketTextStreamWordCount.jar" />
 								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-TopSpeedWindowing.jar"
tofile="${project.basedir}/target/TopSpeedWindowing.jar" />
 								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-Twitter.jar"
tofile="${project.basedir}/target/Twitter.jar" />
 								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WindowJoin.jar"
tofile="${project.basedir}/target/WindowJoin.jar" />
 								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WordCount.jar"
tofile="${project.basedir}/target/WordCount.jar" />
-								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-WindowWordCount.jar"
tofile="${project.basedir}/target/WindowWordCount.jar" />
+								<copy file="${project.basedir}/target/flink-examples-streaming_2.10-${project.version}-SocketWindowWordCount.jar"
tofile="${project.basedir}/target/SocketWindowWordCount.jar" />
 							</target>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/986d5368/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
deleted file mode 100644
index 8080c46..0000000
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.socket;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
-
-/**
- * This example shows an implementation of WordCount with data from a text
- * socket. To run the example make sure that the service providing the text data
- * is already up and running.
- * <p>
- * To start an example socket text stream on your local machine run netcat from
- * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
- * port number.
- * </p>
- * <p>
- * Usage:
- * <code>SocketTextStreamWordCount --hostname &lt;name&gt; --port &lt;n&gt;
--output &lt;path&gt;</code>
- * </p>
- * <p>
- * This example shows how to:
- * <ul>
- * <li>use StreamExecutionEnvironment.socketTextStream
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions.
- * </ul>
- *
- * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
- */
-public class SocketTextStreamWordCount {
-
-	public static void main(String[] args) throws Exception {
-
-		// Checking input parameters
-		final ParameterTool params = ParameterTool.fromArgs(args);
-		if (!params.has("hostname") || !params.has("port")) {
-			System.err.println("Usage: SocketTextStreamWordCount --hostname <name> --port <n>
[--output <path>]");
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// make parameters available in the web interface
-		env.getConfig().setGlobalJobParameters(params);
-
-		// get input data
-		DataStream<String> text =
-				env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);
-
-		DataStream<Tuple2<String, Integer>> counts =
-				// split up the lines in pairs (2-tuples) containing: (word,1)
-				text.flatMap(new Tokenizer())
-						// group by the tuple field "0" and sum up tuple field "1"
-						.keyBy(0)
-						.sum(1);
-
-		if (params.has("output")) {
-			counts.writeAsText(params.get("output"));
-		} else {
-			System.out.println("Printing result to stdout. Use --output to specify output path.");
-			counts.print();
-		}
-
-		// execute program
-		env.execute("WordCount from SocketTextStream Example");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/986d5368/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
new file mode 100644
index 0000000..105e7d9
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.socket;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements a streaming windowed version of the "WordCount" program.
+ *
+ * This program connects to a server socket and reads strings from the socket.
+ * The easiest way to try this out is to open a text sever (at port 12345) 
+ * using the <i>netcat</i> tool via
+ * <pre>
+ * nc -l 12345
+ * </pre>
+ * and run this example with the port as an argument.
+ */
+@SuppressWarnings("serial")
+public class SocketWindowWordCount {
+	
+	public static void main(String[] args) throws Exception {
+
+		// the port to connect to
+		final int port;
+		try {
+			final ParameterTool params = ParameterTool.fromArgs(args);
+			port = params.getInt("port");
+		} catch (Exception e) {
+			System.err.println("No port specified. Please run 'WindowWordCount --port <port>',
" +
+					"where port is the address of the text server");
+			System.err.println("To start a simple text server, run 'netcat -l <port>' and type
the input text " +
+					"into the command line");
+			return;
+		} 
+		
+		// get the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data by connecting to the socket
+		DataStream<String> text = env.socketTextStream("localhost", port, '\n');
+
+		// parse the data, group it, window it, and aggregate the counts 
+		DataStream<WordWithCount> windowCounts = text
+				
+				.flatMap(new FlatMapFunction<String, WordWithCount>() {
+					@Override
+					public void flatMap(String value, Collector<WordWithCount> out) {
+						for (String word : value.split("\\s")) {
+							out.collect(new WordWithCount(word, 1L));
+						}
+					}
+				})
+				
+				.keyBy("word")
+				.timeWindow(Time.seconds(5), Time.seconds(1))
+				
+				.reduce(new ReduceFunction<WordWithCount>() {
+					@Override
+					public WordWithCount reduce(WordWithCount a, WordWithCount b) {
+						return new WordWithCount(a.word, a.count + b.count);
+					}
+				});
+
+		// print the results with a single thread, rather than in parallel
+		windowCounts.print().setParallelism(1);
+
+		env.execute("Socket Window WordCount");
+	}
+	
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Data type for words with count
+	 */
+	public static class WordWithCount {
+		
+		public String word;
+		public long count;
+		
+		public WordWithCount() {}
+
+		public WordWithCount(String word, long count) {
+			this.word = word;
+			this.count = count;
+		}
+
+		@Override
+		public String toString() {
+			return word + " : " + count;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/986d5368/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
deleted file mode 100644
index 6b7499c..0000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketTextStreamWordCount.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.socket
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.scala._
-
-/**
- * This example shows an implementation of WordCount with data from a text socket. 
- * To run the example make sure that the service providing the text data is already up and
running.
- *
- * To start an example socket text stream on your local machine run netcat from a command
line, 
- * where the parameter specifies the port number:
- *
- * {{{
- *   nc -lk 9999
- * }}}
- *
- * Usage:
- * {{{
- *   SocketTextStreamWordCount --hostname <name> --port <n> [--output <path>]
- * }}}
- *
- * This example shows how to:
- *
- *   - use StreamExecutionEnvironment.socketTextStream
- *   - write a simple Flink Streaming program in scala.
- *   - write and use user-defined functions.
- */
-object SocketTextStreamWordCount {
-
-  def main(args: Array[String]) {
-
-    val params = ParameterTool.fromArgs(args)
-    if (!params.has("hostname") || !params.has("port")) {
-      println("Usage: SocketTextStreamWordCount --hostname <name> --port <n>
--output <path>")
-      return
-    }
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.setGlobalJobParameters(params)
-
-    //Create streams for names and ages by mapping the inputs to the corresponding objects
-    val text = env.socketTextStream(params.get("hostname"), params.getInt("port"))
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
-      .map { (_, 1) }
-      .keyBy(0)
-      .sum(1)
-
-    if (params.has("output")) {
-      counts.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      counts.print
-    }
-
-    env.execute("Scala SocketTextStreamWordCount Example")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/986d5368/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
new file mode 100644
index 0000000..e942bb5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.socket
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+ * Implements a streaming windowed version of the "WordCount" program.
+ * 
+ * This program connects to a server socket and reads strings from the socket.
+ * The easiest way to try this out is to open a text sever (at port 12345) 
+ * using the <i>netcat</i> tool via
+ * <pre>
+ * nc -l 12345
+ * </pre>
+ * and run this example with the port as an argument.
+ */
+object SocketWindowWordCount {
+
+  /** Main program method */
+  def main(args: Array[String]) : Unit = {
+    
+    // the port to connect to
+    val port: Int = try {
+      ParameterTool.fromArgs(args).getInt("port")
+    } catch {
+      case e: Exception => {
+        System.err.println("No port specified. Please run 'WindowWordCount --port <port>',
" +
+          "where port is the address of the text server")
+        System.err.println("To start a simple text server, run 'netcat -l <port>' "
+
+          "and type the input text into the command line")
+        return
+      }
+    }
+    
+    // get the execution environment
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    
+    // get input data by connecting to the socket
+    val text = env.socketTextStream("localhost", port, '\n')
+
+    // parse the data, group it, window it, and aggregate the counts 
+    val windowCounts = text
+          .flatMap { w => w.split("\\s") }
+          .map { w => WordWithCount(w, 1) }
+          .keyBy("word")
+          .timeWindow(Time.seconds(5), Time.seconds(1))
+          .sum("count")
+
+    // print the results with a single thread, rather than in parallel
+    windowCounts.print().setParallelism(1)
+
+    env.execute("Socket Window WordCount")
+  }
+
+  /** Data type for words with count */
+  case class WordWithCount(word: String, count: Long)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/986d5368/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
deleted file mode 100644
index 38696ca..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.exampleJavaPrograms.socket;
-
-import org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount;
-import org.apache.flink.streaming.util.SocketProgramITCaseBase;
-
-public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		SocketTextStreamWordCount.main(new String[]{
-				"--hostname", HOST,
-				"--port", port.toString(),
-				"--output", resultPath});
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/986d5368/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
deleted file mode 100644
index 39345af..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.exampleScalaPrograms.socket;
-
-import org.apache.flink.streaming.scala.examples.socket.SocketTextStreamWordCount;
-import org.apache.flink.streaming.util.SocketProgramITCaseBase;
-
-public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		SocketTextStreamWordCount.main(new String[]{
-				"--hostname", HOST,
-				"--port", port.toString(),
-				"--output", resultPath});
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/986d5368/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
new file mode 100644
index 0000000..4a1556a
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.socket;
+
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
+import org.apache.flink.test.testdata.WordCountData;
+
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+import static org.junit.Assert.fail;
+
+public class SocketWindowWordCountITCase extends StreamingMultipleProgramsTestBase {
+	
+	@Test
+	public void testJavaProgram() throws Exception {
+		InetAddress localhost = InetAddress.getByName("localhost");
+		
+		// suppress sysout messages from this example
+		final PrintStream originalSysout = System.out;
+		final PrintStream originalSyserr = System.err;
+		
+		final ByteArrayOutputStream errorMessages = new ByteArrayOutputStream();
+		
+		System.setOut(new PrintStream(new NullStream()));
+		System.setErr(new PrintStream(errorMessages));
+		
+		try {
+			try (ServerSocket server = new ServerSocket(0, 10, localhost)) {
+				
+				final ServerThread serverThread = new ServerThread(server);
+				serverThread.setDaemon(true);
+				serverThread.start();
+				
+				final int serverPort = server.getLocalPort();
+
+				org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(
+						new String[] { "--port", String.valueOf(serverPort) });
+
+				if (errorMessages.size() != 0) {
+					fail("Found error message: " + new String(errorMessages.toByteArray()));
+				}
+				
+				serverThread.join();
+				serverThread.checkError();
+			}
+		}
+		finally {
+			System.setOut(originalSysout);
+			System.setErr(originalSyserr);
+		}
+	}
+
+	@Test
+	public void testScalaProgram() throws Exception {
+		InetAddress localhost = InetAddress.getByName("localhost");
+
+		// suppress sysout messages from this example
+		final PrintStream originalSysout = System.out;
+		final PrintStream originalSyserr = System.err;
+
+		final ByteArrayOutputStream errorMessages = new ByteArrayOutputStream();
+
+		System.setOut(new PrintStream(new NullStream()));
+		System.setErr(new PrintStream(errorMessages));
+
+		try {
+			try (ServerSocket server = new ServerSocket(0, 10, localhost)) {
+
+				final ServerThread serverThread = new ServerThread(server);
+				serverThread.setDaemon(true);
+				serverThread.start();
+
+				final int serverPort = server.getLocalPort();
+
+				org.apache.flink.streaming.scala.examples.socket.SocketWindowWordCount.main(
+						new String[] { "--port", String.valueOf(serverPort) });
+
+				if (errorMessages.size() != 0) {
+					fail("Found error message: " + new String(errorMessages.toByteArray()));
+				}
+				
+				serverThread.join();
+				serverThread.checkError();
+			}
+		}
+		finally {
+			System.setOut(originalSysout);
+			System.setErr(originalSyserr);
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+
+	private static class ServerThread extends Thread {
+
+		private final ServerSocket serverSocket;
+
+		private volatile Throwable error;
+		
+		public ServerThread(ServerSocket serverSocket) {
+			super("Socket Server Thread");
+			
+			this.serverSocket = serverSocket;
+		}
+
+		@Override
+		public void run() {
+			try {
+				try (Socket socket = serverSocket.accept(); 
+						PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {
+					
+					writer.println(WordCountData.TEXT);
+				}
+			}
+			catch (Throwable t) {
+				this.error = t;
+			}
+		}
+		
+		public void checkError() throws IOException {
+			if (error != null) {
+				throw new IOException("Error in server thread: " + error.getMessage(), error);
+			}
+		}
+	}
+	
+	private static final class NullStream extends OutputStream {
+		
+		@Override
+		public void write(int b) {}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/986d5368/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
deleted file mode 100644
index d1bd64a..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Assert;
-
-import java.io.PrintWriter;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-public abstract class SocketProgramITCaseBase extends StreamingProgramTestBase {
-
-	protected static final String HOST = "localhost";
-	protected static Integer port;
-	protected String resultPath;
-
-	private ServerSocket temporarySocket;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		port = NetUtils.getAvailablePort();
-		temporarySocket = createSocket(HOST, port, WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
-		temporarySocket.close();
-	}
-
-	public ServerSocket createSocket(String host, int port, String contents) throws Exception
{
-		ServerSocket serverSocket = new ServerSocket(port);
-		ServerThread st = new ServerThread(serverSocket, contents);
-		st.start();
-		return serverSocket;
-	}
-
-	private static class ServerThread extends Thread {
-
-		private ServerSocket serverSocket;
-		private String contents;
-		private Thread t;
-
-		public ServerThread(ServerSocket serverSocket, String contents) {
-			this.serverSocket = serverSocket;
-			this.contents = contents;
-			t = new Thread(this);
-		}
-
-		public void waitForAccept() throws Exception {
-			Socket socket = serverSocket.accept();
-			PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
-			writer.println(contents);
-			writer.close();
-			socket.close();
-		}
-
-		public void run() {
-			try {
-				waitForAccept();
-			} catch (Exception e) {
-				Assert.fail();
-			}
-		}
-
-		@Override
-		public void start() {
-			t.start();
-		}
-	}
-}
\ No newline at end of file


Mime
View raw message