Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9D930200C2B for ; Thu, 2 Mar 2017 12:24:23 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 9C24D160B61; Thu, 2 Mar 2017 11:24:23 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E8220160B7D for ; Thu, 2 Mar 2017 12:24:22 +0100 (CET) Received: (qmail 66477 invoked by uid 500); 2 Mar 2017 11:24:22 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 66463 invoked by uid 99); 2 Mar 2017 11:24:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Mar 2017 11:24:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DD9DEDFC15; Thu, 2 Mar 2017 11:24:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Thu, 02 Mar 2017 11:24:21 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-5938] Replace ExecutionContext by Executor in Scheduler archived-at: Thu, 02 Mar 2017 11:24:23 -0000 Repository: flink Updated Branches: refs/heads/master a552d6746 -> 5c968563d [FLINK-5938] Replace ExecutionContext by Executor in Scheduler In order to remove the Scheduler's dependency on Scala's ExecutionContext and Akka's futures, this PR replaces the ExecutionContext by an Executor which is used to execute the concurrent handleNewSlot call. This closes #3435. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cde3cdd0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cde3cdd0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cde3cdd0 Branch: refs/heads/master Commit: cde3cdd00c7b97f86904f641475381240fc440df Parents: a552d67 Author: Till Rohrmann Authored: Tue Feb 28 17:27:03 2017 +0100 Committer: Till Rohrmann Committed: Thu Mar 2 12:22:16 2017 +0100 ---------------------------------------------------------------------- .../runtime/jobmanager/scheduler/Scheduler.java | 32 ++++++-------------- 1 file changed, 10 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cde3cdd0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 72f9789..58dac3e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -30,12 +30,9 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; -import akka.dispatch.Futures; - import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -55,10 +52,9 @@ import org.apache.flink.runtime.instance.InstanceListener; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.ExecutionContext; -import scala.concurrent.ExecutionContext$; /** * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots. @@ -104,23 +100,16 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl /** The number of slot allocations where locality could not be respected */ private int nonLocalizedAssignments; - /** The ExecutionContext which is used to execute newSlotAvailable futures. */ - private final ExecutionContext executionContext; + /** The Executor which is used to execute newSlotAvailable futures. */ + private final Executor executor; // ------------------------------------------------------------------------ /** * Creates a new scheduler. */ - public Scheduler(ExecutorService executor) { - this(ExecutionContext$.MODULE$.fromExecutor(executor)); - } - - /** - * Creates a new scheduler. - */ - public Scheduler(ExecutionContext executionContext) { - this.executionContext = executionContext; + public Scheduler(Executor executor) { + this.executor = Preconditions.checkNotNull(executor); } /** @@ -543,15 +532,14 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl // // that leads with a high probability to deadlocks, when scheduling fast - this.newlyAvailableInstances.add(instance); + newlyAvailableInstances.add(instance); - Futures.future(new Callable() { + executor.execute(new Runnable() { @Override - public Object call() throws Exception { + public void run() { handleNewSlot(); - return null; } - }, executionContext); + }); } private void handleNewSlot() {