Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6D2DD9641 for ; Thu, 5 Jan 2012 05:20:36 +0000 (UTC) Received: (qmail 60907 invoked by uid 500); 5 Jan 2012 05:20:33 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 60730 invoked by uid 500); 5 Jan 2012 05:20:16 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 60722 invoked by uid 99); 5 Jan 2012 05:20:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Jan 2012 05:20:12 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Jan 2012 05:20:05 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 57CEE23888D2; Thu, 5 Jan 2012 05:19:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1227485 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/... Date: Thu, 05 Jan 2012 05:19:45 -0000 To: mapreduce-commits@hadoop.apache.org From: sseth@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120105051945.57CEE23888D2@eris.apache.org> Author: sseth Date: Thu Jan 5 05:19:44 2012 New Revision: 1227485 URL: http://svn.apache.org/viewvc?rev=1227485&view=rev Log: MAPREDUCE-3569. TaskAttemptListener holds a global lock for all task-updates. (Contributed by Vinod Kumar Vavilapalli) Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1227485&r1=1227484&r2=1227485&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jan 5 05:19:44 2012 @@ -193,6 +193,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv) + MAPREDUCE-3569. TaskAttemptListener holds a global lock for all + task-updates. (Vinod Kumar Vavilapalli via sseth) + BUG FIXES MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1227485&r1=1227484&r2=1227485&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu Jan 5 05:19:44 2012 @@ -19,14 +19,12 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -64,21 +62,22 @@ import org.apache.hadoop.yarn.service.Co * This class HAS to be in this package to access package private * methods/classes. */ +@SuppressWarnings({"unchecked" , "deprecation"}) public class TaskAttemptListenerImpl extends CompositeService implements TaskUmbilicalProtocol, TaskAttemptListener { + private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true); + private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpl.class); private AppContext context; private Server server; protected TaskHeartbeatHandler taskHeartbeatHandler; private InetSocketAddress address; - private Map jvmIDToActiveAttemptMap = - Collections.synchronizedMap(new HashMap()); + private ConcurrentMap + jvmIDToActiveAttemptMap + = new ConcurrentHashMap(); private JobTokenSecretManager jobTokenSecretManager = null; - private Set pendingJvms = - Collections.synchronizedSet(new HashSet()); public TaskAttemptListenerImpl(AppContext context, JobTokenSecretManager jobTokenSecretManager) { @@ -123,10 +122,9 @@ public class TaskAttemptListenerImpl ext server.start(); InetSocketAddress listenerAddress = server.getListenerAddress(); - this.address = - NetUtils.createSocketAddr(listenerAddress.getAddress() - .getLocalHost().getCanonicalHostName() - + ":" + listenerAddress.getPort()); + listenerAddress.getAddress(); + this.address = NetUtils.createSocketAddr(InetAddress.getLocalHost() + .getCanonicalHostName() + ":" + listenerAddress.getPort()); } catch (IOException e) { throw new YarnException(e); } @@ -408,57 +406,59 @@ public class TaskAttemptListenerImpl ext WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap, jvmId.getId()); - synchronized(this) { - if(pendingJvms.contains(wJvmID)) { - org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID); - if (task != null) { //there may be lag in the attempt getting added here - LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); - jvmTask = new JvmTask(task, false); - - //remove the task as it is no more needed and free up the memory - //Also we have already told the JVM to process a task, so it is no - //longer pending, and further request should ask it to exit. - pendingJvms.remove(wJvmID); - jvmIDToActiveAttemptMap.remove(wJvmID); - } - } else { - LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); - jvmTask = new JvmTask(null, true); - } + + // Try to look up the task. We remove it directly as we don't give + // multiple tasks to a JVM + org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap + .remove(wJvmID); + if (task != null) { + LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID()); + jvmTask = new JvmTask(task, false); + + // remove the task as it is no more needed and free up the memory + // Also we have already told the JVM to process a task, so it is no + // longer pending, and further request should ask it to exit. + } else { + LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed."); + jvmTask = TASK_FOR_INVALID_JVM; } return jvmTask; } @Override - public synchronized void registerPendingTask(WrappedJvmID jvmID) { - //Save this JVM away as one that has not been handled yet - pendingJvms.add(jvmID); + public void registerPendingTask( + org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { + // Create the mapping so that it is easy to look up + // when the jvm comes back to ask for Task. + + // A JVM not present in this map is an illegal task/JVM. + jvmIDToActiveAttemptMap.put(jvmID, task); } @Override public void registerLaunchedTask( - org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, - org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) { - synchronized(this) { - //create the mapping so that it is easy to look up - //when it comes back to ask for Task. - jvmIDToActiveAttemptMap.put(jvmID, task); - //This should not need to happen here, but just to be on the safe side - if(!pendingJvms.add(jvmID)) { - LOG.warn(jvmID+" launched without first being registered"); - } - } - //register this attempt + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) { + + // The task is launched. Register this for expiry-tracking. + + // Timing can cause this to happen after the real JVM launches and gets a + // task which is still fine as we will only be tracking for expiry a little + // late than usual. taskHeartbeatHandler.register(attemptID); } @Override - public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, + public void unregister( + org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID, WrappedJvmID jvmID) { - //remove the mapping if not already removed + + // Unregistration also comes from the same TaskAttempt which does the + // registration. Events are ordered at TaskAttempt, so unregistration will + // always come after registration. + + // remove the mapping if not already removed jvmIDToActiveAttemptMap.remove(jvmID); - //remove the pending if not already removed - pendingJvms.remove(jvmID); + //unregister this attempt taskHeartbeatHandler.unregister(attemptID); } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java?rev=1227485&r1=1227484&r2=1227485&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java Thu Jan 5 05:19:44 2012 @@ -32,20 +32,21 @@ public interface TaskAttemptListener { InetSocketAddress getAddress(); /** - * register a JVM with the listener. This should be called as soon as a + * Register a JVM with the listener. This should be called as soon as a * JVM ID is assigned to a task attempt, before it has been launched. + * @param task the task itself for this JVM. * @param jvmID The ID of the JVM . */ - void registerPendingTask(WrappedJvmID jvmID); + void registerPendingTask(Task task, WrappedJvmID jvmID); /** - * Register the task and task attempt with the JVM. This should be called - * when the JVM has been launched. - * @param attemptID the id of the attempt for this JVM. - * @param task the task itself for this JVM. - * @param jvmID the id of the JVM handling the task. + * Register task attempt. This should be called when the JVM has been + * launched. + * + * @param attemptID + * the id of the attempt for this JVM. */ - void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID); + void registerLaunchedTask(TaskAttemptId attemptID); /** * Unregister the JVM and the attempt associated with it. This should be Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1227485&r1=1227484&r2=1227485&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Jan 5 05:19:44 2012 @@ -1109,7 +1109,8 @@ public abstract class TaskAttemptImpl im taskAttempt.jvmID = new WrappedJvmID( taskAttempt.remoteTask.getTaskID().getJobID(), taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId()); - taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID); + taskAttempt.taskAttemptListener.registerPendingTask( + taskAttempt.remoteTask, taskAttempt.jvmID); //launch the container //create the container object to be launched for a given Task attempt @@ -1198,10 +1199,9 @@ public abstract class TaskAttemptImpl im taskAttempt.launchTime = taskAttempt.clock.getTime(); taskAttempt.shufflePort = event.getShufflePort(); - // register it to TaskAttemptListener so that it start listening - // for it - taskAttempt.taskAttemptListener.registerLaunchedTask( - taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID); + // register it to TaskAttemptListener so that it can start monitoring it. + taskAttempt.taskAttemptListener + .registerLaunchedTask(taskAttempt.attemptId); //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1227485&r1=1227484&r2=1227485&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Thu Jan 5 05:19:44 2012 @@ -17,8 +17,11 @@ */ package org.apache.hadoop.mapred; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.io.IOException; @@ -68,33 +71,47 @@ public class TestTaskAttemptListenerImpl JVMId id = new JVMId("foo",1, true, 1); WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId()); + // Verify ask before registration. //The JVM ID has not been registered yet so we should kill it. JvmContext context = new JvmContext(); context.jvmId = id; JvmTask result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - - //Now register the JVM, and see - listener.registerPendingTask(wid); - result = listener.getTask(context); - assertNull(result); - + + // Verify ask after registration but before launch TaskAttemptId attemptID = mock(TaskAttemptId.class); Task task = mock(Task.class); //Now put a task with the ID - listener.registerLaunchedTask(attemptID, task, wid); + listener.registerPendingTask(task, wid); + result = listener.getTask(context); + assertNotNull(result); + assertFalse(result.shouldDie); + // Unregister for more testing. + listener.unregister(attemptID, wid); + + // Verify ask after registration and launch + //Now put a task with the ID + listener.registerPendingTask(task, wid); + listener.registerLaunchedTask(attemptID); verify(hbHandler).register(attemptID); result = listener.getTask(context); assertNotNull(result); assertFalse(result.shouldDie); - + // Don't unregister yet for more testing. + //Verify that if we call it again a second time we are told to die. result = listener.getTask(context); assertNotNull(result); assertTrue(result.shouldDie); - + listener.unregister(attemptID, wid); + + // Verify after unregistration. + result = listener.getTask(context); + assertNotNull(result); + assertTrue(result.shouldDie); + listener.stop(); } } Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1227485&r1=1227484&r2=1227485&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Thu Jan 5 05:19:44 2012 @@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.util.Build * Mock MRAppMaster. Doesn't start RPC servers. * No threads are started except of the event Dispatcher thread. */ +@SuppressWarnings("unchecked") public class MRApp extends MRAppMaster { private static final Log LOG = LogFactory.getLog(MRApp.class); @@ -323,13 +324,13 @@ public class MRApp extends MRAppMaster { return NetUtils.createSocketAddr("localhost:54321"); } @Override - public void registerLaunchedTask(TaskAttemptId attemptID, - org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {} + public void registerLaunchedTask(TaskAttemptId attemptID) {} @Override public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) { } @Override - public void registerPendingTask(WrappedJvmID jvmID) { + public void registerPendingTask(org.apache.hadoop.mapred.Task task, + WrappedJvmID jvmID) { } }; } @@ -357,7 +358,6 @@ public class MRApp extends MRAppMaster { public MockContainerLauncher() { } - @SuppressWarnings("unchecked") @Override public void handle(ContainerLauncherEvent event) { switch (event.getType()) {