flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-6783] Changed passing index of type argument while extracting return type.
Date Thu, 08 Jun 2017 09:41:05 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1cc1bb41e -> bcaf816dc


http://git-wip-us.apache.org/repos/asf/flink/blob/bcaf816d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index ced27b6..8748ed4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -260,23 +260,28 @@ public class WindowTranslationTest {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		DataStream<Integer> source = env.fromElements(1, 2);
 
-		DataStream<Tuple3<String, String, Integer>> window1 = source
-				.keyBy(new TupleKeySelector())
+		DataStream<String> window1 = source
+				.keyBy(new KeySelector<Integer, String>() {
+					@Override
+					public String getKey(Integer value) throws Exception {
+						return value.toString();
+					}
+				})
 				.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
 				.evictor(CountEvictor.of(5))
 				.process(new TestProcessWindowFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		final OneInputTransformation<Integer, String> transform = (OneInputTransformation<Integer, String>) window1.getTransformation();
+		final OneInputStreamOperator<Integer, String> operator = transform.getOperator();
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		WindowOperator<String, Integer, ?, ?, ?> winOperator = (WindowOperator<String, Integer, ?, ?, ?>) operator;
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator.getWindowAssigner() instanceof EventTimeSessionWindows);
 		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
 
-		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+		processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, 1);
 	}
 
 	// ------------------------------------------------------------------------
