hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject [33/50] [abbrv] hive git commit: HIVE-15958: LLAP: IPC connections are not being reused for umbilical protocol (Prasanth Jayachandran reviewed by Siddharth Seth)
Date Tue, 28 Mar 2017 22:32:01 GMT
HIVE-15958: LLAP: IPC connections are not being reused for umbilical protocol (Prasanth Jayachandran
reviewed by Siddharth Seth)

Change-Id: Ib4e8ba9881cc560142dc3f75a130060b29ea7c57


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/badea3fa
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/badea3fa
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/badea3fa

Branch: refs/heads/branch-2.2
Commit: badea3fad5b8c6ef734e16446218ce01fa3a1904
Parents: 59cde66
Author: Prasanth Jayachandran <pjayachandran@hortonworks.com>
Authored: Mon Feb 27 19:45:37 2017 -0800
Committer: Owen O'Malley <omalley@apache.org>
Committed: Tue Mar 28 15:27:53 2017 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/AMReporter.java       | 74 ++++++++++++--------
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 21 +++---
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 17 +++--
 .../hive/llap/daemon/impl/QueryTracker.java     | 20 +++---
 .../daemon/impl/TaskExecutorTestHelpers.java    |  2 +-
 5 files changed, 82 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/badea3fa/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index ede7e00..b01a495 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -113,9 +113,9 @@ public class AMReporter extends AbstractService {
     this.conf = conf;
     this.daemonId = daemonId;
     if (maxThreads < numExecutors) {
-      maxThreads = numExecutors;
       LOG.warn("maxThreads={} is less than numExecutors={}. Setting maxThreads=numExecutors",
-          maxThreads, numExecutors);
+        maxThreads, numExecutors);
+      maxThreads = numExecutors;
     }
     ExecutorService rawExecutor =
         new ThreadPoolExecutor(numExecutors, maxThreads,
@@ -227,12 +227,15 @@ public class AMReporter extends AbstractService {
 
  public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken,
                          final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) {
-    // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
-    // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
-    AMNodeInfo amNodeInfo =
-        new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
-            conf);
+    AMNodeInfo amNodeInfo;
+    synchronized (knownAppMasters) {
+      amNodeInfo = knownAppMasters.get(amNodeId);
+      if (amNodeInfo == null) {
+        amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
+          conf);
+      }
+    }
 
     // Even if the service hasn't started up. It's OK to make this invocation since this will
     // only happen after the AtomicReference address has been populated. Not adding an additional check.
@@ -252,6 +255,20 @@ public class AMReporter extends AbstractService {
     });
   }
 
+  public void queryComplete(LlapNodeId llapNodeId) {
+    if (llapNodeId != null) {
+      synchronized (knownAppMasters) {
+        AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId);
+        // TODO: not stopping umbilical explicitly as some taskKill requests may get scheduled during queryComplete
+        // which will be using the umbilical. HIVE-16021 should fix this, until then leave umbilical open and wait for
+        // it to be closed after max idle timeout (10s default)
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Query complete received. Removed {}.", amNodeInfo);
+        }
+      }
+    }
+  }
+
   private class QueueLookupCallable extends CallableWithNdc<Void> {
 
     @Override
@@ -259,7 +276,7 @@ public class AMReporter extends AbstractService {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
-          if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) {
+          if (amNodeInfo.hasAmFailed()) {
             synchronized (knownAppMasters) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug(
@@ -269,28 +286,29 @@ public class AMReporter extends AbstractService {
               }
               knownAppMasters.remove(amNodeInfo.amNodeId);
             }
-            amNodeInfo.stopUmbilical();
           } else {
-            // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
-            long next = System.currentTimeMillis() + heartbeatInterval;
-            amNodeInfo.setNextHeartbeatTime(next);
-            pendingHeartbeatQueeu.add(amNodeInfo);
-            ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
-            Futures.addCallback(future, new FutureCallback<Void>() {
-              @Override
-              public void onSuccess(Void result) {
-                // Nothing to do.
-              }
-
-              @Override
-              public void onFailure(Throwable t) {
-                QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
-                amNodeInfo.setAmFailed(true);
-                LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
+            if (amNodeInfo.getTaskCount() > 0) {
+              // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
+              long next = System.currentTimeMillis() + heartbeatInterval;
+              amNodeInfo.setNextHeartbeatTime(next);
+              pendingHeartbeatQueeu.add(amNodeInfo);
+              ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
+              Futures.addCallback(future, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                  // Nothing to do.
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                  QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
+                  amNodeInfo.setAmFailed(true);
+                  LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
                     amNodeInfo.amNodeId, currentQueryIdentifier, t);
-                queryFailedHandler.queryFailed(currentQueryIdentifier);
-              }
-            });
+                  queryFailedHandler.queryFailed(currentQueryIdentifier);
+                }
+              });
+            }
           }
         } catch (InterruptedException e) {
           if (isShutdown.get()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/badea3fa/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index cc4eff0..1176e5e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.UgiFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
@@ -240,12 +241,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
       Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
 
+      LlapNodeId amNodeId = LlapNodeId.getInstance(request.getAmHost(), request.getAmPort());
       QueryFragmentInfo fragmentInfo = queryTracker.registerFragment(
           queryIdentifier, qIdProto.getApplicationIdString(), dagId,
           vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
           vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
-          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, request.getAmHost(),
-          request.getAmPort());
+          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId);
 
       String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
@@ -388,14 +389,18 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
         new QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(),
             request.getQueryIdentifier().getDagIndex());
     LOG.info("Processing queryComplete notification for {}", queryIdentifier);
