hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1432796 [3/5] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...
Date Mon, 14 Jan 2013 03:44:52 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Mon Jan 14 03:44:35 2013
@@ -17,28 +17,63 @@
  */
 package org.apache.hadoop.mapreduce.v2.app;
 
-import java.io.IOException;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
-import junit.framework.Assert;
+import java.io.File;
+import java.io.IOException;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestMRAppMaster {
+  private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
+  static String stagingDir = "staging/";
+  
+  @BeforeClass
+  public static void setup() {
+    //Do not error out if metrics are inited multiple times
+    DefaultMetricsSystem.setMiniClusterMode(true);
+    File dir = new File(stagingDir);
+    stagingDir = dir.getAbsolutePath();
+  }
+  
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    dir.mkdirs();
+  }
+  
   @Test
   public void testMRAppMasterForDifferentUser() throws IOException,
       InterruptedException {
     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
     String containerIdStr = "container_1317529182569_0004_000001_1";
-    String stagingDir = "/tmp/staging";
+    
     String userName = "TestAppMasterUser";
     ApplicationAttemptId applicationAttemptId = ConverterUtils
         .toApplicationAttemptId(applicationAttemptIdStr);
@@ -49,34 +84,208 @@ public class TestMRAppMaster {
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
-    Assert.assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+    assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
         + ".staging", appMaster.stagingDirPath.toString());
   }
