flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-7078] [rpc] Introduce FencedRpcEndpoint
Date Sun, 03 Sep 2017 21:37:43 GMT
[FLINK-7078] [rpc] Introduce FencedRpcEndpoint

Introduce FencedRpcEndpoint which requires all RPC messages to have a
fencing token attached. Based on the received fencing token and the
actual fencing token, the message will either be discarded if they are
not equal or it will be processed. That way we are able to filter out
old messages or messages which originate from a split brain situation.

Add support for callAsyncWithoutFencing

Introduce local and remote fenced messages

This closes #4578.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1804aa33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1804aa33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1804aa33

Branch: refs/heads/master
Commit: 1804aa33d0996810ad35bfa4dd526c86a1e12828
Parents: d7cea58
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Aug 23 17:08:55 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sun Sep 3 23:35:06 2017 +0200

----------------------------------------------------------------------
 .../runtime/rpc/FencedMainThreadExecutable.java |  48 +++
 .../flink/runtime/rpc/FencedRpcEndpoint.java    | 116 +++++++
 .../flink/runtime/rpc/FencedRpcGateway.java     |  37 ++
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  15 +-
 .../apache/flink/runtime/rpc/RpcService.java    |  38 +-
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 178 +++++++---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 123 ++++---
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 234 +++++++++----
 .../rpc/akka/FencedAkkaInvocationHandler.java   | 145 ++++++++
 .../runtime/rpc/akka/FencedAkkaRpcActor.java    |  76 ++++
 .../exceptions/AkkaUnknownMessageException.java |  40 +++
 .../runtime/rpc/akka/messages/CallAsync.java    |  41 ---
 .../rpc/akka/messages/LocalRpcInvocation.java   |  54 ---
 .../runtime/rpc/akka/messages/Processing.java   |   2 +-
 .../rpc/akka/messages/RemoteRpcInvocation.java  | 206 -----------
 .../rpc/akka/messages/RpcInvocation.java        |  58 ----
 .../runtime/rpc/akka/messages/RunAsync.java     |  54 ---
 .../runtime/rpc/akka/messages/Shutdown.java     |  36 --
 .../FencingTokenMismatchException.java          |  42 +++
 .../flink/runtime/rpc/messages/CallAsync.java   |  41 +++
 .../runtime/rpc/messages/ControlMessage.java    |  26 ++
 .../runtime/rpc/messages/FencedMessage.java     |  34 ++
 .../rpc/messages/LocalFencedMessage.java        |  56 +++
 .../rpc/messages/LocalRpcInvocation.java        |  79 +++++
 .../rpc/messages/RemoteFencedMessage.java       |  57 +++
 .../rpc/messages/RemoteRpcInvocation.java       | 237 +++++++++++++
 .../runtime/rpc/messages/RpcInvocation.java     |  58 ++++
 .../flink/runtime/rpc/messages/RunAsync.java    |  54 +++
 .../flink/runtime/rpc/messages/Shutdown.java    |  36 ++
 .../runtime/rpc/messages/UnfencedMessage.java   |  49 +++
 .../flink/runtime/rpc/AsyncCallsTest.java       | 157 +++++++++
 .../runtime/rpc/FencedRpcEndpointTest.java      | 344 +++++++++++++++++++
 .../flink/runtime/rpc/TestingRpcService.java    |  23 +-
 33 files changed, 2165 insertions(+), 629 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedMainThreadExecutable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedMainThreadExecutable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedMainThreadExecutable.java
