tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-2325. Route status update event directly to the attempt. (Prakash Ramachandran via hitesh)
Date Mon, 27 Apr 2015 22:47:44 GMT
Repository: tez
Updated Branches:
  refs/heads/master aa87a14c5 -> c3232d0b7


TEZ-2325. Route status update event directly to the attempt. (Prakash Ramachandran via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c3232d0b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c3232d0b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c3232d0b

Branch: refs/heads/master
Commit: c3232d0b748d3fdff6676b403277013ebf9f3e32
Parents: aa87a14
Author: Hitesh Shah <hitesh@apache.org>
Authored: Mon Apr 27 15:47:07 2015 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Mon Apr 27 15:47:07 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  46 ++++-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  18 --
 .../app/TestTaskAttemptListenerImplTezDag.java  | 176 ++++++++++++++-----
 4 files changed, 177 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c3232d0b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e42a79e..36e1767 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2325. Route status update event directly to the attempt.
   TEZ-2358. Pipelined Shuffle: MergeManager assumptions about 1 merge per source-task.
   TEZ-2342. TestFaultTolerance.testRandomFailingTasks fails due to timeout.
   TEZ-2362. State Change Notifier Thread should be stopped when dag is

http://git-wip-us.apache.org/repos/asf/tez/blob/c3232d0b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index b64283b..d96da83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -22,12 +22,20 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.collections4.ListUtils;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -413,10 +421,22 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           LOG.debug("Ping from " + taskAttemptID.toString() +
               " events: " + (inEvents != null? inEvents.size() : -1));
         }
-        if(inEvents!=null && !inEvents.isEmpty()) {
+
+        List<TezEvent> otherEvents = new ArrayList<TezEvent>();
+        for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+          final EventType eventType = tezEvent.getEventType();
+          if (eventType == EventType.TASK_STATUS_UPDATE_EVENT ||
+              eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) {
+            context.getEventHandler()
+                .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent));
+          } else {
+            otherEvents.add(tezEvent);
+          }
+        }
+        if(!otherEvents.isEmpty()) {
           TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
           context.getEventHandler().handle(
-              new VertexEventRouteEvent(vertexId, inEvents));
+              new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
         }
         taskHeartbeatHandler.pinged(taskAttemptID);
         List<TezEvent> outEvents = context
@@ -433,6 +453,28 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     }
   }
 
+  private TaskAttemptEvent getTaskAttemptEventFromTezEvent(TezTaskAttemptID taskAttemptID,
+                                                           TezEvent tezEvent) {
+    final EventType eventType = tezEvent.getEventType();
+    TaskAttemptEvent taskAttemptEvent;
+    switch (eventType) {
+      case TASK_STATUS_UPDATE_EVENT:
+        {
+          taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+              (TaskStatusUpdateEvent) tezEvent.getEvent());
+        }
+        break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        {
+          taskAttemptEvent = new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE);
+        }
+        break;
+      default:
+        throw new TezUncheckedException("unknown event type " + eventType);
+    }
+    return taskAttemptEvent;
+  }
+
   private Map<String, TezLocalResource> convertLocalResourceMap(Map<String, LocalResource>
ylrs)
       throws IOException {
     Map<String, TezLocalResource> tlrs = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/tez/blob/c3232d0b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index dfa358d..c4619a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4096,24 +4096,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           srcEdge.sendTezEventToSourceTasks(tezEvent);
         }
         break;
