hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jd...@apache.org
Subject [14/20] hive git commit: HIVE-12959: LLAP: Add task scheduler timeout when no nodes are alive (Prasanth Jayachandran reviewed by Siddharth Seth)
Date Thu, 28 Apr 2016 22:15:32 GMT
HIVE-12959: LLAP: Add task scheduler timeout when no nodes are alive (Prasanth Jayachandran reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c3dd00b2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c3dd00b2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c3dd00b2

Branch: refs/heads/llap
Commit: c3dd00b27afbdcc2ef7b3c16ee1d446da16ff0a5
Parents: ce457a4
Author: Prasanth Jayachandran <prasanthj@apache.org>
Authored: Wed Apr 27 17:55:04 2016 -0500
Committer: Prasanth Jayachandran <prasanthj@apache.org>
Committed: Wed Apr 27 17:55:04 2016 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +
 .../hive/llap/registry/ServiceInstanceSet.java  |   6 +
 .../registry/impl/LlapFixedRegistryImpl.java    |   5 +
 .../impl/LlapZookeeperRegistryImpl.java         |   5 +
 .../tezplugins/LlapTaskSchedulerService.java    | 112 ++++++++++++++++---
 5 files changed, 117 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5360ed4..49d748c 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2700,6 +2700,11 @@ public class HiveConf extends Configuration {
       "Sleep duration while waiting to retry connection failures to the AM from the daemon for\n" +
       "the general keep-alive thread (milliseconds).",
       "llap.am.liveness.connection.sleep-between-retries-millis"),
+    LLAP_DAEMON_TASK_SCHEDULER_TIMEOUT_SECONDS(
+        "hive.llap.task.scheduler.timeout.seconds", "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Amount of time to wait before failing the query when there are no llap daemons running\n" +
+            "(alive) in the cluster.", "llap.daemon.scheduler.timeout.seconds"),
     LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
       "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
       "executed in parallel.", "llap.daemon.num.executors"),

http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 73f94f3..99ead9b 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -55,4 +55,10 @@ public interface ServiceInstanceSet {
    */
   public Set<ServiceInstance> getByHost(String host);
 
+  /**
+   * Get the number of instances currently available.
+   *
+   * @return - number of instances
+   */
+  public int size();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index bd814b9..67443a7 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -246,6 +246,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
       }
       return byHost;
     }
+
+    @Override
+    public int size() {
+      return instances.size();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 6af30d4..d51249a 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -493,6 +493,11 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       }
       return byHost;
     }
+
+    @Override
+    public int size() {
+      return instancesCache.getCurrentData().size();
+    }
   }
 
   private class InstanceStateChangeListener implements PathChildrenCacheListener {

http://git-wip-us.apache.org/repos/asf/hive/blob/c3dd00b2/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index b57ae1a..5ecbf79 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -38,6 +38,8 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -74,6 +76,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
 import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -129,7 +132,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private final Lock scheduleLock = new ReentrantLock();
   private final Condition scheduleCondition = scheduleLock.newCondition();
-  private final AtomicBoolean pendingScheduleInvodations = new AtomicBoolean(false);
+  private final AtomicBoolean pendingScheduleInvocations = new AtomicBoolean(false);
   private final ListeningExecutorService schedulerExecutor;
   private final SchedulerCallable schedulerCallable = new SchedulerCallable();
 
@@ -151,6 +154,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   // Per Executor Thread
   private final Resource resourcePerExecutor;
 
+  // when there are no live nodes in the cluster and this timeout elapses the query is failed
+  private final long timeout;
+  private final Lock timeoutLock = new ReentrantLock();
+  private final ScheduledExecutorService timeoutExecutor;
+  private final SchedulerTimeoutMonitor timeoutMonitor;
+  private ScheduledFuture<?> timeoutFuture;
+
   private final LlapRegistryService registry = new LlapRegistryService(false);
 
   private volatile ListenableFuture<Void> nodeEnablerFuture;
@@ -200,6 +210,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       this.forceLocation = false;
     }
 
+    this.timeoutMonitor = new SchedulerTimeoutMonitor();
+    this.timeout = HiveConf.getTimeVar(conf,
+        ConfVars.LLAP_DAEMON_TASK_SCHEDULER_TIMEOUT_SECONDS, TimeUnit.MILLISECONDS);
+    this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapTaskSchedulerTimeoutMonitor")
+            .build());
+    this.timeoutFuture = null;
+
     int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
     this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
@@ -210,11 +228,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         + " must be defined");
 
     ExecutorService executorServiceRaw =