-    List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete(
-        queryIdentifier, request.getDeleteDelay(), false);
-    LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
+    QueryInfo queryInfo = queryTracker.queryComplete(queryIdentifier, request.getDeleteDelay(), false);
+    if (queryInfo != null) {
+      List<QueryFragmentInfo> knownFragments = queryInfo.getRegisteredFragments();
+      LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
         knownFragments.size());
-    for (QueryFragmentInfo fragmentInfo : knownFragments) {
-      LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
+      for (QueryFragmentInfo fragmentInfo : knownFragments) {
+        LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
           fragmentInfo.getFragmentIdentifierString());
-      executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+        executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+      }
+      LlapNodeId amNodeId = queryInfo.getAmNodeId();
+      amReporter.queryComplete(amNodeId);
     }
     return QueryCompleteResponseProto.getDefaultInstance();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/badea3fa/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 5f0271f..90641d4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
@@ -56,6 +57,7 @@ public class QueryInfo {
   private final String[] localDirsBase;
   private final FileSystem localFs;
   private String[] localDirs;
+  private final LlapNodeId amNodeId;
   // Map of states for different vertices.
 
   private final Set<QueryFragmentInfo> knownFragments =
@@ -68,11 +70,11 @@ public class QueryInfo {
   private final AtomicReference<UserGroupInformation> umbilicalUgi;
 
   public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
-                   String dagName, String hiveQueryIdString,
-                   int dagIdentifier, String user,
-                   ConcurrentMap<String, SourceStateProto> sourceStateMap,
-                   String[] localDirsBase, FileSystem localFs, String tokenUserName,
-                   String tokenAppId) {
+    String dagName, String hiveQueryIdString,
+    int dagIdentifier, String user,
+    ConcurrentMap<String, SourceStateProto> sourceStateMap,
+    String[] localDirsBase, FileSystem localFs, String tokenUserName,
+    String tokenAppId, final LlapNodeId amNodeId) {
     this.queryIdentifier = queryIdentifier;
     this.appIdString = appIdString;
     this.dagIdString = dagIdString;
@@ -86,6 +88,7 @@ public class QueryInfo {
     this.tokenUserName = tokenUserName;
     this.appId = tokenAppId;
     this.umbilicalUgi = new AtomicReference<>();
+    this.amNodeId = amNodeId;
   }
 
   public QueryIdentifier getQueryIdentifier() {
@@ -116,6 +119,10 @@ public class QueryInfo {
     return sourceStateMap;
   }
 
+  public LlapNodeId getAmNodeId() {
+    return amNodeId;
+  }
+
   public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber,
       int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) {
     QueryFragmentInfo fragmentInfo = new QueryFragmentInfo(

http://git-wip-us.apache.org/repos/asf/hive/blob/badea3fa/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 5cf3a38..7e646c5 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker;
 import org.apache.hadoop.hive.llap.log.LogHelpers;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -69,8 +70,6 @@ public class QueryTracker extends AbstractService {
 
   private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
 
-
-
   private final String[] localDirsBase;
   private final FileSystem localFs;
   private final String clusterId;
@@ -137,9 +136,10 @@ public class QueryTracker extends AbstractService {
    * Register a new fragment for a specific query
    */
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
-      String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
-      String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
-      String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int amPort) throws IOException {
+    String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber,
+    int attemptNumber,
+    String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
+    String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException {
 
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     // Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -169,13 +169,13 @@ public class QueryTracker extends AbstractService {
             new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString,
                 dagIdentifier, user,
                 getSourceCompletionMap(queryIdentifier), localDirsBase, localFs,
-                tokenInfo.userName, tokenInfo.appId);
+                tokenInfo.userName, tokenInfo.appId, amNodeId);
         QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo);
         if (old != null) {
           queryInfo = old;
         } else {
           // Ensure the UGI is setup once.
-          queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amHost, amPort);
+          queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort());
           isExistingQueryInfo = false;
         }
       }
@@ -238,7 +238,7 @@ public class QueryTracker extends AbstractService {
    * @param queryIdentifier
    * @param deleteDelay
    */
-  List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
+  QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay,
       boolean isInternal) throws IOException {
     if (deleteDelay == -1) {
       deleteDelay = defaultDeleteDelaySeconds;
@@ -255,7 +255,7 @@ public class QueryTracker extends AbstractService {
       if (queryInfo == null) {
         // Should not happen.
         LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier);
-        return Collections.emptyList();
+        return null;
       }
       String[] localDirs = queryInfo.getLocalDirsNoCreate();
       if (localDirs != null) {
@@ -292,7 +292,7 @@ public class QueryTracker extends AbstractService {
       if (savedQueryId != null) {
         ObjectCacheFactory.removeLlapQueryCache(savedQueryId);
       }
-      return queryInfo.getRegisteredFragments();
+      return queryInfo;
     } finally {
       dagLock.writeLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/badea3fa/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index ae3328a..d1fce19 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -93,7 +93,7 @@ public class TaskExecutorTestHelpers {
         new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name",
             "fakeHiveQueryId", 1, "fakeUser",
             new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(),
-            new String[0], null, "fakeUser", null);
+            new String[0], null, "fakeUser", null, null);
     return queryInfo;
   }
 


Mime
View raw message