+  
+  @Test
+  public void testMRAppMasterMidLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    //Create the start file but no end file, so we should unregister with an error.
+    fs.create(start).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterSuccessLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    Path end = MRApps.getEndJobCommitSuccessFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    fs.create(start).close();
+    fs.create(end).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterFailLock() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    JobId jobId =  TypeConverter.toYarn(
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
+    Path start = MRApps.getStartJobCommitFile(conf, userName, jobId);
+    Path end = MRApps.getEndJobCommitFailureFile(conf, userName, jobId);
+    FileSystem fs = FileSystem.get(conf);
+    fs.create(start).close();
+    fs.create(end).close();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
+    appMaster.stop();
+  }
+  
+  @Test
+  public void testMRAppMasterMissingStaging() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
+    String containerIdStr = "container_1317529182569_0004_000002_1";
+    String userName = "TestAppMasterUser";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+
+    //Delete the staging directory
+    File dir = new File(stagingDir);
+    if(dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMaster appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis(), false);
+    boolean caught = false;
+    try {
+      MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    } catch (IOException e) {
+      //The IO Exception is expected
+      LOG.info("Caught expected Exception", e);
+      caught = true;
+    }
+    assertTrue(caught);
+    assertTrue(appMaster.errorHappenedShutDown);
+    //Copying the history file is disabled, but it is not really visible from 
+    //here
+    assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
+    appMaster.stop();
+  }
 }
 
 class MRAppMasterTest extends MRAppMaster {
 
   Path stagingDirPath;
   private Configuration conf;
+  private boolean overrideInitAndStart;
+  ContainerAllocator mockContainerAllocator;
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
       long submitTime) {
+    this(applicationAttemptId, containerId, host, port, httpPort, submitTime, 
+        true);
+  }
+  public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String host, int port, int httpPort,
+      long submitTime, boolean overrideInitAndStart) {
     super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
+    this.overrideInitAndStart = overrideInitAndStart;
+    mockContainerAllocator = mock(ContainerAllocator.class);
   }
 
   @Override
   public void init(Configuration conf) {
-    this.conf = conf;
+    if (overrideInitAndStart) {
+      this.conf = conf; 
+    } else {
+      super.init(conf);
+    }
+  }
+  
+  @Override 
+  protected void downloadTokensAndSetupUGI(Configuration conf) {
+    try {
+      this.currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+  
+  @Override
+  protected ContainerAllocator createContainerAllocator(
+      final ClientService clientService, final AppContext context) {
+    return mockContainerAllocator;
   }
 
   @Override
   public void start() {
-    try {
-      String user = UserGroupInformation.getCurrentUser().getShortUserName();
-      stagingDirPath = MRApps.getStagingAreaDir(conf, user);
-    } catch (Exception e) {
-      Assert.fail(e.getMessage());
+    if (overrideInitAndStart) {
+      try {
+        String user = UserGroupInformation.getCurrentUser().getShortUserName();
+        stagingDirPath = MRApps.getStagingAreaDir(conf, user);
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+    } else {
+      super.start();
     }
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon Jan 14 03:44:35 2013
@@ -38,6 +38,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import junit.framework.Assert;
 
@@ -70,7 +71,9 @@ import org.apache.hadoop.mapreduce.v2.ut
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -1139,7 +1142,7 @@ public class TestRMContainerAllocator {
     }
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
         taskAttemptId);
-    Resource containerNeed = BuilderUtils.newResource(memory);
+    Resource containerNeed = BuilderUtils.newResource(memory, 1);
     if (earlierFailedAttempt) {
       return ContainerRequestEvent
           .createContainerRequestEventForFailedContainer(attemptId,
@@ -1222,8 +1225,8 @@ public class TestRMContainerAllocator {
       when(context.getApplicationAttemptId()).thenReturn(appAttemptId);
       when(context.getJob(isA(JobId.class))).thenReturn(job);
       when(context.getClusterInfo()).thenReturn(
-          new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
-              .newResource(10240)));
+          new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils
+              .newResource(10240, 1)));
       when(context.getEventHandler()).thenReturn(new EventHandler() {
         @Override
         public void handle(Event event) {
@@ -1240,6 +1243,13 @@ public class TestRMContainerAllocator {
       return context;
     }
 
+    private static AppContext createAppContext(
+        ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+      AppContext context = createAppContext(appAttemptId, job);
+      when(context.getClock()).thenReturn(clock);
+      return context;
+    }
+
     private static ClientService createMockClientService() {
       ClientService service = mock(ClientService.class);
       when(service.getBindAddress()).thenReturn(
@@ -1264,6 +1274,15 @@ public class TestRMContainerAllocator {
       super.start();
     }
 
+    public MyContainerAllocator(MyResourceManager rm, Configuration conf,
+        ApplicationAttemptId appAttemptId, Job job, Clock clock) {
+      super(createMockClientService(),
+          createAppContext(appAttemptId, job, clock));
+      this.rm = rm;
+      super.init(conf);
+      super.start();
+    }
+
     @Override
     protected AMRMProtocol createSchedulerProxy() {
       return this.rm.getApplicationMasterService();
@@ -1280,12 +1299,12 @@ public class TestRMContainerAllocator {
 
     @Override
     protected Resource getMinContainerCapability() {
-      return BuilderUtils.newResource(1024);
+      return BuilderUtils.newResource(1024, 1);
     }
 
     @Override
     protected Resource getMaxContainerCapability() {
-      return BuilderUtils.newResource(10240);
+      return BuilderUtils.newResource(10240, 1);
     }
 
     public void sendRequest(ContainerRequestEvent req) {
@@ -1465,6 +1484,66 @@ public class TestRMContainerAllocator {
         allocator.recalculatedReduceSchedule);
   }
 
+  @Test
+  public void testHeartbeatHandler() throws Exception {
+    LOG.info("Running testHeartbeatHandler");
+
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
+    ControlledClock clock = new ControlledClock(new SystemClock());
+    AppContext appContext = mock(AppContext.class);
+    when(appContext.getClock()).thenReturn(clock);
+    when(appContext.getApplicationID()).thenReturn(
+        BuilderUtils.newApplicationId(1, 1));
+
+    RMContainerAllocator allocator = new RMContainerAllocator(
+        mock(ClientService.class), appContext) {
+          @Override
+          protected void register() {
+          }
+          @Override
+          protected AMRMProtocol createSchedulerProxy() {
+            return mock(AMRMProtocol.class);
+          }
+          @Override
+          protected synchronized void heartbeat() throws Exception {
+          }
+    };
+    allocator.init(conf);
+    allocator.start();
+
+    clock.setTime(5);
+    int timeToWaitMs = 5000;
+    while (allocator.getLastHeartbeatTime() != 5 && timeToWaitMs > 0) {
+      Thread.sleep(10);
+      timeToWaitMs -= 10;
+    }
+    Assert.assertEquals(5, allocator.getLastHeartbeatTime());
+    clock.setTime(7);
+    timeToWaitMs = 5000;
+    while (allocator.getLastHeartbeatTime() != 7 && timeToWaitMs > 0) {
+      Thread.sleep(10);
+      timeToWaitMs -= 10;
+    }
+    Assert.assertEquals(7, allocator.getLastHeartbeatTime());
+
+    final AtomicBoolean callbackCalled = new AtomicBoolean(false);
+    allocator.runOnNextHeartbeat(new Runnable() {
+      @Override
+      public void run() {
+        callbackCalled.set(true);
+      }
+    });
+    clock.setTime(8);
+    timeToWaitMs = 5000;
+    while (allocator.getLastHeartbeatTime() != 8 && timeToWaitMs > 0) {
+      Thread.sleep(10);
+      timeToWaitMs -= 10;
+    }
+    Assert.assertEquals(8, allocator.getLastHeartbeatTime());
+    Assert.assertTrue(callbackCalled.get());
+  }
+
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Mon Jan 14 03:44:35 2013
@@ -626,6 +626,115 @@ public class TestRecovery {
     validateOutput();
   }
   
+  @Test
+  public void testRecoveryWithOldCommiter() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task reduceTask1 = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+    
+    //before sending the TA_DONE event, make sure the attempt has reached
+    //the RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+  
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+    
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+    
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    
+    //stop the app before the job completes.
+    app.stop();
+
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", false);
+    conf.setBoolean("mapred.reducer.new-api", false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    reduceTask1 = it.next();
+    Task reduceTask2 = it.next();
+    
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    // first reduce will be recovered, no need to send done
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED); 
+    
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    
+    TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+        .iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has reached
+    //the RUNNING state
+    app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+    
+    //send the done signal to the 2nd reduce task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce2Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait to get it completed
+    app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+  
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Mon Jan 14 03:44:35 2013
@@ -38,10 +38,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -71,6 +74,10 @@ import org.junit.Test;
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(0);
@@ -92,6 +99,10 @@ import org.junit.Test;
      conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(0);
@@ -117,6 +128,10 @@ import org.junit.Test;
      conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
+     //Staging Dir exists
+     String user = UserGroupInformation.getCurrentUser().getShortUserName();
+     Path stagingDir = MRApps.getStagingAreaDir(conf, user);
+     when(fs.exists(stagingDir)).thenReturn(true);
      ApplicationAttemptId attemptId = recordFactory.newRecordInstance(
          ApplicationAttemptId.class);
      attemptId.setAttemptId(1);
@@ -166,6 +181,11 @@ import org.junit.Test;
      }
 
      @Override
+     public RMHeartbeatHandler getRMHeartbeatHandler() {
+       return getStubbedHeartbeatHandler(getContext());
+     }
+
+     @Override
      protected void sysexit() {      
      }
 
@@ -177,6 +197,7 @@ import org.junit.Test;
      @Override
      protected void downloadTokensAndSetupUGI(Configuration conf) {
      }
+
    }
 
   private final class MRAppTestCleanup extends MRApp {
@@ -191,7 +212,8 @@ import org.junit.Test;
     }
 
     @Override
-    protected Job createJob(Configuration conf) {
+    protected Job createJob(Configuration conf, JobStateInternal forcedState, 
+        String diagnostic) {
       UserGroupInformation currentUser = null;
       try {
         currentUser = UserGroupInformation.getCurrentUser();
@@ -201,8 +223,8 @@ import org.junit.Test;
       Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
           getDispatcher().getEventHandler(),
           getTaskAttemptListener(), getContext().getClock(),
-          getCommitter(), isNewApiCommitter(),
-          currentUser.getUserName(), getContext());
+          isNewApiCommitter(), currentUser.getUserName(), getContext(),
+          forcedState, diagnostic);
       ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
       getDispatcher().register(JobFinishEvent.Type.class,
@@ -239,6 +261,11 @@ import org.junit.Test;
     }
 
     @Override
+    public RMHeartbeatHandler getRMHeartbeatHandler() {
+      return getStubbedHeartbeatHandler(getContext());
+    }
+
+    @Override
     public void cleanupStagingDir() throws IOException {
       cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator;
     }
@@ -248,6 +275,20 @@ import org.junit.Test;
     }
   }
 
+  private static RMHeartbeatHandler getStubbedHeartbeatHandler(
+      final AppContext appContext) {
+    return new RMHeartbeatHandler() {
+      @Override
+      public long getLastHeartbeatTime() {
+        return appContext.getClock().getTime();
+      }
+      @Override
+      public void runOnNextHeartbeat(Runnable callback) {
+        callback.run();
+      }
+    };
+  }
+
   @Test
   public void testStagingCleanupOrder() throws Exception {
     MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Mon Jan 14 03:44:35 2013
@@ -19,171 +19,308 @@
 package org.apache.hadoop.mapreduce.v2.app.job.impl;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
+import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
-import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 
 /**
  * Tests various functions of the JobImpl class
  */
-@SuppressWarnings({"unchecked", "rawtypes"})
+@SuppressWarnings({"rawtypes"})
 public class TestJobImpl {
   
-  @Test
-  public void testJobNoTasksTransition() { 
-    JobNoTasksCompletedTransition trans = new JobNoTasksCompletedTransition();
-    JobImpl mockJob = mock(JobImpl.class);
-
-    // Force checkJobCompleteSuccess to return null
-    Task mockTask = mock(Task.class);
-    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
-    tasks.put(mockTask.getID(), mockTask);
-    mockJob.tasks = tasks;
+  static String stagingDir = "target/test-staging/";
 
-    when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
-    JobEvent mockJobEvent = mock(JobEvent.class);
-    JobStateInternal state = trans.transition(mockJob, mockJobEvent);
-    Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
-        JobStateInternal.ERROR, state);
+  @BeforeClass
+  public static void setup() {
+    // Swap the relative staging path for its absolute form, once per class.
+    stagingDir = new File(stagingDir).getAbsolutePath();
   }
 
+  @Before
+  public void cleanup() throws IOException {
+    File dir = new File(stagingDir); // recreated empty before every test
+    if (dir.exists()) {
+      FileUtils.deleteDirectory(dir);
+    }
+    FileUtils.forceMkdir(dir); // unlike mkdirs(), throws if creation fails
+  }
+  
   @Test
-  public void testCommitJobFailsJob() {
+  public void testJobNoTasks() { // a job with zero splits and zero reduces
+    Configuration conf = new Configuration();
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = mock(OutputCommitter.class);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 0); // 0 == numSplits
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SUCCEEDED); // no task events are sent
+    dispatcher.stop();
+    commitHandler.stop();
+  }
 
-    JobImpl mockJob = mock(JobImpl.class);
-    mockJob.tasks = new HashMap<TaskId, Task>();
-    OutputCommitter mockCommitter = mock(OutputCommitter.class);
-    EventHandler mockEventHandler = mock(EventHandler.class);
-    JobContext mockJobContext = mock(JobContext.class);
-
-    when(mockJob.getCommitter()).thenReturn(mockCommitter);
-    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
-    when(mockJob.getJobContext()).thenReturn(mockJobContext);
-    when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(
-        JobStateInternal.KILLED);
-    when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(
-        JobStateInternal.FAILED);
-    when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(
-        JobStateInternal.SUCCEEDED);
-
-    try {
-      doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
-    } catch (IOException e) {
-      // commitJob stubbed out, so this can't happen
-    }
-    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob);
-    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
-      "for successful job", jobState);
-    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobStateInternal.FAILED, jobState);
-    verify(mockJob).abortJob(
-        eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+  @Test(timeout=20000)
+  public void testCommitJobFailsJob() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2); // this test + commitJob()
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, false);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    // release commitJob(); it throws (shouldSucceed == false), so the job fails
+    syncBarrier.await();
+    assertJobState(job, JobStateInternal.FAILED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
-  @Test
-  public void testCheckJobCompleteSuccess() {
-    
-    JobImpl mockJob = mock(JobImpl.class);
-    mockJob.tasks = new HashMap<TaskId, Task>();
-    OutputCommitter mockCommitter = mock(OutputCommitter.class);
-    EventHandler mockEventHandler = mock(EventHandler.class);
-    JobContext mockJobContext = mock(JobContext.class);
-    
-    when(mockJob.getCommitter()).thenReturn(mockCommitter);
-    when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
-    when(mockJob.getJobContext()).thenReturn(mockJobContext);
-    doNothing().when(mockJob).setFinishTime();
-    doNothing().when(mockJob).logJobHistoryFinishedEvent();
-    when(mockJob.finished(any(JobStateInternal.class))).thenReturn(
-        JobStateInternal.SUCCEEDED);
-
-    try {
-      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
-    } catch (IOException e) {
-      // commitJob stubbed out, so this can't happen
-    }
-    doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
-      "for successful job",
-      JobImpl.checkJobCompleteSuccess(mockJob));
-    Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
+  @Test(timeout=20000)
+  public void testCheckJobCompleteSuccess() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2); // this test + commitJob()
+    OutputCommitter committer = new TestingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    // release commitJob(); it returns normally (shouldSucceed == true)
+    syncBarrier.await();
+    assertJobState(job, JobStateInternal.SUCCEEDED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
-  @Test
-  public void testCheckJobCompleteSuccessFailed() {
-    JobImpl mockJob = mock(JobImpl.class);
+  @Test(timeout=20000)
+  public void testKilledDuringSetup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public synchronized void setupJob(JobContext jobContext)
+          throws IOException {
+        try { // block here until this thread is interrupted
+          while (!Thread.interrupted()) {
+            wait();
+          }
+        } catch (InterruptedException e) { // interrupted mid-wait: return too
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
 
-    // Make the completedTasks not equal the getTasks()
-    Task mockTask = mock(Task.class);
-    Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
-    tasks.put(mockTask.getID(), mockTask);
-    mockJob.tasks = tasks;
-    
-    try {
-      // Just in case the code breaks and reaches these calls
-      OutputCommitter mockCommitter = mock(OutputCommitter.class);
-      EventHandler mockEventHandler = mock(EventHandler.class);
-      doNothing().when(mockCommitter).commitJob(any(JobContext.class));
-      doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    } catch (IOException e) {
-      e.printStackTrace();    
-    }
-    Assert.assertNull("checkJobCompleteSuccess incorrectly returns not-null " +
-      "for unsuccessful job",
-      JobImpl.checkJobCompleteSuccess(mockJob));
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SETUP);
+
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
   }
 
+  @Test(timeout=20000)
+  public void testKilledDuringCommit() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    CyclicBarrier syncBarrier = new CyclicBarrier(2); // this test + commitJob()
+    OutputCommitter committer = new WaitingOutputCommitter(syncBarrier, true);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    completeJobTasks(job);
+    assertJobState(job, JobStateInternal.COMMITTING);
+
+    syncBarrier.await(); // commitJob() is past the barrier and now blocks
+    job.handle(new JobEvent(job.getID(), JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  @Test(timeout=20000)
+  public void testKilledDuringFailAbort() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public void setupJob(JobContext jobContext) throws IOException {
+        throw new IOException("forced failure");
+      }
+
+      @Override
+      public synchronized void abortJob(JobContext jobContext, State state)
+          throws IOException {
+        try { // block here until this thread is interrupted
+          while (!Thread.interrupted()) {
+            wait();
+          }
+        } catch (InterruptedException e) { // interrupted mid-wait: return too
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.FAIL_ABORT);
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  @Test(timeout=20000)
+  public void testKilledDuringKillAbort() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    OutputCommitter committer = new StubbedOutputCommitter() {
+      @Override
+      public synchronized void abortJob(JobContext jobContext, State state)
+          throws IOException {
+        try { // block here until this thread is interrupted
+          while (!Thread.interrupted()) {
+            wait();
+          }
+        } catch (InterruptedException e) { // interrupted mid-wait: return too
+        }
+      }
+    };
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
+
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.SETUP);
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILL_ABORT);
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+    assertJobState(job, JobStateInternal.KILLED);
+    dispatcher.stop();
+    commitHandler.stop();
+  }
 
   public static void main(String[] args) throws Exception {
     TestJobImpl t = new TestJobImpl();
-    t.testJobNoTasksTransition();
+    t.testJobNoTasks();
     t.testCheckJobCompleteSuccess();
-    t.testCheckJobCompleteSuccessFailed();
     t.testCheckAccess();
     t.testReportDiagnostics();
     t.testUberDecision();
@@ -208,7 +345,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -219,7 +356,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -230,7 +367,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -241,7 +378,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -252,7 +389,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null);
+        null, null, true, null, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -270,8 +407,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, mock(OutputCommitter.class),
-        true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     job.handle(diagUpdateEvent);
     String diagnostics = job.getReport().getDiagnostics();
     Assert.assertNotNull(diagnostics);
@@ -282,8 +418,7 @@ public class TestJobImpl {
         mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null,
         new SystemClock(), null,
-        mrAppMetrics, mock(OutputCommitter.class),
-        true, null, 0, null, null);
+        mrAppMetrics, true, null, 0, null, null, null, null);
     job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
     job.handle(diagUpdateEvent);
     diagnostics = job.getReport().getDiagnostics();
@@ -338,20 +473,23 @@ public class TestJobImpl {
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
         null, mock(JobTokenSecretManager.class), null, null, null,
-        mrAppMetrics, mock(OutputCommitter.class), true, null, 0, null, null);
-    InitTransition initTransition = getInitTransition();
+        mrAppMetrics, true, null, 0, null, null, null, null);
+    InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);
     initTransition.transition(job, mockJobEvent);
     boolean isUber = job.isUber();
     return isUber;
   }
 
-  private static InitTransition getInitTransition() {
+  private static InitTransition getInitTransition(final int numSplits) {
     InitTransition initTransition = new InitTransition() {
       @Override
       protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
-        return new TaskSplitMetaInfo[] { new TaskSplitMetaInfo(),
-            new TaskSplitMetaInfo() };
+        TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numSplits];
+        for (int i = 0; i < numSplits; ++i) {
+          splits[i] = new TaskSplitMetaInfo();
+        }
+        return splits;
       }
     };
     return initTransition;
@@ -360,19 +498,24 @@ public class TestJobImpl {
   @Test
   public void testTransitionsAtFailed() throws IOException {
     Configuration conf = new Configuration();
-    JobID jobID = JobID.forName("job_1234567890000_0001");
-    JobId jobId = TypeConverter.toYarn(jobID);
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+
     OutputCommitter committer = mock(OutputCommitter.class);
     doThrow(new IOException("forcefail"))
       .when(committer).setupJob(any(JobContext.class));
-    InlineDispatcher dispatcher = new InlineDispatcher();
-    JobImpl job = new StubbedJob(jobId, Records
-        .newRecord(ApplicationAttemptId.class), conf,
-        dispatcher.getEventHandler(), committer, true, null);
+    CommitterEventHandler commitHandler =
+        createCommitterEventHandler(dispatcher, committer);
+    commitHandler.init(conf);
+    commitHandler.start();
 
-    dispatcher.register(JobEventType.class, job);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
-    Assert.assertEquals(JobState.FAILED, job.getState());
+    assertJobState(job, JobStateInternal.INITED);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_START));
+    assertJobState(job, JobStateInternal.FAILED);
 
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
     Assert.assertEquals(JobState.FAILED, job.getState());
@@ -382,17 +525,100 @@ public class TestJobImpl {
     Assert.assertEquals(JobState.FAILED, job.getState());
     job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
     Assert.assertEquals(JobState.FAILED, job.getState());
+
+    dispatcher.stop();
+    commitHandler.stop();
+  }
+
+  // Wires a CommitterEventHandler to a mocked AppContext and registers it.
+  private static CommitterEventHandler createCommitterEventHandler(
+      Dispatcher dispatcher, OutputCommitter committer) {
+    final SystemClock systemClock = new SystemClock();
+    ApplicationAttemptId attemptId =
+      ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
+    AppContext mockContext = mock(AppContext.class);
+    when(mockContext.getClock()).thenReturn(systemClock);
+    when(mockContext.getEventHandler()).thenReturn(
+        dispatcher.getEventHandler());
+    when(mockContext.getApplicationAttemptId()).thenReturn(attemptId);
+    when(mockContext.getApplicationID()).thenReturn(
+        attemptId.getApplicationId());
+    RMHeartbeatHandler inlineHeartbeat = new RMHeartbeatHandler() {
+      @Override public long getLastHeartbeatTime() {
+        return systemClock.getTime();
+      }
+      @Override public void runOnNextHeartbeat(Runnable pending) {
+        pending.run();
+      }
+    };
+    CommitterEventHandler commitHandler =
+        new CommitterEventHandler(mockContext, committer, inlineHeartbeat);
+    dispatcher.register(CommitterEventType.class, commitHandler);
+    return commitHandler;
+  }
+
+  private static StubbedJob createStubbedJob(Configuration conf,
+      Dispatcher dispatcher, int numSplits) {
+    JobId id = TypeConverter.toYarn(JobID.forName("job_1234567890000_0001"));
+    StubbedJob stubbedJob = new StubbedJob(id,
+        Records.newRecord(ApplicationAttemptId.class), conf,
+        dispatcher.getEventHandler(), true, "somebody", numSplits);
+    dispatcher.register(JobEventType.class, stubbedJob);
+    // Task, job-finish and job-history events all go to a single mock handler.
+    EventHandler eventSink = mock(EventHandler.class);
+    dispatcher.register(TaskEventType.class, eventSink);
+    dispatcher.register(JobFinishEvent.Type.class, eventSink);
+    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        eventSink);
+    return stubbedJob;
+  }
+
+  private static StubbedJob createRunningStubbedJob(Configuration conf,
+      Dispatcher dispatcher, int numSplits) {
+    StubbedJob started = createStubbedJob(conf, dispatcher, numSplits);
+    started.handle(new JobEvent(started.getID(), JobEventType.JOB_INIT));
+    assertJobState(started, JobStateInternal.INITED); // wait for INITED first
+    started.handle(new JobEvent(started.getID(), JobEventType.JOB_START));
+    assertJobState(started, JobStateInternal.RUNNING); // only return once RUNNING
+    return started;
+  }
+
+  private static void completeJobTasks(JobImpl job) {
+    // report SUCCEEDED once per map task, then once per reduce task, by index
+    int numMaps = job.getTotalMaps();
+    for (int i = 0; i < numMaps; ++i) {
+      job.handle(new JobTaskEvent(
+          MRBuilderUtils.newTaskId(job.getID(), i, TaskType.MAP),
+          TaskState.SUCCEEDED));
+      Assert.assertEquals(JobState.RUNNING, job.getState());
+    }
+    int numReduces = job.getTotalReduces();
+    for (int i = 0; i < numReduces; ++i) {
+      job.handle(new JobTaskEvent(
+          MRBuilderUtils.newTaskId(job.getID(), i, TaskType.REDUCE),
+          TaskState.SUCCEEDED));
+      Assert.assertEquals(JobState.RUNNING, job.getState());
+    }
+  }
+
+  private static void assertJobState(JobImpl job, JobStateInternal state) {
+    for (int waitMsec = 5 * 1000; // poll for up to ~5s in 10ms steps
+        waitMsec > 0 && job.getInternalState() != state; waitMsec -= 10) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // keep the interrupt for the caller
+        break;
+      }
+    }
+    Assert.assertEquals(state, job.getInternalState());
   }
 
   private static class StubbedJob extends JobImpl {
     //override the init transition
-    private final InitTransition initTransition = getInitTransition();
-    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
-        = stateMachineFactory.addTransition(JobStateInternal.NEW,
-            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
-            JobEventType.JOB_INIT,
-            // This is abusive.
-            initTransition);
+    private final InitTransition initTransition;
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
+        localFactory;
 
     private final StateMachine<JobStateInternal, JobEventType, JobEvent>
         localStateMachine;
@@ -404,15 +630,103 @@ public class TestJobImpl {
 
     public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
-        OutputCommitter committer, boolean newApiCommitter, String user) {
+        boolean newApiCommitter, String user, int numSplits) {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
-          new SystemClock(), null, MRAppMetrics.create(), committer,
-          newApiCommitter, user, System.currentTimeMillis(), null, null);
+          new SystemClock(), null, MRAppMetrics.create(),
+          newApiCommitter, user, System.currentTimeMillis(), null, null, null,
+          null);
+
+      initTransition = getInitTransition(numSplits);
+      localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+            JobEventType.JOB_INIT,
+            // This is abusive.
+            initTransition);
 
       // This "this leak" is okay because the retained pointer is in an
       //  instance variable.
       localStateMachine = localFactory.make(this);
     }
   }
