tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-2381. Fixes after rebase 04/28. (sseth)
Date Tue, 28 Apr 2015 20:41:31 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 7d5f3f8e3 -> 5d6e80367


TEZ-2381. Fixes after rebase 04/28. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/5d6e8036
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/5d6e8036
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/5d6e8036

Branch: refs/heads/TEZ-2003
Commit: 5d6e80367616dbea629ba71fc35bd12c30707164
Parents: 7d5f3f8
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Apr 28 13:41:12 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Apr 28 13:41:12 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  1 +
 .../dag/app/TaskAttemptListenerImpTezDag.java   | 17 ++++----
 .../app/TestTaskAttemptListenerImplTezDag.java  | 45 ++++++++++++++------
 3 files changed, 42 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/5d6e8036/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index f6bc8e7..d42aaf8 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -18,5 +18,6 @@ ALL CHANGES:
   TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates.
   TEZ-2347. Expose additional information in TaskCommunicatorContext.
   TEZ-2361. Propagate dag completion to TaskCommunicator.
+  TEZ-2381. Fixes after rebase 04/28.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/5d6e8036/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 61bd4ca..c586787 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -17,28 +17,22 @@
 
 package org.apache.tez.dag.app;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
-import com.google.common.base.Preconditions;
-import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,7 +56,6 @@ import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -151,7 +144,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   private TaskCommunicator createTaskCommunicator(String taskCommClassIdentifier, int taskCommIndex)
{
     if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_NAME_DEFAULT))
{
       LOG.info("Using Default Task Communicator");
-      return new TezTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
+      return createTezTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
     } else if (taskCommClassIdentifier.equals(TezConstants.TEZ_AM_SERVICE_PLUGINS_LOCAL_MODE_NAME_DEFAULT))
{
       LOG.info("Using Default Local Task Communicator");
       return new TezLocalTaskCommunicatorImpl(taskCommunicatorContexts[taskCommIndex]);
@@ -174,6 +167,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       }
     }
   }
+
+  @VisibleForTesting
+  protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context)
{
+    return new TezTaskCommunicatorImpl(context);
+  }
+
   public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
       throws IOException, TezException {
     ContainerId containerId = ConverterUtils.toContainerId(request

http://git-wip-us.apache.org/repos/asf/tez/blob/5d6e8036/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index fa62c11..55a546e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -42,11 +42,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
 import org.apache.tez.dag.api.TezException;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.dag.api.TaskCommunicatorContext;
 import org.apache.tez.dag.app.dag.DAG;
@@ -68,7 +66,6 @@ import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -77,7 +74,9 @@ import org.mockito.ArgumentCaptor;
 // TODO TEZ-2003 Rename to TestTezTaskCommunicator
 public class TestTaskAttemptListenerImplTezDag {
   private ApplicationId appId;
+  private ApplicationAttemptId appAttemptId;
   private AppContext appContext;
+  Credentials credentials;
   AMContainerMap amContainerMap;
   EventHandler eventHandler;
   DAG dag;
@@ -93,11 +92,13 @@ public class TestTaskAttemptListenerImplTezDag {
   @Before
   public void setUp() {
     appId = ApplicationId.newInstance(1000, 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
     dag = mock(DAG.class);
     TezDAGID dagID = TezDAGID.getInstance(appId, 1);
     vertexID = TezVertexID.getInstance(dagID, 1);
     taskID = TezTaskID.getInstance(vertexID, 1);
     taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
+    credentials = new Credentials();
 
     amContainerMap = mock(AMContainerMap.class);
     Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType,
String>();
@@ -109,6 +110,8 @@ public class TestTaskAttemptListenerImplTezDag {
     doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(appAcls).when(appContext).getApplicationACLs();
     doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(credentials).when(appContext).getAppCredentials();
     NodeId nodeId = NodeId.newInstance("localhost", 0);
 
     AMContainer amContainer = mock(AMContainer.class);
@@ -150,7 +153,7 @@ public class TestTaskAttemptListenerImplTezDag {
     assertEquals(taskSpec, containerTask.getTaskSpec());
 
     // Task unregistered. Should respond to heartbeats
-    taskAttemptListener.unregisterTaskAttempt(taskAttemptId, 0);
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0);
     containerTask = tezUmbilical.getTask(containerContext2);
     assertNull(containerTask);
 
@@ -180,7 +183,7 @@ public class TestTaskAttemptListenerImplTezDag {
     TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
 
     ContainerId containerId1 = createContainerId(appId, 1);
-    doReturn(mock(AMContainer.class)).when(amContainerMap).get(containerId1);
+
     ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
     taskAttemptListener.registerRunningContainer(containerId1, 0);
     containerTask = tezUmbilical.getTask(containerContext1);
@@ -241,7 +244,6 @@ public class TestTaskAttemptListenerImplTezDag {
 
   private EventHandler generateHeartbeat(List<TezEvent> events) throws IOException,
TezException {
     ContainerId containerId = createContainerId(appId, 1);
-    long requestId = 0;
     Vertex vertex = mock(Vertex.class);
     Task task = mock(Task.class);
 
@@ -251,13 +253,13 @@ public class TestTaskAttemptListenerImplTezDag {
 
     doReturn(new ArrayList<TezEvent>()).when(task).getTaskAttemptTezEvents(taskAttemptID,
0, 1);
 
-    taskAttemptListener.registerRunningContainer(containerId);
-    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId);
+    taskAttemptListener.registerRunningContainer(containerId, 0);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
+
+    TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
 
-    TezHeartbeatRequest request = mock(TezHeartbeatRequest.class);
     doReturn(containerId.toString()).when(request).getContainerIdentifier();
-    doReturn(taskAttemptID).when(request).getCurrentTaskAttemptID();
-    doReturn(++requestId).when(request).getRequestId();
+    doReturn(taskAttemptID).when(request).getTaskAttemptId();
     doReturn(events).when(request).getEvents();
 
     taskAttemptListener.heartbeat(request);
@@ -271,6 +273,25 @@ public class TestTaskAttemptListenerImplTezDag {
     return ContainerId.newInstance(appAttemptId, containerIdx);
   }
 
+  private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag
{
+
+    public TaskAttemptListenerImplForTest(AppContext context,
+                                          TaskHeartbeatHandler thh,
+                                          ContainerHeartbeatHandler chh,
+                                          JobTokenSecretManager jobTokenSecretManager,
+                                          String[] taskCommunicatorClassIdentifiers,
+                                          boolean isPureLocalMode) {
+      super(context, thh, chh, jobTokenSecretManager, taskCommunicatorClassIdentifiers,
+          isPureLocalMode);
+    }
+
+    @Override
+    protected TezTaskCommunicatorImpl createTezTaskCommunicator(TaskCommunicatorContext context)
{
+      return new TezTaskCommunicatorImplForTest(context);
+    }
+
+  }
+
   private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl {
 
     public TezTaskCommunicatorImplForTest(


Mime
View raw message