tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [25/51] [abbrv] tez git commit: TEZ-2433. Fixes after rebase 05/08. (sseth)
Date Thu, 06 Aug 2015 09:26:17 GMT
TEZ-2433. Fixes after rebase 05/08. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/960d3b9c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/960d3b9c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/960d3b9c

Branch: refs/heads/TEZ-2003
Commit: 960d3b9cfb07615bb44f7883cd09cb572cebff2f
Parents: 7f0b967
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri May 8 18:43:16 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Aug 6 01:26:09 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../tez/dag/api/TaskHeartbeatResponse.java      | 10 ++++++--
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 27 ++++++++++----------
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |  9 +++----
 .../app/TestTaskAttemptListenerImplTezDag.java  | 10 +++-----
 .../library/common/shuffle/TestFetcher.java     |  8 ++----
 6 files changed, 31 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/960d3b9c/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index 9b2339f..ad167ab 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -22,5 +22,6 @@ ALL CHANGES:
   TEZ-2388. Send dag identifier as part of the fetcher request string.
   TEZ-2414. LogicalIOProcessorRuntimeTask, RuntimeTask, TezTaskRunner should handle interrupts
& carry out necessary cleanups.
   TEZ-2420. TaskRunner returning before executing the task.
+  TEZ-2433. Fixes after rebase 05/08
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/960d3b9c/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
index c82a743..b826e76 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskHeartbeatResponse.java
@@ -22,11 +22,13 @@ import org.apache.tez.runtime.api.impl.TezEvent;
 public class TaskHeartbeatResponse {
 
   private final boolean shouldDie;
-  private List<TezEvent> events;
+  private final int nextFromEventId;
+  private final List<TezEvent> events;
 
-  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events) {
+  public TaskHeartbeatResponse(boolean shouldDie, List<TezEvent> events, int nextFromEventId)
{
     this.shouldDie = shouldDie;
     this.events = events;
+    this.nextFromEventId = nextFromEventId;
   }
 
   public boolean isShouldDie() {
@@ -36,4 +38,8 @@ public class TaskHeartbeatResponse {
   public List<TezEvent> getEvents() {
     return events;
   }
+
+  public int getNextFromEventId() {
+    return nextFromEventId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/960d3b9c/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index cbaed99..db78fa9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -78,7 +78,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
   protected final ContainerHeartbeatHandler containerHeartbeatHandler;
 
-  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true,
null);
+  private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true,
null, 0);
 
   private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
       new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
@@ -194,7 +194,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // So - avoiding synchronization.
 
     pingContainerHeartbeatHandler(containerId);
-    List<TezEvent> outEvents = null;
+    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null);
     TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
     if (taskAttemptID != null) {
       ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
@@ -216,12 +216,17 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       }
 
       List<TezEvent> otherEvents = new ArrayList<TezEvent>();
+      // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
+      // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
+      // to VertexImpl to ensure the events ordering
+      //  1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
+      //  2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
       for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
         final EventType eventType = tezEvent.getEventType();
-        if (eventType == EventType.TASK_STATUS_UPDATE_EVENT ||
-            eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT) {
-          context.getEventHandler()
-              .handle(getTaskAttemptEventFromTezEvent(taskAttemptID, tezEvent));
+        if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
+          TaskAttemptEvent taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
+              (TaskStatusUpdateEvent) tezEvent.getEvent());
+          context.getEventHandler().handle(taskAttemptEvent);
         } else {
           otherEvents.add(tezEvent);
         }
@@ -232,14 +237,13 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
             new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
       }
       taskHeartbeatHandler.pinged(taskAttemptID);
-      outEvents = context
+      eventInfo = context
           .getCurrentDAG()
           .getVertex(taskAttemptID.getTaskID().getVertexID())
-          .getTask(taskAttemptID.getTaskID())
           .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
               request.getMaxEvents());
     }
-    return new TaskHeartbeatResponse(false, outEvents);
+    return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId());
   }
   public void taskAlive(TezTaskAttemptID taskAttemptId) {
     taskHeartbeatHandler.pinged(taskAttemptId);
@@ -435,9 +439,4 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           + ", ContainerId not known for this attempt");
     }
   }
-
-
-  public TaskCommunicator getTaskCommunicator() {
-    return taskCommunicators[0];
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/960d3b9c/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 6200a5b..accde2c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -364,13 +364,10 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
             request.getMaxEvents());
         tResponse = taskCommunicatorContext.heartbeat(tRequest);
       }
-      TezHeartbeatResponse response;
-      if (tResponse == null) {
-        response = new TezHeartbeatResponse();
-      } else {
-        response = new TezHeartbeatResponse(tResponse.getEvents());
-      }
+      TezHeartbeatResponse response = new TezHeartbeatResponse();
       response.setLastRequestId(requestId);
+      response.setEvents(tResponse.getEvents());
+      response.setNextFromEventId(tResponse.getNextFromEventId());
       containerInfo.lastRequestId = requestId;
       containerInfo.lastResponse = response;
       return response;

http://git-wip-us.apache.org/repos/asf/tez/blob/960d3b9c/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 2208220..34b9792 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -48,6 +48,7 @@ import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
 import org.apache.tez.dag.api.TezException;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
@@ -70,8 +71,6 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -260,10 +259,9 @@ public class TestTaskAttemptListenerImplTezDag {
   public void testTaskHeartbeatResponse() throws Exception {
     List<TezEvent> events = new ArrayList<TezEvent>();
     List<TezEvent> eventsToSend = new ArrayList<TezEvent>();
-    TezHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend);
+    TaskHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend);
     
     assertEquals(2, response.getNextFromEventId());
-    assertEquals(1, response.getLastRequestId());
     assertEquals(eventsToSend, response.getEvents());
   }
 
@@ -320,7 +318,7 @@ public class TestTaskAttemptListenerImplTezDag {
     return succeedToAllocate;
   }
 
-  private TezHeartbeatResponse generateHeartbeat(List<TezEvent> events,
+  private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events,
       int fromEventId, int maxEvents, int nextFromEventId,
       List<TezEvent> sendEvents) throws IOException, TezException {
     ContainerId containerId = createContainerId(appId, 1);
@@ -335,7 +333,7 @@ public class TestTaskAttemptListenerImplTezDag {
     taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
 
     TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
-
+    doReturn(containerId.toString()).when(request).getContainerIdentifier();
     doReturn(containerId.toString()).when(request).getContainerIdentifier();
     doReturn(taskAttemptID).when(request).getTaskAttemptId();
     doReturn(events).when(request).getEvents();

http://git-wip-us.apache.org/repos/asf/tez/blob/960d3b9c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 85e3540..08efb3e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -31,7 +31,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -43,11 +42,8 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.api.ExecutionContext;
-import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.InputIdentifier;
@@ -93,7 +89,7 @@ public class TestFetcher {
 
     // when enabled and hostname does not match use http fetch.
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+        ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH,
HOST,
         PORT, false);
     builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
@@ -109,7 +105,7 @@ public class TestFetcher {
 
     // when enabled and port does not match use http fetch.
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
PORT, false);
+        ApplicationId.newInstance(0, 1), -1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH,
HOST, PORT, false);
     builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
 


Mime
View raw message