flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [46/50] [abbrv] flink git commit: [FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager
Date Fri, 21 Oct 2016 12:22:26 GMT
[FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99049f69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99049f69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99049f69

Branch: refs/heads/flip-6
Commit: 99049f69f873d975fa6089e12c9b2797f710c0ff
Parents: e0c54ca
Author: Kurt Young <ykt836@gmail.com>
Authored: Sun Oct 16 22:20:38 2016 +0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Oct 20 19:50:35 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java |  1 -
 .../jobmanager/slots/PooledSlotProvider.java    | 73 ++++++++++++++++++++
 .../flink/runtime/jobmaster/JobMaster.java      | 24 ++++---
 3 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99049f69/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index e7857c1..de952c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner {
 
 		internalAllocateSlot(jobID, allocationID, resourceProfile, future);
 
-		final SlotOwner owner = this;
 		return future.thenApplyAsync(
 			new ApplyFunction<SlotDescriptor, SimpleSlot>() {
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/99049f69/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
new file mode 100644
index 0000000..5655fc2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple pool based slot provider with {@link SlotPool} as the underlying storage.
+ */
+public class PooledSlotProvider implements SlotProvider {
+
+	/** The pool which holds all the slots. */
+	private final SlotPool slotPool;
+
+	/** The maximum time {@link #allocateSlot} blocks while waiting for the pool to hand out a slot. */
+	private final Time timeout;
+
+	/**
+	 * Creates a slot provider that draws its slots from the given pool.
+	 *
+	 * @param slotPool the pool to allocate slots from; must not be null
+	 * @param timeout  how long a single allocation may block before it fails; must not be null
+	 */
+	public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
+		this.slotPool = checkNotNull(slotPool);
+		this.timeout = checkNotNull(timeout);
+	}
+
+	/**
+	 * Allocates a slot for the given task from the underlying {@link SlotPool}.
+	 *
+	 * <p>The allocation is performed synchronously: the calling thread blocks for at most the
+	 * configured timeout, and the returned future is already completed.
+	 *
+	 * <p>NOTE(review): {@code allowQueued} is currently ignored. A request that times out is not
+	 * withdrawn from the pool here, so a slot granted afterwards may go unused — confirm whether
+	 * the pool reclaims it.
+	 *
+	 * @param task        the task that needs a slot; must not be null
+	 * @param allowQueued ignored by this implementation
+	 * @return a future that is already completed with the allocated slot
+	 * @throws NoResourceAvailableException if the allocation fails, does not finish within the
+	 *                                      timeout, or the calling thread is interrupted while waiting
+	 */
+	@Override
+	public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
+			throws NoResourceAvailableException {
+		checkNotNull(task);
+
+		final JobID jobID = task.getTaskToExecute().getVertex().getJobId();
+		final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
+		try {
+			final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit());
+			return FlinkCompletableFuture.completed(slot);
+		} catch (InterruptedException e) {
+			// Restore the interrupt flag so code further up the stack can still observe it.
+			Thread.currentThread().interrupt();
+			throw withCause(
+					new NoResourceAvailableException("Could not allocate a slot because it's interrupted."), e);
+		} catch (ExecutionException e) {
+			// Attach the real failure, not just the ExecutionException wrapper.
+			final Throwable cause = e.getCause() != null ? e.getCause() : e;
+			throw withCause(
+					new NoResourceAvailableException("Could not allocate a slot because some error occurred " +
+							"during allocation, " + e.getMessage()),
+					cause);
+		} catch (TimeoutException e) {
+			throw withCause(
+					new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout), e);
+		}
+	}
+
+	/**
+	 * Attaches {@code cause} to {@code exception} (unless a cause is already set) and returns it.
+	 * This keeps the original failure available to callers and logs.
+	 */
+	private static NoResourceAvailableException withCause(
+			final NoResourceAvailableException exception, final Throwable cause) {
+		if (exception.getCause() == null) {
+			exception.initCause(cause);
+		}
+		return exception;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/99049f69/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a7be476..05c20d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -49,6 +49,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotPool;
 import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -56,7 +57,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -84,7 +85,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -93,6 +93,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -145,6 +146,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** The execution graph of this job */
 	private final ExecutionGraph executionGraph;
 
+	private final SlotPool slotPool;
+
+	private final Time allocationTimeout;
 
 	private volatile UUID leaderSessionID;
 
@@ -156,8 +160,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	/** Connection with ResourceManager, null if not located address yet or we close it initiative
*/
 	private ResourceManagerConnection resourceManagerConnection;
 
-	// TODO - we need to replace this with the slot pool
-	private final Scheduler scheduler;
 
 	// ------------------------------------------------------------------------
 
@@ -239,8 +241,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				-1,
 				log);
 
-		// TODO - temp fix
-		this.scheduler = new Scheduler(executorService);
+		this.slotPool = new SlotPool(executorService);
+		this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -262,6 +264,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
 			super.start();
 
+			slotPool.setJobManagerLeaderId(leaderSessionID);
 			log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID());
 			getSelf().startJobExecution();
 		} else {
@@ -337,7 +340,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			@Override
 			public void run() {
 				try {
-					executionGraph.scheduleForExecution(scheduler);
+					executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
 				} catch (Throwable t) {
 					executionGraph.fail(t);
 				}
@@ -365,6 +368,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		((StartStoppable) getSelf()).stop();
 
 		leaderSessionID = null;
+		slotPool.setJobManagerLeaderId(null);
 		executionGraph.suspend(cause);
 
 		// disconnect from resource manager:
@@ -783,9 +787,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 				// TODO - add tests for comment in https://github.com/apache/flink/pull/2565
 				// verify the response with current connection
 				if (resourceManagerConnection != null
-						&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
{
+						&& resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+				{
 					log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
 							success.getResourceManagerLeaderId());
+					slotPool.setResourceManager(success.getResourceManagerLeaderId(),
+							resourceManagerConnection.getTargetGateway());
 				}
 			}
 		});
@@ -796,6 +803,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
+		slotPool.disconnectResourceManager();
 	}
 
 	//----------------------------------------------------------------------------------------------


Mime
View raw message