hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1388595 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/a...
Date Fri, 21 Sep 2012 18:10:19 GMT
Author: sseth
Date: Fri Sep 21 18:10:19 2012
New Revision: 1388595

URL: http://svn.apache.org/viewvc?rev=1388595&view=rev
Log:
MAPREDUCE-4664. ContainerHeartbeatHandler should be pinged on a getTask call (sseth)

Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1388595&r1=1388594&r2=1388595&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Sep 21
18:10:19 2012
@@ -22,3 +22,5 @@ Branch MR-3902
   MAPREDUCE-4618. Re-wire LocalContainerAllocator/UberAM (sseth)
 
   MAPREDUCE-4665. Use the configured shuffle port and application ACLs (sseth)
+
+  MAPREDUCE-4664. ContainerHeartbeatHandler should be pinged on a getTask call (sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java?rev=1388595&r1=1388594&r2=1388595&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl2.java
Fri Sep 21 18:10:19 2012
@@ -70,12 +70,10 @@ import org.apache.hadoop.yarn.service.Co
 public class TaskAttemptListenerImpl2 extends CompositeService 
     implements TaskUmbilicalProtocol, TaskAttemptListener {
 
-  // TODO XXX: Ideally containerId registration and unregistration should be taken care of
by the Container.
-  // .... TaskAttemptId registration and unregistration by the TaskAttempt. Can this be split
into a 
-  // ContainerListener + TaskAttemptListener ?
-  
-  // TODO XXX. Re-look at big chunks. Possibly redo bits.
-  // ..launchedJvm map etc.
+  // TODO: Eventually, split this into a ContainerListener (getTask) and a
+  // TaskAttemptListener (attempt specific requests). After that, AMContainer
+  // registers containers, AMTask registers attempts.
+
   // ..Sending back errors for unknown tasks.
   
   private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true);
@@ -90,17 +88,13 @@ public class TaskAttemptListenerImpl2 ex
   private InetSocketAddress address;
   private Server server;
   
-  // TODO XXX: Use this to figure out whether an incoming ping is valid.
+  // TODO Use this to figure out whether an incoming ping is valid.
   private ConcurrentMap<TaskAttemptID, WrappedJvmID> attemptToJvmIdMap =
       new ConcurrentHashMap<TaskAttemptID, WrappedJvmID>();
   // jvmIdToContainerIdMap also serving to check whether the container is still running.
   private ConcurrentMap<WrappedJvmID, ContainerId> jvmIDToContainerIdMap =
-      new ConcurrentHashMap<WrappedJvmID, ContainerId>();
-//  private Set<WrappedJvmID> launchedJVMs = Collections
-//      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
-  
-  
-  
+      new ConcurrentHashMap<WrappedJvmID, ContainerId>();  
+
   public TaskAttemptListenerImpl2(AppContext context, TaskHeartbeatHandler thh,
       ContainerHeartbeatHandler chh, JobTokenSecretManager jobTokenSecretManager) {
     super(TaskAttemptListenerImpl2.class.getName());
@@ -112,7 +106,6 @@ public class TaskAttemptListenerImpl2 ex
 
   @Override
   public void start() {
-    LOG.info("XXX: Starting TAL2");
     startRpcServer();
     super.start();
   }
@@ -160,10 +153,26 @@ public class TaskAttemptListenerImpl2 ex
     return address;
   }
 
+  private void pingContainerHeartbeatHandler(WrappedJvmID jvmId) {
+    ContainerId containerId = jvmIDToContainerIdMap.get(jvmId);
+    if (containerId != null) {
+      containerHeartbeatHandler.pinged(containerId);
+    } else {
+      LOG.warn("Handling communication from JvmId: " + jvmId
+          + ", ContainerId not known for this jvm");
+    }
+  }
+
   private void pingContainerHeartbeatHandler(TaskAttemptID attemptID) {
-    containerHeartbeatHandler.pinged(jvmIDToContainerIdMap.get(attemptToJvmIdMap.get(attemptID)));
+    WrappedJvmID jvmId = attemptToJvmIdMap.get(attemptID);
+    if (jvmId != null) {
+      pingContainerHeartbeatHandler(jvmId);
+    } else {
+      LOG.warn("Handling communication from attempt: " + attemptID
+          + ", JvmID not know for this attempt");
+    }
   }
-  
+
   /**
    * Child checking whether it can commit.
    * 
@@ -418,10 +427,11 @@ public class TaskAttemptListenerImpl2 ex
   public JvmTask getTask(JvmContext jvmContext) throws IOException {
 
     // A rough imitation of code from TaskTracker.
-
-    // TODO XXX: Does ContainerHeartbeatHandler need to be pinged on getTask() ? 
     JVMId jvmId = jvmContext.jvmId;
-    LOG.info("ZZZ: JVM with ID : " + jvmId + " asked for a task");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("JVM with ID : " + jvmId + " asked for a task");
+    }
+    
     
     JvmTask jvmTask = null;
     // TODO: Is it an authorized container to get a task? Otherwise return null.
@@ -431,6 +441,7 @@ public class TaskAttemptListenerImpl2 ex
 
     WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
         jvmId.getId());
+    pingContainerHeartbeatHandler(wJvmID);
 
     ContainerId containerId = jvmIDToContainerIdMap.get(wJvmID);
     if (containerId == null) {
@@ -453,46 +464,27 @@ public class TaskAttemptListenerImpl2 ex
       }
     }
     return jvmTask;
-    
-//    
-//    // Try to look up the task. We remove it directly as we don't give
-//    // multiple tasks to a JVM
-//    if (!jvmIDToActiveAttemptMap.containsKey(wJvmID)) {
-//      LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
-//      jvmTask = TASK_FOR_INVALID_JVM;
-//    } else {
-//      if (!launchedJVMs.contains(wJvmID)) {
-//        jvmTask = null;
-//        LOG.info("JVM with ID: " + jvmId
-//            + " asking for task before AM launch registered. Given null task");
-//      } else {
-//        // remove the task as it is no more needed and free up the memory.
-//        // Also we have already told the JVM to process a task, so it is no
-//        // longer pending, and further request should ask it to exit.
-//        org.apache.hadoop.mapred.Task task =
-//            jvmIDToActiveAttemptMap.remove(wJvmID);
-//        launchedJVMs.remove(wJvmID);
-//        LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
-//        jvmTask = new JvmTask(task, false);
-//      }
-//    }
-//    return jvmTask;
   }
 
   @Override
   public void registerRunningJvm(WrappedJvmID jvmID, ContainerId containerId) {
-    LOG.info("XXX: JvmRegistration: " + jvmID + ", ContaienrId: " + containerId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("JvmRegistartion: " + jvmID + ", ContainerId: " + containerId);
+    }
     jvmIDToContainerIdMap.putIfAbsent(jvmID, containerId);
   }
-  
+
   @Override
   public void unregisterRunningJvm(WrappedJvmID jvmID) {
-    LOG.info("TOREMOVE: Unregistering jvmId: " + jvmID);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("JVM Unregister: " + jvmID);
+    }
     if (jvmIDToContainerIdMap.remove(jvmID) == null) {
       LOG.warn("Attempt to unregister unknwon jvmtoContainerMap: " + jvmID);
     }
   }
-  
+
+  @Override
   public void registerTaskAttempt(TaskAttemptId attemptId, WrappedJvmID jvmId) {
     attemptToJvmIdMap.put(TypeConverter.fromYarn(attemptId), jvmId);
   }
@@ -505,58 +497,15 @@ public class TaskAttemptListenerImpl2 ex
   }
 
   public org.apache.hadoop.mapred.Task pullTaskAttempt(ContainerId containerId) {
-    // TODO XXX: pullTaskAttempt as part of the interface.
     AMContainerImpl container = (AMContainerImpl) context.getAllContainers()
         .get(containerId);
     return container.pullTaskAttempt();
   }
 
-//  @Override
-//  public void registerPendingTask(
-//      org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
-//    // Create the mapping so that it is easy to look up
-//    // when the jvm comes back to ask for Task.
-//
-//    // A JVM not present in this map is an illegal task/JVM.
-//    jvmIDToActiveAttemptMap.put(jvmID, task);
-//  }
-//
-//  @Override
-//  public void registerLaunchedTask(
-//      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
-//      WrappedJvmID jvmId) {
-//    // The AM considers the task to be launched (Has asked the NM to launch it)
-//    // The JVM will only be given a task after this registartion.
-//    launchedJVMs.add(jvmId);
-//
-//    taskHeartbeatHandler.register(attemptID);
-//  }
-//
-//  @Override
-//  public void unregister(
-//      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
-//      WrappedJvmID jvmID) {
-//
-//    // Unregistration also comes from the same TaskAttempt which does the
-//    // registration. Events are ordered at TaskAttempt, so unregistration will
-//    // always come after registration.
-//
-//    // Remove from launchedJVMs before jvmIDToActiveAttemptMap to avoid
-//    // synchronization issue with getTask(). getTask should be checking
-//    // jvmIDToActiveAttemptMap before it checks launchedJVMs.
-// 
-//    // remove the mappings if not already removed
-//    launchedJVMs.remove(jvmID);
-//    jvmIDToActiveAttemptMap.remove(jvmID);
-//
-//    //unregister this attempt
-//    taskHeartbeatHandler.unregister(attemptID);
-//  }
-
   @Override
   public ProtocolSignature getProtocolSignature(String protocol,
       long clientVersion, int clientMethodsHash) throws IOException {
     return ProtocolSignature.getProtocolSignature(this, 
         protocol, clientVersion, clientMethodsHash);
   }
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1388595&r1=1388594&r2=1388595&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
Fri Sep 21 18:10:19 2012
@@ -24,11 +24,15 @@ import static org.junit.Assert.assertTru
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.any;
 
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
 import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
@@ -48,8 +52,8 @@ public class TestTaskAttemptListenerImpl
 
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
-        TaskHeartbeatHandler hbHandler) {
-      super(context, hbHandler, null, jobTokenSecretManager);
+        ContainerHeartbeatHandler chh, TaskHeartbeatHandler thh) {
+      super(context, thh, chh, jobTokenSecretManager);
     }
 
     @Override
@@ -91,10 +95,11 @@ public class TestTaskAttemptListenerImpl
     when(appCtx.getEventHandler()).thenReturn(mockHandler);
     
     JobTokenSecretManager secret = mock(JobTokenSecretManager.class); 
-    TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class);
+    ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class);
     
     MockTaskAttemptListenerImpl listener = 
-      new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
+      new MockTaskAttemptListenerImpl(appCtx, secret, chh, thh);
     Configuration conf = new Configuration();
     listener.init(conf);
     listener.start();
@@ -116,11 +121,13 @@ public class TestTaskAttemptListenerImpl
     result = listener.getTask(context1);
     assertNotNull(result);
     assertTrue(result.shouldDie);
+    verify(chh, never()).pinged(any(ContainerId.class));
 
     // Verify ask after JVM registration, but before container is assigned a task.
     listener.registerRunningJvm(wid1, containerId1);
     result = listener.getTask(context1);
     assertNull(result);
+    verify(chh, times(1)).pinged(any(ContainerId.class));
     
     // Verify ask after JVM registration, and when the container has a task.
     listener.registerRunningJvm(wid2, containerId2);
@@ -128,6 +135,7 @@ public class TestTaskAttemptListenerImpl
     assertNotNull(result);
     assertFalse(result.shouldDie);
     assertTrue(result.getTask() == task);
+    verify(chh, times(2)).pinged(any(ContainerId.class));
     ArgumentCaptor<Event> ac = ArgumentCaptor.forClass(Event.class);
     verify(mockHandler).handle(ac.capture());
     Event cEvent = ac.getValue();



Mime
View raw message