flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [09/10] flink git commit: [FLINK-3550] [examples] Rework WindowJoin to properly demonstrate continuous window joins (Java + Scala)
Date Fri, 04 Mar 2016 23:25:38 GMT
[FLINK-3550] [examples] Rework WindowJoin to properly demonstrate continuous window joins (Java + Scala)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/271071ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/271071ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/271071ad

Branch: refs/heads/master
Commit: 271071adb53c29534f9f51c1fb2846515f97b2be
Parents: 434e88f
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 29 22:04:26 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Mar 4 21:05:17 2016 +0100

----------------------------------------------------------------------
 flink-examples/flink-examples-streaming/pom.xml |   6 +-
 .../streaming/examples/join/WindowJoin.java     | 251 ++++---------------
 .../examples/join/WindowJoinSampleData.java     |  97 +++++++
 .../examples/join/util/WindowJoinData.java      |  61 -----
 .../examples/utils/ThrottledIterator.java       | 101 ++++++++
 .../scala/examples/join/WindowJoin.scala        | 133 ++++------
 .../examples/join/WindowJoinSampleData.scala    |  76 ++++++
 .../join/WindowJoinData.java                    |  64 +++++
 .../join/WindowJoinITCase.java                  |  76 ++++--
 .../join/WindowJoinITCase.java                  |  53 ----
 .../scala/examples/WindowJoinITCase.scala       |  77 ++++++
 .../api/scala/typeutils/TryTypeInfoTest.scala   |   8 +-
 12 files changed, 576 insertions(+), 427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index 74877d2..d918a2b 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -182,7 +182,7 @@ under the License.
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/streaming/examples/iteration/*.class</include>			
+								<include>org/apache/flink/streaming/examples/iteration/*.class</include>
 							</includes>
 						</configuration>
 					</execution>
@@ -250,7 +250,7 @@ under the License.
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/streaming/examples/join/*.class</include>			
+								<include>org/apache/flink/streaming/examples/join/*.class</include>
 							</includes>
 						</configuration>
 					</execution>
@@ -274,7 +274,7 @@ under the License.
 							<includes>
 								<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
 								<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
-								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>			
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
 							</includes>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index f57d216..42101a5 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -18,38 +18,27 @@
 package org.apache.flink.streaming.examples.join;
 
 import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
+import org.apache.flink.streaming.examples.join.WindowJoinSampleData.GradeSource;
+import org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySource;
 
 /**
- * Example illustrating join over sliding windows of streams in Flink.
+ * Example illustrating a windowed stream join between two data streams.
+ * 
+ * <p>The example works on two input streams with pairs (name, grade) and (name, salary)
+ * respectively. It joins the streams based on "name" within a configurable window.
  *
- * <p>
- * This example will join two streams with a sliding window. One which emits grades and one which
- * emits salaries of people. The input format for both sources has an additional timestamp
- * as field 0. This is used to to event-time windowing. Time timestamps must be
- * monotonically increasing.
- *
- * This example shows how to:
- * <ul>
- *   <li>do windowed joins,
- *   <li>use tuple data types,
- *   <li>write a simple streaming program.
- * </ul>
+ * <p>The example uses a built-in sample data generator that generates
+ * the streams of pairs at a configurable rate.
  */
 @SuppressWarnings("serial")
 public class WindowJoin {
@@ -59,200 +48,62 @@ public class WindowJoin {
 	// *************************************************************************
 
 	public static void main(String[] args) throws Exception {
-
-		// Checking input parameters
+		// parse the parameters
 		final ParameterTool params = ParameterTool.fromArgs(args);
-		System.out.println("Usage: WindowJoin --grades <path> --salaries <path> --output <path>");
+		final long windowSize = params.getLong("windowSize", 2000);
+		final long rate = params.getLong("rate", 3L);
+		
+		System.out.println("Using windowSize=" + windowSize + ", data rate=" + rate);
+		System.out.println("To customize example, use: WindowJoin [--windowSize <window-size-in-millis>] [--rate <elements-per-second>]");
 
-		// obtain execution environment
+		// obtain execution environment, run this example in "ingestion time"
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
 		// make parameters available in the web interface
 		env.getConfig().setGlobalJobParameters(params);
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-		// connect to the data sources for grades and salaries
-		DataStream<Tuple3<Long, String, Integer>> grades = getGradesPath(env, params);
-		DataStream<Tuple3<Long, String, Integer>> salaries = getSalariesPath(env, params);
+		// create the data sources for both grades and salaries
+		DataStream<Tuple2<String, Integer>> grades = GradeSource.getSource(env, rate);
+		DataStream<Tuple2<String, Integer>> salaries = SalarySource.getSource(env, rate);
+		
+		// run the actual window join program
+		// for testability, this functionality is in a separate method.
+		DataStream<Tuple3<String, Integer, Integer>> joinedStream = runWindowJoin(grades, salaries, windowSize);
 
-		// extract the timestamps
-		grades = grades.assignTimestampsAndWatermarks(new MyTimestampExtractor());
-		salaries = salaries.assignTimestampsAndWatermarks(new MyTimestampExtractor());
-
-		// apply a temporal join over the two stream based on the names over one
-		// second windows
-		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
-				.join(salaries)
-				.where(new NameKeySelector())
-				.equalTo(new NameKeySelector())
-				.window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MILLISECONDS)))
-				.apply(new MyJoinFunction());
-
-		// emit result
-		if (params.has("output")) {
-			joinedStream.writeAsText(params.get("output"));
-		} else {
-			System.out.println("Printing result to stdout. Use --output to specify output path.");
-			joinedStream.print();
-		}
+		// print the results with a single thread, rather than in parallel
+		joinedStream.print().setParallelism(1);
 
 		// execute program
 		env.execute("Windowed Join Example");
 	}
