Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B9400200BA7 for ; Thu, 6 Oct 2016 13:48:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B8076160AED; Thu, 6 Oct 2016 11:48:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6C60F160B03 for ; Thu, 6 Oct 2016 13:48:24 +0200 (CEST) Received: (qmail 92427 invoked by uid 500); 6 Oct 2016 11:48:22 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 91813 invoked by uid 99); 6 Oct 2016 11:48:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Oct 2016 11:48:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8F695E0FC4; Thu, 6 Oct 2016 11:48:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Thu, 06 Oct 2016 11:48:49 -0000 Message-Id: <52866162acea40919d34d6bf6a06f06c@git.apache.org> In-Reply-To: <7e0fffbe2a644670983dae3a42039097@git.apache.org> References: <7e0fffbe2a644670983dae3a42039097@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [29/50] [abbrv] flink git commit: [FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction archived-at: Thu, 06 Oct 2016 11:48:28 -0000 [FLINK-4656] [rpc] Port the existing code to Flink's own future abstraction This closes #2530. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fe0905c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fe0905c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fe0905c Branch: refs/heads/flip-6 Commit: 8fe0905cf52809da11f65e6595a482491a18d352 Parents: c046378 Author: Till Rohrmann Authored: Wed Sep 21 17:26:21 2016 +0200 Committer: Till Rohrmann Committed: Thu Oct 6 13:38:42 2016 +0200 ---------------------------------------------------------------------- .../runtime/concurrent/impl/FlinkFuture.java | 4 ++ .../flink/runtime/jobmaster/JobMaster.java | 2 +- .../runtime/jobmaster/JobMasterGateway.java | 2 +- .../registration/RetryingRegistration.java | 65 ++++++++--------- .../resourcemanager/ResourceManager.java | 13 ++-- .../resourcemanager/ResourceManagerGateway.java | 9 ++- .../slotmanager/SlotManager.java | 9 ++- .../flink/runtime/rpc/MainThreadExecutable.java | 64 +++++++++++++++++ .../flink/runtime/rpc/MainThreadExecutor.java | 64 ----------------- .../apache/flink/runtime/rpc/RpcEndpoint.java | 60 ++++++---------- .../apache/flink/runtime/rpc/RpcService.java | 17 +++-- .../runtime/rpc/akka/AkkaInvocationHandler.java | 42 +++++------ .../flink/runtime/rpc/akka/AkkaRpcActor.java | 21 +++++- .../flink/runtime/rpc/akka/AkkaRpcService.java | 28 ++++---- .../runtime/taskexecutor/TaskExecutor.java | 12 ++-- .../taskexecutor/TaskExecutorGateway.java | 6 +- ...TaskExecutorToResourceManagerConnection.java | 34 +++++---- .../registration/RetryingRegistrationTest.java | 75 ++++++++++---------- .../registration/TestRegistrationGateway.java | 6 +- .../resourcemanager/ResourceManagerHATest.java | 4 +- .../slotmanager/SlotProtocolTest.java | 14 ++-- .../flink/runtime/rpc/AsyncCallsTest.java | 13 ++-- .../flink/runtime/rpc/RpcCompletenessTest.java | 9 +-- .../flink/runtime/rpc/TestingGatewayBase.java | 18 ++--- .../flink/runtime/rpc/TestingRpcService.java | 20 +++--- 
.../runtime/rpc/TestingSerialRpcService.java | 54 +++++++------- .../runtime/rpc/akka/AkkaRpcActorTest.java | 19 ++--- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 4 +- .../rpc/akka/MainThreadValidationTest.java | 7 +- .../rpc/akka/MessageSerializationTest.java | 19 +++-- .../runtime/taskexecutor/TaskExecutorTest.java | 9 ++- 31 files changed, 355 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java index 277f4fa..004738b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java @@ -60,6 +60,10 @@ public class FlinkFuture implements Future { this.scalaFuture = Preconditions.checkNotNull(scalaFuture); } + public scala.concurrent.Future getScalaFuture() { + return scalaFuture; + } + //----------------------------------------------------------------------------------- // Future's methods //----------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 0a6a7ef..1537396 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -36,7 +36,7 @@ import java.util.UUID; /** * JobMaster implementation. The job master is responsible for the execution of a single - * {@link org.apache.flink.runtime.jobgraph.JobGraph}. + * {@link JobGraph}. *

* It offers the following methods as part of its rpc interface to interact with the JobMaster * remotely: http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index a53e383..86bf17c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.jobmaster; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import scala.concurrent.Future; /** * {@link JobMaster} rpc gateway interface http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index ea49e42..32dd978 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -18,19 +18,17 @@ package org.apache.flink.runtime.registration; -import akka.dispatch.OnFailure; -import akka.dispatch.OnSuccess; - import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.AcceptFunction; +import 
org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; import org.slf4j.Logger; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.impl.Promise.DefaultPromise; - import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -86,7 +84,7 @@ public abstract class RetryingRegistration> completionPromise; + private final CompletableFuture> completionFuture; private final long initialRegistrationTimeout; @@ -140,7 +138,7 @@ public abstract class RetryingRegistration(); + this.completionFuture = new FlinkCompletableFuture<>(); } // ------------------------------------------------------------------------ @@ -148,7 +146,7 @@ public abstract class RetryingRegistration> getFuture() { - return completionPromise.future(); + return completionFuture; } /** @@ -184,28 +182,30 @@ public abstract class RetryingRegistration resourceManagerFuture = rpcService.connect(targetAddress, targetType); // upon success, start the registration attempts - resourceManagerFuture.onSuccess(new OnSuccess() { + resourceManagerFuture.thenAcceptAsync(new AcceptFunction() { @Override - public void onSuccess(Gateway result) { + public void accept(Gateway result) { log.info("Resolved {} address, beginning registration", targetName); register(result, 1, initialRegistrationTimeout); } - }, rpcService.getExecutionContext()); - + }, rpcService.getExecutor()); + // upon failure, retry, unless this is cancelled - resourceManagerFuture.onFailure(new OnFailure() { + resourceManagerFuture.exceptionallyAsync(new ApplyFunction() { @Override - public void onFailure(Throwable failure) { + public Void apply(Throwable failure) { if (!isCanceled()) { 
log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure); startRegistration(); } + + return null; } - }, rpcService.getExecutionContext()); + }, rpcService.getExecutor()); } catch (Throwable t) { cancel(); - completionPromise.tryFailure(t); + completionFuture.completeExceptionally(t); } } @@ -225,15 +225,14 @@ public abstract class RetryingRegistration registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis); // if the registration was successful, let the TaskExecutor know - registrationFuture.onSuccess(new OnSuccess() { - + registrationFuture.thenAcceptAsync(new AcceptFunction() { @Override - public void onSuccess(RegistrationResponse result) throws Throwable { + public void accept(RegistrationResponse result) { if (!isCanceled()) { if (result instanceof RegistrationResponse.Success) { // registration successful! Success success = (Success) result; - completionPromise.success(new Tuple2<>(gateway, success)); + completionFuture.complete(Tuple2.of(gateway, success)); } else { // registration refused or unknown @@ -241,7 +240,7 @@ public abstract class RetryingRegistration() { @Override - public void onFailure(Throwable failure) { + public Void apply(Throwable failure) { if (!isCanceled()) { if (failure instanceof TimeoutException) { // we simply have not received a response in time. 
maybe the timeout was @@ -262,26 +261,28 @@ public abstract class RetryingRegistration impleme getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); final JobID jobID = jobMasterRegistration.getJobID(); - return jobMasterFuture.map(new Mapper() { + return jobMasterFuture.thenApplyAsync(new ApplyFunction() { @Override - public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { - + public RegistrationResponse apply(JobMasterGateway jobMasterGateway) { final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway); if (existingGateway != null) { LOG.info("Replacing existing gateway {} for JobID {} with {}.", @@ -137,7 +134,7 @@ public class ResourceManager extends RpcEndpoint impleme } return new RegistrationResponse(true); } - }, getMainThreadExecutionContext()); + }, getMainThreadExecutor()); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index c8e3488..5c8786c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -18,14 +18,13 @@ package org.apache.flink.runtime.resourcemanager; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.jobmaster.JobMaster; -import scala.concurrent.Future; -import 
scala.concurrent.duration.FiniteDuration; - import java.util.UUID; /** @@ -42,7 +41,7 @@ public interface ResourceManagerGateway extends RpcGateway { */ Future registerJobMaster( JobMasterRegistration jobMasterRegistration, - @RpcTimeout FiniteDuration timeout); + @RpcTimeout Time timeout); /** * Register a {@link JobMaster} at the resource manager. @@ -73,5 +72,5 @@ public interface ResourceManagerGateway extends RpcGateway { UUID resourceManagerLeaderId, String taskExecutorAddress, ResourceID resourceID, - @RpcTimeout FiniteDuration timeout); + @RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 96fde7d..97176b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -19,11 +19,13 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.ResourceSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.resourcemanager.SlotRequest; import 
org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; @@ -33,14 +35,11 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -79,7 +78,7 @@ public abstract class SlotManager implements LeaderRetrievalListener { /** All allocations, we can lookup allocations either by SlotID or AllocationID */ private final AllocationMap allocationMap; - private final FiniteDuration timeout; + private final Time timeout; /** The current leader id set by the ResourceManager */ private UUID leaderID; @@ -90,7 +89,7 @@ public abstract class SlotManager implements LeaderRetrievalListener { this.freeSlots = new HashMap<>(16); this.allocationMap = new AllocationMap(); this.taskManagerGateways = new HashMap<>(); - this.timeout = new FiniteDuration(10, TimeUnit.SECONDS); + this.timeout = Time.seconds(10); } http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java new file mode 100644 index 0000000..ec1c984 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutable.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Future; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; + +/** + * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying + * RPC endpoint. + * + *

<p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint} + * implementation which allows to dispatch local procedures to the main thread of the underlying + * RPC endpoint. + */ +public interface MainThreadExecutable { + + /** + * Execute the runnable in the main thread of the underlying RPC endpoint. + * + * @param runnable Runnable to be executed + */ + void runAsync(Runnable runnable); + + /** + * Execute the callable in the main thread of the underlying RPC endpoint and return a future for + * the callable result. If the future is not completed within the given timeout, the returned + * future will throw a {@link TimeoutException}. + * + * @param callable Callable to be executed + * @param callTimeout Timeout for the future to complete + * @param <V> Return value of the callable + * @return Future of the callable result + */ + <V> Future<V> callAsync(Callable<V> callable, Time callTimeout); + + /** + * Execute the runnable in the main thread of the underlying RPC endpoint, with + * a delay of the given number of milliseconds. + * + * @param runnable Runnable to be executed + * @param delay The delay, in milliseconds, after which the runnable will be executed + */ + void scheduleRunAsync(Runnable runnable, long delay); +} http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java deleted file mode 100644 index 5e4fead..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/MainThreadExecutor.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc; - -import akka.util.Timeout; -import scala.concurrent.Future; - -import java.util.concurrent.Callable; -import java.util.concurrent.TimeoutException; - -/** - * Interface to execute {@link Runnable} and {@link Callable} in the main thread of the underlying - * RPC endpoint. - * - *

<p>This interface is intended to be implemented by the self gateway in a {@link RpcEndpoint} - * implementation which allows to dispatch local procedures to the main thread of the underlying - * RPC endpoint. - */ -public interface MainThreadExecutor { - - /** - * Execute the runnable in the main thread of the underlying RPC endpoint. - * - * @param runnable Runnable to be executed - */ - void runAsync(Runnable runnable); - - /** - * Execute the callable in the main thread of the underlying RPC endpoint and return a future for - * the callable result. If the future is not completed within the given timeout, the returned - * future will throw a {@link TimeoutException}. - * - * @param callable Callable to be executed - * @param callTimeout Timeout for the future to complete - * @param <V> Return value of the callable - * @return Future of the callable result - */ - <V> Future<V> callAsync(Callable<V> callable, Timeout callTimeout); - - /** - * Execute the runnable in the main thread of the underlying RPC endpoint, with - * a delay of the given number of milliseconds.
- * - * @param runnable Runnable to be executed - * @param delay The delay, in milliseconds, after which the runnable will be executed - */ - void scheduleRunAsync(Runnable runnable, long delay); -} http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index e9e2b2c..4e5e49a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -18,16 +18,15 @@ package org.apache.flink.runtime.rpc; -import akka.util.Timeout; - +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.ReflectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; - import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -49,8 +48,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * thread, we don't have to reason about concurrent accesses, in the same way in the Actor Model * of Erlang or Akka. * - *

