tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject [1/2] tez git commit: TEZ-2789. Backport events added in TEZ-2612 to branch-0.7 (bikas)
Date Wed, 09 Sep 2015 20:59:07 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 9be8cd47f -> bc56ca315


http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index f43f52c..9c321ff 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -215,7 +215,7 @@ public class TestTaskRecovery {
     long taStartTime = taskStartTime + 100L;
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null,
0));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(0, task.getFinishedAttemptsCount());
     assertEquals(taskScheduledTime, task.scheduledTime);
@@ -286,7 +286,7 @@ public class TestTaskRecovery {
     restoreFromTaskStartEvent();
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-        0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"",
new TezCounters()));
+        0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"",
new TezCounters(), null));
     task.handle(new TaskEventRecoverTask(task.getTaskId()));
     // wait for the second task attempt is scheduled
     dispatcher.await();
@@ -307,7 +307,7 @@ public class TestTaskRecovery {
     restoreFromTaskStartEvent();
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-        0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"",
new TezCounters()));
+        0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"",
new TezCounters(), null));
     task.handle(new TaskEventRecoverTask(task.getTaskId()));
     // wait for the second task attempt is scheduled
     dispatcher.await();
@@ -329,7 +329,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     try {
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
-          0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters()));
+          0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), null));
       fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)");
     } catch (TezUncheckedException e) {
       assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover"));
@@ -372,7 +372,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -405,7 +405,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -438,7 +438,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -473,7 +473,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -516,7 +516,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -528,7 +528,7 @@ public class TestTaskRecovery {
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -563,7 +563,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -575,7 +575,7 @@ public class TestTaskRecovery {
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -614,7 +614,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -654,7 +654,7 @@ public class TestTaskRecovery {
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.SUCCEEDED, recoveredState);
     assertEquals(1, task.getAttempts().size());
     assertEquals(1, task.getFinishedAttemptsCount());
@@ -721,7 +721,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName,
-            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+            taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null,
0));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,
         ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -735,7 +735,7 @@ public class TestTaskRecovery {
     recoveredState =
         task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
             taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
-            "", new TezCounters()));
+            "", new TezCounters(), null));
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,
         ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -774,9 +774,9 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.KILLED, null, "", null));
+          0, TaskAttemptState.KILLED, null, "", null, null));
     }
     assertEquals(maxFailedAttempts, task.getAttempts().size());
     assertEquals(0, task.failedAttempts);
@@ -804,9 +804,9 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, null, "", null));
+          0, TaskAttemptState.FAILED, null, "", null, null));
     }
     assertEquals(maxFailedAttempts, task.getAttempts().size());
     assertEquals(maxFailedAttempts, task.failedAttempts);
@@ -834,9 +834,9 @@ public class TestTaskRecovery {
     for (int i = 0; i < maxFailedAttempts - 1; ++i) {
       TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
       task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
-          mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+          mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0));
       task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
-          0, TaskAttemptState.FAILED, null, "", null));
+          0, TaskAttemptState.FAILED, null, "", null, null));
     }
     assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
     assertEquals(maxFailedAttempts - 1, task.failedAttempts);
@@ -844,7 +844,7 @@ public class TestTaskRecovery {
     TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId());
     TaskState recoveredState =
         task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId,
-            vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", ""));
+            vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null,
0));
 
     assertEquals(TaskState.RUNNING, recoveredState);
     assertEquals(TaskAttemptStateInternal.NEW,

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 67bad47..1ae9289 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -110,6 +110,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.MockClock;
 import org.apache.tez.dag.app.TaskAttemptEventInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
@@ -2271,6 +2272,8 @@ public class TestVertexImpl {
       }})
     .when(execService).submit((Callable<Void>) any());
     
+    MockClock clock = new MockClock();
+    
     doReturn(execService).when(appContext).getExecService();
     doReturn(conf).when(appContext).getAMConf();
     doReturn(new Credentials()).when(dag).getCredentials();
@@ -2281,7 +2284,8 @@ public class TestVertexImpl {
     doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
     doReturn(historyEventHandler).when(appContext).getHistoryHandler();
     doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
-
+    doReturn(clock).when(appContext).getClock();
+    
     vertexGroups = Maps.newHashMap();
     for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
       vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo));
