hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject hive git commit: HIVE-13667: Improve performance for ServiceInstanceSet.getByHost (Rajesh Balamohan reviewed by Siddharth Seth, Prasanth Jayachandran)
Date Fri, 03 Feb 2017 00:19:42 GMT
Repository: hive
Updated Branches:
  refs/heads/master 82998f463 -> 341225a7f


HIVE-13667: Improve performance for ServiceInstanceSet.getByHost (Rajesh Balamohan reviewed
by Siddharth Seth, Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/341225a7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/341225a7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/341225a7

Branch: refs/heads/master
Commit: 341225a7f5c858dba01be38282c3c2d1d20aff02
Parents: 82998f4
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Fri Feb 3 05:48:52 2017 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Fri Feb 3 05:48:52 2017 +0530

----------------------------------------------------------------------
 .../impl/LlapZookeeperRegistryImpl.java         | 160 +++++++++++++------
 .../tezplugins/LlapTaskSchedulerService.java    |  70 ++++----
 2 files changed, 147 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/341225a7/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 49ab59a..34ba6bb 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -31,11 +31,15 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.security.auth.login.AppConfigurationEntry;
 
+import com.google.common.collect.Sets;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -121,6 +125,9 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
   private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
 
   private Set<ServiceInstanceStateChangeListener> stateChangeListeners;
+  private final Map<String, Set<ServiceInstance>> pathToInstanceCache;
+  private final Map<String, Set<ServiceInstance>> nodeToInstanceCache;
+  private final Lock instanceCacheLock = new ReentrantLock();
 
   // get local hostname
   private static final String hostname;
@@ -157,6 +164,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     this.instancesCache = null;
     this.instances = null;
     this.stateChangeListeners = new HashSet<>();
+    this.pathToInstanceCache = new ConcurrentHashMap<>();
+    this.nodeToInstanceCache = new ConcurrentHashMap<>();
 
     final boolean isSecure = UserGroupInformation.isSecurityEnabled();
     ACLProvider zooKeeperAclProvider = new ACLProvider() {
@@ -406,6 +415,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     private final int shufflePort;
     private final int outputFormatPort;
     private final String serviceAddress;
+    private final Resource resource;
 
     public DynamicServiceInstance(ServiceRecord srv) throws IOException {
       this.srv = srv;
@@ -437,6 +447,27 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
               AddressTypes.ADDRESS_PORT_FIELD));
       this.serviceAddress =
           RegistryTypeUtils.getAddressField(services.addresses.get(0), AddressTypes.ADDRESS_URI);
+      int memory = Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname));
+      int vCores = Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
+      this.resource = Resource.newInstance(memory, vCores);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      DynamicServiceInstance other = (DynamicServiceInstance) o;
+      return this.getWorkerIdentity().equals(other.getWorkerIdentity());
+    }
+
+    @Override
+    public int hashCode() {
+      return getWorkerIdentity().hashCode();
     }
 
     @Override
@@ -471,9 +502,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
 
     @Override
     public Resource getResource() {
-      int memory = Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname));
-      int vCores = Integer.parseInt(srv.get(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
-      return Resource.newInstance(memory, vCores);
+      return resource;
     }
 
     @Override
@@ -497,51 +526,91 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     // A new ServiceInstance is created each time.
   }
 