-      case TASK_STATUS_UPDATE_EVENT:
-        {
-          checkEventSourceMetadata(vertex, sourceMeta);
-          TaskStatusUpdateEvent sEvent =
-              (TaskStatusUpdateEvent) tezEvent.getEvent();
-          vertex.getEventHandler().handle(
-              new TaskAttemptEventStatusUpdate(sourceMeta.getTaskAttemptID(),
-                  sEvent));
-        }
-        break;
-      case TASK_ATTEMPT_COMPLETED_EVENT:
-        {
-          checkEventSourceMetadata(vertex, sourceMeta);
-          vertex.getEventHandler().handle(
-              new TaskAttemptEvent(sourceMeta.getTaskAttemptID(),
-                  TaskAttemptEventType.TA_DONE));
-        }
-        break;
       case TASK_ATTEMPT_FAILED_EVENT:
         {
           checkEventSourceMetadata(vertex, sourceMeta);

http://git-wip-us.apache.org/repos/asf/tez/blob/c3232d0b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 1f5d9bb..b0ff0e3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -20,53 +20,97 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
+@SuppressWarnings("unchecked")
 public class TestTaskAttemptListenerImplTezDag {
+  private ApplicationId appId;
+  private AppContext appContext;
+  AMContainerMap amContainerMap;
+  EventHandler eventHandler;
+  DAG dag;
+  TaskAttemptListenerImpTezDag taskAttemptListener;
+  ContainerTask containerTask;
+  AMContainerTask amContainerTask;
+  TaskSpec taskSpec;
 
-  @Test(timeout = 5000)
-  public void testGetTask() throws IOException {
-    ApplicationId appId = ApplicationId.newInstance(1000, 1);
-    AppContext appContext = mock(AppContext.class);
-    EventHandler eventHandler = mock(EventHandler.class);
-    DAG dag = mock(DAG.class);
-    AMContainerMap amContainerMap = mock(AMContainerMap.class);
+  TezVertexID vertexID;
+  TezTaskID taskID;
+  TezTaskAttemptID taskAttemptID;
+
+  @Before
+  public void setUp() {
+    appId = ApplicationId.newInstance(1000, 1);
+    dag = mock(DAG.class);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    vertexID = TezVertexID.getInstance(dagID, 1);
+    taskID = TezTaskID.getInstance(vertexID, 1);
+    taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
+
+    amContainerMap = mock(AMContainerMap.class);
     Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType,
String>();
+
+    eventHandler = mock(EventHandler.class);
+
+    appContext = mock(AppContext.class);
     doReturn(eventHandler).when(appContext).getEventHandler();
     doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(appAcls).when(appContext).getApplicationACLs();
     doReturn(amContainerMap).when(appContext).getAllContainers();
 
-    TaskAttemptListenerImpTezDag taskAttemptListener =
-        new TaskAttemptListenerImplForTest(appContext, mock(TaskHeartbeatHandler.class),
-            mock(ContainerHeartbeatHandler.class), null);
+    taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), null);
 
+    taskSpec = mock(TaskSpec.class);
+    doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
+    amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0);
+    containerTask = null;
+  }
 
-    TaskSpec taskSpec = mock(TaskSpec.class);
-    TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class);
-    doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
-    AMContainerTask amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0);
-    ContainerTask containerTask = null;
-
+  @Test(timeout = 5000)
+  public void testGetTask() throws IOException {
 
     ContainerId containerId1 = createContainerId(appId, 1);
     doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1);
@@ -74,7 +118,6 @@ public class TestTaskAttemptListenerImplTezDag {
     containerTask = taskAttemptListener.getTask(containerContext1);
     assertTrue(containerTask.shouldDie());
 
-
     ContainerId containerId2 = createContainerId(appId, 2);
     doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId2);
     ContainerContext containerContext2 = new ContainerContext(containerId2.toString());
@@ -89,7 +132,7 @@ public class TestTaskAttemptListenerImplTezDag {
     assertEquals(taskSpec, containerTask.getTaskSpec());
 
     // Task unregistered. Should respond to heartbeats
-    taskAttemptListener.unregisterTaskAttempt(taskAttemptId);
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptID);
     containerTask = taskAttemptListener.getTask(containerContext2);
     assertNull(containerTask);
 
