hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject hive git commit: HIVE-15562. LLAP TaskExecutorService race can lead to some fragments being permanently lost. (Siddharth Seth, reviewed by Sergey Shelukhin)
Date Tue, 10 Jan 2017 08:03:14 GMT
Repository: hive
Updated Branches:
  refs/heads/branch-2.1 0ad904315 -> dc3d99359


HIVE-15562. LLAP TaskExecutorService race can lead to some fragments being permanently lost.
(Siddharth Seth, reviewed by Sergey Shelukhin)

(cherry picked from commit 1749d70455bd6f8d010dd18566dac5774487c753)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/dc3d9935
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/dc3d9935
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/dc3d9935

Branch: refs/heads/branch-2.1
Commit: dc3d99359064012f9e896c02b0bdff7ff72167c6
Parents: 0ad9043
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Jan 9 23:54:25 2017 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Jan 10 00:03:02 2017 -0800

----------------------------------------------------------------------
 .../impl/LlapZookeeperRegistryImpl.java         |  4 +-
 .../impl/EvictingPriorityBlockingQueue.java     | 16 ++++-
 .../llap/daemon/impl/PriorityBlockingDeque.java |  4 +-
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java |  7 ++-
 .../hive/llap/daemon/impl/QueryTracker.java     |  2 +
 .../llap/daemon/impl/TaskExecutorService.java   | 61 +++++++++++++++-----
 6 files changed, 74 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/dc3d9935/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index cff1b12..170d797 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -399,8 +399,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     public DynamicServiceInstance(ServiceRecord srv) throws IOException {
       this.srv = srv;
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Working with ServiceRecord: {}", srv);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Working with ServiceRecord: {}", srv);
       }
 
       final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);

http://git-wip-us.apache.org/repos/asf/hive/blob/dc3d9935/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index 4ea3b0b..adc86ea 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -19,6 +19,9 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.util.Comparator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Bounded priority queue that evicts the last element based on priority order specified
  * through comparator. Elements that are added to the queue are sorted based on the specified
@@ -27,6 +30,10 @@ import java.util.Comparator;
  * returned back. If the queue is not full, new element will be added to queue and null is returned.
  */
 public class EvictingPriorityBlockingQueue<E> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(EvictingPriorityBlockingQueue.class);
+
   private final PriorityBlockingDeque<E> deque;
   private final Comparator<E> comparator;
 
@@ -42,7 +49,14 @@ public class EvictingPriorityBlockingQueue<E> {
       E last = deque.peekLast();
       if (comparator.compare(e, last) < 0) {
         deque.removeLast();
-        deque.offer(e);
+        if (!deque.offer(e)) {
+          LOG.error(
+              "Failed to insert element into queue with capacity available. size={}, element={}",
+              size(), e);
+          throw new RuntimeException(
+              "Failed to insert element into queue with capacity available. size=" +
+                  size());
+        }
         return last;
       }
       return e;

http://git-wip-us.apache.org/repos/asf/hive/blob/dc3d9935/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
index db2ab16..e27efa5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/PriorityBlockingDeque.java
@@ -107,7 +107,7 @@ public class PriorityBlockingDeque<E>
     }
 
     list.add(insertionPoint, e);
-    //        Collections.sort(list, comparator);
+    // Inserted in sort order. Hence no explicit sort.
     notEmpty.signal();
 
     return true;
@@ -178,6 +178,7 @@ public class PriorityBlockingDeque<E>
   /**
    * @throws NullPointerException {@inheritDoc}
    */
