flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [15/50] [abbrv] flink git commit: [FLINK-4362] [rpc] Auto generate rpc gateways via Java proxies
Date Wed, 21 Sep 2016 09:52:48 GMT
[FLINK-4362] [rpc] Auto generate rpc gateways via Java proxies

This PR introduces a generic AkkaRpcActor which receives rpc calls as
RpcInvocation messages. Each RpcInvocation message is generated by the
AkkaInvocationHandler, which receives the calls from automatically generated Java proxies.

Add documentation for proxy based akka rpc service

Log unknown message type in AkkaRpcActor but do not fail actor

Use ReflectionUtil to extract RpcGateway type from RpcEndpoint

This closes #2357.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5cf6b56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5cf6b56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5cf6b56

Branch: refs/heads/flip-6
Commit: f5cf6b5680ff3e7bc44f2084e2754dbf8e8d5ec0
Parents: 94e0092
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Aug 10 18:42:26 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Sep 21 11:39:12 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/ReflectionUtil.java   |  10 +-
 .../flink/runtime/rpc/MainThreadExecutor.java   |   4 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  22 +-
 .../apache/flink/runtime/rpc/RpcService.java    |   2 +-
 .../flink/runtime/rpc/akka/AkkaGateway.java     |   4 +-
 .../runtime/rpc/akka/AkkaInvocationHandler.java | 226 +++++++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 175 ++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 121 +++++-----
 .../flink/runtime/rpc/akka/BaseAkkaActor.java   |  50 ----
 .../flink/runtime/rpc/akka/BaseAkkaGateway.java |  41 ----
 .../rpc/akka/jobmaster/JobMasterAkkaActor.java  |  58 -----
 .../akka/jobmaster/JobMasterAkkaGateway.java    |  57 -----
 .../runtime/rpc/akka/messages/CallAsync.java    |  41 ++++
 .../rpc/akka/messages/CallableMessage.java      |  33 ---
 .../runtime/rpc/akka/messages/CancelTask.java   |  36 ---
 .../runtime/rpc/akka/messages/ExecuteTask.java  |  36 ---
 .../messages/RegisterAtResourceManager.java     |  36 ---
 .../rpc/akka/messages/RegisterJobMaster.java    |  36 ---
 .../runtime/rpc/akka/messages/RequestSlot.java  |  37 ---
 .../rpc/akka/messages/RpcInvocation.java        |  98 ++++++++
 .../runtime/rpc/akka/messages/RunAsync.java     |  40 ++++
 .../rpc/akka/messages/RunnableMessage.java      |  31 ---
 .../akka/messages/UpdateTaskExecutionState.java |  37 ---
 .../ResourceManagerAkkaActor.java               |  65 ------
 .../ResourceManagerAkkaGateway.java             |  67 ------
 .../taskexecutor/TaskExecutorAkkaActor.java     |  77 -------
 .../taskexecutor/TaskExecutorAkkaGateway.java   |  59 -----
 .../flink/runtime/rpc/jobmaster/JobMaster.java  |   4 +-
 .../rpc/resourcemanager/ResourceManager.java    |   4 +-
 .../runtime/rpc/taskexecutor/TaskExecutor.java  |   4 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  50 ++--
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   4 +-
 .../rpc/taskexecutor/TaskExecutorTest.java      |   2 +-
 33 files changed, 700 insertions(+), 867 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
index fe2d4c0..b851eba 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
@@ -48,6 +48,14 @@ public final class ReflectionUtil {
 		return getTemplateType(clazz, 0);
 	}
 
+	public static <T> Class<T> getTemplateType1(Type type) {
+		if (type instanceof ParameterizedType) {
+			return (Class<T>) getTemplateTypes((ParameterizedType) type)[0];
+		} else {
+			throw new IllegalArgumentException();
+		}
+	}
+
 	public static <T> Class<T> getTemplateType2(Class<?> clazz) {
 		return getTemplateType(clazz, 1);
 	}
