flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService
Date Fri, 24 Feb 2017 13:49:23 GMT
[FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService

This PR adds the getScheduledExecutorService method to the RpcService interface. So
henceforth all RpcService implementations have to provide a ScheduledExecutorService
implementation.

Currently, we only support the AkkaRpcService. The AkkaRpcService returns a
ScheduledExecutorService proxy which forwards the schedule calls to the ActorSystem's
internal scheduler.

Introduce ScheduledExecutor interface to hide service methods from the ScheduledExecutorService

This closes #3310.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf458dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf458dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf458dd

Branch: refs/heads/master
Commit: ccf458dd4d173b3370257177c2bbd9680baa6511
Parents: 5983069
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Feb 14 16:50:43 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Feb 24 14:48:52 2017 +0100

----------------------------------------------------------------------
 .../runtime/concurrent/ScheduledExecutor.java   |  92 ++++++++++
 .../ScheduledExecutorServiceAdapter.java        |  64 +++++++
 .../apache/flink/runtime/rpc/RpcService.java    |  15 ++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 178 +++++++++++++++++++
 .../runtime/rpc/TestingSerialRpcService.java    |  34 ++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    | 160 +++++++++++++++++
 6 files changed, 543 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
new file mode 100644
index 0000000..c1b47e2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extension for the {@link Executor} interface which is enriched by method for scheduling
tasks
+ * in the future.
+ */
+public interface ScheduledExecutor extends Executor {
+
+	/**
+	 * Executes the given command after the given delay.
+	 *
+	 * @param command the task to execute in the future
+	 * @param delay the time from now to delay the execution
+	 * @param unit the time unit of the delay parameter
+	 * @return a ScheduledFuture representing the completion of the scheduled task
+	 */
+	ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
+
+	/**
+	 * Executes the given callable after the given delay. The result of the callable is returned
+	 * as a {@link ScheduledFuture}.
+	 *
+	 * @param callable the callable to execute
+	 * @param delay the time from now to delay the execution
+	 * @param unit the time unit of the delay parameter
+	 * @param <V> result type of the callable
+	 * @return a ScheduledFuture which holds the future value of the given callable
+	 */
+	<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit
unit);
+
+	/**
+	 * Executes the given command periodically. The first execution is started after the
+	 * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
+	 * the third after {@code initialDelay + 2*period} and so on.
+	 * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
+	 * is cancelled.
+	 *
+	 * @param command the task to be executed periodically
+	 * @param initialDelay the time from now until the first execution is triggered
+	 * @param period the time after which the next execution is triggered
+	 * @param unit the time unit of the delay and period parameter
+	 * @return a ScheduledFuture representing the periodic task. This future never completes
+	 * unless an execution of the given task fails or if the future is cancelled
+	 */
+	ScheduledFuture<?> scheduleAtFixedRate(
+		Runnable command,
+		long initialDelay,
+		long period,
+		TimeUnit unit);
+
+	/**
+	 * Executed the given command repeatedly with the given delay between the end of an execution
+	 * and the start of the next execution.
+	 * The task is executed repeatedly until either an exception occurs or if the returned
+	 * {@link ScheduledFuture} is cancelled.
+	 *
+	 * @param command the task to execute repeatedly
+	 * @param initialDelay the time from now until the first execution is triggered
+	 * @param delay the time between the end of the current and the start of the next execution
+	 * @param unit the time unit of the initial delay and the delay parameter
+	 * @return a ScheduledFuture representing the repeatedly executed task. This future never
+	 * completes unless th exectuion of the given task fails or if the future is cancelled
+	 */
+	ScheduledFuture<?> scheduleWithFixedDelay(
+		Runnable command,
+		long initialDelay,
+		long delay,
+		TimeUnit unit);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
new file mode 100644
index 0000000..7662c35
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Adapter class for a {@link ScheduledExecutorService} which shall be used as a
+ * {@link ScheduledExecutor}.
+ */
+public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {
+
+	private final ScheduledExecutorService scheduledExecutorService;
+
+	public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService)
{
+		this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
+	}
+
+	@Override
+	public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+		return scheduledExecutorService.schedule(command, delay, unit);
+	}
+
+	@Override
+	public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit) {
+		return scheduledExecutorService.schedule(callable, delay, unit);
+	}
+
+	@Override
+	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
+		return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
+	}
+
+	@Override
+	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
+		return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+	}
+
+	@Override
+	public void execute(Runnable command) {
+		scheduledExecutorService.execute(command);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 4b9100a..2d2019a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 
 import java.util.concurrent.Callable;
@@ -98,6 +99,20 @@ public interface RpcService {
 	Executor getExecutor();
 
 	/**
+	 * Gets a scheduled executor from the RPC service. This executor can be used to schedule
+	 * tasks to be executed in the future.
+	 *
+	 * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations
against
+	 * any concurrent invocations and is therefore not suitable to run completion methods of
futures
+	 * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the
+	 * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that
+	 * {@code RpcEndpoint}.
+	 *
+	 * @return The RPC service provided scheduled executor
+	 */
+	ScheduledExecutor getScheduledExecutor();
+
+	/**
 	 * Execute the runnable in the execution context of this RPC Service, as returned by
 	 * {@link #getExecutor()}, after a scheduled delay.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 6e3fb40..6a6a85d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -23,6 +23,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
+import akka.actor.Cancellable;
 import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -34,6 +35,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
@@ -43,18 +45,24 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import scala.concurrent.duration.FiniteDuration;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Delayed;
 import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -81,6 +89,8 @@ public class AkkaRpcService implements RpcService {
 
 	private final String address;
 
+	private final ScheduledExecutor internalScheduledExecutor;
+
 	private volatile boolean stopped;
 
 	public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
@@ -101,6 +111,8 @@ public class AkkaRpcService implements RpcService {
 		} else {
 			address = "";
 		}
+
+		internalScheduledExecutor = new InternalScheduledExecutorImpl(actorSystem);
 	}
 
 	@Override
@@ -259,6 +271,10 @@ public class AkkaRpcService implements RpcService {
 		return actorSystem.dispatcher();
 	}
 
+	public ScheduledExecutor getScheduledExecutor() {
+		return internalScheduledExecutor;
+	}
+
 	@Override
 	public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) {
 		checkNotNull(runnable, "runnable");
@@ -279,4 +295,166 @@ public class AkkaRpcService implements RpcService {
 
 		return new FlinkFuture<>(scalaFuture);
 	}
+
+	/**
+	 * Helper class to expose the internal scheduling logic via a {@link ScheduledExecutor}.
+	 */
+	private static final class InternalScheduledExecutorImpl implements ScheduledExecutor {
+
+		private final ActorSystem actorSystem;
+
+		private InternalScheduledExecutorImpl(ActorSystem actorSystem) {
+			this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
+		}
+
+		@Override
+		@Nonnull
+		public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull
TimeUnit unit) {
+			ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command,
unit.toNanos(delay), 0L);
+
+			Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
+
+			scheduledFutureTask.setCancellable(cancellable);
+
+			return scheduledFutureTask;
+		}
+
+		@Override
+		@Nonnull
+		public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable,
long delay, @Nonnull TimeUnit unit) {
+			ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable,
unit.toNanos(delay), 0L);
+
+			Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);
+
+			scheduledFutureTask.setCancellable(cancellable);
+
+			return scheduledFutureTask;
+		}
+
+		@Override
+		@Nonnull
+		public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay,
long period, @Nonnull TimeUnit unit) {
+			ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
+				command,
+				triggerTime(unit.toNanos(initialDelay)),
+				unit.toNanos(period));
+
+			Cancellable cancellable = actorSystem.scheduler().schedule(
+				new FiniteDuration(initialDelay, unit),
+				new FiniteDuration(period, unit),
+				scheduledFutureTask,
+				actorSystem.dispatcher());
+
+			scheduledFutureTask.setCancellable(cancellable);
+
+			return scheduledFutureTask;
+		}
+
+		@Override
+		@Nonnull
+		public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long
initialDelay, long delay, @Nonnull TimeUnit unit) {
+			ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
+				command,
+				triggerTime(unit.toNanos(initialDelay)),
+				unit.toNanos(-delay));
+
+			Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);
+
+			scheduledFutureTask.setCancellable(cancellable);
+
+			return scheduledFutureTask;
+		}
+
+		@Override
+		public void execute(@Nonnull Runnable command) {
+			actorSystem.dispatcher().execute(command);
+		}
+
+		private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
+			return actorSystem.scheduler().scheduleOnce(
+				new FiniteDuration(delay, unit),
+				runnable,
+				actorSystem.dispatcher());
+		}
+
+		private long now() {
+			return System.nanoTime();
+		}
+
+		private long triggerTime(long delay) {
+			return now() + delay;
+		}
+
+		private final class ScheduledFutureTask<V> extends FutureTask<V> implements
RunnableScheduledFuture<V> {
+
+			private long time;
+
+			private final long period;
+
+			private volatile Cancellable cancellable;
+
+			ScheduledFutureTask(Callable<V> callable, long time, long period) {
+				super(callable);
+				this.time = time;
+				this.period = period;
+			}
+
+			ScheduledFutureTask(Runnable runnable, long time, long period) {
+				super(runnable, null);
+				this.time = time;
+				this.period = period;
+			}
+
+			public void setCancellable(Cancellable newCancellable) {
+				this.cancellable = newCancellable;
+			}
+
+			@Override
+			public void run() {
+				if (!isPeriodic()) {
+					super.run();
+				} else if (runAndReset()){
+					if (period > 0L) {
+						time += period;
+					} else {
+						cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);
+
+						// check whether we have been cancelled concurrently
+						if (isCancelled()) {
+							cancellable.cancel();
+						} else {
+							time = triggerTime(-period);
+						}
+					}
+				}
+			}
+
+			@Override
+			public boolean cancel(boolean mayInterruptIfRunning) {
+				boolean result = super.cancel(mayInterruptIfRunning);
+
+				return result && cancellable.cancel();
+			}
+
+			@Override
+			public long getDelay(@Nonnull  TimeUnit unit) {
+				return unit.convert(time - now(), TimeUnit.NANOSECONDS);
+			}
+
+			@Override
+			public int compareTo(@Nonnull Delayed o) {
+				if (o == this) {
+					return 0;
+				}
+
+				long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
+				return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
+			}
+
+			@Override
+			public boolean isPeriodic() {
+				return period != 0L;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 1d30ea4..07edfef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.Preconditions;
@@ -31,10 +33,13 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.util.BitSet;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,13 +51,19 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class TestingSerialRpcService implements RpcService {
 
 	private final DirectExecutorService executorService;
+	private final ScheduledExecutorService scheduledExecutorService;
 	private final ConcurrentHashMap<String, RpcGateway> registeredConnections;
 	private final CompletableFuture<Void> terminationFuture;
 
+	private final ScheduledExecutor scheduledExecutorServiceAdapter;
+
 	public TestingSerialRpcService() {
 		executorService = new DirectExecutorService();
+		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
 		this.registeredConnections = new ConcurrentHashMap<>(16);
 		this.terminationFuture = new FlinkCompletableFuture<>();
+
+		this.scheduledExecutorServiceAdapter = new ScheduledExecutorServiceAdapter(scheduledExecutorService);
 	}
 
 	@Override
@@ -86,9 +97,32 @@ public class TestingSerialRpcService implements RpcService {
 		return executorService;
 	}
 
+	public ScheduledExecutor getScheduledExecutor() {
+		return scheduledExecutorServiceAdapter;
+	}
+
 	@Override
 	public void stopService() {
 		executorService.shutdown();
+
+		scheduledExecutorService.shutdown();
+
+		boolean terminated = false;
+
+		try {
+			terminated = scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+		}
+
+		if (!terminated) {
+			List<Runnable> runnables = scheduledExecutorService.shutdownNow();
+
+			for (Runnable runnable : runnables) {
+				runnable.run();
+			}
+		}
+
 		registeredConnections.clear();
 		terminationFuture.complete(null);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ccf458dd/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 7c8defa..eb71287 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.util.TestLogger;
 
@@ -30,13 +31,16 @@ import org.junit.AfterClass;
 import org.junit.Test;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class AkkaRpcServiceTest extends TestLogger {
 
@@ -149,4 +153,160 @@ public class AkkaRpcServiceTest extends TestLogger {
 
 		terminationFuture.get();
 	}
+
+	/**
+	 * Tests a simple scheduled runnable being executed by the RPC services scheduled executor
+	 * service.
+	 */
+	@Test(timeout = 1000)
+	public void testScheduledExecutorServiceSimpleSchedule() throws ExecutionException, InterruptedException
{
+		ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+		final OneShotLatch latch = new OneShotLatch();
+
+		ScheduledFuture<?> future = scheduledExecutor.schedule(
+			new Runnable() {
+				@Override
+				public void run() {
+					latch.trigger();
+				}
+			},
+			10L,
+			TimeUnit.MILLISECONDS);
+
+		future.get();
+
+		// once the future is completed, then the latch should have been triggered
+		assertTrue(latch.isTriggered());
+	}
+
+	/**
+	 * Tests that the RPC service's scheduled executor service can execute runnables at a fixed
+	 * rate.
+	 */
+	@Test(timeout = 1000)
+	public void testScheduledExecutorServicePeriodicSchedule() throws ExecutionException, InterruptedException
{
+		ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+		final int tries = 4;
+		final long delay = 10L;
+		final CountDownLatch countDownLatch = new CountDownLatch(tries);
+
+		long currentTime = System.nanoTime();
+
+		ScheduledFuture<?> future = scheduledExecutor.scheduleAtFixedRate(
+			new Runnable() {
+				@Override
+				public void run() {
+					countDownLatch.countDown();
+				}
+			},
+			delay,
+			delay,
+			TimeUnit.MILLISECONDS);
+
+		assertTrue(!future.isDone());
+
+		countDownLatch.await();
+
+		// the future should not complete since we have a periodic task
+		assertTrue(!future.isDone());
+
+		long finalTime = System.nanoTime() - currentTime;
+
+		// the processing should have taken at least delay times the number of count downs.
+		assertTrue(finalTime >= tries * delay);
+
+		future.cancel(true);
+	}
+
+	/**
+	 * Tests that the RPC service's scheduled executor service can execute runnable with a fixed
+	 * delay.
+	 */
+	@Test(timeout = 1000)
+	public void testScheduledExecutorServiceWithFixedDelaySchedule() throws ExecutionException,
InterruptedException {
+		ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+		final int tries = 4;
+		final long delay = 10L;
+		final CountDownLatch countDownLatch = new CountDownLatch(tries);
+
+		long currentTime = System.nanoTime();
+
+		ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(
+			new Runnable() {
+				@Override
+				public void run() {
+					countDownLatch.countDown();
+				}
+			},
+			delay,
+			delay,
+			TimeUnit.MILLISECONDS);
+
+		assertTrue(!future.isDone());
+
+		countDownLatch.await();
+
+		// the future should not complete since we have a periodic task
+		assertTrue(!future.isDone());
+
+		long finalTime = System.nanoTime() - currentTime;
+
+		// the processing should have taken at least delay times the number of count downs.
+		assertTrue(finalTime >= tries * delay);
+
+		future.cancel(true);
+	}
+
+	/**
+	 * Tests that canceling the returned future will stop the execution of the scheduled runnable.
+	 */
+	@Test
+	public void testScheduledExecutorServiceCancelWithFixedDelay() throws InterruptedException
{
+		ScheduledExecutor scheduledExecutor = akkaRpcService.getScheduledExecutor();
+
+		long delay = 10L;
+
+		final OneShotLatch futureTask = new OneShotLatch();
+		final OneShotLatch latch = new OneShotLatch();
+		final OneShotLatch shouldNotBeTriggeredLatch = new OneShotLatch();
+
+		ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(
+			new Runnable() {
+				@Override
+				public void run() {
+					try {
+						if (!futureTask.isTriggered()) {
+							// first run
+							futureTask.trigger();
+							latch.await();
+						} else {
+							shouldNotBeTriggeredLatch.trigger();
+						}
+					} catch (InterruptedException e) {
+						// ignore
+					}
+				}
+			},
+			delay,
+			delay,
+			TimeUnit.MILLISECONDS);
+
+		// wait until we're in the runnable
+		futureTask.await();
+
+		// cancel the scheduled future
+		future.cancel(false);
+
+		latch.trigger();
+
+		try {
+			shouldNotBeTriggeredLatch.await(5 * delay, TimeUnit.MILLISECONDS);
+			fail("The shouldNotBeTriggeredLatch should never be triggered.");
+		} catch (TimeoutException e) {
+			// expected
+		}
+	}
 }


Mime
View raw message