flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-4690] Use direct executor to run slot allocation future handler
Date Tue, 27 Sep 2016 16:40:31 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6e123d287 -> 84672c22f


[FLINK-4690] Use direct executor to run slot allocation future handler


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84672c22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84672c22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84672c22

Branch: refs/heads/master
Commit: 84672c22f8088a70caf35b54d74eee458bf600dd
Parents: 7b88f1a
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Sep 27 15:33:07 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Sep 27 18:39:36 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/Executors.java     | 52 +++++++++++++++++
 .../flink/runtime/executiongraph/Execution.java | 61 ++++++++------------
 .../runtime/jobmanager/scheduler/Scheduler.java | 15 +++--
 3 files changed, 84 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
new file mode 100644
index 0000000..1832d70
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Collection of {@link Executor} implementations
+ */
+public class Executors {
+
+	/**
+	 * Return a direct executor. The direct executor directly executes the runnable in the calling
+	 * thread.
+	 *
+	 * @return Direct executor
+	 */
+	public static Executor directExecutor() {
+		return DirectExecutor.INSTANCE;
+	}
+
+	/**
+	 * Direct executor implementation.
+	 */
+	private static class DirectExecutor implements Executor {
+
+		static final DirectExecutor INSTANCE = new DirectExecutor();
+
+		private DirectExecutor() {}
+
+		@Override
+		public void execute(Runnable command) {
+			command.run();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 8c02e1b..912ff10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -52,7 +53,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
 import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -297,49 +297,38 @@ public class Execution {
 
 			// IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
 			//     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
-			final Future<SimpleSlot> future = slotProvider.allocateSlot(toSchedule, queued);
+			final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule,
queued);
 
-			if (queued) {
-				future.handleAsync(new BiFunction<SimpleSlot, Throwable, Void>() {
-					@Override
-					public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
-						if (simpleSlot != null) {
+			// IMPORTANT: We have to use the direct executor here so that we directly deploy the tasks
+			// if the slot allocation future is completed. This is necessary for immediate deployment
+			final Future<Void> deploymentFuture = slotAllocationFuture.handleAsync(new BiFunction<SimpleSlot,
Throwable, Void>() {
+				@Override
+				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+					if (simpleSlot != null) {
+						try {
+							deployToSlot(simpleSlot);
+						} catch (Throwable t) {
 							try {
-								deployToSlot(simpleSlot);
-							} catch (Throwable t) {
-								try {
-									simpleSlot.releaseSlot();
-								} finally {
-									markFailed(t);
-								}
+								simpleSlot.releaseSlot();
+							} finally {
+								markFailed(t);
 							}
 						}
-						else {
-							markFailed(throwable);
-						}
-						return null;
 					}
-				}, ExecutionContext$.MODULE$.global());
-			}
-			else {
-				SimpleSlot slot = null;
-				try {
-					// when queued is not allowed, we will get a slot or NoResourceAvailableException will be
-					// thrown earlier (when allocateSlot).
-					slot = checkNotNull(future.getNow(null));
-					deployToSlot(slot);
-				}
-				catch (Throwable t) {
-					try {
-						if (slot != null) {
-							slot.releaseSlot();
-						}
-					} finally {
-						markFailed(t);
+					else {
+						markFailed(throwable);
 					}
+					return null;
 				}
-			}
+			}, Executors.directExecutor());
 
+			// if tasks have to be scheduled immediately, check that the task has been deployed
+			if (!queued) {
+				if (!deploymentFuture.isDone()) {
+					markFailed(new IllegalArgumentException("The slot allocation future has not been completed
yet."));
+				}
+			}
+			
 			return true;
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index ce2f6f7..b839e0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -39,6 +39,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -140,9 +141,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener,
Sl
 
 		final Object ret = scheduleTask(task, allowQueued);
 		if (ret instanceof SimpleSlot) {
-			FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
-			future.complete((SimpleSlot) ret);
-			return future;
+			return FlinkCompletableFuture.completed((SimpleSlot) ret);
 		}
 		else if (ret instanceof Future) {
 			return (Future) ret;
@@ -153,7 +152,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener,
Sl
 	}
 
 	/**
-	 * Returns either a {@link org.apache.flink.runtime.instance.SimpleSlot}, or a {@link Future}.
+	 * Returns either a {@link SimpleSlot}, or a {@link Future}.
 	 */
 	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException
{
 		if (task == null) {
@@ -316,7 +315,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener,
Sl
 				else {
 					// no resource available now, so queue the request
 					if (queueIfNoResource) {
-						FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+						CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
 						this.taskQueue.add(new QueuedTask(task, future));
 						return future;
 					}
@@ -833,10 +832,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener,
Sl
 		
 		private final ScheduledUnit task;
 		
-		private final FlinkCompletableFuture<SimpleSlot> future;
+		private final CompletableFuture<SimpleSlot> future;
 		
 		
-		public QueuedTask(ScheduledUnit task, FlinkCompletableFuture<SimpleSlot> future)
{
+		public QueuedTask(ScheduledUnit task, CompletableFuture<SimpleSlot> future) {
 			this.task = task;
 			this.future = future;
 		}
@@ -845,7 +844,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener,
Sl
 			return task;
 		}
 
-		public FlinkCompletableFuture<SimpleSlot> getFuture() {
+		public CompletableFuture<SimpleSlot> getFuture() {
 			return future;
 		}
 	}


Mime
View raw message