+  private void addToCache(String path, String host, ServiceInstance instance) {
+    instanceCacheLock.lock();
+    try {
+      putInCache(path, pathToInstanceCache, instance);
+      putInCache(host, nodeToInstanceCache, instance);
+    } finally {
+      instanceCacheLock.unlock();
+    }
+    LOG.debug("Added path={}, host={} instance={} to cache."
+            + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}",
+        path, host, instance, pathToInstanceCache.size(), nodeToInstanceCache.size());
+  }
+
+  private void removeFromCache(String path, String host) {
+    instanceCacheLock.lock();
+    try {
+      pathToInstanceCache.remove(path);
+      nodeToInstanceCache.remove(host);
+    } finally {
+      instanceCacheLock.unlock();
+    }
+    LOG.debug("Removed path={}, host={} from cache."
+            + " pathToInstanceCache:size={}, nodeToInstanceCache:size={}",
+        path, host, pathToInstanceCache.size(), nodeToInstanceCache.size());
+  }
+
+  private void putInCache(String key, Map<String, Set<ServiceInstance>> cache,
+      ServiceInstance instance) {
+    Set<ServiceInstance> instanceSet = cache.get(key);
+    if (instanceSet == null) {
+      instanceSet = Sets.newHashSet();
+      cache.put(key, instanceSet);
+    }
+    instanceSet.add(instance);
+  }
+
+
   private class DynamicServiceInstanceSet implements ServiceInstanceSet {
     private final PathChildrenCache instancesCache;
 
     public DynamicServiceInstanceSet(final PathChildrenCache cache) {
       this.instancesCache = cache;
+      populateCache();
     }
 
-    @Override
-    public Collection<ServiceInstance> getAll() {
-      List<ServiceInstance> instances = new ArrayList<>();
-      // TODO: we could refresh instanceCache here on previous failure
+    private void populateCache() {
       for (ChildData childData : instancesCache.getCurrentData()) {
         if (childData == null) continue;
         byte[] data = childData.getData();
         if (data == null) continue;
-        if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) continue;
+        String nodeName = extractNodeName(childData);
+        if (!nodeName.startsWith(WORKER_PREFIX)) continue;
         try {
           ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
           ServiceInstance instance = new DynamicServiceInstance(srv);
-          instances.add(instance);
+          addToCache(childData.getPath(), instance.getHost(), instance);
         } catch (IOException e) {
           LOG.error("Unable to decode data for zkpath: {}." +
               " Ignoring from current instances list..", childData.getPath());
         }
       }
+    }
+
+    @Override
+    public Collection<ServiceInstance> getAll() {
+      Set<ServiceInstance> instances =  new HashSet<>();
+      for(Set<ServiceInstance> instanceSet : pathToInstanceCache.values()) {
+        instances.addAll(instanceSet);
+      }
       return instances;
     }
 
     @Override
     public Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) {
       Map<String, Long> slotByWorker = new HashMap<String, Long>();
-      List<ServiceInstance> unsorted = new LinkedList<ServiceInstance>();
+      Set<ServiceInstance> unsorted = Sets.newHashSet();
       for (ChildData childData : instancesCache.getCurrentData()) {
         if (childData == null) continue;
         byte[] data = childData.getData();
         if (data == null) continue;
         String nodeName = extractNodeName(childData);
         if (nodeName.startsWith(WORKER_PREFIX)) {
-          try {
-            ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
-            ServiceInstance instance = new DynamicServiceInstance(srv);
-            unsorted.add(instance);
-          } catch (IOException e) {
-            LOG.error("Unable to decode data for zkpath: {}." +
-                " Ignoring from current instances list..", childData.getPath());
+          Set<ServiceInstance> instances = pathToInstanceCache.get(childData.getPath());
+          if (instances != null) {
+            unsorted.addAll(instances);
           }
         } else if (nodeName.startsWith(SLOT_PREFIX)) {
           slotByWorker.put(extractWorkerIdFromSlot(childData),
@@ -599,27 +668,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
 
     @Override
     public Set<ServiceInstance> getByHost(String host) {
-      Set<ServiceInstance> byHost = new HashSet<>();
-      for (ChildData childData : instancesCache.getCurrentData()) {
-        if (childData == null) continue;
-        byte[] data = childData.getData();
-        if (data == null) continue;
-        if (!extractNodeName(childData).startsWith(WORKER_PREFIX)) continue;
-        try {
-          ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
-          ServiceInstance instance = new DynamicServiceInstance(srv);
-          if (host.equals(instance.getHost())) {
-            byHost.add(instance);
-          }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Locality comparing " + host + " to " + instance.getHost());
-          }
-        } catch (IOException e) {
-          LOG.error("Unable to decode data for zkpath: {}." +
-              " Ignoring host from current instances list..", childData.getPath());
-        }
-      }
-
+      Set<ServiceInstance> byHost = nodeToInstanceCache.get(host);
+      byHost = (byHost == null) ? Sets.<ServiceInstance>newHashSet() : byHost;
       if (LOG.isDebugEnabled()) {
         LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
       }
@@ -643,27 +693,35 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
           && client.getState() == CuratorFrameworkState.STARTED, "client is not started");
 
       synchronized (this) {
-        if (stateChangeListeners.isEmpty()) return;
         ChildData childData = event.getData();
-        if (childData == null) return;
+        if (childData == null)
+          return;
         String nodeName = extractNodeName(childData);
-        if (!nodeName.startsWith(WORKER_PREFIX)) return; // No need to propagate slot updates.
+        if (!nodeName.startsWith(WORKER_PREFIX))
+          return; // No need to propagate slot updates.
         LOG.info("{} for zknode {} in llap namespace", event.getType(), childData.getPath());
         ServiceInstance instance = extractServiceInstance(event, childData);
-        for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
-          switch (event.getType()) {
-          case CHILD_ADDED:
+        switch (event.getType()) {
+        case CHILD_ADDED:
+          addToCache(childData.getPath(), instance.getHost(), instance);
+          for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
             listener.onCreate(instance);
-            break;
-          case CHILD_UPDATED:
+          }
+          break;
+        case CHILD_UPDATED:
+          addToCache(childData.getPath(), instance.getHost(), instance);
+          for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
             listener.onUpdate(instance);
-            break;
-          case CHILD_REMOVED:
+          }
+          break;
+        case CHILD_REMOVED:
+          removeFromCache(childData.getPath(), instance.getHost());
+          for (ServiceInstanceStateChangeListener listener : stateChangeListeners) {
             listener.onRemove(instance);
-            break;
-          default:
-            // Ignore all the other events; logged above.
           }
+          break;
+        default:
+          // Ignore all the other events; logged above.
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/341225a7/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index ff7140d..3c0a661 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -701,20 +701,17 @@ public class LlapTaskSchedulerService extends TaskScheduler {
    */
   private SelectHostResult selectHost(TaskInfo request) {
     String[] requestedHosts = request.requestedHosts;
+    String requestedHostsDebugStr = Arrays.toString(requestedHosts);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("selectingHost for task={} on hosts={}", request.task, Arrays.toString(requestedHosts));
+      LOG.debug("selectingHost for task={} on hosts={}", request.task,
+          requestedHostsDebugStr);
     }
     long schedulerAttemptTime = clock.getTime();
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
-      // If there's no memory available, fail
-      if (getTotalResources().getMemory() <= 0) {
-        return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
-      }
-
       boolean shouldDelayForLocality = request.shouldDelayForLocality(schedulerAttemptTime);
       LOG.debug("ShouldDelayForLocality={} for task={} on hosts={}", shouldDelayForLocality,
-          request.task, Arrays.toString(requestedHosts));
+          request.task, requestedHostsDebugStr);
       if (requestedHosts != null && requestedHosts.length > 0) {
         int prefHostCount = -1;
         boolean requestedHostsWillBecomeAvailable = false;
@@ -729,8 +726,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
                 if  (nodeInfo.canAcceptTask()) {
                   // Successfully scheduled.
                   LOG.info(
-                      "Assigning " + nodeInfo.toShortString() + " when looking for " + host +
-                          ". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) +
+                      "Assigning {} when looking for {}."
+                          + " local=true FirstRequestedHost={}, #prefLocations={}", nodeInfo
+                          .toShortString(), host, (prefHostCount == 0) +
                           (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length :
                               ""));
                   return new SelectHostResult(nodeInfo);
@@ -767,7 +765,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           if (requestedHostsWillBecomeAvailable) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Delaying local allocation for [" + request.task +
-                  "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]" +
+                  "] when trying to allocate on [" + requestedHostsDebugStr + "]" +
                   ". ScheduleAttemptTime=" + schedulerAttemptTime + ", taskDelayTimeout=" +
                   request.getLocalityDelayTimeout());
             }
@@ -775,7 +773,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           } else {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Skipping local allocation for [" + request.task +
-                  "] when trying to allocate on [" + Arrays.toString(requestedHosts) +
+                  "] when trying to allocate on [" + requestedHostsDebugStr +
                   "] since none of these hosts are part of the known list");
             }
           }
@@ -799,10 +797,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         }
         for (NodeInfo nodeInfo : allNodes) {
           if (nodeInfo.canAcceptTask()) {
-            LOG.info("Assigning " + nodeInfo.toShortString()
-              + " when looking for any host, from #hosts=" + allNodes.size() + ", requestedHosts="
-              + ((requestedHosts == null || requestedHosts.length == 0)
-              ? "null" : Arrays.toString(requestedHosts)));
+            LOG.info("Assigning {} when looking for any host, from #hosts={}, requestedHosts={}",
+                nodeInfo.toShortString(), allNodes.size(), ((requestedHosts == null || requestedHosts.length == 0)
+                    ? "null" : requestedHostsDebugStr));
             return new SelectHostResult(nodeInfo);
           }
         }
@@ -825,10 +822,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         for (int i = 0; i < allNodes.size(); i++) {
           NodeInfo nodeInfo = allNodes.get((i + requestedHostIdx + 1) % allNodes.size());
           if (nodeInfo.canAcceptTask()) {
-            LOG.info("Assigning " + nodeInfo.toShortString()
-              + " when looking for first requested host, from #hosts=" + allNodes.size() + ", requestedHosts="
-              + ((requestedHosts == null || requestedHosts.length == 0)
-              ? "null" : Arrays.toString(requestedHosts)));
+            if (LOG.isInfoEnabled()) {
+              LOG.info("Assigning {} when looking for first requested host, from #hosts={},"
+                      + " requestedHosts={}", nodeInfo.toShortString(), allNodes.size(),
+                  ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
+                      requestedHostsDebugStr));
+            }
             return new SelectHostResult(nodeInfo);
           }
         }
@@ -1036,6 +1035,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       }
       Iterator<Entry<Priority, List<TaskInfo>>> pendingIterator =
           pendingTasks.entrySet().iterator();
+      Resource totalResource = getTotalResources();
       while (pendingIterator.hasNext()) {
         Entry<Priority, List<TaskInfo>> entry = pendingIterator.next();
         List<TaskInfo> taskListAtPriority = entry.getValue();
@@ -1050,7 +1050,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             dagStats.registerDelayedAllocation();
           }
           taskInfo.triedAssigningTask();
-          ScheduleResult scheduleResult = scheduleTask(taskInfo);
+          ScheduleResult scheduleResult = scheduleTask(taskInfo, totalResource);
           LOG.info("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult);
           if (scheduleResult == ScheduleResult.SCHEDULED) {
             taskIter.remove();
@@ -1105,13 +1105,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
                 }
               }
               if (shouldPreempt) {
-                LOG.debug("Preempting for {} on potential hosts={}. TotalPendingPreemptions={}",
-                    taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get());
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Preempting for {} on potential hosts={}. TotalPendingPreemptions={}",
+                      taskInfo.task, Arrays.toString(potentialHosts), pendingPreemptions.get());
+                }
                 preemptTasks(entry.getKey().getPriority(), 1, potentialHosts);
               } else {
-                LOG.debug(
-                    "Not preempting for {} on potential hosts={}. An existing preemption request exists",
-                    taskInfo.task, Arrays.toString(potentialHosts));
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Not preempting for {} on potential hosts={}. An existing preemption request exists",
+                      taskInfo.task, Arrays.toString(potentialHosts));
+                }
               }
             } else { // Either DELAYED_RESOURCES or DELAYED_LOCALITY with an unknown requested host.
               // Request for a preemption if there's none pending. If a single preemption is pending,
@@ -1172,7 +1175,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     return sb.toString();
   }
 
