Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 92BB717D5D for ; Fri, 10 Apr 2015 15:41:08 +0000 (UTC) Received: (qmail 49252 invoked by uid 500); 10 Apr 2015 15:41:08 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 48947 invoked by uid 500); 10 Apr 2015 15:41:08 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 48937 invoked by uid 99); 10 Apr 2015 15:41:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Apr 2015 15:41:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BA108E0418; Fri, 10 Apr 2015 15:41:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: junping_du@apache.org To: common-commits@hadoop.apache.org Date: Fri, 10 Apr 2015 15:41:08 -0000 Message-Id: In-Reply-To: <2c0d2209ed45426b96404ec410aca64b@git.apache.org> References: <2c0d2209ed45426b96404ec410aca64b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: YARN-1376. NM need to notify the log aggregation status to RM through Node heartbeat. Contributed by Xuan Gong. (cherry picked from commit 92431c961741747b5d6442f4025016d48d9a6863) YARN-1376. NM need to notify the log aggregation status to RM through Node heartbeat. Contributed by Xuan Gong. (cherry picked from commit 92431c961741747b5d6442f4025016d48d9a6863) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0891de07 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0891de07 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0891de07 Branch: refs/heads/branch-2 Commit: 0891de07125ca11b4b4b857ccf4c99b65871bd81 Parents: f603b9c Author: Junping Du Authored: Fri Apr 10 08:56:18 2015 -0700 Committer: Junping Du Committed: Fri Apr 10 08:58:07 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 11 + .../protocolrecords/LogAggregationReport.java | 104 ++++++ .../protocolrecords/NodeHeartbeatRequest.java | 8 + .../impl/pb/LogAggregationReportPBImpl.java | 227 +++++++++++++ .../impl/pb/NodeHeartbeatRequestPBImpl.java | 82 ++++- .../api/records/LogAggregationStatus.java | 31 ++ .../hadoop/yarn/server/webapp/AppBlock.java | 14 +- .../main/proto/yarn_server_common_protos.proto | 8 + .../yarn_server_common_service_protos.proto | 13 + .../hadoop/yarn/server/nodemanager/Context.java | 5 + .../yarn/server/nodemanager/NodeManager.java | 12 + .../nodemanager/NodeStatusUpdaterImpl.java | 72 ++++- .../logaggregation/AppLogAggregatorImpl.java | 34 +- .../resourcemanager/ResourceTrackerService.java | 12 +- .../server/resourcemanager/rmapp/RMApp.java | 3 + .../server/resourcemanager/rmapp/RMAppImpl.java | 88 +++++ .../resourcemanager/rmnode/RMNodeImpl.java | 24 +- .../rmnode/RMNodeStatusEvent.java | 26 ++ .../webapp/AppLogAggregationStatusPage.java | 41 +++ .../webapp/RMAppLogAggregationStatusBlock.java | 148 +++++++++ .../server/resourcemanager/webapp/RMWebApp.java | 2 + .../resourcemanager/webapp/RmController.java | 4 + .../applicationsmanager/MockAsm.java | 6 + .../TestRMAppLogAggregationStatus.java | 318 +++++++++++++++++++ .../server/resourcemanager/rmapp/MockRMApp.java | 6 + 26 files changed, 1291 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 86277d3..493b69e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -12,6 +12,9 @@ Release 2.8.0 - UNRELEASED container-executor for outbound network traffic control. (Sidharta Seethana via vinodkv) + YARN-1376. NM need to notify the log aggregation status to RM through + heartbeat. (Xuan Gong via junping_du) + IMPROVEMENTS YARN-1880. Cleanup TestApplicationClientProtocolOnHA http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a25cfe9..81bcb9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -742,6 +742,17 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS = -1; /** + * How long for ResourceManager to wait for NodeManager to report its + * log aggregation status. If waiting time of which the log aggregation status + * is reported from NodeManager exceeds the configured value, RM will report + * log aggregation status for this NodeManager as TIME_OUT + */ + public static final String LOG_AGGREGATION_STATUS_TIME_OUT_MS = + YARN_PREFIX + "log-aggregation-status.time-out.ms"; + public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS + = 10 * 60 * 1000; + + /** * Number of seconds to retain logs on the NodeManager. Only applicable if Log * aggregation is disabled */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java new file mode 100644 index 0000000..808804b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/LogAggregationReport.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.util.Records; + +/** + * {@code LogAggregationReport} is a report for log aggregation status + * in one NodeManager of an application. + *

+ * It includes details such as: + *

    + *
  • {@link ApplicationId} of the application.
  • + *
  • {@link NodeId} of the NodeManager.
  • + *
  • {@link LogAggregationStatus}
  • + *
  • Diagnostic information
  • + *
+ * + */ +@Public +@Unstable +public abstract class LogAggregationReport { + + @Public + @Unstable + public static LogAggregationReport newInstance(ApplicationId appId, + NodeId nodeId, LogAggregationStatus status, String diagnosticMessage) { + LogAggregationReport report = Records.newRecord(LogAggregationReport.class); + report.setApplicationId(appId); + report.setLogAggregationStatus(status); + report.setDiagnosticMessage(diagnosticMessage); + return report; + } + + /** + * Get the ApplicationId of the application. + * @return ApplicationId of the application + */ + @Public + @Unstable + public abstract ApplicationId getApplicationId(); + + @Public + @Unstable + public abstract void setApplicationId(ApplicationId appId); + + /** + * Get the NodeId. + * @return NodeId + */ + @Public + @Unstable + public abstract NodeId getNodeId(); + + @Public + @Unstable + public abstract void setNodeId(NodeId nodeId); + + /** + * Get the LogAggregationStatus. + * @return LogAggregationStatus + */ + @Public + @Unstable + public abstract LogAggregationStatus getLogAggregationStatus(); + + @Public + @Unstable + public abstract void setLogAggregationStatus( + LogAggregationStatus logAggregationStatus); + + /** + * Get the diagnositic information of this log aggregation + * @return diagnositic information of this log aggregation + */ + @Public + @Unstable + public abstract String getDiagnosticMessage(); + + @Public + @Unstable + public abstract void setDiagnosticMessage(String diagnosticMessage); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index b80d9ce..227363f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.Map; import java.util.Set; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -51,4 +53,10 @@ public abstract class NodeHeartbeatRequest { public abstract Set getNodeLabels(); public abstract void setNodeLabels(Set nodeLabels); + + public abstract Map + getLogAggregationReportsForApps(); + + public abstract void setLogAggregationReportsForApps( + Map logAggregationReportsForApps); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java new file mode 100644 index 0000000..7999fa7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/LogAggregationReportPBImpl.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.LogAggregationStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class LogAggregationReportPBImpl extends LogAggregationReport { + + LogAggregationReportProto proto = LogAggregationReportProto + .getDefaultInstance(); + LogAggregationReportProto.Builder builder = null; + boolean viaProto = false; + + private static final String LOGAGGREGATION_STATUS_PREFIX = "LOG_"; + + private ApplicationId applicationId; + private NodeId nodeId; + + public LogAggregationReportPBImpl() { + builder = LogAggregationReportProto.newBuilder(); + } + + public LogAggregationReportPBImpl(LogAggregationReportProto proto) { + this.proto = proto; + viaProto = true; + } + + public LogAggregationReportProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.applicationId != null + && !((ApplicationIdPBImpl) this.applicationId).getProto().equals( + builder.getApplicationId())) { + builder.setApplicationId(convertToProtoFormat(this.applicationId)); + } + + if (this.nodeId != null + && !((NodeIdPBImpl) this.nodeId).getProto().equals( + builder.getNodeId())) { + builder.setNodeId(convertToProtoFormat(this.nodeId)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = LogAggregationReportProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public ApplicationId getApplicationId() { + if (this.applicationId != null) { + return this.applicationId; + } + + LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasApplicationId()) { + return null; + } + this.applicationId = convertFromProtoFormat(p.getApplicationId()); + return this.applicationId; + } + + @Override + public void setApplicationId(ApplicationId appId) { + maybeInitBuilder(); + if (appId == null) + builder.clearApplicationId(); + this.applicationId = appId; + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + private ApplicationIdPBImpl convertFromProtoFormat( + ApplicationIdProto applicationId) { + return new ApplicationIdPBImpl(applicationId); + } + + @Override + public LogAggregationStatus getLogAggregationStatus() { + LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasLogAggregationStatus()) { + return null; + } + return convertFromProtoFormat(p.getLogAggregationStatus()); + } + + @Override + public void + setLogAggregationStatus(LogAggregationStatus logAggregationStatus) { + maybeInitBuilder(); + if (logAggregationStatus == null) { + builder.clearLogAggregationStatus(); + return; + } + builder.setLogAggregationStatus(convertToProtoFormat(logAggregationStatus)); + } + + private LogAggregationStatus convertFromProtoFormat( + LogAggregationStatusProto s) { + return LogAggregationStatus.valueOf(s.name().replace( + LOGAGGREGATION_STATUS_PREFIX, "")); + } + + private LogAggregationStatusProto + convertToProtoFormat(LogAggregationStatus s) { + return LogAggregationStatusProto.valueOf(LOGAGGREGATION_STATUS_PREFIX + + s.name()); + } + + @Override + public String getDiagnosticMessage() { + LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return p.getDiagnostics(); + } + + @Override + public void setDiagnosticMessage(String diagnosticMessage) { + maybeInitBuilder(); + if (diagnosticMessage == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnosticMessage); + } + + @Override + public NodeId getNodeId() { + if (this.nodeId != null) { + return this.nodeId; + } + + LogAggregationReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasNodeId()) { + return null; + } + this.nodeId = convertFromProtoFormat(p.getNodeId()); + return this.nodeId; + } + + @Override + public void setNodeId(NodeId nodeId) { + maybeInitBuilder(); + if (nodeId == null) + builder.clearNodeId(); + this.nodeId = nodeId; + } + + private NodeIdProto convertToProtoFormat(NodeId t) { + return ((NodeIdPBImpl) t).getProto(); + } + + private NodeIdPBImpl convertFromProtoFormat(NodeIdProto nodeId) { + return new NodeIdPBImpl(nodeId); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 16d47f9..03db39c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,15 +18,24 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportsForAppsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -42,6 +51,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null; private Set labels = null; + private Map + logAggregationReportsForApps = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -91,6 +102,25 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { builder.setNodeLabels(StringArrayProto.newBuilder() .addAllElements(this.labels).build()); } + if (this.logAggregationReportsForApps != null) { + addLogAggregationStatusForAppsToProto(); + } + } + + private void addLogAggregationStatusForAppsToProto() { + maybeInitBuilder(); + builder.clearLogAggregationReportsForApps(); + for (Entry entry : logAggregationReportsForApps + .entrySet()) { + builder.addLogAggregationReportsForApps(LogAggregationReportsForAppsProto + .newBuilder().setAppId(convertToProtoFormat(entry.getKey())) + .setLogAggregationReport(convertToProtoFormat(entry.getValue()))); + } + } + + private LogAggregationReportProto convertToProtoFormat( + LogAggregationReport value) { + return ((LogAggregationReportPBImpl) value).getProto(); } private void mergeLocalToProto() { @@ -215,4 +245,54 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { StringArrayProto nodeLabels = p.getNodeLabels(); labels = new HashSet(nodeLabels.getElementsList()); } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + @Override + public Map + getLogAggregationReportsForApps() { + if (this.logAggregationReportsForApps != null) { + return this.logAggregationReportsForApps; + } + initLogAggregationReportsForApps(); + return logAggregationReportsForApps; + } + + private void initLogAggregationReportsForApps() { + NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getLogAggregationReportsForAppsList(); + this.logAggregationReportsForApps = + new HashMap(); + for (LogAggregationReportsForAppsProto c : list) { + ApplicationId appId = convertFromProtoFormat(c.getAppId()); + LogAggregationReport report = + convertFromProtoFormat(c.getLogAggregationReport()); + this.logAggregationReportsForApps.put(appId, report); + } + } + + private LogAggregationReport convertFromProtoFormat( + LogAggregationReportProto logAggregationReport) { + return new LogAggregationReportPBImpl(logAggregationReport); + } + + @Override + public void setLogAggregationReportsForApps( + Map logAggregationStatusForApps) { + if (logAggregationStatusForApps == null + || logAggregationStatusForApps.isEmpty()) { + return; + } + maybeInitBuilder(); + this.logAggregationReportsForApps = + new HashMap(); + this.logAggregationReportsForApps.putAll(logAggregationStatusForApps); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java new file mode 100644 index 0000000..496767f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/LogAggregationStatus.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +/** + *

Status of Log aggregation.

+ */ +public enum LogAggregationStatus { + DISABLED, + NOT_START, + RUNNING, + FINISHED, + FAILED, + TIME_OUT +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java index ae4737d..d5a3dd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppBlock.java @@ -21,8 +21,10 @@ package org.apache.hadoop.yarn.server.webapp; import static org.apache.hadoop.yarn.util.StringHelper.join; import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID; import static org.apache.hadoop.yarn.webapp.YarnWebParams.WEB_UI_TYPE; + import java.security.PrivilegedExceptionAction; import java.util.Collection; + import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo; import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.Times; +import org.apache.hadoop.yarn.webapp.ResponseInfo; import org.apache.hadoop.yarn.webapp.YarnWebParams; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; @@ -154,7 +157,7 @@ public class AppBlock extends HtmlBlock { html.script().$type("text/javascript")._(script.toString())._(); } - info("Application Overview") + ResponseInfo overviewTable = info("Application Overview") ._("User:", app.getUser()) ._("Name:", app.getName()) ._("Application Type:", app.getType()) @@ -181,8 +184,13 @@ public class AppBlock extends HtmlBlock { .getAppState() == YarnApplicationState.FINISHED || app.getAppState() == YarnApplicationState.FAILED || app.getAppState() == YarnApplicationState.KILLED ? "History" - : "ApplicationMaster") - ._("Diagnostics:", + : "ApplicationMaster"); + if (webUiType != null + && webUiType.equals(YarnWebParams.RM_WEB_UI)) { + overviewTable._("Log Aggregation Status", + root_url("logaggregationstatus", app.getAppId()), "Status"); + } + overviewTable._("Diagnostics:", app.getDiagnosticsInfo() == null ? "" : app.getDiagnosticsInfo()); Collection attempts; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 01fac32..6e9f4cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -54,3 +54,11 @@ message VersionProto { optional int32 minor_version = 2; } +enum LogAggregationStatusProto { + LOG_DISABLED = 1; + LOG_NOT_START = 2; + LOG_RUNNING = 3; + LOG_FINISHED = 4; + LOG_TIME_OUT = 5; +} + http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index d8c92c4..3103582 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -50,6 +50,19 @@ message NodeHeartbeatRequestProto { optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_nm_token_master_key = 3; optional StringArrayProto nodeLabels = 4; + repeated LogAggregationReportsForAppsProto log_aggregation_reports_for_apps = 5; +} + +message LogAggregationReportsForAppsProto { + optional ApplicationIdProto appId = 1; + optional LogAggregationReportProto log_aggregation_report = 2; +} + +message LogAggregationReportProto { +optional ApplicationIdProto application_id = 1; +optional NodeIdProto node_id = 2; +optional LogAggregationStatusProto log_aggregation_status = 3; +optional string diagnostics = 4 [default = "N/A"]; } message NodeHeartbeatResponseProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 6e7e2ec..42a4234 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.security.Credentials; @@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -77,4 +79,7 @@ public interface Context { boolean getDecommissioned(); void setDecommissioned(boolean isDecommissioned); + + ConcurrentLinkedQueue + getLogAggregationStatusForApps(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index d54180a..ae3381b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -362,6 +364,8 @@ public class NodeManager extends CompositeService .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class); private final NMStateStoreService stateStore; private boolean isDecommissioned = false; + private final ConcurrentLinkedQueue + logAggregationReportForApps; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, @@ -375,6 +379,8 @@ public class NodeManager extends CompositeService this.nodeHealthStatus.setHealthReport("Healthy"); this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis()); this.stateStore = stateStore; + this.logAggregationReportForApps = new ConcurrentLinkedQueue< + LogAggregationReport>(); } /** @@ -466,6 +472,12 @@ public class NodeManager extends CompositeService Map systemCredentials) { this.systemCredentials = systemCredentials; } + + @Override + public ConcurrentLinkedQueue + getLogAggregationStatusForApps() { + return this.logAggregationReportForApps; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 2549e0f..b1ab5f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.Random; import java.util.Set; @@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -115,6 +118,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Duration for which to track recently stopped container. private long durationToTrackStoppedContainers; + private boolean logAggregationEnabled; + + private final List logAggregationReportForAppsTempList; + private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -144,6 +151,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.recentlyStoppedContainers = new LinkedHashMap(); this.pendingCompletedContainers = new HashMap(); + this.logAggregationReportForAppsTempList = + new ArrayList(); } @Override @@ -193,6 +202,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info("Initialized nodemanager for " + nodeId + ":" + " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb + " virtual-cores=" + virtualCores); + + this.logAggregationEnabled = + conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); } @Override @@ -649,6 +662,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdaterImpl.this.context .getNMTokenSecretManager().getCurrentKey(), nodeLabelsForHeartbeat); + + if (logAggregationEnabled) { + // pull log aggregation status for application running in this NM + Map logAggregationReports = + getLogAggregationReportsForApps(context + .getLogAggregationStatusForApps()); + if (logAggregationReports != null + && !logAggregationReports.isEmpty()) { + request.setLogAggregationReportsForApps(logAggregationReports); + } + } + response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); @@ -698,6 +723,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); + logAggregationReportForAppsTempList.clear(); lastHeartbeatID = response.getResponseId(); List containersToCleanup = response .getContainersToCleanup(); @@ -782,6 +808,48 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new Thread(statusUpdaterRunnable, "Node Status Updater"); statusUpdater.start(); } - - + + private Map + getLogAggregationReportsForApps( + ConcurrentLinkedQueue lastestLogAggregationStatus) { + Map latestLogAggregationReports = + new HashMap(); + LogAggregationReport status; + while ((status = lastestLogAggregationStatus.poll()) != null) { + this.logAggregationReportForAppsTempList.add(status); + } + for (LogAggregationReport logAggregationReport + : this.logAggregationReportForAppsTempList) { + LogAggregationReport report = null; + if (latestLogAggregationReports.containsKey(logAggregationReport + .getApplicationId())) { + report = + latestLogAggregationReports.get(logAggregationReport + .getApplicationId()); + report.setLogAggregationStatus(logAggregationReport + .getLogAggregationStatus()); + String message = report.getDiagnosticMessage(); + if (logAggregationReport.getDiagnosticMessage() != null + && !logAggregationReport.getDiagnosticMessage().isEmpty()) { + if (message != null) { + message += logAggregationReport.getDiagnosticMessage(); + } else { + message = logAggregationReport.getDiagnosticMessage(); + } + report.setDiagnosticMessage(message); + } + } else { + report = Records.newRecord(LogAggregationReport.class); + report.setApplicationId(logAggregationReport.getApplicationId()); + report.setNodeId(this.nodeId); + report.setLogAggregationStatus(logAggregationReport + .getLogAggregationStatus()); + report + .setDiagnosticMessage(logAggregationReport.getDiagnosticMessage()); + } + latestLogAggregationReports.put(logAggregationReport.getApplicationId(), + report); + } + return latestLogAggregationReports; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 393576b..bf7d5f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -57,12 +57,16 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.Times; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; @@ -120,6 +124,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { // This variable is only for testing private final AtomicBoolean waiting = new AtomicBoolean(false); + private boolean renameTemporaryLogFileFailed = false; + private final Map containerLogAggregators = new HashMap(); @@ -292,12 +298,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator { writer.close(); } + long currentTime = System.currentTimeMillis(); final Path renamedPath = this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp : new Path( remoteNodeLogFileForApp.getParent(), remoteNodeLogFileForApp.getName() + "_" - + System.currentTimeMillis()); + + currentTime); + String diagnosticMessage = ""; final boolean rename = uploadedLogsInThisCycle; try { userUgi.doAs(new PrivilegedExceptionAction() { @@ -314,12 +322,36 @@ public class AppLogAggregatorImpl implements AppLogAggregator { return null; } }); + diagnosticMessage = + "Log uploaded successfully for Application: " + appId + + " in NodeManager: " + + LogAggregationUtils.getNodeString(nodeId) + " at " + + Times.format(currentTime) + "\n"; } catch (Exception e) { LOG.error( "Failed to move temporary log file to final location: [" + remoteNodeTmpLogFileForApp + "] to [" + renamedPath + "]", e); + diagnosticMessage = + "Log uploaded failed for Application: " + appId + + " in NodeManager: " + + LogAggregationUtils.getNodeString(nodeId) + " at " + + Times.format(currentTime) + "\n"; + renameTemporaryLogFileFailed = true; + } + + LogAggregationReport report = + Records.newRecord(LogAggregationReport.class); + report.setApplicationId(appId); + report.setNodeId(nodeId); + report.setDiagnosticMessage(diagnosticMessage); + if (appFinished) { + report.setLogAggregationStatus(renameTemporaryLogFileFailed + ? LogAggregationStatus.FAILED : LogAggregationStatus.FINISHED); + } else { + report.setLogAggregationStatus(LogAggregationStatus.RUNNING); } + this.context.getLogAggregationStatusForApps().add(report); } finally { if (writer != null) { writer.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 22efe25..5e2dc7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -458,10 +458,16 @@ public class ResourceTrackerService extends AbstractService implements } // 4. Send status to RMNode, saving the latest response. - this.rmContext.getDispatcher().getEventHandler().handle( + RMNodeStatusEvent nodeStatusEvent = new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), - remoteNodeStatus.getContainersStatuses(), - remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); + remoteNodeStatus.getContainersStatuses(), + remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse); + if (request.getLogAggregationReportsForApps() != null + && !request.getLogAggregationReportsForApps().isEmpty()) { + nodeStatusEvent.setLogAggregationReportsForApps(request + .getLogAggregationReportsForApps()); + } + this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent); // 5. Update node's labels to RM's NodeLabelManager. if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index fbcaab9..33eedbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -242,4 +243,6 @@ public interface RMApp extends EventHandler { ReservationId getReservationId(); ResourceRequest getAMResourceRequest(); + + Map getLogAggregationReportsForApp(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 2d1737a..47c4807 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -25,9 +25,11 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -61,6 +63,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; @@ -142,6 +146,12 @@ public class RMAppImpl implements RMApp, Recoverable { new AppFinishedTransition(); private Set ranNodes = new ConcurrentSkipListSet(); + private final boolean logAggregationEnabled; + private long logAggregationStartTime = 0; + private final long logAggregationStatusTimeout; + private final Map logAggregationStatus = + new HashMap(); + // These states stored are only valid when app is at killing or final_saving. private RMAppState stateBeforeKilling; private RMAppState stateBeforeFinalSaving; @@ -413,6 +423,19 @@ public class RMAppImpl implements RMApp, Recoverable { rmContext.getRMApplicationHistoryWriter().applicationStarted(this); rmContext.getSystemMetricsPublisher().appCreated(this, startTime); + + long localLogAggregationStatusTimeout = + conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); + if (localLogAggregationStatusTimeout <= 0) { + this.logAggregationStatusTimeout = + YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS; + } else { + this.logAggregationStatusTimeout = localLogAggregationStatusTimeout; + } + this.logAggregationEnabled = + conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); } @Override @@ -803,6 +826,12 @@ public class RMAppImpl implements RMApp, Recoverable { // otherwise, add it to ranNodes for further process app.ranNodes.add(nodeAddedEvent.getNodeId()); + + app.logAggregationStatus.put(nodeAddedEvent.getNodeId(), + LogAggregationReport.newInstance(app.applicationId, nodeAddedEvent + .getNodeId(), app.logAggregationEnabled + ? LogAggregationStatus.NOT_START : LogAggregationStatus.DISABLED, + "")); }; } @@ -1153,6 +1182,7 @@ public class RMAppImpl implements RMApp, Recoverable { } public void transition(RMAppImpl app, RMAppEvent event) { + app.logAggregationStartTime = System.currentTimeMillis(); for (NodeId nodeId : app.getRanNodes()) { app.handler.handle( new RMNodeCleanAppEvent(nodeId, app.applicationId)); @@ -1356,4 +1386,62 @@ public class RMAppImpl implements RMApp, Recoverable { } return credentials; } + + @Override + public Map getLogAggregationReportsForApp() { + try { + this.readLock.lock(); + Map outputs = + new HashMap(); + outputs.putAll(logAggregationStatus); + for (Entry output : outputs.entrySet()) { + if (!output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.TIME_OUT) + && !output.getValue().getLogAggregationStatus() + .equals(LogAggregationStatus.FINISHED) + && isAppInFinalState(this) + && System.currentTimeMillis() > this.logAggregationStartTime + + this.logAggregationStatusTimeout) { + output.getValue().setLogAggregationStatus( + LogAggregationStatus.TIME_OUT); + } + } + return outputs; + } finally { + this.readLock.unlock(); + } + } + + public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { + try { + this.writeLock.lock(); + if (this.logAggregationEnabled) { + LogAggregationReport curReport = this.logAggregationStatus.get(nodeId); + if (curReport == null) { + this.logAggregationStatus.put(nodeId, report); + } else { + if (curReport.getLogAggregationStatus().equals( + LogAggregationStatus.TIME_OUT)) { + if (report.getLogAggregationStatus().equals( + LogAggregationStatus.FINISHED)) { + curReport.setLogAggregationStatus(report + .getLogAggregationStatus()); + } + } else { + curReport.setLogAggregationStatus(report.getLogAggregationStatus()); + } + + if (report.getDiagnosticMessage() != null + && !report.getDiagnosticMessage().isEmpty()) { + curReport + .setDiagnosticMessage(curReport.getDiagnosticMessage() == null + ? report.getDiagnosticMessage() : curReport + .getDiagnosticMessage() + report.getDiagnosticMessage()); + } + } + } + } finally { + this.writeLock.unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index c556b80..ace2cf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; @@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; @@ -243,7 +247,7 @@ public class RMNodeImpl implements RMNode, EventHandler { this.stateMachine = stateMachineFactory.make(this); - this.nodeUpdateQueue = new ConcurrentLinkedQueue(); + this.nodeUpdateQueue = new ConcurrentLinkedQueue(); } @Override @@ -773,6 +777,13 @@ public class RMNodeImpl implements RMNode, EventHandler { rmNode.handleContainerStatus(statusEvent.getContainers()); + Map logAggregationReportsForApps = + statusEvent.getLogAggregationReportsForApps(); + if (logAggregationReportsForApps != null + && !logAggregationReportsForApps.isEmpty()) { + rmNode.handleLogAggregationStatus(logAggregationReportsForApps); + } + if(rmNode.nextHeartBeat) { rmNode.nextHeartBeat = false; rmNode.context.getDispatcher().getEventHandler().handle( @@ -903,4 +914,15 @@ public class RMNodeImpl implements RMNode, EventHandler { } } + private void handleLogAggregationStatus( + Map logAggregationReportsForApps) { + for (Entry report : + logAggregationReportsForApps.entrySet()) { + RMApp rmApp = this.context.getRMApps().get(report.getKey()); + if (rmApp != null) { + ((RMAppImpl)rmApp).aggregateLogReport(this.nodeId, report.getValue()); + } + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index abfacbb..4bbf610 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -19,10 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -32,6 +34,7 @@ public class RMNodeStatusEvent extends RMNodeEvent { private final List containersCollection; private final NodeHeartbeatResponse latestResponse; private final List keepAliveAppIds; + private Map logAggregationReportsForApps; public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, List collection, List keepAliveAppIds, @@ -41,6 +44,19 @@ public class RMNodeStatusEvent extends RMNodeEvent { this.containersCollection = collection; this.keepAliveAppIds = keepAliveAppIds; this.latestResponse = latestResponse; + this.logAggregationReportsForApps = null; + } + + public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, + List collection, List keepAliveAppIds, + NodeHeartbeatResponse latestResponse, + Map logAggregationReportsForApps) { + super(nodeId, RMNodeEventType.STATUS_UPDATE); + this.nodeHealthStatus = nodeHealthStatus; + this.containersCollection = collection; + this.keepAliveAppIds = keepAliveAppIds; + this.latestResponse = latestResponse; + this.logAggregationReportsForApps = logAggregationReportsForApps; } public NodeHealthStatus getNodeHealthStatus() { @@ -58,4 +74,14 @@ public class RMNodeStatusEvent extends RMNodeEvent { public List getKeepAliveAppIds() { return this.keepAliveAppIds; } + + public Map + getLogAggregationReportsForApps() { + return this.logAggregationReportsForApps; + } + + public void setLogAggregationReportsForApps( + Map logAggregationReportsForApps) { + this.logAggregationReportsForApps = logAggregationReportsForApps; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java new file mode 100644 index 0000000..ccb53dd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppLogAggregationStatusPage.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import static org.apache.hadoop.yarn.util.StringHelper.join; +import org.apache.hadoop.yarn.webapp.SubView; +import org.apache.hadoop.yarn.webapp.YarnWebParams; + +public class AppLogAggregationStatusPage extends RmView{ + + @Override + protected void preHead(Page.HTML<_> html) { + commonPreHead(html); + String appId = $(YarnWebParams.APPLICATION_ID); + set( + TITLE, + appId.isEmpty() ? "Bad request: missing application ID" : join( + "Application ", $(YarnWebParams.APPLICATION_ID))); + } + + @Override + protected Class content() { + return RMAppLogAggregationStatusBlock.class; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java new file mode 100644 index 0000000..a95f76f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMAppLogAggregationStatusBlock.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp; + +import static org.apache.hadoop.yarn.util.StringHelper.join; +import static org.apache.hadoop.yarn.webapp.YarnWebParams.APPLICATION_ID; +import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP; +import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock; + +import com.google.inject.Inject; + +public class RMAppLogAggregationStatusBlock extends HtmlBlock { + + private static final Log LOG = LogFactory + .getLog(RMAppLogAggregationStatusBlock.class); + private final ResourceManager rm; + private final Configuration conf; + + @Inject + RMAppLogAggregationStatusBlock(ViewContext ctx, ResourceManager rm, + Configuration conf) { + super(ctx); + this.rm = rm; + this.conf = conf; + } + + @Override + protected void render(Block html) { + String aid = $(APPLICATION_ID); + if (aid.isEmpty()) { + puts("Bad request: requires Application ID"); + return; + } + + ApplicationId appId; + try { + appId = Apps.toAppID(aid); + } catch (Exception e) { + puts("Invalid Application ID: " + aid); + return; + } + + setTitle(join("Application ", aid)); + + // Add LogAggregationStatus description table + // to explain the meaning of different LogAggregationStatus + DIV div_description = html.div(_INFO_WRAP); + TABLE> table_description = + div_description.table("#LogAggregationStatusDecription"); + table_description. + tr(). + th(_TH, "Log Aggregation Status"). + th(_TH, "Description"). + _(); + table_description.tr().td(LogAggregationStatus.DISABLED.name()) + .td("Log Aggregation is Disabled.")._(); + table_description.tr().td(LogAggregationStatus.NOT_START.name()) + .td("Log Aggregation does not Start.")._(); + table_description.tr().td(LogAggregationStatus.RUNNING.name()) + .td("Log Aggregation is Running.")._(); + table_description.tr().td(LogAggregationStatus.FINISHED.name()) + .td("Log Aggregation is Finished. All of the logs have been " + + "aggregated successfully.")._(); + table_description.tr().td(LogAggregationStatus.FAILED.name()) + .td("Log Aggregation is Failed. At least one of the logs " + + "have not been aggregated.")._(); + table_description.tr().td(LogAggregationStatus.TIME_OUT.name()) + .td("Does not get the Log aggregation status for a long time. " + + "Not sure what is the current Log Aggregation Status.")._(); + table_description._(); + div_description._(); + + boolean logAggregationEnabled = + conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + // Application Log aggregation status Table + DIV div = html.div(_INFO_WRAP); + TABLE> table = + div.h3( + "Log Aggregation: " + + (logAggregationEnabled ? "Enabled" : "Disabled")).table( + "#LogAggregationStatus"); + table. + tr(). + th(_TH, "NodeId"). + th(_TH, "Log Aggregation Status"). + th(_TH, "Diagnostis Message"). + _(); + + RMApp rmApp = rm.getRMContext().getRMApps().get(appId); + if (rmApp != null) { + Map logAggregationReports = + rmApp.getLogAggregationReportsForApp(); + if (logAggregationReports != null && !logAggregationReports.isEmpty()) { + for (Entry report : + logAggregationReports.entrySet()) { + LogAggregationStatus status = + report.getValue() == null ? null : report.getValue() + .getLogAggregationStatus(); + String message = + report.getValue() == null ? null : report.getValue() + .getDiagnosticMessage(); + table.tr() + .td(report.getKey().toString()) + .td(status == null ? "N/A" : status.toString()) + .td(message == null ? "N/A" : message)._(); + } + } + } + table._(); + div._(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java index 86300ce..a86ed4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java @@ -68,6 +68,8 @@ public class RMWebApp extends WebApp implements YarnWebParams { "appattempt"); route(pajoin("/container", CONTAINER_ID), RmController.class, "container"); route("/errors-and-warnings", RmController.class, "errorsAndWarnings"); + route(pajoin("/logaggregationstatus", APPLICATION_ID), + RmController.class, "logaggregationstatus"); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java index c8e3c5b..b124d75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java @@ -109,4 +109,8 @@ public class RmController extends Controller { public void errorsAndWarnings() { render(RMErrorsAndWarningsPage.class); } + + public void logaggregationstatus() { + render(AppLogAggregationStatusPage.class); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0891de07/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index f8d92aa..a6e469e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -190,6 +191,11 @@ public abstract class MockAsm extends MockApps { public ResourceRequest getAMResourceRequest() { return this.amReq; } + + @Override + public Map getLogAggregationReportsForApp() { + throw new UnsupportedOperationException("Not supported yet."); + } } public static RMApp newApplication(int i) {