@@ -123,7 +131,7 @@ public final class ReflectionUtil {
 		Class<?>[] types = new Class<?>[paramterizedType.getActualTypeArguments().length];
 		int i = 0;
 		for (Type templateArgument : paramterizedType.getActualTypeArguments()) {
-			assert (templateArgument instanceof Class<?>);
+			assert templateArgument instanceof Class<?>;
 			types[i++] = (Class<?>) templateArgument;
 		}
 		return types;

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
index 14b2997..882c1b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java
@@ -47,9 +47,9 @@ public interface MainThreadExecutor {
 	 * future will throw a {@link TimeoutException}.
 	 *
 	 * @param callable Callable to be executed
-	 * @param timeout Timeout for the future to complete
+	 * @param callTimeout Timeout for the future to complete
 	 * @param <V> Return value of the callable
 	 * @return Future of the callable result
 	 */
-	<V> Future<V> callAsync(Callable<V> callable, Timeout timeout);
+	<V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 0d928a8..aef0803 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rpc;
 
 import akka.util.Timeout;
 
+import org.apache.flink.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +61,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	/** RPC service to be used to start the RPC server and to obtain rpc gateways */
 	private final RpcService rpcService;
 
+	/** Class of the self gateway */
+	private final Class<C> selfGatewayType;
+
 	/** Self gateway which can be used to schedule asynchronous calls on yourself */
 	private final C self;
 
@@ -70,15 +74,19 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * of the executing rpc server. */
 	private final MainThreadExecutionContext mainThreadExecutionContext;
 
-
 	/**
 	 * Initializes the RPC endpoint.
 	 * 
 	 * @param rpcService The RPC server that dispatches calls to this RPC endpoint. 
 	 */
-	public RpcEndpoint(RpcService rpcService) {
+	protected RpcEndpoint(final RpcService rpcService) {
 		this.rpcService = checkNotNull(rpcService, "rpcService");
+
+		// IMPORTANT: Don't change order of selfGatewayType and self because rpcService.startServer
+		// requires that selfGatewayType has been initialized
+		this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass());
 		this.self = rpcService.startServer(this);
+		
 		this.selfAddress = rpcService.getAddress(self);
 		this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self);
 	}
@@ -149,6 +157,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	//  Asynchronous executions
 	// ------------------------------------------------------------------------
 
+
 	/**
 	 * Execute the runnable in the main thread of the underlying RPC endpoint.
 	 *
@@ -172,6 +181,15 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 		return ((MainThreadExecutor) self).callAsync(callable, timeout);
 	}
 
+	/**
+	 * Returns the class of the self gateway type.
+	 *
+	 * @return Class of the self gateway type
+	 */
+	public final Class<C> getSelfGatewayType() {
+		return selfGatewayType;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 90ff7b6..f93be83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -46,7 +46,7 @@ public interface RpcService {
 	 * @param <C> Type of the self rpc gateway associated with the rpc server
 	 * @return Self gateway to dispatch remote procedure calls to oneself
 	 */
-	<S extends RpcEndpoint, C extends RpcGateway> C startServer(S rpcEndpoint);
+	<C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint);
 
 	/**
 	 * Stop the underlying rpc server of the provided self gateway.

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
index a96a600..a826e7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
@@ -23,7 +23,7 @@ import akka.actor.ActorRef;
 /**
  * Interface for Akka based rpc gateways
  */
-public interface AkkaGateway {
+interface AkkaGateway {
 
-	ActorRef getActorRef();
+	ActorRef getRpcServer();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
new file mode 100644
index 0000000..e8e383a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.BitSet;
+import java.util.concurrent.Callable;
+
+/**
+ * Invocation handler to be used with a {@link AkkaRpcActor}. The invocation handler wraps the
+ * rpc in a {@link RpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is
+ * executed.
+ */
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor {
+	private final ActorRef rpcServer;
+
+	// default timeout for asks
+	private final Timeout timeout;
+
+	AkkaInvocationHandler(ActorRef rpcServer, Timeout timeout) {
+		this.rpcServer = Preconditions.checkNotNull(rpcServer);
+		this.timeout = Preconditions.checkNotNull(timeout);
+	}
+
+	@Override
+	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+		Class<?> declaringClass = method.getDeclaringClass();
+
+		Object result;
+
+		if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) || declaringClass.equals(Object.class)) {
+			result = method.invoke(this, args);
+		} else {
+			String methodName = method.getName();
+			Class<?>[] parameterTypes = method.getParameterTypes();
+			Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+			Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
+
+			Tuple2<Class<?>[], Object[]> filteredArguments = filterArguments(
+				parameterTypes,
+				parameterAnnotations,
+				args);
+
+			RpcInvocation rpcInvocation = new RpcInvocation(
+				methodName,
+				filteredArguments.f0,
+				filteredArguments.f1);
+
+			Class<?> returnType = method.getReturnType();
+
+			if (returnType.equals(Void.TYPE)) {
+				rpcServer.tell(rpcInvocation, ActorRef.noSender());
+
+				result = null;
+			} else if (returnType.equals(Future.class)) {
+				// execute an asynchronous call
+				result = Patterns.ask(rpcServer, rpcInvocation, futureTimeout);
+			} else {
+				// execute a synchronous call
+				Future<?> futureResult = Patterns.ask(rpcServer, rpcInvocation, futureTimeout);
+				FiniteDuration duration = timeout.duration();
+
+				result = Await.result(futureResult, duration);
+			}
+		}
+
+		return result;
+	}
+
+	@Override
+	public ActorRef getRpcServer() {
+		return rpcServer;
+	}
+
+	@Override
+	public void runAsync(Runnable runnable) {
+		// Unfortunately I couldn't find a way to allow only local communication. Therefore, the
+		// runnable field is declared transient
+		rpcServer.tell(new RunAsync(runnable), ActorRef.noSender());
+	}
+
+	@Override
+	public <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout) {
+		// Unfortunately I couldn't find a way to allow only local communication. Therefore, the
+		// callable field is declared transient
+		@SuppressWarnings("unchecked")
+		Future<V> result = (Future<V>) Patterns.ask(rpcServer, new CallAsync(callable), callTimeout);
+
+		return result;
+	}
+
+	/**
+	 * Extracts the {@link RpcTimeout} annotated rpc timeout value from the list of given method
+	 * arguments. If no {@link RpcTimeout} annotated parameter could be found, then the default
+	 * timeout is returned.
+	 *
+	 * @param parameterAnnotations Parameter annotations
+	 * @param args Array of arguments
+	 * @param defaultTimeout Default timeout to return if no {@link RpcTimeout} annotated parameter
+	 *                       has been found
+	 * @return Timeout extracted from the array of arguments or the default timeout
+	 */
+	private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Timeout defaultTimeout) {
+		if (args != null) {
+			Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+			for (int i = 0; i < parameterAnnotations.length; i++) {
+				if (isRpcTimeout(parameterAnnotations[i])) {
+					if (args[i] instanceof FiniteDuration) {
+						return new Timeout((FiniteDuration) args[i]);
+					} else {
+						throw new RuntimeException("The rpc timeout parameter must be of type " +
+							FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+							" is not supported.");
+					}
+				}
+			}
+		}
+
+		return defaultTimeout;
+	}
+
+	/**
+	 * Removes all {@link RpcTimeout} annotated parameters from the parameter type and argument
+	 * list.
+	 *
+	 * @param parameterTypes Array of parameter types
+	 * @param parameterAnnotations Array of parameter annotations
+	 * @param args Array of arguments
+	 * @return Tuple of filtered parameter types and arguments which no longer contain the
+	 * {@link RpcTimeout} annotated parameter types and arguments
+	 */
+	private static Tuple2<Class<?>[], Object[]> filterArguments(
+		Class<?>[] parameterTypes,
+		Annotation[][] parameterAnnotations,
+		Object[] args) {
+
+		Class<?>[] filteredParameterTypes;
+		Object[] filteredArgs;
+
+		if (args == null) {
+			filteredParameterTypes = parameterTypes;
+			filteredArgs = null;
+		} else {
+			Preconditions.checkArgument(parameterTypes.length == parameterAnnotations.length);
+			Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+			BitSet isRpcTimeoutParameter = new BitSet(parameterTypes.length);
+			int numberRpcParameters = parameterTypes.length;
+
+			for (int i = 0; i < parameterTypes.length; i++) {
+				if (isRpcTimeout(parameterAnnotations[i])) {
+					isRpcTimeoutParameter.set(i);
+					numberRpcParameters--;
+				}
+			}
+
+			if (numberRpcParameters == parameterTypes.length) {
+				filteredParameterTypes = parameterTypes;
+				filteredArgs = args;
+			} else {
+				filteredParameterTypes = new Class<?>[numberRpcParameters];
+				filteredArgs = new Object[numberRpcParameters];
+				int counter = 0;
+
+				for (int i = 0; i < parameterTypes.length; i++) {
+					if (!isRpcTimeoutParameter.get(i)) {
+						filteredParameterTypes[counter] = parameterTypes[i];
+						filteredArgs[counter] = args[i];
+						counter++;
+					}
+				}
+			}
+		}
+
+		return Tuple2.of(filteredParameterTypes, filteredArgs);
+	}
+
+	/**
+	 * Checks whether any of the annotations is of type {@link RpcTimeout}
+	 *
+	 * @param annotations Array of annotations
+	 * @return True if {@link RpcTimeout} was found; otherwise false
+	 */
+	private static boolean isRpcTimeout(Annotation[] annotations) {
+		for (Annotation annotation : annotations) {
+			if (annotation.annotationType().equals(RpcTimeout.class)) {
+				return true;
+			}
+		}
+
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
new file mode 100644
index 0000000..57da38a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import akka.pattern.Patterns;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
+import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+
+/**
+ * Akka rpc actor which receives {@link RpcInvocation}, {@link RunAsync} and {@link CallAsync}
+ * messages.
+ * <p>
+ * The {@link RpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
+ * instance.
+ * <p>
+ * The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
+ * in the context of the actor thread.
+ *
+ * @param <C> Type of the {@link RpcGateway} associated with the {@link RpcEndpoint}
+ * @param <T> Type of the {@link RpcEndpoint}
+ */
+class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends UntypedActor {
+	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcActor.class);
+
+	private final T rpcEndpoint;
+
+	AkkaRpcActor(final T rpcEndpoint) {
+		this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint");
+	}
+
+	@Override
+	public void onReceive(final Object message)  {
+		if (message instanceof RunAsync) {
+			handleRunAsync((RunAsync) message);
+		} else if (message instanceof CallAsync) {
+			handleCallAsync((CallAsync) message);
+		} else if (message instanceof RpcInvocation) {
+			handleRpcInvocation((RpcInvocation) message);
+		} else {
+			LOG.warn("Received message of unknown type {}. Dropping this message!", message.getClass());
+		}
+	}
+
+	/**
+	 * Handle rpc invocations by looking up the rpc method on the rpc endpoint and calling this
+	 * method with the provided method arguments. If the method has a return value, it is returned
+	 * to the sender of the call.
+	 *
+	 * @param rpcInvocation Rpc invocation message
+	 */
+	private void handleRpcInvocation(RpcInvocation rpcInvocation) {
+		Method rpcMethod = null;
+
+		try {
+			rpcMethod = lookupRpcMethod(rpcInvocation.getMethodName(), rpcInvocation.getParameterTypes());
+		} catch (final NoSuchMethodException e) {
+			LOG.error("Could not find rpc method for rpc invocation: {}.", rpcInvocation, e);
+		}
+
+		if (rpcMethod != null) {
+			if (rpcMethod.getReturnType().equals(Void.TYPE)) {
+				// No return value to send back
+				try {
+					rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+				} catch (Throwable e) {
+					LOG.error("Error while executing remote procedure call {}.", rpcMethod, e);
+				}
+			} else {
+				try {
+					Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
+
+					if (result instanceof Future) {
+						// pipe result to sender
+						Patterns.pipe((Future<?>) result, getContext().dispatcher()).to(getSender());
+					} else {
+						// tell the sender the result of the computation
+						getSender().tell(new Status.Success(result), getSelf());
+					}
+				} catch (Throwable e) {
+					// tell the sender about the failure
+					getSender().tell(new Status.Failure(e), getSelf());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Handle asynchronous {@link Callable}. This method simply executes the given {@link Callable}
+	 * in the context of the actor thread.
+	 *
+	 * @param callAsync Call async message
+	 */
+	private void handleCallAsync(CallAsync callAsync) {
+		if (callAsync.getCallable() == null) {
+			final String result = "Received a " + callAsync.getClass().getName() + " message with an empty " +
+				"callable field. This indicates that this message has been serialized " +
+				"prior to sending the message. The " + callAsync.getClass().getName() +
+				" is only supported with local communication.";
+
+			LOG.warn(result);
+
+			getSender().tell(new Status.Failure(new Exception(result)), getSelf());
+		} else {
+			try {
+				Object result = callAsync.getCallable().call();
+
+				getSender().tell(new Status.Success(result), getSelf());
+			} catch (Throwable e) {
+				getSender().tell(new Status.Failure(e), getSelf());
+			}
+		}
+	}
+
+	/**
+	 * Handle asynchronous {@link Runnable}. This method simply executes the given {@link Runnable}
+	 * in the context of the actor thread.
+	 *
+	 * @param runAsync Run async message
+	 */
+	private void handleRunAsync(RunAsync runAsync) {
+		if (runAsync.getRunnable() == null) {
+			LOG.warn("Received a {} message with an empty runnable field. This indicates " +
+				"that this message has been serialized prior to sending the message. The " +
+				"{} is only supported with local communication.",
+				runAsync.getClass().getName(),
+				runAsync.getClass().getName());
+		} else {
+			try {
+				runAsync.getRunnable().run();
+			} catch (final Throwable e) {
+				LOG.error("Caught exception while executing runnable in main thread.", e);
+			}
+		}
+	}
+
+	/**
+	 * Look up the rpc method on the given {@link RpcEndpoint} instance.
+	 *
+	 * @param methodName Name of the method
+	 * @param parameterTypes Parameter types of the method
+	 * @return Method of the rpc endpoint
+	 * @throws NoSuchMethodException if no public method with the given name and parameter types exists
+	 */
+	private Method lookupRpcMethod(final String methodName, final Class<?>[] parameterTypes) throws NoSuchMethodException {
+		return rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index d55bd13..17983d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -29,88 +29,82 @@ import akka.dispatch.Mapper;
 import akka.pattern.AskableActorSelection;
 import akka.util.Timeout;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaActor;
-import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaGateway;
-import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaActor;
-import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaGateway;
-import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaActor;
-import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaGateway;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
 import java.util.HashSet;
-import java.util.Set;
 
+/**
+ * Akka based {@link RpcService} implementation. The rpc service starts an Akka actor to receive
+ * rpcs from a {@link RpcGateway}.
+ */
 public class AkkaRpcService implements RpcService {
+	private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);
+
 	private final ActorSystem actorSystem;
 	private final Timeout timeout;
-	private final Set<ActorRef> actors = new HashSet<>();
+	private final Collection<ActorRef> actors = new HashSet<>(4);
 
-	public AkkaRpcService(ActorSystem actorSystem, Timeout timeout) {
-		this.actorSystem = actorSystem;
-		this.timeout = timeout;
+	public AkkaRpcService(final ActorSystem actorSystem, final Timeout timeout) {
+		this.actorSystem = Preconditions.checkNotNull(actorSystem, "actor system");
+		this.timeout = Preconditions.checkNotNull(timeout, "timeout");
 	}
 
 	@Override
-	public <C extends RpcGateway> Future<C> connect(String address, final Class<C> clazz) {
-		ActorSelection actorSel = actorSystem.actorSelection(address);
+	public <C extends RpcGateway> Future<C> connect(final String address, final Class<C> clazz) {
+		LOG.info("Try to connect to remote rpc server with address {}. Returning a {} gateway.", address, clazz.getName());
 
-		AskableActorSelection asker = new AskableActorSelection(actorSel);
+		final ActorSelection actorSel = actorSystem.actorSelection(address);
 
-		Future<Object> identify = asker.ask(new Identify(42), timeout);
+		final AskableActorSelection asker = new AskableActorSelection(actorSel);
+
+		final Future<Object> identify = asker.ask(new Identify(42), timeout);
 
 		return identify.map(new Mapper<Object, C>(){
+			@Override
 			public C apply(Object obj) {
 				ActorRef actorRef = ((ActorIdentity) obj).getRef();
 
-				if (clazz == TaskExecutorGateway.class) {
-					return (C) new TaskExecutorAkkaGateway(actorRef, timeout);
-				} else if (clazz == ResourceManagerGateway.class) {
-					return (C) new ResourceManagerAkkaGateway(actorRef, timeout);
-				} else if (clazz == JobMasterGateway.class) {
-					return (C) new JobMasterAkkaGateway(actorRef, timeout);
-				} else {
-					throw new RuntimeException("Could not find remote endpoint " + clazz);
-				}
+				InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout);
+
+				@SuppressWarnings("unchecked")
+				C proxy = (C) Proxy.newProxyInstance(
+					ClassLoader.getSystemClassLoader(),
+					new Class<?>[] {clazz},
+					akkaInvocationHandler);
+
+				return proxy;
 			}
 		}, actorSystem.dispatcher());
 	}
 
 	@Override
-	public <S extends RpcEndpoint, C extends RpcGateway> C startServer(S rpcEndpoint) {
-		ActorRef ref;
-		C self;
-		if (rpcEndpoint instanceof TaskExecutor) {
-			ref = actorSystem.actorOf(
-				Props.create(TaskExecutorAkkaActor.class, rpcEndpoint)
-			);
-
-			self = (C) new TaskExecutorAkkaGateway(ref, timeout);
-		} else if (rpcEndpoint instanceof ResourceManager) {
-			ref = actorSystem.actorOf(
-				Props.create(ResourceManagerAkkaActor.class, rpcEndpoint)
-			);
-
-			self = (C) new ResourceManagerAkkaGateway(ref, timeout);
-		} else if (rpcEndpoint instanceof JobMaster) {
-			ref = actorSystem.actorOf(
-				Props.create(JobMasterAkkaActor.class, rpcEndpoint)
-			);
-
-			self = (C) new JobMasterAkkaGateway(ref, timeout);
-		} else {
-			throw new RuntimeException("Could not start RPC server for class " + rpcEndpoint.getClass());
-		}
+	public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S rpcEndpoint) {
+		Preconditions.checkNotNull(rpcEndpoint, "rpc endpoint");
 
-		actors.add(ref);
+		LOG.info("Start Akka rpc actor to handle rpcs for {}.", rpcEndpoint.getClass().getName());
+
+		Props akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint);
+
+		ActorRef actorRef = actorSystem.actorOf(akkaRpcActorProps);
+		actors.add(actorRef);
+
+		InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(actorRef, timeout);
+
+		@SuppressWarnings("unchecked")
+		C self = (C) Proxy.newProxyInstance(
+			ClassLoader.getSystemClassLoader(),
+			new Class<?>[]{rpcEndpoint.getSelfGatewayType(), MainThreadExecutor.class, AkkaGateway.class},
+			akkaInvocationHandler);
 
 		return self;
 	}
@@ -120,16 +114,19 @@ public class AkkaRpcService implements RpcService {
 		if (selfGateway instanceof AkkaGateway) {
 			AkkaGateway akkaClient = (AkkaGateway) selfGateway;
 
-			if (actors.contains(akkaClient.getActorRef())) {
-				akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
-			} else {
-				// don't stop this actor since it was not started by this RPC service
+			if (actors.contains(akkaClient.getRpcServer())) {
+				ActorRef selfActorRef = akkaClient.getRpcServer();
+
+				LOG.info("Stop Akka rpc actor {}.", selfActorRef.path());
+
+				selfActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
 			}
 		}
 	}
 
 	@Override
 	public void stopService() {
+		LOG.info("Stop Akka rpc service.");
 		actorSystem.shutdown();
 		actorSystem.awaitTermination();
 	}
@@ -137,9 +134,11 @@ public class AkkaRpcService implements RpcService {
 	@Override
 	public <C extends RpcGateway> String getAddress(C selfGateway) {
 		if (selfGateway instanceof AkkaGateway) {
-			return AkkaUtils.getAkkaURL(actorSystem, ((AkkaGateway) selfGateway).getActorRef());
+			ActorRef actorRef = ((AkkaGateway) selfGateway).getRpcServer();
+			return AkkaUtils.getAkkaURL(actorSystem, actorRef);
 		} else {
-			throw new RuntimeException("Cannot get address for non " + AkkaGateway.class.getName() + ".");
+			String className = AkkaGateway.class.getName();
+			throw new RuntimeException("Cannot get address for non " + className + '.');
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
deleted file mode 100644
index 3cb499c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.Status;
-import akka.actor.UntypedActor;
-import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
-import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BaseAkkaActor extends UntypedActor {
-	private static final Logger LOG = LoggerFactory.getLogger(BaseAkkaActor.class);
-
-	@Override
-	public void onReceive(Object message) throws Exception {
-		if (message instanceof RunnableMessage) {
-			try {
-				((RunnableMessage) message).getRunnable().run();
-			} catch (Exception e) {
-				LOG.error("Encountered error while executing runnable.", e);
-			}
-		} else if (message instanceof CallableMessage<?>) {
-			try {
-				Object result = ((CallableMessage<?>) message).getCallable().call();
-				sender().tell(new Status.Success(result), getSelf());
-			} catch (Exception e) {
-				sender().tell(new Status.Failure(e), getSelf());
-			}
-		} else {
-			throw new RuntimeException("Unknown message " + message);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
deleted file mode 100644
index 512790d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka;
-
-import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import org.apache.flink.runtime.rpc.MainThreadExecutor;
-import org.apache.flink.runtime.rpc.akka.messages.CallableMessage;
-import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage;
-import scala.concurrent.Future;
-
-import java.util.concurrent.Callable;
-
-public abstract class BaseAkkaGateway implements MainThreadExecutor, AkkaGateway {
-	@Override
-	public void runAsync(Runnable runnable) {
-		getActorRef().tell(new RunnableMessage(runnable), ActorRef.noSender());
-	}
-
-	@Override
-	public <V> Future<V> callAsync(Callable<V> callable, Timeout timeout) {
-		return (Future<V>) Patterns.ask(getActorRef(), new CallableMessage(callable), timeout);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
deleted file mode 100644
index 9e04ea9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.jobmaster;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
-import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
-import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
-
-public class JobMasterAkkaActor extends BaseAkkaActor {
-	private final JobMaster jobMaster;
-
-	public JobMasterAkkaActor(JobMaster jobMaster) {
-		this.jobMaster = jobMaster;
-	}
-
-	@Override
-	public void onReceive(Object message) throws Exception {
-		if (message instanceof UpdateTaskExecutionState) {
-
-			final ActorRef sender = getSender();
-
-			UpdateTaskExecutionState updateTaskExecutionState = (UpdateTaskExecutionState) message;
-
-			try {
-				Acknowledge result = jobMaster.updateTaskExecutionState(updateTaskExecutionState.getTaskExecutionState());
-				sender.tell(new Status.Success(result), getSelf());
-			} catch (Exception e) {
-				sender.tell(new Status.Failure(e), getSelf());
-			}
-		} else if (message instanceof RegisterAtResourceManager) {
-			RegisterAtResourceManager registerAtResourceManager = (RegisterAtResourceManager) message;
-
-			jobMaster.registerAtResourceManager(registerAtResourceManager.getAddress());
-		} else {
-			super.onReceive(message);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
deleted file mode 100644
index e6bf061..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.jobmaster;
-
-import akka.actor.ActorRef;
-import akka.pattern.AskableActorRef;
-import akka.util.Timeout;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
-import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager;
-import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import scala.concurrent.Future;
-import scala.reflect.ClassTag$;
-
-public class JobMasterAkkaGateway extends BaseAkkaGateway implements JobMasterGateway {
-	private final AskableActorRef actorRef;
-	private final Timeout timeout;
-
-	public JobMasterAkkaGateway(ActorRef actorRef, Timeout timeout) {
-		this.actorRef = new AskableActorRef(actorRef);
-		this.timeout = timeout;
-	}
-
-	@Override
-	public Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
-		return actorRef.ask(new UpdateTaskExecutionState(taskExecutionState), timeout)
-			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
-	}
-
-	@Override
-	public void registerAtResourceManager(String address) {
-		actorRef.actorRef().tell(new RegisterAtResourceManager(address), actorRef.actorRef());
-	}
-
-	@Override
-	public ActorRef getActorRef() {
-		return actorRef.actorRef();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
new file mode 100644
index 0000000..79b7825
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallAsync.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.concurrent.Callable;
+
+/**
+ * Message for asynchronous callable invocations
+ */
+public final class CallAsync implements Serializable {
+	private static final long serialVersionUID = 2834204738928484060L;
+
+	private transient Callable<?> callable;
+
+	public CallAsync(Callable<?> callable) {
+		this.callable = Preconditions.checkNotNull(callable);
+	}
+
+	public Callable<?> getCallable() {
+		return callable;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
deleted file mode 100644
index f0e555f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import java.util.concurrent.Callable;
-
-public class CallableMessage<V> {
-	private final Callable<V> callable;
-
-	public CallableMessage(Callable<V> callable) {
-		this.callable = callable;
-	}
-
-	public Callable<V> getCallable() {
-		return callable;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
deleted file mode 100644
index 0b9e9dc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-import java.io.Serializable;
-
-public class CancelTask implements Serializable {
-	private static final long serialVersionUID = -2998176874447950595L;
-	private final ExecutionAttemptID executionAttemptID;
-
-	public CancelTask(ExecutionAttemptID executionAttemptID) {
-		this.executionAttemptID = executionAttemptID;
-	}
-
-	public ExecutionAttemptID getExecutionAttemptID() {
-		return executionAttemptID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
deleted file mode 100644
index a83d539..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-
-import java.io.Serializable;
-
-public class ExecuteTask implements Serializable {
-	private static final long serialVersionUID = -6769958430967048348L;
-	private final TaskDeploymentDescriptor taskDeploymentDescriptor;
-
-	public ExecuteTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
-		this.taskDeploymentDescriptor = taskDeploymentDescriptor;
-	}
-
-	public TaskDeploymentDescriptor getTaskDeploymentDescriptor() {
-		return taskDeploymentDescriptor;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
deleted file mode 100644
index 3ade082..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import java.io.Serializable;
-
-public class RegisterAtResourceManager implements Serializable {
-
-	private static final long serialVersionUID = -4175905742620903602L;
-
-	private final String address;
-
-	public RegisterAtResourceManager(String address) {
-		this.address = address;
-	}
-
-	public String getAddress() {
-		return address;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
deleted file mode 100644
index b35ea38..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
-
-import java.io.Serializable;
-
-public class RegisterJobMaster implements Serializable{
-	private static final long serialVersionUID = -4616879574192641507L;
-	private final JobMasterRegistration jobMasterRegistration;
-
-	public RegisterJobMaster(JobMasterRegistration jobMasterRegistration) {
-		this.jobMasterRegistration = jobMasterRegistration;
-	}
-
-	public JobMasterRegistration getJobMasterRegistration() {
-		return jobMasterRegistration;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
deleted file mode 100644
index 85ceeec..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-
-import java.io.Serializable;
-
-public class RequestSlot implements Serializable {
-	private static final long serialVersionUID = 7207463889348525866L;
-
-	private final SlotRequest slotRequest;
-
-	public RequestSlot(SlotRequest slotRequest) {
-		this.slotRequest = slotRequest;
-	}
-
-	public SlotRequest getSlotRequest() {
-		return slotRequest;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
new file mode 100644
index 0000000..5d52ef1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RpcInvocation.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Rpc invocation message containing the remote procedure name, its parameter types and the
+ * corresponding call arguments.
+ */
+public final class RpcInvocation implements Serializable {
+	private static final long serialVersionUID = -7058254033460536037L;
+
+	private final String methodName;
+	private final Class<?>[] parameterTypes;
+	private transient Object[] args;
+
+	public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] args) {
+		this.methodName = Preconditions.checkNotNull(methodName);
+		this.parameterTypes = Preconditions.checkNotNull(parameterTypes);
+		this.args = args;
+	}
+
+	public String getMethodName() {
+		return methodName;
+	}
+
+	public Class<?>[] getParameterTypes() {
+		return parameterTypes;
+	}
+
+	public Object[] getArgs() {
+		return args;
+	}
+
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.defaultWriteObject();
+
+		if (args != null) {
+			// write has args true
+			oos.writeBoolean(true);
+
+			for (int i = 0; i < args.length; i++) {
+				try {
+					oos.writeObject(args[i]);
+				} catch (IOException e) {
+					Class<?> argClass = args[i].getClass();
+
+					throw new IOException("Could not write " + i + "th argument of method " +
+						methodName + ". The argument type is " + argClass + ". " +
+						"Make sure that this type is serializable.", e);
+				}
+			}
+		} else {
+			// write has args false
+			oos.writeBoolean(false);
+		}
+	}
+
+	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+		ois.defaultReadObject();
+
+		boolean hasArgs = ois.readBoolean();
+
+		if (hasArgs) {
+			int numberArguments = parameterTypes.length;
+
+			args = new Object[numberArguments];
+
+			for (int i = 0; i < numberArguments; i++) {
+				args[i] = ois.readObject();
+			}
+		} else {
+			args = null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
new file mode 100644
index 0000000..fb95852
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Message for asynchronous runnable invocations
+ */
+public final class RunAsync implements Serializable {
+	private static final long serialVersionUID = -3080595100695371036L;
+
+	private final transient Runnable runnable;
+
+	public RunAsync(Runnable runnable) {
+		this.runnable = Preconditions.checkNotNull(runnable);
+	}
+
+	public Runnable getRunnable() {
+		return runnable;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
deleted file mode 100644
index 3556738..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-public class RunnableMessage {
-	private final Runnable runnable;
-
-	public RunnableMessage(Runnable runnable) {
-		this.runnable = runnable;
-	}
-
-	public Runnable getRunnable() {
-		return runnable;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
deleted file mode 100644
index f89cd2f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.messages;
-
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-
-import java.io.Serializable;
-
-public class UpdateTaskExecutionState implements Serializable{
-	private static final long serialVersionUID = -6662229114427331436L;
-
-	private final TaskExecutionState taskExecutionState;
-
-	public UpdateTaskExecutionState(TaskExecutionState taskExecutionState) {
-		this.taskExecutionState = taskExecutionState;
-	}
-
-	public TaskExecutionState getTaskExecutionState() {
-		return taskExecutionState;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
deleted file mode 100644
index 13101f9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.resourcemanager;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.pattern.Patterns;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
-import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
-import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
-import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
-import scala.concurrent.Future;
-
-public class ResourceManagerAkkaActor extends BaseAkkaActor {
-	private final ResourceManager resourceManager;
-
-	public ResourceManagerAkkaActor(ResourceManager resourceManager) {
-		this.resourceManager = resourceManager;
-	}
-
-	@Override
-	public void onReceive(Object message) throws Exception {
-		final ActorRef sender = getSender();
-
-		if (message instanceof RegisterJobMaster) {
-			RegisterJobMaster registerJobMaster = (RegisterJobMaster) message;
-
-			try {
-				Future<RegistrationResponse> response = resourceManager.registerJobMaster(registerJobMaster.getJobMasterRegistration());
-				Patterns.pipe(response, getContext().dispatcher()).to(sender());
-			} catch (Exception e) {
-				sender.tell(new Status.Failure(e), getSelf());
-			}
-		} else if (message instanceof RequestSlot) {
-			RequestSlot requestSlot = (RequestSlot) message;
-
-			try {
-				SlotAssignment response = resourceManager.requestSlot(requestSlot.getSlotRequest());
-				sender.tell(new Status.Success(response), getSelf());
-			} catch (Exception e) {
-				sender.tell(new Status.Failure(e), getSelf());
-			}
-		} else {
-			super.onReceive(message);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
deleted file mode 100644
index 1304707..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.resourcemanager;
-
-import akka.actor.ActorRef;
-import akka.pattern.AskableActorRef;
-import akka.util.Timeout;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
-import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
-import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-public class ResourceManagerAkkaGateway extends BaseAkkaGateway implements ResourceManagerGateway {
-	private final AskableActorRef actorRef;
-	private final Timeout timeout;
-
-	public ResourceManagerAkkaGateway(ActorRef actorRef, Timeout timeout) {
-		this.actorRef = new AskableActorRef(actorRef);
-		this.timeout = timeout;
-	}
-
-	@Override
-	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration, FiniteDuration timeout) {
-		return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), new Timeout(timeout))
-			.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
-	}
-
-	@Override
-	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-		return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), timeout)
-			.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
-	}
-
-	@Override
-	public Future<SlotAssignment> requestSlot(SlotRequest slotRequest) {
-		return actorRef.ask(new RequestSlot(slotRequest), timeout)
-			.mapTo(ClassTag$.MODULE$.<SlotAssignment>apply(SlotAssignment.class));
-	}
-
-	@Override
-	public ActorRef getActorRef() {
-		return actorRef.actorRef();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
deleted file mode 100644
index ed522cc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.taskexecutor;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import akka.dispatch.OnComplete;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaActor;
-import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
-import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
-
-public class TaskExecutorAkkaActor extends BaseAkkaActor {
-	private final TaskExecutorGateway taskExecutor;
-
-	public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) {
-		this.taskExecutor = taskExecutor;
-	}
-
-	@Override
-	public void onReceive(Object message) throws Exception {
-		final ActorRef sender = getSender();
-
-		if (message instanceof ExecuteTask) {
-			ExecuteTask executeTask = (ExecuteTask) message;
-
-			taskExecutor.executeTask(executeTask.getTaskDeploymentDescriptor()).onComplete(
-				new OnComplete<Acknowledge>() {
-					@Override
-					public void onComplete(Throwable failure, Acknowledge success) throws Throwable {
-						if (failure != null) {
-							sender.tell(new Status.Failure(failure), getSelf());
-						} else {
-							sender.tell(new Status.Success(Acknowledge.get()), getSelf());
-						}
-					}
-				},
-				getContext().dispatcher()
-			);
-		} else if (message instanceof CancelTask) {
-			CancelTask cancelTask = (CancelTask) message;
-
-			taskExecutor.cancelTask(cancelTask.getExecutionAttemptID()).onComplete(
-				new OnComplete<Acknowledge>() {
-					@Override
-					public void onComplete(Throwable failure, Acknowledge success) throws Throwable {
-						if (failure != null) {
-							sender.tell(new Status.Failure(failure), getSelf());
-						} else {
-							sender.tell(new Status.Success(Acknowledge.get()), getSelf());
-						}
-					}
-				},
-				getContext().dispatcher()
-			);
-		} else {
-			super.onReceive(message);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
deleted file mode 100644
index 7f0a522..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.akka.taskexecutor;
-
-import akka.actor.ActorRef;
-import akka.pattern.AskableActorRef;
-import akka.util.Timeout;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway;
-import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
-import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
-import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
-import scala.concurrent.Future;
-import scala.reflect.ClassTag$;
-
-public class TaskExecutorAkkaGateway extends BaseAkkaGateway implements TaskExecutorGateway {
-	private final AskableActorRef actorRef;
-	private final Timeout timeout;
-
-	public TaskExecutorAkkaGateway(ActorRef actorRef, Timeout timeout) {
-		this.actorRef = new AskableActorRef(actorRef);
-		this.timeout = timeout;
-	}
-
-	@Override
-	public Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
-		return actorRef.ask(new ExecuteTask(taskDeploymentDescriptor), timeout)
-			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
-	}
-
-	@Override
-	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId) {
-		return actorRef.ask(new CancelTask(executionAttemptId), timeout)
-			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
-	}
-
-	@Override
-	public ActorRef getActorRef() {
-		return actorRef.actorRef();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
index b81b19c..e53cd68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.util.Preconditions;
 import scala.Tuple2;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
@@ -76,7 +77,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	public JobMaster(RpcService rpcService, ExecutorService executorService) {
 		super(rpcService);
-		executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
+		executionContext = ExecutionContext$.MODULE$.fromExecutor(
+			Preconditions.checkNotNull(executorService));
 		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index c7e8def..729ef0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.util.Preconditions;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future;
@@ -49,7 +50,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
 
 	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
 		super(rpcService);
-		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
+		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
+			Preconditions.checkNotNull(executorService));
 		this.jobMasterGateways = new HashMap<>();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
index cdfc3bd..3a7dd9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
 import scala.concurrent.ExecutionContext;
 
 import java.util.HashSet;
@@ -47,7 +48,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 
 	public TaskExecutor(RpcService rpcService, ExecutorService executorService) {
 		super(rpcService);
-		this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(executorService);
+		this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(
+			Preconditions.checkNotNull(executorService));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f5cf6b56/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index 0ded25e..e50533e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.reflections.Reflections;
 import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -51,9 +51,8 @@ public class RpcCompletenessTest extends TestLogger {
 
 		for (Class<? extends RpcEndpoint> rpcEndpoint :classes){
 			c = rpcEndpoint;
-			Type superClass = c.getGenericSuperclass();
 
-			Class<?> rpcGatewayType = extractTypeParameter(superClass, 0);
+			Class<?> rpcGatewayType = ReflectionUtil.getTemplateType1(c);
 
 			if (rpcGatewayType != null) {
 				checkCompleteness(rpcEndpoint, (Class<? extends RpcGateway>) rpcGatewayType);
@@ -137,13 +136,16 @@ public class RpcCompletenessTest extends TestLogger {
 		}
 
 		Annotation[][] parameterAnnotations = gatewayMethod.getParameterAnnotations();
+		Class<?>[] parameterTypes = gatewayMethod.getParameterTypes();
 		int rpcTimeoutParameters = 0;
 
-		for (Annotation[] parameterAnnotation : parameterAnnotations) {
-			for (Annotation annotation : parameterAnnotation) {
-				if (annotation.equals(RpcTimeout.class)) {
-					rpcTimeoutParameters++;
-				}
+		for (int i = 0; i < parameterAnnotations.length; i++) {
+			if (isRpcTimeout(parameterAnnotations[i])) {
+				assertTrue(
+					"The rpc timeout has to be of type " + FiniteDuration.class.getName() + ".",
+					parameterTypes[i].equals(FiniteDuration.class));
+
+				rpcTimeoutParameters++;
 			}
 		}
 
@@ -211,10 +213,10 @@ public class RpcCompletenessTest extends TestLogger {
 				if (!futureClass.equals(RpcCompletenessTest.futureClass)) {
 					return false;
 				} else {
-					Class<?> valueClass = extractTypeParameter(futureClass, 0);
+					Class<?> valueClass = ReflectionUtil.getTemplateType1(gatewayMethod.getGenericReturnType());
 
 					if (endpointMethod.getReturnType().equals(futureClass)) {
-						Class<?> rpcEndpointValueClass = extractTypeParameter(endpointMethod.getReturnType(), 0);
+						Class<?> rpcEndpointValueClass = ReflectionUtil.getTemplateType1(endpointMethod.getGenericReturnType());
 
 						// check if we have the same future value types
 						if (valueClass != null && rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) {
@@ -251,7 +253,7 @@ public class RpcCompletenessTest extends TestLogger {
 		if (method.getReturnType().equals(Void.TYPE)) {
 			builder.append("void").append(" ");
 		} else if (method.getReturnType().equals(futureClass)) {
-			Class<?> valueClass = extractTypeParameter(method.getGenericReturnType(), 0);
+			Class<?> valueClass = ReflectionUtil.getTemplateType1(method.getGenericReturnType());
 
 			builder
 				.append(futureClass.getSimpleName())
@@ -291,30 +293,6 @@ public class RpcCompletenessTest extends TestLogger {
 		return builder.toString();
 	}
 
-	private Class<?> extractTypeParameter(Type genericType, int position) {
-		if (genericType instanceof ParameterizedType) {
-			ParameterizedType parameterizedType = (ParameterizedType) genericType;
-
-			Type[] typeArguments = parameterizedType.getActualTypeArguments();
-
-			if (position < 0 || position >= typeArguments.length) {
-				throw new IndexOutOfBoundsException("The generic type " +
-					parameterizedType.getRawType() + " only has " + typeArguments.length +
-					" type arguments.");
-			} else {
-				Type typeArgument = typeArguments[position];
-
-				if (typeArgument instanceof Class<?>) {
-					return (Class<?>) typeArgument;
-				} else {
-					return null;
-				}
-			}
-		} else {
-			return null;
-		}
-	}
-
 	private boolean isRpcTimeout(Annotation[] annotations) {
 		for (Annotation annotation : annotations) {
 			if (annotation.annotationType().equals(RpcTimeout.class)) {


Mime
View raw message