flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-5938] Replace ExecutionContext by Executor in Scheduler
Date Thu, 02 Mar 2017 11:24:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master a552d6746 -> 5c968563d


[FLINK-5938] Replace ExecutionContext by Executor in Scheduler

In order to remove the Scheduler's dependency on Scala's ExecutionContext and
Akka's futures, this PR replaces the ExecutionContext with an Executor, which is
used to execute the concurrent handleNewSlot call.

This closes #3435.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cde3cdd0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cde3cdd0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cde3cdd0

Branch: refs/heads/master
Commit: cde3cdd00c7b97f86904f641475381240fc440df
Parents: a552d67
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Feb 28 17:27:03 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Mar 2 12:22:16 2017 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/scheduler/Scheduler.java | 32 ++++++--------------
 1 file changed, 10 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cde3cdd0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 72f9789..58dac3e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -30,12 +30,9 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import akka.dispatch.Futures;
-
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -55,10 +52,9 @@ import org.apache.flink.runtime.instance.InstanceListener;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
 
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
@@ -104,23 +100,16 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 	/** The number of slot allocations where locality could not be respected */
 	private int nonLocalizedAssignments;
 
-	/** The ExecutionContext which is used to execute newSlotAvailable futures. */
-	private final ExecutionContext executionContext;
+	/** The Executor which is used to execute newSlotAvailable futures. */
+	private final Executor executor;
 
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Creates a new scheduler.
 	 */
-	public Scheduler(ExecutorService executor) {
-		this(ExecutionContext$.MODULE$.fromExecutor(executor));
-	}
-	
-	/**
-	 * Creates a new scheduler.
-	 */
-	public Scheduler(ExecutionContext executionContext) {
-		this.executionContext = executionContext;
+	public Scheduler(Executor executor) {
+		this.executor = Preconditions.checkNotNull(executor);
 	}
 	
 	/**
@@ -543,15 +532,14 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 		// 
 		// that leads with a high probability to deadlocks, when scheduling fast
 
-		this.newlyAvailableInstances.add(instance);
+		newlyAvailableInstances.add(instance);
 
-		Futures.future(new Callable<Object>() {
+		executor.execute(new Runnable() {
 			@Override
-			public Object call() throws Exception {
+			public void run() {
 				handleNewSlot();
-				return null;
 			}
-		}, executionContext);
+		});
 	}
 	
 	private void handleNewSlot() {


Mime
View raw message