flink-commits mailing list archives

From rmetz...@apache.org
Subject [11/15] flink git commit: [FLINK-3995] [build] flink-test-utils also contains the streaming test utilities.
Date Tue, 05 Jul 2016 14:38:48 GMT
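
With this change the streaming test utilities ship in flink-test-utils, so a downstream project can write streaming integration tests against base classes such as StreamingMultipleProgramsTestBase (used by every file below) with a single test dependency. A minimal sketch of such a test, assuming the Flink 1.x DataStream API; the class name and pipeline are illustrative only:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    import org.junit.Test;

    // Hypothetical example: extending the base class runs the job on the
    // local test cluster that flink-test-utils provides.
    public class ExampleITCase extends StreamingMultipleProgramsTestBase {

        @Test
        public void testSimplePipeline() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            DataStream<String> words = env.fromElements("to", "be", "or", "not");
            words.print();

            env.execute();
        }
    }
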
http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
new file mode 100644
index 0000000..da3de3d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@SuppressWarnings("serial")
+public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
+
+	private static List<String> testResults;
+
+	@Test
+	public void testCoGroup() throws Exception {
+
+		testResults = new ArrayList<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+				ctx.collect(Tuple2.of("a", 2));
+
+				ctx.collect(Tuple2.of("b", 3));
+				ctx.collect(Tuple2.of("b", 4));
+				ctx.collect(Tuple2.of("b", 5));
+
+				ctx.collect(Tuple2.of("a", 6));
+				ctx.collect(Tuple2.of("a", 7));
+				ctx.collect(Tuple2.of("a", 8));
+
+				// source is finite, so it will have an implicit MAX watermark when it finishes
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
+
+		DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+
+			@Override
+			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple2.of("a", 0));
+				ctx.collect(Tuple2.of("a", 1));
+
+				ctx.collect(Tuple2.of("b", 3));
+
+				ctx.collect(Tuple2.of("c", 6));
+				ctx.collect(Tuple2.of("c", 7));
+				ctx.collect(Tuple2.of("c", 8));
+
+				// source is finite, so it will have an implicit MAX watermark when it finishes
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
+
+
+		source1.coGroup(source2)
+				.where(new Tuple2KeyExtractor())
+				.equalTo(new Tuple2KeyExtractor())
+				.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
+					@Override
+					public void coGroup(Iterable<Tuple2<String, Integer>> first,
+							Iterable<Tuple2<String, Integer>> second,
+							Collector<String> out) throws Exception {
+						StringBuilder result = new StringBuilder();
+						result.append("F:");
+						for (Tuple2<String, Integer> t: first) {
+							result.append(t.toString());
+						}
+						result.append(" S:");
+						for (Tuple2<String, Integer> t: second) {
+							result.append(t.toString());
+						}
+						out.collect(result.toString());
+					}
+				})
+				.addSink(new SinkFunction<String>() {
+					@Override
+					public void invoke(String value) throws Exception {
+						testResults.add(value);
+					}
+				});
+
+		env.execute("CoGroup Test");
+
+		List<String> expectedResult = Arrays.asList(
+				"F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
+				"F:(b,3)(b,4)(b,5) S:(b,3)",
+				"F:(a,6)(a,7)(a,8) S:",
+				"F: S:(c,6)(c,7)(c,8)");
+
+		Collections.sort(expectedResult);
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	@Test
+	public void testJoin() throws Exception {
+
+		testResults = new ArrayList<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+
+			@Override
+			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple3.of("a", "x", 0));
+				ctx.collect(Tuple3.of("a", "y", 1));
+				ctx.collect(Tuple3.of("a", "z", 2));
+
+				ctx.collect(Tuple3.of("b", "u", 3));
+				ctx.collect(Tuple3.of("b", "w", 5));
+
+				ctx.collect(Tuple3.of("a", "i", 6));
+				ctx.collect(Tuple3.of("a", "j", 7));
+				ctx.collect(Tuple3.of("a", "k", 8));
+
+				// source is finite, so it will have an implicit MAX watermark when it finishes
+			}
+
+			@Override
+			public void cancel() {}
+			
+		}).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());
+
+		DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+
+			@Override
+			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple3.of("a", "u", 0));
+				ctx.collect(Tuple3.of("a", "w", 1));
+
+				ctx.collect(Tuple3.of("b", "i", 3));
+				ctx.collect(Tuple3.of("b", "k", 5));
+
+				ctx.collect(Tuple3.of("a", "x", 6));
+				ctx.collect(Tuple3.of("a", "z", 8));
+
+				// source is finite, so it will have an implicit MAX watermark when it finishes
+			}
+
+			@Override
+			public void cancel() {}
+			
+		}).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());
+
+
+		source1.join(source2)
+				.where(new Tuple3KeyExtractor())
+				.equalTo(new Tuple3KeyExtractor())
+				.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
+					@Override
+					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
+						return first + ":" + second;
+					}
+				})
+				.addSink(new SinkFunction<String>() {
+					@Override
+					public void invoke(String value) throws Exception {
+						testResults.add(value);
+					}
+				});
+
+		env.execute("Join Test");
+
+		List<String> expectedResult = Arrays.asList(
+				"(a,x,0):(a,u,0)",
+				"(a,x,0):(a,w,1)",
+				"(a,y,1):(a,u,0)",
+				"(a,y,1):(a,w,1)",
+				"(a,z,2):(a,u,0)",
+				"(a,z,2):(a,w,1)",
+				"(b,u,3):(b,i,3)",
+				"(b,u,3):(b,k,5)",
+				"(b,w,5):(b,i,3)",
+				"(b,w,5):(b,k,5)",
+				"(a,i,6):(a,x,6)",
+				"(a,i,6):(a,z,8)",
+				"(a,j,7):(a,x,6)",
+				"(a,j,7):(a,z,8)",
+				"(a,k,8):(a,x,6)",
+				"(a,k,8):(a,z,8)");
+
+		Collections.sort(expectedResult);
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	@Test
+	public void testSelfJoin() throws Exception {
+
+		testResults = new ArrayList<>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
+				ctx.collect(Tuple3.of("a", "x", 0));
+				ctx.collect(Tuple3.of("a", "y", 1));
+				ctx.collect(Tuple3.of("a", "z", 2));
+
+				ctx.collect(Tuple3.of("b", "u", 3));
+				ctx.collect(Tuple3.of("b", "w", 5));
+
+				ctx.collect(Tuple3.of("a", "i", 6));
+				ctx.collect(Tuple3.of("a", "j", 7));
+				ctx.collect(Tuple3.of("a", "k", 8));
+
+				// source is finite, so it will have an implicit MAX watermark when it finishes
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());
+
+		source1.join(source1)
+				.where(new Tuple3KeyExtractor())
+				.equalTo(new Tuple3KeyExtractor())
+				.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
+					@Override
+					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
+						return first + ":" + second;
+					}
+				})
+				.addSink(new SinkFunction<String>() {
+					@Override
+					public void invoke(String value) throws Exception {
+						testResults.add(value);
+					}
+				});
+
+		env.execute("Self-Join Test");
+
+		List<String> expectedResult = Arrays.asList(
+				"(a,x,0):(a,x,0)",
+				"(a,x,0):(a,y,1)",
+				"(a,x,0):(a,z,2)",
+				"(a,y,1):(a,x,0)",
+				"(a,y,1):(a,y,1)",
+				"(a,y,1):(a,z,2)",
+				"(a,z,2):(a,x,0)",
+				"(a,z,2):(a,y,1)",
+				"(a,z,2):(a,z,2)",
+				"(b,u,3):(b,u,3)",
+				"(b,u,3):(b,w,5)",
+				"(b,w,5):(b,u,3)",
+				"(b,w,5):(b,w,5)",
+				"(a,i,6):(a,i,6)",
+				"(a,i,6):(a,j,7)",
+				"(a,i,6):(a,k,8)",
+				"(a,j,7):(a,i,6)",
+				"(a,j,7):(a,j,7)",
+				"(a,j,7):(a,k,8)",
+				"(a,k,8):(a,i,6)",
+				"(a,k,8):(a,j,7)",
+				"(a,k,8):(a,k,8)");
+
+		Collections.sort(expectedResult);
+		Collections.sort(testResults);
+
+		Assert.assertEquals(expectedResult, testResults);
+	}
+
+	private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
+		
+		@Override
+		public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp) {
+			return element.f1;
+		}
+
+		@Override
+		public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> element, long extractedTimestamp) {
+			return new Watermark(extractedTimestamp - 1);
+		}
+	}
+
+	private static class Tuple3TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, Integer>> {
+
+		@Override
+		public long extractTimestamp(Tuple3<String, String, Integer> element, long previousTimestamp) {
+			return element.f2;
+		}
+
+		@Override
+		public Watermark checkAndGetNextWatermark(Tuple3<String, String, Integer> lastElement, long extractedTimestamp) {
+			return new Watermark(lastElement.f2 - 1);
+		}
+	}
+
+	private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String,Integer>, String> {
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+
+	private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> {
+
+		@Override
+		public String getKey(Tuple3<String, String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
+
+}
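
The expected coGroup output above follows directly from the tumbling window assignment: with a 3 ms window and zero offset, an element with timestamp t lands in the window starting at t - (t % 3), so timestamps 0-2, 3-5, and 6-8 form three disjoint windows, and coGroup emits one record per key per window even when one side is empty ("F:(a,6)(a,7)(a,8) S:"). A small sketch of that assignment rule, simplified from what TumblingEventTimeWindows computes (zero offset and non-negative timestamps assumed):

    // Simplified tumbling-window start computation (zero offset assumed).
    public final class WindowStartSketch {
        static long windowStart(long timestamp, long windowSize) {
            return timestamp - (timestamp % windowSize);
        }

        public static void main(String[] args) {
            for (long ts = 0; ts <= 8; ts++) {
                // prints starts 0,0,0,3,3,3,6,6,6 -> the three windows in the test
                System.out.println(ts + " -> window starting at " + windowStart(ts, 3));
            }
        }
    }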

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java
new file mode 100644
index 0000000..360ceb3
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoStreamITCase.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class CoStreamITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test
+	public void test() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		TestListResultSink<String> resultSink = new TestListResultSink<String>();
+
+		DataStream<Integer> src = env.fromElements(1, 3, 5);
+
+		DataStream<Integer> filter1 = src
+				.filter(new FilterFunction<Integer>() {
+					@Override
+					public boolean filter(Integer value) throws Exception {
+						return true;
+					}
+				})
+				
+				.keyBy(new KeySelector<Integer, Integer>() {
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				});
+
+		DataStream<Tuple2<Integer, Integer>> filter2 = src
+				.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
+
+					@Override
+					public Tuple2<Integer, Integer> map(Integer value) throws Exception {
+						return new Tuple2<>(value, value + 1);
+					}
+				})
+				.rebalance()
+				.filter(new FilterFunction<Tuple2<Integer, Integer>>() {
+
+					@Override
+					public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
+						return true;
+					}
+				})
+				.disableChaining()
+				.keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+
+					@Override
+					public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+						return value.f0;
+					}
+				});
+
+		DataStream<String> connected = filter1.connect(filter2)
+				.flatMap(new CoFlatMapFunction<Integer, Tuple2<Integer, Integer>, String>() {
+
+					@Override
+					public void flatMap1(Integer value, Collector<String> out) throws Exception {
+						out.collect(value.toString());
+					}
+		
+					@Override
+					public void flatMap2(Tuple2<Integer, Integer> value, Collector<String> out) throws Exception {
+						out.collect(value.toString());
+					}
+				});
+
+		connected.addSink(resultSink);
+
+		
+		env.execute();
+
+		List<String> expected = Arrays.asList("(1,2)", "(3,4)", "(5,6)", "1", "3", "5");
+		List<String> result = resultSink.getResult();
+		Collections.sort(result);
+		assertEquals(expected, result);
+	}
+}
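
A note on the assertion above: the result list is sorted lexicographically, and '(' has a lower code point than any digit, so the tuple strings emitted by flatMap2 sort ahead of the plain integers from flatMap1. A quick standalone check of that ordering assumption:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SortOrderCheck {
        public static void main(String[] args) {
            List<String> result = new ArrayList<>(Arrays.asList("1", "(1,2)", "3", "(3,4)", "5", "(5,6)"));
            Collections.sort(result);
            // '(' (code point 40) < '1' (49), so the output is
            // [(1,2), (3,4), (5,6), 1, 3, 5] -- the expected list in the test
            System.out.println(result);
        }
    }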

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
new file mode 100644
index 0000000..c345b37
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Integration test for streaming programs using POJOs and key selectors.
+ *
+ * See FLINK-3697.
+ */
+public class DataStreamPojoITCase extends StreamingMultipleProgramsTestBase {
+	static List<Data> elements = new ArrayList<>();
+	static {
+		elements.add(new Data(0,0,0));
+		elements.add(new Data(0,0,0));
+		elements.add(new Data(1,1,1));
+		elements.add(new Data(1,1,1));
+		elements.add(new Data(2,2,3));
+		elements.add(new Data(2,2,3));
+	}
+
+	/**
+	 * Test composite key on the Data POJO (with nested fields)
+	 */
+	@Test
+	public void testCompositeKeyOnNestedPojo() throws Exception {
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.getConfig().disableObjectReuse();
+		see.setParallelism(3);
+
+		DataStream<Data> dataStream = see.fromCollection(elements);
+
+		DataStream<Data> summedStream = dataStream
+				.keyBy("aaa", "abc", "wxyz")
+				.sum("sum")
+				.keyBy("aaa", "abc", "wxyz")
+				.flatMap(new FlatMapFunction<Data, Data>() {
+					Data[] first = new Data[3];
+					@Override
+					public void flatMap(Data value, Collector<Data> out) throws Exception {
+						if(first[value.aaa] == null) {
+							first[value.aaa] = value;
+							if(value.sum != 1) {
+								throw new RuntimeException("Expected the sum to be one");
+							}
+						} else {
+							if(value.sum != 2) {
+								throw new RuntimeException("Expected the sum to be two");
+							}
+							if(first[value.aaa].aaa != value.aaa) {
+								throw new RuntimeException("aaa key wrong");
+							}
+							if(first[value.aaa].abc != value.abc) {
+								throw new RuntimeException("abc key wrong");
+							}
+							if(first[value.aaa].wxyz != value.wxyz) {
+								throw new RuntimeException("wxyz key wrong");
+							}
+						}
+					}
+				});
+
+		summedStream.print();
+
+		see.execute();
+	}
+
+	/**
+	 * Test composite & nested key on the Data POJO
+	 */
+	@Test
+	public void testNestedKeyOnNestedPojo() throws Exception {
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.getConfig().disableObjectReuse();
+		see.setParallelism(4);
+
+		DataStream<Data> dataStream = see.fromCollection(elements);
+
+		DataStream<Data> summedStream = dataStream
+				.keyBy("aaa", "stats.count")
+				.sum("sum")
+				.keyBy("aaa", "stats.count")
+				.flatMap(new FlatMapFunction<Data, Data>() {
+					Data[] first = new Data[3];
+					@Override
+					public void flatMap(Data value, Collector<Data> out) throws Exception {
+						if(value.stats.count != 123) {
+							throw new RuntimeException("Wrong value for value.stats.count");
+						}
+						if(first[value.aaa] == null) {
+							first[value.aaa] = value;
+							if(value.sum != 1) {
+								throw new RuntimeException("Expected the sum to be one");
+							}
+						} else {
+							if(value.sum != 2) {
+								throw new RuntimeException("Expected the sum to be two");
+							}
+							if(first[value.aaa].aaa != value.aaa) {
+								throw new RuntimeException("aaa key wrong");
+							}
+							if(first[value.aaa].abc != value.abc) {
+								throw new RuntimeException("abc key wrong");
+							}
+							if(first[value.aaa].wxyz != value.wxyz) {
+								throw new RuntimeException("wxyz key wrong");
+							}
+						}
+					}
+				});
+
+		summedStream.print();
+
+		see.execute();
+	}
+
+
+	/**
+	 * As per FLINK-3702, Flink doesn't support nested POJO fields for sum().
+	 */
+	@Test(expected = IllegalArgumentException.class)
+	public void testFailOnNestedPojoFieldAccessor() throws Exception {
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Data> dataStream = see.fromCollection(elements);
+		dataStream.keyBy("aaa", "stats.count").sum("stats.count");
+	}
+
+	public static class Data {
+		public int sum; // sum
+		public int aaa; // keyBy
+		public int abc; //keyBy
+		public long wxyz; // keyBy
+		public int t1;
+		public int t2;
+		public Policy policy;
+		public Stats stats;
+
+		public Data() {
+		}
+		public Data(int aaa, int abc, int wxyz) {
+			this.sum = 1;
+			this.aaa = aaa;
+			this.abc = abc;
+			this.wxyz = wxyz;
+			this.stats = new Stats();
+			this.stats.count = 123L;
+		}
+
+		@Override
+		public String toString() {
+			return "Data{" +
+					"sum=" + sum +
+					", aaa=" + aaa +
+					", abc=" + abc +
+					", wxyz=" + wxyz +
+					'}';
+		}
+	}
+	public static class Policy {
+		public short a;
+		public short b;
+		public boolean c;
+		public boolean d;
+
+		public Policy() {}
+	}
+
+	public static class Stats {
+		public long count;
+		public float a;
+		public float b;
+		public float c;
+		public float d;
+		public float e;
+
+		public Stats() {}
+	}
+
+}
+
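
The last test pins down the FLINK-3702 limitation: sum() only accepts top-level POJO fields, so sum("stats.count") throws IllegalArgumentException. A hedged sketch of one way around it, accumulating the nested field in a reduce() instead (this assumes the Data POJO above and, as in these tests, disabled object reuse, since the reducer mutates its left argument):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class NestedSumWorkaround {
        // Hypothetical helper: Data refers to DataStreamPojoITCase.Data above.
        static DataStream<Data> sumNestedCount(DataStream<Data> dataStream) {
            return dataStream
                    .keyBy("aaa")
                    .reduce(new ReduceFunction<Data>() {
                        @Override
                        public Data reduce(Data a, Data b) throws Exception {
                            a.stats.count += b.stats.count; // fold the nested field manually
                            return a;
                        }
                    });
        }
    }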

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java
new file mode 100644
index 0000000..8b84112
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DirectedOutputITCase.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class DirectedOutputITCase extends StreamingMultipleProgramsTestBase {
+
+	private static final String TEN = "ten";
+	private static final String ODD = "odd";
+	private static final String EVEN = "even";
+	private static final String NON_SELECTED = "nonSelected";
+
+	static final class MyOutputSelector implements OutputSelector<Long> {
+		private static final long serialVersionUID = 1L;
+
+		List<String> outputs = new ArrayList<String>();
+
+		@Override
+		public Iterable<String> select(Long value) {
+			outputs.clear();
+			if (value % 2 == 0) {
+				outputs.add(EVEN);
+			} else {
+				outputs.add(ODD);
+			}
+
+			if (value == 10L) {
+				outputs.add(TEN);
+			}
+
+			if (value == 11L) {
+				outputs.add(NON_SELECTED);
+			}
+			return outputs;
+		}
+	}
+
+	@Test
+	public void outputSelectorTest() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
+		TestListResultSink<Long> evenSink = new TestListResultSink<Long>();
+		TestListResultSink<Long> oddAndTenSink = new TestListResultSink<Long>();
+		TestListResultSink<Long> evenAndOddSink = new TestListResultSink<Long>();
+		TestListResultSink<Long> allSink = new TestListResultSink<Long>();
+
+		SplitStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
+		source.select(EVEN).addSink(evenSink);
+		source.select(ODD, TEN).addSink(oddAndTenSink);
+		source.select(EVEN, ODD).addSink(evenAndOddSink);
+		source.addSink(allSink);
+
+		env.execute();
+		assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), evenSink.getSortedResult());
+		assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), oddAndTenSink.getSortedResult());
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+				evenAndOddSink.getSortedResult());
+		assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
+				allSink.getSortedResult());
+	}
+}
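
Each element is routed to every name its OutputSelector returns, and select() unions the requested names: 10 is tagged both EVEN and TEN, so oddAndTenSink receives it exactly once via TEN, while the extra NON_SELECTED tag on 11 is simply never selected (11 still reaches oddAndTenSink as ODD, and allSink sees everything regardless of tags). A hypothetical extra assertion along the same lines, reusing the imports and MyOutputSelector from the test above:

    // Hypothetical: selecting the otherwise-unused name should deliver only 11,
    // the single element that MyOutputSelector tags as NON_SELECTED.
    @Test
    public void nonSelectedNameTest() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TestListResultSink<Long> nonSelectedSink = new TestListResultSink<Long>();

        SplitStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
        source.select(NON_SELECTED).addSink(nonSelectedSink);

        env.execute();
        assertEquals(Arrays.asList(11L), nonSelectedSink.getSortedResult());
    }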

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
new file mode 100644
index 0000000..1fbebd0
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -0,0 +1,703 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.EvenOddOutputSelector;
+import org.apache.flink.test.streaming.runtime.util.NoOpIntMap;
+import org.apache.flink.test.streaming.runtime.util.ReceiveCheckNoOpSink;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MathUtils;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings({ "unchecked", "unused", "serial" })
+public class IterateITCase extends StreamingMultipleProgramsTestBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(IterateITCase.class);
+
+	private static boolean iterated[];
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testIncorrectParallelism() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source = env.fromElements(1, 10);
+
+		IterativeStream<Integer> iter1 = source.iterate();
+		SingleOutputStreamOperator<Integer> map1 = iter1.map(NoOpIntMap);
+		iter1.closeWith(map1).print();
+	}
+
+	@Test
+	public void testDoubleClosing() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce dummy mapper to get to correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+		IterativeStream<Integer> iter1 = source.iterate();
+
+		iter1.closeWith(iter1.map(NoOpIntMap));
+		iter1.closeWith(iter1.map(NoOpIntMap));
+	}
+
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDifferingParallelism() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce dummy mapper to get to correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10)
+				.map(NoOpIntMap);
+
+		IterativeStream<Integer> iter1 = source.iterate();
+
+
+		iter1.closeWith(iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2));
+
+	}
+
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testCoDifferingParallelism() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce dummy mapper to get to correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+		ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
+				Integer.class);
+
+
+		coIter.closeWith(coIter.map(NoOpIntCoMap).setParallelism(DEFAULT_PARALLELISM / 2));
+
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClosingFromOutOfLoop() throws Exception {
+
+		// this test verifies that we cannot close an iteration with a DataStream that does not
+		// have the iteration in its predecessors
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce dummy mapper to get to correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+		IterativeStream<Integer> iter1 = source.iterate();
+		IterativeStream<Integer> iter2 = source.iterate();
+
+
+		iter2.closeWith(iter1.map(NoOpIntMap));
+
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testCoIterClosingFromOutOfLoop() throws Exception {
+
+		// this test verifies that we cannot close an iteration with a DataStream that does not
+		// have the iteration in its predecessors
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// introduce dummy mapper to get to correct parallelism
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+		IterativeStream<Integer> iter1 = source.iterate();
+		ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
+				Integer.class);
+
+
+		coIter.closeWith(iter1.map(NoOpIntMap));
+
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testExecutionWithEmptyIteration() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);
+
+		IterativeStream<Integer> iter1 = source.iterate();
+
+		iter1.map(NoOpIntMap).print();
+
+		env.execute();
+	}
+
+	@Test
+	public void testImmutabilityWithCoiteration() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap); // for rebalance
+
+		IterativeStream<Integer> iter1 = source.iterate();
+		// Calling withFeedbackType should create a new iteration
+		ConnectedIterativeStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
+
+		iter1.closeWith(iter1.map(NoOpIntMap)).print();
+		iter2.closeWith(iter2.map(NoOpCoMap)).print();
+
+		StreamGraph graph = env.getStreamGraph();
+
+		assertEquals(2, graph.getIterationSourceSinkPairs().size());
+
+		for (Tuple2<StreamNode, StreamNode> sourceSinkPair: graph.getIterationSourceSinkPairs()) {
+			assertEquals(sourceSinkPair.f0.getOutEdges().get(0).getTargetVertex(), sourceSinkPair.f1.getInEdges().get(0).getSourceVertex());
+		}
+	}
+
+	@Test
+	public void testmultipleHeadsTailsSimple() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
+				.shuffle()
+				.map(NoOpIntMap).name("ParallelizeMapShuffle");
+		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
+				.map(NoOpIntMap).name("ParallelizeMapRebalance");
+
+		IterativeStream<Integer> iter1 = source1.union(source2).iterate();
+
+		DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("IterRebalanceMap").setParallelism(DEFAULT_PARALLELISM / 2);
+		DataStream<Integer> head2 = iter1.map(NoOpIntMap).name("IterForwardMap");
+		DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).addSink(new ReceiveCheckNoOpSink<Integer>());
+		DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
+
+		SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+				.map(NoOpIntMap).name("EvenOddSourceMap")
+				.split(new EvenOddOutputSelector());
+
+		iter1.closeWith(source3.select("even").union(
+				head1.rebalance().map(NoOpIntMap).broadcast(), head2.shuffle()));
+
+		StreamGraph graph = env.getStreamGraph();
+
+		JobGraph jg = graph.getJobGraph();
+
+		assertEquals(1, graph.getIterationSourceSinkPairs().size());
+
+		Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
+		StreamNode itSource = sourceSinkPair.f0;
+		StreamNode itSink = sourceSinkPair.f1;
+
+		assertEquals(4, itSource.getOutEdges().size());
+		assertEquals(3, itSink.getInEdges().size());
+
+		assertEquals(itSource.getParallelism(), itSink.getParallelism());
+
+		for (StreamEdge edge : itSource.getOutEdges()) {
+			if (edge.getTargetVertex().getOperatorName().equals("IterRebalanceMap")) {
+				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+			} else if (edge.getTargetVertex().getOperatorName().equals("IterForwardMap")) {
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+			}
+		}
+		for (StreamEdge edge : itSink.getInEdges()) {
+			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapShuffle")) {
+				assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+			}
+
+			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("ParallelizeMapForward")) {
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+			}
+
+			if (graph.getStreamNode(edge.getSourceId()).getOperatorName().equals("EvenOddSourceMap")) {
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+				assertTrue(edge.getSelectedNames().contains("even"));
+			}
+		}
+
+		// Test co-location
+
+		JobVertex itSource1 = null;
+		JobVertex itSink1 = null;
+
+		for (JobVertex vertex : jg.getVertices()) {
+			if (vertex.getName().contains("IterationSource")) {
+				itSource1 = vertex;
+			} else if (vertex.getName().contains("IterationSink")) {
+
+				itSink1 = vertex;
+
+			}
+		}
+
+		assertTrue(itSource1.getCoLocationGroup() != null);
+		assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
+	}
+
+	@Test
+	public void testmultipleHeadsTailsWithTailPartitioning() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> source1 = env.fromElements(1, 2, 3, 4, 5)
+				.shuffle()
+				.map(NoOpIntMap);
+
+		DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
+				.map(NoOpIntMap);
+
+		IterativeStream<Integer> iter1 = source1.union(source2).iterate();
+
+		DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("map1");
+		DataStream<Integer> head2 = iter1.map(NoOpIntMap)
+				.setParallelism(DEFAULT_PARALLELISM / 2)
+				.name("shuffle").rebalance();
+		DataStreamSink<Integer> head3 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2)
+				.addSink(new ReceiveCheckNoOpSink<Integer>());
+		DataStreamSink<Integer> head4 = iter1.map(NoOpIntMap).addSink(new ReceiveCheckNoOpSink<Integer>());
+
+		SplitStream<Integer> source3 = env.fromElements(1, 2, 3, 4, 5)
+				.map(NoOpIntMap)
+				.name("split")
+				.split(new EvenOddOutputSelector());
+
+		iter1.closeWith(
+				source3.select("even").union(
+						head1.map(NoOpIntMap).name("bc").broadcast(),
+						head2.map(NoOpIntMap).shuffle()));
+
+		StreamGraph graph = env.getStreamGraph();
+
+		JobGraph jg = graph.getJobGraph();
+
+		assertEquals(1, graph.getIterationSourceSinkPairs().size());
+
+		Tuple2<StreamNode, StreamNode> sourceSinkPair = graph.getIterationSourceSinkPairs().iterator().next();
+		StreamNode itSource = sourceSinkPair.f0;
+		StreamNode itSink = sourceSinkPair.f1;
+
+		assertEquals(4, itSource.getOutEdges().size());
+		assertEquals(3, itSink.getInEdges().size());
+
+
+		assertEquals(itSource.getParallelism(), itSink.getParallelism());
+
+		for (StreamEdge edge : itSource.getOutEdges()) {
+			if (edge.getTargetVertex().getOperatorName().equals("map1")) {
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+				assertEquals(4, edge.getTargetVertex().getParallelism());
+			} else if (edge.getTargetVertex().getOperatorName().equals("shuffle")) {
+				assertTrue(edge.getPartitioner() instanceof RebalancePartitioner);
+				assertEquals(2, edge.getTargetVertex().getParallelism());
+			}
+		}
+		for (StreamEdge edge : itSink.getInEdges()) {
+			String tailName = edge.getSourceVertex().getOperatorName();
+			if (tailName.equals("split")) {
+				assertTrue(edge.getPartitioner() instanceof ForwardPartitioner);
+				assertTrue(edge.getSelectedNames().contains("even"));
+			} else if (tailName.equals("bc")) {
+				assertTrue(edge.getPartitioner() instanceof BroadcastPartitioner);
+			} else if (tailName.equals("shuffle")) {
+				assertTrue(edge.getPartitioner() instanceof ShufflePartitioner);
+			}
+		}
+
+		// Test co-location
+
+		JobVertex itSource1 = null;
+		JobVertex itSink1 = null;
+
+		for (JobVertex vertex : jg.getVertices()) {
+			if (vertex.getName().contains("IterationSource")) {
+				itSource1 = vertex;
+			} else if (vertex.getName().contains("IterationSink")) {
+				itSink1 = vertex;
+			}
+		}
+
+		assertTrue(itSource1.getCoLocationGroup() != null);
+		assertTrue(itSink1.getCoLocationGroup() != null);
+
+		assertEquals(itSource1.getCoLocationGroup(), itSink1.getCoLocationGroup());
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test
+	public void testSimpleIteration() throws Exception {
+		int numRetries = 5;
+		int timeoutScale = 1;
+
+		for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+			try {
+				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+				iterated = new boolean[DEFAULT_PARALLELISM];
+
+				DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+						.map(NoOpBoolMap).name("ParallelizeMap");
+
+				IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);
+
+				DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);
+
+				iteration.map(NoOpBoolMap).addSink(new ReceiveCheckNoOpSink());
+
+				iteration.closeWith(increment).addSink(new ReceiveCheckNoOpSink());
+
+				env.execute();
+
+				for (boolean iter : iterated) {
+					assertTrue(iter);
+				}
+
+				break; // success
+			} catch (Throwable t) {
+				LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
+
+				if (numRetry >= numRetries - 1) {
+					throw t;
+				} else {
+					timeoutScale *= 2;
+				}
+			}
+		}
+	}
+
+	@Test
+	public void testCoIteration() throws Exception {
+		int numRetries = 5;
+		int timeoutScale = 1;
+
+		for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+			try {
+				TestSink.collected = new ArrayList<>();
+
+				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+				env.setParallelism(2);
+
+				DataStream<String> otherSource = env.fromElements("1000", "2000")
+						.map(NoOpStrMap).name("ParallelizeMap");
+
+
+				ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
+						.map(NoOpIntMap).name("ParallelizeMap")
+						.iterate(2000 * timeoutScale)
+						.withFeedbackType("String");
+
+				try {
+					coIt.keyBy(1, 2);
+					fail();
+				} catch (InvalidProgramException e) {
+					// this is expected
+				}
+
+				DataStream<String> head = coIt
+						.flatMap(new RichCoFlatMapFunction<Integer, String, String>() {
+
+							private static final long serialVersionUID = 1L;
+							boolean seenFromSource = false;
+
+							@Override
+							public void flatMap1(Integer value, Collector<String> out) throws Exception {
+								out.collect(((Integer) (value + 1)).toString());
+							}
+
+							@Override
+							public void flatMap2(String value, Collector<String> out) throws Exception {
+								Integer intVal = Integer.valueOf(value);
+								if (intVal < 2) {
+									out.collect(((Integer) (intVal + 1)).toString());
+								}
+								if (intVal == 1000 || intVal == 2000) {
+									seenFromSource = true;
+								}
+							}
+
+							@Override
+							public void close() {
+								assertTrue(seenFromSource);
+							}
+						});
+
+				coIt.map(new CoMapFunction<Integer, String, String>() {
+
+					@Override
+					public String map1(Integer value) throws Exception {
+						return value.toString();
+					}
+
+					@Override
+					public String map2(String value) throws Exception {
+						return value;
+					}
+				}).addSink(new ReceiveCheckNoOpSink<String>());
+
+				coIt.closeWith(head.broadcast().union(otherSource));
+
+				head.addSink(new TestSink()).setParallelism(1);
+
+				assertEquals(1, env.getStreamGraph().getIterationSourceSinkPairs().size());
+
+				env.execute();
+
+				Collections.sort(TestSink.collected);
+				assertEquals(Arrays.asList("1", "1", "2", "2", "2", "2"), TestSink.collected);
+
+				break; // success
+			} catch (Throwable t) {
+				LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
+
+				if (numRetry >= numRetries - 1) {
+					throw t;
+				} else {
+					timeoutScale *= 2;
+				}
+			}
+		}
+	}
+
+	/**
+	 * This test relies on the hash function used by {@link DataStream#keyBy}, which is
+	 * assumed to be {@link MathUtils#murmurHash}.
+	 *
+	 * For the test to pass, all FlatMappers must see at least two records in the iteration,
+	 * which can only be achieved if the hashed values of the input keys form a complete
+	 * residue system. Given that the test is designed for 3 parallel FlatMapper instances,
+	 * keys chosen from the [1,3] range are a suitable choice.
+	 */
+	@Test
+	public void testGroupByFeedback() throws Exception {
+		int numRetries = 5;
+		int timeoutScale = 1;
+
+		for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+			try {
+				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+				env.setParallelism(DEFAULT_PARALLELISM - 1);
+
+				KeySelector<Integer, Integer> key = new KeySelector<Integer, Integer>() {
+
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value % 3;
+					}
+				};
+
+				DataStream<Integer> source = env.fromElements(1, 2, 3)
+						.map(NoOpIntMap).name("ParallelizeMap");
+
+				IterativeStream<Integer> it = source.keyBy(key).iterate(3000 * timeoutScale);
+
+				DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {
+
+					int received = 0;
+					int key = -1;
+
+					@Override
+					public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+						received++;
+						if (key == -1) {
+							key = MathUtils.murmurHash(value % 3) % 3;
+						} else {
+							assertEquals(key, MathUtils.murmurHash(value % 3) % 3);
+						}
+						if (value > 0) {
+							out.collect(value - 1);
+						}
+					}
+
+					@Override
+					public void close() {
+						assertTrue(received > 1);
+					}
+				});
+
+				it.closeWith(head.keyBy(key).union(head.map(NoOpIntMap).keyBy(key))).addSink(new ReceiveCheckNoOpSink<Integer>());
+
+				env.execute();
+
+				break; // success
+			} catch (Throwable t) {
+				LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
+
+				if (numRetry >= numRetries - 1) {
+					throw t;
+				} else {
+					timeoutScale *= 2;
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("deprecation")
+	@Test
+	public void testWithCheckPointing() throws Exception {
+		int numRetries = 5;
+		int timeoutScale = 1;
+
+		for (int numRetry = 0; numRetry < numRetries; numRetry++) {
+			try {
+				StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+				env.enableCheckpointing();
+
+				DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
+						.map(NoOpBoolMap).name("ParallelizeMap");
+
+
+				IterativeStream<Boolean> iteration = source.iterate(3000 * timeoutScale);
+
+				iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());
+
+				try {
+					env.execute();
+
+					// this statement should never be reached
+					fail();
+				} catch (UnsupportedOperationException e) {
+					// expected behaviour
+				}
+
+				// Test force checkpointing
+
+				try {
+					env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false);
+					env.execute();
+
+					// this statement should never be reached
+					fail();
+				} catch (UnsupportedOperationException e) {
+					// expected behaviour
+				}
+
+				env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
+				env.getStreamGraph().getJobGraph();
+
+				break; // success
+			} catch (Throwable t) {
+				LOG.info("Run " + (numRetry + 1) + "/" + numRetries + " failed", t);
+
+				if (numRetry >= numRetries - 1) {
+					throw t;
+				} else {
+					timeoutScale *= 2;
+				}
+			}
+		}
+	}
+
+	public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
+		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
+			int indx = getRuntimeContext().getIndexOfThisSubtask();
+			if (value) {
+				iterated[indx] = true;
+			} else {
+				out.collect(true);
+			}
+		}
+	}
+
+	public static CoMapFunction<Integer, String, String> NoOpCoMap = new CoMapFunction<Integer, String, String>() {
+
+		public String map1(Integer value) throws Exception {
+			return value.toString();
+		}
+
+		public String map2(String value) throws Exception {
+			return value;
+		}
+	};
+
+	public static MapFunction<Integer, Integer> NoOpIntMap = new NoOpIntMap();
+
+	public static MapFunction<String, String> NoOpStrMap = new MapFunction<String, String>() {
+
+		public String map(String value) throws Exception {
+			return value;
+		}
+
+	};
+
+	public static CoMapFunction<Integer, Integer, Integer> NoOpIntCoMap = new CoMapFunction<Integer, Integer, Integer>() {
+
+		public Integer map1(Integer value) throws Exception {
+			return value;
+		}
+
+		public Integer map2(Integer value) throws Exception {
+			return value;
+		}
+
+	};
+
+	public static MapFunction<Boolean, Boolean> NoOpBoolMap = new MapFunction<Boolean, Boolean>() {
+
+		public Boolean map(Boolean value) throws Exception {
+			return value;
+		}
+
+	};
+
+	public static class TestSink implements SinkFunction<String> {
+
+		private static final long serialVersionUID = 1L;
+		public static List<String> collected = new ArrayList<String>();
+
+		@Override
+		public void invoke(String value) throws Exception {
+			collected.add(value);
+		}
+
+	}
+
+}
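
Throughout these tests the long argument to iterate(), e.g. iterate(3000 * timeoutScale), is the maximum time the iteration waits for feedback records before shutting down; the retry loops double timeoutScale after a failure so that slow CI machines get progressively longer timeouts. A minimal sketch of a closed streaming iteration in the same API, the usual decrement loop (names and the 5 s timeout are illustrative):

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.IterativeStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DecrementLoopSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // wait at most 5 s for feedback before the loop shuts down
            IterativeStream<Long> it = env.generateSequence(1, 5).iterate(5000);

            DataStream<Long> minusOne = it.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) throws Exception {
                    return value - 1;
                }
            });

            // positive values go back into the loop ...
            it.closeWith(minusOne.filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) throws Exception {
                    return value > 0;
                }
            }));

            // ... and zeros leave it
            minusOne.filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) throws Exception {
                    return value <= 0;
                }
            }).print();

            env.execute("Decrement iteration sketch");
        }
    }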

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java
new file mode 100644
index 0000000..0902a3c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/OutputSplitterITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class OutputSplitterITCase extends StreamingMultipleProgramsTestBase {
+
+	private static ArrayList<Integer> expectedSplitterResult = new ArrayList<Integer>();
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testOnMergedDataStream() throws Exception {
+		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
+		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.setBufferTimeout(1);
+
+		DataStream<Integer> d1 = env.fromElements(0, 2, 4, 6, 8);
+		DataStream<Integer> d2 = env.fromElements(1, 3, 5, 7, 9);
+
+		d1 = d1.union(d2);
+
+		d1.split(new OutputSelector<Integer>() {
+			private static final long serialVersionUID = 8354166915727490130L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value > 4) {
+					s.add(">");
+				} else {
+					s.add("<");
+				}
+				return s;
+			}
+		}).select(">").addSink(splitterResultSink1);
+
+		d1.split(new OutputSelector<Integer>() {
+			private static final long serialVersionUID = -6822487543355994807L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value % 3 == 0) {
+					s.add("yes");
+				} else {
+					s.add("no");
+				}
+				return s;
+			}
+		}).select("yes").addSink(splitterResultSink2);
+		env.execute();
+
+		expectedSplitterResult.clear();
+		expectedSplitterResult.addAll(Arrays.asList(5, 6, 7, 8, 9));
+		assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
+
+		expectedSplitterResult.clear();
+		expectedSplitterResult.addAll(Arrays.asList(0, 3, 6, 9));
+		assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
+	}
+
+	@Test
+	public void testOnSingleDataStream() throws Exception {
+		TestListResultSink<Integer> splitterResultSink1 = new TestListResultSink<Integer>();
+		TestListResultSink<Integer> splitterResultSink2 = new TestListResultSink<Integer>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.setBufferTimeout(1);
+
+		DataStream<Integer> ds = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+		ds.split(new OutputSelector<Integer>() {
+			private static final long serialVersionUID = 2524335410904414121L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value % 2 == 0) {
+					s.add("even");
+				} else {
+					s.add("odd");
+				}
+				return s;
+			}
+		}).select("even").addSink(splitterResultSink1);
+
+		ds.split(new OutputSelector<Integer>() {
+
+			private static final long serialVersionUID = -511693919586034092L;
+
+			@Override
+			public Iterable<String> select(Integer value) {
+				List<String> s = new ArrayList<String>();
+				if (value % 4 == 0) {
+					s.add("yes");
+				} else {
+					s.add("no");
+				}
+				return s;
+			}
+		}).select("yes").addSink(splitterResultSink2);
+		env.execute();
+
+		expectedSplitterResult.clear();
+		expectedSplitterResult.addAll(Arrays.asList(0, 2, 4, 6, 8));
+		assertEquals(expectedSplitterResult, splitterResultSink1.getSortedResult());
+
+		expectedSplitterResult.clear();
+		expectedSplitterResult.addAll(Arrays.asList(0, 4, 8));
+		assertEquals(expectedSplitterResult, splitterResultSink2.getSortedResult());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
new file mode 100644
index 0000000..bff8df1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.NoOpIntMap;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * IT case that tests the different stream partitioning schemes.
+ */
+@SuppressWarnings("serial")
+public class PartitionerITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testForwardFailsLowToHighParallelism() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Integer> src = env.fromElements(1, 2, 3);
+
+		// forward is not allowed here because the parallelism goes from 1 to 3
+		src.forward().map(new NoOpIntMap());
+
+		env.execute();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testForwardFailsHighToLowParallelism() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// the parallelism-1 source is implicitly rebalanced into this map, which is allowed
+		DataStream<Integer> src = env.fromElements(1, 2, 3).map(new NoOpIntMap());
+
+		// forward is not allowed here because the parallelism goes from 3 to 1
+		src.forward().map(new NoOpIntMap()).setParallelism(1);
+
+		env.execute();
+	}
+
+
+	@Test
+	public void partitionerTest() {
+
+		TestListResultSink<Tuple2<Integer, String>> hashPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> customPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> broadcastPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> forwardPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> rebalancePartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+		TestListResultSink<Tuple2<Integer, String>> globalPartitionResultSink =
+				new TestListResultSink<Tuple2<Integer, String>>();
+
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
+		DataStream<Tuple1<String>> src = env.fromElements(
+				new Tuple1<String>("a"),
+				new Tuple1<String>("b"),
+				new Tuple1<String>("b"),
+				new Tuple1<String>("a"),
+				new Tuple1<String>("a"),
+				new Tuple1<String>("c"),
+				new Tuple1<String>("a")
+		);
+
+		// partition by hash
+		src
+				.keyBy(0)
+				.map(new SubtaskIndexAssigner())
+				.addSink(hashPartitionResultSink);
+
+		// partition custom
+		DataStream<Tuple2<Integer, String>> partitionCustom = src
+				.partitionCustom(new Partitioner<String>() {
+					@Override
+					public int partition(String key, int numPartitions) {
+						if (key.equals("c")) {
+							return 2;
+						} else {
+							return 0;
+						}
+					}
+				}, 0)
+				.map(new SubtaskIndexAssigner());
+
+		partitionCustom.addSink(customPartitionResultSink);
+
+		// partition broadcast
+		src.broadcast().map(new SubtaskIndexAssigner()).addSink(broadcastPartitionResultSink);
+
+		// partition rebalance
+		src.rebalance().map(new SubtaskIndexAssigner()).addSink(rebalancePartitionResultSink);
+
+		// partition forward
+		src.map(new MapFunction<Tuple1<String>, Tuple1<String>>() {
+			private static final long serialVersionUID = 1L;
+			@Override
+			public Tuple1<String> map(Tuple1<String> value) throws Exception {
+				return value;
+			}
+		})
+				.forward()
+				.map(new SubtaskIndexAssigner())
+				.addSink(forwardPartitionResultSink);
+
+		// partition global
+		src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink);
+
+		try {
+			env.execute();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+
+		List<Tuple2<Integer, String>> hashPartitionResult = hashPartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> customPartitionResult = customPartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> broadcastPartitionResult = broadcastPartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> forwardPartitionResult = forwardPartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> rebalancePartitionResult = rebalancePartitionResultSink.getResult();
+		List<Tuple2<Integer, String>> globalPartitionResult = globalPartitionResultSink.getResult();
+
+		verifyHashPartitioning(hashPartitionResult);
+		verifyCustomPartitioning(customPartitionResult);
+		verifyBroadcastPartitioning(broadcastPartitionResult);
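+		// the identity map is fed round-robin from the parallelism-1 source, and
+		// forward keeps every element on its subtask, so the forward result is
+		// expected to look exactly like the rebalance result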
+		verifyRebalancePartitioning(forwardPartitionResult);
+		verifyRebalancePartitioning(rebalancePartitionResult);
+		verifyGlobalPartitioning(globalPartitionResult);
+	}
+
+	private static void verifyHashPartitioning(List<Tuple2<Integer, String>> hashPartitionResult) {
+		HashMap<String, Integer> verifier = new HashMap<String, Integer>();
+		for (Tuple2<Integer, String> elem : hashPartitionResult) {
+			Integer subtaskIndex = verifier.get(elem.f1);
+			if (subtaskIndex == null) {
+				verifier.put(elem.f1, elem.f0);
+			} else if (!subtaskIndex.equals(elem.f0)) {
+				fail("key " + elem.f1 + " was partitioned to more than one subtask");
+			}
+		}
+	}
+
+	private static void verifyCustomPartitioning(List<Tuple2<Integer, String>> customPartitionResult) {
+		for (Tuple2<Integer, String> stringWithSubtask : customPartitionResult) {
+			if (stringWithSubtask.f1.equals("c")) {
+				assertEquals(Integer.valueOf(2), stringWithSubtask.f0);
+			} else {
+				assertEquals(Integer.valueOf(0), stringWithSubtask.f0);
+			}
+		}
+	}
+
+	private static void verifyBroadcastPartitioning(List<Tuple2<Integer, String>> broadcastPartitionResult) {
+		List<Tuple2<Integer, String>> expected = Arrays.asList(
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "b"),
+				new Tuple2<Integer, String>(0, "b"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "c"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(1, "b"),
+				new Tuple2<Integer, String>(1, "b"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(1, "c"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(2, "a"),
+				new Tuple2<Integer, String>(2, "b"),
+				new Tuple2<Integer, String>(2, "b"),
+				new Tuple2<Integer, String>(2, "a"),
+				new Tuple2<Integer, String>(2, "a"),
+				new Tuple2<Integer, String>(2, "c"),
+				new Tuple2<Integer, String>(2, "a"));
+
+		assertEquals(
+				new HashSet<Tuple2<Integer, String>>(expected),
+				new HashSet<Tuple2<Integer, String>>(broadcastPartitionResult));
+	}
+
+	private static void verifyRebalancePartitioning(List<Tuple2<Integer, String>> rebalancePartitionResult) {
+		List<Tuple2<Integer, String>> expected = Arrays.asList(
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(1, "b"),
+				new Tuple2<Integer, String>(2, "b"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(1, "a"),
+				new Tuple2<Integer, String>(2, "c"),
+				new Tuple2<Integer, String>(0, "a"));
+
+		assertEquals(
+				new HashSet<Tuple2<Integer, String>>(expected),
+				new HashSet<Tuple2<Integer, String>>(rebalancePartitionResult));
+	}
+
+	private static void verifyGlobalPartitioning(List<Tuple2<Integer, String>> globalPartitionResult) {
+		List<Tuple2<Integer, String>> expected = Arrays.asList(
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "b"),
+				new Tuple2<Integer, String>(0, "b"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "a"),
+				new Tuple2<Integer, String>(0, "c"),
+				new Tuple2<Integer, String>(0, "a"));
+
+		assertEquals(
+				new HashSet<Tuple2<Integer, String>>(expected),
+				new HashSet<Tuple2<Integer, String>>(globalPartitionResult));
+	}
+
+	private static class SubtaskIndexAssigner extends RichMapFunction<Tuple1<String>, Tuple2<Integer, String>> {
+		private static final long serialVersionUID = 1L;
+
+		private int indexOfSubtask;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			RuntimeContext runtimeContext = getRuntimeContext();
+			indexOfSubtask = runtimeContext.getIndexOfThisSubtask();
+		}
+
+		@Override
+		public Tuple2<Integer, String> map(Tuple1<String> value) throws Exception {
+			return new Tuple2<Integer, String>(indexOfSubtask, value.f0);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
new file mode 100644
index 0000000..d33a2b1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SelfConnectionITCase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class SelfConnectionITCase extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * We connect two different data streams in a chain to a CoMap.
+	 */
+	@Test
+	public void differentDataStreamSameChain() throws Exception {
+
+		TestListResultSink<String> resultSink = new TestListResultSink<String>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStream<Integer> src = env.fromElements(1, 3, 5);
+
+		DataStream<String> stringMap = src.map(new MapFunction<Integer, String>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String map(Integer value) throws Exception {
+				return "x " + value;
+			}
+		});
+
+		stringMap.connect(src).map(new CoMapFunction<String, Integer, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String map1(String value) {
+				return value;
+			}
+
+			@Override
+			public String map2(Integer value) {
+				return String.valueOf(value + 1);
+			}
+		}).addSink(resultSink);
+
+		env.execute();
+
+		List<String> expected = Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6");
+
+		List<String> result = resultSink.getResult();
+
+		Collections.sort(expected);
+		Collections.sort(result);
+
+		assertEquals(expected, result);
+	}
+
+	/**
+	 * We connect two different data streams in different chains to a CoMap.
+	 * (This is not actually a self-connection.)
+	 */
+	@Test
+	public void differentDataStreamDifferentChain() throws Exception {
+
+		TestListResultSink<String> resultSink = new TestListResultSink<String>();
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(3);
+
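+		// disable chaining so that the two maps below run in chains separate from the source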
+		DataStream<Integer> src = env.fromElements(1, 3, 5).disableChaining();
+
+		DataStream<String> stringMap = src.flatMap(new FlatMapFunction<Integer, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void flatMap(Integer value, Collector<String> out) throws Exception {
+				out.collect("x " + value);
+			}
+		}).keyBy(new KeySelector<String, Integer>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(String value) throws Exception {
+				return value.length();
+			}
+		});
+
+		DataStream<Long> longMap = src.map(new MapFunction<Integer, Long>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Long map(Integer value) throws Exception {
+				return (long) (value + 1);
+			}
+		}).keyBy(new KeySelector<Long, Integer>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Long value) throws Exception {
+				return value.intValue();
+			}
+		});
+
+
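+		// connect the two keyed streams derived from the same source and co-map both sides to strings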
+		stringMap.connect(longMap).map(new CoMapFunction<String, Long, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String map1(String value) {
+				return value;
+			}
+
+			@Override
+			public String map2(Long value) {
+				return value.toString();
+			}
+		}).addSink(resultSink);
+
+		env.execute();
+
+		List<String> expected = Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6");
+		List<String> result = resultSink.getResult();
+
+		Collections.sort(expected);
+		Collections.sort(result);
+
+		assertEquals(expected, result);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9f42e91/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
new file mode 100644
index 0000000..6288946
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.fail;
+
+public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * Verify that the user-specified state backend is used even if checkpointing
+	 * is disabled. The FailingStateBackend below throws a SuccessException as soon
+	 * as it is initialized, so the job can only fail with that exception if the
+	 * backend was actually installed.
+	 */
+	@Test
+	public void testStateBackendWithoutCheckpointing() throws Exception {
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(1);
+
+		see.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		see.setStateBackend(new FailingStateBackend());
+
+
+		see.fromElements(new Tuple2<>("Hello", 1))
+			.keyBy(0)
+			.map(new RichMapFunction<Tuple2<String,Integer>, String>() {
+				private static final long serialVersionUID = 1L;
+
+				@Override
+				public void open(Configuration parameters) throws Exception {
+					super.open(parameters);
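+					// touch keyed state so that the job actually exercises the state backend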
+					getRuntimeContext().getKeyValueState("test", String.class, "");
+				}
+
+				@Override
+				public String map(Tuple2<String, Integer> value) throws Exception {
+					return value.f0;
+				}
+			})
+			.print();
+		
+		try {
+			see.execute();
+			fail("the job should have failed with a SuccessException from the state backend");
+		}
+		catch (JobExecutionException e) {
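+			// the SuccessException thrown by the backend arrives wrapped in the
+			// failure cause, two levels below the JobExecutionException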
+			Throwable t = e.getCause();
+			if (!(t != null && t.getCause() instanceof SuccessException)) {
+				throw e;
+			}
+		}
+	}
+
+
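+	/**
+	 * A state backend that throws a SuccessException as soon as it is initialized,
+	 * signalling that it was actually picked up by the runtime.
+	 */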
+	public static class FailingStateBackend extends AbstractStateBackend {
+		
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void initializeForJob(Environment env, String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception {
+			throw new SuccessException();
+		}
+
+		@Override
+		public void disposeAllStateForCurrentJob() throws Exception {}
+
+		@Override
+		public void close() throws Exception {}
+
+		@Override
+		protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception {
+			return null;
+		}
+
+		@Override
+		protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
+			return null;
+		}
+
+		@Override
+		protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
+			return null;
+		}
+
+		@Override
+		protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
+			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+			return null;
+		}
+
+		@Override
+		public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID,
+			long timestamp) throws Exception {
+			return null;
+		}
+
+		@Override
+		public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state,
+			long checkpointID,
+			long timestamp) throws Exception {
+			return null;
+		}
+	}
+
+	static final class SuccessException extends Exception {
+		private static final long serialVersionUID = -9218191172606739598L;
+	}
+
+}