@@ -604,28 +609,30 @@ public class WindowTranslationTest {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
+			Tuple3.of("hello", "hallo", 1),
+			Tuple3.of("hello", "hallo", 2));
 
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(new TupleKeySelector())
+		DataStream<Integer> window1 = source
+				.keyBy(new Tuple3KeySelector())
 				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.aggregate(new DummyAggregationFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform =
+			(OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation();
 
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
 
 		processElementAndEnsureOutput(
-				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
 	}
 
 	@Test
@@ -633,28 +640,30 @@ public class WindowTranslationTest {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
+			Tuple3.of("hello", "hallo", 1),
+			Tuple3.of("hello", "hallo", 2));
 
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(new TupleKeySelector())
+		DataStream<Integer> window1 = source
+				.keyBy(new Tuple3KeySelector())
 				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.aggregate(new DummyAggregationFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform =
+			(OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation();
 
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
 		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
 		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
 
 		processElementAndEnsureOutput(
-				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
 	}
 
 	@Test
@@ -662,30 +671,32 @@ public class WindowTranslationTest {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
+			Tuple3.of("hello", "hallo", 1),
+			Tuple3.of("hello", "hallo", 2));
 
 		DummyReducer reducer = new DummyReducer();
 
-		DataStream<Tuple3<String, String, Integer>> window = source
-				.keyBy(new TupleKeySelector())
+		DataStream<String> window = source
+				.keyBy(new Tuple3KeySelector())
 				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.aggregate(new DummyAggregationFunction(), new TestWindowFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
+			(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
 
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
+			(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
 
 		processElementAndEnsureOutput(
-				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
 	}
 
 	@Test
@@ -693,28 +704,30 @@ public class WindowTranslationTest {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
+			Tuple3.of("hello", "hallo", 1),
+			Tuple3.of("hello", "hallo", 2));
 
-		DataStream<Tuple3<String, String, Integer>> window = source
-				.keyBy(new TupleKeySelector())
+		DataStream<String> window = source
+				.keyBy(new Tuple3KeySelector())
 				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.aggregate(new DummyAggregationFunction(), new TestWindowFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
+			(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
 
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
+			(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
 		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
 		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
 
 		processElementAndEnsureOutput(
-				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
 	}
 
 	@Test
@@ -722,30 +735,30 @@ public class WindowTranslationTest {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
+			Tuple3.of("hello", "hallo", 1),
+			Tuple3.of("hello", "hallo", 2));
 
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple3<String, String, Integer>> window = source
-				.keyBy(new TupleKeySelector())
+		DataStream<String> window = source
+				.keyBy(new Tuple3KeySelector())
 				.window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
+			(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
 
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
+			(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
 		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
 
 		processElementAndEnsureOutput(
-				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
 	}
 
 	@Test
@@ -753,28 +766,30 @@ public class WindowTranslationTest {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
+			Tuple3.of("hello", "hallo", 1),
+			Tuple3.of("hello", "hallo", 2));
 
-		DataStream<Tuple3<String, String, Integer>> window = source
-				.keyBy(new TupleKeySelector())
+		DataStream<String> window = source
+				.keyBy(new Tuple3KeySelector())
 				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.aggregate(new DummyAggregationFunction(), new TestProcessWindowFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+		final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
+			(OneInputTransformation<Tuple3<String, String, Integer>, String>) window.getTransformation();
 
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+		final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
+			(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
 		Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
 		Assert.assertTrue(winOperator.getStateDescriptor() instanceof AggregatingStateDescriptor);
 
 		processElementAndEnsureOutput(
-				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+				operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
 	}
 
 	// ------------------------------------------------------------------------
@@ -1406,29 +1421,31 @@ public class WindowTranslationTest {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
+			Tuple3.of("hello", "hallo", 1),
+			Tuple3.of("hello", "hallo", 2));
 
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(new TupleKeySelector())
+		DataStream<Integer> window1 = source
+				.keyBy(new Tuple3KeySelector())
 				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
 				.aggregate(new DummyAggregationFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		final OneInputTransformation<Tuple3<String, String, Integer>, Integer> transform =
+			(OneInputTransformation<Tuple3<String, String, Integer>, Integer>) window1.getTransformation();
 
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		final OneInputStreamOperator<Tuple3<String, String, Integer>, Integer> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
 
 		processElementAndEnsureOutput(
-				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
 	}
 
 	@Test
@@ -1436,42 +1453,33 @@ public class WindowTranslationTest {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		DataStream<Tuple3<String, String, Integer>> source = env.fromElements(
+			Tuple3.of("hello", "hallo", 1),
+			Tuple3.of("hello", "hallo", 2));
 
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.keyBy(new TupleKeySelector())
+		DataStream<String> window1 = source
+				.keyBy(new Tuple3KeySelector())
 				.window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
 				.aggregate(
 						new DummyAggregationFunction(),
-						new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
-							@Override
-							public void process(
-									String s,
-									Context context,
-									Iterable<Tuple2<String, Integer>> elements,
-									Collector<Tuple2<String, Integer>> out) throws Exception {
-								for (Tuple2<String, Integer> in : elements) {
-									out.collect(in);
-								}
-							}
-						});
+						new TestProcessWindowFunction());
 
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
-				(OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		final OneInputTransformation<Tuple3<String, String, Integer>, String> transform =
+			(OneInputTransformation<Tuple3<String, String, Integer>, String>) window1.getTransformation();
 
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+		final OneInputStreamOperator<Tuple3<String, String, Integer>, String> operator = transform.getOperator();
 
 		Assert.assertTrue(operator instanceof WindowOperator);
-		WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
-				(WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+		WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?> winOperator =
+				(WindowOperator<String, Tuple3<String, String, Integer>, ?, ?, ?>) operator;
 
 		Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
 		Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
 		Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
 
 		processElementAndEnsureOutput(
-				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+				winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple3<>("hello", "hallo", 1));
 	}
 
 	@Test
@@ -1683,7 +1691,7 @@ public class WindowTranslationTest {
 	}
 
 	private static class DummyAggregationFunction
-			implements AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
+			implements AggregateFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, Integer> {
 
 		@Override
 		public Tuple2<String, Integer> createAccumulator() {
@@ -1691,14 +1699,14 @@ public class WindowTranslationTest {
 		}
 
 		@Override
-		public void add(Tuple2<String, Integer> value, Tuple2<String, Integer> accumulator) {
+		public void add(Tuple3<String, String, Integer> value, Tuple2<String, Integer> accumulator) {
 			accumulator.f0 = value.f0;
-			accumulator.f1 = value.f1;
+			accumulator.f1 = value.f2;
 		}
 
 		@Override
-		public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
-			return accumulator;
+		public Integer getResult(Tuple2<String, Integer> accumulator) {
+			return accumulator.f1;
 		}
 
 		@Override
@@ -1729,31 +1737,31 @@ public class WindowTranslationTest {
 	}
 
 	private static class TestWindowFunction
-			implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
+			implements WindowFunction<Integer, String, String, TimeWindow> {
 
 		@Override
 		public void apply(String key,
 				TimeWindow window,
-				Iterable<Tuple2<String, Integer>> values,
-				Collector<Tuple3<String, String, Integer>> out) throws Exception {
+				Iterable<Integer> values,
+				Collector<String> out) throws Exception {
 
-			for (Tuple2<String, Integer> in : values) {
-				out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+			for (Integer in : values) {
+				out.collect(in.toString());
 			}
 		}
 	}
 
 	private static class TestProcessWindowFunction
-			extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow> {
+			extends ProcessWindowFunction<Integer, String, String, TimeWindow> {
 
 		@Override
 		public void process(String key,
 				Context ctx,
-				Iterable<Tuple2<String, Integer>> values,
-				Collector<Tuple3<String, String, Integer>> out) throws Exception {
+				Iterable<Integer> values,
+				Collector<String> out) throws Exception {
 
-			for (Tuple2<String, Integer> in : values) {
-				out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+			for (Integer in : values) {
+				out.collect(in.toString());
 			}
 		}
 	}
@@ -1765,4 +1773,12 @@ public class WindowTranslationTest {
 			return value.f0;
 		}
 	}
+
+	private static class Tuple3KeySelector implements KeySelector<Tuple3<String, String, Integer>, String> {
+
+		@Override
+		public String getKey(Tuple3<String, String, Integer> value) throws Exception {
+			return value.f0;
+		}
+	}
 }


Mime
View raw message