hive-commits mailing list archives

From gunt...@apache.org
Subject hive git commit: HIVE-12623: Add an option to force allocation of fragments on requested nodes (Sid Seth via Gunther Hagleitner)
Date Mon, 21 Dec 2015 23:19:49 GMT
Repository: hive
Updated Branches:
  refs/heads/master 34c83fb36 -> c65651930


HIVE-12623: Add an option to force allocation of fragments on requested nodes (Sid Seth via Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c6565193
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c6565193
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c6565193

Branch: refs/heads/master
Commit: c65651930bd2fdf0bde01fe7a50708c49f8fddac
Parents: 34c83fb
Author: Gunther Hagleitner <gunther@apache.org>
Authored: Mon Dec 21 15:15:31 2015 -0800
Committer: Gunther Hagleitner <gunther@apache.org>
Committed: Mon Dec 21 15:15:31 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +
 .../dag/app/rm/LlapTaskSchedulerService.java    | 233 +++++++++++++++---
 .../app/rm/TestLlapTaskSchedulerService.java    | 240 ++++++++++++++++++-
 3 files changed, 436 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c6565193/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0251a6f..d2dd9c6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2488,6 +2488,13 @@ public class HiveConf extends Configuration {
       "The number of tasks the AM TaskScheduler will try allocating per node. 0 indicates that\n" +
       "this should be picked up from the Registry. -1 indicates unlimited capacity; positive\n" +
       "values indicate a specific bound.", "llap.task.scheduler.num.schedulable.tasks.per.node"),
+    LLAP_TASK_SCHEDULER_LOCALITY_DELAY(
+        "hive.llap.task.scheduler.locality.delay", "0ms",
+        new TimeValidator(TimeUnit.MILLISECONDS, -1l, true, Long.MAX_VALUE, true),
+        "Amount of time to wait before allocating a request which contains location information," +
+            " to a location other than the ones requested. Set to -1 for an infinite delay, 0" +
+            " for no delay. Currently these are the only two supported values."
+    ),
     LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE("hive.llap.daemon.task.scheduler.wait.queue.size",
       10, "LLAP scheduler maximum queue size.", "llap.daemon.task.scheduler.wait.queue.size"),
     LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME(

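For context, a brief usage sketch (not part of the commit itself): the new option can be set on a HiveConf and read back with getTimeVar, mirroring what the scheduler constructor and the new tests below do. The wrapper class and the printed output are illustrative assumptions; only the ConfVars entry and the -1/0 semantics come from the hunk above.

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class LocalityDelayExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // -1 requests an infinite delay: fragments are only allocated on the requested nodes.
        // The default ("0ms") lets the scheduler fall back to any available node immediately.
        conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, -1L);
        long delayMs = HiveConf.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY,
            TimeUnit.MILLISECONDS);
        System.out.println("Configured locality delay (ms): " + delayMs); // expected: -1
      }
    }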
http://git-wip-us.apache.org/repos/asf/hive/blob/c6565193/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
index 9821117..6beb4f8 100644
--- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
+++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java
@@ -47,12 +47,13 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
@@ -111,6 +112,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   @VisibleForTesting
   final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
 
+  private final boolean forceLocation;
+
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
   private final Clock clock;
@@ -130,7 +133,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private final SchedulerCallable schedulerCallable = new SchedulerCallable();
 
   private final AtomicBoolean isStopped = new AtomicBoolean(false);
+  // Tracks total pending preemptions.
   private final AtomicInteger pendingPreemptions = new AtomicInteger(0);
+  // Tracks pending preemptions per host, using the hostname || Always to be accessed inside a lock
+  private final Map<String, MutableInt> pendingPreemptionsPerHost = new HashMap<>();
 
   private final NodeBlacklistConf nodeBlacklistConf;
 
@@ -185,6 +191,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     this.numSchedulableTasksPerNode =
         HiveConf.getIntVar(conf, ConfVars.LLAP_TASK_SCHEDULER_NUM_SCHEDULABLE_TASKS_PER_NODE);
 
+    long localityDelayMs = HiveConf
+        .getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, TimeUnit.MILLISECONDS);
+    if (localityDelayMs == -1) {
+      this.forceLocation = true;
+    } else {
+      this.forceLocation = false;
+    }
+
     int memoryPerExecutor = (int) (memoryPerInstance / (float) executorsPerInstance);
     int coresPerExecutor = (int) (coresPerInstance / (float) executorsPerInstance);
     this.resourcePerExecutor = Resource.newInstance(memoryPerExecutor, coresPerExecutor);
@@ -206,7 +220,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     LOG.info("Running with configuration: " + "memoryPerInstance=" + memoryPerInstance
         + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
         + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
-        + ", nodeBlacklistConf=" + nodeBlacklistConf);
+        + ", nodeBlacklistConf=" + nodeBlacklistConf
+        + ", forceLocation=" + forceLocation);
   }
 
   @Override
@@ -431,7 +446,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       // Re-enable the node if preempted
       if (taskInfo.preempted) {
        LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
-        pendingPreemptions.decrementAndGet();
+        unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
         nodeInfo.registerUnsuccessfulTaskEnd(true);
         if (nodeInfo.isDisabled()) {
           // Re-enable the node. If a task succeeded, a slot may have become available.
@@ -514,7 +529,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
    * @param request the list of preferred hosts. null implies any host
    * @return
    */
-  private NodeServiceInstancePair selectHost(TaskInfo request) {
+  private SelectHostResult selectHost(TaskInfo request) {
     String[] requestedHosts = request.requestedHosts;
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
@@ -528,26 +543,45 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
       // If there's no memory available, fail
       if (getTotalResources().getMemory() <= 0) {
-        return null;
+        return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
       }
 
-      if (requestedHosts != null) {
+      if (requestedHosts != null && requestedHosts.length > 0) {
         int prefHostCount = -1;
+        boolean requestedHostExists = false;
         for (String host : requestedHosts) {
           prefHostCount++;
           // Pick the first host always. Weak attempt at cache affinity.
           Set<ServiceInstance> instances = activeInstances.getByHost(host);
           if (!instances.isEmpty()) {
+            requestedHostExists = true;
             for (ServiceInstance inst : instances) {
               NodeInfo nodeInfo = instanceToNodeMap.get(inst);
               if (nodeInfo != null && nodeInfo.canAcceptTask()) {
                 LOG.info("Assigning " + inst + " when looking for " + host + "." +
-                    " FirstRequestedHost=" + (prefHostCount == 0));
-                return new NodeServiceInstancePair(inst, nodeInfo);
+                    " FirstRequestedHost=" + (prefHostCount == 0) +
+                    (requestedHosts.length > 1 ? "#prefLocations=" + requestedHosts.length : ""));
+                return new SelectHostResult(inst, nodeInfo);
               }
             }
           }
         }
+        // Check if forcing the location is required.
+        if (forceLocation) {
+          if (requestedHostExists) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Skipping non-local location allocation for [" + request.task +
+                  "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]");
+            }
+            return SELECT_HOST_RESULT_DELAYED_LOCALITY;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Not skipping non-local location allocation for [" + request.task +
+                  "] when trying to allocate on [" + Arrays.toString(requestedHosts) +
+                  "] since none of these hosts are part of the known list");
+            }
+          }
+        }
       }
       /* fall through - miss in locality (random scheduling) */
       Entry<ServiceInstance, NodeInfo>[] all =
@@ -559,12 +593,15 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         for (int i = 0; i < all.length; i++) {
           Entry<ServiceInstance, NodeInfo> inst = all[(i + n) % all.length];
           if (inst.getValue().canAcceptTask()) {
-            LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length);
-            return new NodeServiceInstancePair(inst.getKey(), inst.getValue());
+            LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length +
+                ", requestedHosts=" +
+                ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
+                    Arrays.toString(requestedHosts)));
+            return new SelectHostResult(inst.getKey(), inst.getValue());
           }
         }
       }
-      return null;
+      return SELECT_HOST_RESULT_DELAYED_RESOURCES;
     } finally {
       readLock.unlock();
     }
@@ -716,6 +753,20 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  private enum ScheduleResult {
+    // Successfully scheduled
+    SCHEDULED,
+
+    // Delayed to find a local match
+    DELAYED_LOCALITY,
+
+    // Delayed due to temporary resource availability
+    DELAYED_RESOURCES,
+
+    // Inadequate total resources - will never succeed / wait for new executors to become available
+    INADEQUATE_TOTAL_RESOURCES,
+  }
+
   @VisibleForTesting
   protected void schedulePendingTasks() {
     writeLock.lock();
@@ -737,22 +788,65 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             dagStats.registerDelayedAllocation();
           }
           taskInfo.triedAssigningTask();
-          boolean scheduled = scheduleTask(taskInfo);
-          if (scheduled) {
+          ScheduleResult scheduleResult = scheduleTask(taskInfo);
+          if (scheduleResult == ScheduleResult.SCHEDULED) {
             taskIter.remove();
           } else {
+            // TODO Handle INADEQUATE_TOTAL_RESOURCES eventually - either by throwing an error immediately,
+            // or waiting for some timeout for new executors and then throwing an error
+
             // Try pre-empting a task so that a higher priority task can take it's place.
-            // Preempt only if there's not pending preemptions to avoid preempting twice for a task.
-            LOG.info("Attempting to preempt for {}, pendingPreemptions={}", taskInfo.task, pendingPreemptions.get());
-            if (pendingPreemptions.get() == 0) {
-              preemptTasks(entry.getKey().getPriority(), 1);
+            // Preempt only if there's no pending preemptions to avoid preempting twice for a task.
+            String[] potentialHosts;
+            if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) {
+              // preempt only on specific hosts, if no preemptions already exist on those.
+              potentialHosts = taskInfo.requestedHosts;
+              //Protect against a bad location being requested.
+              if (potentialHosts == null || potentialHosts.length == 0) {
+                potentialHosts = null;
+              }
+            } else {
+              // preempt on any host.
+              potentialHosts = null;
             }
 
+            if (potentialHosts != null) {
+              // Preempt on specific host
+              boolean shouldPreempt = true;
+              for (String host : potentialHosts) {
+                // Preempt only if there are not pending preemptions on the same host
+                // When the preemption registers, the request at the highest priority will be given the slot,
+                // even if the initial request was for some other task.
+                // TODO Maybe register which task the preemption was for, to avoid a bad non-local allocation.
+                MutableInt pendingHostPreemptions = pendingPreemptionsPerHost.get(host);
+                if (pendingHostPreemptions != null && pendingHostPreemptions.intValue() > 0) {
+                  shouldPreempt = false;
+                  break;
+                }
+              }
+              if (shouldPreempt) {
+                LOG.info("Attempting to preempt for {}, pendingPreemptions={} on hosts={}",
+                    taskInfo.task, pendingPreemptions.get(), Arrays.toString(potentialHosts));
+                preemptTasks(entry.getKey().getPriority(), 1, potentialHosts);
+              }
+            } else {
+              // Request for a preemption if there's none pending. If a single preemption is pending,
+              // and this is the next task to be assigned, it will be assigned once that slot becomes available.
+              if (pendingPreemptions.get() == 0) {
+                LOG.info("Attempting to preempt for {}, pendingPreemptions={} on any host",
+                    taskInfo.task, pendingPreemptions.get());
+                preemptTasks(entry.getKey().getPriority(), 1, null);
+              }
+            }
+            // Since there was an allocation failure - don't try assigning tasks at the next priority.
+
             scheduledAllAtPriority = false;
-            // Don't try assigning tasks at the next priority.
-            break;
-          }
-        }
+            // Don't break if this allocation failure was a result of a LOCALITY_DELAY. Others could still be allocated.
+            if (scheduleResult != ScheduleResult.DELAYED_LOCALITY) {
+              break;
+            }
+          } // end of else - i.e. could not allocate
+        } // end of loop over pending tasks
         if (taskListAtPriority.isEmpty()) {
           // Remove the entry, if there's nothing left at the specific priority level
           pendingIterator.remove();
@@ -768,11 +862,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   }
 
 
-  private boolean scheduleTask(TaskInfo taskInfo) {
-    NodeServiceInstancePair nsPair = selectHost(taskInfo);
-    if (nsPair == null) {
-      return false;
-    } else {
+  private ScheduleResult scheduleTask(TaskInfo taskInfo) {
+    SelectHostResult selectHostResult = selectHost(taskInfo);
+    if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
+      NodeServiceInstancePair nsPair = selectHostResult.nodeServiceInstancePair;
       Container container =
           containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
               nsPair.getServiceInstance().getHost(),
@@ -788,16 +881,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       } finally {
         writeLock.unlock();
       }
-
       getContext().taskAllocated(taskInfo.task, taskInfo.clientCookie, container);
-      return true;
     }
+    return selectHostResult.scheduleResult;
   }
 
   // Removes tasks from the runningList and sends out a preempt request to the system.
   // Subsequent tasks will be scheduled again once the de-allocate request for the preempted
   // task is processed.
-  private void preemptTasks(int forPriority, int numTasksToPreempt) {
+  private void preemptTasks(int forPriority, int numTasksToPreempt, String []potentialHosts) {
+    Set<String> preemptHosts;
+    if (potentialHosts == null) {
+      preemptHosts = null;
+    } else {
+      preemptHosts = Sets.newHashSet(potentialHosts);
+    }
     writeLock.lock();
     List<TaskInfo> preemptedTaskList = null;
     try {
@@ -810,17 +908,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator();
          while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) {
             TaskInfo taskInfo = taskInfoIterator.next();
-            preemptedCount++;
-            LOG.info("preempting {} for task at priority {}", taskInfo, forPriority);
-            taskInfo.setPreemptedInfo(clock.getTime());
-            if (preemptedTaskList == null) {
-              preemptedTaskList = new LinkedList<>();
+            if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedInstance.getHost())) {
+              // Candidate for preemption.
+              preemptedCount++;
+              LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo,
+                  forPriority, potentialHosts == null ? "" : Arrays.toString(potentialHosts));
+              taskInfo.setPreemptedInfo(clock.getTime());
+              if (preemptedTaskList == null) {
+                preemptedTaskList = new LinkedList<>();
+              }
+              dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost());
+              preemptedTaskList.add(taskInfo);
+              registerPendingPreemption(taskInfo.assignedInstance.getHost());
+              // Remove from the runningTaskList
+              taskInfoIterator.remove();
             }
-            dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost());
-            preemptedTaskList.add(taskInfo);
-            pendingPreemptions.incrementAndGet();
-            // Remove from the runningTaskList
-            taskInfoIterator.remove();
           }
 
           // Remove entire priority level if it's been emptied.
@@ -841,12 +943,43 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       for (TaskInfo taskInfo : preemptedTaskList) {
         LOG.info("DBG: Preempting task {}", taskInfo);
         getContext().preemptContainer(taskInfo.containerId);
+        // Preemption will finally be registered as a deallocateTask as a result of preemptContainer
+        // That resets preemption info and allows additional tasks to be pre-empted if required.
       }
     }
     // The schedule loop will be triggered again when the deallocateTask request comes in for the
     // preempted task.
   }
 
+  private void registerPendingPreemption(String host) {
+    writeLock.lock();
+    try {
+      pendingPreemptions.incrementAndGet();
+      MutableInt val = pendingPreemptionsPerHost.get(host);
+      if (val == null) {
+        val = new MutableInt(1);
+        pendingPreemptionsPerHost.put(host, val);
+      }
+      val.increment();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void unregisterPendingPreemption(String host) {
+    writeLock.lock();
+    try {
+      pendingPreemptions.decrementAndGet();
+      MutableInt val = pendingPreemptionsPerHost.get(host);
+      Preconditions.checkNotNull(val);
+      val.decrement();
+      // Not bothering with removing the entry. There's a limited number of hosts, and a good
+      // chance that the entry will make it back in when the AM is used for a long duration.
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   private class NodeEnablerCallable implements Callable<Void> {
 
     private AtomicBoolean isShutdown = new AtomicBoolean(false);
@@ -1316,6 +1449,28 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  private static class SelectHostResult {
+    final NodeServiceInstancePair nodeServiceInstancePair;
+    final ScheduleResult scheduleResult;
+
+    SelectHostResult(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
+      this.nodeServiceInstancePair = new NodeServiceInstancePair(serviceInstance, nodeInfo);
+      this.scheduleResult = ScheduleResult.SCHEDULED;
+    }
+
+    SelectHostResult(ScheduleResult scheduleResult) {
+      this.nodeServiceInstancePair = null;
+      this.scheduleResult = scheduleResult;
+    }
+  }
+
+  private static final SelectHostResult SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY =
+      new SelectHostResult(ScheduleResult.INADEQUATE_TOTAL_RESOURCES);
+  private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_LOCALITY =
+      new SelectHostResult(ScheduleResult.DELAYED_LOCALITY);
+  private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_RESOURCES =
+      new SelectHostResult(ScheduleResult.DELAYED_RESOURCES);
+
   private static class NodeServiceInstancePair {
     final NodeInfo nodeInfo;
     final ServiceInstance serviceInstance;

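Side note on the new pendingPreemptionsPerHost bookkeeping above: it is a plain counter map guarded by the scheduler's write lock. Below is a self-contained sketch of that pattern using the same commons-lang MutableInt type; the class and method names are illustrative, and the bodies are a simplified restatement rather than a copy of the scheduler code.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.commons.lang.mutable.MutableInt;

    public class PerHostPreemptionCounters {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final Map<String, MutableInt> pendingPerHost = new HashMap<>();

      // Record one pending preemption against a host.
      void register(String host) {
        lock.writeLock().lock();
        try {
          MutableInt val = pendingPerHost.get(host);
          if (val == null) {
            val = new MutableInt(0);
            pendingPerHost.put(host, val);
          }
          val.increment();
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Clear one pending preemption once the deallocation for the preempted task arrives.
      void unregister(String host) {
        lock.writeLock().lock();
        try {
          MutableInt val = pendingPerHost.get(host);
          if (val != null) {
            val.decrement();
          }
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Hosts with a pending preemption are skipped when looking for new preemption candidates.
      boolean hasPendingPreemption(String host) {
        lock.readLock().lock();
        try {
          MutableInt val = pendingPerHost.get(host);
          return val != null && val.intValue() > 0;
        } finally {
          lock.readLock().unlock();
        }
      }

      public static void main(String[] args) {
        PerHostPreemptionCounters counters = new PerHostPreemptionCounters();
        counters.register("host1");
        System.out.println(counters.hasPendingPreemption("host1")); // true
        counters.unregister("host1");
        System.out.println(counters.hasPendingPreemption("host1")); // false
      }
    }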
http://git-wip-us.apache.org/repos/asf/hive/blob/c6565193/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
index 4eccc06..4c1cbb3 100644
--- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
+++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java
@@ -20,6 +20,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -108,8 +109,6 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-  // TODO Add a test to ensure the correct task is being preempted, and the completion for the specific
-  // task triggers the next task to be scheduled.
 
   @Test(timeout=5000)
   public void testPreemption() throws InterruptedException, IOException {
@@ -123,11 +122,11 @@ public class TestLlapTaskSchedulerService {
       Object task1 = "task1";
       Object clientCookie1 = "cookie1";
       Object task2 = "task2";
-      Object clientCookie2 = "cookie1";
+      Object clientCookie2 = "cookie2";
       Object task3 = "task3";
-      Object clientCookie3 = "cookie1";
+      Object clientCookie3 = "cookie3";
       Object task4 = "task4";
-      Object clientCookie4 = "cookie1";
+      Object clientCookie4 = "cookie4";
 
       tsWrapper.controlScheduler(true);
       tsWrapper.allocateTask(task1, hosts, priority2, clientCookie1);
@@ -309,6 +308,231 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
+  @Test (timeout = 5000)
+  public void testForceLocalityTest1() throws IOException, InterruptedException {
+    // 2 hosts. 2 per host. 5 requests at the same priority.
+    // First 3 on host1, Next at host2, Last with no host.
+    // Third request on host1 should not be allocated immediately.
+    forceLocalityTest1(true);
+
+  }
+
+  @Test (timeout = 5000)
+  public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException {
+    // 2 hosts. 2 per host. 5 requests at the same priority.
+    // First 3 on host1, Next at host2, Last with no host.
+    // Third should allocate on host2, 4th on host2, 5th will wait.
+
+    forceLocalityTest1(false);
+  }
+
+  private void forceLocalityTest1(boolean forceLocality) throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+
+    String[] hosts = new String[] {HOST1, HOST2};
+
+    String[] hostsH1 = new String[] {HOST1};
+    String[] hostsH2 = new String[] {HOST2};
+
+    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, (forceLocality ? -1l : 0l));
+
+    try {
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie2";
+      Object task3 = "task3";
+      Object clientCookie3 = "cookie3";
+      Object task4 = "task4";
+      Object clientCookie4 = "cookie4";
+      Object task5 = "task5";
+      Object clientCookie5 = "cookie5";
+
+      tsWrapper.controlScheduler(true);
+      //H1 - should allocate
+      tsWrapper.allocateTask(task1, hostsH1, priority1, clientCookie1);
+      //H1 - should allocate
+      tsWrapper.allocateTask(task2, hostsH1, priority1, clientCookie2);
+      //H1 - no capacity if force, should allocate otherwise
+      tsWrapper.allocateTask(task3, hostsH1, priority1, clientCookie3);
+      //H2 - should allocate
+      tsWrapper.allocateTask(task4, hostsH2, priority1, clientCookie4);
+      //No location - should allocate if force, no capacity otherwise
+      tsWrapper.allocateTask(task5, null, priority1, clientCookie5);
+
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 4) {
+          break;
+        }
+      }
+
+      // Verify no preemption requests - since everything is at the same priority
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(4)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(4, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+      if (forceLocality) {
+        // task3 not allocated
+        assertEquals(task4, argumentCaptor.getAllValues().get(2));
+        assertEquals(task5, argumentCaptor.getAllValues().get(3));
+      } else {
+        assertEquals(task3, argumentCaptor.getAllValues().get(2));
+        assertEquals(task4, argumentCaptor.getAllValues().get(3));
+      }
+
+      //Complete one task on host1.
+      tsWrapper.deallocateTask(task1, true, null);
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Try scheduling again.
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 5) {
+          break;
+        }
+      }
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      if (forceLocality) {
+        assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      } else {
+        assertEquals(task5, argumentCaptor.getAllValues().get(0));
+      }
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testForcedLocalityUnknownHost() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+
+    String[] hostsKnown = new String[]{HOST1};
+    String[] hostsUnknown = new String[]{HOST2};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hostsKnown, 1, 1, -1l);
+    try {
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie2";
+
+      tsWrapper.controlScheduler(true);
+      // Should allocate since H2 is not known.
+      tsWrapper.allocateTask(task1, hostsUnknown, priority1, clientCookie1);
+      tsWrapper.allocateTask(task2, hostsKnown, priority1, clientCookie2);
+
+
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 2) {
+          break;
+        }
+      }
+
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+
+  @Test(timeout = 5000)
+  public void testForcedLocalityPreemption() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    Priority priority2 = Priority.newInstance(2);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+    String [] hostsH2 = new String[] {HOST2};
+
+    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
+
+    // Fill up host1 with p2 tasks.
+    // Leave host2 empty
+    // Try running p1 task on host1 - should preempt
+
+    try {
+      Object task1 = "task1";
+      Object clientCookie1 = "cookie1";
+      Object task2 = "task2";
+      Object clientCookie2 = "cookie2";
+      Object task3 = "task3";
+      Object clientCookie3 = "cookie3";
+      Object task4 = "task4";
+      Object clientCookie4 = "cookie4";
+
+      tsWrapper.controlScheduler(true);
+      tsWrapper.allocateTask(task1, hostsH1, priority2, clientCookie1);
+      tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
+      // This request at a lower priority should not affect anything.
+      tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3);
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numLocalAllocations == 2) {
+          break;
+        }
+      }
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      reset(tsWrapper.mockAppCallback);
+      // Allocate t4 at higher priority. t3 should not be allocated,
+      // and a preemption should be attempted on host1, despite host2 having available capacity
+      tsWrapper.allocateTask(task4, hostsH1, priority1, clientCookie4);
+
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numPreemptedTasks == 1) {
+          break;
+        }
+      }
+      verify(tsWrapper.mockAppCallback).preemptContainer(any(ContainerId.class));
+
+      tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
+
+      while (true) {
+        tsWrapper.signalSchedulerRun();
+        tsWrapper.awaitSchedulerRun();
+        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
+          break;
+        }
+      }
+      verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
+          eq(clientCookie4), any(Container.class));
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
   private static class TestTaskSchedulerServiceWrapper {
     static final Resource resource = Resource.newInstance(1024, 1);
     Configuration conf;
@@ -329,6 +553,11 @@ public class TestLlapTaskSchedulerService {
 
    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize) throws
         IOException, InterruptedException {
+      this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l);
+    }
+
+    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws
+        IOException, InterruptedException {
       conf = new Configuration();
       conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
       conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
@@ -336,6 +565,7 @@ public class TestLlapTaskSchedulerService {
       conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
           disableTimeoutMillis + "ms");
       conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
+      conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs);
 
       doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
       doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();

