flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [16/17] flink git commit: [FLINK-4700] [tests] Expand and harden TimeServiceProvider test
Date Wed, 05 Oct 2016 22:17:06 GMT
[FLINK-4700] [tests] Expand and harden TimeServiceProvider test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4fc54e3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4fc54e3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4fc54e3e

Branch: refs/heads/master
Commit: 4fc54e3eb341a049529476ef966380d183d099d4
Parents: 8aea8c8
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Oct 5 16:44:56 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Oct 5 20:31:56 2016 +0200

----------------------------------------------------------------------
 .../AbstractFetcherTimestampsTest.java          |   2 +-
 .../runtime/operators/TestTimeProviderTest.java | 113 ++++++++
 .../runtime/operators/TimeProviderTest.java     | 269 -------------------
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +-
 .../tasks/DefaultTimeServiceProviderTest.java   | 136 +++++++++-
 6 files changed, 251 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index c3ba7b7..9b5d2e6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
new file mode 100644
index 0000000..a8f2dc4
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TestTimeProviderTest {
+
+	@Test
+	public void testCustomTimeServiceProvider() throws Throwable {
+		TestTimeServiceProvider tp = new TestTimeServiceProvider();
+
+		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
+		mapTask.setTimeService(tp);
+
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+			mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+
+		StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>());
+		streamConfig.setStreamOperator(mapOperator);
+
+		testHarness.invoke();
+
+		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0);
+
+		tp.setCurrentTime(11);
+		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11);
+
+		tp.setCurrentTime(15);
+		tp.setCurrentTime(16);
+		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16);
+
+		// register 2 tasks
+		mapTask.getTimerService().registerTimer(30, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+
+			}
+		});
+
+		mapTask.getTimerService().registerTimer(40, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+
+			}
+		});
+
+		assertEquals(2, tp.getNumRegisteredTimers());
+
+		tp.setCurrentTime(35);
+		assertEquals(1, tp.getNumRegisteredTimers());
+
+		tp.setCurrentTime(40);
+		assertEquals(0, tp.getNumRegisteredTimers());
+
+		tp.shutdownService();
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
+
+		private final AtomicReference<Throwable> errorReference;
+
+		public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference)
{
+			this.errorReference = errorReference;
+		}
+
+		@Override
+		public void handleAsyncException(String message, Throwable exception) {
+			errorReference.compareAndSet(null, exception);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
deleted file mode 100644
index 8d3e621..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-public class TimeProviderTest {
-
-	@Test
-	public void testDefaultTimeProvider() throws InterruptedException {
-		final OneShotLatch latch = new OneShotLatch();
-
-		final Object lock = new Object();
-		final AtomicReference<Throwable> error = new AtomicReference<>();
-		
-		TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider(
-				new ReferenceSettingExceptionHandler(error), lock);
-
-		final List<Long> timestamps = new ArrayList<>();
-
-		long interval = 50L;
-		final long noOfTimers = 20;
-
-		// we add 2 timers per iteration minus the first that would have a negative timestamp
-		final long expectedNoOfTimers = 2 * noOfTimers;
-
-		for (int i = 0; i < noOfTimers; i++) {
-
-			// we add a delay (100ms) so that both timers are inserted before the first is processed.
-			// If not, and given that we add timers out of order, we may have a timer firing
-			// before the next one (with smaller timestamp) is added.
-
-			double nextTimer = timeServiceProvider.getCurrentProcessingTime() + 100 + i * interval;
-
-			timeServiceProvider.registerTimer((long) nextTimer, new Triggerable() {
-				@Override
-				public void trigger(long timestamp) throws Exception {
-					timestamps.add(timestamp);
-					if (timestamps.size() == expectedNoOfTimers) {
-						latch.trigger();
-					}
-				}
-			});
-
-			// add also out-of-order tasks to verify that eventually
-			// they will be executed in the correct order.
-
-			timeServiceProvider.registerTimer((long) (nextTimer - 10L), new Triggerable() {
-				@Override
-				public void trigger(long timestamp) throws Exception {
-					timestamps.add(timestamp);
-					if (timestamps.size() == expectedNoOfTimers) {
-						latch.trigger();
-					}
-				}
-			});
-		}
-
-		if (!latch.isTriggered()) {
-			latch.await();
-		}
-
-		Assert.assertEquals(timestamps.size(), expectedNoOfTimers);
-
-		// verify that the tasks are executed
-		// in ascending timestamp order
-
-		int counter = 0;
-		long lastTs = Long.MIN_VALUE;
-		for (long timestamp: timestamps) {
-			Assert.assertTrue(timestamp >= lastTs);
-			if (lastTs != Long.MIN_VALUE && counter % 2 == 1) {
-				Assert.assertEquals((timestamp - lastTs), 10);
-			}
-			lastTs = timestamp;
-			counter++;
-		}
-
-		assertNull(error.get());
-	}
-
-	@Test
-	public void testDefaultTimeProviderExceptionHandling() throws InterruptedException {
-		final OneShotLatch latch = new OneShotLatch();
-
-		final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
-
-		final Object lock = new Object();
-
-		TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider(
-			new AsyncExceptionHandler() {
-				@Override
-				public void handleAsyncException(String message, Throwable exception) {
-					exceptionWasThrown.compareAndSet(false, true);
-					latch.trigger();
-				}
-			}, lock);
-
-		long now = System.currentTimeMillis();
-		timeServiceProvider.registerTimer(now, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) throws Exception {
-				throw new Exception("Exception in Timer");
-			}
-		});
-
-		if (!latch.isTriggered()) {
-			latch.await();
-		}
-		Assert.assertTrue(exceptionWasThrown.get());
-	}
-
-	@Test
-	public void testTimerSorting() throws Exception {
-
-		final List<Long> result = new ArrayList<>();
-
-		TestTimeServiceProvider provider = new TestTimeServiceProvider();
-
-		provider.registerTimer(45, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-				result.add(timestamp);
-			}
-		});
-
-		provider.registerTimer(50, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-				result.add(timestamp);
-			}
-		});
-
-		provider.registerTimer(30, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-				result.add(timestamp);
-			}
-		});
-
-		provider.registerTimer(50, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-				result.add(timestamp);
-			}
-		});
-
-		Assert.assertEquals(provider.getNumRegisteredTimers(), 4);
-
-		provider.setCurrentTime(100);
-		long seen = 0;
-		for (Long l: result) {
-			Assert.assertTrue(l >= seen);
-			seen = l;
-		}
-	}
-
-	@Test
-	public void testCustomTimeServiceProvider() throws Throwable {
-		TestTimeServiceProvider tp = new TestTimeServiceProvider();
-
-		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
-		mapTask.setTimeService(tp);
-
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
-			mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-
-		StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>());
-		streamConfig.setStreamOperator(mapOperator);
-
-		testHarness.invoke();
-
-		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0);
-
-		tp.setCurrentTime(11);
-		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11);
-
-		tp.setCurrentTime(15);
-		tp.setCurrentTime(16);
-		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16);
-
-		// register 2 tasks
-		mapTask.getTimerService().registerTimer(30, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-
-			}
-		});
-
-		mapTask.getTimerService().registerTimer(40, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-
-			}
-		});
-
-		assertEquals(2, tp.getNumRegisteredTimers());
-
-		tp.setCurrentTime(35);
-		assertEquals(1, tp.getNumRegisteredTimers());
-
-		tp.setCurrentTime(40);
-		assertEquals(0, tp.getNumRegisteredTimers());
-
-		tp.shutdownService();
-	}
-
-	// ------------------------------------------------------------------------
-
-	public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
-
-		private final AtomicReference<Throwable> errorReference;
-
-		public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference)
{
-			this.errorReference = errorReference;
-		}
-
-		@Override
-		public void handleAsyncException(String message, Throwable exception) {
-			errorReference.compareAndSet(null, exception);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 4c6d391..2f687f6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 88e28bc..cd82a9c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
index ae895b6..29e13ed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import org.junit.Test;
 
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -37,6 +39,40 @@ import static org.junit.Assert.fail;
 public class DefaultTimeServiceProviderTest {
 
 	@Test
+	public void testTriggerHoldsLock() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// schedule something
+			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new
Triggerable() {
+				@Override
+				public void trigger(long timestamp) {
+					assertTrue(Thread.holdsLock(lock));
+				}
+			});
+
+			// wait until the execution is over
+			future.get();
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// check that no asynchronous error was reported
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	@Test
 	public void testImmediateShutdown() throws Exception {
 
 		final Object lock = new Object();
@@ -171,6 +207,104 @@ public class DefaultTimeServiceProviderTest {
 			future.cancel(false);
 
 			assertEquals(0, timer.getNumTasksScheduled());
+
+			// check that no asynchronous error was reported
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	@Test
+	public void testExceptionReporting() throws InterruptedException {
+		final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
+		final OneShotLatch latch = new OneShotLatch();
+		final Object lock = new Object();
+
+		TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider(
+				new AsyncExceptionHandler() {
+					@Override
+					public void handleAsyncException(String message, Throwable exception) {
+						exceptionWasThrown.set(true);
+						latch.trigger();
+					}
+				}, lock);
+		
+		timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() {
+			@Override
+			public void trigger(long timestamp) throws Exception {
+				throw new Exception("Exception in Timer");
+			}
+		});
+
+		latch.await();
+		assertTrue(exceptionWasThrown.get());
+	}
+
+	@Test
+	public void testTimerSorting() throws Exception {
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
+				new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			final OneShotLatch sync = new OneShotLatch();
+
+			// we block the timer execution to make sure we have all the time
+			// to register some additional timers out of order
+			timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					sync.await();
+				}
+			});
+			
+			// schedule two timers out of order something
+			final long now = System.currentTimeMillis();
+			final long time1 = now + 6;
+			final long time2 = now + 5;
+			final long time3 = now + 8;
+			final long time4 = now - 2;
+
+			final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
+			Triggerable trigger = new Triggerable() {
+				@Override
+				public void trigger(long timestamp) {
+					timestamps.add(timestamp);
+				}
+			};
+
+			// schedule
+			ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger);
+			ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger);
+			ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger);
+			ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger);
+
+			// now that everything is scheduled, unblock the timer service
+			sync.trigger();
+
+			// wait until both are complete
+			future1.get();
+			future2.get();
+			future3.get();
+			future4.get();
+
+			// verify that the order is 4 - 2 - 1 - 3
+			assertEquals(4, timestamps.size());
+			assertEquals(time4, timestamps.take().longValue());
+			assertEquals(time2, timestamps.take().longValue());
+			assertEquals(time1, timestamps.take().longValue());
+			assertEquals(time3, timestamps.take().longValue());
+
+			// check that no asynchronous error was reported
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
 		}
 		finally {
 			timer.shutdownService();


Mime
View raw message