new file mode 100644
index 0000000..16cacc8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedMainThreadExecutable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Extended {@link MainThreadExecutable} interface which allows running unfenced runnables
+ * in the main thread.
+ */
+public interface FencedMainThreadExecutable extends MainThreadExecutable {
+
+	/**
+	 * Run the given runnable in the main thread without attaching a fencing token.
+	 *
+	 * @param runnable to run in the main thread without validating the fencing token.
+	 */
+	void runAsyncWithoutFencing(Runnable runnable);
+
+	/**
+	 * Run the given callable in the main thread without attaching a fencing token.
+	 *
+	 * @param callable to run in the main thread without validating the fencing token.
+	 * @param timeout for the operation
+	 * @param <V> type of the callable result
+	 * @return Future containing the callable result
+	 */
+	<V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time timeout);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
new file mode 100644
index 0000000..81bae29
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base class for fenced {@link RpcEndpoint}. A fenced rpc endpoint expects all rpc messages
+ * to be enriched with fencing tokens. Furthermore, the rpc endpoint has its own fencing token
+ * assigned. The rpc is then only executed if the attached fencing token equals the endpoint's own
+ * token.
+ *
+ * @param <F> type of the fencing token
+ */
+public class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint {
+
+	private volatile F fencingToken;
+	private volatile MainThreadExecutor fencedMainThreadExecutor;
+
+	protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F initialFencingToken) {
+		super(rpcService, endpointId);
+
+		this.fencingToken = Preconditions.checkNotNull(initialFencingToken);
+		this.fencedMainThreadExecutor = new MainThreadExecutor(
+			getRpcService().fenceRpcServer(
+				rpcServer,
+				initialFencingToken));
+	}
+
+	protected FencedRpcEndpoint(RpcService rpcService, F initialFencingToken) {
+		this(rpcService, UUID.randomUUID().toString(), initialFencingToken);
+	}
+
+	public F getFencingToken() {
+		return fencingToken;
+	}
+
+	protected void setFencingToken(F newFencingToken) {
+		// this method should only be called from within the main thread
+		validateRunsInMainThread();
+
+		this.fencingToken = newFencingToken;
+
+		// setting a new fencing token entails that we need a new MainThreadExecutor
+		// which is bound to the new fencing token
+		MainThreadExecutable mainThreadExecutable = getRpcService().fenceRpcServer(
+			rpcServer,
+			newFencingToken);
+
+		this.fencedMainThreadExecutor = new MainThreadExecutor(mainThreadExecutable);
+	}
+
+	/**
+	 * Returns a main thread executor which is bound to the currently valid fencing token.
+	 * This means that runnables which are executed with this executor fail after the fencing
+	 * token has changed. This allows operations to be scoped by the fencing token.
+	 *
+	 * @return MainThreadExecutor bound to the current fencing token
+	 */
+	@Override
+	protected MainThreadExecutor getMainThreadExecutor() {
+		return fencedMainThreadExecutor;
+	}
+
+	/**
+	 * Run the given runnable in the main thread of the RpcEndpoint without checking the fencing
+	 * token. This allows operations to run outside of the fencing token scope.
+	 *
+	 * @param runnable to execute in the main thread of the rpc endpoint without checking the fencing token.
+	 */
+	protected void runAsyncWithoutFencing(Runnable runnable) {
+		if (rpcServer instanceof FencedMainThreadExecutable) {
+			((FencedMainThreadExecutable) rpcServer).runAsyncWithoutFencing(runnable);
+		} else {
+			throw new RuntimeException("FencedRpcEndpoint has not been started with a FencedMainThreadExecutable RpcServer.");
+		}
+	}
+
+	/**
+	 * Run the given callable in the main thread of the RpcEndpoint without checking the fencing
+	 * token. This allows operations to run outside of the fencing token scope.
+	 *
+	 * @param callable to run in the main thread of the rpc endpoint without checking the fencing token.
+	 * @param timeout for the operation.
+	 * @return Future containing the callable result.
+	 */
+	protected <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time timeout) {
+		if (rpcServer instanceof FencedMainThreadExecutable) {
+			return ((FencedMainThreadExecutable) rpcServer).callAsyncWithoutFencing(callable, timeout);
+		} else {
+			throw new RuntimeException("FencedRpcEndpoint has not been started with a FencedMainThreadExecutable RpcServer.");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcGateway.java
new file mode 100644
index 0000000..fab638f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcGateway.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.io.Serializable;
+
+/**
+ * Fenced {@link RpcGateway}. This gateway provides access to the associated
+ * fencing token.
+ *
+ * @param <F> type of the fencing token
+ */
+public interface FencedRpcGateway<F extends Serializable> extends RpcGateway {
+
+	/**
+	 * Get the current fencing token.
+	 *
+	 * @return current fencing token
+	 */
+	F getFencingToken();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 980ae48..563674a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
@@ -66,14 +67,14 @@ public abstract class RpcEndpoint implements RpcGateway {
 	private final String endpointId;
 
 	/** Interface to access the underlying rpc server */
-	private final RpcServer rpcServer;
+	protected final RpcServer rpcServer;
+
+	/** A reference to the endpoint's main thread, if the current method is called by the main thread */
+	final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null);
 
 	/** The main thread executor to be used to execute future callbacks in the main thread
 	 * of the executing rpc server. */
-	private final Executor mainThreadExecutor;
-
-	/** A reference to the endpoint's main thread, if the current method is called by the main thread */
-	final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null); 
+	private final MainThreadExecutor mainThreadExecutor;
 
 	/**
 	 * Initializes the RPC endpoint.
@@ -208,7 +209,7 @@ public abstract class RpcEndpoint implements RpcGateway {
 	 *
 	 * @return Main thread execution context
 	 */
-	protected Executor getMainThreadExecutor() {
+	protected MainThreadExecutor getMainThreadExecutor() {
 		return mainThreadExecutor;
 	}
 
@@ -310,7 +311,7 @@ public abstract class RpcEndpoint implements RpcGateway {
 	/**
 	 * Executor which executes runnables in the main thread context.
 	 */
-	private static class MainThreadExecutor implements Executor {
+	protected static class MainThreadExecutor implements Executor {
 
 		private final MainThreadExecutable gateway;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 3b5a5e2..9b2e318 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 
+import java.io.Serializable;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -61,7 +62,27 @@ public interface RpcService {
 	 * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the
 	 * connection attempt failed
 	 */
-	<C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
+	<C extends RpcGateway> CompletableFuture<C> connect(
+		String address,
+		Class<C> clazz);
+
+	/**
+	 * Connect to a remote fenced rpc server under the provided address. Returns a fenced rpc gateway
+	 * which can be used to communicate with the rpc server. If the connection failed, then the
+	 * returned future is failed with a {@link RpcConnectionException}.
+	 *
+	 * @param address Address of the remote rpc server
+	 * @param fencingToken Fencing token to be used when communicating with the server
+	 * @param clazz Class of the rpc gateway to return
+	 * @param <F> Type of the fencing token
+	 * @param <C> Type of the rpc gateway to return
+	 * @return Future containing the fenced rpc gateway or an {@link RpcConnectionException} if the
+	 * connection attempt failed
+	 */
+	<F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
+		String address,
+		F fencingToken,
+		Class<C> clazz);
 
 	/**
 	 * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
@@ -72,6 +93,21 @@ public interface RpcService {
 	 */
 	<C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);
 
+
+	/**
+	 * Fence the given RpcServer with the given fencing token.
+	 *
+	 * <p>Fencing the RpcServer means that we fix the fencing token to the provided value.
+	 * All RPCs will then be enriched with this fencing token. This expects that the receiving
+	 * RPC endpoint extends {@link FencedRpcEndpoint}.
+	 *
+	 * @param rpcServer to fence with the given fencing token
+	 * @param fencingToken to fence the RpcServer with
+	 * @param <F> type of the fencing token
+	 * @return Fenced RpcServer
+	 */
+	<F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken);
+
 	/**
 	 * Stop the underlying rpc server of the provided self gateway.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 0521f2e..fc785cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -22,17 +22,18 @@ import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.StartStoppable;
-import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
-import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.messages.CallAsync;
+import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.Processing;
-import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
-import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
-import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
+import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
+import org.apache.flink.runtime.rpc.messages.RpcInvocation;
+import org.apache.flink.runtime.rpc.messages.RunAsync;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +47,7 @@ import java.lang.reflect.Method;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -72,7 +74,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
 	private final ActorRef rpcEndpoint;
 
 	// whether the actor ref is local and thus no message serialization is needed
-	private final boolean isLocal;
+	protected final boolean isLocal;
 
 	// default timeout for asks
 	private final Time timeout;
@@ -112,53 +114,13 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
 			declaringClass.equals(MainThreadExecutable.class) ||
 			declaringClass.equals(RpcServer.class)) {
 			result = method.invoke(this, args);
+		} else if (declaringClass.equals(FencedRpcGateway.class)) {
+			throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
+				method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
+				"fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
+				"retrieve a properly FencedRpcGateway.");
 		} else {
-			String methodName = method.getName();
-			Class<?>[] parameterTypes = method.getParameterTypes();
-			Annotation[][] parameterAnnotations = method.getParameterAnnotations();
-			Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
-
-			RpcInvocation rpcInvocation;
-
-			if (isLocal) {
-				rpcInvocation = new LocalRpcInvocation(
-					methodName,
-					parameterTypes,
-					args);
-			} else {
-				try {
-					RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(
-						methodName,
-						parameterTypes,
-						args);
-
-					if (remoteRpcInvocation.getSize() > maximumFramesize) {
-						throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
-					} else {
-						rpcInvocation = remoteRpcInvocation;
-					}
-				} catch (IOException e) {
-					LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", e);
-					throw e;
-				}
-			}
-
-			Class<?> returnType = method.getReturnType();
-
-			if (Objects.equals(returnType, Void.TYPE)) {
-				rpcEndpoint.tell(rpcInvocation, ActorRef.noSender());
-
-				result = null;
-			} else if (Objects.equals(returnType,CompletableFuture.class)) {
-				// execute an asynchronous call
-				result = FutureUtils.toJava(Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
-			} else {
-				// execute a synchronous call
-				CompletableFuture<?> futureResult = FutureUtils.toJava(
-					Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()));
-
-				result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
-			}
+			result = invokeRpc(method, args);
 		}
 
 		return result;
@@ -171,7 +133,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
 
 	@Override
 	public void runAsync(Runnable runnable) {
-		scheduleRunAsync(runnable, 0);
+		scheduleRunAsync(runnable, 0L);
 	}
 
 	@Override
@@ -181,7 +143,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
 
 		if (isLocal) {
 			long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
-			rpcEndpoint.tell(new RunAsync(runnable, atTimeNanos), ActorRef.noSender());
+			tell(new RunAsync(runnable, atTimeNanos));
 		} else {
 			throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
 				rpcEndpoint.path() + ". This is not supported.");
@@ -192,9 +154,9 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
 	public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
 		if(isLocal) {
 			@SuppressWarnings("unchecked")
-			scala.concurrent.Future<V> resultFuture = (scala.concurrent.Future<V>) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds());
+			CompletableFuture<V> resultFuture = (CompletableFuture<V>) ask(new CallAsync(callable), callTimeout);
 
-			return FutureUtils.toJava(resultFuture);
+			return resultFuture;
 		} else {
 			throw new RuntimeException("Trying to send a Callable to a remote actor at " +
 				rpcEndpoint.path() + ". This is not supported.");
@@ -212,6 +174,88 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
 	}
 
 	// ------------------------------------------------------------------------
+	//  Private methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Invokes an RPC method by sending the RPC invocation details to the rpc endpoint.
+	 *
+	 * @param method to call
+	 * @param args of the method call
+	 * @return result of the RPC
+	 * @throws Exception if the RPC invocation fails
+	 */
+	private Object invokeRpc(Method method, Object[] args) throws Exception {
+		String methodName = method.getName();
+		Class<?>[] parameterTypes = method.getParameterTypes();
+		Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+		Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+
+		final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);
+
+		Class<?> returnType = method.getReturnType();
+
+		final Object result;
+
+		if (Objects.equals(returnType, Void.TYPE)) {
+			tell(rpcInvocation);
+
+			result = null;
+		} else if (Objects.equals(returnType,CompletableFuture.class)) {
+			// execute an asynchronous call
+			result = ask(rpcInvocation, futureTimeout);
+		} else {
+			// execute a synchronous call
+			CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout);
+
+			result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
+		}
+
+		return result;
+	}
+
+	/**
+	 * Create the RpcInvocation message for the given RPC.
+	 *
+	 * @param methodName of the RPC
+	 * @param parameterTypes of the RPC
+	 * @param args of the RPC
+	 * @return RpcInvocation message which encapsulates the RPC details
+	 * @throws IOException if we cannot serialize the RPC invocation parameters
+	 */
+	protected RpcInvocation createRpcInvocationMessage(
+			final String methodName,
+			final Class<?>[] parameterTypes,
+			final Object[] args) throws IOException {
+		final RpcInvocation rpcInvocation;
+
+		if (isLocal) {
+			rpcInvocation = new LocalRpcInvocation(
+				methodName,
+				parameterTypes,
+				args);
+		} else {
+			try {
+				RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(
+					methodName,
+					parameterTypes,
+					args);
+
+				if (remoteRpcInvocation.getSize() > maximumFramesize) {
+					throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
+				} else {
+					rpcInvocation = remoteRpcInvocation;
+				}
+			} catch (IOException e) {
+				LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", e);
+				throw e;
+			}
+		}
+
+		return rpcInvocation;
+	}
+
+	// ------------------------------------------------------------------------
 	//  Helper methods
 	// ------------------------------------------------------------------------
 
@@ -262,6 +306,28 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, RpcServer
 		return false;
 	}
 
+	/**
+	 * Sends the message to the RPC endpoint.
+	 *
+	 * @param message to send to the RPC endpoint.
+	 */
+	protected void tell(Object message) {
+		rpcEndpoint.tell(message, ActorRef.noSender());
+	}
+
+	/**
+	 * Sends the message to the RPC endpoint and returns a future containing
+	 * its response.
+	 *
+	 * @param message to send to the RPC endpoint
+	 * @param timeout time to wait until the response future is failed with a {@link TimeoutException}
+	 * @return Response future
+	 */
+	protected CompletableFuture<?> ask(Object message, Time timeout) {
+		return FutureUtils.toJava(
+			Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()));
+	}
+
 	@Override
 	public String getAddress() {
 		return address;

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index d51607e..74c1509 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -21,19 +21,20 @@ package org.apache.flink.runtime.rpc.akka;
 import akka.actor.ActorRef;
 import akka.actor.Status;
 import akka.actor.UntypedActor;
-import akka.japi.Procedure;
 import akka.pattern.Patterns;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
-import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
-import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
+import org.apache.flink.runtime.rpc.messages.CallAsync;
+import org.apache.flink.runtime.rpc.messages.ControlMessage;
+import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.Processing;
-import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
-import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
+import org.apache.flink.runtime.rpc.messages.RpcInvocation;
+import org.apache.flink.runtime.rpc.messages.RunAsync;
 
-import org.apache.flink.runtime.rpc.akka.messages.Shutdown;
+import org.apache.flink.runtime.rpc.messages.Shutdown;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
@@ -70,10 +71,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 	
-	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class);
+	protected final Logger log = LoggerFactory.getLogger(getClass());
 
 	/** the endpoint to invoke the methods on */