-  private ScheduleResult scheduleTask(TaskInfo taskInfo) {
+  private ScheduleResult scheduleTask(TaskInfo taskInfo, Resource totalResource) {
+    Preconditions.checkNotNull(totalResource, "totalResource can not be null");
+    // If there's no memory available, fail
+    if (totalResource.getMemory() <= 0) {
+      return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY.scheduleResult;
+    }
     SelectHostResult selectHostResult = selectHost(taskInfo);
     if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
       NodeInfo nodeInfo = selectHostResult.nodeInfo;
@@ -1202,12 +1210,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   // Subsequent tasks will be scheduled again once the de-allocate request for the preempted
   // task is processed.
   private void preemptTasks(int forPriority, int numTasksToPreempt, String []potentialHosts) {
-    Set<String> preemptHosts;
-    if (potentialHosts == null) {
-      preemptHosts = null;
-    } else {
-      preemptHosts = Sets.newHashSet(potentialHosts);
-    }
+    Set<String> preemptHosts = null;
     writeLock.lock();
     List<TaskInfo> preemptedTaskList = null;
     try {
@@ -1217,6 +1220,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       while (iterator.hasNext() && preemptedCount < numTasksToPreempt) {
         Entry<Integer, TreeSet<TaskInfo>> entryAtPriority = iterator.next();
         if (entryAtPriority.getKey() > forPriority) {
+          if (potentialHosts != null && preemptHosts == null) {
+            preemptHosts = Sets.newHashSet(potentialHosts);
+          }
           Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator();
             while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) {
             TaskInfo taskInfo = taskInfoIterator.next();


Mime
View raw message