Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E5D0D200D4E for ; Fri, 17 Nov 2017 00:24:51 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E43A9160BF4; Thu, 16 Nov 2017 23:24:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DB96E160BEA for ; Fri, 17 Nov 2017 00:24:50 +0100 (CET) Received: (qmail 51192 invoked by uid 500); 16 Nov 2017 23:24:50 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 51181 invoked by uid 99); 16 Nov 2017 23:24:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Nov 2017 23:24:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E7061F5F06; Thu, 16 Nov 2017 23:24:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sershe@apache.org To: commits@hive.apache.org Message-Id: <10783ab3646a48aba25ac80347c4d755@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-18071 : add HS2 jmx information about pools and current resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Date: Thu, 16 Nov 2017 23:24:49 +0000 (UTC) archived-at: Thu, 16 Nov 2017 23:24:52 -0000 Repository: hive Updated Branches: refs/heads/master 254e2169c -> 477e9c5ff HIVE-18071 : add HS2 jmx information about pools and current resource plan (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: 
http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/477e9c5f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/477e9c5f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/477e9c5f Branch: refs/heads/master Commit: 477e9c5ff94da173a4f9e641f206f81814635dfb Parents: 254e216 Author: sergey Authored: Thu Nov 16 15:24:38 2017 -0800 Committer: sergey Committed: Thu Nov 16 15:24:38 2017 -0800 ---------------------------------------------------------------------- .../llap/daemon/impl/TaskExecutorService.java | 2 +- .../ql/exec/tez/GuaranteedTasksAllocator.java | 1 + .../hadoop/hive/ql/exec/tez/WmTezSession.java | 7 ++ .../hive/ql/exec/tez/WorkloadManager.java | 100 ++++++++++++++++--- .../hive/ql/exec/tez/WorkloadManagerMxBean.java | 17 ++++ .../apache/hive/service/server/HiveServer2.java | 1 + 6 files changed, 114 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index b2a778d..c0356af 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -276,7 +276,7 @@ public class TaskExecutorService extends AbstractService if (c != null && c.getVertexSpec() != null) { SignableVertexSpec fs = c.getVertexSpec(); value.append(isFirst ? " (" : ", ").append(c.getQueryId()) - .append("/").append(fs.getVertexName()); + .append("/").append(fs.getVertexName()).append(c.isGuaranteed() ? 
", guaranteed" : ""); isFirst = false; } value.append(isFirst ? " (" : ", "); http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java index a1775cd..d6d6f07 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/GuaranteedTasksAllocator.java @@ -167,6 +167,7 @@ public class GuaranteedTasksAllocator implements QueryAllocationManager { public void setResponse(UpdateQueryResponseProto response) { int nextUpdate = session.setSentGuaranteed(); if (nextUpdate >= 0) { + LOG.info("Sending a new update " + nextUpdate + " to " + session + " in the response"); updateSessionAsync(session, nextUpdate); } } http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java index 093e8fe..96d70c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WmTezSession.java @@ -161,6 +161,13 @@ public class WmTezSession extends TezSessionPoolSession implements AmPluginNode } } + public String getAllocationState() { + synchronized (actualState) { + return "actual/target " + actualState.sent + "/" + actualState.target + + (actualState.sending >= 0 ? "; sending" : ""); + } + } + int setSentGuaranteed() { // Only one send can be active at the same time. 
synchronized (actualState) { http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index 1fe5859..3990f95 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -18,6 +18,20 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import com.google.common.collect.Lists; + +import java.util.concurrent.ExecutionException; + +import java.util.Collection; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -39,7 +53,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; - import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; @@ -63,14 +76,6 @@ import org.apache.tez.dag.api.TezConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import 
com.google.common.util.concurrent.SettableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - /** Workload management entry point for HS2. * Note on how this class operates. @@ -84,7 +89,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * with background operations like init, need to go thru eventstate; see e.g. returnAfterUse. */ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValidator - implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl { + implements TezSessionPoolSession.Manager, SessionExpirationTracker.RestartImpl, + WorkloadManagerMxBean { private static final Logger LOG = LoggerFactory.getLogger(WorkloadManager.class); private static final char POOL_SEPARATOR = '.'; private static final String POOL_SEPARATOR_STR = "" + POOL_SEPARATOR; @@ -107,6 +113,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // The below group of fields (pools, etc.) can only be modified by the master thread. private Map pools; + private String rpName, defaultPool; // For information only. private int totalQueryParallelism; /** * The queries being killed. 
This is used to sync between the background kill finishing and the @@ -208,7 +215,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida TriggerActionHandler triggerActionHandler = new KillMoveTriggerActionHandler(this); triggerValidatorRunnable = new PerPoolTriggerValidatorRunnable(perPoolProviders, triggerActionHandler, triggerValidationIntervalMs); - startTriggerValidator(triggerValidationIntervalMs); + startTriggerValidator(triggerValidationIntervalMs); // TODO: why is this not in start + + org.apache.hadoop.metrics2.util.MBeans.register("HiveServer2", "WorkloadManager", this); } private static int determineQueryParallelism(WMFullResourcePlan plan) { @@ -285,6 +294,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida private WMFullResourcePlan resourcePlanToApply = null; private boolean hasClusterStateChanged = false; private SettableFuture testEvent, applyRpFuture; + private SettableFuture<List<String>> dumpStateFuture; private final List moveSessions = new LinkedList<>(); } @@ -601,6 +611,7 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida processPoolChangesOnMasterThread(poolName, syncWork, hasRequeues); } + // 12. Save state for future iterations. for (KillQueryContext killCtx : syncWork.toKillQuery.values()) { if (killQueryInProgress.put(killCtx.session, killCtx) != null) { @@ -609,6 +620,15 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } // 13. Notify tests and global async ops. 
+ if (e.dumpStateFuture != null) { + List<String> result = new ArrayList<>(); + result.add("RESOURCE PLAN " + rpName + "; default pool " + defaultPool); + for (PoolState ps : pools.values()) { + dumpPoolState(ps, result); + } + e.dumpStateFuture.set(result); + e.dumpStateFuture = null; + } if (e.testEvent != null) { e.testEvent.set(true); e.testEvent = null; @@ -619,6 +639,31 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } + private void dumpPoolState(PoolState ps, List<String> set) { + StringBuilder sb = new StringBuilder(); + sb.append("POOL ").append(ps.fullName).append(": qp ").append(ps.queryParallelism).append(", %% ") + .append(ps.finalFraction).append(", sessions: ").append(ps.sessions.size()) + .append(", initializing: ").append(ps.initializingSessions.size()).append(", queued: ").append(ps.queue.size()); + set.add(sb.toString()); + sb.setLength(0); + for (WmTezSession session : ps.sessions) { + sb.append("RUNNING: ").append(session.getClusterFraction()).append(" (") + .append(session.getAllocationState()).append(") => ").append(session.getSessionId()); + set.add(sb.toString()); + sb.setLength(0); + } + for (SessionInitContext session : ps.initializingSessions) { + sb.append("INITIALIZING: state ").append(session.state); + set.add(sb.toString()); + sb.setLength(0); + } + for (GetRequest session : ps.queue) { + sb.append("QUEUED: from ").append(session.mappingInput); + set.add(sb.toString()); + sb.setLength(0); + } + } + private void handleMoveSessionOnMasterThread(final MoveSession moveSession, final WmThreadSyncWork syncWork, final HashSet poolsToRedistribute) { @@ -817,8 +862,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida // FIXME: Add Triggers from metastore to poolstate // Note: we assume here that plan has been validated beforehand, so we don't verify // that fractions or query parallelism add up, etc. 
- this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings(), - e.resourcePlanToApply.getPlan().getDefaultPoolPath()); + this.rpName = e.resourcePlanToApply.getPlan().getName(); + this.defaultPool = e.resourcePlanToApply.getPlan().getDefaultPoolPath(); + this.userPoolMapping = new UserPoolMapping(e.resourcePlanToApply.getMappings(), defaultPool); Map oldPools = pools; pools = new HashMap<>(); @@ -1254,6 +1300,29 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida } } + + @Override + public List<String> getWmStateDescription() { + Future<List<String>> future = null; + currentLock.lock(); + try { + if (current.dumpStateFuture != null) { + future = current.dumpStateFuture; + } else { + future = current.dumpStateFuture = SettableFuture.create(); + notifyWmThreadUnderLock(); + } + } finally { + currentLock.unlock(); + } + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Error getting description", e); + return Lists.newArrayList("Error: " + e.toString()); + } + } + private void addKillQueryResult(WmTezSession toKill, boolean success) { currentLock.lock(); try { @@ -1753,6 +1822,11 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida lock.unlock(); } } + + @Override + public String toString() { + return "[state=" + state + ", session=" + session + "]"; + } } boolean isManaged(MappingInput input) { http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerMxBean.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerMxBean.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerMxBean.java new file mode 100644 index 0000000..1158dd1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManagerMxBean.java @@ -0,0 +1,17 @@ +package org.apache.hadoop.hive.ql.exec.tez; + 
+import java.util.List; + +import javax.management.MXBean; + +/** + * MXbean to expose cache allocator related information through JMX. + */ +@MXBean +public interface WorkloadManagerMxBean { + /** + * @return The text-based description of current WM state. + */ + public List<String> getWmStateDescription(); +} + http://git-wip-us.apache.org/repos/asf/hive/blob/477e9c5f/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index c8516fe..5a6d0c8 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -279,6 +279,7 @@ public class HiveServer2 extends CompositeService { } builder.addServlet("llap", LlapServlet.class); builder.setContextRootRewriteTarget("/hiveserver2.jsp"); + webServer = builder.build(); webServer.addServlet("query_page", "/query_page", QueryProfileServlet.class); }