@@ -2997,6 +3001,11 @@ public class TestVertexImpl {
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(2, v.getCompletedTasks());
+    Assert.assertTrue(v.initTimeRequested > 0);
+    Assert.assertTrue(v.initedTime > 0);
+    Assert.assertTrue(v.startTimeRequested > 0);
+    Assert.assertTrue(v.startedTime > 0);
+    Assert.assertTrue(v.finishTime > 0);
   }
 
   @Test(timeout = 5000)
@@ -3307,10 +3316,25 @@ public class TestVertexImpl {
     initAllVertices(VertexState.INITED);
 
     VertexImpl v6 = vertices.get("vertex6");
+    VertexImpl v3 = vertices.get("vertex3");
 
     startVertex(vertices.get("vertex1"));
+    dispatcher.await();
+    Assert.assertEquals(VertexState.INITED, v3.getState());
+    long v3StartTimeRequested = v3.startTimeRequested;
+    Assert.assertEquals(1, v3.numStartedSourceVertices);
+    Assert.assertTrue(v3StartTimeRequested > 0);
+    try {
+      Thread.sleep(100);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
     startVertex(vertices.get("vertex2"));
     dispatcher.await();
+    // start request from second source vertex overrides the value from the first source
vertex
+    Assert.assertEquals(VertexState.RUNNING, v3.getState());
+    Assert.assertEquals(2, v3.numStartedSourceVertices);
+    Assert.assertTrue(v3.startTimeRequested > v3StartTimeRequested);
     LOG.info("Verifying v6 state " + v6.getState());
     Assert.assertEquals(VertexState.RUNNING, v6.getState());
     Assert.assertEquals(3, v6.getDistanceFromRoot());
@@ -3692,10 +3716,15 @@ public class TestVertexImpl {
     // v3 still initializing with source vertex started. So should start running
     // once num tasks is defined
     Assert.assertEquals(VertexState.INITIALIZING, v3.getState());
+    Assert.assertTrue(v3.numStartedSourceVertices > 0);
+    long v3StartTimeRequested = v3.startTimeRequested;
+    Assert.assertTrue(v3StartTimeRequested > 0);
     v3.reconfigureVertex(numTasks, null, null);
     dispatcher.await();
     Assert.assertEquals(numTasks, v3.getTotalTasks());
     Assert.assertEquals(VertexState.RUNNING, v3.getState());
+    // the start time requested should remain at its original value
+    Assert.assertEquals(v3StartTimeRequested, v3.startTimeRequested);
   }
 
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index fafbba6..f9a1c5e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -108,11 +108,14 @@ public class TestAMContainer {
     assertNull(wc.amContainer.getCurrentTaskAttempt());
 
     // Assign task.
+    long currTime = wc.appContext.getClock().getTime();
     wc.assignTaskAttempt(wc.taskAttemptID);
     wc.verifyState(AMContainerState.LAUNCHING);
     wc.verifyNoOutgoingEvents();
     assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt());
-
+    assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() > 0);
+    assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() >= currTime);
+    
     // Container Launched
     wc.containerLaunched();
     wc.verifyState(AMContainerState.RUNNING);

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 3760644..5c8c90e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -48,6 +48,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -479,7 +480,10 @@ public class TestHistoryEventsProtoConversion {
         "vertex1", 10009l, ContainerId.newInstance(
         ApplicationAttemptId.newInstance(
             ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance(
-        "host1", 19999), "inProgress", "Completed", "nodeHttpAddress");
+        "host1", 19999), "inProgress", "Completed", "nodeHttpAddress", 1024,
+        TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+            TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024
+        );
     TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getTaskAttemptID(),
@@ -490,6 +494,12 @@ public class TestHistoryEventsProtoConversion {
         deserializedEvent.getNodeId());
     Assert.assertEquals(event.getStartTime(),
         deserializedEvent.getStartTime());
+    Assert.assertEquals(event.getCreationTime(),
+        deserializedEvent.getCreationTime());
+    Assert.assertEquals(event.getAllocationTime(),
+        deserializedEvent.getAllocationTime());
+    Assert.assertEquals(event.getCreationCausalTA(),
+        deserializedEvent.getCreationCausalTA());
     logEvents(event, deserializedEvent);
   }
 