+
+  // OutputCommitter whose overridden hooks all do nothing. The tests in this
+  // file subclass it and override only the callback they want to block or
+  // fail (setupJob, commitJob or abortJob).
+  private static class StubbedOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) throws IOException {
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        throws IOException {
+      // never asks for a task-level commit
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) throws IOException {
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) throws IOException {
+    }
+  }
+
+  // commitJob meets the test at the barrier, then succeeds or throws as asked.
+  private static class TestingOutputCommitter extends StubbedOutputCommitter {
+    CyclicBarrier syncBarrier;
+    boolean shouldSucceed;
+
+    public TestingOutputCommitter(CyclicBarrier syncBarrier,
+        boolean shouldSucceed) {
+      this.shouldSucceed = shouldSucceed;
+      this.syncBarrier = syncBarrier;
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      try {
+        syncBarrier.await();
+      } catch (InterruptedException ignored) { // outcome below is unaffected
+      } catch (BrokenBarrierException ignored) { // same: proceed regardless
+      }
+      if (shouldSucceed) {
+        return;
+      }
+      throw new IOException("forced failure");
+    }
+  }
+
+  // commitJob meets the test at the barrier and then parks until interrupted.
+  private static class WaitingOutputCommitter extends TestingOutputCommitter {
+    public WaitingOutputCommitter(CyclicBarrier syncBarrier,
+        boolean shouldSucceed) {
+      super(syncBarrier, shouldSucceed);
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      try {
+        syncBarrier.await();
+      } catch (InterruptedException ignored) { // still park below
+      } catch (BrokenBarrierException ignored) { // still park below
+      }
+      try {
+        synchronized (this) {
+          while (!Thread.interrupted()) {
+            wait();
+          }
+        }
+      } catch (InterruptedException ignored) {
+        // interrupted while parked: stop blocking and return
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Mon Jan 14 03:44:35 2013
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -198,7 +197,7 @@ public class TestTaskAttempt{
     conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
     conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
     app.setClusterInfo(new ClusterInfo(BuilderUtils
-        .newResource(minContainerSize), BuilderUtils.newResource(10240)));
+        .newResource(minContainerSize, 1), BuilderUtils.newResource(10240, 1)));
 
     Job job = app.submit(conf);
     app.waitForState(job, JobState.RUNNING);
@@ -253,10 +252,9 @@ public class TestTaskAttempt{
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     Path jobFile = mock(Path.class);
     JobConf jobConf = new JobConf();
-    OutputCommitter outputCommitter = mock(OutputCommitter.class);
     TaskAttemptImpl taImpl =
         new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-            taskSplitMetaInfo, jobConf, taListener, outputCommitter, null,
+            taskSplitMetaInfo, jobConf, taListener, null,
             null, clock, null);
     return taImpl;
   }
@@ -342,7 +340,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), null);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -397,7 +395,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -453,7 +451,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -512,7 +510,7 @@ public class TestTaskAttempt{
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
-          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -578,7 +576,7 @@ public class TestTaskAttempt{
     when(resource.getMemory()).thenReturn(1024);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
-        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        jobFile, 1, splits, jobConf, taListener,
         mock(Token.class), new Credentials(), new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
@@ -628,7 +626,7 @@ public class TestTaskAttempt{
     when(resource.getMemory()).thenReturn(1024);
 
     TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
-        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        jobFile, 1, splits, jobConf, taListener,
         mock(Token.class), new Credentials(), new SystemClock(), appCtx);
 
     NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java Mon Jan 14 03:44:35 2013
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -107,7 +106,7 @@ public class TestTaskAttemptContainerReq
     TaskAttemptImpl taImpl =
         new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
             mock(TaskSplitMetaInfo.class), jobConf, taListener,
-            mock(OutputCommitter.class), jobToken, credentials,
+            jobToken, credentials,
             new SystemClock(), null);
 
     jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Mon Jan 14 03:44:35 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -71,7 +70,6 @@ public class TestTaskImpl {
   
   private JobConf conf;
   private TaskAttemptListener taskAttemptListener;
-  private OutputCommitter committer;
   private Token<JobTokenIdentifier> jobToken;
   private JobId jobId;
   private Path remoteJobConfFile;
@@ -99,13 +97,13 @@ public class TestTaskImpl {
 
     public MockTaskImpl(JobId jobId, int partition,
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
-        TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+        TaskAttemptListener taskAttemptListener,
         Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
         Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
         MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
       super(jobId, taskType , partition, eventHandler,
-          remoteJobConfFile, conf, taskAttemptListener, committer, 
+          remoteJobConfFile, conf, taskAttemptListener,
           jobToken, credentials, clock,
           completedTasksFromPreviousRun, startCount, metrics, appContext);
       this.taskType = taskType;
@@ -120,7 +118,7 @@ public class TestTaskImpl {
     protected TaskAttemptImpl createAttempt() {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, 
           eventHandler, taskAttemptListener, remoteJobConfFile, partition,
-          conf, committer, jobToken, credentials, clock, appContext, taskType);
+          conf, jobToken, credentials, clock, appContext, taskType);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -145,12 +143,11 @@ public class TestTaskImpl {
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
-        JobConf conf, OutputCommitter committer,
-        Token<JobTokenIdentifier> jobToken,
+        JobConf conf, Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
         AppContext appContext, TaskType taskType) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
-          dataLocations, committer, jobToken, credentials, clock, appContext);
+          dataLocations, jobToken, credentials, clock, appContext);
       this.taskType = taskType;
     }
 
@@ -210,7 +207,6 @@ public class TestTaskImpl {
     
     conf = new JobConf();
     taskAttemptListener = mock(TaskAttemptListener.class);
-    committer = mock(OutputCommitter.class);
     jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
     remoteJobConfFile = mock(Path.class);
     credentials = null;
@@ -235,7 +231,7 @@ public class TestTaskImpl {
   
   private MockTaskImpl createMockTask(TaskType taskType) {
     return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
-        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
         credentials, clock,
         completedTasksFromPreviousRun, startCount,
         metrics, appContext, taskType);
@@ -606,7 +602,7 @@ public class TestTaskImpl {
   @Test
   public void testFailedTransitions() {
     mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
-        remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
         credentials, clock,
         completedTasksFromPreviousRun, startCount,
         metrics, appContext, TaskType.MAP) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java Mon Jan 14 03:44:35 2013
@@ -99,8 +99,8 @@ public class TestLocalContainerAllocator
       when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
       when(ctx.getJob(isA(JobId.class))).thenReturn(job);
       when(ctx.getClusterInfo()).thenReturn(
-          new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
-              .newResource(10240)));
+          new ClusterInfo(BuilderUtils.newResource(1024, 1), BuilderUtils
+              .newResource(10240, 1)));
       when(ctx.getEventHandler()).thenReturn(eventHandler);
       return ctx;
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml Mon Jan 14 03:44:35 2013
@@ -81,6 +81,7 @@
             <configuration>
               <executable>protoc</executable>
               <arguments>
+                <argument>-I../../../hadoop-common-project/hadoop-common/src/main/proto/</argument>
                 <argument>-I../../../hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/</argument>
                 <argument>-Isrc/main/proto/</argument>
                 <argument>--java_out=target/generated-sources/proto</argument>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Jan 14 03:44:35 2013
@@ -67,7 +67,6 @@ import com.google.common.util.concurrent
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-@SuppressWarnings("deprecation")
 public class LocalJobRunner implements ClientProtocol {
   public static final Log LOG =
     LogFactory.getLog(LocalJobRunner.class);
@@ -610,8 +609,12 @@ public class LocalJobRunner implements C
   // JobSubmissionProtocol methods
 
   private static int jobid = 0;
+  // used for making sure that local jobs run in different jvms don't
+  // collide on staging or job directories
+  private int randid;
+  
   public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() {
-    return new org.apache.hadoop.mapreduce.JobID("local", ++jobid);
+    return new org.apache.hadoop.mapreduce.JobID("local" + randid, ++jobid);
   }
 
   public org.apache.hadoop.mapreduce.JobStatus submitJob(
@@ -686,7 +689,7 @@ public class LocalJobRunner implements C
    */
   public TaskTrackerInfo[] getActiveTrackers() 
       throws IOException, InterruptedException {
-    return null;
+    return new TaskTrackerInfo[0];
   }
 
   /** 
@@ -695,7 +698,7 @@ public class LocalJobRunner implements C
    */
   public TaskTrackerInfo[] getBlacklistedTrackers() 
       throws IOException, InterruptedException {
-    return null;
+    return new TaskTrackerInfo[0];
   }
 
   public TaskCompletionEvent[] getTaskCompletionEvents(
@@ -740,10 +743,11 @@ public class LocalJobRunner implements C
         "/tmp/hadoop/mapred/staging"));
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String user;
+    randid = rand.nextInt(Integer.MAX_VALUE);
     if (ugi != null) {
-      user = ugi.getShortUserName() + rand.nextInt();
+      user = ugi.getShortUserName() + randid;
     } else {
-      user = "dummy" + rand.nextInt();
+      user = "dummy" + randid;
     }
     return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/HSClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/HSClientProtocol.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/HSClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/HSClientProtocol.java Mon Jan 14 03:44:35 2013
@@ -18,5 +18,6 @@
 
 package org.apache.hadoop.mapreduce.v2.api;
 
+
 public interface HSClientProtocol extends MRClientProtocol {
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRClientProtocol.java Mon Jan 14 03:44:35 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.net.InetSocketAddress;
 
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@@ -44,6 +46,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 
 public interface MRClientProtocol {
@@ -64,4 +68,24 @@ public interface MRClientProtocol {
   public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException;
   public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException;
   public GetDelegationTokenResponse getDelegationToken(GetDelegationTokenRequest request) throws YarnRemoteException;
+  
+  /**
+   * Renew an existing delegation token.
+   * 
+   * @param request the delegation token to be renewed.
+   * @return the new expiry time for the delegation token.
+   * @throws YarnRemoteException
+   */
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnRemoteException;
+
+  /**
+   * Cancel an existing delegation token.
+   * 
+   * @param request the delegation token to be cancelled.
+   * @return an empty response.
+   * @throws YarnRemoteException
+   */
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnRemoteException;
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRDelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRDelegationTokenIdentifier.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRDelegationTokenIdentifier.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/MRDelegationTokenIdentifier.java Mon Jan 14 03:44:35 2013
@@ -20,6 +20,7 @@
 package org.apache.hadoop.mapreduce.v2.api;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -30,6 +31,8 @@ import org.apache.hadoop.security.token.
  * issued by JobHistoryServer to delegate
  * MR tasks talking to the JobHistoryServer.
  */
+@Private
+// TODO Move to a different package.
 public class MRDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
 
   public static final Text KIND_NAME = new Text("MR_DELEGATION_TOKEN");

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/impl/pb/client/MRClientProtocolPBClientImpl.java Mon Jan 14 03:44:35 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapreduce.v2.api.impl.pb.client;
 
 import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
 import java.net.InetSocketAddress;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,6 +26,8 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@@ -51,6 +52,10 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.CancelDelegationTokenRequestPBImpl;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.CancelDelegationTokenResponsePBImpl;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemptRequestPBImpl;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.FailTaskAttemptResponsePBImpl;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.GetCountersRequestPBImpl;
@@ -75,6 +80,9 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskAttemptResponsePBImpl;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskRequestPBImpl;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.KillTaskResponsePBImpl;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.RenewDelegationTokenRequestPBImpl;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.impl.pb.RenewDelegationTokenResponsePBImpl;
+import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.FailTaskAttemptRequestProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetCountersRequestProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.GetDelegationTokenRequestProto;
@@ -87,6 +95,7 @@ import org.apache.hadoop.mapreduce.v2.pr
 import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillJobRequestProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskAttemptRequestProto;
 import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskRequestProto;
+import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.RenewDelegationTokenRequestProto;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
 
@@ -242,5 +251,31 @@ public class MRClientProtocolPBClientImp
       throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
     }
   }
-  
+ 
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnRemoteException {
+    RenewDelegationTokenRequestProto requestProto = 
+        ((RenewDelegationTokenRequestPBImpl) request).getProto();
+    try {
+      return new RenewDelegationTokenResponsePBImpl(proxy.renewDelegationToken(
+          null, requestProto));
+    } catch (ServiceException e) {
+      throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
+    }
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnRemoteException {
+    CancelDelegationTokenRequestProto requestProto =
+        ((CancelDelegationTokenRequestPBImpl) request).getProto();
+    try {
+      return new CancelDelegationTokenResponsePBImpl(
+          proxy.cancelDelegationToken(null, requestProto));
+
+    } catch (ServiceException e) {
+      throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e);
+    }
+  }
 }



Mime
View raw message