flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-3648] Introduce Trigger Test Harness
Date Thu, 07 Jul 2016 15:24:44 GMT
Repository: flink
Updated Branches:
  refs/heads/master 2302f74b6 -> 7fb94173e


[FLINK-3648] Introduce Trigger Test Harness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fb94173
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fb94173
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fb94173

Branch: refs/heads/master
Commit: 7fb94173e38bba25c4caf121f6105c4556d1983f
Parents: 2302f74
Author: kl0u <kkloudas@gmail.com>
Authored: Mon Jul 4 12:21:55 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Jul 7 17:24:18 2016 +0200

----------------------------------------------------------------------
 .../windowing/WindowingTestHarnessTest.java     | 233 +++++++++++++++++++
 .../streaming/util/WindowingTestHarness.java    | 229 ++++++++++++++++++
 2 files changed, 462 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fb94173/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
new file mode 100644
index 0000000..7242e1c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.streaming.util.WindowingTestHarness;
+import org.junit.Test;
+
+public class WindowingTestHarnessTest {
+
+	@Test
+	public void testEventTimeTumblingWindows() throws Exception {
+		final int WINDOW_SIZE = 2000;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
+
+		TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
+
+		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness
= new WindowingTestHarness<>(
+			new ExecutionConfig(),
+			windowAssigner,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			inputType,
+			new TupleKeySelector(),
+			EventTimeTrigger.create(),
+			0);
+
+		// normal element, in the first window (expected below with the window's max timestamp 1999)
+		testHarness.processElement(new Tuple2<>("key2", 1), 1000);
+		testHarness.processWatermark(1985);
+
+		testHarness.addExpectedWatermark(1985);
+
+		// not dropped: window.maxTimestamp() (1999) + allowedLateness (0) > currentWatermark (1985)
+		testHarness.processElement(new Tuple2<>("key2", 1), 1980);
+
+		// also NOT dropped, for the same reason: three elements are expected at 1999 below (1000, 1980, 1998)
+		testHarness.processElement(new Tuple2<>("key2", 1), 1998);
+
+		testHarness.processElement(new Tuple2<>("key2", 1), 2001);
+		testHarness.processWatermark(2999);
+
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999);
+		testHarness.addExpectedWatermark(2999);
+
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 3999);
+
+		testHarness.processWatermark(3999);
+		testHarness.addExpectedWatermark(3999);
+
+		testHarness.compareActualToExpectedOutput("Output is not correct");
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeTumblingWindows() throws Exception {
+		final int WINDOW_SIZE = 3000;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String,
Integer>");
+
+		TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
+
+		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness
= new WindowingTestHarness<>(
+			new ExecutionConfig(),
+			windowAssigner,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			inputType,
+			new TupleKeySelector(),
+			ProcessingTimeTrigger.create(),
+			0);
+
+		testHarness.setProcessingTime(3);
+
+		// element timestamps (Long.MAX_VALUE, 7000) are ignored in processing time: the expected output carries 2999, not these values
+		testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE);
+		testHarness.processElement(new Tuple2<>("key2", 1), 7000);
+		testHarness.processElement(new Tuple2<>("key2", 1), 7000);
+
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+
+		testHarness.setProcessingTime(5000);
+
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+		testHarness.processElement(new Tuple2<>("key1", 1), 7000);
+
+		testHarness.setProcessingTime(7000);
+
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		testHarness.close();
+	}
+
+	/**
+	 * Checks that buffered window contents survive a snapshot / close / restore cycle:
+	 * elements are added before the snapshot, and the windows they belong to are only
+	 * fired by watermarks that arrive after the restore.
+	 */
+	@Test
+	public void testSnapshotingAndRecovery() throws Exception {
+
+		final int WINDOW_SIZE = 3000;
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
+
+		WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
+			new ExecutionConfig(),
+			windowAssigner,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			inputType,
+			new TupleKeySelector(),
+			EventTimeTrigger.create(),
+			0);
+
+		// add elements out-of-order
+		testHarness.processElement(new Tuple2<>("key2", 1), 3999);
+		testHarness.processElement(new Tuple2<>("key2", 1), 3000);
+
+		testHarness.processElement(new Tuple2<>("key1", 1), 20);
+		testHarness.processElement(new Tuple2<>("key1", 1), 0);
+		testHarness.processElement(new Tuple2<>("key1", 1), 999);
+
+		testHarness.processElement(new Tuple2<>("key2", 1), 1998);
+		testHarness.processElement(new Tuple2<>("key2", 1), 1999);
+		testHarness.processElement(new Tuple2<>("key2", 1), 1000);
+
+		// watermarks below 2999 only pass through; no window is expected to fire yet
+		testHarness.processWatermark(999);
+		testHarness.addExpectedWatermark(999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		testHarness.processWatermark(1999);
+		testHarness.addExpectedWatermark(1999);
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		// do a snapshot, close and restore again
+		StreamTaskState snapshot = testHarness.snapshot(0L, 0L);
+		testHarness.close();
+		testHarness.restore(snapshot, 10L);
+
+		// the six elements with timestamps below 3000 were added before the snapshot
+		// and are expected to be emitted with timestamp 2999
+		testHarness.processWatermark(2999);
+
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
+		testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
+
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
+
+		testHarness.addExpectedWatermark(2999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		testHarness.processWatermark(3999);
+		testHarness.addExpectedWatermark(3999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		testHarness.processWatermark(4999);
+		testHarness.addExpectedWatermark(4999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		// the two "key2" elements with timestamps 3999 and 3000 are expected with timestamp 5999
+		testHarness.processWatermark(5999);
+
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 5999);
+		testHarness.addExpectedElement(new Tuple2<>("key2", 1), 5999);
+		testHarness.addExpectedWatermark(5999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+
+		// those don't have any effect...
+		testHarness.processWatermark(6999);
+		testHarness.processWatermark(7999);
+
+		testHarness.addExpectedWatermark(6999);
+		testHarness.addExpectedWatermark(7999);
+
+		testHarness.compareActualToExpectedOutput("Output was not correct.");
+
+		// restore() re-opened the operator, so release it like the other tests do
+		testHarness.close();
+	}
+
+	/**
+	 * {@link KeySelector} that keys a {@code Tuple2<String, Integer>} by its first field ({@code f0}).
+	 */
+	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<String, Integer> value) throws Exception {
+			final String key = value.f0;
+			return key;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fb94173/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
new file mode 100644
index 0000000..faf38a3
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Comparator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A utility class that facilitates the testing of custom {@link Trigger Triggers} and
+ * {@link WindowAssigner WindowAssigners}.
+ *
+ * <p>For examples on how to use this class, see
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowingTestHarnessTest}.
+ *
+ * <p>The input elements of type {@code IN} must implement the {@code equals()} method
+ * because it is used to compare the expected output to the actual output.
+ */
+public class WindowingTestHarness<K, IN, W extends Window> {
+
+	private final TestTimeServiceProvider timeServiceProvider;
+
+	private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
+
+	private final ConcurrentLinkedQueue<Object> expectedOutputs = new ConcurrentLinkedQueue<>();
+
+	private volatile boolean isOpen = false;
+
+	/**
+	 * Builds a {@link WindowOperator} from the given assigner and trigger and wraps it in a
+	 * {@link OneInputStreamOperatorTestHarness} configured for a keyed stream.
+	 *
+	 * <p>Window contents are kept in a list state named {@code "window-contents"} and are
+	 * emitted unchanged when a window fires.
+	 *
+	 * @param executionConfig config used to create the key, input and window serializers
+	 * @param windowAssigner the window assigner under test
+	 * @param keyType type information of the key
+	 * @param inputType type information of the input (and output) elements
+	 * @param keySelector extracts the key from an input element
+	 * @param trigger the trigger under test
+	 * @param allowedLateness allowed lateness handed to the window operator
+	 */
+	public WindowingTestHarness(ExecutionConfig executionConfig,
+								WindowAssigner<? super IN, W> windowAssigner,
+								TypeInformation<K> keyType,
+								TypeInformation<IN> inputType,
+								KeySelector<IN, K> keySelector,
+								Trigger<? super IN, ? super W> trigger,
+								long allowedLateness) {
+
+		ListStateDescriptor<IN> contentsDescriptor = new ListStateDescriptor<>(
+			"window-contents",
+			inputType.createSerializer(executionConfig));
+
+		WindowOperator<K, IN, Iterable<IN>, IN, W> windowOperator = new WindowOperator<>(
+			windowAssigner,
+			windowAssigner.getWindowSerializer(executionConfig),
+			keySelector,
+			keyType.createSerializer(executionConfig),
+			contentsDescriptor,
+			new InternalIterableWindowFunction<>(new PassThroughFunction()),
+			trigger,
+			allowedLateness);
+
+		windowOperator.setInputType(inputType, executionConfig);
+
+		this.timeServiceProvider = new TestTimeServiceProvider();
+		this.testHarness = new OneInputStreamOperatorTestHarness<>(
+			windowOperator, executionConfig, this.timeServiceProvider);
+		this.testHarness.configureForKeyedStream(keySelector, keyType);
+	}
+
+	/**
+	 * Simulates the processing of a new incoming element.
+	 *
+	 * <p>The operator is opened first if it is not open yet.
+	 *
+	 * @param element the element to hand to the window operator
+	 * @param timestamp the timestamp of the {@link StreamRecord} wrapping the element
+	 * @throws Exception if opening the operator or processing the element fails
+	 */
+	public void processElement(IN element, long timestamp) throws Exception {
+		openOperator();
+		testHarness.processElement(new StreamRecord<>(element, timestamp));
+	}
+
+	/**
+	 * Simulates the processing of a new incoming watermark.
+	 *
+	 * <p>The operator is opened first if it is not open yet.
+	 *
+	 * @param timestamp the timestamp of the {@link Watermark} to process
+	 * @throws Exception if opening the operator or processing the watermark fails
+	 */
+	public void processWatermark(long timestamp) throws Exception {
+		openOperator();
+		testHarness.processWatermark(new Watermark(timestamp));
+	}
+
+	/**
+	 * Sets the current processing time to {@code timestamp}.
+	 * This is useful when working on processing time.
+	 *
+	 * <p>The operator is opened first if it is not open yet; the time is then set on the
+	 * {@link TestTimeServiceProvider} that was handed to the operator test harness.
+	 *
+	 * @param timestamp the new current processing time
+	 * @throws Exception if opening the operator or advancing the time fails
+	 */
+	public void setProcessingTime(long timestamp) throws Exception {
+		openOperator();
+		timeServiceProvider.setCurrentTime(timestamp);
+	}
+
+	/**
+	 * Gets the current output of the windowing operator, i.e. what the combination of the
+	 * window assigner and the trigger has produced so far. This will also contain the
+	 * received watermarks.
+	 *
+	 * <p>Unlike the {@code process*} methods, this does not open the operator.
+	 *
+	 * @return the output queue of the underlying operator test harness
+	 */
+	public ConcurrentLinkedQueue<Object> getOutput() throws Exception {
+		return testHarness.getOutput();
+	}
+
+	/**
+	 * Closes the testing window operator.
+	 *
+	 * <p>Does nothing when the operator is not open, so repeated calls are harmless.
+	 *
+	 * @throws Exception if closing the underlying test harness fails; the operator is then
+	 *                   still considered open
+	 */
+	public void close() throws Exception {
+		if (!isOpen) {
+			return;
+		}
+		testHarness.close();
+		isOpen = false;
+	}
+
+	/**
+	 * Adds a watermark to the expected output.
+	 *
+	 * <p>The expected output should contain the elements and watermarks that we expect the
+	 * output of the operator to contain, in the correct order. This will be used to check if
+	 * the produced output is the expected one, and thus determine the success or failure of
+	 * the test.
+	 *
+	 * @param timestamp the timestamp of the expected {@link Watermark}
+	 */
+	public void addExpectedWatermark(long timestamp) {
+		expectedOutputs.add(new Watermark(timestamp));
+	}
+
+	/**
+	 * Adds an element to the expected output.
+	 *
+	 * <p>The expected output should contain the elements and watermarks that we expect the
+	 * output of the operator to contain, in the correct order. This will be used to check if
+	 * the produced output is the expected one, and thus determine the success or failure of
+	 * the test.
+	 *
+	 * @param element the expected element
+	 * @param timestamp the timestamp of the expected {@link StreamRecord}
+	 */
+	public void addExpectedElement(IN element, long timestamp) {
+		expectedOutputs.add(new StreamRecord<>(element, timestamp));
+	}
+
+	/**
+	 * Compares the currently produced output with the expected one. The latter contains the
+	 * elements and watermarks added using the {@link #addExpectedElement(Object, long)} and
+	 * {@link #addExpectedWatermark(long)} methods.
+	 *
+	 * <p><b>NOTE:</b> This method uses an {@code assert()} internally, thus failing the test
+	 * if the {@code expected} output does not match the {@code actual} one.
+	 *
+	 * <p>NOTE(review): the check is delegated to {@code TestHarnessUtil.assertOutputEqualsSorted}
+	 * with a {@code StreamRecordComparator}; judging by the helper's name both sides are
+	 * sorted before being compared - confirm in {@code TestHarnessUtil}.
+	 *
+	 * @param errorMessage the message passed on to the assertion helper
+	 */
+	public void compareActualToExpectedOutput(String errorMessage) {
+		TestHarnessUtil.assertOutputEqualsSorted(errorMessage, expectedOutputs, testHarness.getOutput(),
new StreamRecordComparator());
+	}
+
+	/**
+	 * Takes a snapshot of the current state of the operator. This can be used to test fault-tolerance.
+	 *
+	 * @param checkpointId the checkpoint id forwarded to the underlying test harness
+	 * @param timestamp the checkpoint timestamp forwarded to the underlying test harness
+	 * @return the state returned by the underlying test harness, usable with
+	 *         {@link #restore(StreamTaskState, long)}
+	 */
+	public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception {
+		return testHarness.snapshot(checkpointId, timestamp);
+	}
+
+	/**
+	 * Resumes execution from a provided {@link StreamTaskState}. This is used to test recovery
+	 * after a failure.
+	 *
+	 * <p>The operator must be closed when this is called (see {@link #close()}); otherwise the
+	 * precondition check fails. The underlying harness is set up again, the state is restored
+	 * into it, and the operator is re-opened.
+	 *
+	 * @param snapshot the state to restore, as returned by {@link #snapshot(long, long)}
+	 * @param recoveryTime the recovery time forwarded to the underlying test harness
+	 */
+	public void restore(StreamTaskState snapshot, long recoveryTime) throws Exception {
+		Preconditions.checkArgument(!isOpen,
+			"You are trying to restore() while the operator is still open. " +
+				"Please call close() first.");
+
+		testHarness.setup();
+		testHarness.restore(snapshot, recoveryTime);
+		openOperator();
+	}
+
+	/**
+	 * Opens the underlying test harness unless it is already open.
+	 *
+	 * @throws Exception if opening the test harness fails; the operator is then still
+	 *                   considered closed
+	 */
+	private void openOperator() throws Exception {
+		if (isOpen) {
+			return;
+		}
+		testHarness.open();
+		isOpen = true;
+	}
+
+	/**
+	 * {@link WindowFunction} that emits every element of the window it is applied to,
+	 * unchanged and in iteration order.
+	 */
+	private class PassThroughFunction implements WindowFunction<IN, IN, K, W> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void apply(K k, W window, Iterable<IN> input, Collector<IN> out) throws Exception {
+			for (IN element : input) {
+				out.collect(element);
+			}
+		}
+	}
+
+	/**
+	 * {@link Comparator} for sorting the expected and actual output by timestamp.
+	 *
+	 * <p>Records are ordered by timestamp, then by the tuple's {@code f0}, then by its
+	 * {@code f1}. Any comparison that involves a {@link Watermark} yields {@code 0}.
+	 *
+	 * <p>NOTE(review): everything that is not a {@link Watermark} is cast to
+	 * {@code StreamRecord<Tuple2<String, Integer>>}, although the enclosing harness is generic
+	 * in {@code IN}; other element types will fail with a {@link ClassCastException}.
+	 */
+	@SuppressWarnings("unchecked")
+	private class StreamRecordComparator implements Comparator<Object> {
+		@Override
+		public int compare(Object o1, Object o2) {
+			if (o1 instanceof Watermark || o2 instanceof Watermark) {
+				return 0;
+			}
+
+			StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
+			StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
+
+			// Long.compare rather than "(int) (a - b)": the cast drops the high bits of the
+			// difference, which can flip the sign or turn a non-zero difference into 0.
+			int byTimestamp = Long.compare(sr0.getTimestamp(), sr1.getTimestamp());
+			if (byTimestamp != 0) {
+				return byTimestamp;
+			}
+
+			int byKey = sr0.getValue().f0.compareTo(sr1.getValue().f0);
+			if (byKey != 0) {
+				return byKey;
+			}
+
+			// Integer.compare rather than subtraction, which overflows for distant values.
+			return Integer.compare(sr0.getValue().f1, sr1.getValue().f1);
+		}
+	}
+}


Mime
View raw message