hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r1090092 [2/2] - in /hadoop/mapreduce/branches/MR-279: mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ mr-client/hadoop-ma...
Date Fri, 08 Apr 2011 02:54:57 GMT
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Apr  8 02:54:56 2011
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
-import java.io.IOException;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -33,21 +31,12 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -63,191 +52,50 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
-import org.apache.hadoop.yarn.service.AbstractService;
-
-
 
 /**
  * Allocates the container from the ResourceManager scheduler.
  */
-public class RMContainerAllocator extends AbstractService 
-implements ContainerAllocator {
-  private static final Log LOG = 
-    LogFactory.getLog(RMContainerAllocator.class);
+public class RMContainerAllocator extends RMCommunicator
+    implements ContainerAllocator {
+
+  private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
   private static final String ANY = "*";
-  private static int rmPollInterval;//millis
-  private ApplicationId applicationId;
-  private EventHandler eventHandler;
-  private volatile boolean stopped;
-  protected Thread allocatorThread;
-  private ApplicationMaster applicationMaster;
-  private AMRMProtocol scheduler;
-  private final ClientService clientService;
-  private int lastResponseID = 0;
-  
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private int lastResponseID;
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
 
   //mapping for assigned containers
-  private final Map<ContainerId, TaskAttemptId> assignedMap = 
-    new HashMap<ContainerId, TaskAttemptId>();
+  private final Map<ContainerId, TaskAttemptId> assignedMap =
+      new HashMap<ContainerId, TaskAttemptId>();
 
-  private final Map<Priority, 
-  Map<Resource,LinkedList<ContainerRequestEvent>>> localRequestsQueue = 
-    new HashMap<Priority, Map<Resource,LinkedList<ContainerRequestEvent>>>();
+  private final Map<Priority,
+  Map<Resource,LinkedList<ContainerRequestEvent>>> localRequestsQueue =
+      new HashMap<Priority, Map<Resource,LinkedList<ContainerRequestEvent>>>();
 
   //Key -> Priority
   //Value -> Map
-  //Key->ResourceName (eg. hostname, rackname, *)
+  //Key->ResourceName (e.g., hostname, rackname, *)
   //Value->Map
   //Key->Resource Capability
  //Value->ResourceRequest
-  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>> 
-  remoteRequestsTable = 
-    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
-
+  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+  remoteRequestsTable =
+      new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
 
-  private final Set<ResourceRequest> ask =new TreeSet<ResourceRequest>();
+  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
   private final Set<Container> release = new TreeSet<Container>();
 
   public RMContainerAllocator(ClientService clientService, AppContext context) {
-    super("RMContainerAllocator");
-    this.clientService = clientService;
-    this.applicationId = context.getApplicationID();
-    this.eventHandler = context.getEventHandler();
-    this.applicationMaster = recordFactory.newRecordInstance(ApplicationMaster.class);
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    super.init(conf);
-    rmPollInterval = conf.getInt(YarnConfiguration.AM_EXPIRY_INTERVAL, 10000)/3;
-  }
-
-  @Override
-  public void start() {
-    scheduler= createSchedulerProxy();
-    //LOG.info("Scheduler is " + scheduler);
-    register();
-    startAllocatorThread();
-    super.start();
-  }
-
-  protected void register() {
-    //Register
-    applicationMaster.setApplicationId(applicationId);
-    applicationMaster.setHost(clientService.getBindAddress().getAddress().getHostAddress());
-    applicationMaster.setRpcPort(clientService.getBindAddress().getPort());
-    applicationMaster.setState(ApplicationState.RUNNING);
-    applicationMaster.setHttpPort(clientService.getHttpPort());
-    applicationMaster.setStatus(recordFactory.newRecordInstance(ApplicationStatus.class));
-    applicationMaster.getStatus().setApplicationId(applicationId);
-    applicationMaster.getStatus().setProgress(0.0f);
-    try {
-      RegisterApplicationMasterRequest request = recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
-      request.setApplicationMaster(applicationMaster);
-      scheduler.registerApplicationMaster(request);
-    } catch(Exception are) {
-      LOG.info("Exception while registering", are);
-      throw new YarnException(are);
-    }
-  }
-
-  protected void unregister() {
-    try {
-      applicationMaster.setState(ApplicationState.COMPLETED);
-      FinishApplicationMasterRequest request = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
-      request.setApplicationMaster(applicationMaster);
-      scheduler.finishApplicationMaster(request);
-    } catch(Exception are) {
-      LOG.info("Error while unregistering ", are);
-    }
-  }
-
-  @Override
-  public void stop() {
-    stopped = true;
-    allocatorThread.interrupt();
-    try {
-      allocatorThread.join();
-    } catch (InterruptedException ie) {
-      LOG.info("Interruped Exception while stopping", ie);
-    }
-    unregister();
-    super.stop();
-  }
-
-  protected void startAllocatorThread() {
-    allocatorThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        while (!stopped && !Thread.currentThread().isInterrupted()) {
-          try {
-            Thread.sleep(rmPollInterval);
-            try {
-              allocate();
-            } catch (Exception e) {
-              LOG.error("ERROR IN CONTACTING RM.", e);
-            }
-          } catch (InterruptedException e) {
-            LOG.info("Allocated thread interrupted. Returning");
-            return;
-          }
-        }
-      }
-    });
-    allocatorThread.start();
-  }
-
-  protected AMRMProtocol createSchedulerProxy() {
-    final YarnRPC rpc = YarnRPC.create(getConfig());
-    final Configuration conf = new Configuration(getConfig());
-    final String serviceAddr = conf.get(
-        YarnConfiguration.SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_SCHEDULER_BIND_ADDRESS);
-
-    UserGroupInformation currentUser;
-    try {
-      currentUser = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
-          SchedulerSecurityInfo.class, SecurityInfo.class);
-
-      String tokenURLEncodedStr =
-        System.getenv().get(
-            YarnConfiguration.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      LOG.debug("AppMasterToken is " + tokenURLEncodedStr);
-      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
-      try {
-        token.decodeFromUrlString(tokenURLEncodedStr);
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
-
-      currentUser.addToken(token);
-    }
-
-    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
-      @Override
-      public AMRMProtocol run() {
-        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
-            NetUtils.createSocketAddr(serviceAddr), conf);
-      }
-    });       
+    super(clientService, context);
   }
 
   // TODO: Need finer synchronization.