-	private final T rpcEndpoint;
+	protected final T rpcEndpoint;
 
 	/** the helper that tracks whether calls come from the main thread */
 	private final MainThreadValidatorUtil mainThreadValidator;
@@ -110,48 +111,63 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 	@Override
 	public void onReceive(final Object message) {
 		if (message.equals(Processing.START)) {
-			getContext().become(new Procedure<Object>() {
-				@Override
-				public void apply(Object msg) throws Exception {
+			getContext().become(
+				(Object msg) -> {
 					if (msg.equals(Processing.STOP)) {
 						getContext().unbecome();
 					} else {
-						handleMessage(msg);
+						mainThreadValidator.enterMainThread();
+
+						try {
+							if (msg instanceof ControlMessage) {
+								handleControlMessage(((ControlMessage) msg));
+							} else {
+								handleMessage(msg);
+							}
+						} finally {
+							mainThreadValidator.exitMainThread();
+						}
 					}
-				}
-			});
+				});
 		} else {
-			LOG.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
+			log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
 				rpcEndpoint.getClass().getName(),
 				message.getClass().getName());
 
-			if (!getSender().equals(ActorRef.noSender())) {
-				// fail a possible future if we have a sender
-				getSender().tell(new Status.Failure(new AkkaRpcException("Discard message, because " +
-					"the rpc endpoint has not been started yet.")), getSelf());
-			}
+			sendErrorIfSender(new AkkaRpcException("Discard message, because " +
+				"the rpc endpoint has not been started yet."));
 		}
 	}
 
-	private void handleMessage(Object message) {
-		mainThreadValidator.enterMainThread();
-		try {
-			if (message instanceof RunAsync) {
-				handleRunAsync((RunAsync) message);
-			} else if (message instanceof CallAsync) {
-				handleCallAsync((CallAsync) message);
-			} else if (message instanceof RpcInvocation) {
-				handleRpcInvocation((RpcInvocation) message);
-			} else if (message instanceof Shutdown) {
-				triggerShutdown();
-			} else {
-				LOG.warn(
-					"Received message of unknown type {} with value {}. Dropping this message!",
-					message.getClass().getName(),
-					message);
-			}
-		} finally {
-			mainThreadValidator.exitMainThread();
+	/**
+	 * Processes a control message. A {@link Shutdown} message triggers the shutdown;
+	 * every other control message is logged, dropped and reported through
+	 * {@code sendErrorIfSender}.
+	 *
+	 * @param controlMessage control message to process
+	 */
+	private void handleControlMessage(ControlMessage controlMessage) {
+		if (controlMessage instanceof Shutdown) {
+			triggerShutdown();
+			return;
+		}
+
+		final Class<?> messageClass = controlMessage.getClass();
+
+		log.warn(
+			"Received control message of unknown type {} with value {}. Dropping this control message!",
+			messageClass.getName(),
+			controlMessage);
+
+		final String errorMessage = "Received unknown control message " + controlMessage +
+			" of type " + messageClass.getSimpleName() + '.';
+
+		sendErrorIfSender(new AkkaUnknownMessageException(errorMessage));
+	}
+
+	/**
+	 * Dispatches a message to the matching handler: {@link RunAsync}, {@link CallAsync} and
+	 * {@link RpcInvocation} messages are processed; any other message is logged, dropped and
+	 * answered with an {@link AkkaUnknownMessageException} through {@code sendErrorIfSender}.
+	 *
+	 * @param message message to handle
+	 */
+	protected void handleMessage(Object message) {
+		if (message instanceof RunAsync) {
+			handleRunAsync((RunAsync) message);
+		} else if (message instanceof CallAsync) {
+			handleCallAsync((CallAsync) message);
+		} else if (message instanceof RpcInvocation) {
+			handleRpcInvocation((RpcInvocation) message);
+		} else {
+			log.warn(
+				"Received message of unknown type {} with value {}. Dropping this message!",
+				message.getClass().getName(),
+				message);
+
+			sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +
+				" of type " + message.getClass().getSimpleName() + '.'));
 		}
 	}
 
