hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject [45/50] [abbrv] hive git commit: HIVE-16094. queued containers may timeout if they don't get to run for a long time.
Date Tue, 28 Mar 2017 22:32:13 GMT
HIVE-16094. queued containers may timeout if they don't get to run for a long time.

Change-Id: Ieb412a66dbe53c6709f7bd840b3dfa543225d826
(cherry picked from commit 3def1d7f19982b4a710e7cb867a8b6b7bbf8fb97)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f3c949c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f3c949c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f3c949c

Branch: refs/heads/branch-2.2
Commit: 2f3c949c20ca51ac835bedd9bff2ac1c07e000a1
Parents: 29db808
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Mar 2 22:27:39 2017 -0800
Committer: Owen O'Malley <omalley@apache.org>
Committed: Tue Mar 28 15:27:57 2017 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/AMReporter.java       | 95 ++++++++++++--------
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  3 +-
 .../llap/daemon/impl/TaskRunnerCallable.java    |  7 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   |  2 +-
 4 files changed, 64 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2f3c949c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index b01a495..af4a1f2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -100,7 +100,7 @@ public class AMReporter extends AbstractService {
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
   // Tracks appMasters to which heartbeats are being sent. This should not be used for any other
   // messages like taskKilled, etc.
-  private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
+  private final Map<QueryIdentifier, AMNodeInfo> knownAppMasters = new HashMap<>();
   volatile ListenableFuture<Void> queueLookupFuture;
   private final DaemonId daemonId;
 
@@ -186,35 +186,42 @@ public class AMReporter extends AbstractService {
   public void registerTask(String amLocation, int port, String user,
                            Token<JobTokenIdentifier> jobToken, QueryIdentifier queryIdentifier) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for queryIdentifier=" + queryIdentifier);
+      LOG.trace(
+          "Registering for heartbeat: {}, queryIdentifier={}",
+          (amLocation + ":" + port), queryIdentifier);
     }
     AMNodeInfo amNodeInfo;
+
+    // Since we don't have an explicit AM end signal yet - we're going to create
+    // and discard AMNodeInfo instances per query.
     synchronized (knownAppMasters) {
       LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
-      amNodeInfo = knownAppMasters.get(amNodeId);
+      amNodeInfo = knownAppMasters.get(queryIdentifier);
       if (amNodeInfo == null) {
         amNodeInfo =
             new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
                 conf);
-        knownAppMasters.put(amNodeId, amNodeInfo);
+        knownAppMasters.put(queryIdentifier, amNodeInfo);
         // Add to the queue only the first time this is registered, and on
         // subsequent instances when it's taken off the queue.
         amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval);
         pendingHeartbeatQueeu.add(amNodeInfo);
+        // AMNodeInfo will only be cleared when a queryComplete is received for this query, or
+        // when we detect a failure on the AM side (failure to heartbeat).
+        // A single queueLookupCallable is added here. We have to make sure one instance stays
+        // in the queue till the query completes.
       }
-      amNodeInfo.setCurrentQueryIdentifier(queryIdentifier);
       amNodeInfo.incrementAndGetTaskCount();
     }
   }
 
-  public void unregisterTask(String amLocation, int port) {
+  public void unregisterTask(String amLocation, int port, QueryIdentifier queryIdentifier) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Un-registering for heartbeat: " + amLocation + ":" + port);
+      LOG.trace("Un-registering for heartbeat: {}", (amLocation + ":" + port));
     }
     AMNodeInfo amNodeInfo;