-  protected synchronized void allocate() throws Exception {
+  @Override
+  protected synchronized void heartbeat() throws Exception {
     assign(getResources());
   }
 
@@ -299,7 +147,7 @@ implements ContainerAllocator {
 
   private void addResourceRequest(Priority priority, String resourceName,
       Resource capability) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests = 
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
       remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
@@ -332,7 +180,7 @@ implements ContainerAllocator {
 
   private void decResourceRequest(Priority priority, String resourceName,
       Resource capability) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests = 
+    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
     ResourceRequest remoteRequest = reqMap.get(capability);
@@ -352,37 +200,39 @@ implements ContainerAllocator {
         remoteRequestsTable.remove(priority);
       }
       //remove from ask if it may have
-      ask.remove(remoteRequest); 
+      ask.remove(remoteRequest);
     } else {
       ask.add(remoteRequest);//this will override the request if ask doesn't
       //already have it.
     }
 
-    LOG.info("AFTER decResourceRequest:" + " applicationId=" + applicationId.getId()
-        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
-        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
-        + ask.size());
+    LOG.info("AFTER decResourceRequest:" + " applicationId="
+             + applicationId.getId() + " priority=" + priority.getPriority()
+             + " resourceName=" + resourceName + " numContainers="
+             + remoteRequest.getNumContainers() + " #asks=" + ask.size());
   }
 
   private List<Container> getResources() throws Exception {
-    ApplicationStatus status = recordFactory.newRecordInstance(ApplicationStatus.class);
+    ApplicationStatus status =
+        recordFactory.newRecordInstance(ApplicationStatus.class);
     status.setApplicationId(applicationId);
     status.setResponseId(lastResponseID);
-    
-    AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class);
+
+    AllocateRequest allocateRequest =
+        recordFactory.newRecordInstance(AllocateRequest.class);
     allocateRequest.setApplicationStatus(status);
     allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
     allocateRequest.addAllReleases(new ArrayList<Container>(release));
     AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-    AMResponse response = allocateResponse.getAMResponse(); 
+    AMResponse response = allocateResponse.getAMResponse();
     lastResponseID = response.getResponseId();
     List<Container> allContainers = response.getContainerList();
     ask.clear();
     release.clear();
 
     LOG.info("getResources() for " + applicationId + ":" +
-        " ask=" + ask.size() + 
-        " release= "+ release.size() + 
+        " ask=" + ask.size() +
+        " release= "+ release.size() +
         " recieved=" + allContainers.size());
     List<Container> allocatedContainers = new ArrayList<Container>();
     for (Container cont : allContainers) {
@@ -393,11 +243,11 @@ implements ContainerAllocator {
         LOG.info("Received completed container " + cont);
         TaskAttemptId attemptID = assignedMap.remove(cont.getId());
         if (attemptID == null) {
-          LOG.error("Container complete event for unknown container id " + 
+          LOG.error("Container complete event for unknown container id " +
               cont.getId());
         } else {
           //send the container completed event to Task attempt
-          eventHandler.handle(new TaskAttemptEvent(attemptID, 
+          eventHandler.handle(new TaskAttemptEvent(attemptID,
               TaskAttemptEventType.TA_CONTAINER_COMPLETED));
         }
       }
@@ -409,9 +259,9 @@ implements ContainerAllocator {
   private void assign(List<Container> allocatedContainers) {
     // Schedule in priority order
     for (Priority priority : localRequestsQueue.keySet()) {
-      LOG.info("Assigning for priority " + priority); 
+      LOG.info("Assigning for priority " + priority);
       assign(priority, allocatedContainers);
-      if (allocatedContainers.isEmpty()) { 
+      if (allocatedContainers.isEmpty()) {
         break;
       }
     }
@@ -433,7 +283,7 @@ implements ContainerAllocator {
       String host = allocatedContainer.getHostName();
       Resource capability = allocatedContainer.getResource();
 
-      LinkedList<ContainerRequestEvent> requestList = 
+      LinkedList<ContainerRequestEvent> requestList =
         localRequestsQueue.get(priority).get(capability);
 
       if (requestList == null) {
@@ -464,22 +314,21 @@ implements ContainerAllocator {
       }
 
       if (assigned != null) {
-
         i.remove(); // Remove from allocated Containers list also.
 
         // Update resource requests
         decResourceRequest(priority, ANY, capability);
 
-        //send the container assigned event to Task attempt
-        eventHandler.handle(new TaskAttemptContainerAssignedEvent(assigned
-            .getAttemptID(), allocatedContainer.getId(),
+        // send the container-assigned event to task attempt
+        eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+            assigned.getAttemptID(), allocatedContainer.getId(),
             allocatedContainer.getHostName(),
             allocatedContainer.getContainerToken()));
 
         assignedMap.put(allocatedContainer.getId(), assigned.getAttemptID());
 
         LOG.info("Assigned container (" + allocatedContainer + ") " +
-            " to task " + assigned.getAttemptID() + " at priority " + priority + 
+            " to task " + assigned.getAttemptID() + " at priority " + priority +
             " on node " + allocatedContainer.getHostName());
       }
     }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Apr  8 02:54:56 2011
@@ -78,14 +78,16 @@ public class MRApp extends MRAppMaster {
   int maps;
   int reduces;
 
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
   
-  //if true tasks complete automatically as soon as they are launched
+  //if true, tasks complete automatically as soon as they are launched
   protected boolean autoComplete = false;
 
   public MRApp(int maps, int reduces, boolean autoComplete) {
     
-    super(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
+    super(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
+        ApplicationId.class));
     this.maps = maps;
     this.reduces = reduces;
     this.autoComplete = autoComplete;
@@ -158,30 +160,33 @@ public class MRApp extends MRAppMaster {
   public void verifyCompleted() {
     for (Job job : getContext().getAllJobs().values()) {
       JobReport jobReport = job.getReport();
-      Assert.assertTrue("Job start time is  not less than finish time",
+      Assert.assertTrue("Job start time is not less than finish time",
           jobReport.getStartTime() < jobReport.getFinishTime());
       Assert.assertTrue("Job finish time is in future",
           jobReport.getFinishTime() < System.currentTimeMillis());
       for (Task task : job.getTasks().values()) {
         TaskReport taskReport = task.getReport();
-        Assert.assertTrue("Task start time is  not less than finish time",
+        Assert.assertTrue("Task start time is not less than finish time",
             taskReport.getStartTime() < taskReport.getFinishTime());
         for (TaskAttempt attempt : task.getAttempts().values()) {
           TaskAttemptReport attemptReport = attempt.getReport();
-          Assert.assertTrue("Attempt start time is  not less than finish time",
+          Assert.assertTrue("Attempt start time is not less than finish time",
               attemptReport.getStartTime() < attemptReport.getFinishTime());
         }
       }
     }
   }
 
-  protected void startJobs() {
-    Job job = new TestJob(getAppID(), getDispatcher().getEventHandler(),
-        getTaskAttemptListener());
-    ((AppContext) getContext()).getAllJobs().put(job.getID(), job);
-
-    getDispatcher().register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+  public Job createJob(Configuration conf) {
+    Job newJob = new TestJob(getAppID(), getDispatcher().getEventHandler(),
+                             getTaskAttemptListener());
+    ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+    // FIXME?  why does this work?  MRAppMaster init() already registered one...
+    getDispatcher().register(
+        org.apache.hadoop.mapreduce.jobhistory.EventType.class,
         createJobHistoryHandler(getConfig()));
+
     getDispatcher().register(JobFinishEvent.Type.class,
         new EventHandler<JobFinishEvent>() {
           @Override
@@ -189,11 +194,8 @@ public class MRApp extends MRAppMaster {
             stop();
           }
         });
-    
-    /** create a job event for job intialization **/
-    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
-    /** send init on the job. this triggers the job execution.**/
-    getDispatcher().getEventHandler().handle(initJobEvent);
+
+    return newJob;
   }
 
   @Override
@@ -213,7 +215,8 @@ public class MRApp extends MRAppMaster {
   }
 
   @Override
-  protected ContainerLauncher createContainerLauncher(AppContext context) {
+  protected ContainerLauncher createContainerLauncher(AppContext context,
+                                                      boolean isLocal) {
     return new MockContainerLauncher();
   }
 
@@ -248,7 +251,7 @@ public class MRApp extends MRAppMaster {
 
   @Override
   protected ContainerAllocator createContainerAllocator(
-      ClientService clientService, AppContext context) {
+      ClientService clientService, AppContext context, boolean isLocal) {
     return new ContainerAllocator(){
       private int containerCount;
       @Override
@@ -292,19 +295,17 @@ public class MRApp extends MRAppMaster {
   }
 
   class TestJob extends JobImpl {
-    //overwrite the init transition
+    //override the init transition
     StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
-         //overwrite the init transition
-         = stateMachineFactory.addTransition
-                 (JobState.NEW,
-                  EnumSet.of(JobState.RUNNING, JobState.FAILED),
-                  JobEventType.JOB_INIT,
-                  // This is abusive.
-                  new TestInitTransition(getConfig(), maps, reduces));
+        = stateMachineFactory.addTransition(JobState.NEW,
+            EnumSet.of(JobState.INITED, JobState.FAILED),
+            JobEventType.JOB_INIT,
+            // This is abusive.
+            new TestInitTransition(getConfig(), maps, reduces));
 
     private final StateMachine<JobState, JobEventType, JobEvent>
-           localStateMachine;
-    
+        localStateMachine;
+
     @Override
     protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
       return localStateMachine;
@@ -319,9 +320,8 @@ public class MRApp extends MRAppMaster {
       //  instance variable.
       localStateMachine = localFactory.make(this);
     }
-    
   }
-  
+
   //Override InitTransition to not look for split files etc
   static class TestInitTransition extends JobImpl.InitTransition {
     private Configuration config;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java Fri Apr  8 02:54:56 2011
@@ -89,7 +89,7 @@ public class MRAppBenchmark {
     
     @Override
     protected ContainerAllocator createContainerAllocator(
-        ClientService clientService, AppContext context) {
+        ClientService clientService, AppContext context, boolean isLocal) {
       return new ThrottledContainerAllocator();
     }
     

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Fri Apr  8 02:54:56 2011
@@ -417,8 +417,13 @@ public class MockJobs extends MockApps {
       }
 
       @Override
-      public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(int fromEventId,
-                                                           int maxEvents) {
+      public boolean isUber() {
+        return false;
+      }
+
+      @Override
+      public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+          int fromEventId, int maxEvents) {
         return null;
       }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Fri Apr  8 02:54:56 2011
@@ -51,24 +51,25 @@ public class TestFail {
   //The job succeeds.
   public void testFailTask() throws Exception {
     MRApp app = new MockFirstFailingAttemptMRApp(1, 0);
-    Job job = app.submit(new Configuration());
+    Configuration conf = new Configuration();
+    // this test requires two task attempts, but uberization overrides max to 1
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
     app.waitForState(job, JobState.SUCCEEDED);
     Map<TaskId,Task> tasks = job.getTasks();
-    Assert.assertEquals("No of tasks is not correct", 1, 
-        tasks.size());
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
     Task task = tasks.values().iterator().next();
-    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED, 
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
         task.getReport().getTaskState());
-    Map<TaskAttemptId, TaskAttempt> attempts = 
-      tasks.values().iterator().next().getAttempts();
-    Assert.assertEquals("No of attempts is not correct", 2, 
-        attempts.size());
+    Map<TaskAttemptId, TaskAttempt> attempts =
+        tasks.values().iterator().next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", 2, attempts.size());
    //one attempt must have failed 
    //and another must have succeeded
     Iterator<TaskAttempt> it = attempts.values().iterator();
-    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, 
-          it.next().getReport().getTaskAttemptState());
-    Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED, 
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+        it.next().getReport().getTaskAttemptState());
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
         it.next().getReport().getTaskAttemptState());
   }
 
@@ -141,20 +142,22 @@ public class TestFail {
     Configuration conf = new Configuration();
     int maxAttempts = 2;
     conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    // disable uberization (requires entire job to be reattempted, so max for
+    // subtask attempts is overridden to 1)
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     Job job = app.submit(conf);
     app.waitForState(job, JobState.FAILED);
     Map<TaskId,Task> tasks = job.getTasks();
-    Assert.assertEquals("No of tasks is not correct", 1, 
-        tasks.size());
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
     Task task = tasks.values().iterator().next();
-    Assert.assertEquals("Task state not correct", TaskState.FAILED, 
+    Assert.assertEquals("Task state not correct", TaskState.FAILED,
         task.getReport().getTaskState());
-    Map<TaskAttemptId, TaskAttempt> attempts = 
-      tasks.values().iterator().next().getAttempts();
-    Assert.assertEquals("No of attempts is not correct", maxAttempts, 
+    Map<TaskAttemptId, TaskAttempt> attempts =
+        tasks.values().iterator().next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", maxAttempts,
         attempts.size());
     for (TaskAttempt attempt : attempts.values()) {
-      Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED, 
+      Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
           attempt.getReport().getTaskAttemptState());
     }
   }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Fri Apr  8 02:54:56 2011
@@ -24,6 +24,7 @@ import java.util.Iterator;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@@ -44,10 +45,13 @@ public class TestFetchFailure {
   public void testFetchFailure() throws Exception {
     MRApp app = new MRApp(1, 1, false);
     Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     Job job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
     //all maps would be running
-    Assert.assertEquals("No of tasks not correct",
+    Assert.assertEquals("Num tasks not correct",
        2, job.getTasks().size());
     Iterator<Task> it = job.getTasks().values().iterator();
     Task mapTask = it.next();
@@ -68,7 +72,7 @@ public class TestFetchFailure {
     
     TaskAttemptCompletionEvent[] events = 
       job.getTaskAttemptCompletionEvents(0, 100);
-    Assert.assertEquals("No of completion events not correct",
+    Assert.assertEquals("Num completion events not correct",
         1, events.length);
     Assert.assertEquals("Event status not correct",
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
@@ -91,7 +95,7 @@ public class TestFetchFailure {
     Assert.assertEquals("Map TaskAttempt state not correct",
         TaskAttemptState.FAILED, mapAttempt1.getState());
 
-    Assert.assertEquals("No of attempts in Map Task not correct",
+    Assert.assertEquals("Num attempts in Map Task not correct",
         2, mapTask.getAttempts().size());
     
     Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
@@ -119,7 +123,7 @@ public class TestFetchFailure {
         TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
     
     events = job.getTaskAttemptCompletionEvents(0, 100);
-    Assert.assertEquals("No of completion events not correct",
+    Assert.assertEquals("Num completion events not correct",
         4, events.length);
     Assert.assertEquals("Event map attempt id not correct",
         mapAttempt1.getID(), events[0].getAttemptId());

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Fri Apr  8 02:54:56 2011
@@ -55,30 +55,29 @@ public class TestMRApp {
     MRApp app = new MRApp(1, 0, false);
     Job job = app.submit(new Configuration());
     app.waitForState(job, JobState.RUNNING);
-    Assert.assertEquals("No of tasks not correct",
-        1, job.getTasks().size());
-     Iterator<Task> it = job.getTasks().values().iterator();
-     Task task = it.next();
-     app.waitForState(task, TaskState.RUNNING);
-     TaskAttempt attempt = task.getAttempts().values().iterator().next();
-     app.waitForState(attempt, TaskAttemptState.RUNNING);
-     
-     //send the commit pending signal to the task
-     app.getContext().getEventHandler().handle(
-         new TaskAttemptEvent(
-             attempt.getID(),
-             TaskAttemptEventType.TA_COMMIT_PENDING));
-     
-     //wait for first attempt to commit pending
-     app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
-     
-     //send the done signal to the task
-     app.getContext().getEventHandler().handle(
-         new TaskAttemptEvent(
-             task.getAttempts().values().iterator().next().getID(),
-             TaskAttemptEventType.TA_DONE));
-     
-     app.waitForState(job, JobState.SUCCEEDED);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.RUNNING);
+
+    //send the commit pending signal to the task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            attempt.getID(),
+            TaskAttemptEventType.TA_COMMIT_PENDING));
+
+    //wait for first attempt to commit pending
+    app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
+
+    //send the done signal to the task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
   }
 
   @Test
@@ -87,11 +86,12 @@ public class TestMRApp {
     Configuration conf = new Configuration();
     //after half of the map completion, reduce will start
     conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
+    //uberization forces full slowstart (1.0), so disable that
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
     Job job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
     //all maps would be running
-    Assert.assertEquals("No of tasks not correct",
-       3, job.getTasks().size());
+    Assert.assertEquals("Num tasks not correct", 3, job.getTasks().size());
     Iterator<Task> it = job.getTasks().values().iterator();
     Task mapTask1 = it.next();
     Task mapTask2 = it.next();
@@ -144,19 +144,18 @@ public class TestMRApp {
     MRApp app = new MRApp(1, 0, false);
     Job job = app.submit(new Configuration());
     app.waitForState(job, JobState.RUNNING);
-    Assert.assertEquals("No of tasks not correct",
-        1, job.getTasks().size());
-     Iterator<Task> it = job.getTasks().values().iterator();
-     Task task = it.next();
-     app.waitForState(task, TaskState.RUNNING);
-     
-     //send an invalid event on task at current state
-     app.getContext().getEventHandler().handle(
-         new TaskEvent(
-             task.getID(), TaskEventType.T_SCHEDULE));
-     
-     //this must lead to job error
-     app.waitForState(job, JobState.ERROR);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+
+    //send an invalid event on task at current state
+    app.getContext().getEventHandler().handle(
+        new TaskEvent(
+            task.getID(), TaskEventType.T_SCHEDULE));
+
+    //this must lead to job error
+    app.waitForState(job, JobState.ERROR);
   }
 
   @Test
@@ -170,7 +169,7 @@ public class TestMRApp {
 
   @Test
   public void checkTaskStateTypeConversion() {
-  //verify that all states can be converted without 
+    //verify that all states can be converted without 
     // throwing an exception
     for (TaskState state : TaskState.values()) {
       TypeConverter.fromYarn(state);

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Fri Apr  8 02:54:56 2011
@@ -66,7 +66,7 @@ public class TestMRClientService {
     Configuration conf = new Configuration();
     Job job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
-    Assert.assertEquals("No of tasks not correct", 1, job.getTasks().size());
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
     Iterator<Task> it = job.getTasks().values().iterator();
     Task task = it.next();
     app.waitForState(task, TaskState.RUNNING);
@@ -97,67 +97,76 @@ public class TestMRClientService {
     MRClientProtocol proxy =
       (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
           app.clientService.getBindAddress(), conf);
-    GetCountersRequest gcRequest = recordFactory.newRecordInstance(GetCountersRequest.class);    
+    GetCountersRequest gcRequest =
+        recordFactory.newRecordInstance(GetCountersRequest.class);    
     gcRequest.setJobId(job.getID());
-    Assert.assertNotNull("Counters is null", proxy.getCounters(gcRequest).getCounters());
-    
-    GetJobReportRequest gjrRequest = recordFactory.newRecordInstance(GetJobReportRequest.class);
+    Assert.assertNotNull("Counters is null",
+        proxy.getCounters(gcRequest).getCounters());
+
+    GetJobReportRequest gjrRequest =
+        recordFactory.newRecordInstance(GetJobReportRequest.class);
     gjrRequest.setJobId(job.getID());
-    Assert.assertNotNull("JobReport is null", proxy.getJobReport(gjrRequest).getJobReport());
-    
-    GetTaskAttemptCompletionEventsRequest gtaceRequest = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
+    Assert.assertNotNull("JobReport is null",
+        proxy.getJobReport(gjrRequest).getJobReport());
+
+    GetTaskAttemptCompletionEventsRequest gtaceRequest =
+        recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
     gtaceRequest.setJobId(job.getID());
     gtaceRequest.setFromEventId(0);
     gtaceRequest.setMaxEvents(10);
     Assert.assertNotNull("TaskCompletionEvents is null", 
         proxy.getTaskAttemptCompletionEvents(gtaceRequest).getCompletionEventList());
-    
-    GetDiagnosticsRequest gdRequest = recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
+
+    GetDiagnosticsRequest gdRequest =
+        recordFactory.newRecordInstance(GetDiagnosticsRequest.class);
     gdRequest.setTaskAttemptId(attempt.getID());
     Assert.assertNotNull("Diagnostics is null", 
         proxy.getDiagnostics(gdRequest).getDiagnosticsList());
-    
-    GetTaskAttemptReportRequest gtarRequest = recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
+
+    GetTaskAttemptReportRequest gtarRequest =
+        recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
     gtarRequest.setTaskAttemptId(attempt.getID());
     Assert.assertNotNull("TaskAttemptReport is null", 
         proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport());
-    
-    GetTaskReportRequest gtrRequest = recordFactory.newRecordInstance(GetTaskReportRequest.class);
+
+    GetTaskReportRequest gtrRequest =
+        recordFactory.newRecordInstance(GetTaskReportRequest.class);
     gtrRequest.setTaskId(task.getID());
     Assert.assertNotNull("TaskReport is null", 
         proxy.getTaskReport(gtrRequest).getTaskReport());
-    
-    GetTaskReportsRequest gtreportsRequest = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+
+    GetTaskReportsRequest gtreportsRequest =
+        recordFactory.newRecordInstance(GetTaskReportsRequest.class);
     gtreportsRequest.setJobId(job.getID());
     gtreportsRequest.setTaskType(TaskType.MAP);
     Assert.assertNotNull("TaskReports for map is null", 
         proxy.getTaskReports(gtreportsRequest).getTaskReportList());
-    
-    gtreportsRequest = recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+
+    gtreportsRequest =
+        recordFactory.newRecordInstance(GetTaskReportsRequest.class);
     gtreportsRequest.setJobId(job.getID());
     gtreportsRequest.setTaskType(TaskType.REDUCE);
     Assert.assertNotNull("TaskReports for reduce is null", 
         proxy.getTaskReports(gtreportsRequest).getTaskReportList());
-    
+
     List<String> diag = proxy.getDiagnostics(gdRequest).getDiagnosticsList();
-    Assert.assertEquals("No of diagnostic not correct" , 2 , diag.size());
-    Assert.assertEquals("Diag 1 not correct" , 
+    Assert.assertEquals("Num diagnostics not correct", 2 , diag.size());
+    Assert.assertEquals("Diag 1 not correct",
         diagnostic1, diag.get(0).toString());
-    Assert.assertEquals("Diag 2 not correct" , 
+    Assert.assertEquals("Diag 2 not correct",
         diagnostic2, diag.get(1).toString());
-    
+
     TaskReport taskReport = proxy.getTaskReport(gtrRequest).getTaskReport();
-    Assert.assertEquals("No of diagnostic not correct", 2, 
+    Assert.assertEquals("Num diagnostics not correct", 2,
         taskReport.getDiagnosticsCount());
-    
+
     //send the done signal to the task
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
             task.getAttempts().values().iterator().next().getID(),
             TaskAttemptEventType.TA_DONE));
-    
+
     app.waitForState(job, JobState.SUCCEEDED);
-    
   }
 
   class MRAppWithClientService extends MRApp {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Apr  8 02:54:56 2011
@@ -376,7 +376,7 @@ public class TestRMContainerAllocator {
     public List<TaskAttemptContainerAssignedEvent> schedule() {
       //run the scheduler
       try {
-        allocate();
+        heartbeat();
       } catch (Exception e) {
 
       }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Fri Apr  8 02:54:56 2011
@@ -468,6 +468,11 @@ public class TestRuntimeEstimators {
     public int getTotalReduces() {
       return reduceTasks.size();
     }
+
+    @Override
+    public boolean isUber() {
+      return false;
+    }
   }
 
   /*

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro Fri Apr  8 02:54:56 2011
@@ -101,6 +101,7 @@ protocol MRClientProtocol {
 
   enum JobState {
     NEW,
+    INITED,
     RUNNING,
     SUCCEEDED,
     FAILED,

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Fri Apr  8 02:54:56 2011
@@ -259,6 +259,7 @@ public class TypeConverter {
   public static int fromYarn(JobState state) {
     switch (state) {
     case NEW:
+    case INITED:
       return org.apache.hadoop.mapred.JobStatus.PREP;
     case RUNNING:
       return org.apache.hadoop.mapred.JobStatus.RUNNING;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java Fri Apr  8 02:54:56 2011
@@ -1,7 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.mapreduce.v2.api.records;
 
 public enum JobState {
   NEW,
+  INITED,
   RUNNING,
   SUCCEEDED,
   FAILED,

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Fri Apr  8 02:54:56 2011
@@ -103,12 +103,13 @@ message TaskAttemptReportProto {
 
 enum JobStateProto {
   J_NEW = 1;
-  J_RUNNING = 2;
-  J_SUCCEEDED = 3;
-  J_FAILED = 4;
-  J_KILL_WAIT = 5;
-  J_KILLED = 6;
-  J_ERROR = 7;
+  J_INITED = 2;
+  J_RUNNING = 3;
+  J_SUCCEEDED = 4;
+  J_FAILED = 5;
+  J_KILL_WAIT = 6;
+  J_KILLED = 7;
+  J_ERROR = 8;
 }
 
 message JobReportProto {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Fri Apr  8 02:54:56 2011
@@ -224,10 +224,15 @@ public class CompletedJob implements org
   }
 
   @Override
+  public boolean isUber() {
+    return false;
+  }
+
+  @Override
   public Map<TaskId, Task> getTasks(TaskType taskType) {
     if (TaskType.MAP.equals(taskType)) {
       return mapTasks;
-    } else {//we have only two type of tasks
+    } else {//we have only two types of tasks
       return reduceTasks;
     }
   }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Fri Apr  8 02:54:56 2011
@@ -202,7 +202,7 @@ public class YARNRunner implements Clien
   
   @Override
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
-  throws IOException, InterruptedException{
+  throws IOException, InterruptedException {
 
     // Upload only in security mode: TODO
     Path applicationTokensFile =
@@ -258,16 +258,19 @@ public class YARNRunner implements Clien
   private ApplicationSubmissionContext getApplicationSubmissionContext(
       Configuration jobConf,
       String jobSubmitDir, Credentials ts) throws IOException {
-    ApplicationSubmissionContext appContext = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    ApplicationSubmissionContext appContext =
+        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
     ApplicationId applicationId = resMgrDelegate.getApplicationId();
     appContext.setApplicationId(applicationId);
     Resource capability = recordFactory.newRecordInstance(Resource.class);
-    capability.setMemory(conf.getInt(YARN_AM_RESOURCE_KEY, DEFAULT_YARN_AM_RESOURCE));
-    LOG.info("Master capability = " + capability);
+    capability.setMemory(
+        conf.getInt(YARN_AM_RESOURCE_KEY, DEFAULT_YARN_AM_RESOURCE));
+    LOG.info("AppMaster capability = " + capability);
     appContext.setMasterCapability(capability);
 
     FileContext defaultFS = FileContext.getFileContext(conf);
-    Path jobConfPath = new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE);
+    Path jobConfPath =
+        new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE);
     
     URL yarnUrlForJobSubmitDir =
         ConverterUtils.getYarnUrlFromPath(defaultFS.makeQualified(new Path(
@@ -290,9 +293,9 @@ public class YARNRunner implements Clien
     // TODO gross hack
     for (String s : new String[] { "job.split", "job.splitmetainfo",
         YarnConfiguration.APPLICATION_TOKENS_FILE }) {
-      appContext.setResourceTodo(YARNApplicationConstants.JOB_SUBMIT_DIR + "/" + s,
-          createApplicationResource(defaultFS,
-              new Path(jobSubmitDir, s)));
+      appContext.setResourceTodo(
+          YARNApplicationConstants.JOB_SUBMIT_DIR + "/" + s,
+          createApplicationResource(defaultFS, new Path(jobSubmitDir, s)));
     }
 
     // TODO: Only if security is on.
@@ -332,8 +335,8 @@ public class YARNRunner implements Clien
     vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
     vargs.add(String.valueOf(applicationId.getClusterTimestamp()));
     vargs.add(String.valueOf(applicationId.getId()));
-    vargs.add("1>logs/stderr");
-    vargs.add("2>logs/stdout");
+    vargs.add("1>logs/stdout");
+    vargs.add("2>logs/stderr");
 
     Vector<String> vargsFinal = new Vector<String>(8);
     // Final commmand

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Fri Apr  8 02:54:56 2011
@@ -39,7 +39,7 @@ import org.apache.hadoop.yarn.service.Ab
 import org.apache.hadoop.yarn.service.Service;
 
 /**
- * Configures and starts the MR specific components in the YARN cluster.
+ * Configures and starts the MR-specific components in the YARN cluster.
  *
  */
 public class MiniMRYarnCluster extends MiniYARNCluster {
@@ -116,7 +116,8 @@ public class MiniMRYarnCluster extends M
       super.stop();
     }
   }
+
   public JobHistoryServer getHistoryServer() {
-	  return this.historyServer;
+    return this.historyServer;
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Fri Apr  8 02:54:56 2011
@@ -62,13 +62,14 @@ public class TestMRJobs {
 
   private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
 
-  private static MiniMRYarnCluster mrCluster;
+  protected static MiniMRYarnCluster mrCluster;
 
   @BeforeClass
   public static void setup() {
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
       return;
     }
 
@@ -77,12 +78,17 @@ public class TestMRJobs {
       mrCluster.init(new Configuration());
       mrCluster.start();
     }
+
+    // TestMRJobs is for testing non-uberized operation only; see TestUberAM
+    // for corresponding uberized tests.
+    mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
   }
 
   @AfterClass
   public static void tearDown() {
     if (mrCluster != null) {
       mrCluster.stop();
+      mrCluster = null;
     }
   }
 
@@ -90,28 +96,41 @@ public class TestMRJobs {
   public void testSleepJob() throws IOException, InterruptedException,
       ClassNotFoundException { 
 
+    LOG.info("\n\n\nStarting testSleepJob().");
+
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
       return;
     }
 
     SleepJob sleepJob = new SleepJob();
     sleepJob.setConf(mrCluster.getConfig());
-    //Job with 3 maps and 2 reduces
-    Job job = sleepJob.createJob(3, 2, 10000, 1, 5000, 1);
+
+    int numReduces = mrCluster.getConfig().getInt("TestMRJobs.testSleepJob.reduces", 2); // or mrCluster.getConfig().getInt(MRJobConfig.NUM_REDUCES, 2);
+
+    // job with 3 maps (10s) and numReduces reduces (5s), 1 "record" each:
+    Job job = sleepJob.createJob(3, numReduces, 10000, 1, 5000, 1);
+
     // TODO: We should not be setting MRAppJar as job.jar. It should be
     // uploaded separately by YarnRunner.
     job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
     job.waitForCompletion(true);
     Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
+    // TODO later:  add explicit "isUber()" checks of some sort (extend
+    // JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
   }
 
   @Test
   public void testRandomWriter() throws IOException, InterruptedException,
       ClassNotFoundException {
 
+    LOG.info("\n\n\nStarting testRandomWriter().");
+
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
       return;
     }
 
@@ -140,43 +159,24 @@ public class TestMRJobs {
       }
     }
     Assert.assertEquals("Number of part files is wrong!", 3, count);
+
+    // TODO later:  add explicit "isUber()" checks of some sort
   }
 
   @Test
   public void testFailingMapper() throws IOException, InterruptedException,
       ClassNotFoundException {
 
+    LOG.info("\n\n\nStarting testFailingMapper().");
+
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
       return;
     }
 
-    int numMaps = 1;
-    mrCluster.getConfig().setInt(MRJobConfig.NUM_MAPS, numMaps);
-    
-    mrCluster.getConfig().setInt("mapreduce.task.timeout", 10*1000);//reduce the timeout
-    mrCluster.getConfig().setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the no of attempts
+    Job job = runFailingMapperJob();
 
-    Job job = new Job(mrCluster.getConfig());
-
-    job.setJarByClass(FailingMapper.class);
-    job.setJobName("failmapper");
-
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(Text.class);
-
-    job.setInputFormatClass(RandomInputFormat.class);
-    job.setMapperClass(FailingMapper.class);
-
-    job.setOutputFormatClass(TextOutputFormat.class);
-    job.setNumReduceTasks(0);
-    
-    FileOutputFormat.setOutputPath(job, new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
-        "failmapper-output"));
-    // TODO: We should not be setting MRAppJar as job.jar. It should be
-    // uploaded separately by YarnRunner.
-    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
-    job.waitForCompletion(true);
     TaskID taskID = new TaskID(job.getJobID(), TaskType.MAP, 0);
     TaskAttemptID aId = new TaskAttemptID(taskID, 0);
     System.out.println("Diagnostics for " + aId + " :");
@@ -195,17 +195,50 @@ public class TestMRJobs {
     Assert.assertEquals(TaskCompletionEvent.Status.FAILED, 
         events[1].getStatus().FAILED);
     Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+
+    // TODO later:  add explicit "isUber()" checks of some sort
+  }
+
+  protected Job runFailingMapperJob()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    mrCluster.getConfig().setInt(MRJobConfig.NUM_MAPS, 1);
+    mrCluster.getConfig().setInt("mapreduce.task.timeout", 10*1000);//reduce the timeout
+    mrCluster.getConfig().setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the number of attempts
+
+    Job job = new Job(mrCluster.getConfig());
+
+    job.setJarByClass(FailingMapper.class);
+    job.setJobName("failmapper");
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setInputFormatClass(RandomInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setMapperClass(FailingMapper.class);
+    job.setNumReduceTasks(0);
+    
+    FileOutputFormat.setOutputPath(job,
+        new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+        "failmapper-output"));
+    // TODO: We should not be setting MRAppJar as job.jar. It should be
+    // uploaded separately by YarnRunner.
+    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.waitForCompletion(true);
+
+    return job;
   }
 
 //@Test
   public void testSleepJobWithSecurityOn() throws IOException,
       InterruptedException, ClassNotFoundException {
 
+    LOG.info("\n\n\nStarting testSleepJobWithSecurityOn().");
+
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
       return;
     }
 
-    mrCluster.getConfig().set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+    mrCluster.getConfig().set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "kerberos");
     mrCluster.getConfig().set(RMConfig.RM_KEYTAB, "/etc/krb5.keytab");
     mrCluster.getConfig().set(NMConfig.NM_KEYTAB, "/etc/krb5.keytab");
@@ -239,6 +272,8 @@ public class TestMRJobs {
         return null;
       }
     });
+
+    // TODO later:  add explicit "isUber()" checks of some sort
   }
 
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java Fri Apr  8 02:54:56 2011
@@ -47,7 +47,8 @@ public class TestMRJobsWithHistoryServic
   public void setup() throws InterruptedException, IOException {
 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
       return;
     }
 

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1090092&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Fri Apr  8 02:54:56 2011
@@ -0,0 +1,131 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.Exception;
+import java.security.PrivilegedExceptionAction;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.FailingMapper;
+import org.apache.hadoop.RandomTextWriterJob;
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.v2.TestMRJobs;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Re-runs the TestMRJobs suite with the uber AM enabled, i.e. with small
+ * jobs executed inside the ApplicationMaster's own container instead of in
+ * separately allocated task containers.
+ */
+public class TestUberAM extends TestMRJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestUberAM.class);
+
+  @BeforeClass
+  public static void setup() {
+    TestMRJobs.setup();
+    // Flip on uberization so every inherited test exercises the uber-AM path.
+    mrCluster.getConfig().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+  }
+
+  @Override
+  public void testSleepJob()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    // Keep the job down to a single reduce so it qualifies for uberization.
+    mrCluster.getConfig().setInt("TestMRJobs.testSleepJob.reduces", 1);
+    super.testSleepJob();
+  }
+
+  @Override
+  public void testRandomWriter()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    super.testRandomWriter();
+  }
+
+  @Override
+  public void testFailingMapper()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting uberized testFailingMapper().");
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Job job = runFailingMapperJob();
+
+    // should be able to get diags for single task attempt...
+    TaskID taskID = new TaskID(job.getJobID(), TaskType.MAP, 0);
+    TaskAttemptID aId = new TaskAttemptID(taskID, 0);
+    System.out.println("Diagnostics for " + aId + " :");
+    for (String diag : job.getTaskDiagnostics(aId)) {
+      System.out.println(diag);
+    }
+    // ...but not for second (shouldn't exist:  uber-AM overrode max attempts)
+    boolean secondTaskAttemptExists = true;
+    try {
+      aId = new TaskAttemptID(taskID, 1);
+      System.out.println("Diagnostics for " + aId + " :");
+      for (String diag : job.getTaskDiagnostics(aId)) {
+        System.out.println(diag);
+      }
+    } catch (Exception e) {
+      // expected: attempt #1 was never created, so the lookup throws
+      secondTaskAttemptExists = false;
+    }
+    Assert.assertFalse(secondTaskAttemptExists);
+
+    TaskCompletionEvent[] events = job.getTaskCompletionEvents(0, 2);
+    Assert.assertEquals(1, events.length);
+    // Fix: the original asserted events[0].getStatus().FAILED, which reads
+    // the static constant Status.FAILED through the instance expression and
+    // therefore compared FAILED to FAILED - a tautology that could never
+    // fail.  Compare the event's actual status instead.
+    Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
+        events[0].getStatus());
+    Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+
+    // TODO later:  add explicit "isUber()" checks of some sort
+  }
+
+//@Test  //FIXME:  if/when the corresponding TestMRJobs test gets enabled, do so here as well (potentially with mods for ubermode)
+  public void testSleepJobWithSecurityOn()
+  throws IOException, InterruptedException, ClassNotFoundException {
+    super.testSleepJobWithSecurityOn();
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1090092&r1=1090091&r2=1090092&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Fri Apr  8 02:54:56 2011
@@ -42,6 +42,7 @@ public class MiniYARNCluster extends Com
 
   private static final Log LOG = LogFactory.getLog(MiniYARNCluster.class);
 
+  // temp fix until metrics system can auto-detect itself running in unit test:
   static {
     DefaultMetricsSystem.setMiniClusterMode(true);
   }



Mime
View raw message