Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 347CF200BA7 for ; Fri, 21 Oct 2016 14:21:48 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 33420160B08; Fri, 21 Oct 2016 12:21:48 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 469B1160AE8 for ; Fri, 21 Oct 2016 14:21:47 +0200 (CEST) Received: (qmail 64825 invoked by uid 500); 21 Oct 2016 12:21:43 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 63912 invoked by uid 99); 21 Oct 2016 12:21:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Oct 2016 12:21:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 67A4CE0451; Fri, 21 Oct 2016 12:21:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Fri, 21 Oct 2016 12:22:26 -0000 Message-Id: In-Reply-To: <588d3af21851448daded2f08ec8ae330@git.apache.org> References: <588d3af21851448daded2f08ec8ae330@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [46/50] [abbrv] flink git commit: [FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager archived-at: Fri, 21 Oct 2016 12:21:48 -0000 [FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: 
http://git-wip-us.apache.org/repos/asf/flink/commit/99049f69 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99049f69 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99049f69 Branch: refs/heads/flip-6 Commit: 99049f69f873d975fa6089e12c9b2797f710c0ff Parents: e0c54ca Author: Kurt Young Authored: Sun Oct 16 22:20:38 2016 +0800 Committer: Till Rohrmann Committed: Thu Oct 20 19:50:35 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/instance/SlotPool.java | 1 - .../jobmanager/slots/PooledSlotProvider.java | 73 ++++++++++++++++++++ .../flink/runtime/jobmaster/JobMaster.java | 24 ++++--- 3 files changed, 89 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/99049f69/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index e7857c1..de952c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner { internalAllocateSlot(jobID, allocationID, resourceProfile, future); - final SlotOwner owner = this; return future.thenApplyAsync( new ApplyFunction() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/99049f69/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java new file mode 100644 index 0000000..5655fc2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SlotPool; +import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A simple pool based slot provider with {@link SlotPool} as the underlying storage. + */ +public class PooledSlotProvider implements SlotProvider { + + /** The pool which holds all the slots. */ + private final SlotPool slotPool; + + /** The timeout for allocation. 
*/ + private final Time timeout; + + public PooledSlotProvider(final SlotPool slotPool, final Time timeout) { + this.slotPool = slotPool; + this.timeout = timeout; + } + + @Override + public Future<SimpleSlot> allocateSlot(ScheduledUnit task, + boolean allowQueued) throws NoResourceAvailableException + { + checkNotNull(task); + + final JobID jobID = task.getTaskToExecute().getVertex().getJobId(); + final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN); + try { + final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit()); + return FlinkCompletableFuture.completed(slot); + } catch (InterruptedException e) { + throw new NoResourceAvailableException("Could not allocate a slot because it's interrupted."); + } catch (ExecutionException e) { + throw new NoResourceAvailableException("Could not allocate a slot because some error occurred " + + "during allocation, " + e.getMessage()); + } catch (TimeoutException e) { + throw new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/99049f69/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index a7be476..05c20d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -49,6 +49,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.instance.SlotPool; import 
org.apache.flink.runtime.io.network.PartitionState; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -56,7 +57,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.OnCompletionActions; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -84,7 +85,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; - import org.slf4j.Logger; import javax.annotation.Nullable; @@ -93,6 +93,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -145,6 +146,9 @@ public class JobMaster extends RpcEndpoint { /** The execution graph of this job */ private final ExecutionGraph executionGraph; + private final SlotPool slotPool; + + private final Time allocationTimeout; private volatile UUID leaderSessionID; @@ -156,8 +160,6 @@ public class JobMaster extends RpcEndpoint { /** Connection with ResourceManager, null if not located address yet or we close it initiative */ private ResourceManagerConnection resourceManagerConnection; - // TODO - we need to replace this with the slot pool - private final Scheduler scheduler; // 
------------------------------------------------------------------------ @@ -239,8 +241,8 @@ public class JobMaster extends RpcEndpoint { -1, log); - // TODO - temp fix - this.scheduler = new Scheduler(executorService); + this.slotPool = new SlotPool(executorService); + this.allocationTimeout = Time.of(5, TimeUnit.SECONDS); } //---------------------------------------------------------------------------------------------- @@ -262,6 +264,7 @@ public class JobMaster extends RpcEndpoint { if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) { super.start(); + slotPool.setJobManagerLeaderId(leaderSessionID); log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID()); getSelf().startJobExecution(); } else { @@ -337,7 +340,7 @@ public class JobMaster extends RpcEndpoint { @Override public void run() { try { - executionGraph.scheduleForExecution(scheduler); + executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout)); } catch (Throwable t) { executionGraph.fail(t); } @@ -365,6 +368,7 @@ public class JobMaster extends RpcEndpoint { ((StartStoppable) getSelf()).stop(); leaderSessionID = null; + slotPool.setJobManagerLeaderId(null); executionGraph.suspend(cause); // disconnect from resource manager: @@ -783,9 +787,12 @@ public class JobMaster extends RpcEndpoint { // TODO - add tests for comment in https://github.com/apache/flink/pull/2565 // verify the response with current connection if (resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) { + && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) + { log.info("JobManager successfully registered at ResourceManager, leader id: {}.", success.getResourceManagerLeaderId()); + slotPool.setResourceManager(success.getResourceManagerLeaderId(), + resourceManagerConnection.getTargetGateway()); } } }); @@ -796,6 +803,7 @@ public class JobMaster 
extends RpcEndpoint { resourceManagerConnection.close(); resourceManagerConnection = null; } + slotPool.disconnectResourceManager(); } //----------------------------------------------------------------------------------------------