<p>The RPC endpoint provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Timeout)} - * and the {@link #getMainThreadExecutionContext()} to execute code in the RPC endpoint's main thread. + *

<p>The RPC endpoint provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)} + * and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread. * * @param <C> The RPC gateway counterpart for the implementing RPC endpoint */ @@ -69,9 +68,9 @@ public abstract class RpcEndpoint { /** Self gateway which can be used to schedule asynchronous calls on yourself */ private final C self; - /** The main thread execution context to be used to execute future callbacks in the main thread + /** The main thread executor to be used to execute future callbacks in the main thread * of the executing rpc server. */ - private final ExecutionContext mainThreadExecutionContext; + private final Executor mainThreadExecutor; /** A reference to the endpoint's main thread, if the current method is called by the main thread */ final AtomicReference<Thread> currentMainThread = new AtomicReference<>(null); @@ -89,7 +88,7 @@ public abstract class RpcEndpoint { this.selfGatewayType = ReflectionUtil.getTemplateType1(getClass()); this.self = rpcService.startServer(this); - this.mainThreadExecutionContext = new MainThreadExecutionContext((MainThreadExecutor) self); + this.mainThreadExecutor = new MainThreadExecutor((MainThreadExecutable) self); } /** @@ -120,7 +119,7 @@ public abstract class RpcEndpoint { * Shuts down the underlying RPC endpoint via the RPC service. * After this method was called, the RPC endpoint will no longer be reachable, neither remotely, * nor via its {@link #getSelf() self gateway}. It will also not accept executions in main thread - * any more (via {@link #callAsync(Callable, Timeout)} and {@link #runAsync(Runnable)}). + * any more (via {@link #callAsync(Callable, Time)} and {@link #runAsync(Runnable)}). * *

This method can be overridden to add RPC endpoint specific shut down code. * The overridden method should always call the parent shut down method. @@ -161,8 +160,8 @@ public abstract class RpcEndpoint { * * @return Main thread execution context */ - protected ExecutionContext getMainThreadExecutionContext() { - return mainThreadExecutionContext; + protected Executor getMainThreadExecutor() { + return mainThreadExecutor; } /** @@ -185,7 +184,7 @@ public abstract class RpcEndpoint { * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint */ protected void runAsync(Runnable runnable) { - ((MainThreadExecutor) self).runAsync(runnable); + ((MainThreadExecutable) self).runAsync(runnable); } /** @@ -196,7 +195,7 @@ public abstract class RpcEndpoint { * @param delay The delay after which the runnable will be executed */ protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { - ((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay)); + ((MainThreadExecutable) self).scheduleRunAsync(runnable, unit.toMillis(delay)); } /** @@ -209,8 +208,8 @@ public abstract class RpcEndpoint { * @param Return type of the callable * @return Future for the result of the callable. */ - protected Future callAsync(Callable callable, Timeout timeout) { - return ((MainThreadExecutor) self).callAsync(callable, timeout); + protected Future callAsync(Callable callable, Time timeout) { + return ((MainThreadExecutable) self).callAsync(callable, timeout); } // ------------------------------------------------------------------------ @@ -241,36 +240,19 @@ public abstract class RpcEndpoint { // ------------------------------------------------------------------------ /** - * Execution context which executes runnables in the main thread context. A reported failure - * will cause the underlying rpc server to shut down. + * Executor which executes runnables in the main thread context. 
*/ - private class MainThreadExecutionContext implements ExecutionContext { + private class MainThreadExecutor implements Executor { - private final MainThreadExecutor gateway; + private final MainThreadExecutable gateway; - MainThreadExecutionContext(MainThreadExecutor gateway) { - this.gateway = gateway; + MainThreadExecutor(MainThreadExecutable gateway) { + this.gateway = Preconditions.checkNotNull(gateway); } @Override public void execute(Runnable runnable) { gateway.runAsync(runnable); } - - @Override - public void reportFailure(final Throwable t) { - gateway.runAsync(new Runnable() { - @Override - public void run() { - log.error("Encountered failure in the main thread execution context.", t); - shutDown(); - } - }); - } - - @Override - public ExecutionContext prepare() { - return this; - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 78c1cec..a367ff2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.rpc; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** @@ -68,23 +68,22 @@ public interface RpcService { void stopService(); /** - * Gets the execution context, provided by this RPC service. This execution - * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)} - * methods of Futures. 
+ * Gets the executor, provided by this RPC service. This executor can be used for example for + * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures. * - *

IMPORTANT: This execution context does not isolate the method invocations against + *

IMPORTANT: This executor does not isolate the method invocations against * any concurrent invocations and is therefore not suitable to run completion methods of futures * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the - * {@link RpcEndpoint#getMainThreadExecutionContext() MainThreadExecutionContext} of that + * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that * {@code RpcEndpoint}. * * @return The execution context provided by the RPC service */ - ExecutionContext getExecutionContext(); + Executor getExecutor(); /** * Execute the runnable in the execution context of this RPC Service, as returned by - * {@link #getExecutionContext()}, after a scheduled delay. + * {@link #getExecutor()}, after a scheduled delay. * * @param runnable Runnable to be executed * @param delay The delay after which the runnable will be executed http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index bfa04f6..8f4deff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -20,9 +20,11 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorRef; import akka.pattern.Patterns; -import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import 
org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.StartStoppable; @@ -34,9 +36,6 @@ import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; import org.apache.flink.util.Preconditions; import org.apache.log4j.Logger; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.io.IOException; import java.lang.annotation.Annotation; @@ -53,7 +52,7 @@ import static org.apache.flink.util.Preconditions.checkArgument; * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link AkkaRpcActor} where it is * executed. */ -class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutor, StartStoppable { +class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThreadExecutable, StartStoppable { private static final Logger LOG = Logger.getLogger(AkkaInvocationHandler.class); private final String address; @@ -64,11 +63,11 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea private final boolean isLocal; // default timeout for asks - private final Timeout timeout; + private final Time timeout; private final long maximumFramesize; - AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Timeout timeout, long maximumFramesize) { + AkkaInvocationHandler(String address, ActorRef rpcEndpoint, Time timeout, long maximumFramesize) { this.address = Preconditions.checkNotNull(address); this.rpcEndpoint = Preconditions.checkNotNull(rpcEndpoint); this.isLocal = this.rpcEndpoint.path().address().hasLocalScope(); @@ -82,7 +81,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea Object result; - if (declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutor.class) || + if 
(declaringClass.equals(AkkaGateway.class) || declaringClass.equals(MainThreadExecutable.class) || declaringClass.equals(Object.class) || declaringClass.equals(StartStoppable.class) || declaringClass.equals(RpcGateway.class)) { result = method.invoke(this, args); @@ -90,7 +89,7 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); Annotation[][] parameterAnnotations = method.getParameterAnnotations(); - Timeout futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); + Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout); Tuple2[], Object[]> filteredArguments = filterArguments( parameterTypes, @@ -130,13 +129,14 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea result = null; } else if (returnType.equals(Future.class)) { // execute an asynchronous call - result = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout); + result = new FlinkFuture<>(Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds())); } else { // execute a synchronous call - Future futureResult = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout); - FiniteDuration duration = timeout.duration(); + scala.concurrent.Future scalaFuture = Patterns.ask(rpcEndpoint, rpcInvocation, futureTimeout.toMilliseconds()); - result = Await.result(futureResult, duration); + Future futureResult = new FlinkFuture<>(scalaFuture); + + return futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit()); } } @@ -167,12 +167,12 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea } @Override - public Future callAsync(Callable callable, Timeout callTimeout) { + public Future callAsync(Callable callable, Time callTimeout) { if(isLocal) { @SuppressWarnings("unchecked") - Future result = (Future) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout); + 
scala.concurrent.Future result = (scala.concurrent.Future) Patterns.ask(rpcEndpoint, new CallAsync(callable), callTimeout.toMilliseconds()); - return result; + return new FlinkFuture<>(result); } else { throw new RuntimeException("Trying to send a Callable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); @@ -204,17 +204,17 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea * has been found * @return Timeout extracted from the array of arguments or the default timeout */ - private static Timeout extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Timeout defaultTimeout) { + private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Object[] args, Time defaultTimeout) { if (args != null) { Preconditions.checkArgument(parameterAnnotations.length == args.length); for (int i = 0; i < parameterAnnotations.length; i++) { if (isRpcTimeout(parameterAnnotations[i])) { - if (args[i] instanceof FiniteDuration) { - return new Timeout((FiniteDuration) args[i]); + if (args[i] instanceof Time) { + return (Time) args[i]; } else { throw new RuntimeException("The rpc timeout parameter must be of type " + - FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() + + Time.class.getName() + ". 
The type " + args[i].getClass().getName() + " is not supported."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 2373be9..59daa46 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -21,8 +21,11 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorRef; import akka.actor.Status; import akka.actor.UntypedActorWithStash; +import akka.dispatch.Futures; import akka.japi.Procedure; import akka.pattern.Patterns; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.rpc.MainThreadValidatorUtil; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; @@ -35,7 +38,6 @@ import org.apache.flink.runtime.rpc.akka.messages.RunAsync; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import java.io.IOException; @@ -146,8 +148,23 @@ class AkkaRpcActor> extends Untyp Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); if (result instanceof Future) { + final Future future = (Future) result; + // pipe result to sender - Patterns.pipe((Future) result, getContext().dispatcher()).to(getSender()); + if (future instanceof FlinkFuture) { + // FlinkFutures are currently backed by Scala's futures + FlinkFuture flinkFuture = (FlinkFuture) future; + + Patterns.pipe(flinkFuture.getScalaFuture(), getContext().dispatcher()).to(getSender()); + } else { + // We have to unpack 
the Flink future and pack it into a Scala future + Patterns.pipe(Futures.future(new Callable() { + @Override + public Object call() throws Exception { + return future.get(); + } + }, getContext().dispatcher()), getContext().dispatcher()); + } } else { // tell the sender the result of the computation getSender().tell(new Status.Success(result), getSelf()); http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 060a1ef..36f1115 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -26,11 +26,13 @@ import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; import akka.dispatch.Mapper; -import akka.pattern.AskableActorSelection; -import akka.util.Timeout; +import akka.pattern.Patterns; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; +import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -39,8 +41,6 @@ import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import javax.annotation.concurrent.ThreadSafe; @@ -48,6 +48,7 @@ 
import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkArgument; @@ -68,13 +69,13 @@ public class AkkaRpcService implements RpcService { private final Object lock = new Object(); private final ActorSystem actorSystem; - private final Timeout timeout; + private final Time timeout; private final Set actors = new HashSet<>(4); private final long maximumFramesize; private volatile boolean stopped; - public AkkaRpcService(final ActorSystem actorSystem, final Timeout timeout) { + public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { this.actorSystem = checkNotNull(actorSystem, "actor system"); this.timeout = checkNotNull(timeout, "timeout"); @@ -95,10 +96,9 @@ public class AkkaRpcService implements RpcService { address, clazz.getName()); final ActorSelection actorSel = actorSystem.actorSelection(address); - final AskableActorSelection asker = new AskableActorSelection(actorSel); - final Future identify = asker.ask(new Identify(42), timeout); - return identify.map(new Mapper(){ + final scala.concurrent.Future identify = Patterns.ask(actorSel, new Identify(42), timeout.toMilliseconds()); + final scala.concurrent.Future resultFuture = identify.map(new Mapper(){ @Override public C checkedApply(Object obj) throws Exception { @@ -128,6 +128,8 @@ public class AkkaRpcService implements RpcService { } } }, actorSystem.dispatcher()); + + return new FlinkFuture<>(resultFuture); } @Override @@ -159,7 +161,7 @@ public class AkkaRpcService implements RpcService { classLoader, new Class[]{ rpcEndpoint.getSelfGatewayType(), - MainThreadExecutor.class, + MainThreadExecutable.class, StartStoppable.class, AkkaGateway.class}, akkaInvocationHandler); @@ -209,7 +211,7 @@ public class AkkaRpcService implements RpcService { } @Override - public ExecutionContext 
getExecutionContext() { + public Executor getExecutor() { return actorSystem.dispatcher(); } @@ -219,6 +221,6 @@ public class AkkaRpcService implements RpcService { checkNotNull(unit, "unit"); checkArgument(delay >= 0, "delay must be zero or larger"); - actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, getExecutionContext()); + actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index fadae5f..d84a6a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.taskexecutor; import akka.actor.ActorSystem; -import akka.util.Timeout; import com.typesafe.config.Config; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.LocalConnectionManager; @@ -35,6 +35,7 @@ import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.SlotRequestReply; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.Preconditions; +import org.jboss.netty.channel.ChannelException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +79,7 @@ import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.io.IOException; +import java.net.BindException; 
import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.UUID; @@ -198,7 +200,7 @@ public class TaskExecutor extends RpcEndpoint { this, newLeaderAddress, newLeaderId, - getMainThreadExecutionContext()); + getMainThreadExecutor()); resourceManagerConnection.start(); } } @@ -302,9 +304,9 @@ public class TaskExecutor extends RpcEndpoint { LOG.debug("Using akka configuration\n " + akkaConfig); taskManagerSystem = AkkaUtils.createActorSystem(akkaConfig); } catch (Throwable t) { - if (t instanceof org.jboss.netty.channel.ChannelException) { + if (t instanceof ChannelException) { Throwable cause = t.getCause(); - if (cause != null && t.getCause() instanceof java.net.BindException) { + if (cause != null && t.getCause() instanceof BindException) { String address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort); throw new IOException("Unable to bind TaskManager actor system to address " + address + " - " + cause.getMessage(), t); @@ -314,7 +316,7 @@ public class TaskExecutor extends RpcEndpoint { } // start akka rpc service based on actor system - final Timeout timeout = new Timeout(AkkaUtils.getTimeout(configuration).toMillis(), TimeUnit.MILLISECONDS); + final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis()); final AkkaRpcService akkaRpcService = new AkkaRpcService(taskManagerSystem, timeout); // start high availability service to implement getResourceManagerLeaderRetriever method only http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 65323a8..0962802 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -18,12 +18,12 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.resourcemanager.SlotRequestReply; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.util.UUID; @@ -48,5 +48,5 @@ public interface TaskExecutorGateway extends RpcGateway { Future requestSlot( AllocationID allocationID, UUID resourceManagerLeaderID, - @RpcTimeout FiniteDuration timeout); + @RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 28062b6..647359d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -18,11 +18,12 @@ package org.apache.flink.runtime.taskexecutor; -import akka.dispatch.OnFailure; -import akka.dispatch.OnSuccess; - +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import 
org.apache.flink.runtime.concurrent.AcceptFunction; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.registration.RegistrationResponse; @@ -31,12 +32,8 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.slf4j.Logger; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.UUID; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -57,7 +54,7 @@ public class TaskExecutorToResourceManagerConnection { private final String resourceManagerAddress; /** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */ - private final ExecutionContext executionContext; + private final Executor executor; private TaskExecutorToResourceManagerConnection.ResourceManagerRegistration pendingRegistration; @@ -74,13 +71,13 @@ public class TaskExecutorToResourceManagerConnection { TaskExecutor taskExecutor, String resourceManagerAddress, UUID resourceManagerLeaderId, - ExecutionContext executionContext) { + Executor executor) { this.log = checkNotNull(log); this.taskExecutor = checkNotNull(taskExecutor); this.resourceManagerAddress = checkNotNull(resourceManagerAddress); this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); - this.executionContext = checkNotNull(executionContext); + this.executor = checkNotNull(executor); } // ------------------------------------------------------------------------ @@ -100,21 +97,22 @@ public class TaskExecutorToResourceManagerConnection { Future> future = pendingRegistration.getFuture(); - future.onSuccess(new OnSuccess>() { + 
future.thenAcceptAsync(new AcceptFunction>() { @Override - public void onSuccess(Tuple2 result) { + public void accept(Tuple2 result) { registrationId = result.f1.getRegistrationId(); registeredResourceManager = result.f0; } - }, executionContext); + }, executor); // this future should only ever fail if there is a bug, not if the registration is declined - future.onFailure(new OnFailure() { + future.exceptionallyAsync(new ApplyFunction() { @Override - public void onFailure(Throwable failure) { + public Void apply(Throwable failure) { taskExecutor.onFatalErrorAsync(failure); + return null; } - }, executionContext); + }, executor); } public void close() { @@ -197,7 +195,7 @@ public class TaskExecutorToResourceManagerConnection { protected Future invokeRegistration( ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception { - FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS); + Time timeout = Time.milliseconds(timeoutMillis); return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index 80fa19c..e56a9ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.registration; -import akka.dispatch.Futures; - import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.Future; +import 
org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.util.TestLogger; @@ -29,18 +29,13 @@ import org.junit.Test; import org.slf4j.LoggerFactory; -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext$; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; - import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -71,8 +66,8 @@ public class RetryingRegistrationTest extends TestLogger { // multiple accesses return the same future assertEquals(future, registration.getFuture()); - Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); + Tuple2 success = + future.get(10L, TimeUnit.SECONDS); // validate correct invocation and result assertEquals(testId, success.f1.getCorrelationId()); @@ -83,7 +78,7 @@ public class RetryingRegistrationTest extends TestLogger { rpc.stopService(); } } - + @Test public void testPropagateFailures() throws Exception { final String testExceptionMessage = "testExceptionMessage"; @@ -96,9 +91,15 @@ public class RetryingRegistrationTest extends TestLogger { registration.startRegistration(); Future future = registration.getFuture(); - assertTrue(future.failed().isCompleted()); + assertTrue(future.isDone()); - assertEquals(testExceptionMessage, future.failed().value().get().get().getMessage()); + try { + future.get(); + + fail("We expected an ExecutionException."); + } catch (ExecutionException e) { + assertEquals(testExceptionMessage, e.getCause().getMessage()); + } } @Test @@ -113,16 +114,16 @@ public 
class RetryingRegistrationTest extends TestLogger { // RPC service that fails upon the first connection, but succeeds on the second RpcService rpc = mock(RpcService.class); when(rpc.connect(anyString(), any(Class.class))).thenReturn( - Futures.failed(new Exception("test connect failure")), // first connection attempt fails - Futures.successful(testGateway) // second connection attempt succeeds + FlinkCompletableFuture.completedExceptionally(new Exception("test connect failure")), // first connection attempt fails + FlinkCompletableFuture.completed(testGateway) // second connection attempt succeeds ); - when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor)); + when(rpc.getExecutor()).thenReturn(executor); TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId); registration.startRegistration(); Tuple2 success = - Await.result(registration.getFuture(), new FiniteDuration(10, SECONDS)); + registration.getFuture().get(10L, TimeUnit.SECONDS); // validate correct invocation and result assertEquals(testId, success.f1.getCorrelationId()); @@ -151,23 +152,23 @@ public class RetryingRegistrationTest extends TestLogger { try { rpc.registerGateway(testEndpointAddress, testGateway); - + TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); - + long started = System.nanoTime(); registration.startRegistration(); - + Future> future = registration.getFuture(); Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); - + future.get(10L, TimeUnit.SECONDS); + long finished = System.nanoTime(); long elapsedMillis = (finished - started) / 1000000; - + // validate correct invocation and result assertEquals(testId, success.f1.getCorrelationId()); assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); - + // validate that some retry-delay / back-off behavior happened assertTrue("retries did not properly back off", 
elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT); } @@ -199,10 +200,10 @@ public class RetryingRegistrationTest extends TestLogger { long started = System.nanoTime(); registration.startRegistration(); - + Future> future = registration.getFuture(); Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); + future.get(10L, TimeUnit.SECONDS); long finished = System.nanoTime(); long elapsedMillis = (finished - started) / 1000000; @@ -212,7 +213,7 @@ public class RetryingRegistrationTest extends TestLogger { assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); // validate that some retry-delay / back-off behavior happened - assertTrue("retries did not properly back off", elapsedMillis >= + assertTrue("retries did not properly back off", elapsedMillis >= 2 * TestRetryingRegistration.INITIAL_TIMEOUT + TestRetryingRegistration.DELAY_ON_DECLINE); } finally { @@ -220,7 +221,7 @@ public class RetryingRegistrationTest extends TestLogger { rpc.stopService(); } } - + @Test @SuppressWarnings("unchecked") public void testRetryOnError() throws Exception { @@ -235,9 +236,9 @@ public class RetryingRegistrationTest extends TestLogger { TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn( - Futures.failed(new Exception("test exception")), - Futures.successful(new TestRegistrationSuccess(testId))); - + FlinkCompletableFuture.completedExceptionally(new Exception("test exception")), + FlinkCompletableFuture.completed(new TestRegistrationSuccess(testId))); + rpc.registerGateway(testEndpointAddress, testGateway); TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); @@ -247,11 +248,11 @@ public class RetryingRegistrationTest extends TestLogger { Future> future = registration.getFuture(); Tuple2 success = - Await.result(future, new FiniteDuration(10, SECONDS)); + future.get(10, TimeUnit.SECONDS); 
long finished = System.nanoTime(); long elapsedMillis = (finished - started) / 1000000; - + assertEquals(testId, success.f1.getCorrelationId()); // validate that some retry-delay / back-off behavior happened @@ -271,10 +272,10 @@ public class RetryingRegistrationTest extends TestLogger { TestingRpcService rpc = new TestingRpcService(); try { - Promise result = Futures.promise(); + FlinkCompletableFuture result = new FlinkCompletableFuture<>(); TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result.future()); + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result); rpc.registerGateway(testEndpointAddress, testGateway); @@ -283,7 +284,7 @@ public class RetryingRegistrationTest extends TestLogger { // cancel and fail the current registration attempt registration.cancel(); - result.failure(new TimeoutException()); + result.completeExceptionally(new TimeoutException()); // there should not be a second registration attempt verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong()); http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java index 431fbe8..2843aeb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java @@ -20,11 +20,11 @@ package org.apache.flink.runtime.registration; import akka.dispatch.Futures; +import org.apache.flink.runtime.concurrent.Future; +import 
org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.TestingGatewayBase; import org.apache.flink.util.Preconditions; -import scala.concurrent.Future; - import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -56,7 +56,7 @@ public class TestRegistrationGateway extends TestingGatewayBase { } // return a completed future (for a proper value), or one that never completes and will time out (for null) - return response != null ? Futures.successful(response) : this.futureWithTimeout(timeout); + return response != null ? FlinkCompletableFuture.completed(response) : this.futureWithTimeout(timeout); } public BlockingQueue getInvocations() { http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 8183c0a..64a1191 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -import org.apache.flink.runtime.rpc.MainThreadExecutor; +import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; @@ -68,7 +68,7 @@ 
public class ResourceManagerHATest { Assert.assertNull(resourceManager.getLeaderSessionID()); } - private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutor, StartStoppable, RpcGateway { + private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutable, StartStoppable, RpcGateway { @Override public void runAsync(Runnable runnable) { runnable.run(); http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 85d2880..1f9e7e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -18,10 +18,12 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.NonHaServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; @@ -40,10 +42,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; -import scala.concurrent.Await; -import 
scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; import java.util.Collections; import java.util.UUID; @@ -99,7 +97,7 @@ public class SlotProtocolTest extends TestLogger { Future registrationFuture = resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); try { - Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS)); + registrationFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { Assert.fail("JobManager registration Future didn't become ready."); } @@ -141,7 +139,7 @@ public class SlotProtocolTest extends TestLogger { slotManager.updateSlotStatus(slotReport); // 4) Slot becomes available and TaskExecutor gets a SlotRequest - verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class)); + verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class)); } /** @@ -171,7 +169,7 @@ public class SlotProtocolTest extends TestLogger { Future registrationFuture = resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); try { - Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS)); + registrationFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { Assert.fail("JobManager registration Future didn't become ready."); } @@ -207,7 +205,7 @@ public class SlotProtocolTest extends TestLogger { // 4) a SlotRequest is routed to the TaskExecutor - verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(FiniteDuration.class)); + verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class)); } http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 1791056..7c6b0ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -21,18 +21,16 @@ package org.apache.flink.runtime.rpc; import akka.actor.ActorSystem; import akka.util.Timeout; +import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,7 +47,7 @@ public class AsyncCallsTest extends TestLogger { private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); private static AkkaRpcService akkaRpcService = - new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS)); + new AkkaRpcService(actorSystem, Time.milliseconds(10000L)); @AfterClass public static void shutdown() { @@ -104,8 +102,9 @@ public class AsyncCallsTest extends TestLogger { } return "test"; } - }, new Timeout(30, TimeUnit.SECONDS)); - String str = Await.result(result, new FiniteDuration(30, TimeUnit.SECONDS)); + }, Time.seconds(30L)); + + String str = result.get(30, TimeUnit.SECONDS); assertEquals("test", str); // validate that no concurrent access happened http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index b431eb9..ee3f784 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -18,14 +18,14 @@ package org.apache.flink.runtime.rpc; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.util.ReflectionUtil; import org.apache.flink.util.TestLogger; import org.junit.Test; import org.reflections.Reflections; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.lang.annotation.Annotation; import java.lang.reflect.Method; @@ -43,6 +43,7 @@ import static org.junit.Assert.fail; public class RpcCompletenessTest extends TestLogger { private static final Class futureClass = Future.class; + private static final Class timeoutClass = Time.class; @Test @SuppressWarnings({"rawtypes", "unchecked"}) @@ -147,8 +148,8 @@ public class RpcCompletenessTest extends TestLogger { for (int i = 0; i < parameterAnnotations.length; i++) { if (RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) { assertTrue( - "The rpc timeout has to be of type " + FiniteDuration.class.getName() + ".", - parameterTypes[i].equals(FiniteDuration.class)); + "The rpc timeout has to be of type " + timeoutClass.getName() + ".", + parameterTypes[i].equals(timeoutClass)); rpcTimeoutParameters++; } http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java index 8133a87..caf5e81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingGatewayBase.java @@ -18,9 +18,9 @@ package org.apache.flink.runtime.rpc; -import akka.dispatch.Futures; -import scala.concurrent.Future; -import scala.concurrent.Promise; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -73,25 +73,25 @@ public abstract class TestingGatewayBase implements RpcGateway { // ------------------------------------------------------------------------ public Future futureWithTimeout(long timeoutMillis) { - Promise promise = Futures.promise(); - executor.schedule(new FutureTimeout(promise), timeoutMillis, TimeUnit.MILLISECONDS); - return promise.future(); + FlinkCompletableFuture future = new FlinkCompletableFuture<>(); + executor.schedule(new FutureTimeout(future), timeoutMillis, TimeUnit.MILLISECONDS); + return future; } // ------------------------------------------------------------------------ private static final class FutureTimeout implements Runnable { - private final Promise promise; + private final CompletableFuture promise; - private FutureTimeout(Promise promise) { + private FutureTimeout(CompletableFuture promise) { this.promise = promise; } @Override public void run() { try { - promise.failure(new TimeoutException()); + promise.completeExceptionally(new TimeoutException()); } catch (Throwable t) { System.err.println("CAUGHT AN ERROR IN THE TEST: " + t.getMessage()); t.printStackTrace(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/8fe0905c/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index 2212680..f164056 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -18,18 +18,14 @@ package org.apache.flink.runtime.rpc; -import akka.dispatch.Futures; -import akka.util.Timeout; - +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -69,7 +65,7 @@ public class TestingRpcService extends AkkaRpcService { * Creates a new {@code TestingRpcService}, using the given configuration. 
*/ public TestingRpcService(Configuration configuration) { - super(AkkaUtils.createLocalActorSystem(configuration), new Timeout(new FiniteDuration(10, TimeUnit.SECONDS))); + super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10)); this.registeredConnections = new ConcurrentHashMap<>(); } @@ -103,13 +99,13 @@ public class TestingRpcService extends AkkaRpcService { if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return Futures.successful(typedGateway); + return FlinkCompletableFuture.completed(typedGateway); } else { - return Futures.failed( - new Exception("Gateway registered under " + address + " is not of type " + clazz)); + return FlinkCompletableFuture.completedExceptionally( + new Exception("Gateway registered under " + address + " is not of type " + clazz)); } } else { - return Futures.failed(new Exception("No gateway registered under that name")); + return FlinkCompletableFuture.completedExceptionally(new Exception("No gateway registered under that name")); } }