-    LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
     synchronized (knownAppMasters) {
-      amNodeInfo = knownAppMasters.get(amNodeId);
+      amNodeInfo = knownAppMasters.get(queryIdentifier);
       if (amNodeInfo == null) {
         LOG.info(("Ignoring duplicate unregisterRequest for am at: " + amLocation + ":" + port));
       } else {
@@ -230,7 +237,7 @@ public class AMReporter extends AbstractService {
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
     AMNodeInfo amNodeInfo;
     synchronized (knownAppMasters) {
-      amNodeInfo = knownAppMasters.get(amNodeId);
+      amNodeInfo = knownAppMasters.get(queryIdentifier);
       if (amNodeInfo == null) {
         amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory,
           conf);
@@ -255,10 +262,17 @@ public class AMReporter extends AbstractService {
     });
   }
 
-  public void queryComplete(LlapNodeId llapNodeId) {
-    if (llapNodeId != null) {
+  public void queryComplete(QueryIdentifier queryIdentifier) {
+    if (queryIdentifier != null) {
       synchronized (knownAppMasters) {
-        AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId);
+        AMNodeInfo amNodeInfo = knownAppMasters.remove(queryIdentifier);
+
+        // The AM can be used for multiple queries. This is an indication that a single query is complete.
+        // We don't have a good mechanism to know when an app ends. Removing this right now ensures
+        // that a new one gets created for the next query on the same AM.
+        if (amNodeInfo != null) {
+          amNodeInfo.setIsDone(true);
+        }
         // TODO: not stopping umbilical explicitly as some taskKill requests may get scheduled during queryComplete
         // which will be using the umbilical. HIVE-16021 should fix this, until then leave umbilical open and wait for
         // it to be closed after max idle timeout (10s default)
@@ -276,22 +290,26 @@ public class AMReporter extends AbstractService {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
-          if (amNodeInfo.hasAmFailed()) {
+          if (amNodeInfo.hasAmFailed() || amNodeInfo.isDone()) {
             synchronized (knownAppMasters) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug(
-                    "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}",
-                    amNodeInfo.amNodeId, amNodeInfo.getCurrentQueryIdentifier(), amNodeInfo.getTaskCount(),
-                    amNodeInfo.hasAmFailed(), amNodeInfo);
+                    "Removing am {} with last associated dag {} from heartbeat with taskCount={}, amFailed={}, isDone={}",
+                    amNodeInfo.amNodeId, amNodeInfo.getQueryIdentifier(), amNodeInfo.getTaskCount(),
+                    amNodeInfo.hasAmFailed(), amNodeInfo.isDone());
               }
-              knownAppMasters.remove(amNodeInfo.amNodeId);
+              knownAppMasters.remove(amNodeInfo.getQueryIdentifier());
             }
           } else {
+            // Always re-schedule the next callable - irrespective of task count,
+            // in case new tasks come in later.
+            long next = System.currentTimeMillis() + heartbeatInterval;
+            amNodeInfo.setNextHeartbeatTime(next);
+            pendingHeartbeatQueeu.add(amNodeInfo);
+
+            // Send an actual heartbeat only if the task count is > 0
             if (amNodeInfo.getTaskCount() > 0) {
               // Add back to the queue for the next heartbeat, and schedule the actual heartbeat
-              long next = System.currentTimeMillis() + heartbeatInterval;
-              amNodeInfo.setNextHeartbeatTime(next);
-              pendingHeartbeatQueeu.add(amNodeInfo);
               ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo));
               Futures.addCallback(future, new FutureCallback<Void>() {
                 @Override
@@ -301,9 +319,9 @@ public class AMReporter extends AbstractService {
 
                 @Override
                 public void onFailure(Throwable t) {
-                  QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
+                  QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier();
                   amNodeInfo.setAmFailed(true);
-                  LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}",
+                  LOG.warn("Heartbeat failed to AM {}. Marking query as failed. query={}",
                     amNodeInfo.amNodeId, currentQueryIdentifier, t);
                   queryFailedHandler.queryFailed(currentQueryIdentifier);
                 }
@@ -369,7 +387,7 @@ public class AMReporter extends AbstractService {
           amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()),
               new Text(daemonId.getUniqueNodeIdInCluster()), nodeId.getPort());
         } catch (IOException e) {
-          QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier();
+          QueryIdentifier currentQueryIdentifier = amNodeInfo.getQueryIdentifier();
           amNodeInfo.setAmFailed(true);
           LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}",
               amNodeInfo.amNodeId, currentQueryIdentifier, e);
@@ -379,11 +397,7 @@ public class AMReporter extends AbstractService {
             LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e);
           }
         }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping node heartbeat to AM: " + amNodeInfo + ", since ref count is 0");
-        }
-      }
+      } 
       return null;
     }
   }