@@ -171,17 +187,17 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 
 			rpcMethod = lookupRpcMethod(methodName, parameterTypes);
 		} catch(ClassNotFoundException e) {
-			LOG.error("Could not load method arguments.", e);
+			log.error("Could not load method arguments.", e);
 
 			RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e);
 			getSender().tell(new Status.Failure(rpcException), getSelf());
 		} catch (IOException e) {
-			LOG.error("Could not deserialize rpc invocation message.", e);
+			log.error("Could not deserialize rpc invocation message.", e);
 
 			RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e);
 			getSender().tell(new Status.Failure(rpcException), getSelf());
 		} catch (final NoSuchMethodException e) {
-			LOG.error("Could not find rpc method for rpc invocation.", e);
+			log.error("Could not find rpc method for rpc invocation.", e);
 
 			RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
 			getSender().tell(new Status.Failure(rpcException), getSelf());
@@ -202,7 +218,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 						result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
 					}
 					catch (InvocationTargetException e) {
-						LOG.trace("Reporting back error thrown in remote procedure {}", rpcMethod, e);
+						log.trace("Reporting back error thrown in remote procedure {}", rpcMethod, e);
 
 						// tell the sender about the failure
 						getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
@@ -229,7 +245,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 					}
 				}
 			} catch (Throwable e) {
-				LOG.error("Error while executing remote procedure call {}.", rpcMethod, e);
+				log.error("Error while executing remote procedure call {}.", rpcMethod, e);
 				// tell the sender about the failure
 				getSender().tell(new Status.Failure(e), getSelf());
 			}
@@ -249,9 +265,9 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 				"prior to sending the message. The " + callAsync.getClass().getName() +
 				" is only supported with local communication.";
 
-			LOG.warn(result);
+			log.warn(result);
 