@@ -115,29 +158,6 @@ public class TestTaskAttemptListenerImplTezDag {
 
   @Test(timeout = 5000)
   public void testGetTaskMultiplePulls() throws IOException {
-    ApplicationId appId = ApplicationId.newInstance(1000, 1);
-    AppContext appContext = mock(AppContext.class);
-    EventHandler eventHandler = mock(EventHandler.class);
-    DAG dag = mock(DAG.class);
-    AMContainerMap amContainerMap = mock(AMContainerMap.class);
-    Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType,
String>();
-    doReturn(eventHandler).when(appContext).getEventHandler();
-    doReturn(dag).when(appContext).getCurrentDAG();
-    doReturn(appAcls).when(appContext).getApplicationACLs();
-    doReturn(amContainerMap).when(appContext).getAllContainers();
-
-    TaskAttemptListenerImpTezDag taskAttemptListener =
-        new TaskAttemptListenerImplForTest(appContext, mock(TaskHeartbeatHandler.class),
-            mock(ContainerHeartbeatHandler.class), null);
-
-
-    TaskSpec taskSpec = mock(TaskSpec.class);
-    TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class);
-    doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
-    AMContainerTask amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0);
-    ContainerTask containerTask = null;
-
-
     ContainerId containerId1 = createContainerId(appId, 1);
     doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1);
     ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
@@ -156,10 +176,78 @@ public class TestTaskAttemptListenerImplTezDag {
     assertNull(containerTask);
   }
 
-  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
+  @Test (timeout = 5000)
+  public void testTaskEventRouting() throws Exception {
+    List<TezEvent> events =  Arrays.asList(
+      new TezEvent(InputInitializerEvent.create("test_vertex", "test_input", null), null),
+      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null)
+    );
+
+    EventHandler eventHandler = generateHeartbeat(events);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    final List<Event> argAllValues = arg.getAllValues();
+
+    final Event statusUpdateEvent = argAllValues.get(0);
+    assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
+        statusUpdateEvent.getType());
+
+
+    final Event vertexEvent = argAllValues.get(1);
+    final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
+    assertEquals("Other events should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+        vertexEvent.getType());
+    assertEquals(EventType.ROOT_INPUT_INITIALIZER_EVENT,
+        vertexRouteEvent.getEvents().get(0).getEventType());
+  }
+
+  @Test (timeout = 5000)
+  public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
+    List<TezEvent> events = Arrays.asList(
+      new TezEvent(new TaskAttemptCompletedEvent(), null)
+    );
+    final EventHandler eventHandler = generateHeartbeat(events);
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(1)).handle(arg.capture());
+    final List<Event> argAllValues = arg.getAllValues();
+
+    final Event statusUpdateEvent = argAllValues.get(0);
+    assertEquals("only event should be task done", TaskAttemptEventType.TA_DONE,
+        statusUpdateEvent.getType());
+  }
+
+  private EventHandler generateHeartbeat(List<TezEvent> events) throws IOException,
TezException {
+    ContainerId containerId = createContainerId(appId, 1);
+    long requestId = 0;
+    Vertex vertex = mock(Vertex.class);
+    Task task = mock(Task.class);
+
+    doReturn(vertex).when(dag).getVertex(vertexID);
+    doReturn("test_vertex").when(vertex).getName();
+    doReturn(task).when(vertex).getTask(taskID);
+
+    doReturn(new ArrayList<TezEvent>()).when(task).getTaskAttemptTezEvents(taskAttemptID,
0, 1);
+
+    taskAttemptListener.registerRunningContainer(containerId);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId);
+
+    TezHeartbeatRequest request = mock(TezHeartbeatRequest.class);
+    doReturn(containerId.toString()).when(request).getContainerIdentifier();
+    doReturn(taskAttemptID).when(request).getCurrentTaskAttemptID();
+    doReturn(++requestId).when(request).getRequestId();
+    doReturn(events).when(request).getEvents();
+
+    taskAttemptListener.heartbeat(request);
+
+    return eventHandler;
+  }
+
+
+  private ContainerId createContainerId(ApplicationId applicationId, long containerIdx) {
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
-    ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx);
-    return containerId;
+    return ContainerId.newContainerId(appAttemptId, containerIdx);
   }
 
   private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag
{


Mime
View raw message