From: prasanthj@apache.org
To: commits@hive.apache.org
Subject: hive git commit: HIVE-9756: LLAP: use log4j 2 for llap (log to separate files, etc.) (Prasanth Jayachandran reviewed by Siddharth Seth)
Date: Fri, 15 Jul 2016 22:49:19 +0000 (UTC)
Reply-To: hive-dev@hive.apache.org

Repository: hive
Updated Branches:
  refs/heads/master 8c11d370f -> 04597681d


HIVE-9756: LLAP: use log4j 2 for llap (log to separate files, etc.)
(Prasanth Jayachandran reviewed by Siddharth Seth)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/04597681
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/04597681
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/04597681

Branch: refs/heads/master
Commit: 04597681d296e7121cbd71af6408a11681bd4c80
Parents: 8c11d37
Author: Prasanth Jayachandran
Authored: Fri Jul 15 15:49:05 2016 -0700
Committer: Prasanth Jayachandran
Committed: Fri Jul 15 15:49:05 2016 -0700

----------------------------------------------------------------------
 llap-server/bin/llap-daemon-env.sh              |   2 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  20 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       |   2 +
 .../daemon/impl/StatsRecordingThreadPool.java   |  41 ++++
 .../llap/daemon/impl/TaskRunnerCallable.java    | 194 ++++++++++---------
 .../hive/llap/io/api/impl/LlapInputFormat.java  |  11 +-
 .../resources/llap-daemon-log4j2.properties     |  47 ++++-
 llap-server/src/main/resources/package.py       |   2 +
 llap-server/src/main/resources/templates.py     |   2 +-
 .../hive/llap/tezplugins/LlapTezUtils.java      |  14 +-
 10 files changed, 227 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/bin/llap-daemon-env.sh
----------------------------------------------------------------------
diff --git a/llap-server/bin/llap-daemon-env.sh b/llap-server/bin/llap-daemon-env.sh
index 02c4315..14cab3d 100755
--- a/llap-server/bin/llap-daemon-env.sh
+++ b/llap-server/bin/llap-daemon-env.sh
@@ -32,7 +32,7 @@
 #export LLAP_DAEMON_USER_CLASSPATH=
 
 # Logger setup for LLAP daemon
-#export LLAP_DAEMON_LOGGER=RFA
+#export LLAP_DAEMON_LOGGER=query-routing
 
 # Log level for LLAP daemon
 #export LLAP_DAEMON_LOG_LEVEL=INFO
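For illustration (an editor's sketch, not part of the commit): the new query-routing default refers to an MDC-keyed log4j 2 RoutingAppender defined in llap-daemon-log4j2.properties, whose content is not shown in this mail. Assuming the appender routes on an MDC key such as queryId (the key name here is an assumption), the effect is that log statements from different queries land in different per-query files:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

// Sketch only: the "queryId" routing key and the query ids are illustrative;
// the real key and appender wiring live in llap-daemon-log4j2.properties.
public class QueryRoutingSketch {
  private static final Logger LOG = LogManager.getLogger(QueryRoutingSketch.class);

  public static void main(String[] args) {
    // With a RoutingAppender keyed on ${ctx:queryId}, these two statements
    // would be routed to two different per-query log files.
    ThreadContext.put("queryId", "query_1");
    LOG.info("fragment progress for query 1");
    ThreadContext.put("queryId", "query_2");
    LOG.info("fragment progress for query 2");
    ThreadContext.clearMap();
  }
}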
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2f9dea0..103115e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrB
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.security.LlapSignerImpl;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -62,6 +63,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.log4j.MDC;
 import org.apache.log4j.NDC;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -185,8 +187,19 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
         vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
     // This is the start of container-annotated logging.
-    // TODO Reduce the length of this string. Way too verbose at the moment.
-    NDC.push(fragmentIdString);
+    final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString();
+    final String queryId = vertex.getHiveQueryId();
+    final String fragId = LlapTezUtils.stripAttemptPrefix(fragmentIdString);
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    MDC.put("fragmentId", fragId);
+    // TODO: Ideally we want tez to use a CallableWithMdc that retains the MDC for threads created
+    // in a thread pool. For now, we push dagId, queryId and fragmentId into the NDC, and the
+    // custom thread pool that we use for task execution and llap io (StatsRecordingThreadPool)
+    // will pop them using reflection and update the MDC.
+    NDC.push(dagId);
+    NDC.push(queryId);
+    NDC.push(fragId);
     Scheduler.SubmissionState submissionState;
     SubmitWorkResponseProto.Builder responseBuilder = SubmitWorkResponseProto.newBuilder();
     try {
@@ -246,7 +259,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
         metrics.incrExecutorTotalRequestsHandled();
       }
     } finally {
-      NDC.pop();
+      MDC.clear();
+      NDC.clear();
     }
     responseBuilder.setSubmissionState(SubmissionStateProto.valueOf(submissionState.name()));
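A note on the push order above (editor's sketch, not part of the commit): the NDC is a per-thread LIFO stack, so consumers pop the values back in reverse, fragmentId first and dagId last. That ordering contract is exactly what StatsRecordingThreadPool and TaskRunnerCallable below rely on. A minimal demonstration against the org.apache.log4j.NDC API the commit already imports:

import org.apache.log4j.NDC;

public class NdcOrderSketch {
  public static void main(String[] args) {
    // Same push order as ContainerRunnerImpl: dagId, then queryId, then fragmentId.
    NDC.push("dag_1");
    NDC.push("query_1");
    NDC.push("fragment_1");
    // LIFO: pops return the values in reverse push order.
    System.out.println(NDC.pop()); // fragment_1
    System.out.println(NDC.pop()); // query_1
    System.out.println(NDC.pop()); // dag_1
    NDC.remove(); // release the thread's diagnostic context
  }
}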
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c7e9d32..91b8727 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -287,6 +287,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
     if (llap_l4j2 != null) {
       final boolean async = LogUtils.checkAndSetAsyncLogging(conf);
+      // required for MDC based routing appender so that child threads can inherit the MDC context
+      System.setProperty("isThreadContextMapInheritable", "true");
       Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
       long end = System.currentTimeMillis();
       LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",
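Why the system property matters (editor's sketch, not part of the commit): with isThreadContextMapInheritable=true, log4j 2 backs its ThreadContext map with an InheritableThreadLocal, so a thread created after a put() sees a copy of its parent's MDC. The property is read when log4j 2 initializes, which is why LlapDaemon sets it immediately before Configurator.initialize(). It also only helps threads created afterwards; threads already sitting in a pool still need the explicit MDC fix-up that StatsRecordingThreadPool (next file) performs.

import org.apache.logging.log4j.ThreadContext;

public class InheritableMdcSketch {
  public static void main(String[] args) throws InterruptedException {
    // Must be set before log4j 2 (and thus ThreadContext) initializes.
    System.setProperty("isThreadContextMapInheritable", "true");
    ThreadContext.put("queryId", "query_1");

    Thread child = new Thread(
        // Prints query_1 with inheritance on; null with it off.
        () -> System.out.println(ThreadContext.get("queryId")));
    child.start();
    child.join();
  }
}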
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
index 9b3ce7e..363b9b1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/StatsRecordingThreadPool.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
+import java.util.Stack;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
@@ -30,6 +32,9 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
+import org.apache.log4j.MDC;
+import org.apache.log4j.NDC;
+import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.counters.FileSystemCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.runtime.task.TaskRunner2Callable;
@@ -100,10 +105,46 @@ public class StatsRecordingThreadPool extends ThreadPoolExecutor {
       // clone thread local file system statistics
       List statsBefore = LlapUtil.cloneThreadLocalFileSystemStatistics();
+      setupMDCFromNDC(actualCallable);
       try {
         return actualCallable.call();
       } finally {
         updateFileSystemCounters(statsBefore, actualCallable);
+        MDC.clear();
+      }
+    }
+
+    private void setupMDCFromNDC(final Callable actualCallable) {
+      if (actualCallable instanceof CallableWithNdc) {
+        CallableWithNdc callableWithNdc = (CallableWithNdc) actualCallable;
+        try {
+          // CallableWithNdc inherits from NDC only when call() is invoked. CallableWithNdc has to
+          // be extended to provide access to its ndcStack that is cloned during creation. Until
+          // then, we will use reflection to access the private field.
+          // FIXME: HIVE-14243 is the follow-up to remove this reflection.
+          Field field = callableWithNdc.getClass().getSuperclass().getDeclaredField("ndcStack");
+          field.setAccessible(true);
+          Stack ndcStack = (Stack) field.get(callableWithNdc);
+
+          final Stack clonedStack = (Stack) ndcStack.clone();
+          final String fragmentId = (String) clonedStack.pop();
+          final String queryId = (String) clonedStack.pop();
+          final String dagId = (String) clonedStack.pop();
+          MDC.put("dagId", dagId);
+          MDC.put("queryId", queryId);
+          MDC.put("fragmentId", fragmentId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Received dagId: {} queryId: {} instanceType: {}",
+                dagId, queryId, actualCallable.getClass().getSimpleName());
+          }
+        } catch (Exception e) {
+          LOG.warn("Not setting up MDC as NDC stack cannot be accessed reflectively for" +
+              " instance type: {} exception type: {}",
+              actualCallable.getClass().getSimpleName(), e.getClass().getSimpleName());
+        }
+      } else {
+        LOG.warn("Not setting up MDC as unknown callable instance type received: {}",
+            actualCallable.getClass().getSimpleName());
+      }
     }
   }
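The reflection above is the stopgap that the ContainerRunnerImpl TODO and the FIXME refer to. A hypothetical CallableWithMdc, sketched below by the editor (no such tez class exists at the time of this commit; HIVE-14243 tracks the cleanup), would make it unnecessary by capturing the submitter's MDC entries at construction time and reinstating them around call():

import java.util.concurrent.Callable;
import org.apache.log4j.MDC;

// Hypothetical sketch: capture the three MDC keys LLAP cares about on the
// submitting thread, restore them on the pool thread, clear on exit.
public abstract class CallableWithMdc<T> implements Callable<T> {
  private final Object dagId = MDC.get("dagId");
  private final Object queryId = MDC.get("queryId");
  private final Object fragmentId = MDC.get("fragmentId");

  @Override
  public final T call() throws Exception {
    if (dagId != null) { MDC.put("dagId", dagId); }
    if (queryId != null) { MDC.put("queryId", queryId); }
    if (fragmentId != null) { MDC.put("fragmentId", fragmentId); }
    try {
      return callInternal();
    } finally {
      MDC.clear(); // same hygiene as StatsRecordingThreadPool above
    }
  }

  protected abstract T callInternal() throws Exception;
}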
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index fb64f0b..87bd5c8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Stack;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -50,6 +51,8 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.log4j.MDC;
+import org.apache.log4j.NDC;
 import org.apache.tez.common.CallableWithNdc;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -166,109 +169,126 @@ public class TaskRunnerCallable extends CallableWithNdc {
 
   @Override
   protected TaskRunner2Result callInternal() throws Exception {
-    isStarted.set(true);
+    setMDCFromNDC();
 
-    this.startTime = System.currentTimeMillis();
-    this.threadName = Thread.currentThread().getName();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
-    }
-
-    // Unregister from the AMReporter, since the task is now running.
-    this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+    try {
+      isStarted.set(true);
 
-    synchronized (this) {
-      if (!shouldRunTask) {
-        LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+      this.startTime = System.currentTimeMillis();
+      this.threadName = Thread.currentThread().getName();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
       }
-    }
-    // TODO This executor seems unnecessary. Here and TezChild
-    executor = new StatsRecordingThreadPool(1, 1,
-        0L, TimeUnit.MILLISECONDS,
-        new LinkedBlockingQueue<Runnable>(),
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("TezTaskRunner")
-            .build());
-
-    // TODO Consolidate this code with TezChild.
-    runtimeWatch.start();
-    if (taskUgi == null) {
-      taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
-    }
-    taskUgi.addCredentials(credentials);
-
-    Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
-    serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
-        TezCommonUtils.convertJobTokenToBytes(jobToken));
-    Multimap startedInputsMap = createStartedInputMap(vertex);
-
-    UserGroupInformation taskOwner =
-        UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
-    final InetSocketAddress address =
-        NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
-    SecurityUtil.setTokenService(jobToken, address);
-    taskOwner.addToken(jobToken);
-    umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
-      @Override
-      public LlapTaskUmbilicalProtocol run() throws Exception {
-        return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
-            LlapTaskUmbilicalProtocol.versionID, address, conf);
-      }
-    });
-
-    String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
-    taskReporter = new LlapTaskReporter(
-        umbilical,
-        confParams.amHeartbeatIntervalMsMax,
-        confParams.amCounterHeartbeatInterval,
-        confParams.amMaxEventsPerHeartbeat,
-        new AtomicLong(0),
-        request.getContainerIdString(),
-        fragmentId,
-        initialEvent);
-
-    String attemptId = fragmentInfo.getFragmentIdentifierString();
-    IOContextMap.setThreadAttemptId(attemptId);
-    try {
+      // Unregister from the AMReporter, since the task is now running.
+      this.amReporter.unregisterTask(request.getAmHost(), request.getAmPort());
+
       synchronized (this) {
-        if (shouldRunTask) {
-          taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
-              taskSpec,
-              vertex.getQueryIdentifier().getAppAttemptNumber(),
-              serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
-              objectRegistry,
-              pid,
-              executionContext, memoryAvailable, false, tezHadoopShim);
+        if (!shouldRunTask) {
+          LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
         }
       }
-      if (taskRunner == null) {
-        LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
-        return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
-      }
+      // TODO This executor seems unnecessary. Here and TezChild
+      executor = new StatsRecordingThreadPool(1, 1,
+          0L, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>(),
+          new ThreadFactoryBuilder()
+              .setDaemon(true)
+              .setNameFormat("TezTaskRunner")
+              .build());
+
+      // TODO Consolidate this code with TezChild.
+      runtimeWatch.start();
+      if (taskUgi == null) {
+        taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
+      }
+      taskUgi.addCredentials(credentials);
+
+      Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
+      serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
+          TezCommonUtils.convertJobTokenToBytes(jobToken));
+      Multimap startedInputsMap = createStartedInputMap(vertex);
+
+      UserGroupInformation taskOwner =
+          UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
+      final InetSocketAddress address =
+          NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
+      SecurityUtil.setTokenService(jobToken, address);
+      taskOwner.addToken(jobToken);
+      umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
+        @Override
+        public LlapTaskUmbilicalProtocol run() throws Exception {
+          return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
+              LlapTaskUmbilicalProtocol.versionID, address, conf);
+        }
+      });
+
+      String fragmentId = LlapTezUtils.stripAttemptPrefix(taskSpec.getTaskAttemptID().toString());
+      taskReporter = new LlapTaskReporter(
+          umbilical,
+          confParams.amHeartbeatIntervalMsMax,
+          confParams.amCounterHeartbeatInterval,
+          confParams.amMaxEventsPerHeartbeat,
+          new AtomicLong(0),
+          request.getContainerIdString(),
+          fragmentId,
+          initialEvent);
+
+      String attemptId = fragmentInfo.getFragmentIdentifierString();
+      IOContextMap.setThreadAttemptId(attemptId);
       try {
-        TaskRunner2Result result = taskRunner.run();
-        if (result.isContainerShutdownRequested()) {
-          LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+        synchronized (this) {
+          if (shouldRunTask) {
+            taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
+                taskSpec,
+                vertex.getQueryIdentifier().getAppAttemptNumber(),
+                serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
+                objectRegistry,
+                pid,
+                executionContext, memoryAvailable, false, tezHadoopShim);
+          }
         }
-        isCompleted.set(true);
-        return result;
-      } finally {
-        FileSystem.closeAllForUGI(taskUgi);
-        LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
-            runtimeWatch.stop().elapsedMillis());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+        if (taskRunner == null) {
+          LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID());
+          return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, null, false);
+        }
+
+        try {
+          TaskRunner2Result result = taskRunner.run();
+          if (result.isContainerShutdownRequested()) {
+            LOG.warn("Unexpected container shutdown requested while running task. Ignoring");
+          }
+          isCompleted.set(true);
+          return result;
+        } finally {
+          FileSystem.closeAllForUGI(taskUgi);
+          LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
+              runtimeWatch.stop().elapsedMillis());
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
+          }
         }
+      } finally {
+        IOContextMap.clearThreadAttempt(attemptId);
       }
     } finally {
-      IOContextMap.clearThreadAttempt(attemptId);
+      MDC.clear();
     }
   }
 
+  private void setMDCFromNDC() {
+    final Stack<String> clonedNDC = NDC.cloneStack();
+    final String fragId = clonedNDC.pop();
+    final String queryId = clonedNDC.pop();
+    final String dagId = clonedNDC.pop();
+    MDC.put("dagId", dagId);
+    MDC.put("queryId", queryId);
+    MDC.put("fragmentId", fragId);
+  }
+
   /**
    * Attempt to kill a running task. If the task has not started running, it will not start.
    * If it's already running, a kill request will be sent to it.
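One subtlety in setMDCFromNDC() above (editor's note, not part of the commit): NDC.cloneStack() hands back a copy, so popping it does not disturb the thread's actual NDC, which the rest of the fragment's logging still uses. A minimal sketch, assuming the log4j-1.2-api bridge over log4j 2 (as LLAP uses), where the NDC stack holds plain strings:

import java.util.Stack;
import org.apache.log4j.NDC;

public class CloneStackSketch {
  public static void main(String[] args) {
    NDC.push("dag_1");
    NDC.push("query_1");
    NDC.push("fragment_1");

    // Pops on the clone leave the real per-thread NDC untouched.
    Stack clonedNDC = NDC.cloneStack();
    System.out.println(clonedNDC.pop()); // fragment_1 (from the copy)
    System.out.println(NDC.pop());       // fragment_1 (real stack still intact)
    NDC.remove();
  }
}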
http://git-wip-us.apache.org/repos/asf/hive/blob/04597681/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index cc4e10b..c5d0680 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -42,6 +42,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
@@ -84,8 +85,10 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 public class LlapInputFormat implements InputFormat, VectorizedInputFormatInterface, SelfDescribingInputFormatInterface,
@@ -189,9 +192,14 @@ public class LlapInputFormat implements InputFormat
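The LlapInputFormat hunk above (the mail is truncated here) adds org.slf4j.MDC, bringing the IO threads into the same scheme: under the log4j-slf4j-impl binding that Hive ships, slf4j's MDC writes through to log4j 2's ThreadContext, the same map the query-routing appender keys on. An editor's sketch of the pattern (the key name and message are illustrative, not taken from the truncated hunk):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class Slf4jMdcSketch {
  private static final Logger LOG = LoggerFactory.getLogger(Slf4jMdcSketch.class);

  static void tagAndLog(String queryId) {
    MDC.put("queryId", queryId);
    try {
      LOG.info("llap io work for {}", queryId);
    } finally {
      MDC.remove("queryId"); // leave no stale context on a pooled thread
    }
  }
}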