hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject hive git commit: HIVE-18071 : add HS2 jmx information about pools and current resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Date Thu, 16 Nov 2017 23:24:49 GMT
Repository: hive
Updated Branches:
  refs/heads/master 254e2169c -> 477e9c5ff


HIVE-18071 : add HS2 jmx information about pools and current resource plan (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/477e9c5f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/477e9c5f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/477e9c5f

Branch: refs/heads/master
Commit: 477e9c5ff94da173a4f9e641f206f81814635dfb
Parents: 254e216
Author: sergey <sershe@apache.org>
Authored: Thu Nov 16 15:24:38 2017 -0800
Committer: sergey <sershe@apache.org>
Committed: Thu Nov 16 15:24:38 2017 -0800

----------------------------------------------------------------------
 .../llap/daemon/impl/TaskExecutorService.java   |   2 +-
 .../ql/exec/tez/GuaranteedTasksAllocator.java   |   1 +
 .../hadoop/hive/ql/exec/tez/WmTezSession.java   |   7 ++
 .../hive/ql/exec/tez/WorkloadManager.java       | 100 ++++++++++++++++---
 .../hive/ql/exec/tez/WorkloadManagerMxBean.java |  17 ++++
 .../apache/hive/service/server/HiveServer2.java |   1 +
 6 files changed, 114 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index b2a778d..c0356af 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -276,7 +276,7 @@ public class TaskExecutorService extends AbstractService
     if (c != null && c.getVertexSpec() != null) {
       SignableVertexSpec fs = c.getVertexSpec();
       value.append(isFirst ? " (" : ", ").append(c.getQueryId())
-        .append("/").append(fs.getVertexName());
+        .append("/").append(fs.getVertexName()).append(c.isGuaranteed() ? ", guaranteed" : "");
       isFirst = false;
     }
     value.append(isFirst ? " (" : ", ");

http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
index a1775cd..d6d6f07 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java
@@ -167,6 +167,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager {
     public void setResponse(UpdateQueryResponseProto response) {
       int nextUpdate = session.setSentGuaranteed();
       if (nextUpdate >= 0) {
+        LOG.info("Sending a new update " + nextUpdate + " to " + session + " in the response");
         updateSessionAsync(session, nextUpdate);
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
index 093e8fe..96d70c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java
@@ -161,6 +161,13 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode
     }
   }
 
+  public String getAllocationState() {
+    synchronized (actualState) {
+      return "actual/target " + actualState.sent + "/" + actualState.target
+          + (actualState.sending >= 0 ? "; sending" : "");
+    }
+  }
+
   int setSentGuaranteed() {
     // Only one send can be active at the same time.
     synchronized (actualState) {

http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 1fe5859..3990f95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -18,6 +18,20 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import com.google.common.collect.Lists;
+
+import java.util.concurrent.ExecutionException;
+
+import java.util.Collection;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -39,7 +53,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -63,14 +76,6 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 
 /** Workload management entry point for HS2.
  * Note on how this class operates.
@@ -84,7 +89,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * with background operations like init, need to go thru eventstate; see e.g. returnAfterUse.
  */
 public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator
-  implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl {
+  implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl,
+             WorkloadManagerMxBean {
   private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class);
   private static final char POOL_SEPARATOR = '.';
   private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR;
@@ -107,6 +113,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
 
   // The below group of fields (pools, etc.) can only be modified by the master thread.
   private Map<String, PoolState> pools;
+  private String rpName, defaultPool; // For information only.
   private int totalQueryParallelism;
   /**
    * The queries being killed. This is used to sync between the background kill finishing and the
@@ -208,7 +215,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this);
     triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler,
       triggerValidationIntervalMs);
-    startTriggerValidator(triggerValidationIntervalMs);
+    startTriggerValidator(triggerValidationIntervalMs); // TODO: why is this not in start
+
+    org.apache.hadoop.metrics2.util.MBeans.register("HiveServer2", "WorkloadManager", this);
   }
 
   private static int determineQueryParallelism(WMFullResourcePlan plan) {
@@ -285,6 +294,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     private WMFullResourcePlan resourcePlanToApply = null;
     private boolean hasClusterStateChanged = false;
     private SettableFuture<Boolean> testEvent, applyRpFuture;
+    private SettableFuture<List<String>> dumpStateFuture;
     private final List<MoveSession> moveSessions = new LinkedList<>();
   }
 
@@ -601,6 +611,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues);
     }
 
+
     // 12. Save state for future iterations.
     for (KillQueryContext killCtx : syncWork.toKillQuery.values()) {
       if (killQueryInProgress.put(killCtx.session, killCtx) != null) {
@@ -609,6 +620,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
 
     // 13. Notify tests and global async ops.
+    if (e.dumpStateFuture != null) {
+      List<String> result = new ArrayList<>();
+      result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool);
+      for (PoolState ps : pools.values()) {
+        dumpPoolState(ps, result);
+      }
+      e.dumpStateFuture.set(result);
+      e.dumpStateFuture = null;
+    }
     if (e.testEvent != null) {
       e.testEvent.set(true);
       e.testEvent = null;
@@ -619,6 +639,31 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+  private void dumpPoolState(PoolState ps, List<String> set) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism).append(", %% ")
+      .append(ps.finalFraction).append(", sessions: ").append(ps.sessions.size())
+      .append(", initializing: ").append(ps.initializingSessions.size()).append(", queued: ").append(ps.queue.size());
+    set.add(sb.toString());
+    sb.setLength(0);
+    for (WmTezSession session : ps.sessions) {
+      sb.append("RUNNING: ").append(session.getClusterFraction()).append(" (")
+        .append(session.getAllocationState()).append(") => ").append(session.getSessionId());
+      set.add(sb.toString());
+      sb.setLength(0);
+    }
+    for (SessionInitContext session : ps.initializingSessions) {
+      sb.append("INITIALIZING: state ").append(session.state);
+      set.add(sb.toString());
+      sb.setLength(0);
+    }
+    for (GetRequest session : ps.queue) {
+      sb.append("QUEUED: from ").append(session.mappingInput);
+      set.add(sb.toString());
+      sb.setLength(0);
+    }
+  }
+
   private void handleMoveSessionOnMasterThread(final MoveSession moveSession,
     final WmThreadSyncWork syncWork,
     final HashSet<String> poolsToRedistribute) {
@@ -817,8 +862,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     // FIXME: Add Triggers from metastore to poolstate
     // Note: we assume here that plan has been validated beforehand, so we don't verify
     //       that fractions or query parallelism add up, etc.
-    this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings(),
-        e.resourcePlanToApply.getPlan().getDefaultPoolPath());
+    this.rpName = e.resourcePlanToApply.getPlan().getName();
+    this.defaultPool = e.resourcePlanToApply.getPlan().getDefaultPoolPath();
+    this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings(), defaultPool);
     Map<String, PoolState> oldPools = pools;
     pools = new HashMap<>();
 
@@ -1254,6 +1300,29 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
     }
   }
 
+
+  @Override
+  public List<String> getWmStateDescription() {
+    Future<List<String>> future = null;
+    currentLock.lock();
+    try {
+      if (current.dumpStateFuture != null) {
+        future = current.dumpStateFuture;
+      } else {
+        future = current.dumpStateFuture = SettableFuture.create();
+        notifyWmThreadUnderLock();
+      }
+    } finally {
+      currentLock.unlock();
+    }
+    try {
+      return future.get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("Error getting description", e);
+      return Lists.newArrayList("Error: " + e.toString());
+    }
+  }
+
   private void addKillQueryResult(WmTezSession toKill, boolean success) {
     currentLock.lock();
     try {
@@ -1753,6 +1822,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
         lock.unlock();
       }
     }
+
+    @Override
+    public String toString() {
+      return "[state=" + state + ", session=" + session + "]";
+    }
   }
 
   boolean isManaged(MappingInput input) {

http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerMxBean.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerMxBean.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerMxBean.java
new file mode 100644
index 0000000..1158dd1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerMxBean.java
@@ -0,0 +1,17 @@
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.util.List;
+
+import javax.management.MXBean;
+
+/**
+ * MXbean to expose cache allocator related information through JMX.
+ */
+@MXBean
+public interface WorkloadManagerMxBean {
+  /**
+   * @return The text-based description of current WM state.
+   */
+  public List<String> getWmStateDescription();
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index c8516fe..5a6d0c8 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -279,6 +279,7 @@ public class HiveServer2 extends CompositeService {
           }
           builder.addServlet("llap", LlapServlet.class);
           builder.setContextRootRewriteTarget("/hiveserver2.jsp");
+
           webServer = builder.build();
           webServer.addServlet("query_page", "/query_page", QueryProfileServlet.class);
         }


Mime
View raw message