hive-commits mailing list archives

From: ser...@apache.org
Subject: hive git commit: HIVE-16104 : LLAP: preemption may be too aggressive if the pre-empted task doesn't die immediately (Sergey Shelukhin, reviewed by Siddharth Seth)
Date: Tue, 14 Mar 2017 00:48:25 GMT
Repository: hive
Updated Branches:
  refs/heads/master 95796e172 -> 82d346865


HIVE-16104 : LLAP: preemption may be too aggressive if the pre-empted task doesn't die immediately
(Sergey Shelukhin, reviewed by Siddharth Seth)
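
The core of the change, in TaskExecutorService's wait-queue worker, is a kill grace period: after the
scheduler preempts a victim, it remembers the kill time and, while subsequent scheduling attempts are
still being rejected, briefly waits for that victim to actually die and free its slot instead of
immediately preempting another task. A minimal standalone sketch of the idea, using hypothetical helper
names rather than the actual Hive classes (the constants mirror PREEMPTION_KILL_GRACE_MS and
PREEMPTION_KILL_GRACE_SLEEP_MS from the diff below):

final class PreemptionGraceSketch {
  private static final long KILL_GRACE_MS = 500;
  private static final long GRACE_SLEEP_MS = 50;

  private Long lastKillTimeMs = null; // null means no outstanding preemption kill

  // Called when handing a task to an executor was rejected because no slot is free.
  void onScheduleRejected(long nowMs) throws InterruptedException {
    if (lastKillTimeMs != null && nowMs - lastKillTimeMs < KILL_GRACE_MS) {
      // A victim was already killed recently; give it a moment to release its slot.
      Thread.sleep(GRACE_SLEEP_MS);
    } else if (killOnePreemptionVictim()) {
      lastKillTimeMs = nowMs; // killed something; start the grace window
    }
  }

  // Called when a task was successfully handed to an executor slot.
  void onScheduled() {
    lastKillTimeMs = null; // the slot we may have killed for has been used
  }

  // Hypothetical stand-in: kill one non-finishable running task; true if something was killed.
  private boolean killOnePreemptionVictim() {
    return false;
  }
}

In the actual patch this bookkeeping lives in WaitQueueWorker.run(), driven by an injectable Clock so
that the new tests can control time.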


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82d34686
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82d34686
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82d34686

Branch: refs/heads/master
Commit: 82d346865860c7aacc174b9555d4126960d3f0b1
Parents: 95796e1
Author: Sergey Shelukhin <sershe@apache.org>
Authored: Mon Mar 13 17:41:27 2017 -0700
Committer: Sergey Shelukhin <sershe@apache.org>
Committed: Mon Mar 13 17:48:11 2017 -0700

----------------------------------------------------------------------
 llap-server/pom.xml                             |   6 +
 .../llap/daemon/impl/ContainerRunnerImpl.java   |   2 +-
 .../llap/daemon/impl/TaskExecutorService.java   | 179 ++++++++++---------
 .../llap/daemon/impl/TaskRunnerCallable.java    |   2 +-
 .../daemon/impl/TaskExecutorTestHelpers.java    |  39 +++-
 .../daemon/impl/TestTaskExecutorService.java    |  98 ++++++++--
 6 files changed, 224 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/pom.xml
----------------------------------------------------------------------
diff --git a/llap-server/pom.xml b/llap-server/pom.xml
index fc392fb..630e243 100644
--- a/llap-server/pom.xml
+++ b/llap-server/pom.xml
@@ -253,6 +253,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-llap-common</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
       <version>${hadoop.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2a69d6a..82bbcf3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -136,7 +136,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     String waitQueueSchedulerClassName = HiveConf.getVar(
         conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME);
     this.executorService = new TaskExecutorService(numExecutors, waitQueueSize,
-        waitQueueSchedulerClassName, enablePreemption, classLoader, metrics);
+        waitQueueSchedulerClassName, enablePreemption, classLoader, metrics, null);
     completionListener = (SchedulerFragmentCompletingListener) executorService;
 
     addIfService(executorService);

http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index c1f6c96..9eaa7d7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -88,6 +88,9 @@ public class TaskExecutorService extends AbstractService
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
   private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = "Task-Executor-%d";
   private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d";
+  private static final long PREEMPTION_KILL_GRACE_MS = 500; // 500ms
+  private static final int PREEMPTION_KILL_GRACE_SLEEP_MS = 50; // 50ms
+
 
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
@@ -106,7 +109,7 @@ public class TaskExecutorService extends AbstractService
   private final ThreadPoolExecutor threadPoolExecutor;
   private final AtomicInteger numSlotsAvailable;
   private final int maxParallelExecutors;
-  private final Clock clock = new MonotonicClock();
+  private final Clock clock;
 
   // Tracks running fragments, and completing fragments.
   // Completing since we have a race in the AM being notified and the task actually
@@ -123,7 +126,7 @@ public class TaskExecutorService extends AbstractService
 
   public TaskExecutorService(int numExecutors, int waitQueueSize,
       String waitQueueComparatorClassName, boolean enablePreemption,
-      ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics) {
+      ClassLoader classLoader, final LlapDaemonExecutorMetrics metrics, Clock clock) {
     super(TaskExecutorService.class.getSimpleName());
     LOG.info("TaskExecutorService is being setup with parameters: "
         + "numExecutors=" + numExecutors
@@ -135,6 +138,7 @@ public class TaskExecutorService extends AbstractService
         waitQueueComparatorClassName);
     this.maxParallelExecutors = numExecutors;
     this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize);
+    this.clock = clock == null ? new MonotonicClock() : clock;
     this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
         numExecutors, // max pool size
         1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), // direct hand-off
@@ -252,20 +256,18 @@ public class TaskExecutorService extends AbstractService
    * Worker that takes tasks from wait queue and schedule it for execution.
    */
   private final class WaitQueueWorker implements Runnable {
-    TaskWrapper task;
+    private TaskWrapper task;
 
     @Override
     public void run() {
-
       try {
-
-
+        Long lastKillTimeMs = null;
         while (!isShutdown.get()) {
           RejectedExecutionException rejectedException = null;
           synchronized (lock) {
-            // Since schedule() can be called from multiple threads, we peek the wait queue,
-            // try scheduling the task and then remove the task if scheduling is successful.
-            // This will make sure the task's place in the wait queue is held until it gets scheduled.
+            // Since schedule() can be called from multiple threads, we peek the wait queue, try
+            // scheduling the task and then remove the task if scheduling is successful. This
+            // will make sure the task's place in the wait queue is held until it gets scheduled.
             task = waitQueue.peek();
             if (task == null) {
               if (!isShutdown.get()) {
@@ -273,22 +275,17 @@ public class TaskExecutorService extends AbstractService
               }
               continue;
             }
-            // if the task cannot finish and if no slots are available then don't schedule it.
-            boolean shouldWait = false;
+            // If the task cannot finish and if no slots are available then don't schedule it.
+            // Also don't wait if we have a task and we just killed something to schedule it.
+            boolean shouldWait = numSlotsAvailable.get() == 0 && lastKillTimeMs == null;
             if (task.getTaskRunnerCallable().canFinish()) {
               if (isDebugEnabled) {
-                LOG.debug(
-                    "Attempting to schedule task {}, canFinish={}. Current state: preemptionQueueSize={}, numSlotsAvailable={}, waitQueueSize={}",
+                LOG.debug("Attempting to schedule task {}, canFinish={}. Current state: "
+                    + "preemptionQueueSize={}, numSlotsAvailable={}, waitQueueSize={}",
                     task.getRequestId(), task.getTaskRunnerCallable().canFinish(),
                     preemptionQueue.size(), numSlotsAvailable.get(), waitQueue.size());
               }
-              if (numSlotsAvailable.get() == 0 && (enablePreemption == false || preemptionQueue.isEmpty())) {
-                shouldWait = true;
-              }
-            } else {
-              if (numSlotsAvailable.get() == 0) {
-                shouldWait = true;
-              }
+              shouldWait = shouldWait && (enablePreemption == false || preemptionQueue.isEmpty());
             }
             if (shouldWait) {
               if (!isShutdown.get()) {
@@ -299,36 +296,43 @@ public class TaskExecutorService extends AbstractService
               continue;
             }
             try {
-              trySchedule(task);
-              // wait queue could have been re-ordered in the mean time because of concurrent task
+              tryScheduleUnderLock(task);
+              // Wait queue could have been re-ordered in the mean time because of concurrent task
               // submission. So remove the specific task instead of the head task.
               if (waitQueue.remove(task)) {
                 if (metrics != null) {
                   metrics.setExecutorNumQueuedRequests(waitQueue.size());
                 }
               }
+              lastKillTimeMs = null; // We have filled the spot we may have killed for (if any).
             } catch (RejectedExecutionException e) {
               rejectedException = e;
             }
-          }
+          } // synchronized (lock)
 
           // Handle the rejection outside of the lock
-          if (rejectedException !=null) {
-            handleScheduleAttemptedRejection(task);
-          }
-
-          synchronized (lock) {
-            while (waitQueue.isEmpty()) {
-              if (!isShutdown.get()) {
-                lock.wait();
+          if (rejectedException != null) {
+            if (lastKillTimeMs != null
+                && (clock.getTime() - lastKillTimeMs) < PREEMPTION_KILL_GRACE_MS) {
+              // We killed something, but still got rejected. Wait a bit to give a chance to our
+              // previous victim to actually die.
+              synchronized (lock) {
+                lock.wait(PREEMPTION_KILL_GRACE_SLEEP_MS);
+              }
+            } else {
+              if (isDebugEnabled && lastKillTimeMs != null) {
+                LOG.debug("Grace period ended for the previous kill; preempting more tasks");
+              }
+              if (handleScheduleAttemptedRejection(task)) {
+                lastKillTimeMs = clock.getTime(); // We killed something.
               }
             }
           }
         }
-
       } catch (InterruptedException e) {
         if (isShutdown.get()) {
-          LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " thread has been interrupted after shutdown.");
+          LOG.info(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT
+              + " thread has been interrupted after shutdown.");
         } else {
          LOG.warn(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT + " interrupted without shutdown", e);
           throw new RuntimeException(e);
@@ -459,7 +463,7 @@ public class TaskExecutorService extends AbstractService
       }
     }
     synchronized (lock) {
-      lock.notify();
+      lock.notifyAll();
     }
 
     if (metrics != null) {
@@ -504,7 +508,7 @@ public class TaskExecutorService extends AbstractService
       } else {
         LOG.info("Ignoring killFragment request for {} since it isn't known", fragmentId);
       }
-      lock.notify();
+      lock.notifyAll();
     }
   }
 
@@ -537,72 +541,73 @@ public class TaskExecutorService extends AbstractService
   }
 
   @VisibleForTesting
-  void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException {
-
-      synchronized (lock) {
-        boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
-        LOG.info("Attempting to execute {}", taskWrapper);
-        ListenableFuture<TaskRunner2Result> future = executorService.submit(
-            taskWrapper.getTaskRunnerCallable());
-        runningFragmentCount.incrementAndGet();
-        taskWrapper.setIsInWaitQueue(false);
-        FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener(
-            taskWrapper);
-        // Callback on a separate thread so that when a task completes, the thread in the main queue
-        // is actually available for execution and will not potentially result in a RejectedExecution
-        Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
-
-        if (isDebugEnabled) {
-          LOG.debug("{} scheduled for execution. canFinish={}",
-              taskWrapper.getRequestId(), canFinish);
-        }
+  /** Assumes the epic lock is already taken. */
+  void tryScheduleUnderLock(final TaskWrapper taskWrapper) throws RejectedExecutionException {
+    if (isInfoEnabled) {
+      LOG.info("Attempting to execute {}", taskWrapper);
+    }
+    ListenableFuture<TaskRunner2Result> future = executorService.submit(
+        taskWrapper.getTaskRunnerCallable());
+    runningFragmentCount.incrementAndGet();
+    taskWrapper.setIsInWaitQueue(false);
+    FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener(
+      taskWrapper);
+    // Callback on a separate thread so that when a task completes, the thread in the main queue
+    // is actually available for execution and will not potentially result in a RejectedExecution
+    Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
+
+    boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
+    if (isDebugEnabled) {
+      LOG.debug("{} scheduled for execution. canFinish={}", taskWrapper.getRequestId(), canFinish);
+    }
 
-        // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs
-        // to the tasks are not ready yet, the task is eligible for pre-emptable.
-        if (enablePreemption) {
-          if (!canFinish) {
-            if (isInfoEnabled) {
-              LOG.info("{} is not finishable. Adding it to pre-emption queue",
-                  taskWrapper.getRequestId());
-            }
-            addToPreemptionQueue(taskWrapper);
-          }
+    // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs
+    // to the tasks are not ready yet, the task is eligible for pre-emptable.
+    if (enablePreemption) {
+      if (!canFinish) {
+        if (isInfoEnabled) {
+          LOG.info("{} is not finishable. Adding it to pre-emption queue",
+              taskWrapper.getRequestId());
         }
+        addToPreemptionQueue(taskWrapper);
       }
-      numSlotsAvailable.decrementAndGet();
-      if (metrics != null) {
-        metrics.setNumExecutorsAvailable(numSlotsAvailable.get());
-      }
+    }
+    numSlotsAvailable.decrementAndGet();
+    if (metrics != null) {
+      metrics.setNumExecutorsAvailable(numSlotsAvailable.get());
+    }
   }
 
-  private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) {
+  private boolean handleScheduleAttemptedRejection(TaskWrapper taskWrapper) {
     if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish()
         && !preemptionQueue.isEmpty()) {
-
       if (isDebugEnabled) {
         LOG.debug("Preemption Queue: " + preemptionQueue);
       }
 
-      TaskWrapper pRequest = removeAndGetNextFromPreemptionQueue();
-
-      // Avoid preempting tasks which are finishable - callback still to be processed.
-      if (pRequest != null) {
+      while (true) { // Try to preempt until we have something.
+        TaskWrapper pRequest = removeAndGetNextFromPreemptionQueue();
+        if (pRequest == null) {
+          return false; // Woe us.
+        }
         if (pRequest.getTaskRunnerCallable().canFinish()) {
-          LOG.info(
-              "Removed {} from preemption queue, but not preempting since it's now finishable",
+          LOG.info("Removed {} from preemption queue, but not preempting since it's now finishable",
               pRequest.getRequestId());
-        } else {
-          if (isInfoEnabled) {
-            LOG.info("Invoking kill task for {} due to pre-emption to run {}",
-                pRequest.getRequestId(), taskWrapper.getRequestId());
-          }
-          // The task will either be killed or is already in the process of completing, which will
-          // trigger the next scheduling run, or result in available slots being higher than 0,
-          // which will cause the scheduler loop to continue.
-          pRequest.getTaskRunnerCallable().killTask();
+          continue; // Try something else.
         }
+        if (isInfoEnabled) {
+          LOG.info("Invoking kill task for {} due to pre-emption to run {}",
+              pRequest.getRequestId(), taskWrapper.getRequestId());
+        }
+        // The task will either be killed or is already in the process of completing, which will
+        // trigger the next scheduling run, or result in available slots being higher than 0,
+        // which will cause the scheduler loop to continue.
+        pRequest.getTaskRunnerCallable().killTask();
+        // We've killed something and may want to wait for it to die.
+        return true;
       }
     }
+    return false;
   }
 
  private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
@@ -628,7 +633,7 @@ public class TaskExecutorService extends AbstractService
             taskWrapper.getRequestId(), newFinishableState);
         addToPreemptionQueue(taskWrapper);
       }
-      lock.notify();
+      lock.notifyAll();
     }
   }
 
@@ -765,7 +770,7 @@ public class TaskExecutorService extends AbstractService
       }
       synchronized (lock) {
         if (!waitQueue.isEmpty()) {
-          lock.notify();
+          lock.notifyAll();
         }
       }
     }
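
A recurring mechanical change in this file is lock.notify() becoming lock.notifyAll(). In general,
notifyAll() is the safer signal once a monitor may have more than one waiter (or waiters with
different conditions), since notify() wakes a single arbitrary waiter that may not be the one whose
condition became true. A generic illustration of the standard wait/notifyAll pattern, not Hive code:

final class MonitorExample {
  private final Object lock = new Object();
  private int available = 0;

  void acquire() throws InterruptedException {
    synchronized (lock) {
      while (available == 0) { // always recheck the condition after waking up
        lock.wait();
      }
      available--;
    }
  }

  void release() {
    synchronized (lock) {
      available++;
      lock.notifyAll(); // wake every waiter; each one rechecks its own condition
    }
  }
}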

http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 18f0db9..c077d75 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -329,6 +329,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
         synchronized (this) {
           TezTaskAttemptID ta = taskSpec.getTaskAttemptID();
           LOG.info("Kill task requested for id={}, taskRunnerSetup={}", ta, taskRunner !=
null);
+          shouldRunTask = false;
           if (taskRunner != null) {
             killtimerWatch.start();
             LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID());
@@ -346,7 +347,6 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
              LOG.info("Kill request for task {} did not complete because the task is already complete",
                   ta);
             }
-            shouldRunTask = false;
           } else {
            // If the task hasn't started, and it is killed - report back to the AM that the task has been killed.
             LOG.debug("Reporting taskKilled for non-started fragment {}", getRequestId());

http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 259e383..6287ae8 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -178,6 +178,8 @@ public class TaskExecutorTestHelpers {
     private final Condition sleepCondition = lock.newCondition();
     private boolean shouldSleep = true;
     private final Condition finishedCondition = lock.newCondition();
+    private final Object killDelay = new Object();
+    private boolean isOkToFinish = true;
 
     public MockRequest(SubmitWorkRequestProto requestProto, QueryFragmentInfo fragmentInfo,
                        boolean canFinish, long workTime, TezEvent initialEvent) {
@@ -207,17 +209,19 @@ public class TaskExecutorTestHelpers {
         lock.lock();
         try {
           if (shouldSleep) {
+            logInfo(super.getRequestId() + " is sleeping for " + workTime, null);
             sleepCondition.await(workTime, TimeUnit.MILLISECONDS);
           }
         } catch (InterruptedException e) {
           wasInterrupted.set(true);
-          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+          return handleKill();
         } finally {
           lock.unlock();
         }
         if (wasKilled.get()) {
-          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+          return handleKill();
         } else {
+          logInfo(super.getRequestId() + " succeeded", null);
           return new TaskRunner2Result(EndReason.SUCCESS, null, null, false);
         }
       } finally {
@@ -231,6 +235,33 @@ public class TaskExecutorTestHelpers {
       }
     }
 
+    private TaskRunner2Result handleKill() {
+      boolean hasLogged = false;
+      while (true) {
+        synchronized (killDelay) {
+          if (isOkToFinish) break;
+          if (!hasLogged) {
+            logInfo("Waiting after the kill: " + getRequestId());
+            hasLogged = true;
+          }
+          try {
+            killDelay.wait(100);
+          } catch (InterruptedException e) {
+          }
+        }
+      }
+      logInfo("Finished with the kill: " + getRequestId());
+      return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+    }
+
+    public void unblockKill() {
+      synchronized (killDelay) {
+        logInfo("Unblocking the kill: " + getRequestId());
+        isOkToFinish = true;
+        killDelay.notifyAll();
+      }
+    }
+
     @Override
     public void killTask() {
       lock.lock();
@@ -292,6 +323,10 @@ public class TaskExecutorTestHelpers {
     public boolean canFinish() {
       return canFinish;
     }
+
+    public void setSleepAfterKill() {
+      isOkToFinish = false;
+    }
   }
 
   private static void logInfo(String message, Throwable t) {

http://git-wip-us.apache.org/repos/asf/hive/blob/82d34686/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index bf7d1d8..df563f4 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -20,11 +20,15 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createMockRequest;
 import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto;
 import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.util.Clock;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -303,6 +307,58 @@ public class TestTaskExecutorService {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testDontKillMultiple() throws InterruptedException {
+    MockRequest victim1 = createMockRequest(1, 1, 100, 100, false, 20000l);
+    MockRequest victim2 = createMockRequest(2, 1, 100, 100, false, 20000l);
+    runPreemptionGraceTest(victim1, victim2, 200);
+    assertNotEquals(victim1.wasPreempted(), victim2.wasPreempted()); // One and only one.
+  }
+
+  @Test(timeout = 10000)
+  public void testDoKillMultiple() throws InterruptedException {
+    MockRequest victim1 = createMockRequest(1, 1, 100, 100, false, 20000l);
+    MockRequest victim2 = createMockRequest(2, 1, 100, 100, false, 20000l);
+    runPreemptionGraceTest(victim1, victim2, 1000);
+    assertTrue(victim1.wasPreempted());
+    assertTrue(victim2.wasPreempted());
+  }
+
+  private void runPreemptionGraceTest(
+      MockRequest victim1, MockRequest victim2, int time) throws InterruptedException {
+    MockRequest preemptor = createMockRequest(3, 1, 100, 100, true, 20000l);
+    victim1.setSleepAfterKill();
+    victim2.setSleepAfterKill();
+
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    clock.setTime(0);
+    TaskExecutorServiceForTest taskExecutorService = new TaskExecutorServiceForTest(
+        2, 3, ShortestJobFirstComparator.class.getName(), true, clock);
+    taskExecutorService.init(new Configuration());
+    taskExecutorService.start();
+
+    try {
+      taskExecutorService.schedule(victim1);
+      awaitStartAndSchedulerRun(victim1, taskExecutorService);
+      taskExecutorService.schedule(victim2);
+      awaitStartAndSchedulerRun(victim2, taskExecutorService);
+      taskExecutorService.schedule(preemptor);
+      taskExecutorService.waitForScheduleRuns(5); // Wait for scheduling to run a few times.
+      clock.setTime(time);
+      taskExecutorService.waitForScheduleRuns(5); // Wait for scheduling to run a few times.
+      victim1.unblockKill();
+      victim2.unblockKill();
+      preemptor.complete();
+      preemptor.awaitEnd();
+      TaskExecutorServiceForTest.InternalCompletionListenerForTest icl3 =
+          taskExecutorService.getInternalCompletionListenerForTest(preemptor.getRequestId());
+      icl3.awaitCompletion();
+    } finally {
+      taskExecutorService.shutDown(false);
+    }
+  }
+
+
 
   private void awaitStartAndSchedulerRun(MockRequest mockRequest,
                                         TaskExecutorServiceForTest taskExecutorServiceForTest) throws
@@ -319,23 +375,43 @@ public class TestTaskExecutorService {
     private final Lock tryScheduleLock = new ReentrantLock();
     private final Condition tryScheduleCondition = tryScheduleLock.newCondition();
     private boolean isInTrySchedule = false;
+    private int scheduleAttempts = 0;
+
+    public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize,
+        String waitQueueComparatorClassName, boolean enablePreemption) {
+      this(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, null);
+    }
 
-    public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName,
-                                      boolean enablePreemption) {
+    public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize,
+        String waitQueueComparatorClassName, boolean enablePreemption, Clock clock) {
       super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption,
-          Thread.currentThread().getContextClassLoader(), null);
+          Thread.currentThread().getContextClassLoader(), null, clock);
     }
 
-    private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners = new ConcurrentHashMap<>();
+    private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners =
+        new ConcurrentHashMap<>();
 
     @Override
-    void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException {
+    void tryScheduleUnderLock(final TaskWrapper taskWrapper) throws RejectedExecutionException {
       tryScheduleLock.lock();
       try {
         isInTrySchedule = true;
-        super.trySchedule(taskWrapper);
+        super.tryScheduleUnderLock(taskWrapper);
+      } finally {
         isInTrySchedule = false;
+        ++scheduleAttempts;
         tryScheduleCondition.signal();
+        tryScheduleLock.unlock();
+      }
+    }
+
+    public void waitForScheduleRuns(int n) throws InterruptedException {
+      tryScheduleLock.lock();
+      try {
+        int targetRuns = scheduleAttempts + n;
+        while (scheduleAttempts < targetRuns) {
+          tryScheduleCondition.await(100, TimeUnit.MILLISECONDS);
+        }
       } finally {
         tryScheduleLock.unlock();
       }
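
The new tests above exercise the grace period without real sleeps: MockRequest victims block in
handleKill() until unblockKill() is called (simulating a pre-empted task that does not die
immediately), and a ControlledClock injected into the TaskExecutorService is advanced to 200ms
(inside the 500ms grace window, so only one victim should be killed) or 1000ms (past the window,
so a second victim may be killed). A hedged sketch of that controlled-clock testing pattern, using
a hypothetical ManualClock and GraceTracker rather than the actual test helpers:

import java.util.concurrent.atomic.AtomicLong;

final class ManualClock {
  private final AtomicLong now = new AtomicLong(0);
  long getTime() { return now.get(); }
  void setTime(long t) { now.set(t); }
}

final class GraceTracker {
  private static final long KILL_GRACE_MS = 500;
  private final ManualClock clock;
  private Long lastKillTimeMs = null;

  GraceTracker(ManualClock clock) { this.clock = clock; }

  // Returns true if a new victim may be killed now, and starts a new grace window if so.
  boolean mayKillAnotherVictim() {
    long now = clock.getTime();
    if (lastKillTimeMs != null && now - lastKillTimeMs < KILL_GRACE_MS) {
      return false; // still inside the grace window of the previous kill
    }
    lastKillTimeMs = now;
    return true;
  }
}

final class GraceTrackerDemo {
  public static void main(String[] args) {
    ManualClock clock = new ManualClock();
    GraceTracker tracker = new GraceTracker(clock);
    System.out.println(tracker.mayKillAnotherVictim()); // true: first kill
    clock.setTime(200);                                 // mirrors testDontKillMultiple
    System.out.println(tracker.mayKillAnotherVictim()); // false: inside the grace window
    clock.setTime(1000);                                // mirrors testDoKillMultiple
    System.out.println(tracker.mayKillAnotherVictim()); // true: grace window has elapsed
  }
}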