+  @Override
   public boolean offerLast(E e) {
     if (e == null) throw new NullPointerException();
     lock.lock();
@@ -450,6 +451,7 @@ public class PriorityBlockingDeque<E>
   /**
    * @throws NullPointerException if the specified element is null
    */
+  @Override
   public boolean offer(E e) {
     return offerLast(e);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/dc3d9935/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 6914134..28948c4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -189,11 +189,12 @@ public class QueryInfo {
           sourceToEntity.put(source, entityInfo);
         }
 
-        if (lastFinishableState != fragmentInfo.canFinish()) {
+        if (lastFinishableState == fragmentInfo.canFinish()) {
+          // State has not changed.
+          return true;
+        } else {
           entityInfo.setLastFinishableState(fragmentInfo.canFinish());
           return false;
-        } else {
-          return true;
         }
       } finally {
         lock.unlock();

http://git-wip-us.apache.org/repos/asf/hive/blob/dc3d9935/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index a965872..b404d20 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -207,6 +207,8 @@ public class QueryTracker extends AbstractService {
           deleteDelay);
       queryInfoMap.remove(queryIdentifier);
       if (queryInfo == null) {
+        // One case where this happens is when a query is killed via an explicit signal, and then
+        // another message is received from the AMHeartbeater.
         LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
         return Collections.emptyList();
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/dc3d9935/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 7744611..fddcddd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -346,11 +346,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
       // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots)
       canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
       evictedTask = waitQueue.offer(taskWrapper);
+      // Finishable state is checked on the task, via an explicit query to the TaskRunnerCallable
 
       // null evicted task means offer accepted
       // evictedTask is not equal taskWrapper means current task is accepted and it evicted
       // some other task
-      if (evictedTask == null || evictedTask != taskWrapper) {
+      if (evictedTask == null || !evictedTask.equals(taskWrapper)) {
         knownTasks.put(taskWrapper.getRequestId(), taskWrapper);
         taskWrapper.setIsInWaitQueue(true);
         if (isDebugEnabled) {
@@ -379,6 +380,18 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         }
         return result;
       }
+
+      // Register for notifications inside the lock. This avoids races with unregisterForNotifications,
+      // which happens in a different submission thread, i.e. it prevents registration running for
+      // this task after some other submission has already evicted it.
+      boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
+      if (stateChanged) {
+        if (isDebugEnabled) {
+          LOG.debug("Finishable state of {} updated to {} during registration for state updates",
+              taskWrapper.getRequestId(), !canFinish);
+        }
+        finishableStateUpdated(taskWrapper, !canFinish);
+      }
     }
 
     // At this point, the task has been added into the queue. It may have caused an eviction for
@@ -387,27 +400,25 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
     // This registration has to be done after knownTasks has been populated.
     // Register for state change notifications so that the waitQueue can be re-ordered correctly
     // if the fragment moves in or out of the finishable state.
-    boolean stateChanged = taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
-    if (stateChanged) {
-      if (isDebugEnabled) {
-        LOG.debug("Finishable state of {} updated to {} during registration for state updates",
-            taskWrapper.getRequestId(), !canFinish);
-      }
-      finishableStateUpdated(taskWrapper, !canFinish);
-    }
 
     if (isDebugEnabled) {
       LOG.debug("Wait Queue: {}", waitQueue);
     }
+
     if (evictedTask != null) {
-      knownTasks.remove(evictedTask.getRequestId());
-      evictedTask.maybeUnregisterForFinishedStateNotifications();
-      evictedTask.setIsInWaitQueue(false);
-      evictedTask.getTaskRunnerCallable().killTask();
       if (isInfoEnabled) {
         LOG.info("{} evicted from wait queue in favor of {} because of lower priority",
             evictedTask.getRequestId(), task.getRequestId());
       }
+      try {
+        knownTasks.remove(evictedTask.getRequestId());
+        evictedTask.maybeUnregisterForFinishedStateNotifications();
+        evictedTask.setIsInWaitQueue(false);
+      } finally {
+        // This deals with tasks from a different submission, and can cause the kill
+        // to go out before the previous submission has completed. This is handled in the AM.
+        evictedTask.getTaskRunnerCallable().killTask();
+      }
       if (metrics != null) {
         metrics.incrTotalEvictedFromWaitQueue();
       }
@@ -769,6 +780,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
         return taskRunnerCallable.getFragmentInfo()
             .registerForFinishableStateUpdates(this, currentFinishableState);
       } else {
+        // State has not changed / already registered for notifications.
         return true;
       }
     }
@@ -830,6 +842,29 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta
           taskRunnerCallable.getRequestId(), finishableState);
       taskExecutorService.finishableStateUpdated(this, finishableState);
     }
+
+
+    // TaskWrapper is used in structures, as well as for ordering using Comparators
+    // in the waitQueue. Avoid Object comparison.
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      TaskWrapper that = (TaskWrapper) o;
+
+      return taskRunnerCallable.getRequestId()
+          .equals(that.taskRunnerCallable.getRequestId());
+    }
+
+    @Override
+    public int hashCode() {
+      return taskRunnerCallable.getRequestId().hashCode();
+    }
   }
 
   private static class ExecutorThreadFactory implements ThreadFactory {


Mime
View raw message