flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: Add new RPC abstraction
Date Thu, 04 Aug 2016 12:17:23 GMT
Add new RPC abstraction

Introduce explicit executor services for future handling

Restructured rpc package into components

Add TaskExecutorTest

Add rpc completeness test

The completeness test checks that each RpcGateway method is also present in the corresponding
RpcServer implementation, where it must be annotated with RpcMethod.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58391149
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58391149
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58391149

Branch: refs/heads/flip-6
Commit: 58391149715ddc1c078cc6c61f52013c0410bf6c
Parents: 245f02b
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Aug 3 19:31:34 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Aug 4 14:17:35 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcGateway.java    |  25 +++
 .../org/apache/flink/runtime/rpc/RpcMethod.java |  29 +++
 .../org/apache/flink/runtime/rpc/RpcServer.java |  40 ++++
 .../apache/flink/runtime/rpc/RpcService.java    |  31 +++
 .../flink/runtime/rpc/akka/AkkaGateway.java     |  29 +++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 135 +++++++++++++
 .../rpc/akka/jobmaster/JobMasterAkkaActor.java  |  63 ++++++
 .../akka/jobmaster/JobMasterAkkaGateway.java    |  65 ++++++
 .../runtime/rpc/akka/messages/CancelTask.java   |  36 ++++
 .../runtime/rpc/akka/messages/ExecuteTask.java  |  36 ++++
 .../messages/HandleRegistrationResponse.java    |  40 ++++
 .../rpc/akka/messages/RegisterJobMaster.java    |  36 ++++
 .../runtime/rpc/akka/messages/RequestSlot.java  |  37 ++++
 .../TriggerResourceManagerRegistration.java     |  31 +++
 .../akka/messages/UpdateTaskExecutionState.java |  37 ++++
 .../ResourceManagerAkkaActor.java               |  63 ++++++
 .../ResourceManagerAkkaGateway.java             |  60 ++++++
 .../taskexecutor/TaskExecutorAkkaActor.java     |  77 ++++++++
 .../taskexecutor/TaskExecutorAkkaGateway.java   |  59 ++++++
 .../flink/runtime/rpc/jobmaster/JobMaster.java  | 110 +++++++++++
 .../runtime/rpc/jobmaster/JobMasterGateway.java |  34 ++++
 .../resourcemanager/JobMasterRegistration.java  |  25 +++
 .../resourcemanager/RegistrationResponse.java   |  25 +++
 .../rpc/resourcemanager/ResourceManager.java    |  63 ++++++
 .../resourcemanager/ResourceManagerGateway.java |  27 +++
 .../rpc/resourcemanager/SlotAssignment.java     |  25 +++
 .../rpc/resourcemanager/SlotRequest.java        |  25 +++
 .../runtime/rpc/taskexecutor/TaskExecutor.java  |  72 +++++++
 .../rpc/taskexecutor/TaskExecutorGateway.java   |  30 +++
 .../resourcemanager/ResourceManagerITCase.java  |   1 -
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 194 ++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  77 ++++++++
 .../rpc/taskexecutor/TaskExecutorTest.java      |  93 +++++++++
 .../runtime/util/DirectExecutorService.java     | 197 +++++++++++++++++++
 flink-tests/pom.xml                             |   1 -
 pom.xml                                         |   7 +
 36 files changed, 1933 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
