hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1196458 [4/19] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-cli...
Date Wed, 02 Nov 2011 05:35:03 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Wed Nov  2 05:34:31 2011
@@ -21,10 +21,15 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -43,7 +48,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
@@ -55,21 +59,23 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * This class is responsible for launching of containers.
  */
 public class ContainerLauncherImpl extends AbstractService implements
     ContainerLauncher {
 
-  private static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+  static final Log LOG = LogFactory.getLog(ContainerLauncherImpl.class);
+
+  int nmTimeOut;
 
   private AppContext context;
   private ThreadPoolExecutor launcherPool;
@@ -79,10 +85,14 @@ public class ContainerLauncherImpl exten
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
   private RecordFactory recordFactory;
-  //have a cache/map of UGIs so as to avoid creating too many RPC
-  //client connection objects to the same NodeManager
-  private ConcurrentMap<String, UserGroupInformation> ugiMap = 
-    new ConcurrentHashMap<String, UserGroupInformation>();
+
+  // To track numNodes.
+  Set<String> allNodes = new HashSet<String>();
+
+  // have a cache/map of proxies so as to avoid creating multiple RPC
+  // client connection objects for the same container.
+  private Map<ContainerId, ContainerManager> clientCache
+    = new HashMap<ContainerId, ContainerManager>();
 
   public ContainerLauncherImpl(AppContext context) {
     super(ContainerLauncherImpl.class.getName());
@@ -95,14 +105,21 @@ public class ContainerLauncherImpl exten
     this.limitOnPoolSize = conf.getInt(
         MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
+    this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
+        ContainerLauncher.DEFAULT_NM__COMMAND_TIMEOUT);
     super.init(conf);
   }
 
   public void start() {
+
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        "ContainerLauncher #%d").setDaemon(true).build();
+
     // Start with a default core-pool size of 10 and change it dynamically.
     launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
         Integer.MAX_VALUE, 1, TimeUnit.HOURS,
-        new LinkedBlockingQueue<Runnable>());
+        new LinkedBlockingQueue<Runnable>(),
+        tf);
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -123,14 +140,17 @@ public class ContainerLauncherImpl exten
 
             // nodes where containers will run at *this* point of time. This is
             // *not* the cluster size and doesn't need to be.
-            int numNodes = ugiMap.size();
+            int numNodes = allNodes.size();
             int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
 
             if (poolSize <= idealPoolSize) {
               // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
               // later is just a buffer so we are not always increasing the
               // pool-size
-              launcherPool.setCorePoolSize(idealPoolSize + INITIAL_POOL_SIZE);
+              int newPoolSize = idealPoolSize + INITIAL_POOL_SIZE;
+              LOG.info("Setting ContainerLauncher pool size to "
+                  + newPoolSize);
+              launcherPool.setCorePoolSize(newPoolSize);
             }
           }
 
@@ -143,13 +163,14 @@ public class ContainerLauncherImpl exten
         }
       }
     });
+    eventHandlingThread.setName("ContainerLauncher Event Handler");
     eventHandlingThread.start();
     super.start();
   }
 
   public void stop() {
     eventHandlingThread.interrupt();
-    launcherPool.shutdown();
+    launcherPool.shutdownNow();
     super.stop();
   }
 
@@ -159,31 +180,57 @@ public class ContainerLauncherImpl exten
 
     UserGroupInformation user = UserGroupInformation.getCurrentUser();
 
-    if (UserGroupInformation.isSecurityEnabled()) {
+    synchronized (this.clientCache) {
+
+      if (this.clientCache.containsKey(containerID)) {
+        return this.clientCache.get(containerID);
+      }
+
+      this.allNodes.add(containerManagerBindAddr);
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
+            containerToken.getIdentifier().array(), containerToken
+                .getPassword().array(), new Text(containerToken.getKind()),
+            new Text(containerToken.getService()));
+        // the user in createRemoteUser in this context has to be ContainerID
+        user = UserGroupInformation.createRemoteUser(containerID.toString());
+        user.addToken(token);
+      }
+
+      ContainerManager proxy = user
+          .doAs(new PrivilegedAction<ContainerManager>() {
+            @Override
+            public ContainerManager run() {
+              YarnRPC rpc = YarnRPC.create(getConfig());
+              return (ContainerManager) rpc.getProxy(ContainerManager.class,
+                  NetUtils.createSocketAddr(containerManagerBindAddr),
+                  getConfig());
+            }
+          });
+      this.clientCache.put(containerID, proxy);
+      return proxy;
+    }
+  }
 
