Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AE10E200B6B for ; Thu, 25 Aug 2016 20:48:16 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AC959160AA5; Thu, 25 Aug 2016 18:48:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6007C160ADB for ; Thu, 25 Aug 2016 20:48:10 +0200 (CEST) Received: (qmail 18066 invoked by uid 500); 25 Aug 2016 18:48:07 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 15782 invoked by uid 99); 25 Aug 2016 18:48:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Aug 2016 18:48:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D2654E1075; Thu, 25 Aug 2016 18:48:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Thu, 25 Aug 2016 18:49:15 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [72/89] [abbrv] flink git commit: [FLINK-4346] [rpc] Add new RPC abstraction archived-at: Thu, 25 Aug 2016 18:48:16 -0000 [FLINK-4346] [rpc] Add new RPC abstraction Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b273afad Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b273afad Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b273afad Branch: refs/heads/flip-6 
Commit: b273afad3423b9ca403403c0fa92f1fcf3ec2cf9 Parents: 4810910 Author: Till Rohrmann Authored: Wed Aug 3 19:31:34 2016 +0200 Committer: Stephan Ewen Committed: Thu Aug 25 20:21:02 2016 +0200 ---------------------------------------------------------------------- flink-runtime/pom.xml | 5 + .../flink/runtime/rpc/MainThreadExecutor.java | 54 +++ .../apache/flink/runtime/rpc/RpcEndpoint.java | 182 +++++++++++ .../apache/flink/runtime/rpc/RpcGateway.java | 25 ++ .../org/apache/flink/runtime/rpc/RpcMethod.java | 35 ++ .../apache/flink/runtime/rpc/RpcService.java | 74 +++++ .../apache/flink/runtime/rpc/RpcTimeout.java | 34 ++ .../flink/runtime/rpc/akka/AkkaGateway.java | 29 ++ .../flink/runtime/rpc/akka/AkkaRpcService.java | 145 ++++++++ .../flink/runtime/rpc/akka/BaseAkkaActor.java | 50 +++ .../flink/runtime/rpc/akka/BaseAkkaGateway.java | 41 +++ .../rpc/akka/jobmaster/JobMasterAkkaActor.java | 58 ++++ .../akka/jobmaster/JobMasterAkkaGateway.java | 57 ++++ .../rpc/akka/messages/CallableMessage.java | 33 ++ .../runtime/rpc/akka/messages/CancelTask.java | 36 ++ .../runtime/rpc/akka/messages/ExecuteTask.java | 36 ++ .../messages/RegisterAtResourceManager.java | 36 ++ .../rpc/akka/messages/RegisterJobMaster.java | 36 ++ .../runtime/rpc/akka/messages/RequestSlot.java | 37 +++ .../rpc/akka/messages/RunnableMessage.java | 31 ++ .../akka/messages/UpdateTaskExecutionState.java | 37 +++ .../ResourceManagerAkkaActor.java | 65 ++++ .../ResourceManagerAkkaGateway.java | 67 ++++ .../taskexecutor/TaskExecutorAkkaActor.java | 77 +++++ .../taskexecutor/TaskExecutorAkkaGateway.java | 59 ++++ .../flink/runtime/rpc/jobmaster/JobMaster.java | 249 ++++++++++++++ .../runtime/rpc/jobmaster/JobMasterGateway.java | 45 +++ .../resourcemanager/JobMasterRegistration.java | 35 ++ .../resourcemanager/RegistrationResponse.java | 43 +++ .../rpc/resourcemanager/ResourceManager.java | 94 ++++++ .../resourcemanager/ResourceManagerGateway.java | 58 ++++ .../rpc/resourcemanager/SlotAssignment.java | 25 ++ 
.../rpc/resourcemanager/SlotRequest.java | 25 ++ .../runtime/rpc/taskexecutor/TaskExecutor.java | 82 +++++ .../rpc/taskexecutor/TaskExecutorGateway.java | 48 +++ .../resourcemanager/ResourceManagerITCase.java | 1 - .../flink/runtime/rpc/RpcCompletenessTest.java | 327 +++++++++++++++++++ .../runtime/rpc/akka/AkkaRpcServiceTest.java | 81 +++++ .../rpc/taskexecutor/TaskExecutorTest.java | 92 ++++++ .../runtime/util/DirectExecutorService.java | 234 +++++++++++++ flink-tests/pom.xml | 1 - pom.xml | 7 + 42 files changed, 2784 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 5fea8fb..09c6fd0 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -189,6 +189,11 @@ under the License. akka-testkit_${scala.binary.version} + + org.reflections + reflections + + http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java new file mode 100644 index 0000000..e06711e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.util.Timeout; +import scala.concurrent.Future; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; + +/** + * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying + * rpc server. + * + * This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint} + * implementation which allows to dispatch local procedures to the main thread of the underlying + * rpc server. + */ +public interface MainThreadExecutor { + /** + * Execute the runnable in the main thread of the underlying rpc server. + * + * @param runnable Runnable to be executed + */ + void runAsync(Runnable runnable); + + /** + * Execute the callable in the main thread of the underlying rpc server and return a future for + * the callable result. If the future is not completed within the given timeout, the returned + * future will throw a {@link TimeoutException}. 
+ * + * @param callable Callable to be executed + * @param timeout Timeout for the future to complete + * @param Return value of the callable + * @return Future of the callable result + */ + Future callAsync(Callable callable, Timeout timeout); +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java new file mode 100644 index 0000000..3d8757f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.util.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; + +import java.util.concurrent.Callable; + +/** + * Base class for rpc endpoints. Distributed components which offer remote procedure calls have to + * extend the rpc endpoint base class. 
+ * + * The main idea is that a rpc endpoint is backed by a rpc server which has a single thread + * processing the rpc calls. Thus, by executing all state changing operations within the main + * thread, we don't have to reason about concurrent accesses. The rpc endpoint provides + * {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} and the + * {@link #getMainThreadExecutionContext()} to execute code in the rpc server's main thread. + * + * @param Rpc gateway counterpart for the implementing rpc endpoint + */ +public abstract class RpcEndpoint { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + /** Rpc service to be used to start the rpc server and to obtain rpc gateways */ + private final RpcService rpcService; + + /** Self gateway which can be used to schedule asynchronous calls on yourself */ + private C self; + + /** + * The main thread execution context to be used to execute future callbacks in the main thread + * of the executing rpc server. + * + * IMPORTANT: The main thread context is only available after the rpc server has been started. + */ + private MainThreadExecutionContext mainThreadExecutionContext; + + public RpcEndpoint(RpcService rpcService) { + this.rpcService = rpcService; + } + + /** + * Get self-gateway which should be used to run asynchronous rpc calls on this endpoint. + * + * IMPORTANT: Always issue local method calls via the self-gateway if the current thread + * is not the main thread of the underlying rpc server, e.g. from within a future callback. + * + * @return Self gateway + */ + public C getSelf() { + return self; + } + + /** + * Execute the runnable in the main thread of the underlying rpc server. 
+ * + * @param runnable Runnable to be executed in the main thread of the underlying rpc server + */ + public void runAsync(Runnable runnable) { + ((MainThreadExecutor) self).runAsync(runnable); + } + + /** + * Execute the callable in the main thread of the underlying rpc server returning a future for + * the result of the callable. If the callable is not completed within the given timeout, then + * the future will be failed with a {@link java.util.concurrent.TimeoutException}. + * + * @param callable Callable to be executed in the main thread of the underlying rpc server + * @param timeout Timeout for the callable to be completed + * @param Return type of the callable + * @return Future for the result of the callable. + */ + public Future callAsync(Callable callable, Timeout timeout) { + return ((MainThreadExecutor) self).callAsync(callable, timeout); + } + + /** + * Gets the main thread execution context. The main thread execution context can be used to + * execute tasks in the main thread of the underlying rpc server. + * + * @return Main thread execution context + */ + public ExecutionContext getMainThreadExecutionContext() { + return mainThreadExecutionContext; + } + + /** + * Gets the used rpc service. + * + * @return Rpc service + */ + public RpcService getRpcService() { + return rpcService; + } + + /** + * Starts the underlying rpc server via the rpc service and creates the main thread execution + * context. This makes the rpc endpoint effectively reachable from the outside. + * + * Can be overridden to add rpc endpoint specific start up code. Should always call the parent + * start method. + */ + public void start() { + self = rpcService.startServer(this); + mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self); + } + + + /** + * Shuts down the underlying rpc server via the rpc service. + * + * Can be overridden to add rpc endpoint specific shut down code. Should always call the parent + * shut down method. 
+ */ + public void shutDown() { + rpcService.stopServer(self); + } + + /** + * Gets the address of the underlying rpc server. The address should be fully qualified so that + * a remote system can connect to this rpc server via this address. + * + * @return Fully qualified address of the underlying rpc server + */ + public String getAddress() { + return rpcService.getAddress(self); + } + + /** + * Execution context which executes runnables in the main thread context. A reported failure + * will cause the underlying rpc server to shut down. + */ + private class MainThreadExecutionContext implements ExecutionContext { + private final MainThreadExecutor gateway; + + MainThreadExecutionContext(MainThreadExecutor gateway) { + this.gateway = gateway; + } + + @Override + public void execute(Runnable runnable) { + gateway.runAsync(runnable); + } + + @Override + public void reportFailure(final Throwable t) { + gateway.runAsync(new Runnable() { + @Override + public void run() { + log.error("Encountered failure in the main thread execution context.", t); + shutDown(); + } + }); + } + + @Override + public ExecutionContext prepare() { + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java new file mode 100644 index 0000000..e3a16b4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcGateway.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +/** + * Rpc gateway interface which has to be implemented by Rpc gateways. + */ +public interface RpcGateway { +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java new file mode 100644 index 0000000..875e557 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcMethod.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation for rpc method in a {@link RpcEndpoint} implementation. Every rpc method must have a + * respective counterpart in the {@link RpcGateway} implementation for this rpc server. The + * RpcCompletenessTest makes sure that the set of rpc methods in a rpc server and the set of + * gateway methods in the corresponding gateway implementation are identical. + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface RpcMethod { +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java new file mode 100644 index 0000000..90ff7b6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import scala.concurrent.Future; + +/** + * Interface for rpc services. An rpc service is used to start and connect to a {@link RpcEndpoint}. + * Connecting to a rpc server will return a {@link RpcGateway} which can be used to call remote + * procedures. + */ +public interface RpcService { + + /** + * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can + * be used to communicate with the rpc server. + * + * @param address Address of the remote rpc server + * @param clazz Class of the rpc gateway to return + * @param Type of the rpc gateway to return + * @return Future containing the rpc gateway + */ + Future connect(String address, Class clazz); + + /** + * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint. + * + * @param rpcEndpoint Rpc protocol to dispatch the rpcs to + * @param Type of the rpc endpoint + * @param Type of the self rpc gateway associated with the rpc server + * @return Self gateway to dispatch remote procedure calls to oneself + */ + C startServer(S rpcEndpoint); + + /** + * Stop the underlying rpc server of the provided self gateway. + * + * @param selfGateway Self gateway describing the underlying rpc server + * @param Type of the rpc gateway + */ + void stopServer(C selfGateway); + + /** + * Stop the rpc service shutting down all started rpc servers. + */ + void stopService(); + + /** + * Get the fully qualified address of the underlying rpc server represented by the self gateway. 
+ * It must be possible to connect from a remote host to the rpc server via the returned fully + * qualified address. + * + * @param selfGateway Self gateway associated with the underlying rpc server + * @param Type of the rpc gateway + * @return Fully qualified address + */ + String getAddress(C selfGateway); +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java new file mode 100644 index 0000000..3d36d47 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcTimeout.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation for {@link RpcGateway} methods to specify an additional timeout parameter for the + * returned future to be completed. 
The rest of the provided parameters is passed to the remote rpc + * server for the rpc. + */ +@Target(ElementType.PARAMETER) +@Retention(RetentionPolicy.RUNTIME) +public @interface RpcTimeout { +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java new file mode 100644 index 0000000..a96a600 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaGateway.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.actor.ActorRef; + +/** + * Interface for Akka based rpc gateways + */ +public interface AkkaGateway { + + ActorRef getActorRef(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java new file mode 100644 index 0000000..d55bd13 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.actor.ActorIdentity; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Identify; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.dispatch.Mapper; +import akka.pattern.AskableActorSelection; +import akka.util.Timeout; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.jobmaster.JobMaster; +import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaActor; +import org.apache.flink.runtime.rpc.akka.jobmaster.JobMasterAkkaGateway; +import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaActor; +import org.apache.flink.runtime.rpc.akka.resourcemanager.ResourceManagerAkkaGateway; +import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaActor; +import org.apache.flink.runtime.rpc.akka.taskexecutor.TaskExecutorAkkaGateway; +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutor; +import scala.concurrent.Future; + +import java.util.HashSet; +import java.util.Set; + +public class AkkaRpcService implements RpcService { + private final ActorSystem actorSystem; + private final Timeout timeout; + private final Set actors = new HashSet<>(); + + public AkkaRpcService(ActorSystem actorSystem, Timeout timeout) { + this.actorSystem = actorSystem; + this.timeout = timeout; + } + + @Override + public Future connect(String address, final Class clazz) { + ActorSelection actorSel = actorSystem.actorSelection(address); + + 
AskableActorSelection asker = new AskableActorSelection(actorSel); + + Future identify = asker.ask(new Identify(42), timeout); + + return identify.map(new Mapper(){ + public C apply(Object obj) { + ActorRef actorRef = ((ActorIdentity) obj).getRef(); + + if (clazz == TaskExecutorGateway.class) { + return (C) new TaskExecutorAkkaGateway(actorRef, timeout); + } else if (clazz == ResourceManagerGateway.class) { + return (C) new ResourceManagerAkkaGateway(actorRef, timeout); + } else if (clazz == JobMasterGateway.class) { + return (C) new JobMasterAkkaGateway(actorRef, timeout); + } else { + throw new RuntimeException("Could not find remote endpoint " + clazz); + } + } + }, actorSystem.dispatcher()); + } + + @Override + public C startServer(S rpcEndpoint) { + ActorRef ref; + C self; + if (rpcEndpoint instanceof TaskExecutor) { + ref = actorSystem.actorOf( + Props.create(TaskExecutorAkkaActor.class, rpcEndpoint) + ); + + self = (C) new TaskExecutorAkkaGateway(ref, timeout); + } else if (rpcEndpoint instanceof ResourceManager) { + ref = actorSystem.actorOf( + Props.create(ResourceManagerAkkaActor.class, rpcEndpoint) + ); + + self = (C) new ResourceManagerAkkaGateway(ref, timeout); + } else if (rpcEndpoint instanceof JobMaster) { + ref = actorSystem.actorOf( + Props.create(JobMasterAkkaActor.class, rpcEndpoint) + ); + + self = (C) new JobMasterAkkaGateway(ref, timeout); + } else { + throw new RuntimeException("Could not start RPC server for class " + rpcEndpoint.getClass()); + } + + actors.add(ref); + + return self; + } + + @Override + public void stopServer(C selfGateway) { + if (selfGateway instanceof AkkaGateway) { + AkkaGateway akkaClient = (AkkaGateway) selfGateway; + + if (actors.contains(akkaClient.getActorRef())) { + akkaClient.getActorRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } else { + // don't stop this actor since it was not started by this RPC service + } + } + } + + @Override + public void stopService() { + actorSystem.shutdown(); + 
actorSystem.awaitTermination(); + } + + @Override + public String getAddress(C selfGateway) { + if (selfGateway instanceof AkkaGateway) { + return AkkaUtils.getAkkaURL(actorSystem, ((AkkaGateway) selfGateway).getActorRef()); + } else { + throw new RuntimeException("Cannot get address for non " + AkkaGateway.class.getName() + "."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java new file mode 100644 index 0000000..3cb499c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaActor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.actor.Status; +import akka.actor.UntypedActor; +import org.apache.flink.runtime.rpc.akka.messages.CallableMessage; +import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BaseAkkaActor extends UntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(BaseAkkaActor.class); + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof RunnableMessage) { + try { + ((RunnableMessage) message).getRunnable().run(); + } catch (Exception e) { + LOG.error("Encountered error while executing runnable.", e); + } + } else if (message instanceof CallableMessage) { + try { + Object result = ((CallableMessage) message).getCallable().call(); + sender().tell(new Status.Success(result), getSelf()); + } catch (Exception e) { + sender().tell(new Status.Failure(e), getSelf()); + } + } else { + throw new RuntimeException("Unknown message " + message); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java new file mode 100644 index 0000000..512790d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/BaseAkkaGateway.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka; + +import akka.actor.ActorRef; +import akka.pattern.Patterns; +import akka.util.Timeout; +import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.rpc.akka.messages.CallableMessage; +import org.apache.flink.runtime.rpc.akka.messages.RunnableMessage; +import scala.concurrent.Future; + +import java.util.concurrent.Callable; + +public abstract class BaseAkkaGateway implements MainThreadExecutor, AkkaGateway { + @Override + public void runAsync(Runnable runnable) { + getActorRef().tell(new RunnableMessage(runnable), ActorRef.noSender()); + } + + @Override + public Future callAsync(Callable callable, Timeout timeout) { + return (Future) Patterns.ask(getActorRef(), new CallableMessage(callable), timeout); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java new file mode 100644 index 0000000..9e04ea9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaActor.java @@ -0,0 +1,58 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.jobmaster; + +import akka.actor.ActorRef; +import akka.actor.Status; +import org.apache.flink.runtime.rpc.akka.BaseAkkaActor; +import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager; +import org.apache.flink.runtime.rpc.jobmaster.JobMaster; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState; + +public class JobMasterAkkaActor extends BaseAkkaActor { + private final JobMaster jobMaster; + + public JobMasterAkkaActor(JobMaster jobMaster) { + this.jobMaster = jobMaster; + } + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof UpdateTaskExecutionState) { + + final ActorRef sender = getSender(); + + UpdateTaskExecutionState updateTaskExecutionState = (UpdateTaskExecutionState) message; + + try { + Acknowledge result = jobMaster.updateTaskExecutionState(updateTaskExecutionState.getTaskExecutionState()); + sender.tell(new Status.Success(result), getSelf()); + } catch (Exception e) { + sender.tell(new Status.Failure(e), getSelf()); + } + } else if (message instanceof 
RegisterAtResourceManager) { + RegisterAtResourceManager registerAtResourceManager = (RegisterAtResourceManager) message; + + jobMaster.registerAtResourceManager(registerAtResourceManager.getAddress()); + } else { + super.onReceive(message); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java new file mode 100644 index 0000000..e6bf061 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/jobmaster/JobMasterAkkaGateway.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka.jobmaster; + +import akka.actor.ActorRef; +import akka.pattern.AskableActorRef; +import akka.util.Timeout; +import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway; +import org.apache.flink.runtime.rpc.akka.messages.RegisterAtResourceManager; +import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.akka.messages.UpdateTaskExecutionState; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import scala.concurrent.Future; +import scala.reflect.ClassTag$; + +public class JobMasterAkkaGateway extends BaseAkkaGateway implements JobMasterGateway { + private final AskableActorRef actorRef; + private final Timeout timeout; + + public JobMasterAkkaGateway(ActorRef actorRef, Timeout timeout) { + this.actorRef = new AskableActorRef(actorRef); + this.timeout = timeout; + } + + @Override + public Future updateTaskExecutionState(TaskExecutionState taskExecutionState) { + return actorRef.ask(new UpdateTaskExecutionState(taskExecutionState), timeout) + .mapTo(ClassTag$.MODULE$.apply(Acknowledge.class)); + } + + @Override + public void registerAtResourceManager(String address) { + actorRef.actorRef().tell(new RegisterAtResourceManager(address), actorRef.actorRef()); + } + + @Override + public ActorRef getActorRef() { + return actorRef.actorRef(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java new file mode 100644 index 0000000..f0e555f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CallableMessage.java 
@@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import java.util.concurrent.Callable; + +public class CallableMessage { + private final Callable callable; + + public CallableMessage(Callable callable) { + this.callable = callable; + } + + public Callable getCallable() { + return callable; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java new file mode 100644 index 0000000..0b9e9dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/CancelTask.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +import java.io.Serializable; + +public class CancelTask implements Serializable { + private static final long serialVersionUID = -2998176874447950595L; + private final ExecutionAttemptID executionAttemptID; + + public CancelTask(ExecutionAttemptID executionAttemptID) { + this.executionAttemptID = executionAttemptID; + } + + public ExecutionAttemptID getExecutionAttemptID() { + return executionAttemptID; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java new file mode 100644 index 0000000..a83d539 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/ExecuteTask.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; + +import java.io.Serializable; + +public class ExecuteTask implements Serializable { + private static final long serialVersionUID = -6769958430967048348L; + private final TaskDeploymentDescriptor taskDeploymentDescriptor; + + public ExecuteTask(TaskDeploymentDescriptor taskDeploymentDescriptor) { + this.taskDeploymentDescriptor = taskDeploymentDescriptor; + } + + public TaskDeploymentDescriptor getTaskDeploymentDescriptor() { + return taskDeploymentDescriptor; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java new file mode 100644 index 0000000..3ade082 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterAtResourceManager.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import java.io.Serializable; + +public class RegisterAtResourceManager implements Serializable { + + private static final long serialVersionUID = -4175905742620903602L; + + private final String address; + + public RegisterAtResourceManager(String address) { + this.address = address; + } + + public String getAddress() { + return address; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java new file mode 100644 index 0000000..b35ea38 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RegisterJobMaster.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration; + +import java.io.Serializable; + +public class RegisterJobMaster implements Serializable{ + private static final long serialVersionUID = -4616879574192641507L; + private final JobMasterRegistration jobMasterRegistration; + + public RegisterJobMaster(JobMasterRegistration jobMasterRegistration) { + this.jobMasterRegistration = jobMasterRegistration; + } + + public JobMasterRegistration getJobMasterRegistration() { + return jobMasterRegistration; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java new file mode 100644 index 0000000..85ceeec --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RequestSlot.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest; + +import java.io.Serializable; + +public class RequestSlot implements Serializable { + private static final long serialVersionUID = 7207463889348525866L; + + private final SlotRequest slotRequest; + + public RequestSlot(SlotRequest slotRequest) { + this.slotRequest = slotRequest; + } + + public SlotRequest getSlotRequest() { + return slotRequest; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java new file mode 100644 index 0000000..3556738 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunnableMessage.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +public class RunnableMessage { + private final Runnable runnable; + + public RunnableMessage(Runnable runnable) { + this.runnable = runnable; + } + + public Runnable getRunnable() { + return runnable; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java new file mode 100644 index 0000000..f89cd2f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/UpdateTaskExecutionState.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.messages; + +import org.apache.flink.runtime.taskmanager.TaskExecutionState; + +import java.io.Serializable; + +public class UpdateTaskExecutionState implements Serializable{ + private static final long serialVersionUID = -6662229114427331436L; + + private final TaskExecutionState taskExecutionState; + + public UpdateTaskExecutionState(TaskExecutionState taskExecutionState) { + this.taskExecutionState = taskExecutionState; + } + + public TaskExecutionState getTaskExecutionState() { + return taskExecutionState; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java new file mode 100644 index 0000000..13101f9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaActor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.akka.resourcemanager; + +import akka.actor.ActorRef; +import akka.actor.Status; +import akka.pattern.Patterns; +import org.apache.flink.runtime.rpc.akka.BaseAkkaActor; +import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager; +import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment; +import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster; +import org.apache.flink.runtime.rpc.akka.messages.RequestSlot; +import scala.concurrent.Future; + +public class ResourceManagerAkkaActor extends BaseAkkaActor { + private final ResourceManager resourceManager; + + public ResourceManagerAkkaActor(ResourceManager resourceManager) { + this.resourceManager = resourceManager; + } + + @Override + public void onReceive(Object message) throws Exception { + final ActorRef sender = getSender(); + + if (message instanceof RegisterJobMaster) { + RegisterJobMaster registerJobMaster = (RegisterJobMaster) message; + + try { + Future response = resourceManager.registerJobMaster(registerJobMaster.getJobMasterRegistration()); + Patterns.pipe(response, getContext().dispatcher()).to(sender()); + } catch (Exception e) { + sender.tell(new Status.Failure(e), getSelf()); + } + } else if (message instanceof RequestSlot) { + RequestSlot requestSlot = (RequestSlot) message; + + 
try { + SlotAssignment response = resourceManager.requestSlot(requestSlot.getSlotRequest()); + sender.tell(new Status.Success(response), getSelf()); + } catch (Exception e) { + sender.tell(new Status.Failure(e), getSelf()); + } + } else { + super.onReceive(message); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java new file mode 100644 index 0000000..1304707 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/resourcemanager/ResourceManagerAkkaGateway.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka.resourcemanager; + +import akka.actor.ActorRef; +import akka.pattern.AskableActorRef; +import akka.util.Timeout; +import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway; +import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.resourcemanager.SlotAssignment; +import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest; +import org.apache.flink.runtime.rpc.akka.messages.RegisterJobMaster; +import org.apache.flink.runtime.rpc.akka.messages.RequestSlot; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import scala.reflect.ClassTag$; + +public class ResourceManagerAkkaGateway extends BaseAkkaGateway implements ResourceManagerGateway { + private final AskableActorRef actorRef; + private final Timeout timeout; + + public ResourceManagerAkkaGateway(ActorRef actorRef, Timeout timeout) { + this.actorRef = new AskableActorRef(actorRef); + this.timeout = timeout; + } + + @Override + public Future registerJobMaster(JobMasterRegistration jobMasterRegistration, FiniteDuration timeout) { + return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), new Timeout(timeout)) + .mapTo(ClassTag$.MODULE$.apply(RegistrationResponse.class)); + } + + @Override + public Future registerJobMaster(JobMasterRegistration jobMasterRegistration) { + return actorRef.ask(new RegisterJobMaster(jobMasterRegistration), timeout) + .mapTo(ClassTag$.MODULE$.apply(RegistrationResponse.class)); + } + + @Override + public Future requestSlot(SlotRequest slotRequest) { + return actorRef.ask(new RequestSlot(slotRequest), timeout) + .mapTo(ClassTag$.MODULE$.apply(SlotAssignment.class)); + } + + @Override + public ActorRef getActorRef() { + return actorRef.actorRef(); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java new file mode 100644 index 0000000..ed522cc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaActor.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka.taskexecutor; + +import akka.actor.ActorRef; +import akka.actor.Status; +import akka.dispatch.OnComplete; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.akka.BaseAkkaActor; +import org.apache.flink.runtime.rpc.akka.messages.CancelTask; +import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask; +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway; + +public class TaskExecutorAkkaActor extends BaseAkkaActor { + private final TaskExecutorGateway taskExecutor; + + public TaskExecutorAkkaActor(TaskExecutorGateway taskExecutor) { + this.taskExecutor = taskExecutor; + } + + @Override + public void onReceive(Object message) throws Exception { + final ActorRef sender = getSender(); + + if (message instanceof ExecuteTask) { + ExecuteTask executeTask = (ExecuteTask) message; + + taskExecutor.executeTask(executeTask.getTaskDeploymentDescriptor()).onComplete( + new OnComplete() { + @Override + public void onComplete(Throwable failure, Acknowledge success) throws Throwable { + if (failure != null) { + sender.tell(new Status.Failure(failure), getSelf()); + } else { + sender.tell(new Status.Success(Acknowledge.get()), getSelf()); + } + } + }, + getContext().dispatcher() + ); + } else if (message instanceof CancelTask) { + CancelTask cancelTask = (CancelTask) message; + + taskExecutor.cancelTask(cancelTask.getExecutionAttemptID()).onComplete( + new OnComplete() { + @Override + public void onComplete(Throwable failure, Acknowledge success) throws Throwable { + if (failure != null) { + sender.tell(new Status.Failure(failure), getSelf()); + } else { + sender.tell(new Status.Success(Acknowledge.get()), getSelf()); + } + } + }, + getContext().dispatcher() + ); + } else { + super.onReceive(message); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java new file mode 100644 index 0000000..7f0a522 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/taskexecutor/TaskExecutorAkkaGateway.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.akka.taskexecutor; + +import akka.actor.ActorRef; +import akka.pattern.AskableActorRef; +import akka.util.Timeout; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.akka.BaseAkkaGateway; +import org.apache.flink.runtime.rpc.akka.messages.CancelTask; +import org.apache.flink.runtime.rpc.akka.messages.ExecuteTask; +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorGateway; +import scala.concurrent.Future; +import scala.reflect.ClassTag$; + +public class TaskExecutorAkkaGateway extends BaseAkkaGateway implements TaskExecutorGateway { + private final AskableActorRef actorRef; + private final Timeout timeout; + + public TaskExecutorAkkaGateway(ActorRef actorRef, Timeout timeout) { + this.actorRef = new AskableActorRef(actorRef); + this.timeout = timeout; + } + + @Override + public Future executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) { + return actorRef.ask(new ExecuteTask(taskDeploymentDescriptor), timeout) + .mapTo(ClassTag$.MODULE$.apply(Acknowledge.class)); + } + + @Override + public Future cancelTask(ExecutionAttemptID executionAttemptId) { + return actorRef.ask(new CancelTask(executionAttemptId), timeout) + .mapTo(ClassTag$.MODULE$.apply(Acknowledge.class)); + } + + @Override + public ActorRef getActorRef() { + return actorRef.actorRef(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java new file mode 100644 index 0000000..b81b19c --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.jobmaster; + +import akka.dispatch.Futures; +import akka.dispatch.Mapper; +import akka.dispatch.OnComplete; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import scala.Tuple2; +import scala.concurrent.ExecutionContext; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.Future; +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import 
java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * JobMaster implementation. The job master is responsible for the execution of a single + * {@link org.apache.flink.runtime.jobgraph.JobGraph}. + * + * It offers the following methods as part of its rpc interface to interact with the JobMaster + * remotely: + *
<ul> + * <li>{@link #registerAtResourceManager(String)} triggers the registration at the resource manager</li> + * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for + * given task</li> + * </ul>
+ */ +public class JobMaster extends RpcEndpoint { + /** Execution context for future callbacks */ + private final ExecutionContext executionContext; + + /** Execution context for scheduled runnables */ + private final ScheduledExecutorService scheduledExecutorService; + + private final FiniteDuration initialRegistrationTimeout = new FiniteDuration(500, TimeUnit.MILLISECONDS); + private final FiniteDuration maxRegistrationTimeout = new FiniteDuration(30, TimeUnit.SECONDS); + private final FiniteDuration registrationDuration = new FiniteDuration(365, TimeUnit.DAYS); + private final long failedRegistrationDelay = 10000; + + /** Gateway to connected resource manager, null iff not connected */ + private ResourceManagerGateway resourceManager = null; + + /** UUID to filter out old registration runs */ + private UUID currentRegistrationRun; + + public JobMaster(RpcService rpcService, ExecutorService executorService) { + super(rpcService); + executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService); + scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + } + + public ResourceManagerGateway getResourceManager() { + return resourceManager; + } + + //---------------------------------------------------------------------------------------------- + // RPC methods + //---------------------------------------------------------------------------------------------- + + /** + * Updates the task execution state for a given task. + * + * @param taskExecutionState New task execution state for a given task + * @return Acknowledge the task execution state update + */ + @RpcMethod + public Acknowledge updateTaskExecutionState(TaskExecutionState taskExecutionState) { + System.out.println("TaskExecutionState: " + taskExecutionState); + return Acknowledge.get(); + } + + /** + * Triggers the registration of the job master at the resource manager. 
+ * + * @param address Address of the resource manager + */ + @RpcMethod + public void registerAtResourceManager(final String address) { + currentRegistrationRun = UUID.randomUUID(); + + Future resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class); + + handleResourceManagerRegistration( + new JobMasterRegistration(getAddress()), + 1, + resourceManagerFuture, + currentRegistrationRun, + initialRegistrationTimeout, + maxRegistrationTimeout, + registrationDuration.fromNow()); + } + + //---------------------------------------------------------------------------------------------- + // Helper methods + //---------------------------------------------------------------------------------------------- + + /** + * Helper method to handle the resource manager registration process. If a registration attempt + * times out, then a new attempt with the doubled time out is initiated. The whole registration + * process has a deadline. Once this deadline is overdue without successful registration, the + * job master shuts down. 
+ * + * @param jobMasterRegistration Job master registration info which is sent to the resource + * manager + * @param attemptNumber Registration attempt number + * @param resourceManagerFuture Future of the resource manager gateway + * @param registrationRun UUID describing the current registration run + * @param timeout Timeout of the last registration attempt + * @param maxTimeout Maximum timeout between registration attempts + * @param deadline Deadline for the registration + */ + void handleResourceManagerRegistration( + final JobMasterRegistration jobMasterRegistration, + final int attemptNumber, + final Future resourceManagerFuture, + final UUID registrationRun, + final FiniteDuration timeout, + final FiniteDuration maxTimeout, + final Deadline deadline) { + + // filter out concurrent registration runs + if (registrationRun.equals(currentRegistrationRun)) { + + log.info("Start registration attempt #{}.", attemptNumber); + + if (deadline.isOverdue()) { + // we've exceeded our registration deadline. This means that we have to shutdown the JobMaster + log.error("Exceeded registration deadline without successfully registering at the ResourceManager."); + shutDown(); + } else { + Future> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper>>() { + @Override + public Future> apply(ResourceManagerGateway resourceManagerGateway) { + return resourceManagerGateway.registerJobMaster(jobMasterRegistration, timeout).zip(Futures.successful(resourceManagerGateway)); + } + }, executionContext); + + registrationResponseFuture.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable failure, Tuple2 tuple) throws Throwable { + if (failure != null) { + if (failure instanceof TimeoutException) { + // we haven't received an answer in the given timeout interval, + // so increase it and try again. 
+ final FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout); + + handleResourceManagerRegistration( + jobMasterRegistration, + attemptNumber + 1, + resourceManagerFuture, + registrationRun, + newTimeout, + maxTimeout, + deadline); + } else { + log.error("Received unknown error while registering at the ResourceManager.", failure); + shutDown(); + } + } else { + final RegistrationResponse response = tuple._1(); + final ResourceManagerGateway gateway = tuple._2(); + + if (response.isSuccess()) { + finishResourceManagerRegistration(gateway, response.getInstanceID()); + } else { + log.info("The registration was refused. Try again."); + + scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + // we have to execute scheduled runnable in the main thread + // because we need consistency wrt currentRegistrationRun + runAsync(new Runnable() { + @Override + public void run() { + // our registration attempt was refused. Start over. + handleResourceManagerRegistration( + jobMasterRegistration, + 1, + resourceManagerFuture, + registrationRun, + initialRegistrationTimeout, + maxTimeout, + deadline); + } + }); + } + }, failedRegistrationDelay, TimeUnit.MILLISECONDS); + } + } + } + }, getMainThreadExecutionContext()); // use the main thread execution context to execute the call back in the main thread + } + } else { + log.info("Discard out-dated registration run."); + } + } + + /** + * Finish the resource manager registration by setting the new resource manager gateway. + * + * @param resourceManager New resource manager gateway + * @param instanceID Instance id assigned by the resource manager + */ + void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, InstanceID instanceID) { + log.info("Successfully registered at the ResourceManager under instance id {}.", instanceID); + this.resourceManager = resourceManager; + } + + /** + * Return if the job master is connected to a resource manager. 
+ * + * @return true if the job master is connected to the resource manager + */ + public boolean isConnected() { + return resourceManager != null; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java new file mode 100644 index 0000000..17a4c3a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMasterGateway.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.jobmaster; + +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import scala.concurrent.Future; + +/** + * {@link JobMaster} rpc gateway interface + */ +public interface JobMasterGateway extends RpcGateway { + + /** + * Updates the task execution state for a given task. 
+ * + * @param taskExecutionState New task execution state for a given task + * @return Future acknowledge of the task execution state update + */ + Future updateTaskExecutionState(TaskExecutionState taskExecutionState); + + /** + * Triggers the registration of the job master at the resource manager. + * + * @param address Address of the resource manager + */ + void registerAtResourceManager(final String address); +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java new file mode 100644 index 0000000..7a2deae --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/JobMasterRegistration.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.resourcemanager; + +import java.io.Serializable; + +public class JobMasterRegistration implements Serializable { + private static final long serialVersionUID = 8411214999193765202L; + + private final String address; + + public JobMasterRegistration(String address) { + this.address = address; + } + + public String getAddress() { + return address; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java new file mode 100644 index 0000000..8ac9e49 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/RegistrationResponse.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.resourcemanager; + +import org.apache.flink.runtime.instance.InstanceID; + +import java.io.Serializable; + +public class RegistrationResponse implements Serializable { + private static final long serialVersionUID = -2379003255993119993L; + + private final boolean isSuccess; + private final InstanceID instanceID; + + public RegistrationResponse(boolean isSuccess, InstanceID instanceID) { + this.isSuccess = isSuccess; + this.instanceID = instanceID; + } + + public boolean isSuccess() { + return isSuccess; + } + + public InstanceID getInstanceID() { + return instanceID; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b273afad/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java new file mode 100644 index 0000000..c7e8def --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.resourcemanager; + +import akka.dispatch.Mapper; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.jobmaster.JobMaster; +import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; +import scala.concurrent.ExecutionContext; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.Future; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +/** + * ResourceManager implementation. The resource manager is responsible for resource de-/allocation + * and bookkeeping. + * + * It offers the following methods as part of its rpc interface to interact with it remotely: + *
<ul> + * <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li> + * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li> + * </ul>
+ */ +public class ResourceManager extends RpcEndpoint { + private final ExecutionContext executionContext; + private final Map jobMasterGateways; + + public ResourceManager(RpcService rpcService, ExecutorService executorService) { + super(rpcService); + this.executionContext = ExecutionContext$.MODULE$.fromExecutor(executorService); + this.jobMasterGateways = new HashMap<>(); + } + + /** + * Register a {@link JobMaster} at the resource manager. + * + * @param jobMasterRegistration Job master registration information + * @return Future registration response + */ + @RpcMethod + public Future registerJobMaster(JobMasterRegistration jobMasterRegistration) { + Future jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); + + return jobMasterFuture.map(new Mapper() { + @Override + public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { + InstanceID instanceID; + + if (jobMasterGateways.containsKey(jobMasterGateway)) { + instanceID = jobMasterGateways.get(jobMasterGateway); + } else { + instanceID = new InstanceID(); + jobMasterGateways.put(jobMasterGateway, instanceID); + } + + return new RegistrationResponse(true, instanceID); + } + }, getMainThreadExecutionContext()); + } + + /** + * Requests a slot from the resource manager. + * + * @param slotRequest Slot request + * @return Slot assignment + */ + @RpcMethod + public SlotAssignment requestSlot(SlotRequest slotRequest) { + System.out.println("SlotRequest: " + slotRequest); + return new SlotAssignment(); + } +}