From: prasanthj@apache.org
To: commits@hive.apache.org
Subject: hive git commit: HIVE-12959: LLAP: Add task scheduler timeout when no nodes are alive (Prasanth Jayachandran reviewed by Siddharth Seth)
Date: Wed, 27 Apr 2016 22:55:23 +0000 (UTC)

Repository: hive
Updated Branches:
  refs/heads/master ce457a496 -> c3dd00b27


HIVE-12959: LLAP: Add task scheduler timeout when no nodes are alive (Prasanth Jayachandran reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c3dd00b2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c3dd00b2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c3dd00b2

Branch: refs/heads/master
Commit: c3dd00b27afbdcc2ef7b3c16ee1d446da16ff0a5
Parents: ce457a4
Author: Prasanth Jayachandran
Authored: Wed Apr 27 17:55:04 2016 -0500
Committer: Prasanth Jayachandran
Committed: Wed Apr 27 17:55:04 2016 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +
 .../hive/llap/registry/ServiceInstanceSet.java  |   6 +
 .../registry/impl/LlapFixedRegistryImpl.java    |   5 +
 .../impl/LlapZookeeperRegistryImpl.java         |   5 +
 .../tezplugins/LlapTaskSchedulerService.java    | 112 ++++++++++++++++---
 5 files changed, 117 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5360ed4..49d748c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2700,6 +2700,11 @@ public class HiveConf extends Configuration {
         "Sleep duration while waiting to retry connection failures to the AM from the daemon for\n" +
         "the general keep-alive thread (milliseconds).",
         "llap.am.liveness.connection.sleep-between-retries-millis"),
+    LLAP_DAEMON_TASK_SCHEDULER_TIMEOUT_SECONDS(
+        "hive.llap.task.scheduler.timeout.seconds", "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Amount of time to wait before failing the query when there are no llap daemons running\n" +
+        "(alive) in the cluster.", "llap.daemon.scheduler.timeout.seconds"),
     LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
       "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
       "executed in parallel.", "llap.daemon.num.executors"),
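
A usage note on the new key: it is validated by a TimeValidator, so callers read it through HiveConf.getTimeVar with an explicit output unit; this is the same accessor the scheduler uses in the constructor change further down. A minimal sketch of reading it (the class and variable names below are illustrative, not part of the commit):

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class SchedulerTimeoutConfigSketch {
      public static void main(String[] args) {
        // Loads hive-site.xml from the classpath, if present.
        Configuration conf = new HiveConf();
        // Default is "60s"; getTimeVar parses the validated duration and
        // converts it to the requested unit, here milliseconds.
        long timeoutMs = HiveConf.getTimeVar(conf,
            ConfVars.LLAP_DAEMON_TASK_SCHEDULER_TIMEOUT_SECONDS, TimeUnit.MILLISECONDS);
        System.out.println("Task scheduler timeout: " + timeoutMs + " ms");
      }
    }
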
http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 73f94f3..99ead9b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -55,4 +55,10 @@ public interface ServiceInstanceSet {
    */
   public Set<ServiceInstance> getByHost(String host);
 
+  /**
+   * Get the number of instances currently available.
+   *
+   * @return - number of instances
+   */
+  public int size();
 }
\ No newline at end of file


http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index bd814b9..67443a7 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -246,6 +246,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
       }
       return byHost;
     }
+
+    @Override
+    public int size() {
+      return instances.size();
+    }
   }
 
   @Override


http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 6af30d4..d51249a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -493,6 +493,11 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       }
       return byHost;
     }
+
+    @Override
+    public int size() {
+      return instancesCache.getCurrentData().size();
+    }
   }
 
   private class InstanceStateChangeListener implements PathChildrenCacheListener {
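
For context, a hedged sketch of how a registry client can use the new size() method; the helper class below is illustrative and not part of the commit. Both implementations return a point-in-time count (the fixed registry from its configured instance map, the ZooKeeper registry from its path-children cache), so the value can change the moment it is read and should be treated as a hint:

    import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;

    public final class RegistrySizeCheck {
      private RegistrySizeCheck() {}

      // Illustrative helper: true when no LLAP daemons are currently registered,
      // which is the condition the scheduler's timeout monitor guards against.
      public static boolean isClusterEmpty(ServiceInstanceSet instances) {
        return instances.size() == 0;
      }
    }
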
http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index b57ae1a..5ecbf79 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -38,6 +38,8 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -74,6 +76,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -129,7 +132,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private final Lock scheduleLock = new ReentrantLock();
   private final Condition scheduleCondition = scheduleLock.newCondition();
-  private final AtomicBoolean pendingScheduleInvodations = new AtomicBoolean(false);
+  private final AtomicBoolean pendingScheduleInvocations = new AtomicBoolean(false);
   private final ListeningExecutorService schedulerExecutor;
   private final SchedulerCallable schedulerCallable = new SchedulerCallable();
 
@@ -151,6 +154,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   // Per Executor Thread
   private final Resource resourcePerExecutor;
 
+  // when there are no live nodes in the cluster and this timeout elapses, the query is failed
+  private final long timeout;
+  private final Lock timeoutLock = new ReentrantLock();
+  private final ScheduledExecutorService timeoutExecutor;
+  private final SchedulerTimeoutMonitor timeoutMonitor;
+  private ScheduledFuture<?> timeoutFuture;
+
   private final LlapRegistryService registry = new LlapRegistryService(false);
 
   private volatile ListenableFuture<Void> nodeEnablerFuture;
@@ -200,6 +210,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       this.forceLocation = false;
     }
 
+    this.timeoutMonitor = new SchedulerTimeoutMonitor();
+    this.timeout = HiveConf.getTimeVar(conf,
+        ConfVars.LLAP_DAEMON_TASK_SCHEDULER_TIMEOUT_SECONDS, TimeUnit.MILLISECONDS);
+    this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimeoutMonitor")
+            .build());
+    this.timeoutFuture = null;
+
     int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
     this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
@@ -210,11 +228,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             + " must be defined");
 
     ExecutorService executorServiceRaw =
-        Executors.newFixedThreadPool(1,
+        Executors.newSingleThreadExecutor(
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
     nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
 
-    ExecutorService schedulerExecutorServiceRaw = Executors.newFixedThreadPool(1,
+    ExecutorService schedulerExecutorServiceRaw = Executors.newSingleThreadExecutor(
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
     schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
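
The constructor change above puts the timeout machinery on a dedicated, named daemon thread, and also swaps newFixedThreadPool(1) for newSingleThreadExecutor, which is behaviorally equivalent for one thread but states the intent directly. A self-contained sketch of the same construction idiom, assuming Guava's ThreadFactoryBuilder is on the classpath (all names are illustrative):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class TimeoutExecutorSketch {
      public static void main(String[] args) {
        // One named daemon thread: thread dumps identify the monitor, and a
        // daemon thread cannot keep the JVM alive if shutdown is ever missed.
        ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setDaemon(true)
                .setNameFormat("TimeoutMonitorSketch").build());
        timeoutExecutor.shutdown();
      }
    }
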
@@ -291,6 +309,45 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       // FIXME: disabling this for now
       // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
       LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
+      // if there are no more nodes, signal the timeout monitor to start its timer
+      if (activeInstances.size() == 0) {
+        LOG.info("No node found. Signalling scheduler timeout monitor thread to start timer.");
+        startTimeoutMonitor();
+      }
+    }
+  }
+
+  private void startTimeoutMonitor() {
+    timeoutLock.lock();
+    try {
+      // If the timer is null, start a new one.
+      // If the timer completed during a previous invocation, start a new one.
+      // If the timer is already started and not yet completed, leave it running without resetting it.
+      if ((timeoutFuture == null || (timeoutFuture != null && timeoutFuture.isDone()))
+          && activeInstances.size() == 0) {
+        timeoutFuture = timeoutExecutor.schedule(timeoutMonitor, timeout, TimeUnit.MILLISECONDS);
+        LOG.info("Scheduled timeout monitor task to run after {} ms", timeout);
+      } else {
+        LOG.info("Timeout monitor task not started. Timeout future state: {}, #instances: {}",
+            timeoutFuture == null ? "null" : timeoutFuture.isDone(), activeInstances.size());
+      }
+    } finally {
+      timeoutLock.unlock();
+    }
+  }
+
+  private void stopTimeoutMonitor() {
+    timeoutLock.lock();
+    try {
+      if (timeoutFuture != null && activeInstances.size() != 0 && timeoutFuture.cancel(false)) {
+        LOG.info("Stopped timeout monitor task");
+      } else {
+        LOG.info("Timeout monitor task not stopped. Timeout future state: {}, #instances: {}",
+            timeoutFuture == null ? "null" : timeoutFuture.isDone(), activeInstances.size());
+      }
+      timeoutFuture = null;
+    } finally {
+      timeoutLock.unlock();
     }
   }
 
@@ -305,6 +362,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
     nodeEnabledExecutor.shutdownNow();
 
+    timeoutExecutor.shutdown();
+    if (timeoutFuture != null) {
+      timeoutFuture.cancel(true);
+      timeoutFuture = null;
+    }
+    timeoutExecutor.shutdownNow();
+
     schedulerCallable.shutdown();
     if (schedulerFuture != null) {
       schedulerFuture.cancel(true);
@@ -567,13 +631,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     String[] requestedHosts = request.requestedHosts;
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
-      // Check if any hosts are active.
-      if (getAvailableResources().getMemory() <= 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Refreshing instances since total memory is 0");
-        }
-      }
-
       // If there's no memory available, fail
       if (getTotalResources().getMemory() <= 0) {
         return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
@@ -657,6 +714,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private void addNode(ServiceInstance inst, NodeInfo node) {
     LOG.info("Adding node: " + inst);
+    // we have just added a new node; signal the timeout monitor to stop its timer
+    if (activeInstances.size() == 1) {
+      LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer.");
+      stopTimeoutMonitor();
+    }
     instanceToNodeMap.put(inst.getWorkerIdentity(), node);
     // Trigger scheduling since a new node became available.
     trySchedulingPendingTasks();
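
startTimeoutMonitor and stopTimeoutMonitor above implement an arm-once/disarm timer: a new timer is scheduled only when none is pending (null, or already fired), and cancellation uses cancel(false) so a timeout task that has already started reporting is not interrupted. A minimal, self-contained sketch of that pattern outside of Hive (all names are illustrative):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class RestartableTimeout {
      private final ScheduledExecutorService executor =
          Executors.newSingleThreadScheduledExecutor();
      private final Lock lock = new ReentrantLock();
      private ScheduledFuture<?> future;

      public void arm(Runnable onTimeout, long delayMs) {
        lock.lock();
        try {
          // Re-arm only if no timer is pending: never scheduled, or already fired.
          if (future == null || future.isDone()) {
            future = executor.schedule(onTimeout, delayMs, TimeUnit.MILLISECONDS);
          }
        } finally {
          lock.unlock();
        }
      }

      public void disarm() {
        lock.lock();
        try {
          if (future != null) {
            future.cancel(false); // let an already-running timeout task finish
          }
          future = null;
        } finally {
          lock.unlock();
        }
      }
    }
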
@@ -794,7 +856,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       Iterator<TaskInfo> taskIter = taskListAtPriority.iterator();
       boolean scheduledAllAtPriority = true;
       while (taskIter.hasNext()) {
-        // TODO Optimization: Add a check to see if there's any capacity available. No point in
-        // walking through all active nodes, if they don't have potential capacity.
@@ -807,8 +868,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         if (scheduleResult == ScheduleResult.SCHEDULED) {
           taskIter.remove();
         } else {
-          // TODO Handle INADEQUATE_TOTAL_RESOURCES eventually - either by throwin an error immediately,
-          // or waiting for some timeout for new executors and then throwing an error
+          if (scheduleResult == ScheduleResult.INADEQUATE_TOTAL_RESOURCES) {
+            LOG.info("Inadequate total resources before scheduling pending tasks." +
+                " Signalling scheduler timeout monitor thread to start timer.");
+            startTimeoutMonitor();
+          }
 
           // Try pre-empting a task so that a higher priority task can take its place.
           // Preempt only if there's no pending preemptions to avoid preempting twice for a task.
@@ -1052,13 +1116,29 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private void trySchedulingPendingTasks() {
     scheduleLock.lock();
     try {
-      pendingScheduleInvodations.set(true);
+      pendingScheduleInvocations.set(true);
       scheduleCondition.signal();
     } finally {
       scheduleLock.unlock();
     }
   }
 
+  private class SchedulerTimeoutMonitor implements Runnable {
+    private final Logger LOG = LoggerFactory.getLogger(SchedulerTimeoutMonitor.class);
+
+    @Override
+    public void run() {
+      LOG.info("Reporting SERVICE_UNAVAILABLE error as no instances are running");
+      try {
+        getContext().reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE,
+            "No LLAP Daemons are running", getContext().getCurrentDagInfo());
+      } catch (Exception e) {
+        LOG.error("Exception when reporting SERVICE_UNAVAILABLE error for dag: {}",
+            getContext().getCurrentDagInfo().getName(), e);
+      }
+    }
+  }
+
   private class SchedulerCallable implements Callable<Void> {
     private AtomicBoolean isShutdown = new AtomicBoolean(false);
 
@@ -1067,7 +1147,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         scheduleLock.lock();
         try {
-          while (!pendingScheduleInvodations.get()) {
+          while (!pendingScheduleInvocations.get()) {
             scheduleCondition.await();
           }
         } catch (InterruptedException e) {
@@ -1086,7 +1166,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       // will be handled in the next run.
       // A new request may come in right after this is set to false, but before the actual scheduling.
       // This will be handled in this run, but will cause an immediate run after, which is harmless.
-      pendingScheduleInvodations.set(false);
+      pendingScheduleInvocations.set(false);
       // Schedule outside of the scheduleLock - which should only be used to wait on the condition.
       schedulePendingTasks();
     }
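
The SchedulerCallable changes above are the renamed half of a standard lock/condition wakeup protocol: producers set a pending flag and signal, the consumer waits until the flag is set, clears it, and then runs the actual scheduling outside the lock. As the original comment notes, a request arriving just after the flag is cleared only causes one extra, harmless pass. A minimal sketch of the protocol in isolation (class and method names are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class WakeupLoop {
      private final Lock lock = new ReentrantLock();
      private final Condition wakeUp = lock.newCondition();
      private final AtomicBoolean pending = new AtomicBoolean(false);

      // Producer side: mark work pending and wake the loop.
      public void request() {
        lock.lock();
        try {
          pending.set(true);
          wakeUp.signal();
        } finally {
          lock.unlock();
        }
      }

      // Consumer side: wait for a request, clear the flag, then work outside the lock.
      public void runLoop(Runnable work) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
          lock.lock();
          try {
            while (!pending.get()) {
              wakeUp.await();
            }
            pending.set(false);
          } finally {
            lock.unlock();
          }
          work.run(); // outside the lock, as in the scheduler
        }
      }
    }
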