@@ -499,7 +509,7 @@ public class TestHistoryEventsProtoConversion {
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          null, null, null);
+          null, null, null, null);
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -515,11 +525,17 @@ public class TestHistoryEventsProtoConversion {
       logEvents(event, deserializedEvent);
     }
     {
+      TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
+          TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 0), 0);
+      long timestamp = 1024L;
+      List<DataEventDependencyInfo> events = Lists.newArrayList();
+      events.add(new DataEventDependencyInfo(timestamp, taId));
+      events.add(new DataEventDependencyInfo(timestamp, taId));
       TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(
           TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
           "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
-          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters());
+          TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events);
       TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getTaskAttemptID(),
@@ -534,6 +550,9 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getCounters());
       Assert.assertEquals(event.getTaskAttemptError(),
           deserializedEvent.getTaskAttemptError());
+      Assert.assertEquals(events.size(), event.getDataEvents().size());
+      Assert.assertEquals(events.get(0).getTimestamp(), event.getDataEvents().get(0).getTimestamp());
+      Assert.assertEquals(events.get(0).getTaskAttemptId(), event.getDataEvents().get(0).getTaskAttemptId());
       logEvents(event, deserializedEvent);
     }
   }
@@ -582,9 +601,10 @@ public class TestHistoryEventsProtoConversion {
     } catch (RuntimeException e) {
       // Expected
     }
+    long eventTime = 1024;
     List<TezEvent> events =
         Arrays.asList(new TezEvent(DataMovementEvent.create(1, null),
-            new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
+            new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null), eventTime));
     event = new VertexRecoverableEventsGeneratedEvent(
             TezVertexID.getInstance(
                 TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
@@ -595,6 +615,8 @@ public class TestHistoryEventsProtoConversion {
         deserializedEvent.getTezEvents().size());
     Assert.assertEquals(event.getTezEvents().get(0).getEventType(),
         deserializedEvent.getTezEvents().get(0).getEventType());
+    Assert.assertEquals(event.getTezEvents().get(0).getEventReceivedTime(),
+        deserializedEvent.getTezEvents().get(0).getEventReceivedTime());
     logEvents(event, deserializedEvent);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 3ab204a..711e4bb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -160,11 +160,11 @@ public class TestHistoryEventJsonConversion {
           break;
         case TASK_ATTEMPT_STARTED:
           event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
-              nodeId, null, null, "nodeHttpAddress");
+              nodeId, null, null, "nodeHttpAddress", 0, null, 0);
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,
null, null);
+              random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,
null, null, null);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 53fc66b..4685a61 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -435,7 +435,10 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
     atsEntity.addOtherInfo(ATSConstants.COUNTERS,
         DAGUtils.convertCountersToATSMap(event.getCounters()));
-
+    if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) {
+      atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENTS, 
+          DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents()));
+    }
     return atsEntity;
   }
 
@@ -470,6 +473,12 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
     atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
     atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name());
+    atsEntity.addOtherInfo(ATSConstants.CREATION_TIME, event.getCreationTime());
+    atsEntity.addOtherInfo(ATSConstants.ALLOCATION_TIME, event.getAllocationTime());
+    if (event.getCreationCausalTA() != null) {
+      atsEntity.addOtherInfo(ATSConstants.CREATION_CAUSAL_ATTEMPT,
+          event.getCreationCausalTA().toString());
+    }
 
     return atsEntity;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 7e99ac3..2849c10 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.history.logging.ats;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -45,6 +46,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.app.web.AMWebController;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -85,6 +87,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestHistoryEventTimelineConversion {
 
   private ApplicationAttemptId applicationAttemptId;
@@ -166,11 +170,11 @@ public class TestHistoryEventTimelineConversion {
           break;
         case TASK_ATTEMPT_STARTED:
           event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId,
-              nodeId, null, null, "nodeHttpAddress");
+              nodeId, null, null, "nodeHttpAddress", 0, null, 0);
           break;
         case TASK_ATTEMPT_FINISHED:
           event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
-              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST,
null, null);
+              random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST,
null, null, null);
           break;
         case CONTAINER_LAUNCHED:
           event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -445,6 +449,7 @@ public class TestHistoryEventTimelineConversion {
         timelineEntity.getOtherInfo().get(ATSConstants.USER));
   }
 
