flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: Add ManualWindowSpeedITCase For Assessing State Performance
Date Fri, 19 Aug 2016 14:20:40 GMT
Repository: flink
Updated Branches:
  refs/heads/master f0fef6f44 -> b7ae3e533


Add ManualWindowSpeedITCase For Assessing State Performance

This should be used to test whether there are any obvious performance
regressions between releases. Like the other manual tests that have
@Ignore set, these have to be run manually (for now).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7ae3e53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7ae3e53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7ae3e53

Branch: refs/heads/master
Commit: b7ae3e53382258d9f811b6c34cd0df9564b00370
Parents: f0fef6f
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Aug 17 14:24:51 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Aug 19 16:20:24 2016 +0200

----------------------------------------------------------------------
 .../test/state/ManualWindowSpeedITCase.java     | 260 +++++++++++++++++++
 1 file changed, 260 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7ae3e53/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
new file mode 100644
index 0000000..428c47c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * A collection of manual tests that serve to assess the performance of windowed operations.
These
+ * run in local mode with parallelism 1 with a source that emits data as fast as possible.
Thus,
+ * these mostly test the performance of the state backend.
+ *
+ * <p>When doing a release we should manually run theses tests on the version that
is to be released
+ * and on older version to see if there are performance regressions.
+ *
+ * <p>When a test is executed it will output how many elements of key {@code "Tuple
0"} have
+ * been processed in each window. This gives an estimate of the throughput.
+ */
+@Ignore
+public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void testTumblingIngestionTimeWindowsWithFsBackend() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.setParallelism(1);
+
+		String checkpoints = tempFolder.newFolder().toURI().toString();
+		env.setStateBackend(new FsStateBackend(checkpoints));
+
+		env.addSource(new InfiniteTupleSource(10_000))
+				.keyBy(0)
+				.timeWindow(Time.seconds(3))
+				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+					}
+				})
+				.filter(new FilterFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public boolean filter(Tuple2<String, Integer> value) throws Exception {
+						return value.f0.startsWith("Tuple 0");
+					}
+				})
+				.print();
+
+		env.execute();
+	}
+
+	@Test
+	public void testTumblingIngestionTimeWindowsWithFsBackendWithLateness() throws Exception
{
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.setParallelism(1);
+
+		String checkpoints = tempFolder.newFolder().toURI().toString();
+		env.setStateBackend(new FsStateBackend(checkpoints));
+
+		env.addSource(new InfiniteTupleSource(10_000))
+				.keyBy(0)
+				.timeWindow(Time.seconds(3))
+				.allowedLateness(Time.seconds(1))
+				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+					}
+				})
+				.filter(new FilterFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public boolean filter(Tuple2<String, Integer> value) throws Exception {
+						return value.f0.startsWith("Tuple 0");
+					}
+				})
+				.print();
+
+		env.execute();
+	}
+
+	@Test
+	public void testTumblingIngestionTimeWindowsWithRocksDBBackend() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.setParallelism(1);
+
+		String checkpoints = tempFolder.newFolder().toURI().toString();
+		env.setStateBackend(new RocksDBStateBackend(checkpoints, new MemoryStateBackend()));
+
+		env.addSource(new InfiniteTupleSource(10_000))
+				.keyBy(0)
+				.timeWindow(Time.seconds(3))
+				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+					}
+				})
+				.filter(new FilterFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public boolean filter(Tuple2<String, Integer> value) throws Exception {
+						return value.f0.startsWith("Tuple 0");
+					}
+				})
+				.print();
+
+		env.execute();
+	}
+
+	@Test
+	public void testTumblingIngestionTimeWindowsWithRocksDBBackendWithLateness() throws Exception
{
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		env.setParallelism(1);
+
+		String rocksDbBackups = tempFolder.newFolder().toURI().toString();
+		env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()));
+
+		env.addSource(new InfiniteTupleSource(10_000))
+				.keyBy(0)
+				.timeWindow(Time.seconds(3))
+				.allowedLateness(Time.seconds(1))
+				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+					}
+				})
+				.filter(new FilterFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public boolean filter(Tuple2<String, Integer> value) throws Exception {
+						return value.f0.startsWith("Tuple 0");
+					}
+				})
+				.print();
+
+		env.execute();
+	}
+
+	@Test
+	public void testAlignedProcessingTimeWindows() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		env.setParallelism(1);
+
+		String rocksDbBackups = tempFolder.newFolder().toURI().toString();
+		env.setStateBackend(new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend()));
+
+		env.addSource(new InfiniteTupleSource(10_000))
+				.keyBy(0)
+				.timeWindow(Time.seconds(3))
+				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+					}
+				})
+				.filter(new FilterFunction<Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public boolean filter(Tuple2<String, Integer> value) throws Exception {
+						return value.f0.startsWith("Tuple 0");
+					}
+				})
+				.print();
+
+		env.execute();
+	}
+
+	/**
+	 * A source that emits elements with a fixed set of keys as fast as possible. Used for
+	 * rough performance estimation.
+	 */
+	public static class InfiniteTupleSource implements ParallelSourceFunction<Tuple2<String,
Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private int numKeys;
+
+		private volatile boolean running = true;
+
+		public InfiniteTupleSource(int numKeys) {
+			this.numKeys = numKeys;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception
{
+			long index = 0;
+			while (running) {
+				Tuple2<String, Integer> tuple = new Tuple2<String, Integer>("Tuple " + (index
% numKeys), 1);
+				out.collect(tuple);
+				index++;
+			}
+		}
+
+		@Override
+		public void cancel() {
+			this.running = false;
+		}
+	}
+}


Mime
View raw message