+	
+	public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(
+			DataStream<Tuple2<String, Integer>> grades,
+			DataStream<Tuple2<String, Integer>> salaries,
+			long windowSize) {
 
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	private final static String[] names = {"tom", "jerry", "alice", "bob", "john", "grace"};
-	private final static int GRADE_COUNT = 5;
-	private final static int SALARY_MAX = 10000;
-	private final static int SLEEP_TIME = 10;
-
-	/**
-	 * Continuously emit tuples with random names and integers (grades).
-	 */
-	public static class GradeSource implements SourceFunction<Tuple3<Long, String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private Random rand;
-		private Tuple3<Long, String, Integer> outTuple;
-		private volatile boolean isRunning = true;
-		private int counter;
-
-		public GradeSource() {
-			rand = new Random();
-			outTuple = new Tuple3<>();
-		}
-
-		@Override
-		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
-			while (isRunning && counter < 100) {
-				outTuple.f0 = System.currentTimeMillis();
-				outTuple.f1 = names[rand.nextInt(names.length)];
-				outTuple.f2 = rand.nextInt(GRADE_COUNT) + 1;
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-				counter++;
-				ctx.collect(outTuple);
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-	}
-
-	/**
-	 * Continuously emit tuples with random names and integers (salaries).
-	 */
-	public static class SalarySource extends RichSourceFunction<Tuple3<Long, String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		private transient Random rand;
-		private transient Tuple3<Long, String, Integer> outTuple;
-		private volatile boolean isRunning;
-		private int counter;
-
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			rand = new Random();
-			outTuple = new Tuple3<Long, String, Integer>();
-			isRunning = true;
-		}
-
-
-		@Override
-		public void run(SourceContext<Tuple3<Long, String, Integer>> ctx) throws Exception {
-			while (isRunning && counter < 100) {
-				outTuple.f0 = System.currentTimeMillis();
-				outTuple.f1 = names[rand.nextInt(names.length)];
-				outTuple.f2 = rand.nextInt(SALARY_MAX) + 1;
-				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
-				counter++;
-				ctx.collect(outTuple);
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-	}
-
-	public static class MySourceMap extends RichMapFunction<String, Tuple3<Long, String, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private String[] record;
-
-		public MySourceMap() {
-			record = new String[2];
-		}
-
-		@Override
-		public Tuple3<Long, String, Integer> map(String line) throws Exception {
-			record = line.substring(1, line.length() - 1).split(",");
-			return new Tuple3<>(Long.parseLong(record[0]), record[1], Integer.parseInt(record[2]));
-		}
-	}
-
-	public static class MyJoinFunction
-			implements
-			JoinFunction<Tuple3<Long, String, Integer>, Tuple3<Long, String, Integer>, Tuple3<String, Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		private Tuple3<String, Integer, Integer> joined = new Tuple3<>();
-
-		@Override
-		public Tuple3<String, Integer, Integer> join(Tuple3<Long, String, Integer> first,
-				Tuple3<Long, String, Integer> second) throws Exception {
-			joined.f0 = first.f1;
-			joined.f1 = first.f2;
-			joined.f2 = second.f2;
-			return joined;
-		}
-	}
-
-	private static class MyTimestampExtractor extends AscendingTimestampExtractor<Tuple3<Long, String, Integer>> {
-
-		@Override
-		public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
-			return element.f0;
-		}
+		return grades.join(salaries)
+				.where(new NameKeySelector())
+				.equalTo(new NameKeySelector())
+				
+				.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
+				
+				.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
+					
+					@Override
+					public Tuple3<String, Integer, Integer> join(
+									Tuple2<String, Integer> first,
+									Tuple2<String, Integer> second) {
+						return new Tuple3<String, Integer, Integer>(first.f0, first.f1, second.f1);
+					}
+				});
 	}
-
-	private static class NameKeySelector implements KeySelector<Tuple3<Long, String, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
+	
+	private static class NameKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
 		@Override