-      Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
-          containerToken.getIdentifier().array(), containerToken
-              .getPassword().array(), new Text(containerToken.getKind()),
-          new Text(containerToken.getService()));
-      // the user in createRemoteUser in this context is not important
-      UserGroupInformation ugi = UserGroupInformation
-          .createRemoteUser(containerManagerBindAddr);
-      ugi.addToken(token);
-      ugiMap.putIfAbsent(containerManagerBindAddr, ugi);
+  private static class CommandTimer extends TimerTask {
+    private final Thread commandThread;
+    protected final ContainerLauncherEvent event;
+    protected final String message;
 
-      user = ugiMap.get(containerManagerBindAddr);    
+    public CommandTimer(Thread thread, ContainerLauncherEvent event) {
+      this.commandThread = thread;
+      this.event = event;
+      this.message = "Couldn't complete " + event.getType() + " on "
+          + event.getContainerID() + "/" + event.getTaskAttemptID()
+          + ". Interrupting and returning";
+    }
+
+    @Override
+    public void run() {
+      LOG.warn(this.message);
+      this.commandThread.interrupt();
     }
-    ContainerManager proxy =
-        user.doAs(new PrivilegedAction<ContainerManager>() {
-          @Override
-          public ContainerManager run() {
-            YarnRPC rpc = YarnRPC.create(getConfig());
-            return (ContainerManager) rpc.getProxy(ContainerManager.class,
-                NetUtils.createSocketAddr(containerManagerBindAddr),
-                getConfig());
-          }
-        });
-    return proxy;
   }
 
   /**
@@ -206,27 +253,53 @@ public class ContainerLauncherImpl exten
       final String containerManagerBindAddr = event.getContainerMgrAddress();
       ContainerId containerID = event.getContainerID();
       ContainerToken containerToken = event.getContainerToken();
+      TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+
+      Timer timer = new Timer(true);
 
       switch(event.getType()) {
 
       case CONTAINER_REMOTE_LAUNCH:
-        ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent) event;
+        ContainerRemoteLaunchEvent launchEvent
+            = (ContainerRemoteLaunchEvent) event;
 
-        TaskAttemptId taskAttemptID = launchEv.getTaskAttemptID();
         try {
-          
-          ContainerManager proxy = 
-            getCMProxy(containerID, containerManagerBindAddr, containerToken);
-          
+          timer.schedule(new CommandTimer(Thread.currentThread(), event),
+              nmTimeOut);
+
+          ContainerManager proxy = getCMProxy(containerID,
+              containerManagerBindAddr, containerToken);
+
+          // Interrupted during getProxy, but that didn't throw an exception
+          if (Thread.currentThread().isInterrupted()) {
+            // The timer cancelled the command in the meantime.
+            String message = "Start-container for " + event.getContainerID()
+                + " got interrupted. Returning.";
+            sendContainerLaunchFailedMsg(taskAttemptID, message);
+            return;
+          }
+
           // Construct the actual Container
           ContainerLaunchContext containerLaunchContext =
-              launchEv.getContainer();
+              launchEvent.getContainer();
 
           // Now launch the actual container
           StartContainerRequest startRequest = recordFactory
               .newRecordInstance(StartContainerRequest.class);
           startRequest.setContainerLaunchContext(containerLaunchContext);
           StartContainerResponse response = proxy.startContainer(startRequest);
+
+          // container started properly. Stop the timer
+          timer.cancel();
+          if (Thread.currentThread().isInterrupted()) {
+            // The timer cancelled the command in the meantime, but
+            // startContainer didn't throw an exception
+            String message = "Start-container for " + event.getContainerID()
+                + " got interrupted. Returning.";
+            sendContainerLaunchFailedMsg(taskAttemptID, message);
+            return;
+          }
+
           ByteBuffer portInfo = response
               .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
           int port = -1;
@@ -248,12 +321,9 @@ public class ContainerLauncherImpl exten
         } catch (Throwable t) {
           String message = "Container launch failed for " + containerID
               + " : " + StringUtils.stringifyException(t);
-          LOG.error(message);
-          context.getEventHandler().handle(
-              new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
-          context.getEventHandler().handle(
-              new TaskAttemptEvent(taskAttemptID,
-                  TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
+          sendContainerLaunchFailedMsg(taskAttemptID, message);
+        } finally {
+          timer.cancel();
         }
 
         break;
@@ -265,24 +335,44 @@ public class ContainerLauncherImpl exten
           eventQueue.remove(event); // TODO: Any synchro needed?
           //deallocate the container
           context.getEventHandler().handle(
-              new ContainerAllocatorEvent(event.getTaskAttemptID(),
-              ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
+              new ContainerAllocatorEvent(taskAttemptID,
+                  ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
         } else {
+
           try {
-            ContainerManager proxy = 
-              getCMProxy(containerID, containerManagerBindAddr, containerToken);
-            // TODO:check whether container is launched
-
-            // kill the remote container if already launched
-            StopContainerRequest stopRequest = recordFactory
-                .newRecordInstance(StopContainerRequest.class);
-            stopRequest.setContainerId(event.getContainerID());
-            proxy.stopContainer(stopRequest);
+            timer.schedule(new CommandTimer(Thread.currentThread(), event),
+                nmTimeOut);
+
+            ContainerManager proxy = getCMProxy(containerID,
+                containerManagerBindAddr, containerToken);
 
+            if (Thread.currentThread().isInterrupted()) {
+              // The timer cancelled the command in the meantime. No need to
+              // return; send the cleaned-up event anyway.
+              LOG.info("Stop-container for " + event.getContainerID()
+                  + " got interrupted.");
+            } else {
+
+              // TODO:check whether container is launched
+
+              // kill the remote container if already launched
+              StopContainerRequest stopRequest = recordFactory
+                  .newRecordInstance(StopContainerRequest.class);
+              stopRequest.setContainerId(event.getContainerID());
+              proxy.stopContainer(stopRequest);
+            }
           } catch (Throwable t) {
-            //ignore the cleanup failure
-            LOG.warn("cleanup failed for container " + event.getContainerID() ,
-                t);
+            // cleanup failure is non-fatal: report it as a diagnostic and log it
+            String message = "cleanup failed for container "
+                + event.getContainerID() + " : "
+                + StringUtils.stringifyException(t);
+            context.getEventHandler()
+                .handle(
+                    new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
+                        message));
+            LOG.warn(message);
+          } finally {
+            timer.cancel();
           }
 
           // after killing, send killed event to taskattempt
@@ -293,7 +383,17 @@ public class ContainerLauncherImpl exten
         break;
       }
     }
-    
+  }
+
+  @SuppressWarnings("unchecked")
+  void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID,
+      String message) {
+    LOG.error(message);
+    context.getEventHandler().handle(
+        new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(taskAttemptID,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Wed Nov  2 05:34:31 2011
@@ -23,19 +23,23 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -57,8 +61,10 @@ public class LocalContainerAllocator ext
       LogFactory.getLog(LocalContainerAllocator.class);
 
   private final EventHandler eventHandler;
-  private final ApplicationId appID;
+//  private final ApplicationId appID;
   private AtomicInteger containerCount = new AtomicInteger();
+  private long retryInterval;
+  private long retrystartTime;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -67,7 +73,19 @@ public class LocalContainerAllocator ext
                                  AppContext context) {
     super(clientService, context);
     this.eventHandler = context.getEventHandler();
-    this.appID = context.getApplicationID();
+//    this.appID = context.getApplicationID();
+    
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    retryInterval =
+        getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+            MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    // Init startTime to current time. If all goes well, it will be reset after
+    // first attempt to contact RM.
+    retrystartTime = System.currentTimeMillis();
   }
 
   @Override
@@ -77,10 +95,32 @@ public class LocalContainerAllocator ext
             .getApplicationProgress(), new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>());
     AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-    AMResponse response = allocateResponse.getAMResponse();
+    AMResponse response;
+    try {
+      response = allocateResponse.getAMResponse();
+      // Reset the retry start time if no exception occurred.
+      retrystartTime = System.currentTimeMillis();
+    } catch (Exception e) {
+      // This can happen when the connection to the RM has gone down. Keep
+      // re-trying until the retryInterval has expired.
+      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                         JobEventType.INTERNAL_ERROR));
+        throw new YarnException("Could not contact RM after " +
+                                retryInterval + " milliseconds.");
+      }
+      // Throw this up to the caller, which may decide to ignore it and
+      // continue to attempt to contact the RM.
+      throw e;
+    }
     if (response.getReboot()) {
-      // TODO
       LOG.info("Event from RM: shutting down Application Master");
+      // This can happen if the RM has been restarted. If it is in that state,
+      // this application must clean itself up.
+      eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                       JobEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+                               this.getContext().getApplicationID());
     }
   }
 
@@ -112,7 +152,7 @@ public class LocalContainerAllocator ext
         eventHandler.handle(jce);
       }
       eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-          event.getAttemptID(), container));
+          event.getAttemptID(), container, applicationACLs));
     }
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java Wed Nov  2 05:34:31 2011
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.mapreduce.v2.app.recover;
 
+import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -31,4 +33,6 @@ public interface Recovery {
   Clock getClock();
   
   Set<TaskId> getCompletedTasks();
+  
+  List<AMInfo> getAMInfos();
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Wed Nov  2 05:34:31 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,16 +32,23 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -51,12 +59,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -70,6 +80,8 @@ import org.apache.hadoop.yarn.factories.
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /*
  * Recovers the completed tasks from the previous life of Application Master.
@@ -82,9 +94,6 @@ import org.apache.hadoop.yarn.service.Se
 
 //TODO:
 //task cleanup for all non completed tasks
-//change job output committer to have 
-//    - atomic job output promotion
-//    - recover output of completed tasks
 
 public class RecoveryService extends CompositeService implements Recovery {
 
@@ -93,6 +102,7 @@ public class RecoveryService extends Com
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
 
   private final ApplicationAttemptId applicationAttemptId;
+  private final OutputCommitter committer;
   private final Dispatcher dispatcher;
   private final ControlledClock clock;
 
@@ -106,9 +116,10 @@ public class RecoveryService extends Com
   private volatile boolean recoveryMode = false;
 
   public RecoveryService(ApplicationAttemptId applicationAttemptId, 
-      Clock clock) {
+      Clock clock, OutputCommitter committer) {
     super("RecoveringDispatcher");
     this.applicationAttemptId = applicationAttemptId;
+    this.committer = committer;
     this.dispatcher = new RecoveryDispatcher();
     this.clock = new ControlledClock(clock);
       addService((Service) dispatcher);
@@ -120,17 +131,17 @@ public class RecoveryService extends Com
     // parse the history file
     try {
       parse();
-      if (completedTasks.size() > 0) {
-        recoveryMode = true;
-        LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS " + 
-            "TO RECOVER " + completedTasks.size());
-        LOG.info("Job launch time " + jobInfo.getLaunchTime());
-        clock.setTime(jobInfo.getLaunchTime());
-      }
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOG.warn(e);
       LOG.warn("Could not parse the old history file. Aborting recovery. "
-          + "Starting afresh.");
+          + "Starting afresh.", e);
+    }
+    if (completedTasks.size() > 0) {
+      recoveryMode = true;
+      LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+          + "TO RECOVER " + completedTasks.size());
+      LOG.info("Job launch time " + jobInfo.getLaunchTime());
+      clock.setTime(jobInfo.getLaunchTime());
     }
   }
 
@@ -149,6 +160,25 @@ public class RecoveryService extends Com
     return completedTasks.keySet();
   }
 
+  @Override
+  public List<AMInfo> getAMInfos() {
+    if (jobInfo == null || jobInfo.getAMInfos() == null) {
+      return new LinkedList<AMInfo>();
+    }
+    List<AMInfo> amInfos = new LinkedList<AMInfo>();
+    for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
+        .getAMInfos()) {
+      AMInfo amInfo =
+          MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+              jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+              jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+              jhAmInfo.getNodeManagerHttpPort());
+
+      amInfos.add(amInfo);
+    }
+    return amInfos;
+  }
+
   private void parse() throws IOException {
     // TODO: parse history file based on startCount
     String jobName = 
@@ -297,14 +327,28 @@ public class RecoveryService extends Com
         TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
             .getTaskAttemptID();
         TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
-        //TODO need to get the real port number MAPREDUCE-2666
-        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1));
+        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
+            attInfo.getShufflePort()));
         // send the status update event
         sendStatusUpdateEvent(aId, attInfo);
 
         TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
         switch (state) {
         case SUCCEEDED:
+          //recover the task output
+          TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
+              attInfo.getAttemptId());
+          try {
+            committer.recoverTask(taskContext);
+          } catch (IOException e) {
+            actualHandler.handle(new JobDiagnosticsUpdateEvent(
+                aId.getTaskId().getJobId(), "Error in recovering task output " + 
+                e.getMessage()));
+            actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
+                JobEventType.INTERNAL_ERROR));
+          }
+          LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+          
           // send the done event
           LOG.info("Sending done event to " + aId);
           actualHandler.handle(new TaskAttemptEvent(aId,
@@ -324,6 +368,16 @@ public class RecoveryService extends Com
         return;
       }
 
+      else if (event.getType() == 
+        ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
+        TaskAttemptId aId = ((ContainerLauncherEvent) event)
+          .getTaskAttemptID();
+        actualHandler.handle(
+           new TaskAttemptEvent(aId,
+                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+        return;
+      }
+
       // delegate to the actual handler
       actualHandler.handle(event);
     }
@@ -334,7 +388,6 @@ public class RecoveryService extends Com
       TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
       taskAttemptStatus.id = yarnAttemptID;
       taskAttemptStatus.progress = 1.0f;
-      taskAttemptStatus.diagnosticInfo = "";
       taskAttemptStatus.stateString = attemptInfo.getTaskStatus(); 
       // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
       taskAttemptStatus.phase = Phase.CLEANUP;
@@ -352,18 +405,17 @@ public class RecoveryService extends Com
     private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
         TaskAttemptInfo attemptInfo) {
       LOG.info("Sending assigned event to " + yarnAttemptID);
-      ContainerId cId = recordFactory
-          .newRecordInstance(ContainerId.class);
-      Container container = recordFactory
-          .newRecordInstance(Container.class);
-      container.setId(cId);
-      container.setNodeId(recordFactory
-          .newRecordInstance(NodeId.class));
-      container.setContainerToken(null);
-      container.setNodeHttpAddress(attemptInfo.getHostname() + ":" + 
-          attemptInfo.getHttpPort());
+      ContainerId cId = attemptInfo.getContainerId();
+
+      NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname());
+      // Resource/Priority/ApplicationACLs are only needed while launching the
+      // container on an NM. These tasks have already completed, so they are
+      // set to null.
+      Container container = BuilderUtils.newContainer(cId, nodeId,
+          attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
+          null, null, null);
       actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
-          container));
+          container, null));
     }
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Wed Nov  2 05:34:31 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.io.IOException;
 import java.security.PrivilegedAction;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,7 +36,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -54,7 +55,6 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 /**
@@ -73,6 +73,7 @@ public abstract class RMCommunicator ext
   protected int lastResponseID;
   private Resource minContainerCapability;
   private Resource maxContainerCapability;
+  protected Map<ApplicationAccessType, String> applicationACLs;
 
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -160,6 +161,7 @@ public abstract class RMCommunicator ext
         scheduler.registerApplicationMaster(request);
       minContainerCapability = response.getMinimumResourceCapability();
       maxContainerCapability = response.getMaximumResourceCapability();
+      this.applicationACLs = response.getApplicationACLs();
       LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
       LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
     } catch (Exception are) {
@@ -231,6 +233,9 @@ public abstract class RMCommunicator ext
             Thread.sleep(rmPollInterval);
             try {
               heartbeat();
+            } catch (YarnException e) {
+              LOG.error("Error communicating with RM: " + e.getMessage() , e);
+              return;
             } catch (Exception e) {
               LOG.error("ERROR IN CONTACTING RM. ", e);
               // TODO: for other exceptions
@@ -242,6 +247,7 @@ public abstract class RMCommunicator ext
         }
       }
     });
+    allocatorThread.setName("RMCommunicator Allocator");
     allocatorThread.start();
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Wed Nov  2 05:34:31 2011
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -37,7 +35,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -50,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -125,7 +129,9 @@ public class RMContainerAllocator extend
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
   private float reduceSlowStart = 0;
-
+  private long retryInterval;
+  private long retrystartTime;
+  
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
   }
@@ -143,6 +149,11 @@ public class RMContainerAllocator extend
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
     RackResolver.init(conf);
+    retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+                                MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
+    // Init startTime to current time. If all goes well, it will be reset after
+    // first attempt to contact RM.
+    retrystartTime = System.currentTimeMillis();
   }
 
   @Override
@@ -169,6 +180,7 @@ public class RMContainerAllocator extend
     LOG.info("Final Stats: " + getStat());
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public synchronized void handle(ContainerAllocatorEvent event) {
     LOG.info("Processing the event " + event.toString());
@@ -179,7 +191,13 @@ public class RMContainerAllocator extend
         if (mapResourceReqt == 0) {
           mapResourceReqt = reqEvent.getCapability().getMemory();
           int minSlotMemSize = getMinContainerCapability().getMemory();
-          mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize) * minSlotMemSize;
+          mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
+              * minSlotMemSize;
+          JobID id = TypeConverter.fromYarn(applicationId);
+          JobId jobId = TypeConverter.toYarn(id);
+          eventHandler.handle(new JobHistoryEvent(jobId, 
+              new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
+              mapResourceReqt)));
           LOG.info("mapResourceReqt:"+mapResourceReqt);
           if (mapResourceReqt > getMaxContainerCapability().getMemory()) {
             String diagMsg = "MAP capability required is more than the supported " +
@@ -199,12 +217,20 @@ public class RMContainerAllocator extend
           reduceResourceReqt = reqEvent.getCapability().getMemory();
           int minSlotMemSize = getMinContainerCapability().getMemory();
           //round off on slotsize
-          reduceResourceReqt = (int) Math.ceil((float) reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
+          reduceResourceReqt = (int) Math.ceil((float) 
+              reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
+          JobID id = TypeConverter.fromYarn(applicationId);
+          JobId jobId = TypeConverter.toYarn(id);
+          eventHandler.handle(new JobHistoryEvent(jobId, 
+              new NormalizedResourceEvent(
+                  org.apache.hadoop.mapreduce.TaskType.REDUCE,
+              reduceResourceReqt)));
           LOG.info("reduceResourceReqt:"+reduceResourceReqt);
           if (reduceResourceReqt > getMaxContainerCapability().getMemory()) {
-            String diagMsg = "REDUCE capability required is more than the supported " +
-            "max container capability in the cluster. Killing the Job. reduceResourceReqt: " + 
-            reduceResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory();
+            String diagMsg = "REDUCE capability required is more than the " +
+            		"supported max container capability in the cluster. Killing the " +
+            		"Job. reduceResourceReqt: " + reduceResourceReqt +
+            		" maxContainerCapability:" + getMaxContainerCapability().getMemory();
             LOG.info(diagMsg);
             eventHandler.handle(new JobDiagnosticsUpdateEvent(
                 getJob().getID(), diagMsg));
@@ -217,7 +243,8 @@ public class RMContainerAllocator extend
           //add to the front of queue for fail fast
           pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
         } else {
-          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));//reduces are added to pending and are slowly ramped up
+          pendingReduces.add(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
+          //reduces are added to pending and are slowly ramped up
         }
       }
       
@@ -410,10 +437,41 @@ public class RMContainerAllocator extend
         " rackLocalAssigned:" + rackLocalAssigned +
         " availableResources(headroom):" + getAvailableResources();
   }
-  
+
+  @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
-    AMResponse response = makeRemoteRequest();
+    AMResponse response;
+    /*
+     * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+     * milliseconds before aborting. During this interval, AM will still try
+     * to contact the RM.
+     */
+    try {
+      response = makeRemoteRequest();
+      // Reset the retry start time, since the RM was contacted successfully.
+      retrystartTime = System.currentTimeMillis();
+    } catch (Exception e) {
+      // This can happen when the connection to the RM has gone down. Keep
+      // re-trying until the retryInterval has expired.
+      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                         JobEventType.INTERNAL_ERROR));
+        throw new YarnException("Could not contact RM after " +
+                                retryInterval + " milliseconds.");
+      }
+      // Throw this up to the caller, which may decide to ignore it and
+      // continue to attempt to contact the RM.
+      throw e;
+    }
+    if (response.getReboot()) {
+      // This can happen if the RM has been restarted. If it is in that state,
+      // this application must clean itself up.
+      eventHandler.handle(new JobEvent(this.getJob().getID(),
+                                       JobEventType.INTERNAL_ERROR));
+      throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
+                               this.getContext().getApplicationID());
+    }
     int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
     List<Container> newContainers = response.getAllocatedContainers();
     List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
