flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [1/3] flink git commit: [streaming] ITCase for WindowWordCount
Date Sat, 11 Apr 2015 19:00:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master 36fcdae58 -> 46573a6ae


[streaming] ITCase for WindowWordCount


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71b2d664
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71b2d664
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71b2d664

Branch: refs/heads/master
Commit: 71b2d664e0f98d9f5e9af8c84d2e749d4cece273
Parents: 36fcdae
Author: mbalassi <mbalassi@apache.org>
Authored: Thu Apr 9 12:35:52 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Apr 10 18:14:18 2015 +0200

----------------------------------------------------------------------
 .../examples/windowing/WindowWordCount.java     | 20 ++++++--
 .../examples/test/join/WindowJoinITCase.java    |  1 -
 .../test/windowing/WindowWordCountITCase.java   | 50 ++++++++++++++++++++
 3 files changed, 65 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index cef760f..bd3acc6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -46,6 +46,10 @@ import org.apache.flink.streaming.examples.wordcount.WordCount;
  */
 public class WindowWordCount {
 
+	// window parameters with default values
+	private static int windowSize = 250;
+	private static int slideSize = 150;
+
 	// *************************************************************************
 	// PROGRAM
 	// *************************************************************************
@@ -65,8 +69,8 @@ public class WindowWordCount {
 		DataStream<Tuple2<String, Integer>> counts =
 		// split up the lines in pairs (2-tuples) containing: (word,1)
 		text.flatMap(new WordCount.Tokenizer())
-				// create windows of 250 records slided every 150 records
-				.window(Count.of(250)).every(Count.of(150))
+				// create windows of windowSize records slided every slideSize records
+				.window(Count.of(windowSize)).every(Count.of(slideSize))
 				// group by the tuple field "0" and sum up tuple field "1"
 				.groupBy(0).sum(1)
 				// flatten the windows to a single stream
@@ -97,17 +101,23 @@ public class WindowWordCount {
 		if (args.length > 0) {
 			// parse input arguments
 			fileOutput = true;
-			if (args.length == 2) {
+			if (args.length >= 2 && args.length <= 4) {
 				textPath = args[0];
 				outputPath = args[1];
+				if (args.length >= 3){
+					windowSize = Integer.parseInt(args[2]);
+
+					// if no slide size is specified use the
+					slideSize = args.length == 3 ? windowSize : Integer.parseInt(args[2]);
+				}
 			} else {
-				System.err.println("Usage: WindowWordCount <text path> <result path>");
+				System.err.println("Usage: WindowWordCount <text path> <result path> [<window
size>] [<slide size>]");
 				return false;
 			}
 		} else {
 			System.out.println("Executing WindowWordCount example with built-in default data.");
 			System.out.println("  Provide parameters to read input data from a file.");
-			System.out.println("  Usage: WindowWordCount <text path> <result path>");
+			System.out.println("  Usage: WindowWordCount <text path> <result path> [<window
size>] [<slide size>]");
 		}
 		return true;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
index 0c1fb39..a1bef5c 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/join/WindowJoinITCase.java
@@ -30,7 +30,6 @@ public class WindowJoinITCase extends StreamingProgramTestBase {
 
 	@Override
 	protected void preSubmit() throws Exception {
-		setParallelism(1);
 		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
 		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
 		resultPath = getTempDirPath("result");

http://git-wip-us.apache.org/repos/asf/flink/blob/71b2d664/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
new file mode 100644
index 0000000..6fdd4ef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/examples/test/windowing/WindowWordCountITCase.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.test.windowing;
+
+import org.apache.flink.streaming.examples.windowing.WindowWordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WindowWordCountITCase extends StreamingProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+	protected String windowSize = "250";
+	protected String slideSize = "150";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		// since the parallel tokenizers might have different speed
+		// the exact output can not be checked just whether it is well-formed
+		// checks that the result lines look like e.g. (faust, 2)
+		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		WindowWordCount.main(new String[]{textPath, resultPath, windowSize, slideSize});
+	}
+}


Mime
View raw message