-		public String getKey(Tuple3<Long, String, Integer> value) throws Exception {
-			return value.f1;
+		public String getKey(Tuple2<String, Integer> value) {
+			return value.f0;
 		}
 	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static DataStream<Tuple3<Long, String, Integer>> getGradesPath(StreamExecutionEnvironment env, ParameterTool params) {
-		if (params.has("grades")) {
-			return env.readTextFile(params.get("grades")).map(new MySourceMap());
-		} else {
-			System.out.println("Executing WindowJoin example with default grades data set.");
-			System.out.println("Use --grades to specify file input.");
-			return env.addSource(new GradeSource());
-		}
-	}
-
-	private static DataStream<Tuple3<Long, String, Integer>> getSalariesPath(StreamExecutionEnvironment env, ParameterTool params) {
-		if (params.has("salaries")) {
-			return env.readTextFile(params.get("salaries")).map(new MySourceMap());
-		} else {
-			System.out.println("Executing WindowJoin example with default salaries data set.");
-			System.out.println("Use --salaries to specify file input.");
-			return env.addSource(new SalarySource());
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
new file mode 100644
index 0000000..1e3999e
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoinSampleData.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.join;
+
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.examples.utils.ThrottledIterator;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * Sample data for the {@link WindowJoin} example.
+ */
+@SuppressWarnings("serial")
+public class WindowJoinSampleData {
+
+	static final String[] NAMES = {"tom", "jerry", "alice", "bob", "john", "grace"};
+	static final int GRADE_COUNT = 5;
+	static final int SALARY_MAX = 10000;
+
+	/**
+	 * Continuously generates (name, grade).
+	 */
+	public static class GradeSource implements Iterator<Tuple2<String, Integer>>, Serializable {
+
+		private final Random rnd = new Random(hashCode());
+
+		@Override
+		public boolean hasNext() {
+			return true;
+		}
+
+		@Override
+		public Tuple2<String, Integer> next() {
+			return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(GRADE_COUNT) + 1);
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+
+		public static DataStream<Tuple2<String, Integer>> getSource(StreamExecutionEnvironment env, long rate) {
+			return env.fromCollection(new ThrottledIterator<>(new GradeSource(), rate),
+					TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
+		}
+	}
+
+	/**
+	 * Continuously generates (name, salary).
+	 */
+	public static class SalarySource implements Iterator<Tuple2<String, Integer>>, Serializable {
+
+		private final Random rnd = new Random(hashCode());
+
+		@Override
+		public boolean hasNext() {
+			return true;
+		}
+
+		@Override
+		public Tuple2<String, Integer> next() {
+			return new Tuple2<>(NAMES[rnd.nextInt(NAMES.length)], rnd.nextInt(SALARY_MAX) + 1);
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+
+		public static DataStream<Tuple2<String, Integer>> getSource(StreamExecutionEnvironment env, long rate) {
+			return env.fromCollection(new ThrottledIterator<>(new SalarySource(), rate),
+					TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
deleted file mode 100644
index 15c1280..0000000
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/util/WindowJoinData.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join.util;
-
-public class WindowJoinData {
-
-	public static final String GRADES_INPUT = "(0,john,5)\n" + "(0,tom,3)\n" + "(0,alice,1)\n" + "(0,grace,5)\n" +
-			"(1,john,4)\n" + "(1,bob,1)\n" + "(1,alice,2)\n" + "(1,alice,3)\n" + "(1,bob,5)\n" + "(1,alice,3)\n" + "(1,tom,5)\n" +
-			"(2,john,2)\n" + "(2,john,1)\n" + "(2,grace,2)\n" + "(2,jerry,2)\n" + "(2,tom,4)\n" + "(2,bob,4)\n" + "(2,bob,2)\n" +
-			"(3, tom,2)\n" + "(3,alice,5)\n" + "(3,grace,5)\n" + "(3,grace,1)\n" + "(3,alice,1)\n" + "(3,grace,3)\n" + "(3,tom,1)\n" +
-			"(4,jerry,5)\n" + "(4,john,3)\n" + "(4,john,4)\n" + "(4,john,1)\n" + "(4,jerry,3)\n" + "(4,grace,3)\n" + "(4,bob,3)\n" +
-			"(5,john,3)\n" + "(5,jerry,4)\n" + "(5,tom,5)\n" + "(5,tom,4)\n" + "(5,john,2)\n" + "(5,jerry,1)\n" + "(5,bob,1)\n" +
-			"(6,john,5)\n" + "(6,grace,4)\n" + "(6,tom,5)\n" + "(6,john,4)\n" + "(6,tom,1)\n" + "(6,grace,1)\n" + "(6,john,2)\n" +
-			"(7,jerry,3)\n" + "(7,jerry,5)\n" + "(7,tom,2)\n" + "(7,tom,2)\n" + "(7,alice,4)\n" + "(7,tom,4)\n" + "(7,jerry,4)\n" +
-			"(8,john,3)\n" + "(8,grace,4)\n" + "(8,tom,3)\n" + "(8,jerry,4)\n" + "(8,john,5)\n" + "(8,john,4)\n" + "(8,jerry,1)\n" +
-			"(9,john,5)\n" + "(9,alice,2)\n" + "(9,tom,1)\n" + "(9,alice,5)\n" + "(9,grace,4)\n" + "(9,bob,4)\n" + "(9,jerry,1)\n" +
-			"(10,john,5)\n" + "(10,tom,4)\n" + "(10,tom,5)\n" + "(10,jerry,5)\n" + "(10,tom,1)\n" + "(10,grace,3)\n" + "(10,bob,5)\n" +
-			"(11,john,1)\n" + "(11,alice,1)\n" + "(11,grace,3)\n" + "(11,grace,1)\n" + "(11,jerry,1)\n" + "(11,jerry,4)\n" +
-			"(12,bob,4)\n" + "(12,alice,3)\n" + "(12,tom,5)\n" + "(12,alice,4)\n" + "(12,alice,4)\n" + "(12,grace,4)\n" + "(12,john,5)\n" +
-			"(13,john,5)\n" + "(13,grace,4)\n" + "(13,tom,4)\n" + "(13,john,4)\n" + "(13,john,5)\n" + "(13,alice,5)\n" + "(13,jerry,5)\n" +
-			"(14,john,3)\n" + "(14,tom,5)\n" + "(14,jerry,4)\n" + "(14,grace,4)\n" + "(14,john,3)\n" + "(14,bob,2)";
-
-	public static final String SALARIES_INPUT = "(0,john,6469)\n" + "(0,jerry,6760)\n" + "(0,jerry,8069)\n" +
-			"(1,tom,3662)\n" + "(1,grace,8427)\n" + "(1,john,9425)\n" + "(1,bob,9018)\n" + "(1,john,352)\n" + "(1,tom,3770)\n" +
-			"(2,grace,7622)\n" + "(2,jerry,7441)\n" + "(2,alice,1468)\n" + "(2,bob,5472)\n" + "(2,grace,898)\n" +
-			"(3,tom,3849)\n" + "(3,grace,1865)\n" + "(3,alice,5582)\n" + "(3,john,9511)\n" + "(3,alice,1541)\n" +
-			"(4,john,2477)\n" + "(4,grace,3561)\n" + "(4,john,1670)\n" + "(4,grace,7290)\n" + "(4,grace,6565)\n" +
-			"(5,tom,6179)\n" + "(5,tom,1601)\n" + "(5,john,2940)\n" + "(5,bob,4685)\n" + "(5,bob,710)\n" + "(5,bob,5936)\n" +
-			"(6,jerry,1412)\n" + "(6,grace,6515)\n" + "(6,grace,3321)\n" + "(6,tom,8088)\n" + "(6,john,2876)\n" +
-			"(7,bob,9896)\n" + "(7,grace,7368)\n" + "(7,grace,9749)\n" + "(7,bob,2048)\n" + "(7,alice,4782)\n" +
-			"(8,alice,3375)\n" + "(8,tom,5841)\n" + "(8,bob,958)\n" + "(8,bob,5258)\n" + "(8,tom,3935)\n" + "(8,jerry,4394)\n" +
-			"(9,alice,102)\n" + "(9,alice,4931)\n" + "(9,alice,5240)\n" + "(9,jerry,7951)\n" + "(9,john,5675)\n" +
-			"(10,bob,609)\n" + "(10,alice,5997)\n" + "(10,jerry,9651)\n" + "(10,alice,1328)\n" + "(10,bob,1022)\n" +
-			"(11,grace,2578)\n" + "(11,jerry,9704)\n" + "(11,tom,4476)\n" + "(11,grace,3784)\n" + "(11,alice,6144)\n" +
-			"(12,bob,6213)\n" + "(12,alice,7525)\n" + "(12,jerry,2908)\n" + "(12,grace,8464)\n" + "(12,jerry,9920)\n" +
-			"(13,bob,3720)\n" + "(13,bob,7612)\n" + "(13,alice,7211)\n" + "(13,jerry,6484)\n" + "(13,alice,1711)\n" +
-			"(14,jerry,5994)\n" + "(14,grace,928)\n" + "(14,jerry,2492)\n" + "(14,grace,9080)\n" + "(14,tom,4330)\n" +
-			"(15,bob,8302)\n" + "(15,john,4981)\n" + "(15,tom,1781)\n" + "(15,grace,1379)\n" + "(15,jerry,3700)\n" +
-			"(16,jerry,3584)\n" + "(16,jerry,2038)\n" + "(16,jerry,3902)\n" + "(16,tom,1336)\n" + "(16,jerry,7500)\n" +
-			"(17,tom,3648)\n" + "(17,alice,2533)\n" + "(17,tom,8685)\n" + "(17,bob,3968)\n" + "(17,tom,3241)\n" + "(17,bob,7461)\n" +
-			"(18,jerry,2138)\n" + "(18,alice,7503)\n" + "(18,alice,6424)\n" + "(18,tom,140)\n" + "(18,john,9802)\n" +
-			"(19,grace,2977)\n" + "(19,grace,889)\n" + "(19,john,1338)";
-
-	private WindowJoinData() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java
new file mode 100644
index 0000000..ba7feea
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.utils;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A variant of the collection source (emits a sequence of elements as a stream)
+ * that supports throttling the emission rate.
+ * @param <T> The type of the elements returned by this iterator.
+ */
+public class ThrottledIterator<T> implements Iterator<T>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	@SuppressWarnings("NonSerializableFieldInSerializableClass")
+	private final Iterator<T> source;
+
+	private final long sleepBatchSize;
+	private final long sleepBatchTime;
+
+	private long lastBatchCheckTime;
+	private long num;
+
+	public ThrottledIterator(Iterator<T> source, long elementsPerSecond) {
+		this.source = requireNonNull(source);
+
+		if (!(source instanceof Serializable)) {
+			throw new IllegalArgumentException("source must be java.io.Serializable");
+		}
+
+		if (elementsPerSecond >= 100) {
+			// how many elements would we emit per 50ms
+			this.sleepBatchSize = elementsPerSecond / 20;
+			this.sleepBatchTime = 50;
+		}
+		else if (elementsPerSecond >= 1) {
+			// how long each element takes, in milliseconds
+			this.sleepBatchSize = 1;
+			this.sleepBatchTime = 1000 / elementsPerSecond;
+		}
+		else {
+			throw new IllegalArgumentException("'elements per second' must be positive and not zero");
+		}
+	}
+
+	@Override
+	public boolean hasNext() {
+		return source.hasNext();
+	}
+
+	@Override
+	public T next() {
+		// delay if necessary
+		if (lastBatchCheckTime > 0) {
+			if (++num >= sleepBatchSize) {
+				num = 0;
+	
+				final long now = System.currentTimeMillis();
+				final long elapsed = now - lastBatchCheckTime; 
+				if (elapsed < sleepBatchTime) {
+					try {
+						Thread.sleep(sleepBatchTime - elapsed);
+					} catch (InterruptedException e) {
+						// restore interrupt flag and proceed
+						Thread.currentThread().interrupt();
+					}
+				}
+				lastBatchCheckTime = now;
+			}
+		} else {
+			lastBatchCheckTime = System.currentTimeMillis();
+		}
+
+		return source.next();
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 7ea9c70..5bf7548 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -18,112 +18,79 @@
 
 package org.apache.flink.streaming.scala.examples.join
 
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 
-import scala.Stream._
-import scala.language.postfixOps
-import scala.util.Random
-
+/**
+ * Example illustrating a windowed stream join between two data streams.
+ *
+ * The example works on two input streams with pairs (name, grade) and (name, salary)
+ * respectively. It joins the streams based on "name" within a configurable window.
+ *
+ * The example uses a built-in sample data generator that generates
+ * the streams of pairs at a configurable rate.
+ */
 object WindowJoin {
 
   // *************************************************************************
-  // PROGRAM
+  //  Program Data Types
   // *************************************************************************
 
-  case class Grade(time: Long, name: String, grade: Int)
-  case class Salary(time: Long, name: String, salary: Int)
+  case class Grade(name: String, grade: Int)
+  
+  case class Salary(name: String, salary: Int)
+  
   case class Person(name: String, grade: Int, salary: Int)
 
-  def main(args: Array[String]) {
+  // *************************************************************************
+  //  Program
+  // *************************************************************************
 
+  def main(args: Array[String]) {
+    // parse the parameters
     val params = ParameterTool.fromArgs(args)
-    println("Usage: WindowJoin --grades <path> --salaries <path> --output <path>")
+    val windowSize = params.getLong("windowSize", 2000)
+    val rate = params.getLong("rate", 3)
 
+    println("Using windowSize=" + windowSize + ", data rate=" + rate)
+    println("To customize example, use: WindowJoin " +
+      "[--windowSize <window-size-in-millis>] [--rate <elements-per-second>]")
+
+    // obtain execution environment, run this example in "ingestion time"
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+    // make parameters available in the web interface
     env.getConfig.setGlobalJobParameters(params)
 
-    // Create streams for grades and salaries by mapping the inputs to the corresponding objects
-    val grades = setGradesDataStream(env, params)
-    val salaries = setSalariesDataStream(env, params)
+    // create the data sources for both grades and salaries
+    val grades = WindowJoinSampleData.getGradeSource(env, rate)
+    val salaries = WindowJoinSampleData.getSalarySource(env, rate)
 
-    //Join the two input streams by name on the last 2 seconds every second and create new
-    //Person objects containing both grade and salary
-    val joined = grades.join(salaries)
-        .where(_.name)
-        .equalTo(_.name)
-        .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
-        .apply { (g, s) => Person(g.name, g.grade, s.salary) }
+    // join the two input streams by name on a window.
+    // for testability, this functionality is in a separate method.
+    val joined = joinStreams(grades, salaries, windowSize)
 
-    if (params.has("output")) {
-      joined.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      joined.print()
-    }
+    // print the results with a single thread, rather than in parallel
+    joined.print().setParallelism(1)
 
+    // execute program
     env.execute("WindowJoin")
   }
-
-  // *************************************************************************
-  // USER FUNCTIONS
-  // *************************************************************************
-
-  val names = Array("tom", "jerry", "alice", "bob", "john", "grace")
-  val gradeCount = 5
-  val salaryMax = 10000
-  val sleepInterval = 100
   
-  def gradeStream: Stream[(Long, String, Int)] = {
-    def gradeMapper(names: Array[String])(x: Int): (Long, String, Int) =
-      {
-        if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
-        (System.currentTimeMillis(),names(Random.nextInt(names.length)),Random.nextInt(gradeCount))
-      }
-    range(1, 100).map(gradeMapper(names))
-  }
-
-  def salaryStream: Stream[(Long, String, Int)] = {
-    def salaryMapper(x: Int): (Long, String, Int) =
-      {
-        if (x % sleepInterval == 0) Thread.sleep(sleepInterval)
-        (System.currentTimeMillis(), names(Random.nextInt(names.length)), Random.nextInt(salaryMax))
-      }
-    range(1, 100).map(salaryMapper)
-  }
-
-  def parseMap(line : String): (Long, String, Int) = {
-    val record = line.substring(1, line.length - 1).split(",")
-    (record(0).toLong, record(1), record(2).toInt)
-  }
-
-  // *************************************************************************
-  // UTIL METHODS
-  // *************************************************************************
-
-  private def setGradesDataStream(env: StreamExecutionEnvironment, params: ParameterTool) :
-                       DataStream[Grade] = {
-    if (params.has("grades")) {
-      env.readTextFile(params.get("grades")).map(parseMap _ ).map(x => Grade(x._1, x._2, x._3))
-    } else {
-      println("Executing WindowJoin example with default grades data set.")
-      println("Use --grades to specify file input.")
-      env.fromCollection(gradeStream).map(x => Grade(x._1, x._2, x._3))
-    }
-  }
-
-  private def setSalariesDataStream(env: StreamExecutionEnvironment, params: ParameterTool) :
-                         DataStream[Salary] = {
-    if (params.has("salaries")) {
-      env.readTextFile(params.get("salaries")).map(parseMap _).map(x => Salary(x._1, x._2, x._3))
-    } else {
-      println("Executing WindowJoin example with default salaries data set.")
-      println("Use --salaries to specify file input.")
-      env.fromCollection(salaryStream).map(x => Salary(x._1, x._2, x._3))
-    }
+  
+  def joinStreams(
+      grades: DataStream[Grade],
+      salaries: DataStream[Salary],
+      windowSize: Long) : DataStream[Person] = {
+
+    grades.join(salaries)
+      .where(_.name)
+      .equalTo(_.name)
+      .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
+      .apply { (g, s) => Person(g.name, g.grade, s.salary) }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala
new file mode 100644
index 0000000..10c30c5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.join
+
+import java.io.Serializable
+import java.util.Random
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.examples.utils.ThrottledIterator
+import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Sample data for the [[WindowJoin]] example.
+ */
+object WindowJoinSampleData {
+  
+  private[join] val NAMES = Array("tom", "jerry", "alice", "bob", "john", "grace")
+  private[join] val GRADE_COUNT = 5
+  private[join] val SALARY_MAX = 10000
+
+  /**
+   * Continuously generates (name, grade).
+   */
+  def getGradeSource(env: StreamExecutionEnvironment, rate: Long): DataStream[Grade] = {
+      env.fromCollection(new ThrottledIterator(new GradeSource().asJava, rate).asScala)
+  }
+
+  /**
+   * Continuously generates (name, salary).
+   */
+  def getSalarySource(env: StreamExecutionEnvironment, rate: Long): DataStream[Salary] = {
+    env.fromCollection(new ThrottledIterator(new SalarySource().asJava, rate).asScala)
+  }
+  
+  // --------------------------------------------------------------------------
+  
+  class GradeSource extends Iterator[Grade] with Serializable {
+    
+    private[this] val rnd = new Random(hashCode())
+
+    def hasNext: Boolean = true
+
+    def next: Grade = {
+      Grade(NAMES(rnd.nextInt(NAMES.length)), rnd.nextInt(GRADE_COUNT) + 1)
+    }
+  }
+  
+  class SalarySource extends Iterator[Salary] with Serializable {
+
+    private[this] val rnd = new Random(hashCode())
+
+    def hasNext: Boolean = true
+
+    def next: Salary = {
+      Salary(NAMES(rnd.nextInt(NAMES.length)), rnd.nextInt(SALARY_MAX) + 1)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinData.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinData.java
new file mode 100644
index 0000000..cca6ada
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleJavaPrograms.join;
+
+/**
+ * Class with sample data for window join examples.
+ */
+public class WindowJoinData {
+
+	public static final String GRADES_INPUT = "0,john,5\n" + "0,tom,3\n" + "0,alice,1\n" + "0,grace,5\n" +
+			"1,john,4\n" + "1,bob,1\n" + "1,alice,2\n" + "1,alice,3\n" + "1,bob,5\n" + "1,alice,3\n" + "1,tom,5\n" +
+			"2,john,2\n" + "2,john,1\n" + "2,grace,2\n" + "2,jerry,2\n" + "2,tom,4\n" + "2,bob,4\n" + "2,bob,2\n" +
+			"3, tom,2\n" + "3,alice,5\n" + "3,grace,5\n" + "3,grace,1\n" + "3,alice,1\n" + "3,grace,3\n" + "3,tom,1\n" +
+			"4,jerry,5\n" + "4,john,3\n" + "4,john,4\n" + "4,john,1\n" + "4,jerry,3\n" + "4,grace,3\n" + "4,bob,3\n" +
+			"5,john,3\n" + "5,jerry,4\n" + "5,tom,5\n" + "5,tom,4\n" + "5,john,2\n" + "5,jerry,1\n" + "5,bob,1\n" +
+			"6,john,5\n" + "6,grace,4\n" + "6,tom,5\n" + "6,john,4\n" + "6,tom,1\n" + "6,grace,1\n" + "6,john,2\n" +
+			"7,jerry,3\n" + "7,jerry,5\n" + "7,tom,2\n" + "7,tom,2\n" + "7,alice,4\n" + "7,tom,4\n" + "7,jerry,4\n" +
+			"8,john,3\n" + "8,grace,4\n" + "8,tom,3\n" + "8,jerry,4\n" + "8,john,5\n" + "8,john,4\n" + "8,jerry,1\n" +
+			"9,john,5\n" + "9,alice,2\n" + "9,tom,1\n" + "9,alice,5\n" + "9,grace,4\n" + "9,bob,4\n" + "9,jerry,1\n" +
+			"10,john,5\n" + "10,tom,4\n" + "10,tom,5\n" + "10,jerry,5\n" + "10,tom,1\n" + "10,grace,3\n" + "10,bob,5\n" +
+			"11,john,1\n" + "11,alice,1\n" + "11,grace,3\n" + "11,grace,1\n" + "11,jerry,1\n" + "11,jerry,4\n" +
+			"12,bob,4\n" + "12,alice,3\n" + "12,tom,5\n" + "12,alice,4\n" + "12,alice,4\n" + "12,grace,4\n" + "12,john,5\n" +
+			"13,john,5\n" + "13,grace,4\n" + "13,tom,4\n" + "13,john,4\n" + "13,john,5\n" + "13,alice,5\n" + "13,jerry,5\n" +
+			"14,john,3\n" + "14,tom,5\n" + "14,jerry,4\n" + "14,grace,4\n" + "14,john,3\n" + "14,bob,2";
+
+	public static final String SALARIES_INPUT = "0,john,6469\n" + "0,jerry,6760\n" + "0,jerry,8069\n" +
+			"1,tom,3662\n" + "1,grace,8427\n" + "1,john,9425\n" + "1,bob,9018\n" + "1,john,352\n" + "1,tom,3770\n" +
+			"2,grace,7622\n" + "2,jerry,7441\n" + "2,alice,1468\n" + "2,bob,5472\n" + "2,grace,898\n" +
+			"3,tom,3849\n" + "3,grace,1865\n" + "3,alice,5582\n" + "3,john,9511\n" + "3,alice,1541\n" +
+			"4,john,2477\n" + "4,grace,3561\n" + "4,john,1670\n" + "4,grace,7290\n" + "4,grace,6565\n" +
+			"5,tom,6179\n" + "5,tom,1601\n" + "5,john,2940\n" + "5,bob,4685\n" + "5,bob,710\n" + "5,bob,5936\n" +
+			"6,jerry,1412\n" + "6,grace,6515\n" + "6,grace,3321\n" + "6,tom,8088\n" + "6,john,2876\n" +
+			"7,bob,9896\n" + "7,grace,7368\n" + "7,grace,9749\n" + "7,bob,2048\n" + "7,alice,4782\n" +
+			"8,alice,3375\n" + "8,tom,5841\n" + "8,bob,958\n" + "8,bob,5258\n" + "8,tom,3935\n" + "8,jerry,4394\n" +
+			"9,alice,102\n" + "9,alice,4931\n" + "9,alice,5240\n" + "9,jerry,7951\n" + "9,john,5675\n" +
+			"10,bob,609\n" + "10,alice,5997\n" + "10,jerry,9651\n" + "10,alice,1328\n" + "10,bob,1022\n" +
+			"11,grace,2578\n" + "11,jerry,9704\n" + "11,tom,4476\n" + "11,grace,3784\n" + "11,alice,6144\n" +
+			"12,bob,6213\n" + "12,alice,7525\n" + "12,jerry,2908\n" + "12,grace,8464\n" + "12,jerry,9920\n" +
+			"13,bob,3720\n" + "13,bob,7612\n" + "13,alice,7211\n" + "13,jerry,6484\n" + "13,alice,1711\n" +
+			"14,jerry,5994\n" + "14,grace,928\n" + "14,jerry,2492\n" + "14,grace,9080\n" + "14,tom,4330\n" +
+			"15,bob,8302\n" + "15,john,4981\n" + "15,tom,1781\n" + "15,grace,1379\n" + "15,jerry,3700\n" +
+			"16,jerry,3584\n" + "16,jerry,2038\n" + "16,jerry,3902\n" + "16,tom,1336\n" + "16,jerry,7500\n" +
+			"17,tom,3648\n" + "17,alice,2533\n" + "17,tom,8685\n" + "17,bob,3968\n" + "17,tom,3241\n" + "17,bob,7461\n" +
+			"18,jerry,2138\n" + "18,alice,7503\n" + "18,alice,6424\n" + "18,tom,140\n" + "18,john,9802\n" +
+			"19,grace,2977\n" + "19,grace,889\n" + "19,john,1338";
+
+	/** Utility class, should not be instantiated */
+	private WindowJoinData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
index 285c9e2..736438f 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/join/WindowJoinITCase.java
@@ -18,36 +18,64 @@
 
 package org.apache.flink.streaming.test.exampleJavaPrograms.join;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.examples.join.WindowJoin;
-import org.apache.flink.streaming.examples.join.util.WindowJoinData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
-public class WindowJoinITCase extends StreamingProgramTestBase {
+import org.junit.Test;
 
-	protected String gradesPath;
-	protected String salariesPath;
-	protected String resultPath;
+import java.io.File;
 
-	@Override
-	protected void preSubmit() throws Exception {
-		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
-		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
-		resultPath = getTempDirPath("result");
-	}
+@SuppressWarnings("serial")
+public class WindowJoinITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test
+	public void testProgram() throws Exception {
+		final String resultPath = File.createTempFile("result-path", "dir").toURI().toString();
+		try {
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+			
+			DataStream<Tuple2<String, Integer>> grades = env
+					.fromElements(WindowJoinData.GRADES_INPUT.split("\n"))
+					.map(new Parser());
+	
+			DataStream<Tuple2<String, Integer>> salaries = env
+					.fromElements(WindowJoinData.SALARIES_INPUT.split("\n"))
+					.map(new Parser());
+			
+			WindowJoin
+					.runWindowJoin(grades, salaries, 100)
+					.writeAsText(resultPath, WriteMode.OVERWRITE);
+			
+			env.execute();
 
-	@Override
-	protected void postSubmit() throws Exception {
-		// since the two sides of the join might have different speed
-		// the exact output can not be checked just whether it is well-formed
-		// checks that the result lines look like e.g. (bob, 2, 2015)
-		checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
+			// since the two sides of the join might run at different speeds,
+			// the exact output cannot be checked, only whether it is well-formed;
+			// this checks that the result lines look like e.g. (bob,2,2015)
+			checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d),(\\d)+\\)");
+		}
+		finally {
+			try {
+				FileUtils.deleteDirectory(new File(resultPath));
+			} catch (Throwable ignored) {}
+		}
 	}
+	
+	//-------------------------------------------------------------------------
+	
+	public static final class Parser implements MapFunction<String, Tuple2<String, Integer>> {
 
-	@Override
-	protected void testProgram() throws Exception {
-		WindowJoin.main(new String[]{
-				"--grades", gradesPath,
-				"--salaries", salariesPath,
-				"--output", resultPath});
+		@Override
+		public Tuple2<String, Integer> map(String value) throws Exception {
+			String[] fields = value.split(",");
+			return new Tuple2<>(fields[1], Integer.parseInt(fields[2]));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
deleted file mode 100644
index 53c5919..0000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/join/WindowJoinITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.exampleScalaPrograms.join;
-
-import org.apache.flink.streaming.scala.examples.join.WindowJoin;
-import org.apache.flink.streaming.examples.join.util.WindowJoinData;
-import org.apache.flink.streaming.util.StreamingProgramTestBase;
-
-public class WindowJoinITCase extends StreamingProgramTestBase {
-
-	protected String gradesPath;
-	protected String salariesPath;
-	protected String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		gradesPath = createTempFile("gradesText.txt", WindowJoinData.GRADES_INPUT);
-		salariesPath = createTempFile("salariesText.txt", WindowJoinData.SALARIES_INPUT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		// since the two sides of the join might have different speed
-		// the exact output can not be checked just whether it is well-formed
-		// checks that the result lines look like e.g. Person(bob, 2, 2015)
-		checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		WindowJoin.main(new String[]{
-				"--grades", gradesPath,
-				"--salaries", salariesPath,
-				"--output", resultPath});
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
new file mode 100644
index 0000000..88953c5
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/WindowJoinITCase.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.scala.examples.join.WindowJoin
+import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Person, Salary}
+import org.apache.flink.streaming.test.exampleJavaPrograms.join.WindowJoinData
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.test.util.TestBaseUtils
+
+import org.junit.Test
+
+class WindowJoinITCase extends StreamingMultipleProgramsTestBase {
+  
+  @Test
+  def testProgram(): Unit = {
+    
+    val resultPath: String = File.createTempFile("result-path", "dir").toURI().toString()
+    try {
+      val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+      
+      val grades: DataStream[Grade] = env
+        .fromCollection(WindowJoinData.GRADES_INPUT.split("\n"))
+        .map( line => {
+          val fields = line.split(",")
+          Grade(fields(1), fields(2).toInt)
+        })
+
+      val salaries: DataStream[Salary] = env
+        .fromCollection(WindowJoinData.SALARIES_INPUT.split("\n"))
+        .map( line => {
+          val fields = line.split(",")
+          Salary(fields(1), fields(2).toInt)
+        })
+      
+      WindowJoin.joinStreams(grades, salaries, 100)
+        .writeAsText(resultPath, WriteMode.OVERWRITE)
+      
+      env.execute()
+
+      TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)")
+    }
+    finally {
+      try {
+        FileUtils.deleteDirectory(new File(resultPath))
+      }
+      catch {
+        case _ : Throwable => 
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/271071ad/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
index 3a5fc80..5e68951 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TryTypeInfoTest.scala
@@ -29,7 +29,7 @@ import scala.util.Try
 class TryTypeInfoTest extends TestLogger with JUnitSuiteLike {
 
   @Test
-  def testTryTypeEquality: Unit = {
+  def testTryTypeEquality(): Unit = {
     val TryTypeInfo1 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
     val TryTypeInfo2 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
 
@@ -38,18 +38,20 @@ class TryTypeInfoTest extends TestLogger with JUnitSuiteLike {
   }
 
   @Test
-  def testTryTypeInequality: Unit = {
+  def testTryTypeInequality(): Unit = {
     val TryTypeInfo1 = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
     val TryTypeInfo2 = new TryTypeInfo[String, Try[String]](BasicTypeInfo.STRING_TYPE_INFO)
 
+    //noinspection ComparingUnrelatedTypes
     assert(!TryTypeInfo1.equals(TryTypeInfo2))
   }
 
   @Test
-  def testTryTypeInequalityWithDifferentType: Unit = {
+  def testTryTypeInequalityWithDifferentType(): Unit = {
     val TryTypeInfo = new TryTypeInfo[Integer, Try[Integer]](BasicTypeInfo.INT_TYPE_INFO)
     val genericTypeInfo = new GenericTypeInfo[Double](Double.getClass.asInstanceOf[Class[Double]])
 
+    //noinspection ComparingUnrelatedTypes
     assert(!TryTypeInfo.equals(genericTypeInfo))
   }
 }


Mime
View raw message