flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: [FLINK-5006] [streaming] Remove assumption of order in SystemProcessingTimeServiceTest
Date Wed, 16 Nov 2016 23:35:32 GMT
Repository: flink
Updated Branches:
  refs/heads/master a5ec8494c -> a1362c3af


[FLINK-5006] [streaming] Remove assumption of order in SystemProcessingTimeServiceTest

This closes #2785


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1362c3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1362c3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1362c3a

Branch: refs/heads/master
Commit: a1362c3af25304b4120232527a2e6008df315de7
Parents: 72b295b
Author: Boris Osipov <boris_osipov@epam.com>
Authored: Thu Nov 10 18:52:56 2016 +0300
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Nov 17 00:34:55 2016 +0100

----------------------------------------------------------------------
 .../tasks/SystemProcessingTimeService.java      |  8 +++
 .../tasks/SystemProcessingTimeServiceTest.java  | 68 --------------------
 2 files changed, 8 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1362c3a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 153aedf..071dbce 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -90,6 +90,14 @@ public class SystemProcessingTimeService extends ProcessingTimeService
{
 		return System.currentTimeMillis();
 	}
 
+	/**
+	 * Registers a task to be executed no sooner than time {@code timestamp}, but without strong
guarantees of order
+	 *
+	 * @param timestamp Time when the task is to be enabled (in processing time)
+	 * @param target    The task to be executed
+	 * @return The future that represents the scheduled task. This always returns some future,
+	 *         even if the timer was shut down
+	 */
 	@Override
 	public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target)
{
 		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/a1362c3a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 797e18a..766b313 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTes
 
 import org.junit.Test;
 
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -242,71 +241,4 @@ public class SystemProcessingTimeServiceTest {
 		latch.await();
 		assertTrue(exceptionWasThrown.get());
 	}
-
-	@Test
-	public void testTimerSorting() throws Exception {
-		final Object lock = new Object();
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(errorRef), lock);
-
-		try {
-			final OneShotLatch sync = new OneShotLatch();
-
-			// we block the timer execution to make sure we have all the time
-			// to register some additional timers out of order
-			timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
-				@Override
-				public void onProcessingTime(long timestamp) throws Exception {
-					sync.await();
-				}
-			});
-			
-			// schedule two timers out of order something
-			final long now = System.currentTimeMillis();
-			final long time1 = now + 6;
-			final long time2 = now + 5;
-			final long time3 = now + 8;
-			final long time4 = now - 2;
-
-			final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
-			ProcessingTimeCallback trigger = new ProcessingTimeCallback() {
-				@Override
-				public void onProcessingTime(long timestamp) {
-					timestamps.add(timestamp);
-				}
-			};
-
-			// schedule
-			ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger);
-			ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger);
-			ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger);
-			ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger);
-
-			// now that everything is scheduled, unblock the timer service
-			sync.trigger();
-
-			// wait until both are complete
-			future1.get();
-			future2.get();
-			future3.get();
-			future4.get();
-
-			// verify that the order is 4 - 2 - 1 - 3
-			assertEquals(4, timestamps.size());
-			assertEquals(time4, timestamps.take().longValue());
-			assertEquals(time2, timestamps.take().longValue());
-			assertEquals(time1, timestamps.take().longValue());
-			assertEquals(time3, timestamps.take().longValue());
-
-			// check that no asynchronous error was reported
-			if (errorRef.get() != null) {
-				throw new Exception(errorRef.get());
-			}
-		}
-		finally {
-			timer.shutdownService();
-		}
-	}
 }


Mime
View raw message