-        Executors.newFixedThreadPool(1,
+        Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
     nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
 
-    ExecutorService schedulerExecutorServiceRaw = Executors.newFixedThreadPool(1,
+    ExecutorService schedulerExecutorServiceRaw = Executors.newSingleThreadExecutor(
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
     schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
 
@@ -291,6 +309,45 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       // FIXME: disabling this for now
       // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
       LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
+      // if there are no more nodes. Signal timeout monitor to start timer
+      if (activeInstances.size() == 0) {
+        LOG.info("No node found. Signalling scheduler timeout monitor thread to start timer.");
+        startTimeoutMonitor();
+      }
+    }
+  }
+
+  private void startTimeoutMonitor() {
+    timeoutLock.lock();
+    try {
+      // If timer is null, start a new one.
+      // If timer has completed during previous invocation, start a new one.
+      // If timer already started and is not completed, leaving it running without resetting it.
+      if ((timeoutFuture == null || timeoutFuture.isDone())
+          && activeInstances.size() == 0) {
+        timeoutFuture = timeoutExecutor.schedule(timeoutMonitor, timeout, TimeUnit.MILLISECONDS);
+        LOG.info("Scheduled timeout monitor task to run after {} ms", timeout);
+      } else {
+        LOG.info("Timeout monitor task not started. Timeout future state: {}, #instances: {}",
+            timeoutFuture == null ? "null" : timeoutFuture.isDone(), activeInstances.size());
+      }
+    } finally {
+      timeoutLock.unlock();
+    }
+  }
+
+  private void stopTimeoutMonitor() {
+    timeoutLock.lock();
+    try {
+      if (timeoutFuture != null && activeInstances.size() != 0 && timeoutFuture.cancel(false)) {
+        LOG.info("Stopped timeout monitor task");
+      } else {
+        LOG.info("Timeout monitor task not stopped. Timeout future state: {}, #instances: {}",
+            timeoutFuture == null ? "null" : timeoutFuture.isDone(), activeInstances.size());
+      }
+      timeoutFuture = (timeoutFuture == null || timeoutFuture.isDone()) ? null : timeoutFuture;
+    } finally {
+      timeoutLock.unlock();
     }
   }
 
@@ -305,6 +362,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         }
         nodeEnabledExecutor.shutdownNow();
 
+        timeoutExecutor.shutdown();
+        if (timeoutFuture != null) {
+          timeoutFuture.cancel(true);
+          timeoutFuture = null;
+        }
+        timeoutExecutor.shutdownNow();
+
         schedulerCallable.shutdown();
         if (schedulerFuture != null) {
           schedulerFuture.cancel(true);
@@ -567,13 +631,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     String[] requestedHosts = request.requestedHosts;
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
-      // Check if any hosts are active.
-      if (getAvailableResources().getMemory() <= 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Refreshing instances since total memory is 0");
-        }
-      }
-
       // If there's no memory available, fail
       if (getTotalResources().getMemory() <= 0) {
         return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
@@ -657,6 +714,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private void addNode(ServiceInstance inst, NodeInfo node) {
     LOG.info("Adding node: " + inst);
+    // we have just added a new node. Signal timeout monitor to reset timer
+    if (activeInstances.size() == 1) {
+      LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer.");
+      stopTimeoutMonitor();
+    }
     instanceToNodeMap.put(inst.getWorkerIdentity(), node);
     // Trigger scheduling since a new node became available.
     trySchedulingPendingTasks();
@@ -794,7 +856,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         Iterator<TaskInfo> taskIter = taskListAtPriority.iterator();
         boolean scheduledAllAtPriority = true;
         while (taskIter.hasNext()) {
-
           // TODO Optimization: Add a check to see if there's any capacity available. No point in
           // walking through all active nodes, if they don't have potential capacity.
 
@@ -807,8 +868,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           if (scheduleResult == ScheduleResult.SCHEDULED) {
             taskIter.remove();
           } else {
-            // TODO Handle INADEQUATE_TOTAL_RESOURCES eventually - either by throwin an error immediately,
-            // or waiting for some timeout for new executors and then throwing an error
+            if (scheduleResult == ScheduleResult.INADEQUATE_TOTAL_RESOURCES) {
+              LOG.info("Inadequate total resources before scheduling pending tasks." +
+                  " Signalling scheduler timeout monitor thread to start timer.");
+              startTimeoutMonitor();
+            }
 
             // Try pre-empting a task so that a higher priority task can take it's place.
             // Preempt only if there's no pending preemptions to avoid preempting twice for a task.
@@ -1052,13 +1116,29 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private void trySchedulingPendingTasks() {
     scheduleLock.lock();
     try {
-      pendingScheduleInvodations.set(true);
+      pendingScheduleInvocations.set(true);
       scheduleCondition.signal();
     } finally {
       scheduleLock.unlock();
     }
   }
 
+  private class SchedulerTimeoutMonitor implements Runnable {
+    private final Logger LOG = LoggerFactory.getLogger(SchedulerTimeoutMonitor.class);
+
+    @Override
+    public void run() {
+      LOG.info("Reporting SERVICE_UNAVAILABLE error as no instances are running");
+      try {
+        getContext().reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE,
+            "No LLAP Daemons are running", getContext().getCurrentDagInfo());
+      } catch (Exception e) {
+        LOG.error("Exception when reporting SERVICE_UNAVAILABLE error for dag: {}",
+            getContext().getCurrentDagInfo().getName(), e);
+      }
+    }
+  }
+
   private class SchedulerCallable implements Callable<Void> {
     private AtomicBoolean isShutdown = new AtomicBoolean(false);
 
@@ -1067,7 +1147,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         scheduleLock.lock();
         try {
-          while (!pendingScheduleInvodations.get()) {
+          while (!pendingScheduleInvocations.get()) {
             scheduleCondition.await();
           }
         } catch (InterruptedException e) {
@@ -1086,7 +1166,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         // will be handled in the next run.
         // A new request may come in right after this is set to false, but before the actual scheduling.
         // This will be handled in this run, but will cause an immediate run after, which is harmless.
-        pendingScheduleInvodations.set(false);
+        pendingScheduleInvocations.set(false);
         // Schedule outside of the scheduleLock - which should only be used to wait on the condition.
         schedulePendingTasks();
       }


Mime
View raw message