From: varunsaxena@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 25 Apr 2017 21:53:07 -0000
Subject: [27/50] [abbrv] hadoop git commit: YARN-5792. Adopt the id prefix for YARN, MR, and DS entities. Contributed by Varun Saxena.

YARN-5792. Adopt the id prefix for YARN, MR, and DS entities. Contributed by Varun Saxena.
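A note on the mechanism: the "id prefix" adopted here is the inverted start timestamp of the entity, so entities that started later receive numerically smaller prefixes and sort first when ATSv2 storage orders entities by idPrefix ascending. A minimal sketch of that inversion (assuming TimelineServiceHelper.invertLong is the usual Long.MAX_VALUE - value inversion; the class below is illustrative and not part of this commit):

// Sketch: derive a descending-order id prefix from a start time.
public final class IdPrefixSketch {

  // Assumed behavior of TimelineServiceHelper.invertLong.
  static long invertLong(long key) {
    return Long.MAX_VALUE - key;
  }

  public static void main(String[] args) {
    long earlier = 1479764915000L; // an older start time
    long later = 1479764920000L;   // a newer start time
    // The newer entity gets the smaller prefix, so readers sorting
    // by idPrefix ascending return the newest entities first.
    System.out.println(invertLong(later) < invertLong(earlier)); // true
  }
}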
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6808a30b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6808a30b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6808a30b
Branch: refs/heads/YARN-5355
Commit: 6808a30b0ce061c82fd1e623d0b0fb8ce9246f38
Parents: 187b988
Author: Sangjin Lee
Authored: Mon Nov 21 13:48:35 2016 -0800
Committer: Varun Saxena
Committed: Mon Apr 24 19:01:05 2017 +0530
----------------------------------------------------------------------
 .../jobhistory/JobHistoryEventHandler.java      |  29 +++-
 .../v2/app/job/impl/TaskAttemptImpl.java        |  58 +++---
 .../mapreduce/v2/app/job/impl/TaskImpl.java     |  19 +-
 .../hadoop/mapreduce/jobhistory/TestEvents.java |   4 +-
 .../jobhistory/TestJobHistoryEventHandler.java  |   8 +-
 .../jobhistory/MapAttemptFinishedEvent.java     |  87 ++++++---
 .../jobhistory/ReduceAttemptFinishedEvent.java  |  83 ++++++---
 .../jobhistory/TaskAttemptFinishedEvent.java    |  47 +++--
 .../TaskAttemptUnsuccessfulCompletionEvent.java |  50 ++++--
 .../mapreduce/jobhistory/TaskFailedEvent.java   |  51 ++++--
 .../mapreduce/jobhistory/TaskFinishedEvent.java |  42 +++--
 .../mapred/TestMRTimelineEventHandling.java     |  30 +++-
 .../distributedshell/ApplicationMaster.java     |  42 ++++-
 .../distributedshell/TestDistributedShell.java  | 173 +++++++++++--------
 .../containermanager/ContainerManagerImpl.java  |   6 +-
 .../ApplicationContainerFinishedEvent.java      |   9 +-
 .../containermanager/container/Container.java   |   2 +
 .../container/ContainerImpl.java                |  22 ++-
 .../recovery/NMLeveldbStateStoreService.java    |  21 ++-
 .../recovery/NMNullStateStoreService.java       |   2 +-
 .../recovery/NMStateStoreService.java           |  13 +-
 .../timelineservice/NMTimelinePublisher.java    |  18 +-
 .../application/TestApplication.java            |   2 +-
 .../recovery/NMMemoryStateStoreService.java     |   6 +-
 .../TestNMLeveldbStateStoreService.java         |   6 +-
 .../nodemanager/webapp/MockContainer.java       |   4 +
 .../nodemanager/webapp/TestNMWebServer.java     |   4 +-
 .../resourcemanager/ResourceTrackerService.java |  14 +-
 .../metrics/TimelineServiceV2Publisher.java     |  12 +-
 .../TestSystemMetricsPublisherForV2.java        |  13 +-
 30 files changed, 588 insertions(+), 289 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 285d36e..bf8cec8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -77,6 +77,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import
org.apache.hadoop.yarn.util.TimelineServiceHelper; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.JsonNodeFactory; import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientHandlerException; @@ -1123,7 +1126,7 @@ public class JobHistoryEventHandler extends AbstractService private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity createTaskEntity(HistoryEvent event, long timestamp, String taskId, String entityType, String relatedJobEntity, JobId jobId, - boolean setCreatedTime) { + boolean setCreatedTime, long taskIdPrefix) { org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = createBaseEntity(event, timestamp, entityType, setCreatedTime); entity.setId(taskId); @@ -1132,6 +1135,7 @@ public class JobHistoryEventHandler extends AbstractService ((TaskStartedEvent)event).getTaskType().toString()); } entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString()); + entity.setIdPrefix(taskIdPrefix); return entity; } @@ -1140,11 +1144,12 @@ public class JobHistoryEventHandler extends AbstractService private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity createTaskAttemptEntity(HistoryEvent event, long timestamp, String taskAttemptId, String entityType, String relatedTaskEntity, - String taskId, boolean setCreatedTime) { + String taskId, boolean setCreatedTime, long taskAttemptIdPrefix) { org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = createBaseEntity(event, timestamp, entityType, setCreatedTime); entity.setId(taskAttemptId); entity.addIsRelatedToEntity(relatedTaskEntity, taskId); + entity.setIdPrefix(taskAttemptIdPrefix); return entity; } @@ -1195,6 +1200,8 @@ public class JobHistoryEventHandler extends AbstractService String taskId = null; String taskAttemptId = null; boolean setCreatedTime = false; + long taskIdPrefix = 0; + long taskAttemptIdPrefix = 0; switch (event.getEventType()) { // Handle job events @@ -1217,15 +1224,21 @@ public class JobHistoryEventHandler extends AbstractService case TASK_STARTED: setCreatedTime = true; taskId = ((TaskStartedEvent)event).getTaskId().toString(); + taskIdPrefix = TimelineServiceHelper. + invertLong(((TaskStartedEvent)event).getStartTime()); break; case TASK_FAILED: taskId = ((TaskFailedEvent)event).getTaskId().toString(); + taskIdPrefix = TimelineServiceHelper. + invertLong(((TaskFailedEvent)event).getStartTime()); break; case TASK_UPDATED: taskId = ((TaskUpdatedEvent)event).getTaskId().toString(); break; case TASK_FINISHED: taskId = ((TaskFinishedEvent)event).getTaskId().toString(); + taskIdPrefix = TimelineServiceHelper. + invertLong(((TaskFinishedEvent)event).getStartTime()); break; case MAP_ATTEMPT_STARTED: case REDUCE_ATTEMPT_STARTED: @@ -1233,6 +1246,8 @@ public class JobHistoryEventHandler extends AbstractService taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString(); taskAttemptId = ((TaskAttemptStartedEvent)event). getTaskAttemptId().toString(); + taskAttemptIdPrefix = TimelineServiceHelper. + invertLong(((TaskAttemptStartedEvent)event).getStartTime()); break; case CLEANUP_ATTEMPT_STARTED: case SETUP_ATTEMPT_STARTED: @@ -1252,16 +1267,22 @@ public class JobHistoryEventHandler extends AbstractService getTaskId().toString(); taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event). 
getTaskAttemptId().toString(); + taskAttemptIdPrefix = TimelineServiceHelper.invertLong( + ((TaskAttemptUnsuccessfulCompletionEvent)event).getStartTime()); break; case MAP_ATTEMPT_FINISHED: taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString(); taskAttemptId = ((MapAttemptFinishedEvent)event). getAttemptId().toString(); + taskAttemptIdPrefix = TimelineServiceHelper. + invertLong(((MapAttemptFinishedEvent)event).getStartTime()); break; case REDUCE_ATTEMPT_FINISHED: taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString(); taskAttemptId = ((ReduceAttemptFinishedEvent)event). getAttemptId().toString(); + taskAttemptIdPrefix = TimelineServiceHelper. + invertLong(((ReduceAttemptFinishedEvent)event).getStartTime()); break; case SETUP_ATTEMPT_FINISHED: case CLEANUP_ATTEMPT_FINISHED: @@ -1290,12 +1311,12 @@ public class JobHistoryEventHandler extends AbstractService // TaskEntity tEntity = createTaskEntity(event, timestamp, taskId, MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE, - jobId, setCreatedTime); + jobId, setCreatedTime, taskIdPrefix); } else { // TaskAttemptEntity tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId, MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE, - taskId, setCreatedTime); + taskId, setCreatedTime, taskAttemptIdPrefix); } } try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 9ea1b9a..3faad48 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1530,7 +1530,7 @@ public abstract class TaskAttemptImpl implements StringUtils.join( LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt.getCounters(), taskAttempt - .getProgressSplitBlock().burst()); + .getProgressSplitBlock().burst(), taskAttempt.launchTime); return tauce; } @@ -1943,35 +1943,35 @@ public abstract class TaskAttemptImpl implements this.container == null ? -1 : this.container.getNodeId().getPort(); if (attemptId.getTaskId().getTaskType() == TaskType.MAP) { MapAttemptFinishedEvent mfe = - new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), - TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), - state.toString(), - this.reportedStatus.mapFinishTime, - finishTime, - containerHostName, - containerNodePort, - this.nodeRackName == null ? 
"UNKNOWN" : this.nodeRackName, - this.reportedStatus.stateString, - getCounters(), - getProgressSplitBlock().burst()); - eventHandler.handle( - new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); + new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), + TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), + state.toString(), + this.reportedStatus.mapFinishTime, + finishTime, + containerHostName, + containerNodePort, + this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, + this.reportedStatus.stateString, + getCounters(), + getProgressSplitBlock().burst(), launchTime); + eventHandler.handle( + new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); } else { - ReduceAttemptFinishedEvent rfe = - new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), - TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), - state.toString(), - this.reportedStatus.shuffleFinishTime, - this.reportedStatus.sortFinishTime, - finishTime, - containerHostName, - containerNodePort, - this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, - this.reportedStatus.stateString, - getCounters(), - getProgressSplitBlock().burst()); - eventHandler.handle( - new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe)); + ReduceAttemptFinishedEvent rfe = + new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), + TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), + state.toString(), + this.reportedStatus.shuffleFinishTime, + this.reportedStatus.sortFinishTime, + finishTime, + containerHostName, + containerNodePort, + this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName, + this.reportedStatus.stateString, + getCounters(), + getProgressSplitBlock().burst(), launchTime); + eventHandler.handle( + new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 8a6fa30..228ae24 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -139,6 +139,8 @@ public abstract class TaskImpl implements Task, EventHandler { private final Set inProgressAttempts; private boolean historyTaskStartGenerated = false; + // Launch time reported in history events. 
+ private long launchTime; private static final SingleArcTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition(); @@ -705,8 +707,9 @@ public abstract class TaskImpl implements Task, EventHandler { } private void sendTaskStartedEvent() { + launchTime = getLaunchTime(); TaskStartedEvent tse = new TaskStartedEvent( - TypeConverter.fromYarn(taskId), getLaunchTime(), + TypeConverter.fromYarn(taskId), launchTime, TypeConverter.fromYarn(taskId.getTaskType()), getSplitsAsString()); eventHandler @@ -714,18 +717,19 @@ public abstract class TaskImpl implements Task, EventHandler { historyTaskStartGenerated = true; } - private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) { + private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, + TaskStateInternal taskState) { TaskFinishedEvent tfe = new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId), TypeConverter.fromYarn(task.successfulAttempt), task.getFinishTime(task.successfulAttempt), TypeConverter.fromYarn(task.taskId.getTaskType()), - taskState.toString(), - task.getCounters()); + taskState.toString(), task.getCounters(), task.launchTime); return tfe; } - private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List diag, TaskStateInternal taskState, TaskAttemptId taId) { + private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, + List diag, TaskStateInternal taskState, TaskAttemptId taId) { StringBuilder errorSb = new StringBuilder(); if (diag != null) { for (String d : diag) { @@ -740,7 +744,7 @@ public abstract class TaskImpl implements Task, EventHandler { errorSb.toString(), taskState.toString(), taId == null ? null : TypeConverter.fromYarn(taId), - task.getCounters()); + task.getCounters(), task.launchTime); return taskFailedEvent; } @@ -861,7 +865,8 @@ public abstract class TaskImpl implements Task, EventHandler { TaskFailedEvent tfe = new TaskFailedEvent(taskInfo.getTaskId(), taskInfo.getFinishTime(), taskInfo.getTaskType(), taskInfo.getError(), taskInfo.getTaskStatus(), - taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters()); + taskInfo.getFailedDueToAttemptId(), taskInfo.getCounters(), + launchTime); eventHandler.handle(new JobHistoryEvent(taskId.getJobId(), tfe)); eventHandler.handle( new JobTaskEvent(taskId, getExternalState(taskState))); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java index ac510b3..e271319 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestEvents.java @@ -58,7 +58,7 @@ public class TestEvents { Counters counters = new Counters(); TaskAttemptFinishedEvent test = new TaskAttemptFinishedEvent(taskAttemptId, TaskType.REDUCE, "TEST", 123L, "RAKNAME", "HOSTNAME", "STATUS", - counters); + counters, 234); assertEquals(test.getAttemptId().toString(), 
taskAttemptId.toString()); assertEquals(test.getCounters(), counters); @@ -69,7 +69,7 @@ public class TestEvents { assertEquals(test.getTaskId(), tid); assertEquals(test.getTaskStatus(), "TEST"); assertEquals(test.getTaskType(), TaskType.REDUCE); - + assertEquals(234, test.getStartTime()); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java index 6c5e604..1a36cb9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java @@ -144,7 +144,7 @@ public class TestJobHistoryEventHandler { // First completion event, but min-queue-size for batching flushes is 10 handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0))); verify(mockWriter).flush(); } finally { @@ -180,7 +180,7 @@ public class TestJobHistoryEventHandler { for (int i = 0 ; i < 100 ; i++) { queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0))); } handleNextNEvents(jheh, 9); @@ -225,7 +225,7 @@ public class TestJobHistoryEventHandler { for (int i = 0 ; i < 100 ; i++) { queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0))); } handleNextNEvents(jheh, 9); @@ -268,7 +268,7 @@ public class TestJobHistoryEventHandler { for (int i = 0 ; i < 100 ; i++) { queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent( - t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null))); + t.taskID, t.taskAttemptID, 0, TaskType.MAP, "", null, 0))); } queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent( TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters()))); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java index 3121c4e..2b1357e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java 
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java @@ -32,9 +32,10 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** - * Event to record successful completion of a map attempt + * Event to record successful completion of a map attempt. * */ @InterfaceAudience.Private @@ -58,9 +59,10 @@ public class MapAttemptFinishedEvent implements HistoryEvent { int[] cpuUsages; int[] vMemKbytes; int[] physMemKbytes; + private long startTime; /** - * Create an event for successful completion of map attempts + * Create an event for successful completion of map attempts. * @param id Task Attempt ID * @param taskType Type of the task * @param taskStatus Status of the task @@ -77,12 +79,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent { * virtual memory and physical memory. * * If you have no splits data, code {@code null} for this - * parameter. + * parameter. + * @param startTs Task start time to be used for writing entity to ATSv2. */ - public MapAttemptFinishedEvent - (TaskAttemptID id, TaskType taskType, String taskStatus, - long mapFinishTime, long finishTime, String hostname, int port, - String rackName, String state, Counters counters, int[][] allSplits) { + public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long mapFinishTime, long finishTime, String hostname, + int port, String rackName, String state, Counters counters, + int[][] allSplits, long startTs) { this.attemptId = id; this.taskType = taskType; this.taskStatus = taskStatus; @@ -98,6 +101,16 @@ public class MapAttemptFinishedEvent implements HistoryEvent { this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits); this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits); this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits); + this.startTime = startTs; + } + + public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long mapFinishTime, long finishTime, String hostname, + int port, String rackName, String state, Counters counters, + int[][] allSplits) { + this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, port, + rackName, state, counters, allSplits, + SystemClock.getInstance().getTime()); } /** @@ -117,15 +130,13 @@ public class MapAttemptFinishedEvent implements HistoryEvent { * @param counters Counters for the attempt */ @Deprecated - public MapAttemptFinishedEvent - (TaskAttemptID id, TaskType taskType, String taskStatus, - long mapFinishTime, long finishTime, String hostname, - String state, Counters counters) { + public MapAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long mapFinishTime, long finishTime, String hostname, + String state, Counters counters) { this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, -1, "", state, counters, null); } - - + MapAttemptFinishedEvent() {} public Object getDatum() { @@ -175,38 +186,56 @@ public class MapAttemptFinishedEvent implements HistoryEvent { this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes()); } - /** Get the task ID */ - public TaskID getTaskId() { return attemptId.getTaskID(); } - /** Get the attempt id */ + /** Gets 
the task ID. */ + public TaskID getTaskId() { + return attemptId.getTaskID(); + } + /** Gets the attempt id. */ public TaskAttemptID getAttemptId() { return attemptId; } - /** Get the task type */ + /** Gets the task type. */ public TaskType getTaskType() { return taskType; } - /** Get the task status */ + /** Gets the task status. */ public String getTaskStatus() { return taskStatus.toString(); } - /** Get the map phase finish time */ + /** Gets the map phase finish time. */ public long getMapFinishTime() { return mapFinishTime; } - /** Get the attempt finish time */ + /** Gets the attempt finish time. */ public long getFinishTime() { return finishTime; } - /** Get the host name */ + /** + * Gets the task attempt start time. + * @return task attempt start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the host name. */ public String getHostname() { return hostname.toString(); } - /** Get the tracker rpc port */ + /** Gets the tracker rpc port. */ public int getPort() { return port; } - /** Get the rack name */ + /** Gets the rack name. */ public String getRackName() { return rackName == null ? null : rackName.toString(); } - - /** Get the state string */ - public String getState() { return state.toString(); } - /** Get the counters */ - Counters getCounters() { return counters; } - /** Get the event type */ + /** + * Gets the attempt state string. + * @return map attempt state + */ + public String getState() { + return state.toString(); + } + /** + * Gets the counters. + * @return counters + */ + Counters getCounters() { + return counters; + } + /** Gets the event type. */ public EventType getEventType() { return EventType.MAP_ATTEMPT_FINISHED; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java index 9c0f09b..5a16f83 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record successful completion of a reduce attempt @@ -59,6 +60,7 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { int[] cpuUsages; int[] vMemKbytes; int[] physMemKbytes; + private long startTime; /** * Create an event to record completion of a reduce attempt @@ -76,13 +78,13 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { * @param allSplits the "splits", or a pixelated graph of various * measurable worker node state variables against progress. 
* Currently there are four; wallclock time, CPU time, - * virtual memory and physical memory. + * virtual memory and physical memory. + * @param startTs Task start time to be used for writing entity to ATSv2. */ - public ReduceAttemptFinishedEvent - (TaskAttemptID id, TaskType taskType, String taskStatus, - long shuffleFinishTime, long sortFinishTime, long finishTime, - String hostname, int port, String rackName, String state, - Counters counters, int[][] allSplits) { + public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long shuffleFinishTime, long sortFinishTime, + long finishTime, String hostname, int port, String rackName, + String state, Counters counters, int[][] allSplits, long startTs) { this.attemptId = id; this.taskType = taskType; this.taskStatus = taskStatus; @@ -99,6 +101,16 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { this.cpuUsages = ProgressSplitsBlock.arrayGetCPUTime(allSplits); this.vMemKbytes = ProgressSplitsBlock.arrayGetVMemKbytes(allSplits); this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits); + this.startTime = startTs; + } + + public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long shuffleFinishTime, long sortFinishTime, + long finishTime, String hostname, int port, String rackName, + String state, Counters counters, int[][] allSplits) { + this(id, taskType, taskStatus, shuffleFinishTime, sortFinishTime, + finishTime, hostname, port, rackName, state, counters, allSplits, + SystemClock.getInstance().getTime()); } /** @@ -118,13 +130,12 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { * @param state State of the attempt * @param counters Counters for the attempt */ - public ReduceAttemptFinishedEvent - (TaskAttemptID id, TaskType taskType, String taskStatus, - long shuffleFinishTime, long sortFinishTime, long finishTime, - String hostname, String state, Counters counters) { + public ReduceAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long shuffleFinishTime, long sortFinishTime, + long finishTime, String hostname, String state, Counters counters) { this(id, taskType, taskStatus, - shuffleFinishTime, sortFinishTime, finishTime, - hostname, -1, "", state, counters, null); + shuffleFinishTime, sortFinishTime, finishTime, + hostname, -1, "", state, counters, null); } ReduceAttemptFinishedEvent() {} @@ -178,39 +189,55 @@ public class ReduceAttemptFinishedEvent implements HistoryEvent { this.physMemKbytes = AvroArrayUtils.fromAvro(datum.getPhysMemKbytes()); } - /** Get the Task ID */ + /** Gets the Task ID. */ public TaskID getTaskId() { return attemptId.getTaskID(); } - /** Get the attempt id */ + /** Gets the attempt id. */ public TaskAttemptID getAttemptId() { return attemptId; } - /** Get the task type */ + /** Gets the task type. */ public TaskType getTaskType() { return taskType; } - /** Get the task status */ + /** Gets the task status. */ public String getTaskStatus() { return taskStatus.toString(); } - /** Get the finish time of the sort phase */ + /** Gets the finish time of the sort phase. */ public long getSortFinishTime() { return sortFinishTime; } - /** Get the finish time of the shuffle phase */ + /** Gets the finish time of the shuffle phase. */ public long getShuffleFinishTime() { return shuffleFinishTime; } - /** Get the finish time of the attempt */ + /** Gets the finish time of the attempt. 
*/ public long getFinishTime() { return finishTime; } - /** Get the name of the host where the attempt ran */ + /** + * Gets the start time. + * @return task attempt start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the name of the host where the attempt ran. */ public String getHostname() { return hostname.toString(); } - /** Get the tracker rpc port */ + /** Gets the tracker rpc port. */ public int getPort() { return port; } - /** Get the rack name of the node where the attempt ran */ + /** Gets the rack name of the node where the attempt ran. */ public String getRackName() { return rackName == null ? null : rackName.toString(); } - - /** Get the state string */ - public String getState() { return state.toString(); } - /** Get the counters for the attempt */ - Counters getCounters() { return counters; } - /** Get the event type */ + /** + * Gets the state string. + * @return reduce attempt state + */ + public String getState() { + return state.toString(); + } + /** + * Gets the counters. + * @return counters + */ + Counters getCounters() { + return counters; + } + /** Gets the event type. */ public EventType getEventType() { return EventType.REDUCE_ATTEMPT_FINISHED; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java index a931ca2..c28c216 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record successful task completion @@ -50,10 +51,11 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { private String hostname; private String state; private Counters counters; + private long startTime; /** - * Create an event to record successful finishes for setup and cleanup - * attempts + * Create an event to record successful finishes for setup and cleanup + * attempts. * @param id Attempt ID * @param taskType Type of task * @param taskStatus Status of task @@ -61,11 +63,12 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { * @param hostname Host where the attempt executed * @param state State string * @param counters Counters for the attempt + * @param startTs Task start time to be used for writing entity to ATSv2. 
*/ public TaskAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, String taskStatus, long finishTime, String rackName, - String hostname, String state, Counters counters) { + String hostname, String state, Counters counters, long startTs) { this.attemptId = id; this.taskType = taskType; this.taskStatus = taskStatus; @@ -74,6 +77,14 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.hostname = hostname; this.state = state; this.counters = counters; + this.startTime = startTs; + } + + public TaskAttemptFinishedEvent(TaskAttemptID id, TaskType taskType, + String taskStatus, long finishTime, String rackName, String hostname, + String state, Counters counters) { + this(id, taskType, taskStatus, finishTime, rackName, hostname, state, + counters, SystemClock.getInstance().getTime()); } TaskAttemptFinishedEvent() {} @@ -107,33 +118,43 @@ public class TaskAttemptFinishedEvent implements HistoryEvent { this.counters = EventReader.fromAvro(datum.getCounters()); } - /** Get the task ID */ + /** Gets the task ID. */ public TaskID getTaskId() { return attemptId.getTaskID(); } - /** Get the task attempt id */ + /** Gets the task attempt id. */ public TaskAttemptID getAttemptId() { return attemptId; } - /** Get the task type */ + /** Gets the task type. */ public TaskType getTaskType() { return taskType; } - /** Get the task status */ + /** Gets the task status. */ public String getTaskStatus() { return taskStatus.toString(); } - /** Get the attempt finish time */ + /** Gets the attempt finish time. */ public long getFinishTime() { return finishTime; } - /** Get the host where the attempt executed */ + /** + * Gets the task attempt start time to be used while publishing to ATSv2. + * @return task attempt start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the host where the attempt executed. */ public String getHostname() { return hostname.toString(); } - /** Get the rackname where the attempt executed */ + /** Gets the rackname where the attempt executed. */ public String getRackName() { return rackName == null ? null : rackName.toString(); } - /** Get the state string */ + /** + * Gets the state string. + * @return task attempt state. + */ public String getState() { return state.toString(); } - /** Get the counters for the attempt */ + /** Gets the counters for the attempt. */ Counters getCounters() { return counters; } - /** Get the event type */ + /** Gets the event type. */ public EventType getEventType() { // Note that the task type can be setup/map/reduce/cleanup but the // attempt-type can only be map/reduce. 
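The same compatibility pattern repeats across the event classes in this commit: each existing constructor now delegates to a new overload that takes a startTs parameter, defaulting the start time to the current wall-clock time. A standalone sketch of that pattern (SystemClock is the helper actually used in the diff; the event class below is illustrative, not part of the patch):

import org.apache.hadoop.yarn.util.SystemClock;

class ExampleAttemptFinishedEvent {
  private final long finishTime;
  private final long startTime;

  // New overload: callers that track a launch time pass it in.
  ExampleAttemptFinishedEvent(long finishTime, long startTs) {
    this.finishTime = finishTime;
    this.startTime = startTs;
  }

  // Old signature preserved for compatibility; falls back to "now"
  // so existing callers keep compiling and still get a usable prefix.
  ExampleAttemptFinishedEvent(long finishTime) {
    this(finishTime, SystemClock.getInstance().getTime());
  }

  long getStartTime() {
    return startTime;
  }
}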
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java index 1732d91..491700c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record unsuccessful (Killed/Failed) completion of task attempts @@ -58,10 +59,11 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { int[] cpuUsages; int[] vMemKbytes; int[] physMemKbytes; + private long startTime; private static final Counters EMPTY_COUNTERS = new Counters(); /** - * Create an event to record the unsuccessful completion of attempts + * Create an event to record the unsuccessful completion of attempts. * @param id Attempt ID * @param taskType Type of the task * @param status Status of the attempt @@ -74,13 +76,14 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { * @param allSplits the "splits", or a pixelated graph of various * measurable worker node state variables against progress. * Currently there are four; wallclock time, CPU time, - * virtual memory and physical memory. + * virtual memory and physical memory. + * @param startTs Task start time to be used for writing entity to ATSv2. 
*/ public TaskAttemptUnsuccessfulCompletionEvent (TaskAttemptID id, TaskType taskType, String status, long finishTime, String hostname, int port, String rackName, - String error, Counters counters, int[][] allSplits) { + String error, Counters counters, int[][] allSplits, long startTs) { this.attemptId = id; this.taskType = taskType; this.status = status; @@ -99,6 +102,15 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { ProgressSplitsBlock.arrayGetVMemKbytes(allSplits); this.physMemKbytes = ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits); + this.startTime = startTs; + } + + public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id, + TaskType taskType, String status, long finishTime, String hostname, + int port, String rackName, String error, Counters counters, + int[][] allSplits) { + this(id, taskType, status, finishTime, hostname, port, rackName, error, + counters, allSplits, SystemClock.getInstance().getTime()); } /** @@ -190,39 +202,49 @@ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { AvroArrayUtils.fromAvro(datum.getPhysMemKbytes()); } - /** Get the task id */ + /** Gets the task id. */ public TaskID getTaskId() { return attemptId.getTaskID(); } - /** Get the task type */ + /** Gets the task type. */ public TaskType getTaskType() { return TaskType.valueOf(taskType.toString()); } - /** Get the attempt id */ + /** Gets the attempt id. */ public TaskAttemptID getTaskAttemptId() { return attemptId; } - /** Get the finish time */ + /** Gets the finish time. */ public long getFinishTime() { return finishTime; } - /** Get the name of the host where the attempt executed */ + /** + * Gets the task attempt start time to be used while publishing to ATSv2. + * @return task attempt start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the name of the host where the attempt executed. */ public String getHostname() { return hostname; } - /** Get the rpc port for the host where the attempt executed */ + /** Gets the rpc port for the host where the attempt executed. */ public int getPort() { return port; } - /** Get the rack name of the node where the attempt ran */ + /** Gets the rack name of the node where the attempt ran. */ public String getRackName() { return rackName == null ? null : rackName.toString(); } - /** Get the error string */ + /** Gets the error string. */ public String getError() { return error.toString(); } - /** Get the task status */ + /** + * Gets the task attempt status. + * @return task attempt status. + */ public String getTaskStatus() { return status.toString(); } - /** Get the counters */ + /** Gets the counters. */ Counters getCounters() { return counters; } - /** Get the event type */ + /** Gets the event type. */ public EventType getEventType() { // Note that the task type can be setup/map/reduce/cleanup but the // attempt-type can only be map/reduce. 
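On the consuming side, JobHistoryEventHandler (first hunk above) reads each event's getStartTime() and writes the inverted value into the timeline entity before publishing. A condensed sketch of that mapping (TimelineEntity and TimelineServiceHelper are the real classes used in the diff; the wrapper below is illustrative):

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;

final class PrefixedEntityFactory {
  static TimelineEntity taskAttemptEntity(String attemptId,
      long attemptStartTime) {
    TimelineEntity entity = new TimelineEntity();
    entity.setId(attemptId);
    // Later start times invert to smaller prefixes, so newer attempts
    // sort ahead of older ones in ATSv2 storage.
    entity.setIdPrefix(TimelineServiceHelper.invertLong(attemptStartTime));
    return entity;
  }
}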
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java index d14350d..b4d9e41 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record the failure of a task @@ -49,11 +50,12 @@ public class TaskFailedEvent implements HistoryEvent { private String status; private String error; private Counters counters; + private long startTime; private static final Counters EMPTY_COUNTERS = new Counters(); /** - * Create an event to record task failure + * Create an event to record task failure. * @param id Task ID * @param finishTime Finish time of the task * @param taskType Type of the task @@ -61,10 +63,11 @@ public class TaskFailedEvent implements HistoryEvent { * @param status Status * @param failedDueToAttempt The attempt id due to which the task failed * @param counters Counters for the task + * @param startTs task start time. */ public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType, String error, String status, - TaskAttemptID failedDueToAttempt, Counters counters) { + TaskAttemptID failedDueToAttempt, Counters counters, long startTs) { this.id = id; this.finishTime = finishTime; this.taskType = taskType; @@ -72,15 +75,23 @@ public class TaskFailedEvent implements HistoryEvent { this.status = status; this.failedDueToAttempt = failedDueToAttempt; this.counters = counters; + this.startTime = startTs; + } + + public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType, + String error, String status, TaskAttemptID failedDueToAttempt, + Counters counters) { + this(id, finishTime, taskType, error, status, failedDueToAttempt, counters, + SystemClock.getInstance().getTime()); } public TaskFailedEvent(TaskID id, long finishTime, - TaskType taskType, String error, String status, - TaskAttemptID failedDueToAttempt) { - this(id, finishTime, taskType, error, status, - failedDueToAttempt, EMPTY_COUNTERS); + TaskType taskType, String error, String status, + TaskAttemptID failedDueToAttempt) { + this(id, finishTime, taskType, error, status, failedDueToAttempt, + EMPTY_COUNTERS); } - + TaskFailedEvent() {} public Object getDatum() { @@ -118,27 +129,37 @@ public class TaskFailedEvent implements HistoryEvent { EventReader.fromAvro(datum.getCounters()); } - /** Get the task id */ + /** Gets the task id. */ public TaskID getTaskId() { return id; } - /** Get the error string */ + /** Gets the error string. 
*/ public String getError() { return error; } - /** Get the finish time of the attempt */ + /** Gets the finish time of the attempt. */ public long getFinishTime() { return finishTime; } - /** Get the task type */ + /** + * Gets the task start time to be reported to ATSv2. + * @return task start time. + */ + public long getStartTime() { + return startTime; + } + /** Gets the task type. */ public TaskType getTaskType() { return taskType; } - /** Get the attempt id due to which the task failed */ + /** Gets the attempt id due to which the task failed. */ public TaskAttemptID getFailedAttemptID() { return failedDueToAttempt; } - /** Get the task status */ + /** + * Gets the task status. + * @return task status + */ public String getTaskStatus() { return status; } - /** Get task counters */ + /** Gets task counters. */ public Counters getCounters() { return counters; } - /** Get the event type */ + /** Gets the event type. */ public EventType getEventType() { return EventType.TASK_FAILED; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java index 0bc4383..97557c7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; +import org.apache.hadoop.yarn.util.SystemClock; /** * Event to record the successful completion of a task @@ -49,27 +50,36 @@ public class TaskFinishedEvent implements HistoryEvent { private TaskType taskType; private String status; private Counters counters; - + private long startTime; + /** - * Create an event to record the successful completion of a task + * Create an event to record the successful completion of a task. 
* @param id Task ID * @param attemptId Task Attempt ID of the successful attempt for this task * @param finishTime Finish time of the task * @param taskType Type of the task * @param status Status string * @param counters Counters for the task + * @param startTs task start time */ public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime, TaskType taskType, - String status, Counters counters) { + String status, Counters counters, long startTs) { this.taskid = id; this.successfulAttemptId = attemptId; this.finishTime = finishTime; this.taskType = taskType; this.status = status; this.counters = counters; + this.startTime = startTs; } - + + public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime, + TaskType taskType, String status, Counters counters) { + this(id, attemptId, finishTime, taskType, status, counters, + SystemClock.getInstance().getTime()); + } + TaskFinishedEvent() {} public Object getDatum() { @@ -101,23 +111,33 @@ public class TaskFinishedEvent implements HistoryEvent { this.counters = EventReader.fromAvro(datum.getCounters()); } - /** Get task id */ + /** Gets task id. */ public TaskID getTaskId() { return taskid; } - /** Get successful task attempt id */ + /** Gets successful task attempt id. */ public TaskAttemptID getSuccessfulTaskAttemptId() { return successfulAttemptId; } - /** Get the task finish time */ + /** Gets the task finish time. */ public long getFinishTime() { return finishTime; } - /** Get task counters */ + /** + * Gets the task start time to be reported to ATSv2. + * @return task start time + */ + public long getStartTime() { + return startTime; + } + /** Gets task counters. */ public Counters getCounters() { return counters; } - /** Get task type */ + /** Gets task type. */ public TaskType getTaskType() { return taskType; } - /** Get task status */ + /** + * Gets task status. + * @return task status + */ public String getTaskStatus() { return status.toString(); } - /** Get event type */ + /** Gets event type. 
*/ public EventType getEventType() { return EventType.TASK_FINISHED; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java index eca51fb..9b4448f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java @@ -298,10 +298,10 @@ public class TestMRTimelineEventHandling { " does not exist.", jobEventFile.exists()); verifyEntity(jobEventFile, EventType.JOB_FINISHED.name(), - true, false, null); + true, false, null, false); Set cfgsToCheck = Sets.newHashSet("dummy_conf1", "dummy_conf2", "huge_dummy_conf1", "huge_dummy_conf2"); - verifyEntity(jobEventFile, null, false, true, cfgsToCheck); + verifyEntity(jobEventFile, null, false, true, cfgsToCheck, false); // for this test, we expect MR job metrics are published in YARN_APPLICATION String outputAppDir = @@ -322,8 +322,8 @@ public class TestMRTimelineEventHandling { "appEventFilePath: " + appEventFilePath + " does not exist.", appEventFile.exists()); - verifyEntity(appEventFile, null, true, false, null); - verifyEntity(appEventFile, null, false, true, cfgsToCheck); + verifyEntity(appEventFile, null, true, false, null, false); + verifyEntity(appEventFile, null, false, true, cfgsToCheck, false); // check for task event file String outputDirTask = @@ -344,7 +344,7 @@ public class TestMRTimelineEventHandling { " does not exist.", taskEventFile.exists()); verifyEntity(taskEventFile, EventType.TASK_FINISHED.name(), - true, false, null); + true, false, null, true); // check for task attempt event file String outputDirTaskAttempt = @@ -363,7 +363,7 @@ public class TestMRTimelineEventHandling { Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath + " does not exist.", taskAttemptEventFile.exists()); verifyEntity(taskAttemptEventFile, EventType.MAP_ATTEMPT_FINISHED.name(), - true, false, null); + true, false, null, true); } /** @@ -380,12 +380,13 @@ public class TestMRTimelineEventHandling { * @throws IOException */ private void verifyEntity(File entityFile, String eventId, - boolean chkMetrics, boolean chkCfg, Set cfgsToVerify) - throws IOException { + boolean chkMetrics, boolean chkCfg, Set cfgsToVerify, + boolean checkIdPrefix) throws IOException { BufferedReader reader = null; String strLine; try { reader = new BufferedReader(new FileReader(entityFile)); + long idPrefix = -1; while ((strLine = reader.readLine()) != null) { if (strLine.trim().length() > 0) { org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity @@ -394,6 +395,19 @@ public class TestMRTimelineEventHandling { strLine.trim(), org.apache.hadoop.yarn.api.records.timelineservice. 
TimelineEntity.class); + + LOG.info("strLine.trim()= " + strLine.trim()); + if (checkIdPrefix) { + Assert.assertTrue("Entity ID prefix expected to be > 0" , + entity.getIdPrefix() > 0); + if (idPrefix == -1) { + idPrefix = entity.getIdPrefix(); + } else { + Assert.assertEquals("Entity ID prefix should be same across " + + "each publish of same entity", + idPrefix, entity.getIdPrefix()); + } + } if (eventId == null) { // Job metrics are published without any events for // ApplicationEntity. There is also possibility that some other http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 4daebb5..11911cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -104,6 +104,8 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.LogManager; @@ -314,6 +316,17 @@ public class ApplicationMaster { Collections.newSetFromMap(new ConcurrentHashMap()); /** + * Container start times used to set id prefix while publishing entity + * to ATSv2. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 4daebb5..11911cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -104,6 +104,8 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.log4j.LogManager; @@ -314,6 +316,17 @@ public class ApplicationMaster { Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>()); /** + * Container start times, used to set the id prefix when publishing + * entities to ATSv2. + */ + private final ConcurrentMap<ContainerId, Long> containerStartTimes = + new ConcurrentHashMap<ContainerId, Long>(); + + private ConcurrentMap<ContainerId, Long> getContainerStartTimes() { + return containerStartTimes; + } + + /** * @param args Command line args */ public static void main(String[] args) { @@ -866,7 +879,15 @@ public class ApplicationMaster { + containerStatus.getContainerId()); } if (timelineServiceV2Enabled) { - publishContainerEndEventOnTimelineServiceV2(containerStatus); + Long containerStartTime = + containerStartTimes.get(containerStatus.getContainerId()); + if (containerStartTime == null) { + containerStartTime = SystemClock.getInstance().getTime(); + containerStartTimes.put(containerStatus.getContainerId(), + containerStartTime); + } + publishContainerEndEventOnTimelineServiceV2(containerStatus, + containerStartTime); } else if (timelineServiceV1Enabled) { publishContainerEndEvent(timelineClient, containerStatus, domainId, appSubmitterUgi); @@ -994,8 +1015,10 @@ public class ApplicationMaster { containerId, container.getNodeId()); } if (applicationMaster.timelineServiceV2Enabled) { - applicationMaster - .publishContainerStartEventOnTimelineServiceV2(container); + long startTime = SystemClock.getInstance().getTime(); + applicationMaster.getContainerStartTimes().put(containerId, startTime); + applicationMaster.publishContainerStartEventOnTimelineServiceV2( + container, startTime); } else if (applicationMaster.timelineServiceV1Enabled) { applicationMaster.publishContainerStartEvent( applicationMaster.timelineClient, container, @@ -1356,24 +1379,24 @@ public class ApplicationMaster { } private void publishContainerStartEventOnTimelineServiceV2( - Container container) { + Container container, long startTime) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice. TimelineEntity(); entity.setId(container.getId().toString()); entity.setType(DSEntity.DS_CONTAINER.toString()); - long ts = System.currentTimeMillis(); - entity.setCreatedTime(ts); + entity.setCreatedTime(startTime); entity.addInfo("user", appSubmitterUgi.getShortUserName()); org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event = new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent(); - event.setTimestamp(ts); + event.setTimestamp(startTime); event.setId(DSEvent.DS_CONTAINER_START.toString()); event.addInfo("Node", container.getNodeId().toString()); event.addInfo("Resources", container.getResource().toString()); entity.addEvent(event); + entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime)); try { appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @@ -1391,7 +1414,7 @@ public class ApplicationMaster { } private void publishContainerEndEventOnTimelineServiceV2( - final ContainerStatus container) { + final ContainerStatus container, long containerStartTime) { final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = new org.apache.hadoop.yarn.api.records.timelineservice.
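The entity.setIdPrefix(TimelineServiceHelper.invertLong(startTime)) call above is the heart of the change: ATSv2 storage orders entities of a type by id prefix ascending, so inverting the start time makes the most recently started container sort first. A sketch of the idea, under the assumption that invertLong is essentially a subtraction from Long.MAX_VALUE:

  public final class InvertLongDemo {

    // Mirrors what TimelineServiceHelper.invertLong is understood to do:
    // map an ascending value onto a descending sort key.
    static long invertLong(long key) {
      return Long.MAX_VALUE - key;
    }

    public static void main(String[] args) {
      long earlierStart = 1_479_765_000_000L; // illustrative timestamps
      long laterStart = earlierStart + 60_000L;
      // The container that started later gets the smaller prefix and
      // therefore sorts first in an ascending scan of the store.
      System.out.println(invertLong(laterStart) < invertLong(earlierStart));
    }
  }

In the next hunks the same trick is applied to application attempts, except that the attempt id rather than a timestamp is inverted, so the latest attempt sorts first.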
@@ -1407,6 +1430,7 @@ public class ApplicationMaster { event.addInfo("State", container.getState().name()); event.addInfo("Exit Status", container.getExitStatus()); entity.addEvent(event); + entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); try { appSubmitterUgi.doAs(new PrivilegedExceptionAction() { @@ -1441,6 +1465,8 @@ public class ApplicationMaster { event.setId(appEvent.toString()); event.setTimestamp(ts); entity.addEvent(event); + entity.setIdPrefix( + TimelineServiceHelper.invertLong(appAttemptID.getAttemptId())); try { appSubmitterUgi.doAs(new PrivilegedExceptionAction() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index ef21c87..47485ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -64,7 +64,9 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter; import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient; @@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils; import org.apache.hadoop.yarn.server.timeline.TimelineVersion; import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher; import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; +import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; @@ -523,15 +526,31 @@ public class TestDistributedShell { "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId() + "_000001" + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - verifyEntityTypeFileExists(basePath, "DS_APP_ATTEMPT", - appTimestampFileName); - - // Verify DS_CONTAINER entities posted by the client + File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath, + "DS_APP_ATTEMPT", appTimestampFileName); 
+ // Check that the required events are published and that the same + // id prefix is used on each publish. + verifyEntityForTimelineV2(dsAppAttemptEntityFile, + DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true); + // to avoid a race in the test case, check at least 40 times with a + // sleep of 50ms + verifyEntityForTimelineV2(dsAppAttemptEntityFile, + DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true); + + // Verify DS_CONTAINER entities posted by the client. String containerTimestampFileName = "container_" + appId.getClusterTimestamp() + "_000" + appId.getId() + "_01_000002.thist"; - verifyEntityTypeFileExists(basePath, "DS_CONTAINER", - containerTimestampFileName); + File dsContainerEntityFile = verifyEntityTypeFileExists(basePath, + "DS_CONTAINER", containerTimestampFileName); + // Check that the required events are published and that the same + // id prefix is used on each publish. + verifyEntityForTimelineV2(dsContainerEntityFile, + DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true); + // to avoid a race in the test case, check at least 40 times with a + // sleep of 50ms + verifyEntityForTimelineV2(dsContainerEntityFile, + DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true); // Verify NM posting container metrics info. String containerMetricsTimestampFileName = @@ -541,29 +560,13 @@ File containerEntityFile = verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_CONTAINER.toString(), containerMetricsTimestampFileName); - Assert.assertEquals( - "Container created event needs to be published atleast once", - 1, - getNumOfStringOccurrences(containerEntityFile, - ContainerMetricsConstants.CREATED_EVENT_TYPE)); - - // to avoid race condition of testcase, atleast check 4 times with sleep - // of 500ms - long numOfContainerFinishedOccurrences = 0; - for (int i = 0; i < 4; i++) { - numOfContainerFinishedOccurrences = - getNumOfStringOccurrences(containerEntityFile, - ContainerMetricsConstants.FINISHED_EVENT_TYPE); - if (numOfContainerFinishedOccurrences > 0) { - break; - } else { - Thread.sleep(500L); - } - } - Assert.assertEquals( - "Container finished event needs to be published atleast once", - 1, - numOfContainerFinishedOccurrences); + verifyEntityForTimelineV2(containerEntityFile, + ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true); + + // to avoid a race in the test case, check at least 40 times with a + // sleep of 50ms + verifyEntityForTimelineV2(containerEntityFile, + ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true); // Verify RM posting Application life cycle Events are getting published String appMetricsTimestampFileName = @@ -573,29 +576,14 @@ verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_APPLICATION.toString(), appMetricsTimestampFileName); - Assert.assertEquals( - "Application created event should be published atleast once", - 1, - getNumOfStringOccurrences(appEntityFile, - ApplicationMetricsConstants.CREATED_EVENT_TYPE)); - - // to avoid race condition of testcase, atleast check 4 times with sleep - // of 500ms - long numOfStringOccurrences = 0; - for (int i = 0; i < 4; i++) { - numOfStringOccurrences = - getNumOfStringOccurrences(appEntityFile, - ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - if (numOfStringOccurrences > 0) { - break; - } else { - Thread.sleep(500L); - } - } - Assert.assertEquals( - "Application finished event should be published atleast once", - 1, - numOfStringOccurrences);
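The "check at least 40 times with a sleep of 50ms" comments above describe a bounded poll: the timeline writer flushes asynchronously, so the test re-reads the entity file until the expected event count appears or the attempts run out. The total budget matches the old 4 x 500ms loops (two seconds) but reacts sooner once the event lands. A generic sketch of the pattern; the names are illustrative, not from the patch:

  import java.util.concurrent.Callable;

  public final class BoundedPoll {

    // Re-evaluates a condition up to maxAttempts times, sleeping between
    // attempts, and fails only if the condition never becomes true.
    static void pollUntilTrue(Callable<Boolean> condition, int maxAttempts,
        long sleepMillis) throws Exception {
      for (int attempt = 0; attempt < maxAttempts; attempt++) {
        if (condition.call()) {
          return; // condition met; stop polling early
        }
        if (sleepMillis > 0 && attempt < maxAttempts - 1) {
          Thread.sleep(sleepMillis);
        }
      }
      throw new AssertionError(
          "condition not met after " + maxAttempts + " attempts");
    }
  }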
+ // No need to check the id prefix for the app. + verifyEntityForTimelineV2(appEntityFile, + ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false); + + // to avoid a race in the test case, check at least 40 times with a + // sleep of 50ms + verifyEntityForTimelineV2(appEntityFile, + ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false); // Verify RM posting AppAttempt life cycle Events are getting published String appAttemptMetricsTimestampFileName = @@ -606,17 +594,10 @@ verifyEntityTypeFileExists(basePath, TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(), appAttemptMetricsTimestampFileName); - Assert.assertEquals( - "AppAttempt register event should be published atleast once", - 1, - getNumOfStringOccurrences(appAttemptEntityFile, - AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)); - - Assert.assertEquals( - "AppAttempt finished event should be published atleast once", - 1, - getNumOfStringOccurrences(appAttemptEntityFile, - AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)); + verifyEntityForTimelineV2(appAttemptEntityFile, + AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true); + verifyEntityForTimelineV2(appAttemptEntityFile, + AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true); } finally { FileUtils.deleteDirectory(tmpRootFolder.getParentFile()); } @@ -636,22 +617,64 @@ return entityFile; } - private long getNumOfStringOccurrences(File entityFile, String searchString) - throws IOException { - BufferedReader reader = null; - String strLine; + /** + * Checks the events and id prefix published for an entity. + * + * @param entityFile Entity file. + * @param expectedEvent Expected event id. + * @param numOfExpectedEvent Number of expected occurrences of the expected + * event id. + * @param checkTimes Number of times to check. + * @param sleepTime Sleep time for each iteration. + * @param checkIdPrefix Whether to check the id prefix. + * @throws IOException if entity file reading fails. + * @throws InterruptedException if sleep is interrupted. + */ + private void verifyEntityForTimelineV2(File entityFile, String expectedEvent, + long numOfExpectedEvent, int checkTimes, long sleepTime, + boolean checkIdPrefix) throws IOException, InterruptedException { long actualCount = 0; - try { - reader = new BufferedReader(new FileReader(entityFile)); - while ((strLine = reader.readLine()) != null) { - if (strLine.trim().contains(searchString)) { - actualCount++; + for (int i = 0; i < checkTimes; i++) { + BufferedReader reader = null; + String strLine = null; + actualCount = 0; + try { + reader = new BufferedReader(new FileReader(entityFile)); + long idPrefix = -1; + while ((strLine = reader.readLine()) != null) { + String entityLine = strLine.trim(); + if (entityLine.isEmpty()) { + continue; + } + if (entityLine.contains(expectedEvent)) { + actualCount++; + } + if (checkIdPrefix) { + TimelineEntity entity = FileSystemTimelineReaderImpl.
+ getTimelineRecordFromJSON(entityLine, TimelineEntity.class); + Assert.assertTrue("Entity ID prefix expected to be > 0", + entity.getIdPrefix() > 0); + if (idPrefix == -1) { + idPrefix = entity.getIdPrefix(); + } else { + Assert.assertEquals("Entity ID prefix should be the same across " + + "each publish of the same entity", + idPrefix, entity.getIdPrefix()); + } + } } + } finally { + reader.close(); + } + if (numOfExpectedEvent == actualCount) { + break; + } + if (sleepTime > 0 && i < checkTimes - 1) { + Thread.sleep(sleepTime); } - } finally { - reader.close(); } - return actualCount; + Assert.assertEquals("Unexpected number of " + expectedEvent + + " events published.", numOfExpectedEvent, actualCount); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index d82c728..125b046 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -150,6 +150,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; @@ -1011,10 +1012,11 @@ public class ContainerManagerImpl extends CompositeService implements Credentials credentials = YarnServerSecurityUtils.parseCredentials(launchContext); + long containerStartTime = SystemClock.getInstance().getTime(); Container container = new ContainerImpl(getConfig(), this.dispatcher, launchContext, credentials, metrics, containerTokenIdentifier, - context); + context, containerStartTime); ApplicationId applicationID = containerId.getApplicationAttemptId().getApplicationId(); if (context.getContainers().putIfAbsent(containerId, container) != null) { @@ -1067,7 +1069,7 @@ public class ContainerManagerImpl extends CompositeService implements } this.context.getNMStateStore().storeContainer(containerId, - containerTokenIdentifier.getVersion(), request); + containerTokenIdentifier.getVersion(), containerStartTime, request); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container));
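Note the shape of the ContainerManagerImpl change above: the clock is sampled exactly once per container, and that single value is handed both to the ContainerImpl constructor and to the state store. The start and finish events then derive the same inverted id prefix, and a recovered container can reuse the persisted value. A reduced sketch of the pattern with hypothetical names; the real classes are ContainerManagerImpl, ContainerImpl, and the NM state store:

  import java.util.HashMap;
  import java.util.Map;

  final class LaunchedContainer {
    final String containerId;
    final long startTime; // sampled exactly once at launch

    LaunchedContainer(String containerId, long startTime) {
      this.containerId = containerId;
      this.startTime = startTime;
    }

    // Start and finish events derive the prefix from the same stored
    // value, so both land on the same entity row in ATSv2.
    long idPrefix() {
      return Long.MAX_VALUE - startTime; // cf. TimelineServiceHelper.invertLong
    }
  }

  final class Launcher {
    // Stand-in for the persisted state store keyed by container id.
    private final Map<String, Long> store = new HashMap<>();

    LaunchedContainer launch(String containerId) {
      long startTime = System.currentTimeMillis(); // single clock sample
      store.put(containerId, startTime); // persisted for recovery
      return new LaunchedContainer(containerId, startTime);
    }
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java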
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java index 0a8ffdf..09c946b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java @@ -23,12 +23,16 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; public class ApplicationContainerFinishedEvent extends ApplicationEvent { private ContainerStatus containerStatus; + // Required by NMTimelinePublisher. + private long containerStartTime; - public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) { + public ApplicationContainerFinishedEvent(ContainerStatus containerStatus, + long containerStartTs) { super(containerStatus.getContainerId().getApplicationAttemptId(). getApplicationId(), ApplicationEventType.APPLICATION_CONTAINER_FINISHED); this.containerStatus = containerStatus; + this.containerStartTime = containerStartTs; } public ContainerId getContainerID() { @@ -39,4 +43,7 @@ public class ApplicationContainerFinishedEvent extends ApplicationEvent { return containerStatus; } + public long getContainerStartTime() { + return containerStartTime; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index bd3f06d..14588a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -37,6 +37,8 @@ public interface Container extends EventHandler { ContainerId getContainerId(); + long getContainerStartTime(); + Resource getResource(); void setResource(Resource targetResource); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6808a30b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 055e12c..3d13dd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -170,10 +170,10 @@ public class ContainerImpl implements Container { /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; + private final long startTime; private static final Log LOG = LogFactory.getLog(ContainerImpl.class); - // whether container has been recovered after a restart private RecoveredContainerStatus recoveredStatus = RecoveredContainerStatus.REQUESTED; @@ -186,6 +186,16 @@ public class ContainerImpl implements Container { ContainerLaunchContext launchContext, Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier, Context context) { + this(conf, dispatcher, launchContext, creds, metrics, + containerTokenIdentifier, context, SystemClock.getInstance().getTime()); + } + + public ContainerImpl(Configuration conf, Dispatcher dispatcher, + ContainerLaunchContext launchContext, Credentials creds, + NodeManagerMetrics metrics, + ContainerTokenIdentifier containerTokenIdentifier, Context context, + long startTs) { + this.startTime = startTs; this.daemonConf = conf; this.dispatcher = dispatcher; this.stateStore = context.getNMStateStore(); @@ -260,7 +270,7 @@ public class ContainerImpl implements Container { ContainerTokenIdentifier containerTokenIdentifier, Context context, RecoveredContainerState rcs) { this(conf, dispatcher, launchContext, creds, metrics, - containerTokenIdentifier, context); + containerTokenIdentifier, context, rcs.getStartTime()); this.recoveredStatus = rcs.getStatus(); this.exitCode = rcs.getExitCode(); this.recoveredAsKilled = rcs.getKilled(); @@ -635,6 +645,11 @@ public class ContainerImpl implements Container { } @Override + public long getContainerStartTime() { + return this.startTime; + } + + @Override public Resource getResource() { return Resources.clone(this.resource); } @@ -694,7 +709,8 @@ public class ContainerImpl implements Container { EventHandler eventHandler = dispatcher.getEventHandler(); ContainerStatus containerStatus = cloneAndGetContainerStatus(); - eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus)); + eventHandler.handle( + new ApplicationContainerFinishedEvent(containerStatus, startTime)); // Tell the scheduler the container is Done eventHandler.handle(new ContainerSchedulerEvent(this,
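The constructor chaining above is what keeps the id prefix stable across NM restarts: the old public constructor now delegates with a fresh clock sample, while the recovery constructor replays rcs.getStartTime() from the state store, so republished events keep their original prefix and land on the same entity row. A stripped-down sketch of the pattern, using a hypothetical class rather than the actual ContainerImpl:

  class TrackedContainer {

    private final long startTime;

    // Fresh container: sample the clock once, at construction.
    TrackedContainer() {
      this(System.currentTimeMillis());
    }

    // Recovered container: replay the start time loaded from the state
    // store so republished events keep the original id prefix.
    TrackedContainer(long startTs) {
      this.startTime = startTs;
    }

    long getContainerStartTime() {
      return startTime;
    }
  }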