Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 83BB1200C36 for ; Fri, 24 Feb 2017 02:08:41 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 82609160B80; Fri, 24 Feb 2017 01:08:41 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A5C75160B64 for ; Fri, 24 Feb 2017 02:08:40 +0100 (CET) Received: (qmail 34431 invoked by uid 500); 24 Feb 2017 01:08:35 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 32003 invoked by uid 99); 24 Feb 2017 01:08:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Feb 2017 01:08:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E6899DFF7C; Fri, 24 Feb 2017 01:08:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sershe@apache.org To: commits@hive.apache.org Date: Fri, 24 Feb 2017 01:09:02 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/50] [abbrv] hive git commit: HIVE-15971: LLAP: logs urls should use daemon container id instead of fake container id archived-at: Fri, 24 Feb 2017 01:08:41 -0000 HIVE-15971: LLAP: logs urls should use daemon container id instead of fake container id Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5bb76cf Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5bb76cf Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5bb76cf Branch: refs/heads/hive-14535 Commit: d5bb76cf2da3934d1de6b3087ac4bfafa2b2cb6f Parents: de532b1 Author: Prasanth Jayachandran Authored: Tue Feb 21 14:25:47 2017 -0800 Committer: Prasanth Jayachandran Committed: Tue Feb 21 14:25:47 2017 -0800 ---------------------------------------------------------------------- .../llap/registry/impl/LlapRegistryService.java | 13 +-- .../llap/tezplugins/LlapTaskCommunicator.java | 91 +++++++++++--------- 2 files changed, 57 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d5bb76cf/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 5a94db9..610c0a5 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet; import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener; import org.apache.hadoop.hive.llap.registry.ServiceRegistry; +import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.slf4j.Logger; @@ -57,17 +58,17 @@ public class LlapRegistryService extends AbstractService { String hosts = HiveConf.getTrimmedVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS); Preconditions.checkNotNull(hosts, ConfVars.LLAP_DAEMON_SERVICE_HOSTS.toString() + " must be defined"); LlapRegistryService registry; - // TODO: this is not going to work with multiple users. if (hosts.startsWith("@")) { // Caching instances only in case of the YARN registry. Each host based list will get it's own copy. - String name = hosts.substring(1); - if (yarnRegistries.containsKey(name) && yarnRegistries.get(name).isInState(STATE.STARTED)) { - registry = yarnRegistries.get(name); - } else { + String appName = hosts.substring(1); + String userName = HiveConf.getVar(conf, ConfVars.LLAP_ZK_REGISTRY_USER, RegistryUtils.currentUser()); + String key = appName + "-" + userName; + registry = yarnRegistries.get(key); + if (registry == null || !registry.isInState(STATE.STARTED)) { registry = new LlapRegistryService(false); registry.init(conf); registry.start(); - yarnRegistries.put(name, registry); + yarnRegistries.put(key, registry); } } else { registry = new LlapRegistryService(false); http://git-wip-us.apache.org/repos/asf/hive/blob/d5bb76cf/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 3aae7a4..e593b33 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -14,12 +14,12 @@ package org.apache.hadoop.hive.llap.tezplugins; +import org.apache.hadoop.hive.llap.registry.ServiceInstance; import org.apache.hadoop.io.Writable; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol.TezAttemptArray; import java.io.IOException; -import java.net.URI; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Map; @@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.tez.common.TezTaskUmbilicalProtocol; import org.apache.tez.common.TezUtils; import org.apache.tez.common.security.JobTokenSecretManager; @@ -119,7 +119,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { private final Token token; private final String user; private String amHost; - private URI timelineServerUri; + private String timelineServerUri; + private int nmPort; // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats. // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed. @@ -149,7 +150,6 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { Preconditions.checkState((token != null) == UserGroupInformation.isSecurityEnabled()); // Not closing this at the moment at shutdown, since this could be a shared instance. - // TODO: this is unused. serviceRegistry = LlapRegistryService.getClient(conf); umbilical = new LlapTaskUmbilicalProtocolImpl(getUmbilical()); @@ -191,18 +191,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { + "fileCleanupDelay=" + deleteDelayOnDagComplete + ", numCommunicatorThreads=" + numThreads); this.communicator.init(conf); - if (YarnConfiguration.useHttps(conf)) { - timelineServerUri = URI - .create(JOINER.join("https://", conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS), - RESOURCE_URI_STR)); - } else { - timelineServerUri = URI.create(JOINER.join("http://", conf.get( - YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, - YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS), - RESOURCE_URI_STR)); - } + String scheme = WebAppUtils.getHttpSchemePrefix(conf); + String ahsUrl = WebAppUtils.getAHSWebAppURLWithoutScheme(conf); + this.timelineServerUri = WebAppUtils.getURLWithScheme(scheme, ahsUrl); + this.nmPort = Integer.valueOf(WebAppUtils.getNMWebAppURLWithoutScheme(conf).split(":")[1]); } @Override @@ -540,37 +532,54 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { @Override public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - String url = ""; - if (timelineServerUri != null && containerNodeId != null) { - LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); - BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); - ContainerId containerId = biMap.inverse().get(attemptID); - if (containerId != null) { - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); - String filename = currentHiveQueryId + "-" + dagId + ".log"; - // YARN-6011 provides a webservice to get the logs - url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", - filename); - } - } - return url; + return constructLogUrl(attemptID, containerNodeId, false); } @Override public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) { - String url = ""; - if (timelineServerUri != null && containerNodeId != null) { - LlapNodeId llapNodeId = LlapNodeId.getInstance(containerNodeId.getHost(), containerNodeId.getPort()); - BiMap biMap = entityTracker.getContainerAttemptMapForNode(llapNodeId); - ContainerId containerId = biMap.inverse().get(attemptID); - if (containerId != null) { - String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); - String filename = currentHiveQueryId + "-" + dagId + ".log.done"; - // YARN-6011 provides a webservice to get the logs - url = PATH_JOINER.join(timelineServerUri.toString(), "containers", containerId.toString(), "logs", - filename); + return constructLogUrl(attemptID, containerNodeId, true); + } + + private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId containerNodeId, final boolean isDone) { + if (timelineServerUri == null || containerNodeId == null) { + return null; + } + Set instanceSet; + try { + instanceSet = serviceRegistry.getInstances().getByHost(containerNodeId.getHost()); + } catch (IOException e) { + // Not failing the job due to a failure constructing the log url + LOG.warn( + "Unable to find instance for yarnNodeId={} to construct the log url. Exception message={}", + containerNodeId, e.getMessage()); + return null; + } + if (instanceSet != null) { + ServiceInstance matchedInstance = null; + for (ServiceInstance instance : instanceSet) { + if (instance.getRpcPort() == containerNodeId.getPort()) { + matchedInstance = instance; + break; + } + } + if (matchedInstance != null) { + String containerIdString = matchedInstance.getProperties() + .get(HiveConf.ConfVars.LLAP_DAEMON_CONTAINER_ID.varname); + if (containerIdString != null) { + return constructLlapLogUrl(attemptID, containerIdString, isDone, containerNodeId.getHost()); + } } } + return null; + } + + private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString, + final boolean isDone, final String nmHost) { + String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString(); + String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""), + "?nm.id=", nmHost, ":", nmPort); + String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers", + containerIdString, "logs", filename); return url; }