flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [09/13] flink git commit: [FLINK-5969] Add WindowOperatorFrom12MigrationTest
Date Wed, 03 May 2017 14:28:15 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
deleted file mode 100644
index 1168eb0..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ /dev/null
@@ -1,896 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
-import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
-import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.fail;
-
-/**
- * Tests for checking whether {@link WindowOperator} can restore from snapshots that were
done
- * using the Flink 1.1 {@link WindowOperator}.
- *
- * <p>
- * This also checks whether {@link WindowOperator} can restore from a checkpoint of the Flink
1.1
- * aligned processing-time windows operator.
- *
- * <p>For regenerating the binary snapshot file you have to run the commented out portion
- * of each test on a checkout of the Flink 1.1 branch.
- */
-public class WindowOperatorMigrationTest {
-
-	private static String getResourceFilename(String filename) {
-		ClassLoader cl = WindowOperatorMigrationTest.class.getClassLoader();
-		URL resource = cl.getResource(filename);
-		if (resource == null) {
-			throw new NullPointerException("Missing snapshot resource.");
-		}
-		return resource.getFile();
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testRestoreSessionWindowsWithCountTriggerFromFlink11() throws Exception {
-
-		final int SESSION_SIZE = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String,
Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
-				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
-				PurgingTrigger.of(CountTrigger.of(4)),
-				0,
-				null /* late data output tag */);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		/*
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long,
Long>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
-
-		// do snapshot and save to file
-		StreamTaskState snapshot = testHarness.snapshot(0, 0);
-		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot");
-		testHarness.close();
-        */
-
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long,
Long>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-				"win-op-migration-test-session-with-stateful-trigger-flink1.1-snapshot"));
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
-
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple3ResultSortComparator());
-
-		// add an element that merges the two "key1" sessions, they should now have count 6, and
therfore fire
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L),
9999L));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple3ResultSortComparator());
-
-		testHarness.close();
-	}
-
-	/**
-	 * This checks that we can restore from a virgin {@code WindowOperator} that has never seen
-	 * any elements.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testRestoreSessionWindowsWithCountTriggerInMintConditionFromFlink11() throws
Exception {
-
-		final int SESSION_SIZE = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String,
Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
-				EventTimeSessionWindows.withGap(Time.seconds(SESSION_SIZE)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalIterableWindowFunction<>(new SessionWindowFunction()),
-				PurgingTrigger.of(CountTrigger.of(4)),
-				0,
-				null /* late data output tag */);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		/*
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long,
Long>> testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		// do snapshot and save to file
-		StreamTaskState snapshot = testHarness.snapshot(0, 0);
-		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot");
-		testHarness.close();
-		*/
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long,
Long>> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-				"win-op-migration-test-session-with-stateful-trigger-mint-flink1.1-snapshot"));
-		testHarness.open();
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 1000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), 2500));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 3500));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 1000));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 2500));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 6000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), 6500));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 7000));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-10", 0L, 6500L), 6499));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple3ResultSortComparator());
-
-		// add an element that merges the two "key1" sessions, they should now have count 6, and
therfore fire
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 10), 4500));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-22", 10L, 10000L),
9999L));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple3ResultSortComparator());
-
-		testHarness.close();
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testRestoreReducingEventTimeWindowsFromFlink11() throws Exception {
-		final int WINDOW_SIZE = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-				new SumReducer(),
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String,
TimeWindow, Tuple2<String, Integer>>()),
-				EventTimeTrigger.create(),
-				0,
-				null /* late data output tag */);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		/*
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
-
-		testHarness.processWatermark(new Watermark(999));
-		expectedOutput.add(new Watermark(999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(1999));
-		expectedOutput.add(new Watermark(1999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-
-		// do snapshot and save to file
-		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-reduce-event-time-flink1.1-snapshot");
-		testHarness.close();
-
-		*/
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-				"win-op-migration-test-reduce-event-time-flink1.1-snapshot"));
-		testHarness.open();
-
-		testHarness.processWatermark(new Watermark(2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
-		expectedOutput.add(new Watermark(2999));
-
-		testHarness.processWatermark(new Watermark(3999));
-		expectedOutput.add(new Watermark(3999));
-
-		testHarness.processWatermark(new Watermark(4999));
-		expectedOutput.add(new Watermark(4999));
-
-		testHarness.processWatermark(new Watermark(5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
-		expectedOutput.add(new Watermark(5999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-		testHarness.close();
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testRestoreApplyEventTimeWindowsFromFlink11() throws Exception {
-		final int WINDOW_SIZE = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String,
Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
-				EventTimeTrigger.create(),
-				0,
-				null /* late data output tag */);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		/*
-
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		// add elements out-of-order
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
-
-		testHarness.processWatermark(new Watermark(999));
-		expectedOutput.add(new Watermark(999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-
-		testHarness.processWatermark(new Watermark(1999));
-		expectedOutput.add(new Watermark(1999));
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-
-		// do snapshot and save to file
-		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-apply-event-time-flink1.1-snapshot");
-		testHarness.close();
-
-		*/
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-				"win-op-migration-test-apply-event-time-flink1.1-snapshot"));
-		testHarness.open();
-
-		testHarness.processWatermark(new Watermark(2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
-		expectedOutput.add(new Watermark(2999));
-
-		testHarness.processWatermark(new Watermark(3999));
-		expectedOutput.add(new Watermark(3999));
-
-		testHarness.processWatermark(new Watermark(4999));
-		expectedOutput.add(new Watermark(4999));
-
-		testHarness.processWatermark(new Watermark(5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), 5999));
-		expectedOutput.add(new Watermark(5999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-		testHarness.close();
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testRestoreReducingProcessingTimeWindowsFromFlink11() throws Exception {
-		final int WINDOW_SIZE = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-				new SumReducer(),
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String,
TimeWindow, Tuple2<String, Integer>>()),
-				ProcessingTimeTrigger.create(),
-				0,
-				null /* late data output tag */);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		/*
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
-
-		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), timeServiceProvider);
-
-		testHarness.configureForKeyedStream(new WindowOperatorTest.TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		timeServiceProvider.setCurrentTime(10);
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
-
-		timeServiceProvider.setCurrentTime(3010);
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1)));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new WindowOperatorTest.Tuple2ResultSortComparator());
-
-		// do snapshot and save to file
-		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-reduce-processing-time-flink1.1-snapshot");
-		testHarness.close();
-		*/
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-				"win-op-migration-test-reduce-processing-time-flink1.1-snapshot"));
-		testHarness.open();
-
-		testHarness.setProcessingTime(3020);
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3)));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3)));
-
-		testHarness.setProcessingTime(6000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 5999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-		testHarness.close();
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testRestoreApplyProcessingTimeWindowsFromFlink11() throws Exception {
-		final int WINDOW_SIZE = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String,
Integer>>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
-				ProcessingTimeTrigger.create(),
-				0,
-				null /* late data output tag */);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		/*
-		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String,
Integer>"), new ExecutionConfig());
-
-		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), timeServiceProvider);
-
-		testHarness.configureForKeyedStream(new WindowOperatorTest.TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		timeServiceProvider.setCurrentTime(10);
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1)));
-
-		timeServiceProvider.setCurrentTime(3010);
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1)));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key3", 1)));
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 1), 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 1), 2999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new WindowOperatorTest.Tuple2ResultSortComparator());
-
-		// do snapshot and save to file
-		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-apply-processing-time-flink1.1-snapshot");
-		testHarness.close();
-		*/
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename(
-				"win-op-migration-test-apply-processing-time-flink1.1-snapshot"));
-		testHarness.open();
-
-		testHarness.setProcessingTime(3020);
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3)));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3)));
-
-		testHarness.setProcessingTime(6000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), 5999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key3", 1), 5999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-		testHarness.close();
-	}
-
-	@Test
-	public void testRestoreAggregatingAlignedProcessingTimeWindowsFromFlink11() throws Exception
{
-		/*
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		AggregatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>> operator
=
-			new AggregatingProcessingTimeWindowOperator<>(
-				new ReduceFunction<Tuple2<String, Integer>>() {
-					private static final long serialVersionUID = -8913160567151867987L;
-
-					@Override
-					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String,
Integer> value2) throws Exception {
-						return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
-					}
-				},
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				inputType.createSerializer(new ExecutionConfig()),
-				3000,
-				3000);
-
-		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		testTimeProvider.setCurrentTime(3);
-
-		// timestamp is ignored in processing time
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-
-		// do a snapshot, close and restore again
-		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot");
-		testHarness.close();
-
-		*/
-
-		final int WINDOW_SIZE = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-				new SumReducer(),
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String,
TimeWindow, Tuple2<String, Integer>>()),
-				ProcessingTimeTrigger.create(),
-				0,
-				null /* late data output tag */,
-				LegacyWindowOperatorType.FAST_AGGREGATING);
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.setup();
-		testHarness.initializeStateFromLegacyCheckpoint("src/test/resources/win-op-migration-test-aggr-aligned-flink1.1-snapshot");
-		testHarness.open();
-
-		testHarness.setProcessingTime(5000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-
-		testHarness.setProcessingTime(7000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testRestoreAccumulatingAlignedProcessingTimeWindowsFromFlink11() throws Exception
{
-		/*
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		AccumulatingProcessingTimeWindowOperator<String, Tuple2<String, Integer>, Tuple2<String,
Integer>> operator =
-			new AccumulatingProcessingTimeWindowOperator<>(
-				new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String,
TimeWindow>() {
-
-					private static final long serialVersionUID = 6551516443265733803L;
-
-					@Override
-					public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>>
input, Collector<Tuple2<String, Integer>> out) throws Exception {
-						int sum = 0;
-						for (Tuple2<String, Integer> anInput : input) {
-							sum += anInput.f1;
-						}
-						out.collect(new Tuple2<>(s, sum));
-					}
-				},
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				inputType.createSerializer(new ExecutionConfig()),
-				3000,
-				3000);
-
-		TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-			new OneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider);
-
-		testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.open();
-
-		testTimeProvider.setCurrentTime(3);
-
-		// timestamp is ignored in processing time
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), Long.MAX_VALUE));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 7000));
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-
-		// do a snapshot, close and restore again
-		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
-		testHarness.snaphotToFile(snapshot, "src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot");
-		testHarness.close();
-
-		*/
-		final int WINDOW_SIZE = 3;
-
-		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
-
-		ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
-				new SumReducer(),
-				inputType.createSerializer(new ExecutionConfig()));
-
-		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingProcessingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
-				new TimeWindow.Serializer(),
-				new TupleKeySelector(),
-				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				stateDesc,
-				new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String,
TimeWindow, Tuple2<String, Integer>>()),
-				ProcessingTimeTrigger.create(),
-				0,
-				null /* late data output tag */,
-				LegacyWindowOperatorType.FAST_ACCUMULATING);
-
-		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		testHarness.setup();
-		testHarness.initializeStateFromLegacyCheckpoint("src/test/resources/win-op-migration-test-accum-aligned-flink1.1-snapshot");
-		testHarness.open();
-
-		testHarness.setProcessingTime(5000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), 2999));
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 2), 2999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 7000));
-
-		testHarness.setProcessingTime(7000);
-
-		expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), 5999));
-
-		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(),
new Tuple2ResultSortComparator());
-
-		testHarness.close();
-	}
-
-
-	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>,
String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, Integer> value) throws Exception {
-			return value.f0;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private static class Tuple2ResultSortComparator implements Comparator<Object> {
-		@Override
-		public int compare(Object o1, Object o2) {
-			if (o1 instanceof Watermark || o2 instanceof Watermark) {
-				return 0;
-			} else {
-				StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String,
Integer>>) o1;
-				StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String,
Integer>>) o2;
-				if (sr0.getTimestamp() != sr1.getTimestamp()) {
-					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
-				}
-				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
-				if (comparison != 0) {
-					return comparison;
-				} else {
-					return sr0.getValue().f1 - sr1.getValue().f1;
-				}
-			}
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	private static class Tuple3ResultSortComparator implements Comparator<Object> {
-		@Override
-		public int compare(Object o1, Object o2) {
-			if (o1 instanceof Watermark || o2 instanceof Watermark) {
-				return 0;
-			} else {
-				StreamRecord<Tuple3<String, Long, Long>> sr0 = (StreamRecord<Tuple3<String,
Long, Long>>) o1;
-				StreamRecord<Tuple3<String, Long, Long>> sr1 = (StreamRecord<Tuple3<String,
Long, Long>>) o2;
-				if (sr0.getTimestamp() != sr1.getTimestamp()) {
-					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
-				}
-				int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
-				if (comparison != 0) {
-					return comparison;
-				} else {
-					comparison = (int) (sr0.getValue().f1 - sr1.getValue().f1);
-					if (comparison != 0) {
-						return comparison;
-					}
-					return (int) (sr0.getValue().f2 - sr1.getValue().f2);
-				}
-			}
-		}
-	}
-
-	public static class SumReducer implements ReduceFunction<Tuple2<String, Integer>>
{
-		private static final long serialVersionUID = 1L;
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
-				Tuple2<String, Integer> value2) throws Exception {
-			return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
-		}
-	}
-
-	public static class RichSumReducer<W extends Window> extends RichWindowFunction<Tuple2<String,
Integer>, Tuple2<String, Integer>, String, W> {
-		private static final long serialVersionUID = 1L;
-
-		private boolean openCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-		}
-
-		@Override
-		public void apply(String key,
-				W window,
-				Iterable<Tuple2<String, Integer>> input,
-				Collector<Tuple2<String, Integer>> out) throws Exception {
-
-			if (!openCalled) {
-				fail("Open was not called");
-			}
-			int sum = 0;
-
-			for (Tuple2<String, Integer> t: input) {
-				sum += t.f1;
-			}
-			out.collect(new Tuple2<>(key, sum));
-
-		}
-
-	}
-
-	public static class SessionWindowFunction implements WindowFunction<Tuple2<String,
Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void apply(String key,
-				TimeWindow window,
-				Iterable<Tuple2<String, Integer>> values,
-				Collector<Tuple3<String, Long, Long>> out) throws Exception {
-			int sum = 0;
-			for (Tuple2<String, Integer> i: values) {
-				sum += i.f1;
-			}
-			String resultString = key + "-" + sum;
-			out.collect(new Tuple3<>(resultString, window.getStart(), window.getEnd()));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
b/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
new file mode 100644
index 0000000..d182bee
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-accum-aligned-flink1.2-snapshot
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
b/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
new file mode 100644
index 0000000..83741c1
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-aggr-aligned-flink1.2-snapshot
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
new file mode 100644
index 0000000..3411df6
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-event-time-flink1.2-snapshot
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
new file mode 100644
index 0000000..060397f
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-apply-processing-time-flink1.2-snapshot
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
new file mode 100644
index 0000000..dad4630
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-event-time-flink1.2-snapshot
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
new file mode 100644
index 0000000..0edf25e
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-reduce-processing-time-flink1.2-snapshot
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
new file mode 100644
index 0000000..d0afbe0
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-flink1.2-snapshot
differ

http://git-wip-us.apache.org/repos/asf/flink/blob/2c6377f2/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
new file mode 100644
index 0000000..7d0b6cc
Binary files /dev/null and b/flink-streaming-java/src/test/resources/win-op-migration-test-session-with-stateful-trigger-mint-flink1.2-snapshot
differ


Mime
View raw message