@@ -509,18 +567,6 @@ public class RMContainerAllocator extend
         request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
       } else {
         for (String host : event.getHosts()) {
-          //host comes from data splitLocations which are hostnames. Containers
-          // use IP addresses.
-          //TODO Temporary fix for locality. Use resolvers from h-common. 
-          // Cache to make this more efficient ?
-          InetAddress addr = null;
-          try {
-            addr = InetAddress.getByName(host);
-          } catch (UnknownHostException e) {
-            LOG.warn("Unable to resolve host to IP for host [: " + host + "]");
-          }
-          if (addr != null) //Fallback to host if resolve fails.
-            host = addr.getHostAddress();
           LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
           if (list == null) {
             list = new LinkedList<TaskAttemptId>();
@@ -550,6 +596,7 @@ public class RMContainerAllocator extend
       addContainerReq(req);
     }
     
+    @SuppressWarnings("unchecked")
     private void assign(List<Container> allocatedContainers) {
       Iterator<Container> it = allocatedContainers.iterator();
       LOG.info("Got allocated containers " + allocatedContainers.size());
@@ -557,26 +604,101 @@ public class RMContainerAllocator extend
       while (it.hasNext()) {
         Container allocated = it.next();
         LOG.info("Assigning container " + allocated);
-        ContainerRequest assigned = assign(allocated);
-          
-        if (assigned != null) {
-          // Update resource requests
-          decContainerReq(assigned);
-
-          // send the container-assigned event to task attempt
-          eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-              assigned.attemptID, allocated));
-
-          assignedRequests.add(allocated.getId(), assigned.attemptID);
-          
-          LOG.info("Assigned container (" + allocated + ") " +
-              " to task " + assigned.attemptID +
-              " on node " + allocated.getNodeId().toString());
-        } else {
-          //not assigned to any request, release the container
-          LOG.info("Releasing unassigned and invalid container " + allocated
-              + ". RM has gone crazy, someone go look!"
-              + " Hey RM, if you are so rich, go donate to non-profits!");
+        
+        // check if allocated container meets memory requirements 
+        // and whether we have any scheduled tasks that need 
+        // a container to be assigned
+        boolean isAssignable = true;
+        Priority priority = allocated.getPriority();
+        if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
+            || PRIORITY_MAP.equals(priority)) {
+          if (allocated.getResource().getMemory() < mapResourceReqt
+              || maps.isEmpty()) {
+            LOG.info("Cannot assign container " + allocated 
+                + " for a map as either "
+                + " container memory less than required " + mapResourceReqt
+                + " or no pending map tasks - maps.isEmpty=" 
+                + maps.isEmpty()); 
+            isAssignable = false; 
+          }
+        } 
+        else if (PRIORITY_REDUCE.equals(priority)) {
+          if (allocated.getResource().getMemory() < reduceResourceReqt
+              || reduces.isEmpty()) {
+            LOG.info("Cannot assign container " + allocated 
+                + " for a reduce as either "
+                + " container memory less than required " + reduceResourceReqt
+                + " or no pending reduce tasks - reduces.isEmpty=" 
+                + reduces.isEmpty()); 
+            isAssignable = false;
+          }
+        }          
+        
+        boolean blackListed = false;         
+        ContainerRequest assigned = null;
+        
+        if (isAssignable) {
+          // do not assign if allocated container is on a  
+          // blacklisted host
+          blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
+          if (blackListed) {
+            // we need to request for a new container 
+            // and release the current one
+            LOG.info("Got allocated container on a blacklisted "
+                + " host. Releasing container " + allocated);
+
+            // find the request matching this allocated container 
+            // and replace it with a new one 
+            ContainerRequest toBeReplacedReq = 
+                getContainerReqToReplace(allocated);
+            if (toBeReplacedReq != null) {
+              LOG.info("Placing a new container request for task attempt " 
+                  + toBeReplacedReq.attemptID);
+              ContainerRequest newReq = 
+                  getFilteredContainerRequest(toBeReplacedReq);
+              decContainerReq(toBeReplacedReq);
+              if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
+                  TaskType.MAP) {
+                maps.put(newReq.attemptID, newReq);
+              }
+              else {
+                reduces.put(newReq.attemptID, newReq);
+              }
+              addContainerReq(newReq);
+            }
+            else {
+              LOG.info("Could not map allocated container to a valid request."
+                  + " Releasing allocated container " + allocated);
+            }
+          }
+          else {
+            assigned = assign(allocated);
+            if (assigned != null) {
+              // Update resource requests
+              decContainerReq(assigned);
+
+              // send the container-assigned event to task attempt
+              eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+                  assigned.attemptID, allocated, applicationACLs));
+
+              assignedRequests.add(allocated.getId(), assigned.attemptID);
+
+              LOG.info("Assigned container (" + allocated + ") " +
+                  " to task " + assigned.attemptID +
+                  " on node " + allocated.getNodeId().toString());
+            }
+            else {
+              //not assigned to any request, release the container
+              LOG.info("Releasing unassigned and invalid container " 
+                  + allocated + ". RM has gone crazy, someone go look!"
+                  + " Hey RM, if you are so rich, go donate to non-profits!");
+            }
+          }
+        }
+        
+        // release container if it was blacklisted 
+        // or if we could not assign it 
+        if (blackListed || assigned == null) {
           containersReleased++;
           release(allocated.getId());
         }