@@ -400,9 +414,10 @@ public class AMReporter extends AbstractService {
     private final long timeout;
     private final SocketFactory socketFactory;
     private final AtomicBoolean amFailed = new AtomicBoolean(false);
-    private QueryIdentifier currentQueryIdentifier;
+    private final QueryIdentifier queryIdentifier;
     private LlapTaskUmbilicalProtocol umbilical;
     private long nextHeartbeatTime;
+    private final AtomicBoolean isDone = new AtomicBoolean(false);
 
 
     public AMNodeInfo(LlapNodeId amNodeId, String user,
@@ -414,7 +429,7 @@ public class AMReporter extends AbstractService {
                       Configuration conf) {
       this.user = user;
       this.jobToken = jobToken;
-      this.currentQueryIdentifier = currentQueryIdentifier;
+      this.queryIdentifier = currentQueryIdentifier;
       this.retryPolicy = retryPolicy;
       this.timeout = timeout;
       this.socketFactory = socketFactory;
@@ -465,16 +480,20 @@ public class AMReporter extends AbstractService {
       return amFailed.get();
     }
 
-    int getTaskCount() {
-      return taskCount.get();
+    void setIsDone(boolean val) {
+      isDone.set(val);
     }
 
-    public synchronized QueryIdentifier getCurrentQueryIdentifier() {
-      return currentQueryIdentifier;
+    boolean isDone() {
+      return isDone.get();
+    }
+
+    int getTaskCount() {
+      return taskCount.get();
     }
 
-    public synchronized void setCurrentQueryIdentifier(QueryIdentifier queryIdentifier) {
-      this.currentQueryIdentifier = queryIdentifier;
+    public QueryIdentifier getQueryIdentifier() {
+      return queryIdentifier;
     }
 
     synchronized void setNextHeartbeatTime(long nextTime) {
@@ -500,7 +519,7 @@ public class AMReporter extends AbstractService {
 
     @Override
     public String toString() {
-      return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount();
+      return "AMInfo: " + amNodeId + ", taskCount=" + getTaskCount() + ", queryIdentifier=" + queryIdentifier;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f3c949c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 1176e5e..2b9ece9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -399,8 +399,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
           fragmentInfo.getFragmentIdentifierString());
         executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
       }
-      LlapNodeId amNodeId = queryInfo.getAmNodeId();
-      amReporter.queryComplete(amNodeId);
+      amReporter.queryComplete(queryIdentifier);
     }
     return QueryCompleteResponseProto.getDefaultInstance();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f3c949c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 8739d5b..716d05b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -178,7 +178,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       }
 
       // Unregister from the AMReporter, since the task is now running.
-      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort(),
+          fragmentInfo.getQueryInfo().getQueryIdentifier());
 
       synchronized (this) {
         if (!shouldRunTask) {
@@ -326,7 +327,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
             // If the task hasn't started - inform about fragment completion immediately. It's possible for
             // the callable to never run.
             fragmentCompletionHanler.fragmentComplete(fragmentInfo);
-            this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+            this.amReporter
+                .unregisterTask(request.getAmHost(), request.getAmPort(),
+                    fragmentInfo.getQueryInfo().getQueryIdentifier());
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f3c949c/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index c716c5e..f6166a6 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -744,10 +744,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
     @Override
     public void nodeHeartbeat(Text hostname, Text uniqueId, int port) throws IOException {
-      nodePinged(hostname.toString(), uniqueId.toString(), port);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Received heartbeat from [" + hostname + ":" + port +" (" + uniqueId +")]");
       }
+      nodePinged(hostname.toString(), uniqueId.toString(), port);
     }
 
     @Override


Mime
View raw message