new file mode 100644
index 0000000..bd8730e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Marker interface for rpc gateways
+ */
+public interface RpcGateway {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
new file mode 100644
index 0000000..3bceaec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcMethod {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
new file mode 100644
index 0000000..fba9250
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Marker interface for rpc servers. Every rpc server should implement this interface.
+ *
+ * @param <C> Rpc client counterpart matching the RpcServer
+ */
+public interface RpcServer<C extends RpcGateway> {
+	/**
+	 * Get self-gateway which should be used to run asynchronous rpc calls on the server.
+	 *
+	 * IMPORTANT: Always issue local method calls via the self-gateway if the current thread
+	 * is not the main thread of the rpc server, e.g. from within a future callback.
+	 *
+	 * @return Self gateway
+	 */
+	C getSelf();
+
+	void start();
+
+	void shutDown();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
new file mode 100644
index 0000000..a12039c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import scala.concurrent.Future;
+
+public interface RpcService {
+	<C extends RpcGateway> Future<C> connect(String address, Class<C> clazz);
+
+	<S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler, Class<C> rpcClientClass);
+
+	<T> void stopServer(T server);
+
+	void stopService();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
new file mode 100644
index 0000000..a96a600
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for Akka based rpc gateways
+ */
+public interface AkkaGateway {
+
+	ActorRef getActorRef();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
new file mode 100644
index 0000000..cb707bb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorIdentity;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Identify;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Mapper;
+import akka.pattern.AskableActorSelection;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcServer;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaActor;
+import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaActor;
+import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaGateway;
+import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaActor;
+import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor;
+import scala.concurrent.Future;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class AkkaRpcService implements RpcService {
+	private final ActorSystem actorSystem;
+	private final Timeout timeout;
+	private final Set<ActorRef> actors = new HashSet<>();
+
+	public AkkaRpcService(ActorSystem actorSystem, Timeout timeout) {
+		this.actorSystem = actorSystem;
+		this.timeout = timeout;
+	}
+
+	@Override
+	public <C extends RpcGateway> Future<C> connect(String address, final Class<C> clazz) {
+		ActorSelection actorSel = actorSystem.actorSelection(address);
+
+		AskableActorSelection asker = new AskableActorSelection(actorSel);
+
+		Future<Object> identify = asker.ask(new Identify(42), timeout);
+
+		return identify.map(new Mapper<Object, C>(){
+			public C apply(Object obj) {
+				ActorRef actorRef = ((ActorIdentity) obj).getRef();
+
+				if (clazz == TaskExecutorGateway.class) {
+					return (C) new TaskExecutorAkkaGateway(actorRef, timeout);
+				} else if (clazz == ResourceManagerGateway.class) {
+					return (C) new ResourceManagerAkkaGateway(actorRef, timeout);
+				} else if (clazz == JobMasterGateway.class) {
+					return (C) new JobMasterAkkaGateway(actorRef, timeout);
+				} else {
+					throw new RuntimeException("Could not find remote endpoint " + clazz);
+				}
+			}
+		}, actorSystem.dispatcher());
+	}
+
+	@Override
+	public <S extends RpcServer, C extends RpcGateway> C startServer(S methodHandler, Class<C> rpcClientClass) {
+		ActorRef ref;
+		C self;
+		if (methodHandler instanceof TaskExecutor) {
+			ref = actorSystem.actorOf(
+				Props.create(TaskExecutorAkkaActor.class, methodHandler)
+			);
+
+			self = (C) new TaskExecutorAkkaGateway(ref, timeout);
+		} else if (methodHandler instanceof ResourceManager) {
+			ref = actorSystem.actorOf(
+				Props.create(ResourceManagerAkkaActor.class, methodHandler)
+			);
+
+			self = (C) new ResourceManagerAkkaGateway(ref, timeout);
+		} else if (methodHandler instanceof JobMaster) {
+			ref = actorSystem.actorOf(
+				Props.create(JobMasterAkkaActor.class, methodHandler)
+			);
+
+			self = (C) new JobMasterAkkaGateway(ref, timeout);
+		} else {
+			throw new RuntimeException("Could not start RPC server for class " + methodHandler.getClass());
+		}
+
+		actors.add(ref);
+
+		return self;
+	}
+
+	@Override
+	public <T> void stopServer(T server) {
+		if (server instanceof AkkaGateway) {
+			AkkaGateway akkaClient = (AkkaGateway) server;
+
+			if (actors.contains(akkaClient.getActorRef())) {
+				akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			} else {
+				// don't stop this actor since it was not started by this RPC service
+			}
+		}
+	}
+
+	@Override
+	public void stopService() {
+		actorSystem.shutdown();
+		actorSystem.awaitTermination();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
new file mode 100644
index 0000000..a91d7d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.jobmaster;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.messages.HandleRegistrationResponse;
+import org.apache.flink.runtime.rpc.akka.messages.TriggerResourceManagerRegistration;
+import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
+
+public class JobMasterAkkaActor extends UntypedActor {
+	private final JobMaster jobMaster;
+
+	public JobMasterAkkaActor(JobMaster jobMaster) {
+		this.jobMaster = jobMaster;
+	}
+
+	@Override
+	public void onReceive(Object message) throws Exception {
+		if (message instanceof UpdateTaskExecutionState) {
+
+			final ActorRef sender = getSender();
+
+			UpdateTaskExecutionState updateTaskExecutionState = (UpdateTaskExecutionState) message;
+
+			try {
+				Acknowledge result = jobMaster.updateTaskExecutionState(updateTaskExecutionState.getTaskExecutionState());
+				sender.tell(new Status.Success(result), getSelf());
+			} catch (Exception e) {
+				sender.tell(new Status.Failure(e), getSelf());
+			}
+		} else if (message instanceof TriggerResourceManagerRegistration) {
+			TriggerResourceManagerRegistration triggerResourceManagerRegistration = (TriggerResourceManagerRegistration) message;
+
+			jobMaster.triggerResourceManagerRegistration(triggerResourceManagerRegistration.getAddress());
+		} else if (message instanceof HandleRegistrationResponse) {
+			HandleRegistrationResponse registrationResponse = (HandleRegistrationResponse) message;
+
+			jobMaster.handleRegistrationResponse(registrationResponse.getRegistrationResponse(), registrationResponse.getResourceManagerGateway());
+		} else {
+			throw new RuntimeException("Unknown message type received: " + message.getClass());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
new file mode 100644
index 0000000..fe4b0c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.jobmaster;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.akka.AkkaGateway;
+import org.apache.flink.runtime.rpc.akka.messages.HandleRegistrationResponse;
+import org.apache.flink.runtime.rpc.akka.messages.TriggerResourceManagerRegistration;
+import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
+
+public class JobMasterAkkaGateway implements JobMasterGateway, AkkaGateway {
+	private final AskableActorRef actorRef;
+	private final Timeout timeout;
+
+	public JobMasterAkkaGateway(ActorRef actorRef, Timeout timeout) {
+		this.actorRef = new AskableActorRef(actorRef);
+		this.timeout = timeout;
+	}
+
+	@Override
+	public Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		return actorRef.ask(new UpdateTaskExecutionState(taskExecutionState), timeout)
+			.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+	}
+
+	@Override
+	public void triggerResourceManagerRegistration(String address) {
+		actorRef.actorRef().tell(new TriggerResourceManagerRegistration(address), ActorRef.noSender());
+	}
+
+	@Override
+	public void handleRegistrationResponse(RegistrationResponse response, ResourceManagerGateway resourceManager) {
+		actorRef.actorRef().tell(new HandleRegistrationResponse(response, resourceManager), ActorRef.noSender());
+	}
+
+	@Override
+	public ActorRef getActorRef() {
+		return actorRef.actorRef();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
new file mode 100644
index 0000000..0b9e9dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.io.Serializable;
+
+public class CancelTask implements Serializable {
+	private static final long serialVersionUID = -2998176874447950595L;
+	private final ExecutionAttemptID executionAttemptID;
+
+	public CancelTask(ExecutionAttemptID executionAttemptID) {
+		this.executionAttemptID = executionAttemptID;
+	}
+
+	public ExecutionAttemptID getExecutionAttemptID() {
+		return executionAttemptID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
new file mode 100644
index 0000000..a83d539
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+
+import java.io.Serializable;
+
+public class ExecuteTask implements Serializable {
+	private static final long serialVersionUID = -6769958430967048348L;
+	private final TaskDeploymentDescriptor taskDeploymentDescriptor;
+
+	public ExecuteTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+		this.taskDeploymentDescriptor = taskDeploymentDescriptor;
+	}
+
+	public TaskDeploymentDescriptor getTaskDeploymentDescriptor() {
+		return taskDeploymentDescriptor;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/HandleRegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/HandleRegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/HandleRegistrationResponse.java
new file mode 100644
index 0000000..ad0d2a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/HandleRegistrationResponse.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+
+public class HandleRegistrationResponse {
+	private final RegistrationResponse registrationResponse;
+	private final ResourceManagerGateway resourceManagerGateway;
+
+	public HandleRegistrationResponse(RegistrationResponse registrationResponse, ResourceManagerGateway resourceManagerGateway) {
+		this.registrationResponse = registrationResponse;
+		this.resourceManagerGateway = resourceManagerGateway;
+	}
+
+	public RegistrationResponse getRegistrationResponse() {
+		return registrationResponse;
+	}
+
+	public ResourceManagerGateway getResourceManagerGateway() {
+		return resourceManagerGateway;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
new file mode 100644
index 0000000..b35ea38
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+
+import java.io.Serializable;
+
+public class RegisterJobMaster implements Serializable{
+	private static final long serialVersionUID = -4616879574192641507L;
+	private final JobMasterRegistration jobMasterRegistration;
+
+	public RegisterJobMaster(JobMasterRegistration jobMasterRegistration) {
+		this.jobMasterRegistration = jobMasterRegistration;
+	}
+
+	public JobMasterRegistration getJobMasterRegistration() {
+		return jobMasterRegistration;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
new file mode 100644
index 0000000..85ceeec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+
+import java.io.Serializable;
+
+/**
+ * Serializable message that carries a {@link SlotRequest}.
+ */
+public class RequestSlot implements Serializable {
+	private static final long serialVersionUID = 7207463889348525866L;
+
+	/** Payload of this message; stored exactly as handed to the constructor. */
+	private final SlotRequest slotRequest;
+
+	/**
+	 * Wraps the given slot request.
+	 *
+	 * @param slotRequest payload to carry; no validation is performed
+	 */
+	public RequestSlot(SlotRequest slotRequest) {
+		this.slotRequest = slotRequest;
+	}
+
+	/**
+	 * @return the slot request passed to the constructor
+	 */
+	public SlotRequest getSlotRequest() {
+		return this.slotRequest;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/TriggerResourceManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/TriggerResourceManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/TriggerResourceManagerRegistration.java
new file mode 100644
index 0000000..48776a1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/TriggerResourceManagerRegistration.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+/**
+ * Message that carries the address given to a resource manager registration trigger.
+ */
+public class TriggerResourceManagerRegistration {
+	/** Address string; stored exactly as handed to the constructor. */
+	private final String address;
+
+	/**
+	 * Creates the message.
+	 *
+	 * @param address address to carry; no validation is performed
+	 */
+	public TriggerResourceManagerRegistration(String address) {
+		this.address = address;
+	}
+
+	/**
+	 * @return the address passed to the constructor
+	 */
+	public String getAddress() {
+		return this.address;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
new file mode 100644
index 0000000..f89cd2f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import java.io.Serializable;
+
+/**
+ * Serializable message that carries a {@link TaskExecutionState}.
+ */
+public class UpdateTaskExecutionState implements Serializable {
+	private static final long serialVersionUID = -6662229114427331436L;
+
+	/** Payload of this message; stored exactly as handed to the constructor. */
+	private final TaskExecutionState taskExecutionState;
+
+	/**
+	 * Wraps the given task execution state.
+	 *
+	 * @param taskExecutionState payload to carry; no validation is performed
+	 */
+	public UpdateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		this.taskExecutionState = taskExecutionState;
+	}
+
+	/**
+	 * @return the task execution state passed to the constructor
+	 */
+	public TaskExecutionState getTaskExecutionState() {
+		return this.taskExecutionState;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
new file mode 100644
index 0000000..7fc2ffa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.resourcemanager;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
+import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
+
+/**
+ * Akka actor that forwards {@link RegisterJobMaster} and {@link RequestSlot} messages to a
+ * {@link ResourceManager} and replies to the sender with the outcome.
+ *
+ * <p>A successful call is answered with {@link Status.Success} wrapping the result; an
+ * exception is answered with {@link Status.Failure} wrapping the exception.
+ */
+public class ResourceManagerAkkaActor extends UntypedActor {
+	/** Receiver of every forwarded call. */
+	private final ResourceManager resourceManager;
+
+	/**
+	 * @param resourceManager resource manager the received messages are forwarded to
+	 */
+	public ResourceManagerAkkaActor(ResourceManager resourceManager) {
+		this.resourceManager = resourceManager;
+	}
+
+	/**
+	 * Dispatches on the message type.
+	 *
+	 * @param message received message
+	 * @throws RuntimeException if the message is neither a {@link RegisterJobMaster} nor a
+	 *                          {@link RequestSlot}
+	 */
+	@Override
+	public void onReceive(Object message) throws Exception {
+		// Capture the sender once, before any handling takes place.
+		final ActorRef replyTo = getSender();
+
+		if (message instanceof RegisterJobMaster) {
+			handleRegisterJobMaster((RegisterJobMaster) message, replyTo);
+			return;
+		}
+
+		if (message instanceof RequestSlot) {
+			handleRequestSlot((RequestSlot) message, replyTo);
+			return;
+		}
+
+		throw new RuntimeException("Encountered unknown message type: " + message.getClass());
+	}
+
+	/** Forwards a job master registration and sends the outcome to {@code replyTo}. */
+	private void handleRegisterJobMaster(RegisterJobMaster request, ActorRef replyTo) {
+		try {
+			RegistrationResponse result = resourceManager.registerJobMaster(request.getJobMasterRegistration());
+			replyTo.tell(new Status.Success(result), getSelf());
+		} catch (Exception e) {
+			replyTo.tell(new Status.Failure(e), getSelf());
+		}
+	}
+
+	/** Forwards a slot request and sends the outcome to {@code replyTo}. */
+	private void handleRequestSlot(RequestSlot request, ActorRef replyTo) {
+		try {
+			SlotAssignment result = resourceManager.requestSlot(request.getSlotRequest());
+			replyTo.tell(new Status.Success(result), getSelf());
+		} catch (Exception e) {
+			replyTo.tell(new Status.Failure(e), getSelf());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
new file mode 100644
index 0000000..c902bff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.resourcemanager;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.akka.AkkaGateway;
+import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster;
+import org.apache.flink.runtime.rpc.akka.messages.RequestSlot;
+import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link ResourceManagerGateway} implementation that turns every gateway call into an Akka
+ * ask on the wrapped actor reference.
+ */
+public class ResourceManagerAkkaGateway implements ResourceManagerGateway, AkkaGateway {
+	/** Ask-capable wrapper around the actor reference passed to the constructor. */
+	private final AskableActorRef actorRef;
+
+	/** Timeout handed to every ask issued by this gateway. */
+	private final Timeout timeout;
+
+	/**
+	 * @param actorRef actor that receives the messages
+	 * @param timeout  timeout used for each ask
+	 */
+	public ResourceManagerAkkaGateway(ActorRef actorRef, Timeout timeout) {
+		this.timeout = timeout;
+		this.actorRef = new AskableActorRef(actorRef);
+	}
+
+	/**
+	 * Sends a {@link RegisterJobMaster} message and maps the reply to a
+	 * {@link RegistrationResponse}.
+	 */
+	@Override
+	public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+		final RegisterJobMaster request = new RegisterJobMaster(jobMasterRegistration);
+		final Future<Object> reply = actorRef.ask(request, timeout);
+
+		return reply.mapTo(ClassTag$.MODULE$.<RegistrationResponse>apply(RegistrationResponse.class));
+	}
+
+	/**
+	 * Sends a {@link RequestSlot} message and maps the reply to a {@link SlotAssignment}.
+	 */
+	@Override
+	public Future<SlotAssignment> requestSlot(SlotRequest slotRequest) {
+		final RequestSlot request = new RequestSlot(slotRequest);
+		final Future<Object> reply = actorRef.ask(request, timeout);
+
+		return reply.mapTo(ClassTag$.MODULE$.<SlotAssignment>apply(SlotAssignment.class));
+	}
+
+	/**
+	 * @return the plain actor reference behind this gateway
+	 */
+	@Override
+	public ActorRef getActorRef() {
+		return actorRef.actorRef();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
new file mode 100644
index 0000000..8974d2b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.taskexecutor;
+
+import akka.actor.ActorRef;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import akka.dispatch.OnComplete;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
+import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+
+/**
+ * Akka actor that forwards {@link ExecuteTask} and {@link CancelTask} messages to a
+ * {@link TaskExecutorGateway} and replies to the sender once the returned future completes.
+ */
+public class TaskExecutorAkkaActor extends UntypedActor {
+	/** Receiver of every forwarded call. */
+	private final TaskExecutorGateway taskExecutor;
+
+	/**
+	 * @param taskExecutor gateway the received messages are forwarded to
+	 */
+	public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) {
+		this.taskExecutor = taskExecutor;
+	}
+
+	/**
+	 * Dispatches on the message type.
+	 *
+	 * @param message received message
+	 * @throws RuntimeException if the message is neither an {@link ExecuteTask} nor a
+	 *                          {@link CancelTask}
+	 */
+	@Override
+	public void onReceive(Object message) throws Exception {
+		// Capture the sender now; the reply is sent later from the future's callback.
+		final ActorRef replyTo = getSender();
+
+		if (message instanceof ExecuteTask) {
+			ExecuteTask request = (ExecuteTask) message;
+
+			taskExecutor.executeTask(request.getTaskDeploymentDescriptor())
+				.onComplete(acknowledgeTo(replyTo), getContext().dispatcher());
+			return;
+		}
+
+		if (message instanceof CancelTask) {
+			CancelTask request = (CancelTask) message;
+
+			taskExecutor.cancelTask(request.getExecutionAttemptID())
+				.onComplete(acknowledgeTo(replyTo), getContext().dispatcher());
+			return;
+		}
+
+		throw new RuntimeException("Encountered unknown message type: " + message.getClass());
+	}
+
+	/**
+	 * Builds the completion callback shared by both message types: a failed future is answered
+	 * with {@link Status.Failure}, a successful one with {@link Status.Success} wrapping
+	 * {@code Acknowledge.get()}.
+	 */
+	private OnComplete<Acknowledge> acknowledgeTo(final ActorRef recipient) {
+		return new OnComplete<Acknowledge>() {
+			@Override
+			public void onComplete(Throwable failure, Acknowledge success) throws Throwable {
+				if (failure == null) {
+					recipient.tell(new Status.Success(Acknowledge.get()), getSelf());
+				} else {
+					recipient.tell(new Status.Failure(failure), getSelf());
+				}
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
new file mode 100644
index 0000000..ca064c1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.taskexecutor;
+
+import akka.actor.ActorRef;
+import akka.pattern.AskableActorRef;
+import akka.util.Timeout;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.akka.AkkaGateway;
+import org.apache.flink.runtime.rpc.akka.messages.CancelTask;
+import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask;
+import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway;
+import scala.concurrent.Future;
+import scala.reflect.ClassTag$;
+
+/**
+ * {@link TaskExecutorGateway} implementation that turns every gateway call into an Akka ask
+ * on the wrapped actor reference.
+ */
+public class TaskExecutorAkkaGateway implements TaskExecutorGateway, AkkaGateway {
+	/** Ask-capable wrapper around the actor reference passed to the constructor. */
+	private final AskableActorRef actorRef;
+
+	/** Timeout handed to every ask issued by this gateway. */
+	private final Timeout timeout;
+
+	/**
+	 * @param actorRef actor that receives the messages
+	 * @param timeout  timeout used for each ask
+	 */
+	public TaskExecutorAkkaGateway(ActorRef actorRef, Timeout timeout) {
+		this.timeout = timeout;
+		this.actorRef = new AskableActorRef(actorRef);
+	}
+
+	/**
+	 * Sends an {@link ExecuteTask} message and maps the reply to an {@link Acknowledge}.
+	 */
+	@Override
+	public Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+		final ExecuteTask request = new ExecuteTask(taskDeploymentDescriptor);
+		final Future<Object> reply = actorRef.ask(request, timeout);
+
+		return reply.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+	}
+
+	/**
+	 * Sends a {@link CancelTask} message and maps the reply to an {@link Acknowledge}.
+	 */
+	@Override
+	public Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId) {
+		final CancelTask request = new CancelTask(executionAttemptId);
+		final Future<Object> reply = actorRef.ask(request, timeout);
+
+		return reply.mapTo(ClassTag$.MODULE$.<Acknowledge>apply(Acknowledge.class));
+	}
+
+	/**
+	 * @return the plain actor reference behind this gateway
+	 */
+	@Override
+	public ActorRef getActorRef() {
+		return actorRef.actorRef();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
new file mode 100644
index 0000000..c2e2686
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.jobmaster;
+
+import akka.dispatch.Mapper;
+import akka.dispatch.OnComplete;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcServer;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * RPC endpoint of the job master, exposed to other components as a {@link JobMasterGateway}.
+ *
+ * <p>Methods annotated with {@link RpcMethod} are the implementations of the gateway methods.
+ */
+public class JobMaster implements RpcServer<JobMasterGateway> {
+	private static final Logger LOG = LoggerFactory.getLogger(JobMaster.class);
+
+	private final RpcService rpcService;
+
+	/** Context on which the future callbacks of this class are executed. */
+	private final ExecutionContext executionContext;
+
+	/** Gateway to this instance; assigned in {@link #start()}. */
+	private JobMasterGateway self;
+
+	/** Stays {@code null} until {@link #handleRegistrationResponse} has been invoked. */
+	private ResourceManagerGateway resourceManager = null;
+
+	/**
+	 * @param rpcService      service used to start/stop this server and to connect to others
+	 * @param executorService executor backing the execution context for future callbacks
+	 */
+	public JobMaster(RpcService rpcService, ExecutorService executorService) {
+		this.rpcService = rpcService;
+		executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
+	}
+
+	/**
+	 * @return the gateway stored by {@link #handleRegistrationResponse}, or {@code null} if
+	 *         that method has not been invoked yet
+	 */
+	public ResourceManagerGateway getResourceManager() {
+		return resourceManager;
+	}
+
+	/**
+	 * Starts the RPC server for this instance and remembers the returned self gateway.
+	 */
+	@Override
+	public void start() {
+		// start rpc server
+		self = rpcService.startServer(this, JobMasterGateway.class);
+	}
+
+	/**
+	 * Stops the RPC server that was started for this instance.
+	 */
+	@Override
+	public void shutDown() {
+		rpcService.stopServer(getSelf());
+	}
+
+	/**
+	 * Logs the given state and acknowledges it.
+	 *
+	 * @param taskExecutionState reported task execution state
+	 * @return {@code Acknowledge.get()}
+	 */
+	@RpcMethod
+	public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+		LOG.info("TaskExecutionState: {}", taskExecutionState);
+		return Acknowledge.get();
+	}
+
+	/**
+	 * Connects to the resource manager at {@code address} and registers there with a fresh
+	 * {@link JobMasterRegistration}. On success the response and the gateway are passed to
+	 * {@link #handleRegistrationResponse} through the self gateway; on failure the error is
+	 * only logged.
+	 *
+	 * @param address address of the resource manager to register at
+	 */
+	@RpcMethod
+	public void triggerResourceManagerRegistration(final String address) {
+		Future<ResourceManagerGateway> resourceManagerFuture = rpcService.connect(address, ResourceManagerGateway.class);
+
+		Future<RegistrationResponse> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<RegistrationResponse>>() {
+			@Override
+			public Future<RegistrationResponse> apply(ResourceManagerGateway resourceManagerGateway) {
+				return resourceManagerGateway.registerJobMaster(new JobMasterRegistration());
+			}
+		}, executionContext);
+
+		// Zip so that the callback receives the gateway together with the response.
+		resourceManagerFuture.zip(registrationResponseFuture).onComplete(new OnComplete<Tuple2<ResourceManagerGateway, RegistrationResponse>>() {
+			@Override
+			public void onComplete(Throwable failure, Tuple2<ResourceManagerGateway, RegistrationResponse> success) throws Throwable {
+				if (failure != null) {
+					// The trailing throwable makes SLF4J log the cause. No retry is scheduled here.
+					LOG.info("Registration at resource manager {} failed. Try again.", address, failure);
+				} else {
+					getSelf().handleRegistrationResponse(success._2(), success._1());
+				}
+			}
+		}, executionContext);
+	}
+
+	/**
+	 * Logs the response and stores the resource manager gateway, which makes
+	 * {@link #isConnected()} return {@code true} for a non-null gateway.
+	 *
+	 * @param response        registration response received from the resource manager
+	 * @param resourceManager gateway to the resource manager that sent the response
+	 */
+	@RpcMethod
+	public void handleRegistrationResponse(RegistrationResponse response, ResourceManagerGateway resourceManager) {
+		LOG.info("Received registration response: {}", response);
+		this.resourceManager = resourceManager;
+	}
+
+	/**
+	 * @return {@code true} once a non-null resource manager gateway has been stored
+	 */
+	public boolean isConnected() {
+		return resourceManager != null;
+	}
+
+	/**
+	 * @return the self gateway assigned in {@link #start()}, or {@code null} before that
+	 */
+	public JobMasterGateway getSelf() {
+		return self;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
new file mode 100644
index 0000000..9e485b1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.jobmaster;
+
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import scala.concurrent.Future;
+
+/**
+ * Gateway through which other components invoke the job master.
+ */
+public interface JobMasterGateway extends RpcGateway {
+
+	/**
+	 * Reports a task execution state to the job master.
+	 *
+	 * @param taskExecutionState state to report
+	 * @return future holding the acknowledgement
+	 */
+	Future<Acknowledge> updateTaskExecutionState(TaskExecutionState taskExecutionState);
+
+	/**
+	 * Asks the job master to register at the resource manager with the given address.
+	 *
+	 * @param address address of the resource manager
+	 */
+	void triggerResourceManagerRegistration(String address);
+
+	/**
+	 * Hands a registration response, together with the gateway of the resource manager it
+	 * came from, to the job master.
+	 *
+	 * @param response        registration response
+	 * @param resourceManager gateway to the responding resource manager
+	 */
+	void handleRegistrationResponse(RegistrationResponse response, ResourceManagerGateway resourceManager);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
new file mode 100644
index 0000000..2b015fd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+/**
+ * Serializable registration payload of a job master; it currently has no fields.
+ */
+public class JobMasterRegistration implements Serializable {
+
+	private static final long serialVersionUID = 8411214999193765202L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
new file mode 100644
index 0000000..7292a87
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+/**
+ * Serializable response to a registration; it currently has no fields.
+ */
+public class RegistrationResponse implements Serializable {
+
+	private static final long serialVersionUID = -2379003255993119993L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
new file mode 100644
index 0000000..00aad0d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcServer;
+import org.apache.flink.runtime.rpc.RpcService;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * RPC endpoint of the resource manager, exposed to other components as a
+ * {@link ResourceManagerGateway}.
+ *
+ * <p>Methods annotated with {@link RpcMethod} are the implementations of the gateway methods.
+ */
+public class ResourceManager implements RpcServer<ResourceManagerGateway> {
+	private final RpcService rpcService;
+
+	/** Built from the executor given to the constructor; not used by any method of this class yet. */
+	private final ExecutionContext executionContext;
+
+	/** Gateway to this instance; assigned in {@link #start()}. */
+	private ResourceManagerGateway self;
+
+	/**
+	 * @param rpcService      service used to start and stop this server
+	 * @param executorService executor backing the execution context
+	 */
+	public ResourceManager(RpcService rpcService, ExecutorService executorService) {
+		this.rpcService = rpcService;
+		this.executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService);
+	}
+
+	/**
+	 * Starts the RPC server for this instance and remembers the returned self gateway.
+	 */
+	@Override
+	public void start() {
+		self = rpcService.startServer(this, ResourceManagerGateway.class);
+	}
+
+	/**
+	 * Stops the RPC server that was started for this instance.
+	 */
+	@Override
+	public void shutDown() {
+		rpcService.stopServer(getSelf());
+	}
+
+	/**
+	 * Prints the registration and answers with a new, empty response.
+	 *
+	 * @param jobMasterRegistration registration sent by a job master
+	 * @return a new {@link RegistrationResponse}
+	 */
+	@RpcMethod
+	public RegistrationResponse registerJobMaster(JobMasterRegistration jobMasterRegistration) {
+		System.out.println("JobMasterRegistration: " + jobMasterRegistration);
+		return new RegistrationResponse();
+	}
+
+	/**
+	 * Prints the request and answers with a new, empty assignment.
+	 *
+	 * @param slotRequest slot request to handle
+	 * @return a new {@link SlotAssignment}
+	 */
+	@RpcMethod
+	public SlotAssignment requestSlot(SlotRequest slotRequest) {
+		System.out.println("SlotRequest: " + slotRequest);
+		return new SlotAssignment();
+	}
+
+	/**
+	 * @return the self gateway assigned in {@link #start()}, or {@code null} before that
+	 */
+	public ResourceManagerGateway getSelf() {
+		return self;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
new file mode 100644
index 0000000..54caa89
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import org.apache.flink.runtime.rpc.RpcGateway;
+import scala.concurrent.Future;
+
+/**
+ * RPC gateway of the resource manager. Every call returns a {@link Future} which carries the
+ * answer of the remote side.
+ */
+public interface ResourceManagerGateway extends RpcGateway {
+
+	/**
+	 * Registers a job master at the resource manager.
+	 *
+	 * @param jobMasterRegistration registration message of the job master
+	 * @return future of the registration response
+	 */
+	Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration);
+
+	/**
+	 * Requests a slot from the resource manager.
+	 *
+	 * @param slotRequest slot request message
+	 * @return future of the slot assignment
+	 */
+	Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
new file mode 100644
index 0000000..86cd8b7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotAssignment.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+/**
+ * Serializable answer to a {@link SlotRequest}. The class does not carry any fields yet.
+ */
+public class SlotAssignment implements Serializable {
+
+	private static final long serialVersionUID = -6990813455942742322L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
new file mode 100644
index 0000000..d8fe268
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import java.io.Serializable;
+
+/**
+ * Serializable message used to request a slot. The class does not carry any fields yet.
+ */
+public class SlotRequest implements Serializable {
+
+	private static final long serialVersionUID = -6586877187990445986L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
new file mode 100644
index 0000000..5c5b8f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import akka.dispatch.ExecutionContexts$;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcServer;
+import org.apache.flink.runtime.rpc.RpcService;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Task executor endpoint of the RPC abstraction.
+ *
+ * <p>The executor remembers the execution attempt id of every task handed to
+ * {@link #executeTask(TaskDeploymentDescriptor)}. {@link #cancelTask(ExecutionAttemptID)} only
+ * succeeds for ids which have been remembered that way.
+ */
+public class TaskExecutor implements RpcServer<TaskExecutorGateway> {
+
+	/** RPC service used to start and stop this server. */
+	private final RpcService rpcService;
+
+	/** Execution context derived from the executor service given to the constructor. */
+	private final ExecutionContext executionContext;
+
+	/** Execution attempt ids of all tasks passed to {@code executeTask}. */
+	private final Set<ExecutionAttemptID> tasks = new HashSet<>();
+
+	/** Gateway obtained from the RPC service; stays {@code null} until {@link #start()} was called. */
+	private TaskExecutorGateway self;
+
+	/**
+	 * Creates a task executor which is not yet started.
+	 *
+	 * @param rpcService RPC service this server is started on
+	 * @param executorService executor which backs the internal execution context
+	 */
+	public TaskExecutor(RpcService rpcService, ExecutorService executorService) {
+		this.rpcService = rpcService;
+		this.executionContext = ExecutionContexts$.MODULE$.fromExecutor(executorService);
+	}
+
+	/**
+	 * Starts this server on the RPC service and remembers the resulting self gateway.
+	 */
+	public void start() {
+		this.self = this.rpcService.startServer(this, TaskExecutorGateway.class);
+	}
+
+	/**
+	 * Stops the server which belongs to the self gateway.
+	 */
+	public void shutDown() {
+		final TaskExecutorGateway gateway = getSelf();
+		this.rpcService.stopServer(gateway);
+	}
+
+	/**
+	 * Records the execution attempt id of the given task.
+	 *
+	 * @param taskDeploymentDescriptor descriptor of the task to execute
+	 * @return acknowledge message
+	 */
+	@RpcMethod
+	public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) {
+		final ExecutionAttemptID attemptId = taskDeploymentDescriptor.getExecutionId();
+		tasks.add(attemptId);
+
+		return Acknowledge.get();
+	}
+
+	/**
+	 * Acknowledges the cancellation of a task which was previously passed to {@code executeTask}.
+	 *
+	 * @param executionAttemptId execution attempt id of the task to cancel
+	 * @return acknowledge message
+	 * @throws Exception if no task with the given execution attempt id is known
+	 */
+	@RpcMethod
+	public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) throws Exception {
+		// unknown tasks are reported to the caller as a failure
+		if (!tasks.contains(executionAttemptId)) {
+			throw new Exception("Could not find task.");
+		}
+
+		return Acknowledge.get();
+	}
+
+	/**
+	 * Returns the gateway which was created by {@link #start()}.
+	 *
+	 * @return self gateway, or {@code null} if the server has not been started
+	 */
+	public TaskExecutorGateway getSelf() {
+		return this.self;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
new file mode 100644
index 0000000..f2785df
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import scala.concurrent.Future;
+
+/**
+ * RPC gateway of the task executor. Every call returns a {@link Future} which carries the answer
+ * of the remote side.
+ */
+public interface TaskExecutorGateway extends RpcGateway {
+
+	/**
+	 * Hands a task to the task executor.
+	 *
+	 * @param taskDeploymentDescriptor descriptor of the task to execute
+	 * @return future of the acknowledge message
+	 */
+	Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor);
+
+	/**
+	 * Cancels the task with the given execution attempt id.
+	 *
+	 * @param executionAttemptId execution attempt id of the task to cancel
+	 * @return future of the acknowledge message
+	 * @throws Exception declared by this gateway method
+	 */
+	Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
index ca09634..ce57fe6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
new file mode 100644
index 0000000..e05bbac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.reflections.Reflections;
+import scala.concurrent.Future;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Checks that every {@link RpcServer} implementation found below {@code org.apache.flink} offers
+ * a {@link RpcMethod}-annotated method for each method declared by its gateway interface.
+ */
+public class RpcCompletenessTest extends TestLogger {
+
+	/**
+	 * Scans the class path for {@link RpcServer} sub types, extracts the gateway type from the
+	 * {@code RpcServer<Gateway>} interface declaration and compares server and gateway methods.
+	 */
+	@Test
+	public void testRpcCompleteness() {
+		Reflections reflections = new Reflections("org.apache.flink");
+
+		Set<Class<? extends RpcServer>> classes = reflections.getSubTypesOf(RpcServer.class);
+
+		for (Class<? extends RpcServer> rpcServer : classes) {
+			boolean foundRpcServerInterface = false;
+
+			for (Type in : rpcServer.getGenericInterfaces()) {
+				if (!(in instanceof ParameterizedType)) {
+					continue;
+				}
+
+				ParameterizedType parameterizedType = (ParameterizedType) in;
+
+				if (parameterizedType.getRawType() != RpcServer.class) {
+					continue;
+				}
+
+				foundRpcServerInterface = true;
+				Type[] typeArguments = parameterizedType.getActualTypeArguments();
+
+				assertEquals(1, typeArguments.length);
+
+				Type rpcGatewayType = typeArguments[0];
+
+				assertTrue(rpcGatewayType instanceof Class<?>);
+
+				checkCompleteness(rpcServer, (Class<?>) rpcGatewayType);
+			}
+
+			assertTrue("The class " + rpcServer + " does not implement the " + RpcServer.class + " interface.", foundRpcServerInterface);
+		}
+	}
+
+	/**
+	 * Checks that the rpc server declares exactly as many {@link RpcMethod} methods as the gateway
+	 * declares methods and that each gateway method has a compatible counterpart.
+	 *
+	 * @param rpcServer Rpc server class
+	 * @param rpcGateway Gateway interface the rpc server is parameterized with
+	 */
+	private void checkCompleteness(Class<?> rpcServer, Class<?> rpcGateway) {
+		Method[] gatewayMethods = rpcGateway.getDeclaredMethods();
+		Method[] serverMethods = rpcServer.getDeclaredMethods();
+
+		// rpc methods of the server grouped by name in order to support overloading
+		Map<String, List<Method>> rpcMethods = new HashMap<>();
+		int numberServerRpcMethods = 0;
+
+		for (Method serverMethod : serverMethods) {
+			if (serverMethod.isAnnotationPresent(RpcMethod.class)) {
+				List<Method> methods = rpcMethods.get(serverMethod.getName());
+
+				if (methods == null) {
+					methods = new ArrayList<>();
+					rpcMethods.put(serverMethod.getName(), methods);
+				}
+
+				methods.add(serverMethod);
+				numberServerRpcMethods++;
+			}
+		}
+
+		assertEquals(
+			"Server class " + rpcServer + " does not have the same number of rpc methods than " +
+				"the gateway class " + rpcGateway ,
+			gatewayMethods.length,
+			numberServerRpcMethods);
+
+		for (Method gatewayMethod : gatewayMethods) {
+			assertTrue(
+				"Server class " + rpcServer + " does not contain an rpc method with the name " +
+					gatewayMethod.getName() + " which is declared by the gateway class " + rpcGateway,
+				rpcMethods.containsKey(gatewayMethod.getName()));
+
+			checkGatewayMethod(gatewayMethod, rpcMethods.get(gatewayMethod.getName()));
+		}
+	}
+
+	/**
+	 * Checks whether we find a matching overloaded version for the gateway method among the methods
+	 * with the same name in the rpc server.
+	 *
+	 * @param gatewayMethod Gateway method
+	 * @param rpcMethods List of rpc methods on the rpc server with the same name as the gateway
+	 *                   method
+	 */
+	private void checkGatewayMethod(Method gatewayMethod, List<Method> rpcMethods) {
+		for (Method rpcMethod : rpcMethods) {
+			if (checkMethod(gatewayMethod, rpcMethod)) {
+				return;
+			}
+		}
+
+		fail("Could not find rpc method which is compatible to " + gatewayMethod);
+	}
+
+	/**
+	 * Checks whether the rpc method is a valid server side counterpart of the gateway method.
+	 *
+	 * <p>Both methods must have the same name and identical parameter types. A void rpc method
+	 * requires a void gateway method. Otherwise the gateway method has to return a {@link Future}
+	 * whose type argument matches the return type of the rpc method.
+	 *
+	 * @param gatewayMethod Gateway method
+	 * @param rpcMethod Rpc method of the server
+	 * @return True if the two methods are compatible; otherwise false
+	 */
+	private boolean checkMethod(Method gatewayMethod, Method rpcMethod) {
+		if (!gatewayMethod.getName().equals(rpcMethod.getName())) {
+			return false;
+		}
+
+		Class<?>[] gatewayParameterTypes = gatewayMethod.getParameterTypes();
+		Class<?>[] rpcParameterTypes = rpcMethod.getParameterTypes();
+
+		if (gatewayParameterTypes.length != rpcParameterTypes.length) {
+			return false;
+		}
+
+		// check the parameter types
+		for (int i = 0; i < gatewayParameterTypes.length; i++) {
+			if (!checkType(gatewayParameterTypes[i], rpcParameterTypes[i])) {
+				return false;
+			}
+		}
+
+		// check the return types
+		Class<?> rpcReturnType = rpcMethod.getReturnType();
+
+		if (rpcReturnType == void.class) {
+			return gatewayMethod.getReturnType() == void.class;
+		}
+
+		// has return value. The gateway method should be wrapped in a future
+		if (gatewayMethod.getReturnType() != Future.class) {
+			return false;
+		}
+
+		Type futureType = gatewayMethod.getGenericReturnType();
+
+		if (!(futureType instanceof ParameterizedType)) {
+			// raw future, there is no type argument which could be compared
+			return true;
+		}
+
+		Type[] typeArguments = ((ParameterizedType) futureType).getActualTypeArguments();
+
+		// check that we only have one type argument
+		if (typeArguments.length != 1) {
+			return false;
+		}
+
+		Type typeArgument = typeArguments[0];
+
+		if (typeArgument instanceof Class<?>) {
+			return checkType((Class<?>) typeArgument, rpcReturnType);
+		} else if (typeArgument instanceof ParameterizedType) {
+			// e.g. Future<List<String>>: compare the raw type with the erased rpc return type
+			Type rawType = ((ParameterizedType) typeArgument).getRawType();
+
+			return rawType instanceof Class<?> && checkType((Class<?>) rawType, rpcReturnType);
+		} else {
+			// type variables, wildcards and generic arrays cannot be compared against a class
+			return true;
+		}
+	}
+
+	/**
+	 * Two types are considered compatible only if they are the identical class.
+	 */
+	private boolean checkType(Class<?> firstType, Class<?> secondType) {
+		return firstType == secondType;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
new file mode 100644
index 0000000..2805cb1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test which connects a {@link JobMaster} to a {@link ResourceManager} via two
+ * {@link AkkaRpcService} instances, each backed by its own actor system.
+ */
+public class AkkaRpcServiceTest extends TestLogger {
+
+	/**
+	 * Starts a resource manager and a job master, triggers the registration of the job master at
+	 * the resource manager and waits up to 20 seconds for the job master to report being connected.
+	 *
+	 * <p>The actor systems and the executor service are shut down in any case so that a failing
+	 * run does not leak threads into subsequent tests.
+	 */
+	@Test
+	public void testAkkaRpcService() throws Exception {
+		Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS);
+		ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+		ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
+		ExecutorService executorService = new ForkJoinPool();
+
+		try {
+			AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaTimeout);
+			AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaTimeout);
+
+			ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService);
+			JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService);
+
+			resourceManager.start();
+
+			ResourceManagerGateway rm = resourceManager.getSelf();
+
+			assertTrue(rm instanceof AkkaGateway);
+
+			AkkaGateway akkaClient = (AkkaGateway) rm;
+
+			jobMaster.start();
+			jobMaster.triggerResourceManagerRegistration(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getActorRef()));
+
+			// wait for successful registration
+			FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
+			Deadline deadline = timeout.fromNow();
+
+			while (deadline.hasTimeLeft() && !jobMaster.isConnected()) {
+				Thread.sleep(100);
+			}
+
+			assertFalse(deadline.isOverdue());
+
+			jobMaster.shutDown();
+			resourceManager.shutDown();
+		} finally {
+			// release all threads, also if an assertion failed before the regular shut down
+			executorService.shutdown();
+			actorSystem.shutdown();
+			actorSystem2.shutdown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58391149/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
new file mode 100644
index 0000000..8e5c154
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the {@link TaskExecutor} which call its rpc methods directly on the instance.
+ */
+public class TaskExecutorTest extends TestLogger {
+
+	/**
+	 * Tests that we can deploy and cancel a task on the TaskExecutor without exceptions
+	 */
+	@Test
+	public void testTaskExecution() throws Exception {
+		TaskExecutor taskExecutor = createTaskExecutor();
+
+		TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+			new JobID(),
+			"Test job",
+			new JobVertexID(),
+			new ExecutionAttemptID(),
+			new SerializedValue<ExecutionConfig>(null),
+			"Test task",
+			0,
+			1,
+			0,
+			new Configuration(),
+			new Configuration(),
+			"Invokable",
+			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+			Collections.<InputGateDeploymentDescriptor>emptyList(),
+			Collections.<BlobKey>emptyList(),
+			Collections.<URL>emptyList(),
+			0
+		);
+
+		// deploy first so that the subsequent cancel call refers to a known task
+		Acknowledge deployAck = taskExecutor.executeTask(tdd);
+
+		ExecutionAttemptID deployedTask = tdd.getExecutionId();
+		Acknowledge cancelAck = taskExecutor.cancelTask(deployedTask);
+	}
+
+	/**
+	 * Tests that cancelling a non-existing task will return an exception
+	 * @throws Exception
+	 */
+	@Test(expected=Exception.class)
+	public void testWrongTaskCancellation() throws Exception {
+		TaskExecutor taskExecutor = createTaskExecutor();
+
+		ExecutionAttemptID unknownTask = new ExecutionAttemptID();
+		taskExecutor.cancelTask(unknownTask);
+
+		fail("The cancellation should have thrown an exception.");
+	}
+
+	/**
+	 * Creates a TaskExecutor with a mocked rpc service and a null executor service.
+	 */
+	private static TaskExecutor createTaskExecutor() {
+		RpcService testingRpcService = mock(RpcService.class);
+		DirectExecutorService directExecutorService = null;
+
+		return new TaskExecutor(testingRpcService, directExecutorService);
+	}
+}


Mime
View raw message