-			getSender().tell(new Status.Failure(new Exception(result)), getSelf());
+			getSender().tell(new Status.Failure(new AkkaRpcException(result)), getSelf());
 		} else {
 			try {
 				Object result = callAsync.getCallable().call();
@@ -271,7 +287,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 	 */
 	private void handleRunAsync(RunAsync runAsync) {
 		if (runAsync.getRunnable() == null) {
-			LOG.warn("Received a {} message with an empty runnable field. This indicates " +
+			log.warn("Received a {} message with an empty runnable field. This indicates " +
 				"that this message has been serialized prior to sending the message. The " +
 				"{} is only supported with local communication.",
 				runAsync.getClass().getName(),
@@ -286,7 +302,7 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 				try {
 					runAsync.getRunnable().run();
 				} catch (Throwable t) {
-					LOG.error("Caught exception while executing runnable in main thread.", t);
+					log.error("Caught exception while executing runnable in main thread.", t);
 					ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 				}
 			}
@@ -324,4 +340,15 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
 	private Method lookupRpcMethod(final String methodName, final Class<?>[] parameterTypes) throws NoSuchMethodException {
 		return rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
 	}
+
+	/**
+	 * Send throwable to sender if the sender is specified. The throwable is wrapped in a
+	 * {@link Status.Failure} and sent with this actor as the sender of the reply.
+	 *
+	 * <p>NOTE(review): Akka documents {@code ActorRef.noSender()} as a {@code null} placeholder;
+	 * if that is the case, the equality check below can never detect a missing sender. Confirm
+	 * whether a comparison against the dead letters reference was intended.
+	 *
+	 * @param throwable to send to the sender
+	 */
+	protected void sendErrorIfSender(Throwable throwable) {
+		if (!getSender().equals(ActorRef.noSender())) {
+			getSender().tell(new Status.Failure(throwable), getSelf());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index ab851f6..536a789 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -30,15 +30,19 @@ import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.rpc.FencedMainThreadExecutable;
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
+import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServer;
 import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.akka.messages.Shutdown;
+import org.apache.flink.runtime.rpc.messages.Shutdown;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -49,6 +53,8 @@ import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.Serializable;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
@@ -61,6 +67,7 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableScheduledFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -131,60 +138,44 @@ public class AkkaRpcService implements RpcService {
 
 	// this method does not mutate state and is thus thread-safe
 	@Override
-	public <C extends RpcGateway> CompletableFuture<C> connect(final String address, final Class<C> clazz) {
-		checkState(!stopped, "RpcService is stopped");
-
-		LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
-				address, clazz.getName());
-
-		final ActorSelection actorSel = actorSystem.actorSelection(address);
-
-		final scala.concurrent.Future<Object> identify = Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds());
-		final scala.concurrent.Future<C> resultFuture = identify.map(new Mapper<Object, C>(){
-			@Override
-			public C checkedApply(Object obj) throws Exception {
-
-				ActorIdentity actorIdentity = (ActorIdentity) obj;
-
-				if (actorIdentity.getRef() == null) {
-					throw new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.');
-				} else {
-					ActorRef actorRef = actorIdentity.getRef();
-
-					final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef);
-					final String hostname;
-					Option<String> host = actorRef.path().address().host();
-					if (host.isEmpty()) {
-						hostname = "localhost";
-					} else {
-						hostname = host.get();
-					}
-
-					InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
-						address,
-						hostname,
-						actorRef,
-						timeout,
-						maximumFramesize,
-						null);
-
-					// Rather than using the System ClassLoader directly, we derive the ClassLoader
-					// from this class . That works better in cases where Flink runs embedded and all Flink
-					// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
-					ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader();
-
-					@SuppressWarnings("unchecked")
-					C proxy = (C) Proxy.newProxyInstance(
-						classLoader,
-						new Class<?>[]{clazz},
-						akkaInvocationHandler);
+	public <C extends RpcGateway> CompletableFuture<C> connect(
+			final String address,
+			final Class<C> clazz) {
 
-					return proxy;
-				}
-			}
-		}, actorSystem.dispatcher());
+		return connectInternal(
+			address,
+			clazz,
+			(ActorRef actorRef) -> {
+				Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);
+
+				return new AkkaInvocationHandler(
+					addressHostname.f0,
+					addressHostname.f1,
+					actorRef,
+					timeout,
+					maximumFramesize,
+					null);
+			});
+	}
 
-		return FutureUtils.toJava(resultFuture);
+	// this method does not mutate state and is thus thread-safe
+	@Override
+	public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {
+		return connectInternal(
+			address,
+			clazz,
+			(ActorRef actorRef) -> {
+				Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);
+
+				return new FencedAkkaInvocationHandler<>(
+					addressHostname.f0,
+					addressHostname.f1,
+					actorRef,
+					timeout,
+					maximumFramesize,
+					null,
+					() -> fencingToken);
+			});
 	}
 
 	@Override
@@ -192,7 +183,14 @@ public class AkkaRpcService implements RpcService {
 		checkNotNull(rpcEndpoint, "rpc endpoint");
 
 		CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
-		Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
+		final Props akkaRpcActorProps;
+
+		if (rpcEndpoint instanceof FencedRpcEndpoint) {
+			akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture);
+		} else {
+			akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
+		}
+
 		ActorRef actorRef;
 
 		synchronized (lock) {
@@ -212,24 +210,40 @@ public class AkkaRpcService implements RpcService {
 			hostname = host.get();
 		}
 
-		InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(
-			address,
-			hostname,
-			actorRef,
-			timeout,
-			maximumFramesize,
-			terminationFuture);
+		Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
+
+		implementedRpcGateways.add(RpcServer.class);
+		implementedRpcGateways.add(AkkaGateway.class);
+
+		final InvocationHandler akkaInvocationHandler;
+
+		if (rpcEndpoint instanceof FencedRpcEndpoint) {
+			// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
+			akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
+				address,
+				hostname,
+				actorRef,
+				timeout,
+				maximumFramesize,
+				terminationFuture,
+				((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
+
+			implementedRpcGateways.add(FencedMainThreadExecutable.class);
+		} else {
+			akkaInvocationHandler = new AkkaInvocationHandler(
+				address,
+				hostname,
+				actorRef,
+				timeout,
+				maximumFramesize,
+				terminationFuture);
+		}
 
 		// Rather than using the System ClassLoader directly, we derive the ClassLoader
 		// from this class . That works better in cases where Flink runs embedded and all Flink
 		// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
 		ClassLoader classLoader = getClass().getClassLoader();
 
-		Set<Class<? extends RpcGateway>> implementedRpcGateways = RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass());
-
-		implementedRpcGateways.add(RpcServer.class);
-		implementedRpcGateways.add(AkkaGateway.class);
-
 		@SuppressWarnings("unchecked")
 		RpcServer server = (RpcServer) Proxy.newProxyInstance(
 			classLoader,
@@ -240,6 +254,33 @@ public class AkkaRpcService implements RpcService {
 	}
 
 	@Override
+	public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken) {
+		// the rpc endpoint reference is obtained through the AkkaGateway interface, hence
+		// servers which do not implement it cannot be fenced
+		if (!(rpcServer instanceof AkkaGateway)) {
+			throw new RuntimeException("The given RpcServer must implement the AkkaGateway in order to fence it.");
+		}
+
+		final AkkaGateway akkaGateway = (AkkaGateway) rpcServer;
+
+		// the fenced handler always supplies the given fencing token; it is created without
+		// a termination future (null)
+		final InvocationHandler fencingHandler = new FencedAkkaInvocationHandler<>(
+			rpcServer.getAddress(),
+			rpcServer.getHostname(),
+			akkaGateway.getRpcEndpoint(),
+			timeout,
+			maximumFramesize,
+			null,
+			() -> fencingToken);
+
+		// Rather than using the System ClassLoader directly, we derive the ClassLoader
+		// from this class. That works better in cases where Flink runs embedded and all Flink
+		// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
+		final ClassLoader proxyClassLoader = getClass().getClassLoader();
+		final Class<?>[] proxiedInterfaces = {RpcServer.class, AkkaGateway.class};
+
+		return (RpcServer) Proxy.newProxyInstance(
+			proxyClassLoader,
+			proxiedInterfaces,
+			fencingHandler);
+	}
+
+	@Override
 	public void stopServer(RpcServer selfGateway) {
 		if (selfGateway instanceof AkkaGateway) {
 			AkkaGateway akkaClient = (AkkaGateway) selfGateway;
@@ -317,6 +358,67 @@ public class AkkaRpcService implements RpcService {
 		return FutureUtils.toJava(scalaFuture);
 	}
 
+	// ---------------------------------------------------------------------------------------
+	// Private helper methods
+	// ---------------------------------------------------------------------------------------
+
+	/**
+	 * Determines the Akka URL and the hostname of the given actor.
+	 *
+	 * @param actorRef actor to extract the address information from
+	 * @return tuple of (Akka URL, hostname); the hostname is "localhost" if the actor's
+	 *         address does not specify a host
+	 */
+	private Tuple2<String, String> extractAddressHostname(ActorRef actorRef) {
+		final String akkaUrl = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+		final Option<String> maybeHost = actorRef.path().address().host();
+		final String hostname = maybeHost.isEmpty() ? "localhost" : maybeHost.get();
+
+		return Tuple2.of(akkaUrl, hostname);
+	}
+
+	/**
+	 * Resolves the actor under the given address and, once it has been identified, creates a
+	 * dynamic proxy of type {@code clazz} backed by the invocation handler produced by the
+	 * given factory.
+	 *
+	 * @param address address of the rpc endpoint to connect to
+	 * @param clazz gateway interface the returned proxy implements
+	 * @param invocationHandlerFactory creates the invocation handler for the resolved {@link ActorRef}
+	 * @param <C> type of the gateway
+	 * @return future of the gateway proxy; it is failed with an {@link RpcConnectionException}
+	 *         if the identification does not return an actor reference
+	 */
+	private <C extends RpcGateway> CompletableFuture<C> connectInternal(
+			final String address,
+			final Class<C> clazz,
+			Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
+		checkState(!stopped, "RpcService is stopped");
+
+		LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
+			address, clazz.getName());
+
+		final ActorSelection actorSel = actorSystem.actorSelection(address);
+
+		// ask the selected actor to identify itself; the answer carries its ActorRef (or null)
+		final Future<Object> identify = Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds());
+		final Future<C> resultFuture = identify.map(new Mapper<Object, C>(){
+			@Override
+			public C checkedApply(Object obj) throws Exception {
+
+				ActorIdentity actorIdentity = (ActorIdentity) obj;
+
+				if (actorIdentity.getRef() == null) {
+					throw new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.');
+				} else {
+					ActorRef actorRef = actorIdentity.getRef();
+
+					// the concrete handler type (fenced or not) is decided by the caller
+					InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
+
+					// Rather than using the System ClassLoader directly, we derive the ClassLoader
+					// from this class. That works better in cases where Flink runs embedded and all Flink
+					// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
+					ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader();
+
+					@SuppressWarnings("unchecked")
+					C proxy = (C) Proxy.newProxyInstance(
+						classLoader,
+						new Class<?>[]{clazz},
+						invocationHandler);
+
+					return proxy;
+				}
+			}
+		}, actorSystem.dispatcher());
+
+		// convert the future produced by the mapping into a Java CompletableFuture
+		return FutureUtils.toJava(resultFuture);
+	}
+
 	/**
 	 * Helper class to expose the internal scheduling logic via a {@link ScheduledExecutor}.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
new file mode 100644
index 0000000..9d2c295
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rpc.FencedMainThreadExecutable;
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
+import org.apache.flink.runtime.rpc.FencedRpcGateway;
+import org.apache.flink.runtime.rpc.messages.CallAsync;
+import org.apache.flink.runtime.rpc.messages.FencedMessage;
+import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
+import org.apache.flink.runtime.rpc.messages.RemoteFencedMessage;
+import org.apache.flink.runtime.rpc.messages.RunAsync;
+import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Fenced extension of the {@link AkkaInvocationHandler}. This invocation handler will be used in combination
+ * with the {@link FencedRpcEndpoint}. The fencing is done by wrapping all messages in a {@link FencedMessage}.
+ *
+ * <p>Messages sent via {@link #tell(Object)} and {@link #ask(Object, Time)} are fenced with the token
+ * obtained from the fencing token supplier at send time. The {@code *WithoutFencing} methods wrap their
+ * payload in an {@link UnfencedMessage} instead and are only supported for local actors.
+ *
+ * @param <F> type of the fencing token
+ */
+public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInvocationHandler implements FencedMainThreadExecutable, FencedRpcGateway<F> {
+
+	/** Supplies the fencing token which is attached to outgoing messages. */
+	private final Supplier<F> fencingTokenSupplier;
+
+	/**
+	 * Creates a fenced invocation handler. All arguments except the supplier are forwarded
+	 * to the {@link AkkaInvocationHandler} constructor.
+	 *
+	 * @param address forwarded to the super constructor
+	 * @param hostname forwarded to the super constructor
+	 * @param rpcEndpoint actor to send the messages to
+	 * @param timeout forwarded to the super constructor
+	 * @param maximumFramesize forwarded to the super constructor
+	 * @param terminationFuture forwarded to the super constructor, may be null
+	 * @param fencingTokenSupplier supplier of the fencing token, must not be null
+	 */
+	public FencedAkkaInvocationHandler(
+			String address,
+			String hostname,
+			ActorRef rpcEndpoint,
+			Time timeout,
+			long maximumFramesize,
+			@Nullable CompletableFuture<Void> terminationFuture,
+			Supplier<F> fencingTokenSupplier) {
+		super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);
+
+		this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
+	}
+
+	/**
+	 * Invokes methods declared by {@link FencedMainThreadExecutable} or {@link FencedRpcGateway}
+	 * directly on this handler and delegates every other method to the super class.
+	 */
+	@Override
+	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+		Class<?> declaringClass = method.getDeclaringClass();
+
+		if (declaringClass.equals(FencedMainThreadExecutable.class) ||
+			declaringClass.equals(FencedRpcGateway.class)) {
+			return method.invoke(this, args);
+		} else {
+			return super.invoke(proxy, method, args);
+		}
+	}
+
+	/**
+	 * Sends the runnable wrapped in an {@link UnfencedMessage} to the rpc endpoint, without a sender.
+	 *
+	 * @param runnable runnable to send, must not be null
+	 * @throws RuntimeException if the rpc endpoint is not local
+	 */
+	@Override
+	public void runAsyncWithoutFencing(Runnable runnable) {
+		checkNotNull(runnable, "runnable");
+
+		if (isLocal) {
+			getRpcEndpoint().tell(
+				new UnfencedMessage<>(new RunAsync(runnable, 0L)), ActorRef.noSender());
+		} else {
+			throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
+				getRpcEndpoint().path() + ". This is not supported.");
+		}
+	}
+
+	/**
+	 * Asks the rpc endpoint with the callable wrapped in an {@link UnfencedMessage} and returns
+	 * the answer as a Java future.
+	 *
+	 * @param callable callable to send, must not be null
+	 * @param timeout timeout for the ask operation, must not be null
+	 * @param <V> type of the callable's result
+	 * @return future of the answer to the ask operation
+	 * @throws RuntimeException if the rpc endpoint is not local
+	 */
+	@Override
+	public <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time timeout) {
+		checkNotNull(callable, "callable");
+		checkNotNull(timeout, "timeout");
+
+		if (isLocal) {
+			@SuppressWarnings("unchecked")
+			CompletableFuture<V> resultFuture = (CompletableFuture<V>) FutureUtils.toJava(
+				Patterns.ask(
+					getRpcEndpoint(),
+					new UnfencedMessage<>(new CallAsync(callable)),
+					timeout.toMilliseconds()));
+
+			return resultFuture;
+		} else {
+			// this method ships a Callable, not a Runnable; name the right type in the error
+			throw new RuntimeException("Trying to send a Callable to a remote actor at " +
+				getRpcEndpoint().path() + ". This is not supported.");
+		}
+	}
+
+	/** Fences the message with the current fencing token before handing it to the super class. */
+	@Override
+	public void tell(Object message) {
+		super.tell(fenceMessage(message));
+	}
+
+	/** Fences the message with the current fencing token before handing it to the super class. */
+	@Override
+	public CompletableFuture<?> ask(Object message, Time timeout) {
+		return super.ask(fenceMessage(message), timeout);
+	}
+
+	/** Returns the token currently provided by the fencing token supplier. */
+	@Override
+	public F getFencingToken() {
+		return fencingTokenSupplier.get();
+	}
+
+	/**
+	 * Wraps the message together with the current fencing token. Local endpoints receive a
+	 * {@link LocalFencedMessage}; remote endpoints receive a {@link RemoteFencedMessage}, which
+	 * requires the message to be {@link Serializable}.
+	 *
+	 * @param message payload to fence
+	 * @param <P> type of the payload
+	 * @return fenced message containing the payload
+	 * @throws RuntimeException if the endpoint is remote and the message is not serializable
+	 */
+	private <P> FencedMessage<F, P> fenceMessage(P message) {
+		if (isLocal) {
+			return new LocalFencedMessage<>(fencingTokenSupplier.get(), message);
+		} else {
+			if (message instanceof Serializable) {
+				@SuppressWarnings("unchecked")
+				FencedMessage<F, P> result = (FencedMessage<F, P>) new RemoteFencedMessage<>(fencingTokenSupplier.get(), (Serializable) message);
+
+				return result;
+			} else {
+				throw new RuntimeException("Trying to send a non-serializable message " + message + " to a remote " +
+					"RpcEndpoint. Please make sure that the message implements java.io.Serializable.");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
new file mode 100644
index 0000000..b10f7de
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaRpcActor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
+import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.messages.FencedMessage;
+import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Fenced extension of the {@link AkkaRpcActor}. This actor will be started for {@link FencedRpcEndpoint} and is
+ * responsible for filtering out invalid messages with respect to the current fencing token.
+ *
+ * <p>Only payloads of {@link FencedMessage}s whose token equals the endpoint's fencing token and payloads of
+ * {@link UnfencedMessage}s are forwarded to {@link AkkaRpcActor#handleMessage(Object)}; for everything else
+ * an error is passed to {@code sendErrorIfSender}.
+ *
+ * @param <F> type of the fencing token
+ * @param <T> type of the RpcEndpoint
+ */
+public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {
+
+	/**
+	 * Creates the actor; both arguments are forwarded to the {@link AkkaRpcActor} constructor.
+	 *
+	 * @param rpcEndpoint fenced endpoint whose fencing token is used for filtering
+	 * @param terminationFuture forwarded to the super constructor
+	 */
+	public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Void> terminationFuture) {
+		super(rpcEndpoint, terminationFuture);
+	}
+
+	/**
+	 * Unwraps fenced and unfenced messages and forwards their payload to the super class.
+	 * Fenced messages with a non-matching token and messages of any other type are not
+	 * processed; an error is passed to {@code sendErrorIfSender} instead.
+	 *
+	 * @param message message to handle
+	 */
+	@Override
+	protected void handleMessage(Object message) {
+		if (message instanceof FencedMessage) {
+			@SuppressWarnings("unchecked")
+			FencedMessage<F, ?> fencedMessage = ((FencedMessage<F, ?>) message);
+
+			F fencingToken = fencedMessage.getFencingToken();
+
+			// Objects.equals also covers the case where one or both tokens are null
+			if (Objects.equals(rpcEndpoint.getFencingToken(), fencingToken)) {
+				super.handleMessage(fencedMessage.getPayload());
+			} else {
+				if (log.isDebugEnabled()) {
+					log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did " +
+						"not match the expected fencing token {}.", message, fencingToken, rpcEndpoint.getFencingToken());
+				}
+
+				sendErrorIfSender(new FencingTokenMismatchException("Expected fencing token " + rpcEndpoint.getFencingToken() + ", actual fencing token " + fencingToken));
+			}
+		} else if (message instanceof UnfencedMessage) {
+			// unfenced messages bypass the token check entirely
+			super.handleMessage(((UnfencedMessage<?>) message).getPayload());
+		} else {
+			if (log.isDebugEnabled()) {
+				log.debug("Unknown message type: Ignoring message {} because it is neither of type {} nor {}.",
+					message, FencedMessage.class.getSimpleName(), UnfencedMessage.class.getSimpleName());
+			}
+
+			sendErrorIfSender(new AkkaUnknownMessageException("Unknown message type: Ignoring message " + message +
+				" of type " + message.getClass().getSimpleName() + " because it is neither of type " +
+				FencedMessage.class.getSimpleName() + " nor " + UnfencedMessage.class.getSimpleName() + '.'));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaUnknownMessageException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaUnknownMessageException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaUnknownMessageException.java
new file mode 100644
index 0000000..7504761
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/exceptions/AkkaUnknownMessageException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.exceptions;
+
+/**
+ * Exception which indicates that the AkkaRpcActor has received an
+ * unknown message type.
+ */
+public class AkkaUnknownMessageException extends AkkaRpcException {
+
+	private static final long serialVersionUID = 1691338049911020814L;
+
+	/**
+	 * Creates the exception with a detail message.
+	 *
+	 * @param message detail message
+	 */
+	public AkkaUnknownMessageException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates the exception with a detail message and a cause.
+	 *
+	 * @param message detail message
+	 * @param cause cause of this exception
+	 */
+	public AkkaUnknownMessageException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	/**
+	 * Creates the exception with a cause only.
+	 *
+	 * @param cause cause of this exception
+	 */
+	public AkkaUnknownMessageException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
deleted file mode 100644
index 79b7825..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.util.concurrent.Callable;
-
-/**
- * Message for asynchronous callable invocations
- */
-public final class CallAsync implements Serializable {
-	private static final long serialVersionUID = 2834204738928484060L;
-
-	private transient Callable<?> callable;
-
-	public CallAsync(Callable<?> callable) {
-		this.callable = Preconditions.checkNotNull(callable);
-	}
-
-	public Callable<?> getCallable() {
-		return callable;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/LocalRpcInvocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/LocalRpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/LocalRpcInvocation.java
deleted file mode 100644
index 97c10d7..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/LocalRpcInvocation.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.util.Preconditions;
-
-/**
- * Local rpc invocation message containing the remote procedure name, its parameter types and the
- * corresponding call arguments. This message will only be sent if the communication is local and,
- * thus, the message does not have to be serialized.
- */
-public final class LocalRpcInvocation implements RpcInvocation {
-
-	private final String methodName;
-	private final Class<?>[] parameterTypes;
-	private final Object[] args;
-
-	public LocalRpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] args) {
-		this.methodName = Preconditions.checkNotNull(methodName);
-		this.parameterTypes = Preconditions.checkNotNull(parameterTypes);
-		this.args = args;
-	}
-
-	@Override
-	public String getMethodName() {
-		return methodName;
-	}
-
-	@Override
-	public Class<?>[] getParameterTypes() {
-		return parameterTypes;
-	}
-
-	@Override
-	public Object[] getArgs() {
-		return args;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
index 5c7df5d..030ff60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.rpc.akka.messages;
 /**
  * Controls the processing behaviour of the {@link org.apache.flink.runtime.rpc.akka.AkkaRpcActor}
  */
-public enum Processing {
+public enum Processing  {
 	START, // Unstashes all stashed messages and starts processing incoming messages
 	STOP // Stop processing messages and stashes all incoming messages
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java
deleted file mode 100644
index bc26a29..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RemoteRpcInvocation.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-/**
- * Remote rpc invocation message which is used when the actor communication is remote and, thus, the
- * message has to be serialized.
- * <p>
- * In order to fail fast and report an appropriate error message to the user, the method name, the
- * parameter types and the arguments are eagerly serialized. In case the the invocation call
- * contains a non-serializable object, then an {@link IOException} is thrown.
- */
-public class RemoteRpcInvocation implements RpcInvocation, Serializable {
-	private static final long serialVersionUID = 6179354390913843809L;
-
-	// Serialized invocation data
-	private SerializedValue<RemoteRpcInvocation.MethodInvocation> serializedMethodInvocation;
-
-	// Transient field which is lazily initialized upon first access to the invocation data
-	private transient RemoteRpcInvocation.MethodInvocation methodInvocation;
-
-	public  RemoteRpcInvocation(
-		final String methodName,
-		final Class<?>[] parameterTypes,
-		final Object[] args) throws IOException {
-
-		serializedMethodInvocation = new SerializedValue<>(new RemoteRpcInvocation.MethodInvocation(methodName, parameterTypes, args));
-		methodInvocation = null;
-	}
-
-	@Override
-	public String getMethodName() throws IOException, ClassNotFoundException {
-		deserializeMethodInvocation();
-
-		return methodInvocation.getMethodName();
-	}
-
-	@Override
-	public Class<?>[] getParameterTypes() throws IOException, ClassNotFoundException {
-		deserializeMethodInvocation();
-
-		return methodInvocation.getParameterTypes();
-	}
-
-	@Override
-	public Object[] getArgs() throws IOException, ClassNotFoundException {
-		deserializeMethodInvocation();
-
-		return methodInvocation.getArgs();
-	}
-
-	/**
-	 * Size (#bytes of the serialized data) of the rpc invocation message.
-	 *
-	 * @return Size of the remote rpc invocation message
-	 */
-	public long getSize() {
-		return serializedMethodInvocation.getByteArray().length;
-	}
-
-	private void deserializeMethodInvocation() throws IOException, ClassNotFoundException {
-		if (methodInvocation == null) {
-			methodInvocation = serializedMethodInvocation.deserializeValue(ClassLoader.getSystemClassLoader());
-		}
-	}
-
-	// -------------------------------------------------------------------
-	// Serialization methods
-	// -------------------------------------------------------------------
-
-	private void writeObject(ObjectOutputStream oos) throws IOException {
-		oos.writeObject(serializedMethodInvocation);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-		serializedMethodInvocation = (SerializedValue<RemoteRpcInvocation.MethodInvocation>) ois.readObject();
-		methodInvocation = null;
-	}
-
-	// -------------------------------------------------------------------
-	// Utility classes
-	// -------------------------------------------------------------------
-
-	/**
-	 * Wrapper class for the method invocation information
-	 */
-	private static final class MethodInvocation implements Serializable {
-		private static final long serialVersionUID = 9187962608946082519L;
-
-		private String methodName;
-		private Class<?>[] parameterTypes;
-		private Object[] args;
-
-		private MethodInvocation(final String methodName, final Class<?>[] parameterTypes, final Object[] args) {
-			this.methodName = methodName;
-			this.parameterTypes = Preconditions.checkNotNull(parameterTypes);
-			this.args = args;
-		}
-
-		String getMethodName() {
-			return methodName;
-		}
-
-		Class<?>[] getParameterTypes() {
-			return parameterTypes;
-		}
-
-		Object[] getArgs() {
-			return args;
-		}
-
-		private void writeObject(ObjectOutputStream oos) throws IOException {
-			oos.writeUTF(methodName);
-
-			oos.writeInt(parameterTypes.length);
-
-			for (Class<?> parameterType : parameterTypes) {
-				oos.writeObject(parameterType);
-			}
-
-			if (args != null) {
-				oos.writeBoolean(true);
-
-				for (int i = 0; i < args.length; i++) {
-					try {
-						oos.writeObject(args[i]);
-					} catch (IOException e) {
-						throw new IOException("Could not serialize " + i + "th argument of method " +
-							methodName + ". This indicates that the argument type " +
-							args.getClass().getName() + " is not serializable. Arguments have to " +
-							"be serializable for remote rpc calls.", e);
-					}
-				}
-			} else {
-				oos.writeBoolean(false);
-			}
-		}
-
-		private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-			methodName = ois.readUTF();
-
-			int length = ois.readInt();
-
-			parameterTypes = new Class<?>[length];
-
-			for (int i = 0; i < length; i++) {
-				try {
-					parameterTypes[i] = (Class<?>) ois.readObject();
-				} catch (IOException e) {
-					throw new IOException("Could not deserialize " + i + "th parameter type of method " +
-						methodName + '.', e);
-				} catch (ClassNotFoundException e) {
-					throw new ClassNotFoundException("Could not deserialize " + i + "th " +
-						"parameter type of method " + methodName + ". This indicates that the parameter " +
-						"type is not part of the system class loader.", e);
-				}
-			}
-
-			boolean hasArgs = ois.readBoolean();
-
-			if (hasArgs) {
-				args = new Object[length];
-
-				for (int i = 0; i < length; i++) {
-					try {
-						args[i] = ois.readObject();
-					} catch (IOException e) {
-						throw new IOException("Could not deserialize " + i + "th argument of method " +
-							methodName + '.', e);
-					} catch (ClassNotFoundException e) {
-						throw new ClassNotFoundException("Could not deserialize " + i + "th " +
-							"argument of method " + methodName + ". This indicates that the argument " +
-							"type is not part of the system class loader.", e);
-					}
-				}
-			} else {
-				args = null;
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
deleted file mode 100644
index b174c99..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import java.io.IOException;
-
-/**
- * Interface for rpc invocation messages. The interface allows to request all necessary information
- * to lookup a method and call it with the corresponding arguments.
- */
-public interface RpcInvocation {
-
-	/**
-	 * Returns the method's name.
-	 *
-	 * @return Method name
-	 * @throws IOException if the rpc invocation message is a remote message and could not be deserialized
-	 * @throws ClassNotFoundException if the rpc invocation message is a remote message and contains
-	 * serialized classes which cannot be found on the receiving side
-	 */
-	String getMethodName() throws IOException, ClassNotFoundException;
-
-	/**
-	 * Returns the method's parameter types
-	 *
-	 * @return Method's parameter types
-	 * @throws IOException if the rpc invocation message is a remote message and could not be deserialized
-	 * @throws ClassNotFoundException if the rpc invocation message is a remote message and contains
-	 * serialized classes which cannot be found on the receiving side
-	 */
-	Class<?>[] getParameterTypes() throws IOException, ClassNotFoundException;
-
-	/**
-	 * Returns the arguments of the remote procedure call
-	 *
-	 * @return Arguments of the remote procedure call
-	 * @throws IOException if the rpc invocation message is a remote message and could not be deserialized
-	 * @throws ClassNotFoundException if the rpc invocation message is a remote message and contains
-	 * serialized classes which cannot be found on the receiving side
-	 */
-	Object[] getArgs() throws IOException, ClassNotFoundException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
deleted file mode 100644
index 4b8a0b4..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/**
- * Message for asynchronous runnable invocations
- */
-public final class RunAsync {
-
-	/** The runnable to be executed. Transient, so it gets lost upon serialization */ 
-	private final Runnable runnable;
-
-	/** The delay after which the runnable should be called */
-	private final long atTimeNanos;
-
-	/**
-	 * Creates a new {@code RunAsync} message.
-	 * 
-	 * @param runnable    The Runnable to run.
-	 * @param atTimeNanos The time (as for System.nanoTime()) when to execute the runnable.
-	 */
-	public RunAsync(Runnable runnable, long atTimeNanos) {
-		checkArgument(atTimeNanos >= 0);
-		this.runnable = checkNotNull(runnable);
-		this.atTimeNanos = atTimeNanos;
-	}
-
-	public Runnable getRunnable() {
-		return runnable;
-	}
-
-	public long getTimeNanos() {
-		return atTimeNanos;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java
deleted file mode 100644
index c596d12..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Shutdown.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
-
-/**
- * Shut down message used to trigger the shut down of an AkkaRpcActor. This
- * message is only intended for internal use by the {@link AkkaRpcService}.
- */
-public final class Shutdown {
-
-	private static Shutdown instance = new Shutdown();
-
-	public static Shutdown getInstance() {
-		return instance;
-	}
-
-	private Shutdown() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
new file mode 100644
index 0000000..9a59101
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/FencingTokenMismatchException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
+import org.apache.flink.runtime.rpc.exceptions.RpcException;
+
+/**
+ * Exception which is thrown if the fencing tokens of a {@link FencedRpcEndpoint} do
+ * not match.
+ */
+public class FencingTokenMismatchException extends RpcException {
+	private static final long serialVersionUID = -500634972988881467L;
+
+	public FencingTokenMismatchException(String message) {
+		super(message);
+	}
+
+	public FencingTokenMismatchException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
+	public FencingTokenMismatchException(Throwable cause) {
+		super(cause);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/CallAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/CallAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/CallAsync.java
new file mode 100644
index 0000000..9aa7d70
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/CallAsync.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+/**
+ * Message for asynchronous callable invocations.
+ */
+public final class CallAsync implements Serializable {
+	private static final long serialVersionUID = 2834204738928484060L;
+
+	private transient Callable<?> callable;
+
+	public CallAsync(Callable<?> callable) {
+		this.callable = Preconditions.checkNotNull(callable);
+	}
+
+	public Callable<?> getCallable() {
+		return callable;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java
new file mode 100644
index 0000000..c16bdd7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/ControlMessage.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+/**
+ * Base interface for control messages which are treated separately by the RPC server
+ * implementation.
+ */
+public interface ControlMessage {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1804aa33/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/FencedMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/FencedMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/FencedMessage.java
new file mode 100644
index 0000000..b67e564
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/FencedMessage.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.messages;
+
+import java.io.Serializable;
+
+/**
+ * Interface for fenced messages.
+ *
+ * @param <F> type of the fencing token
+ * @param <P> type of the payload
+ */
+public interface FencedMessage<F extends Serializable, P> {
+
+	F getFencingToken();
+
+	P getPayload();
+}


Mime
View raw message