@@ -604,12 +726,38 @@ public class RMContainerAllocator extend
       return assigned;
     }
     
+    private ContainerRequest getContainerReqToReplace(Container allocated) {
+      Priority priority = allocated.getPriority();
+      ContainerRequest toBeReplaced = null;
+      if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
+          || PRIORITY_MAP.equals(priority)) {
+        // allocated container was for a map
+        String host = allocated.getNodeId().getHost();
+        LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+        if (list != null && list.size() > 0) {
+          TaskAttemptId tId = list.removeLast();
+          if (maps.containsKey(tId)) {
+            toBeReplaced = maps.remove(tId);
+          }
+        }
+        else {
+          TaskAttemptId tId = maps.keySet().iterator().next();
+          toBeReplaced = maps.remove(tId);          
+        }        
+      }
+      else if (PRIORITY_REDUCE.equals(priority)) {
+        TaskAttemptId tId = reduces.keySet().iterator().next();
+        toBeReplaced = reduces.remove(tId);    
+      }
+      return toBeReplaced;
+    }
+    
     
+    @SuppressWarnings("unchecked")
     private ContainerRequest assignToFailedMap(Container allocated) {
       //try to assign to earlierFailedMaps if present
       ContainerRequest assigned = null;
-      while (assigned == null && earlierFailedMaps.size() > 0 && 
-          allocated.getResource().getMemory() >= mapResourceReqt) {
+      while (assigned == null && earlierFailedMaps.size() > 0) {
         TaskAttemptId tId = earlierFailedMaps.removeFirst();
         if (maps.containsKey(tId)) {
           assigned = maps.remove(tId);
@@ -627,8 +775,7 @@ public class RMContainerAllocator extend
     private ContainerRequest assignToReduce(Container allocated) {
       ContainerRequest assigned = null;
       //try to assign to reduces if present
-      if (assigned == null && reduces.size() > 0
-          && allocated.getResource().getMemory() >= reduceResourceReqt) {
+      if (assigned == null && reduces.size() > 0) {
         TaskAttemptId tId = reduces.keySet().iterator().next();
         assigned = reduces.remove(tId);
         LOG.info("Assigned to reduce");
@@ -636,13 +783,13 @@ public class RMContainerAllocator extend
       return assigned;
     }
     
+    @SuppressWarnings("unchecked")
     private ContainerRequest assignToMap(Container allocated) {
     //try to assign to maps if present 
       //first by host, then by rack, followed by *
       ContainerRequest assigned = null;
-      while (assigned == null && maps.size() > 0
-          && allocated.getResource().getMemory() >= mapResourceReqt) {
-        String host = getHost(allocated.getNodeId().toString());
+      while (assigned == null && maps.size() > 0) {
+        String host = allocated.getNodeId().getHost();
         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
         while (list != null && list.size() > 0) {
           LOG.info("Host matched to the request list " + host);
@@ -712,7 +859,8 @@ public class RMContainerAllocator extend
     }
 
     void preemptReduce(int toPreempt) {
-      List<TaskAttemptId> reduceList = new ArrayList(reduces.keySet());
+      List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
+        (reduces.keySet());
       //sort reduces on progress
       Collections.sort(reduceList,
           new Comparator<TaskAttemptId>() {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Wed Nov  2 05:34:31 2011
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -63,7 +65,7 @@ public abstract class RMContainerRequest
   //Key->ResourceName (e.g., hostname, rackname, *)
   //Value->Map
   //Key->Resource Capability
-  //Value->ResourceReqeust
+  //Value->ResourceRequest
   private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
   remoteRequestsTable =
       new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
@@ -87,14 +89,22 @@ public abstract class RMContainerRequest
     final String[] racks;
     //final boolean earlierAttemptFailed;
     final Priority priority;
+    
     public ContainerRequest(ContainerRequestEvent event, Priority priority) {
-      this.attemptID = event.getAttemptID();
-      this.capability = event.getCapability();
-      this.hosts = event.getHosts();
-      this.racks = event.getRacks();
-      //this.earlierAttemptFailed = event.getEarlierAttemptFailed();
+      this(event.getAttemptID(), event.getCapability(), event.getHosts(),
+          event.getRacks(), priority);
+    }
+    
+    public ContainerRequest(TaskAttemptId attemptID,
+        Resource capability, String[] hosts, String[] racks, 
+        Priority priority) {
+      this.attemptID = attemptID;
+      this.capability = capability;
+      this.hosts = hosts;
+      this.racks = racks;
       this.priority = priority;
     }
+    
   }
 
   @Override
@@ -149,14 +159,35 @@ public abstract class RMContainerRequest
       //remove all the requests corresponding to this hostname
       for (Map<String, Map<Resource, ResourceRequest>> remoteRequests 
           : remoteRequestsTable.values()){
-        //remove from host
-        Map<Resource, ResourceRequest> reqMap = remoteRequests.remove(hostName);
+        //remove from host if no pending allocations
+        boolean foundAll = true;
+        Map<Resource, ResourceRequest> reqMap = remoteRequests.get(hostName);
         if (reqMap != null) {
           for (ResourceRequest req : reqMap.values()) {
-            ask.remove(req);
+            if (!ask.remove(req)) {
+              foundAll = false;
+              // if ask already sent to RM, we can try and overwrite it if possible.
+              // send a new ask to RM with numContainers
+              // specified for the blacklisted host to be 0.
+              ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
+              zeroedRequest.setNumContainers(0);
+              // to be sent to RM on next heartbeat
+              ask.add(zeroedRequest);
+            }
+          }
+          // if all requests were still in ask queue
+          // we can remove this request
+          if (foundAll) {
+            remoteRequests.remove(hostName);
           }
         }
-        //TODO: remove from rack
+        // TODO handling of rack blacklisting
+        // Removing from rack should be dependent on no. of failures within the rack 
+        // Blacklisting a rack on the basis of a single node's blacklisting 
+        // may be overly aggressive. 
+        // Node failures could be correlated with other failures on the same rack
+        // but we probably need a better approach at trying to decide how and when 
+        // to blacklist a rack
       }
     } else {
       nodeFailures.put(hostName, failures);
@@ -171,7 +202,9 @@ public abstract class RMContainerRequest
     // Create resource requests
     for (String host : req.hosts) {
       // Data-local
-      addResourceRequest(req.priority, host, req.capability);
+      if (!isNodeBlacklisted(host)) {
+        addResourceRequest(req.priority, host, req.capability);
+      }      
     }
 
     // Nothing Rack-local for now
@@ -234,6 +267,14 @@ public abstract class RMContainerRequest
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    if (reqMap == null) {
+      // as we modify the resource requests by filtering out blacklisted hosts 
+      // when they are added, this value may be null when being 
+      // decremented
+      LOG.debug("Not decrementing resource as " + resourceName
+          + " is not present in request table");
+      return;
+    }
     ResourceRequest remoteRequest = reqMap.get(capability);
 
     LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
@@ -267,4 +308,23 @@ public abstract class RMContainerRequest
     release.add(containerId);
   }
   
+  protected boolean isNodeBlacklisted(String hostname) {
+    if (!nodeBlacklistingEnabled) {
+      return false;
+    }
+    return blacklistedNodes.contains(hostname);
+  }
+  
+  protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
+    ArrayList<String> newHosts = new ArrayList<String>();
+    for (String host : orig.hosts) {
+      if (!isNodeBlacklisted(host)) {
+        newHosts.add(host);      
+      }
+    }
+    String[] hosts = newHosts.toArray(new String[newHosts.size()]);
+    ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
+        hosts, orig.racks, orig.priority); 
+    return newReq;
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/ExponentiallySmoothedTaskRuntimeEstimator.java Wed Nov  2 05:34:31 2011
@@ -135,9 +135,9 @@ public class ExponentiallySmoothedTaskRu
 
     lambda
         = conf.getLong(MRJobConfig.MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS,
-            MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMNOOTH_LAMBDA_MS);
+            MRJobConfig.DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS);
     smoothedValue
-        = conf.getBoolean(MRJobConfig.MR_AM_TASK_EXTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
+        = conf.getBoolean(MRJobConfig.MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE, true)
             ? SmoothedValue.RATE : SmoothedValue.TIME_PER_UNIT_PROGRESS;
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java Wed Nov  2 05:34:31 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -31,6 +32,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
 
   private static final Log LOG = LogFactory.getLog(TaskCleanerImpl.class);
@@ -47,8 +50,11 @@ public class TaskCleanerImpl extends Abs
   }
 
   public void start() {
+    ThreadFactory tf = new ThreadFactoryBuilder()
+      .setNameFormat("TaskCleaner #%d")
+      .build();
     launcherPool = new ThreadPoolExecutor(1, 5, 1, 
-        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
+        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
     eventHandlingThread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -65,6 +71,7 @@ public class TaskCleanerImpl extends Abs
           launcherPool.execute(new EventProcessor(event));        }
       }
     });
+    eventHandlingThread.setName("TaskCleaner Event Handler");
     eventHandlingThread.start();
     super.start();
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java Wed Nov  2 05:34:31 2011
@@ -28,9 +28,12 @@ import javax.servlet.http.HttpServletRes
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.StringHelper;
 import org.apache.hadoop.yarn.util.Times;
@@ -267,6 +270,29 @@ public class AppController extends Contr
     setStatus(HttpServletResponse.SC_NOT_FOUND);
     setTitle(join("Not found: ", s));
   }
+  
+  /**
+   * Render an ACCESS_DENIED error.
+   * @param s the error message to include.
+   */
+  void accessDenied(String s) {
+    setStatus(HttpServletResponse.SC_FORBIDDEN);
+    setTitle(join("Access denied: ", s));
+    throw new RuntimeException("Access denied: " + s);
+  }
+
+  /**
+   * Check that the remote user has view access to the job.
+   * @param job the job that is being accessed
+   */
+  void checkAccess(Job job) {
+    UserGroupInformation callerUgi = UserGroupInformation.createRemoteUser(
+        request().getRemoteUser());
+    if (!job.checkAccess(callerUgi, JobACL.VIEW_JOB)) {
+      accessDenied("User " + request().getRemoteUser() + " does not have " +
+          " permissions.");
+    }
+  }
 
   /**
    * Ensure that a JOB_ID was passed into the page.
@@ -281,6 +307,9 @@ public class AppController extends Contr
       if (app.getJob() == null) {
         notFound($(JOB_ID));
       }
+      /* check for acl access */
+      Job job = app.context.getJob(jobID);
+      checkAccess(job);
     } catch (Exception e) {
       badRequest(e.getMessage() == null ? 
           e.getClass().getName() : e.getMessage());
@@ -296,7 +325,8 @@ public class AppController extends Contr
         throw new RuntimeException("missing task ID");
       }
       TaskId taskID = MRApps.toTaskID($(TASK_ID));
-      app.setJob(app.context.getJob(taskID.getJobId()));
+      Job job = app.context.getJob(taskID.getJobId());
+      app.setJob(job);
       if (app.getJob() == null) {
         notFound(MRApps.toString(taskID.getJobId()));
       } else {
@@ -305,6 +335,7 @@ public class AppController extends Contr
           notFound($(TASK_ID));
         }
       }
+      checkAccess(job);
     } catch (Exception e) {
       badRequest(e.getMessage());
     }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java Wed Nov  2 05:34:31 2011
@@ -119,11 +119,16 @@ public class CountersBlock extends HtmlB
       for (Counter counter : g.getAllCounters().values()) {
         // Ditto
         TR<TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>>> groupRow = group.
-          tr().
-            td().$title(counter.getName()).
+          tr();
+          if (mg == null && rg == null) {
+            groupRow.td().$title(counter.getName())._(counter.getDisplayName()).
+            _();
+          } else {
+            groupRow.td().$title(counter.getName()).
               a(url(urlBase,urlId,g.getName(), 
                   counter.getName()), counter.getDisplayName()).
             _();
+          }
         if (map != null) {
           Counter mc = mg == null ? null : mg.getCounter(counter.getName());
           Counter rc = rg == null ? null : rg.getCounter(counter.getName());
@@ -168,12 +173,11 @@ public class CountersBlock extends HtmlB
     }
     // Get all types of counters
     Map<TaskId, Task> tasks = job.getTasks();
-    total = JobImpl.newCounters();
+    total = job.getCounters();
     map = JobImpl.newCounters();
     reduce = JobImpl.newCounters();
     for (Task t : tasks.values()) {
       Counters counters = t.getCounters();
-      JobImpl.incrAllCounters(total, counters);
       switch (t.getType()) {
         case MAP:     JobImpl.incrAllCounters(map, counters);     break;
         case REDUCE:  JobImpl.incrAllCounters(reduce, counters);  break;
@@ -184,4 +188,4 @@ public class CountersBlock extends HtmlB
   private String fixGroupDisplayName(CharSequence name) {
     return name.toString().replace(".", ".\u200B").replace("$", "\u200B$");
   }
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/NavBlock.java Wed Nov  2 05:34:31 2011
@@ -18,10 +18,13 @@
 
 package org.apache.hadoop.mapreduce.v2.app.webapp;
 
+import java.util.List;
+
 import com.google.inject.Inject;
 
 import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
 
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
@@ -47,6 +50,10 @@ public class NavBlock extends HtmlBlock 
           li().a(url("app"), "Jobs")._()._();
     if (app.getJob() != null) {
       String jobid = MRApps.toString(app.getJob().getID());
+      List<AMInfo> amInfos = app.getJob().getAMInfos();
+      AMInfo thisAmInfo = amInfos.get(amInfos.size()-1);
+      String nodeHttpAddress = thisAmInfo.getNodeManagerHost() + ":" 
+          + thisAmInfo.getNodeManagerHttpPort();
       nav.
         h3("Job").
         ul().
@@ -54,7 +61,11 @@ public class NavBlock extends HtmlBlock 
           li().a(url("jobcounters", jobid), "Counters")._().
           li().a(url("conf", jobid), "Configuration")._().
           li().a(url("tasks", jobid, "m"), "Map tasks")._().
-          li().a(url("tasks", jobid, "r"), "Reduce tasks")._()._();
+          li().a(url("tasks", jobid, "r"), "Reduce tasks")._().
+          li().a(".logslink", url("http://", nodeHttpAddress, "node",
+              "containerlogs", thisAmInfo.getContainerId().toString(), 
+              app.getJob().getUserName()), 
+              "AM Logs")._()._();
       if (app.getTask() != null) {
         String taskid = MRApps.toString(app.getTask().getID());
         nav.

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Wed Nov  2 05:34:31 2011
@@ -86,7 +86,7 @@ public class TaskPage extends AppView {
           String containerIdStr = ConverterUtils.toString(containerId);
           nodeTd._(" ").
             a(".logslink", url("http://", nodeHttpAddr, "node", "containerlogs",
-              containerIdStr), "logs");
+              containerIdStr, app.getJob().getUserName()), "logs");
         }
         nodeTd._().
           td(".ts", Times.format(startTime)).

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Wed Nov  2 05:34:31 2011
@@ -31,8 +31,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -77,6 +81,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
 
 /**
@@ -91,6 +96,10 @@ public class MRApp extends MRAppMaster {
 
   private File testWorkDir;
   private Path testAbsPath;
+  
+  public static String NM_HOST = "localhost";
+  public static int NM_PORT = 1234;
+  public static int NM_HTTP_PORT = 9999;
 
   private static final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
@@ -118,10 +127,21 @@ public class MRApp extends MRAppMaster {
     applicationAttemptId.setAttemptId(startCount);
     return applicationAttemptId;
   }
+  
+  private static ContainerId getContainerId(ApplicationId applicationId,
+      int startCount) {
+    ApplicationAttemptId appAttemptId =
+        getApplicationAttemptId(applicationId, startCount);
+    ContainerId containerId =
+        BuilderUtils.newContainerId(appAttemptId, startCount);
+    return containerId;
+  }
 
   public MRApp(int maps, int reduces, boolean autoComplete, String testName, 
       boolean cleanOnStart, int startCount) {
-    super(getApplicationAttemptId(applicationId, startCount));
+    super(getApplicationAttemptId(applicationId, startCount), getContainerId(
+        applicationId, startCount), NM_HOST, NM_PORT, NM_HTTP_PORT, System
+        .currentTimeMillis());
     this.testWorkDir = new File("target", testName);
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
     LOG.info("PathUsed: " + testAbsPath);
@@ -253,9 +273,11 @@ public class MRApp extends MRAppMaster {
     } catch (IOException e) {
       throw new YarnException(e);
     }
-    Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(),
-                             getTaskAttemptListener(), getContext().getClock(),
-                             currentUser.getUserName());
+    Job newJob = new TestJob(getJobId(), getAttemptID(), conf, 
+    		getDispatcher().getEventHandler(),
+            getTaskAttemptListener(), getContext().getClock(),
+            getCommitter(), isNewApiCommitter(),
+            currentUser.getUserName());
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -277,11 +299,14 @@ public class MRApp extends MRAppMaster {
         return null;
       }
       @Override
-      public void register(TaskAttemptId attemptID, 
+      public void registerLaunchedTask(TaskAttemptId attemptID, 
           org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {}
       @Override
       public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
       }
+      @Override
+      public void registerPendingTask(WrappedJvmID jvmID) {
+      }
     };
   }
 
@@ -301,15 +326,18 @@ public class MRApp extends MRAppMaster {
   }
 
   class MockContainerLauncher implements ContainerLauncher {
+
+    //We are running locally so set the shuffle port to -1 
+    int shufflePort = -1;
+
+    @SuppressWarnings("unchecked")
     @Override
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
       case CONTAINER_REMOTE_LAUNCH:
-        //We are running locally so set the shuffle port to -1 
         getContext().getEventHandler().handle(
             new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-                -1)
-            );
+                shufflePort));
         
         attemptLaunched(event.getTaskAttemptID());
         break;
@@ -341,16 +369,22 @@ public class MRApp extends MRAppMaster {
         ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
         cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
         cId.setId(containerCount++);
-        Container container = recordFactory.newRecordInstance(Container.class);
-        container.setId(cId);
-        container.setNodeId(recordFactory.newRecordInstance(NodeId.class));
-        container.getNodeId().setHost("dummy");
-        container.getNodeId().setPort(1234);
-        container.setContainerToken(null);
-        container.setNodeHttpAddress("localhost:9999");
+        NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
+        Container container = BuilderUtils.newContainer(cId, nodeId,
+            NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
+        JobID id = TypeConverter.fromYarn(applicationId);
+        JobId jobId = TypeConverter.toYarn(id);
+        getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
+            new NormalizedResourceEvent(
+                org.apache.hadoop.mapreduce.TaskType.REDUCE,
+            100)));
+        getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
+            new NormalizedResourceEvent(
+                org.apache.hadoop.mapreduce.TaskType.MAP,
+            100)));
         getContext().getEventHandler().handle(
             new TaskAttemptContainerAssignedEvent(event.getAttemptID(),
-                container));
+                container, null));
       }
     };
   }
@@ -402,13 +436,15 @@ public class MRApp extends MRAppMaster {
       return localStateMachine;
     }
 
-    public TestJob(Configuration conf, ApplicationId applicationId,
-        EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
-        Clock clock, String user) {
-      super(getApplicationAttemptId(applicationId, getStartCount()), 
+    public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+        Configuration conf, EventHandler eventHandler,
+        TaskAttemptListener taskAttemptListener, Clock clock,
+        OutputCommitter committer, boolean newApiCommitter, String user) {
+      super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
           conf, eventHandler, taskAttemptListener,
-          new JobTokenSecretManager(), new Credentials(), clock, 
-          getCompletedTaskFromPreviousRun(), metrics, user);
+          new JobTokenSecretManager(), new Credentials(), clock,
+          getCompletedTaskFromPreviousRun(), metrics, committer,
+          newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos());
 
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Wed Nov  2 05:34:31 2011
@@ -144,7 +144,7 @@ public class MRAppBenchmark {
                   getContext().getEventHandler()
                       .handle(
                       new TaskAttemptContainerAssignedEvent(event
-                          .getAttemptID(), container));
+                          .getAttemptID(), container, null));
                   concurrentRunningTasks++;
                 } else {
                   Thread.sleep(1000);



Mime
View raw message