+  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testConvertTaskAttemptFinishedEvent() {
     String vertexName = "testVertex";
@@ -456,9 +461,13 @@ public class TestHistoryEventTimelineConversion {
         .values()[random.nextInt(TaskAttemptTerminationCause.values().length)];
     String diagnostics = "random diagnostics message";
     TezCounters counters = new TezCounters();
+    long lastDataEventTime = finishTime - 1;
+    List<DataEventDependencyInfo> events = Lists.newArrayList();
+    events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
+    events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID));
 
     TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
-        startTime, finishTime, state, error, diagnostics, counters);
+        startTime, finishTime, state, error, diagnostics, counters, events);
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
     Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -481,12 +490,17 @@ public class TestHistoryEventTimelineConversion {
     Assert.assertEquals(finishTime, evt.getTimestamp());
 
     final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
-    Assert.assertEquals(6, otherInfo.size());
+    Assert.assertEquals(7, otherInfo.size());
     Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
     Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
     Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
     Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
     Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
+    Map<String, Object> obj1 = (Map<String, Object>)otherInfo.get(ATSConstants.LAST_DATA_EVENTS);
+    List<Object> obj2 = (List<Object>) obj1.get(ATSConstants.LAST_DATA_EVENTS);
+    Assert.assertEquals(2, obj2.size());
+    Map<String, Object> obj3 = (Map<String, Object>) obj2.get(0);
+    Assert.assertEquals(events.get(0).getTimestamp(), obj3.get(ATSConstants.TIMESTAMP));
     Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
   }
 
@@ -730,8 +744,11 @@ public class TestHistoryEventTimelineConversion {
   @Test(timeout = 5000)
   public void testConvertTaskAttemptStartedEvent() {
     long startTime = random.nextLong();
+    long creationTime = 1024;
+    long allocationTime = 1024;
     TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1",
-        startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress");
+        startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress", 
+        creationTime, tezTaskAttemptID, allocationTime);
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -776,6 +793,10 @@ public class TestHistoryEventTimelineConversion {
         timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS));
     Assert.assertTrue(TaskAttemptState.RUNNING.name()
         .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS)));
+    Assert.assertEquals(tezTaskAttemptID.toString(), 
+        timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT));
+    Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME));
+    Assert.assertEquals(allocationTime, timelineEntity.getOtherInfo().get(ATSConstants.ALLOCATION_TIME));
   }
 
   @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 974e190..b44b7d4 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -55,12 +55,19 @@ public class TezEvent implements Writable {
   private EventMetaData sourceInfo;
 
   private EventMetaData destinationInfo;
+  
+  private long eventReceivedTime;
 
   public TezEvent() {
   }
 
   public TezEvent(Event event, EventMetaData sourceInfo) {
+    this(event, sourceInfo, System.currentTimeMillis());
+  }
+  
+  public TezEvent(Event event, EventMetaData sourceInfo, long time) {
     this.event = event;
+    this.eventReceivedTime = time;
     this.setSourceInfo(sourceInfo);
     if (event instanceof DataMovementEvent) {
       eventType = EventType.DATA_MOVEMENT_EVENT;
@@ -91,6 +98,14 @@ public class TezEvent implements Writable {
   public Event getEvent() {
     return event;
   }
+  
+  public void setEventReceivedTime(long eventReceivedTime) { // TODO save
+    this.eventReceivedTime = eventReceivedTime;
+  }
+  
+  public long getEventReceivedTime() {
+    return eventReceivedTime;
+  }
 
   public EventMetaData getSourceInfo() {
     return sourceInfo;
@@ -119,6 +134,7 @@ public class TezEvent implements Writable {
     }
     out.writeBoolean(true);
     out.writeInt(eventType.ordinal());
+    out.writeLong(eventReceivedTime);
     if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
       // TODO NEWTEZ convert to PB
       TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
@@ -188,6 +204,7 @@ public class TezEvent implements Writable {
       return;
     }
     eventType = EventType.values()[in.readInt()];
+    eventReceivedTime = in.readLong();
     if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
       // TODO NEWTEZ convert to PB
       event = new TaskStatusUpdateEvent();


Mime
View raw message