flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-2675] [streaming] Integrate Timer Service with StreamTask
Date Wed, 23 Sep 2015 19:12:22 GMT
Repository: flink
Updated Branches:
  refs/heads/master 721204258 -> 717d54c82


http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
new file mode 100644
index 0000000..7a53ceb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * Tests for the timer service of {@code StreamTask}.
+ *
+ * <p>
+ * These tests ensure that exceptions are properly forwarded from the timer thread to
+ * the task thread and that operator methods are not invoked concurrently.
+ */
+public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
+
+	/**
+	 * Note: this test fails if we don't have the synchronized block in
+	 * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
+	 *
+	 * <p>
+	 * This test never finishes if exceptions from the timer thread are not forwarded. Thus
+	 * a success here means that the exception forwarding works.
+	 */
+	@Test
+	public void testOperatorChainedToSource() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStream<String> source = env.addSource(new InfiniteTestSource());
+
+		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(StreamOperator.ChainingStrategy.ALWAYS));
+
+		boolean testSuccess = false;
+		try {
+			env.execute("Timer test");
+		} catch (JobExecutionException e) {
+			if (e.getCause() instanceof TimerException) {
+				TimerException te = (TimerException) e.getCause();
+				if (te.getCause() instanceof RuntimeException) {
+					RuntimeException re = (RuntimeException) te.getCause();
+					if (re.getMessage().equals("TEST SUCCESS")) {
+						testSuccess = true;
+					} else {
+						throw e;
+					}
+				} else {
+					throw e;
+				}
+			} else {
+				throw e;
+			}
+		}
+		Assert.assertTrue(testSuccess);
+	}
+
+	/**
+	 * Note: this test fails if we don't have the synchronized block in
+	 * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
+	 */
+	@Test
+	public void testOneInputOperatorWithoutChaining() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStream<String> source = env.addSource(new InfiniteTestSource());
+
+		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(StreamOperator.ChainingStrategy.NEVER));
+
+		boolean testSuccess = false;
+		try {
+			env.execute("Timer test");
+		} catch (JobExecutionException e) {
+			if (e.getCause() instanceof TimerException) {
+				TimerException te = (TimerException) e.getCause();
+				if (te.getCause() instanceof RuntimeException) {
+					RuntimeException re = (RuntimeException) te.getCause();
+					if (re.getMessage().equals("TEST SUCCESS")) {
+						testSuccess = true;
+					} else {
+						throw e;
+					}
+				} else {
+					throw e;
+				}
+			} else {
+				throw e;
+			}
+		}
+		Assert.assertTrue(testSuccess);
+	}
+
+	/**
+	 * Note: this test fails if we don't have the synchronized block in
+	 * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
+	 */
+	@Test
+	public void testTwoInputOperatorWithoutChaining() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStream<String> source = env.addSource(new InfiniteTestSource());
+
+		source.connect(source).transform(
+				"Custom Operator",
+				BasicTypeInfo.STRING_TYPE_INFO,
+				new TwoInputTimerOperator(StreamOperator.ChainingStrategy.NEVER));
+
+		boolean testSuccess = false;
+		try {
+			env.execute("Timer test");
+		} catch (JobExecutionException e) {
+			if (e.getCause() instanceof TimerException) {
+				TimerException te = (TimerException) e.getCause();
+				if (te.getCause() instanceof RuntimeException) {
+					RuntimeException re = (RuntimeException) te.getCause();
+					if (re.getMessage().equals("TEST SUCCESS")) {
+						testSuccess = true;
+					} else {
+						throw e;
+					}
+				} else {
+					throw e;
+				}
+			} else {
+				throw e;
+			}
+		}
+		Assert.assertTrue(testSuccess);
+	}
+
+	public static class TimerOperator extends AbstractStreamOperator<String> implements
OneInputStreamOperator<String, String>, Triggerable {
+		private static final long serialVersionUID = 1L;
+
+		int numTimers = 0;
+		int numElements = 0;
+
+		private boolean first = true;
+
+		private Semaphore semaphore = new Semaphore(1);
+
+		public TimerOperator(ChainingStrategy chainingStrategy) {
+			setChainingStrategy(chainingStrategy);
+		}
+
+		@Override
+		public void processElement(StreamRecord<String> element) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			if (first) {
+				getRuntimeContext().registerTimer(System.currentTimeMillis() + 100, this);
+				first = false;
+			}
+			numElements++;
+
+			semaphore.release();
+		}
+
+		@Override
+		public void trigger(long time) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			try {
+				numTimers++;
+				throwIfDone();
+				getRuntimeContext().registerTimer(System.currentTimeMillis() + 1, this);
+			} finally {
+				semaphore.release();
+			}
+		}
+
+		private void throwIfDone() {
+			if (numTimers > 1000 && numElements > 10_000) {
+				throw new RuntimeException("TEST SUCCESS");
+			}
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			//ignore
+		}
+	}
+
+	public static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements
TwoInputStreamOperator<String, String, String>, Triggerable {
+		private static final long serialVersionUID = 1L;
+
+		int numTimers = 0;
+		int numElements = 0;
+
+		private boolean first = true;
+
+		private Semaphore semaphore = new Semaphore(1);
+
+		public TwoInputTimerOperator(ChainingStrategy chainingStrategy) {
+			setChainingStrategy(chainingStrategy);
+		}
+
+		@Override
+		public void processElement1(StreamRecord<String> element) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			if (first) {
+				getRuntimeContext().registerTimer(System.currentTimeMillis() + 100, this);
+				first = false;
+			}
+			numElements++;
+
+			semaphore.release();
+		}
+
+		@Override
+		public void processElement2(StreamRecord<String> element) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			if (first) {
+				getRuntimeContext().registerTimer(System.currentTimeMillis() + 100, this);
+				first = false;
+			}
+			numElements++;
+
+			semaphore.release();
+		}
+
+
+		@Override
+		public void trigger(long time) throws Exception {
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+
+			try {
+				numTimers++;
+				throwIfDone();
+				getRuntimeContext().registerTimer(System.currentTimeMillis() + 1, this);
+			} finally {
+				semaphore.release();
+			}
+		}
+
+		private void throwIfDone() {
+			if (numTimers > 1000 && numElements > 10_000) {
+				throw new RuntimeException("TEST SUCCESS");
+			}
+		}
+
+		@Override
+		public void processWatermark1(Watermark mark) throws Exception {
+			//ignore
+		}
+
+		@Override
+		public void processWatermark2(Watermark mark) throws Exception {
+			//ignore
+		}
+	}
+
+
+	private static class InfiniteTestSource implements SourceFunction<String> {
+		private static final long serialVersionUID = 1L;
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			while (running) {
+				ctx.collect("hello");
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index f9b0b09..3651230 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -61,7 +61,8 @@ public class MockContext<IN, OUT> {
 				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
 				new ExecutionConfig(), 
 				null, null, 
-				new HashMap<String, Accumulator<?, ?>>());
+				new HashMap<String, Accumulator<?, ?>>(),
+				null);
 
 		operator.setup(mockContext.output, runtimeContext);
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index f404d01..f5ce3fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 import java.io.Serializable;
@@ -64,7 +65,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 				executionConfig,
 				null,
 				new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
-				new HashMap<String, Accumulator<?, ?>>());
+				new HashMap<String, Accumulator<?, ?>>(),
+				new OneInputStreamTask());
 
 		operator.setup(new MockOutput(), runtimeContext);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 711dd41..428131a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -47,7 +47,8 @@ public class SourceFunctionUtil<T> {
 					new ExecutionConfig(), 
 					null, 
 					new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
-					new HashMap<String, Accumulator<?, ?>>());
+					new HashMap<String, Accumulator<?, ?>>(),
+					null);
 			
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 21e2e1e..2418f19 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -63,8 +64,9 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 				new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(),
1024),
 				new ExecutionConfig(),
 				null,
-				new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
-				new HashMap<String, Accumulator<?, ?>>());
+				new LocalStateHandle.LocalStateHandleProvider<>(),
+				new HashMap<String, Accumulator<?, ?>>(),
+				new TwoInputStreamTask());
 
 		operator.setup(new MockOutput(), runtimeContext);
 	}
@@ -134,7 +136,7 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 			if (outputSerializer == null) {
 				outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
 			}
-			outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()),
+			outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),
 					element.getTimestamp()));
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/717d54c8/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index 6e0724d..7387a1e 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -75,7 +75,7 @@ public class GroupedProcessingTimeWindowExample {
 				});
 		
 		stream
-//				.groupBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
+				.groupBy(new FirstFieldKeyExtractor<Tuple2<Long, Long>, Long>())
 //				.window(Time.of(2500, TimeUnit.MILLISECONDS)).every(Time.of(500, TimeUnit.MILLISECONDS))
 //				.reduceWindow(new SummingReducer())
 //				.flatten()


Mime
View raw message