hadoop-mapreduce-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1463203 [2/4] - in /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop...
Date: Mon, 01 Apr 2013 16:47:42 GMT
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Mon Apr  1 16:47:16 2013
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -1645,6 +1646,32 @@ public class TestRMContainerAllocator {
     Assert.assertTrue(callbackCalled.get());
   }
 
+  @Test
+  public void testCompletedContainerEvent() {
+    RMContainerAllocator allocator = new RMContainerAllocator(
+        mock(ClientService.class), mock(AppContext.class));
+    
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
+        MRBuilderUtils.newTaskId(
+            MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
+    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+    ContainerStatus status = BuilderUtils.newContainerStatus(
+        containerId, ContainerState.RUNNING, "", 0);
+
+    ContainerStatus abortedStatus = BuilderUtils.newContainerStatus(
+        containerId, ContainerState.RUNNING, "",
+        YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS);
+    
+    TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
+        attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        event.getType());
+    
+    TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
+        abortedStatus, attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
+  }
+  
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();

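The test added above checks that RMContainerAllocator.createContainerFinishedEvent maps a normal container exit to TA_CONTAINER_COMPLETED but maps YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS to TA_KILL, so attempts whose containers were aborted by the platform are killed (and rescheduled) rather than marked failed. A minimal sketch of the mapping the assertions imply -- hypothetical, since the method body is not part of this diff:

    // Sketch only: the shape of createContainerFinishedEvent implied by the test
    public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
        TaskAttemptId attemptId) {
      if (cont.getExitStatus() == YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS) {
        // container aborted by the framework: kill the attempt so it is retried
        return new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL);
      }
      return new TaskAttemptEvent(attemptId,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED);
    }
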
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Mon Apr  1 16:47:16 2013
@@ -900,6 +900,117 @@ public class TestRecovery {
 
   }
 
+  @Test(timeout=30000)
+  public void testRecoveryWithoutShuffleSecret() throws Exception {
+
+    int runCount = 0;
+    MRApp app = new MRAppNoShuffleSecret(2, 1, false,
+        this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //job is running; it should have all 3 tasks (2 maps, 1 reduce)
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+
+    //before sending the TA_DONE event, make sure the attempts have come to
+    //the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    // the reduce must already be in the RUNNING state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    //send the done signal to the 1st map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    //stop the app
+    app.stop();
+
+    //on recovery, the 1st map should NOT be recovered from the previous run,
+    //since the shuffle secret was not provided with the job credentials
+    //and had to be rolled per app attempt
+    app = new MRAppNoShuffleSecret(2, 1, false,
+        this.getClass().getName(), false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //job is running; it should have all 3 tasks (2 maps, 1 reduce)
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //the RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for it to complete
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    //verify first map task is still running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+
+    //send the done signal to the 1st map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask1.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for it to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
@@ -1019,6 +1130,18 @@ public class TestRecovery {
     }
   }
 
+  static class MRAppNoShuffleSecret extends MRAppWithHistory {
+    public MRAppNoShuffleSecret(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart, int startCount) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+    }
+
+    @Override
+    protected void downloadTokensAndSetupUGI(Configuration conf) {
+      // do NOT put a shuffle secret in the job credentials
+    }
+  }
+
   public static void main(String[] arg) throws Exception {
     TestRecovery test = new TestRecovery();
     test.testCrashed();

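testRecoveryWithoutShuffleSecret exercises the same crash-and-restart flow as testCrashed, but through MRAppNoShuffleSecret (defined further down), whose downloadTokensAndSetupUGI override leaves the shuffle secret out of the job credentials. Without a job-wide secret the key must be rolled per app attempt, so the second AM attempt cannot serve the first attempt's map outputs and must re-run map 1 instead of recovering it -- which is what the test verifies by waiting for mapTask1 to be RUNNING again. For contrast, a hedged sketch of how a job-wide shuffle secret is normally stored (assuming the TokenCache helper; the real call site is outside this diff):

    import javax.crypto.KeyGenerator;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.security.Credentials;

    // Sketch: generate a job-wide shuffle secret and store it in the credentials
    // (KeyGenerator.getInstance throws NoSuchAlgorithmException)
    Credentials credentials = new Credentials();
    KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA1");
    TokenCache.setShuffleSecretKey(keyGen.generateKey().getEncoded(), credentials);
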
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Mon Apr  1 16:47:16 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@@ -639,6 +640,11 @@ public class TestRuntimeEstimators {
     }
 
     @Override
+    public Phase getPhase() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
     public TaskAttemptState getState() {
       if (overridingState != null) {
         return overridingState;

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Mon Apr  1 16:47:16 2013
@@ -49,7 +49,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -93,10 +92,9 @@ import org.junit.Test;
      verify(fs).delete(stagingJobPath, true);
    }
    
-   @Test
+   @Test (timeout = 30000)
    public void testDeletionofStagingOnKill() throws IOException {
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
-     conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 4);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
      //Staging Dir exists
@@ -113,7 +111,7 @@ import org.junit.Test;
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4);
      appMaster.init(conf);
      //simulate the process being killed
      MRAppMaster.MRAppMasterShutdownHook hook = 
@@ -122,10 +120,9 @@ import org.junit.Test;
      verify(fs, times(0)).delete(stagingJobPath, true);
    }
    
-   @Test
+   @Test (timeout = 30000)
    public void testDeletionofStagingOnKillLastTry() throws IOException {
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
-     conf.setInt(YarnConfiguration.RM_AM_MAX_RETRIES, 1);
      fs = mock(FileSystem.class);
      when(fs.delete(any(Path.class), anyBoolean())).thenReturn(true);
      //Staging Dir exists
@@ -142,7 +139,8 @@ import org.junit.Test;
      JobId jobid = recordFactory.newRecordInstance(JobId.class);
      jobid.setAppId(appId);
      ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
+     MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc,
+         MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
      appMaster.init(conf);
      //simulate the process being killed
      MRAppMaster.MRAppMasterShutdownHook hook = 
@@ -155,15 +153,16 @@ import org.junit.Test;
      ContainerAllocator allocator;
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId, 
-         ContainerAllocator allocator) {
+         ContainerAllocator allocator, int maxAppAttempts) {
        super(applicationAttemptId, BuilderUtils.newContainerId(
-           applicationAttemptId, 1), "testhost", 2222, 3333, System
-           .currentTimeMillis());
+           applicationAttemptId, 1), "testhost", 2222, 3333,
+           System.currentTimeMillis(), maxAppAttempts);
        this.allocator = allocator;
      }
 
      public TestMRApp(ApplicationAttemptId applicationAttemptId) {
-       this(applicationAttemptId, null);
+       this(applicationAttemptId, null,
+           MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
      }
 
      @Override

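Both kill tests above now pass maxAppAttempts to the TestMRApp constructor instead of setting YarnConfiguration.RM_AM_MAX_RETRIES on the configuration, mirroring an MRAppMaster constructor that takes the value directly. The behavior under test -- the staging directory survives a kill unless no further AM attempt will run -- comes down to a comparison of roughly this shape (a sketch under that assumption, not the actual MRAppMaster code):

    // Sketch: only the last allowed AM attempt may delete the staging dir on kill
    boolean lastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
    if (lastAMRetry) {
      fs.delete(stagingJobPath, true);  // cleanup, as verified via the fs mock
    } // otherwise keep staging so the next attempt can recover the job
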
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Mon Apr  1 16:47:16 2013
@@ -491,7 +491,7 @@ public class TestJobImpl {
     MRAppMetrics mrAppMetrics = MRAppMetrics.create();
     JobImpl job = new JobImpl(jobId, Records
         .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
-        null, mock(JobTokenSecretManager.class), null, null, null,
+        null, new JobTokenSecretManager(), new Credentials(), null, null,
         mrAppMetrics, true, null, 0, null, null, null, null);
     InitTransition initTransition = getInitTransition(2);
     JobEvent mockJobEvent = mock(JobEvent.class);

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java Mon Apr  1 16:47:16 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.junit.Test;
 
@@ -37,7 +38,7 @@ public class TestMapReduceChildJVM {
 
   private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class);
 
-  @Test
+  @Test (timeout = 30000)
   public void testCommandLine() throws Exception {
 
     MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
@@ -46,10 +47,10 @@ public class TestMapReduceChildJVM {
     app.verifyCompleted();
 
     Assert.assertEquals(
-      "[exec $JAVA_HOME/bin/java" +
+      "[" + envVar("JAVA_HOME") + "/bin/java" +
       " -Djava.net.preferIPv4Stack=true" +
       " -Dhadoop.metrics.log.level=WARN" +
-      "  -Xmx200m -Djava.io.tmpdir=$PWD/tmp" +
+      "  -Xmx200m -Djava.io.tmpdir=" + envVar("PWD") + "/tmp" +
       " -Dlog4j.configuration=container-log4j.properties" +
       " -Dyarn.app.mapreduce.container.log.dir=<LOG_DIR>" +
       " -Dyarn.app.mapreduce.container.log.filesize=0" +
@@ -88,4 +89,16 @@ public class TestMapReduceChildJVM {
       };
     }
   }
+
+  /**
+   * Returns a platform-specific string for retrieving the value of an environment
+   * variable with the given name.  On Unix, this returns $name.  On Windows,
+   * this returns %name%.
+   * 
+   * @param name String environment variable name
+   * @return String for retrieving value of environment variable
+   */
+  private static String envVar(String name) {
+    return Shell.WINDOWS ? '%' + name + '%' : '$' + name;
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Mon Apr  1 16:47:16 2013
@@ -491,8 +491,26 @@ public class TestTaskImpl {
     assert(mockTask.getProgress() == progress);
         
   }
+
   
   @Test
+  public void testKillDuringTaskAttemptCommit() {
+    mockTask = createMockTask(TaskType.REDUCE);        
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+
+    TaskAttemptId commitAttempt = getLastAttempt().getAttemptId();
+    updateLastAttemptState(TaskAttemptState.KILLED);
+    killRunningTaskAttempt(commitAttempt);
+
+    assertFalse(mockTask.canCommit(commitAttempt));
+  }
+
+  @Test
   public void testFailureDuringTaskAttemptCommit() {
     mockTask = createMockTask(TaskType.MAP);        
     TaskId taskId = getNewTaskID();

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java Mon Apr  1 16:47:16 2013
@@ -37,9 +37,7 @@ public class LocalClientProtocolProvider
     if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
       return null;
     }
-    if (conf.get("mapreduce.job.maps") == null) {
-      conf.setInt("mapreduce.job.maps", 1);
-    }
+    conf.setInt(JobContext.NUM_MAPS, 1);
 
     return new LocalJobRunner(conf);
   }

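Note the semantic change: the old code defaulted mapreduce.job.maps to 1 only when it was unset, while the new code pins it to 1 unconditionally for the local framework. A hedged illustration of the observable effect (using the standard ClientProtocolProvider.create(Configuration) entry point; create throws IOException):

    Configuration conf = new Configuration();
    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    conf.setInt(JobContext.NUM_MAPS, 8);             // user-supplied value
    new LocalClientProtocolProvider().create(conf);  // returns a LocalJobRunner
    conf.getInt(JobContext.NUM_MAPS, -1);            // now 1: the value is overridden
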
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Mon Apr  1 16:47:16 2013
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,6 +40,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -69,31 +72,35 @@ public class TestMRApps {
   }
   
   private static void delete(File dir) throws IOException {
-    Path p = new Path("file://"+dir.getAbsolutePath());
     Configuration conf = new Configuration();
-    FileSystem fs = p.getFileSystem(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
     fs.delete(p, true);
   }
 
-  @Test public void testJobIDtoString() {
+  @Test (timeout = 120000)
+  public void testJobIDtoString() {
     JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
     jid.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
     assertEquals("job_0_0000", MRApps.toString(jid));
   }
 
-  @Test public void testToJobID() {
+  @Test (timeout = 120000)
+  public void testToJobID() {
     JobId jid = MRApps.toJobID("job_1_1");
     assertEquals(1, jid.getAppId().getClusterTimestamp());
     assertEquals(1, jid.getAppId().getId());
     assertEquals(1, jid.getId()); // tests against some proto.id and not a job.id field
   }
 
-  @Test(expected=IllegalArgumentException.class) public void testJobIDShort() {
+  @Test (timeout = 120000, expected=IllegalArgumentException.class)
+  public void testJobIDShort() {
     MRApps.toJobID("job_0_0_0");
   }
 
   //TODO_get.set
-  @Test public void testTaskIDtoString() {
+  @Test (timeout = 120000)
+  public void testTaskIDtoString() {
     TaskId tid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class);
     tid.setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
     tid.getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
@@ -108,7 +115,8 @@ public class TestMRApps {
     assertEquals("task_0_0000_r_000000", MRApps.toString(tid));
   }
 
-  @Test public void testToTaskID() {
+  @Test (timeout = 120000)
+  public void testToTaskID() {
     TaskId tid = MRApps.toTaskID("task_1_2_r_3");
     assertEquals(1, tid.getJobId().getAppId().getClusterTimestamp());
     assertEquals(2, tid.getJobId().getAppId().getId());
@@ -120,16 +128,19 @@ public class TestMRApps {
     assertEquals(TaskType.MAP, tid.getTaskType());
   }
 
-  @Test(expected=IllegalArgumentException.class) public void testTaskIDShort() {
+  @Test(timeout = 120000, expected=IllegalArgumentException.class) 
+  public void testTaskIDShort() {
     MRApps.toTaskID("task_0_0000_m");
   }
 
-  @Test(expected=IllegalArgumentException.class) public void testTaskIDBadType() {
+  @Test(timeout = 120000, expected=IllegalArgumentException.class) 
+  public void testTaskIDBadType() {
     MRApps.toTaskID("task_0_0000_x_000000");
   }
 
   //TODO_get.set
-  @Test public void testTaskAttemptIDtoString() {
+  @Test (timeout = 120000)
+  public void testTaskAttemptIDtoString() {
     TaskAttemptId taid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptId.class);
     taid.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
     taid.getTaskId().setTaskType(TaskType.MAP);
@@ -138,7 +149,8 @@ public class TestMRApps {
     assertEquals("attempt_0_0000_m_000000_0", MRApps.toString(taid));
   }
 
-  @Test public void testToTaskAttemptID() {
+  @Test (timeout = 120000)
+  public void testToTaskAttemptID() {
     TaskAttemptId taid = MRApps.toTaskAttemptID("attempt_0_1_m_2_3");
     assertEquals(0, taid.getTaskId().getJobId().getAppId().getClusterTimestamp());
     assertEquals(1, taid.getTaskId().getJobId().getAppId().getId());
@@ -147,11 +159,13 @@ public class TestMRApps {
     assertEquals(3, taid.getId());
   }
 
-  @Test(expected=IllegalArgumentException.class) public void testTaskAttemptIDShort() {
+  @Test(timeout = 120000, expected=IllegalArgumentException.class) 
+  public void testTaskAttemptIDShort() {
     MRApps.toTaskAttemptID("attempt_0_0_0_m_0");
   }
 
-  @Test public void testGetJobFileWithUser() {
+  @Test (timeout = 120000)
+  public void testGetJobFileWithUser() {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");
     String jobFile = MRApps.getJobFile(conf, "dummy-user", 
@@ -161,49 +175,57 @@ public class TestMRApps {
         "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
   }
 
-  @Test public void testSetClasspath() throws IOException {
+  @Test (timeout = 120000)
+  public void testSetClasspath() throws IOException {
     Job job = Job.getInstance();
     Map<String, String> environment = new HashMap<String, String>();
     MRApps.setClasspath(environment, job.getConfiguration());
-    assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
+    assertTrue(environment.get("CLASSPATH").startsWith(
+      ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
     String yarnAppClasspath = 
         job.getConfiguration().get(
             YarnConfiguration.YARN_APPLICATION_CLASSPATH);
     if (yarnAppClasspath != null) {
-      yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", ":").trim();
+      yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", File.pathSeparator)
+        .trim();
     }
     assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath));
     String mrAppClasspath = 
         job.getConfiguration().get(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH);
     if (mrAppClasspath != null) {
-      mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", ":").trim();
+      mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", File.pathSeparator)
+        .trim();
     }
     assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
   }
   
-  @Test public void testSetClasspathWithArchives () throws IOException {
+  @Test (timeout = 120000)
+  public void testSetClasspathWithArchives () throws IOException {
     File testTGZ = new File(testWorkDir, "test.tgz");
     FileOutputStream out = new FileOutputStream(testTGZ);
     out.write(0);
     out.close();
     Job job = Job.getInstance();
     Configuration conf = job.getConfiguration();
-    conf.set(MRJobConfig.CLASSPATH_ARCHIVES, "file://" 
-        + testTGZ.getAbsolutePath());
-    conf.set(MRJobConfig.CACHE_ARCHIVES, "file://"
-        + testTGZ.getAbsolutePath() + "#testTGZ");
+    String testTGZQualifiedPath = FileSystem.getLocal(conf).makeQualified(new Path(
+      testTGZ.getAbsolutePath())).toString();
+    conf.set(MRJobConfig.CLASSPATH_ARCHIVES, testTGZQualifiedPath);
+    conf.set(MRJobConfig.CACHE_ARCHIVES, testTGZQualifiedPath + "#testTGZ");
     Map<String, String> environment = new HashMap<String, String>();
     MRApps.setClasspath(environment, conf);
-    assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
+    assertTrue(environment.get("CLASSPATH").startsWith(
+      ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
     String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
     if (confClasspath != null) {
-      confClasspath = confClasspath.replaceAll(",\\s*", ":").trim();
+      confClasspath = confClasspath.replaceAll(",\\s*", File.pathSeparator)
+        .trim();
     }
     assertTrue(environment.get("CLASSPATH").contains(confClasspath));
     assertTrue(environment.get("CLASSPATH").contains("testTGZ"));
   }
 
- @Test public void testSetClasspathWithUserPrecendence() {
+ @Test (timeout = 120000)
+ public void testSetClasspathWithUserPrecendence() {
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
     Map<String, String> env = new HashMap<String, String>();
@@ -213,11 +235,16 @@ public class TestMRApps {
       fail("Got exception while setting classpath");
     }
     String env_str = env.get("CLASSPATH");
-    assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
-      env_str.indexOf("$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0);
+    String expectedClasspath = StringUtils.join(File.pathSeparator,
+      Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
+        "job.jar/classes/", "job.jar/lib/*",
+        ApplicationConstants.Environment.PWD.$() + "/*"));
+    assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
+      env_str.startsWith(expectedClasspath));
   }
 
-  @Test public void testSetClasspathWithNoUserPrecendence() {
+  @Test (timeout = 120000)
+  public void testSetClasspathWithNoUserPrecendence() {
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
     Map<String, String> env = new HashMap<String, String>();
@@ -227,31 +254,36 @@ public class TestMRApps {
       fail("Got exception while setting classpath");
     }
     String env_str = env.get("CLASSPATH");
-    int index = 
-         env_str.indexOf("job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*");
-    assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not"
-            + " in the classpath!", index, -1);
-    assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
-      index, 0);
+    String expectedClasspath = StringUtils.join(File.pathSeparator,
+      Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
+        ApplicationConstants.Environment.PWD.$() + "/*"));
+    assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
+      + " the classpath!", env_str.contains(expectedClasspath));
+    assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
+      env_str.startsWith(expectedClasspath));
   }
   
-  @Test public void testSetClasspathWithJobClassloader() throws IOException {
+  @Test (timeout = 120000)
+  public void testSetClasspathWithJobClassloader() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
     Map<String, String> env = new HashMap<String, String>();
     MRApps.setClasspath(env, conf);
     String cp = env.get("CLASSPATH");
     String appCp = env.get("APP_CLASSPATH");
-    assertSame("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is"
-        + " in the classpath!", cp.indexOf("jar:job"), -1);
-    assertSame("MAPREDUCE_JOB_CLASSLOADER true, but PWD is"
-        + " in the classpath!", cp.indexOf("PWD"), -1);
-    assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not"
-        + " in the app classpath!",
-        "$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*", appCp);
+    assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the"
+      + " classpath!", cp.contains("jar" + File.pathSeparator + "job"));
+    assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
+      cp.contains("PWD"));
+    String expectedAppClasspath = StringUtils.join(File.pathSeparator,
+      Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
+        "job.jar/classes/", "job.jar/lib/*",
+        ApplicationConstants.Environment.PWD.$() + "/*"));
+    assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
+      + " classpath!", expectedAppClasspath, appCp);
   }
   
-  @Test
+  @Test (timeout = 30000)
   public void testSetupDistributedCacheEmpty() throws IOException {
     Configuration conf = new Configuration();
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
@@ -261,7 +293,7 @@ public class TestMRApps {
   }
   
   @SuppressWarnings("deprecation")
-  @Test(expected = InvalidJobConfException.class)
+  @Test(timeout = 120000, expected = InvalidJobConfException.class)
   public void testSetupDistributedCacheConflicts() throws Exception {
     Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
@@ -292,7 +324,7 @@ public class TestMRApps {
   }
   
   @SuppressWarnings("deprecation")
-  @Test(expected = InvalidJobConfException.class)
+  @Test(timeout = 120000, expected = InvalidJobConfException.class)
   public void testSetupDistributedCacheConflictsFiles() throws Exception {
     Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
@@ -320,7 +352,7 @@ public class TestMRApps {
   }
   
   @SuppressWarnings("deprecation")
-  @Test
+  @Test (timeout = 30000)
   public void testSetupDistributedCache() throws Exception {
     Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);

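The TestMRApps changes above all serve cross-platform correctness: literal ':' separators become File.pathSeparator, and literal "$PWD" becomes ApplicationConstants.Environment.PWD.$(), which (as the assertions assume) expands to the platform's environment-variable syntax, matching the envVar helper added to TestMapReduceChildJVM in this same commit. A small illustration of the convention:

    String pwd = ApplicationConstants.Environment.PWD.$();
    // Unix:    pwd == "$PWD"  and File.pathSeparator == ":"
    // Windows: pwd == "%PWD%" and File.pathSeparator == ";"
    String cp = StringUtils.join(File.pathSeparator,
        Arrays.asList(pwd, "job.jar/job.jar", "job.jar/classes/"));
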
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Mon Apr  1 16:47:16 2013
@@ -212,6 +212,7 @@
           {"name": "rackname", "type": "string"},
           {"name": "status", "type": "string"},
           {"name": "error", "type": "string"},
+          {"name": "counters", "type": "JhCounters"},
           {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
           {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
           {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
@@ -226,7 +227,8 @@
           {"name": "finishTime", "type": "long"},
           {"name": "error", "type": "string"},
           {"name": "failedDueToAttempt", "type": ["null", "string"] },
-          {"name": "status", "type": "string"}
+          {"name": "status", "type": "string"},
+          {"name": "counters", "type": "JhCounters"}
       ]
      },
 

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Mon Apr  1 16:47:16 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
@@ -453,7 +454,7 @@ public class JobConf extends Configurati
    * @param cls the example class.
    */
   public void setJarByClass(Class cls) {
-    String jar = findContainingJar(cls);
+    String jar = ClassUtil.findContainingJar(cls);
     if (jar != null) {
       setJar(jar);
     }   
@@ -1811,7 +1812,7 @@ public class JobConf extends Configurati
     return 
     (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
   }
-  
+
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
@@ -1822,35 +1823,9 @@ public class JobConf extends Configurati
    * @throws IOException
    */
   public static String findContainingJar(Class my_class) {
-    ClassLoader loader = my_class.getClassLoader();
-    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
-    try {
-      for(Enumeration itr = loader.getResources(class_file);
-          itr.hasMoreElements();) {
-        URL url = (URL) itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return null;
+    return ClassUtil.findContainingJar(my_class);
   }
 
-
   /**
    * Get the memory required to run a task of this job, in bytes. See
    * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}

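With this change JobConf.findContainingJar is a thin compatibility wrapper over org.apache.hadoop.util.ClassUtil.findContainingJar, where the URL-decoding logic removed above now lives. Existing callers keep working; new code can call the utility directly. A minimal usage sketch (MyMapper is a placeholder class name):

    // Sketch: find the jar providing a class and register it on the job
    String jar = ClassUtil.findContainingJar(MyMapper.class);
    if (jar != null) {
      jobConf.setJar(jar);
    }
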
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java Mon Apr  1 16:47:16 2013
@@ -19,8 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
@@ -39,62 +37,7 @@ public class JobEndNotifier {
   private static final Log LOG =
     LogFactory.getLog(JobEndNotifier.class.getName());
 
-  private static Thread thread;
-  private static volatile boolean running;
-  private static BlockingQueue<JobEndStatusInfo> queue =
-    new DelayQueue<JobEndStatusInfo>();
-
-  public static void startNotifier() {
-    running = true;
-    thread = new Thread(
-                        new Runnable() {
-                          public void run() {
-                            try {
-                              while (running) {
-                                sendNotification(queue.take());
-                              }
-                            }
-                            catch (InterruptedException irex) {
-                              if (running) {
-                                LOG.error("Thread has ended unexpectedly", irex);
-                              }
-                            }
-                          }
-
-                          private void sendNotification(JobEndStatusInfo notification) {
-                            try {
-                              int code = httpNotification(notification.getUri());
-                              if (code != 200) {
-                                throw new IOException("Invalid response status code: " + code);
-                              }
-                            }
-                            catch (IOException ioex) {
-                              LOG.error("Notification failure [" + notification + "]", ioex);
-                              if (notification.configureForRetry()) {
-                                try {
-                                  queue.put(notification);
-                                }
-                                catch (InterruptedException iex) {
-                                  LOG.error("Notification queuing error [" + notification + "]",
-                                            iex);
-                                }
-                              }
-                            }
-                            catch (Exception ex) {
-                              LOG.error("Notification failure [" + notification + "]", ex);
-                            }
-                          }
-
-                        }
-
-                        );
-    thread.start();
-  }
-
-  public static void stopNotifier() {
-    running = false;
-    thread.interrupt();
-  }
+ 
 
   private static JobEndStatusInfo createNotification(JobConf conf,
                                                      JobStatus status) {
@@ -118,18 +61,6 @@ public class JobEndNotifier {
     return notification;
   }
 
-  public static void registerNotification(JobConf jobConf, JobStatus status) {
-    JobEndStatusInfo notification = createNotification(jobConf, status);
-    if (notification != null) {
-      try {
-        queue.put(notification);
-      }
-      catch (InterruptedException iex) {
-        LOG.error("Notification queuing failure [" + notification + "]", iex);
-      }
-    }
-  }
-
   private static int httpNotification(String uri) throws IOException {
     URI url = new URI(uri, false);
     HttpClient m_client = new HttpClient();
@@ -194,10 +125,6 @@ public class JobEndNotifier {
       return retryInterval;
     }
 
-    public long getDelayTime() {
-      return delayTime;
-    }
-
     public boolean configureForRetry() {
       boolean retry = false;
       if (getRetryAttempts() > 0) {

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java Mon Apr  1 16:47:16 2013
@@ -67,7 +67,8 @@ public class JobQueueInfo extends QueueI
    * 
    * @param queueName Name of the job queue.
    */
-  protected void setQueueName(String queueName) {
+  @InterfaceAudience.Private
+  public void setQueueName(String queueName) {
     super.setQueueName(queueName);
   }
 
@@ -76,7 +77,8 @@ public class JobQueueInfo extends QueueI
    * 
    * @param schedulingInfo
    */
-  protected void setSchedulingInfo(String schedulingInfo) {
+  @InterfaceAudience.Private
+  public void setSchedulingInfo(String schedulingInfo) {
     super.setSchedulingInfo(schedulingInfo);
   }
 
@@ -84,15 +86,21 @@ public class JobQueueInfo extends QueueI
    * Set the state of the queue
    * @param state state of the queue.
    */
-  protected void setQueueState(String state) {
+  @InterfaceAudience.Private
+  public void setQueueState(String state) {
     super.setState(QueueState.getState(state));
   }
   
-  String getQueueState() {
+  /**
+   * Use getState() instead
+   */
+  @Deprecated
+  public String getQueueState() {
     return super.getState().toString();
   }
   
-  protected void setChildren(List<JobQueueInfo> children) {
+  @InterfaceAudience.Private
+  public void setChildren(List<JobQueueInfo> children) {
     List<QueueInfo> list = new ArrayList<QueueInfo>();
     for (JobQueueInfo q : children) {
       list.add(q);
@@ -108,7 +116,8 @@ public class JobQueueInfo extends QueueI
     return list;
   }
 
-  protected void setProperties(Properties props) {
+  @InterfaceAudience.Private
+  public void setProperties(Properties props) {
     super.setProperties(props);
   }
 
@@ -141,7 +150,8 @@ public class JobQueueInfo extends QueueI
     setChildren(children);
   }
 
-  protected void setJobStatuses(org.apache.hadoop.mapreduce.JobStatus[] stats) {
+  @InterfaceAudience.Private
+  public void setJobStatuses(org.apache.hadoop.mapreduce.JobStatus[] stats) {
     super.setJobStatuses(stats);
   }
 

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Mon Apr  1 16:47:16 2013
@@ -77,6 +77,60 @@ public class JobStatus extends org.apach
    */
   public JobStatus() {
   }
+  
+  @Deprecated
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+      float cleanupProgress, int runState) {
+    this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, null,
+        null, null, null);
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param runState The current state of the job
+   */
+  @Deprecated
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+      int runState) {
+    this (jobid, mapProgress, reduceProgress, runState, null, null, null, null);
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on the cleanup
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+  @Deprecated
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+      float cleanupProgress, int runState, JobPriority jp) {
+    this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, jp,
+        null, null, null, null);
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param setupProgress The progress made on the setup
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on the cleanup
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+  @Deprecated
+  public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+      float reduceProgress, float cleanupProgress, 
+      int runState, JobPriority jp) {
+    this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
+        runState, jp, null, null, null, null);
+  }
 
   /**
    * Create a job status object for a given jobid.

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Mon Apr  1 16:47:16 2013
@@ -169,7 +169,7 @@ public class Merger {  
   }
 
 
-  static <K extends Object, V extends Object>
+  public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                           Class<K> keyClass, Class<V> valueClass,
                           CompressionCodec codec,

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java Mon Apr  1 16:47:16 2013
@@ -449,7 +449,7 @@ class QueueConfigurationParser {
     q.appendChild(propsElement);
 
     // Queue-state
-    String queueState = jqi.getQueueState();
+    String queueState = jqi.getState().getStateName();
     if (queueState != null
         && !queueState.equals(QueueState.UNDEFINED.getStateName())) {
       Element qStateElement = document.createElement(STATE_TAG);

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueManager.java Mon Apr  1 16:47:16 2013
@@ -40,7 +40,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.List;
 import java.net.URL;
 
 
@@ -487,73 +486,8 @@ public class QueueManager {
       new QueueAclsInfo[queueAclsInfolist.size()]);
   }
 
-  /**
-   * ONLY FOR TESTING - Do not use in production code.
-   * This method is used for setting up of leafQueues only.
-   * We are not setting the hierarchy here.
-   *
-   * @param queues
-   */
-  synchronized void setQueues(Queue[] queues) {
-    root.getChildren().clear();
-    leafQueues.clear();
-    allQueues.clear();
-
-    for (Queue queue : queues) {
-      root.addChild(queue);
-    }
-    //At this point we have root populated
-    //update data structures leafNodes.
-    leafQueues = getRoot().getLeafQueues();
-    allQueues.putAll(getRoot().getInnerQueues());
-    allQueues.putAll(leafQueues);
-  }
-
-  /**
-   * Return an array of {@link JobQueueInfo} objects for the root
-   * queues configured in the system.
-   * <p/>
-   * Root queues are queues that are at the top-most level in the
-   * hierarchy of queues in mapred-queues.xml, or they are the queues
-   * configured in the mapred.queue.names key in mapred-site.xml.
-   *
-   * @return array of JobQueueInfo objects for root level queues.
-   */
-
-  JobQueueInfo[] getRootQueues() {
-    List<JobQueueInfo> list = getRoot().getJobQueueInfo().getChildren();
-    return list.toArray(new JobQueueInfo[list.size()]);
-  }
-
-  /**
-   * Get the complete hierarchy of children for queue
-   * queueName
-   *
-   * @param queueName
-   * @return
-   */
-  JobQueueInfo[] getChildQueues(String queueName) {
-    List<JobQueueInfo> list =
-      allQueues.get(queueName).getJobQueueInfo().getChildren();
-    if (list != null) {
-      return list.toArray(new JobQueueInfo[list.size()]);
-    } else {
-      return new JobQueueInfo[0];
-    }
-  }
-
-  /**
-   * Used only for testing purposes .
-   * This method is unstable as refreshQueues would leave this
-   * data structure in unstable state.
-   *
-   * @param queueName
-   * @return
-   */
-  Queue getQueue(String queueName) {
-    return this.allQueues.get(queueName);
-  }
-
+ 
+ 
 
   /**
    * Return if ACLs are enabled for the Map/Reduce system
@@ -573,29 +507,7 @@ public class QueueManager {
     return root;
   }
 
-  /**
-   * Returns the specific queue ACL for the given queue.
-   * Returns null if the given queue does not exist or the acl is not
-   * configured for that queue.
-   * If acls are disabled(mapreduce.cluster.acls.enabled set to false), returns
-   * ACL with all users.
-   */
-  synchronized AccessControlList getQueueACL(String queueName,
-      QueueACL qACL) {
-    if (areAclsEnabled) {
-      Queue q = leafQueues.get(queueName);
-      if (q != null) {
-        return q.getAcls().get(toFullPropertyName(
-          queueName, qACL.getAclName()));
-      }
-      else {
-        LOG.warn("Queue " + queueName + " is not present.");
-        return null;
-      }
-    }
-    return new AccessControlList("*");
-  }
-
+  
   /**
    * Dumps the configuration of hierarchy of queues
    * @param out the writer object to which dump is written

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Mon Apr  1 16:47:16 2013
@@ -185,6 +185,7 @@ abstract public class Task implements Wr
   private int numSlotsRequired;
   protected TaskUmbilicalProtocol umbilical;
   protected SecretKey tokenSecret;
+  protected SecretKey shuffleSecret;
   protected GcTimeUpdater gcUpdater;
 
   ////////////////////////////////////////////
@@ -261,7 +262,22 @@ abstract public class Task implements Wr
     return this.tokenSecret;
   }
 
-  
+  /**
+   * Set the secret key used to authenticate the shuffle
+   * @param shuffleSecret the secret
+   */
+  public void setShuffleSecret(SecretKey shuffleSecret) {
+    this.shuffleSecret = shuffleSecret;
+  }
+
+  /**
+   * Get the secret key used to authenticate the shuffle
+   * @return the shuffle secret
+   */
+  public SecretKey getShuffleSecret() {
+    return this.shuffleSecret;
+  }
+
   /**
    * Get the index of this task within the job.
    * @return the integer part of the task id

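Task now carries the shuffle secret alongside the job-token secret. For context, the standard javax.crypto way to turn raw key bytes (for example, bytes recovered from job credentials) back into a SecretKey that could be handed to setShuffleSecret(); the class and method names below are illustrative, not Hadoop API:

    import javax.crypto.SecretKey;
    import javax.crypto.spec.SecretKeySpec;

    public final class ShuffleSecretDemo {
      // Rebuild an HMAC key from raw bytes; "HmacSHA1" matches the
      // algorithm named in the JobSubmitter hunk later in this diff.
      static SecretKey fromBytes(byte[] keyBytes) {
        return new SecretKeySpec(keyBytes, "HmacSHA1");
      }

      public static void main(String[] args) {
        byte[] fake = {1, 2, 3, 4, 5, 6, 7, 8}; // stand-in key material
        System.out.println(fromBytes(fake).getAlgorithm()); // prints HmacSHA1
      }
    }
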
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Mon Apr  1 16:47:16 2013
@@ -386,73 +386,6 @@ public class TaskLog {
     return conf.getLong(JobContext.TASK_USERLOG_LIMIT, 0) * 1024;
   }
 
-  /**
-   * Wrap a command in a shell to capture stdout and stderr to files.
-   * If the tailLength is 0, the entire output will be saved.
-   * @param cmd The command and the arguments that should be run
-   * @param stdoutFilename The filename that stdout should be saved to
-   * @param stderrFilename The filename that stderr should be saved to
-   * @param tailLength The length of the tail to be saved.
-   * @return the modified command that should be run
-   */
-  public static List<String> captureOutAndError(List<String> cmd, 
-                                                File stdoutFilename,
-                                                File stderrFilename,
-                                                long tailLength
-                                               ) throws IOException {
-    return captureOutAndError(null, cmd, stdoutFilename,
-                              stderrFilename, tailLength, false);
-  }
-
-  /**
-   * Wrap a command in a shell to capture stdout and stderr to files.
-   * Setup commands such as setting memory limit can be passed which 
-   * will be executed before exec.
-   * If the tailLength is 0, the entire output will be saved.
-   * @param setup The setup commands for the execed process.
-   * @param cmd The command and the arguments that should be run
-   * @param stdoutFilename The filename that stdout should be saved to
-   * @param stderrFilename The filename that stderr should be saved to
-   * @param tailLength The length of the tail to be saved.
-   * @return the modified command that should be run
-   */
-  public static List<String> captureOutAndError(List<String> setup,
-                                                List<String> cmd, 
-                                                File stdoutFilename,
-                                                File stderrFilename,
-                                                long tailLength
-                                               ) throws IOException {
-    return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
-                              tailLength, false);
-  }
-
-  /**
-   * Wrap a command in a shell to capture stdout and stderr to files.
-   * Setup commands such as setting memory limit can be passed which 
-   * will be executed before exec.
-   * If the tailLength is 0, the entire output will be saved.
-   * @param setup The setup commands for the execed process.
-   * @param cmd The command and the arguments that should be run
-   * @param stdoutFilename The filename that stdout should be saved to
-   * @param stderrFilename The filename that stderr should be saved to
-   * @param tailLength The length of the tail to be saved.
-   * @param pidFileName The name of the pid-file. pid-file's usage is deprecated
-   * @return the modified command that should be run
-   * 
-   * @deprecated     pidFiles are no more used. Instead pid is exported to
-   *                 env variable JVM_PID.
-   */
-  @Deprecated
-  public static List<String> captureOutAndError(List<String> setup,
-                                                List<String> cmd, 
-                                                File stdoutFilename,
-                                                File stderrFilename,
-                                                long tailLength,
-                                                String pidFileName
-                                               ) throws IOException {
-    return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
-        tailLength, false);
-  }
   
   /**
    * Wrap a command in a shell to capture stdout and stderr to files.
@@ -607,25 +540,6 @@ public class TaskLog {
     return command.toString();
   }
   
-  /**
-   * Wrap a command in a shell to capture debug script's 
-   * stdout and stderr to debugout.
-   * @param cmd The command and the arguments that should be run
-   * @param debugoutFilename The filename that stdout and stderr
-   *  should be saved to.
-   * @return the modified command that should be run
-   * @throws IOException
-   */
-  public static List<String> captureDebugOut(List<String> cmd, 
-                                             File debugoutFilename
-                                            ) throws IOException {
-    String debugout = FileUtil.makeShellPath(debugoutFilename);
-    List<String> result = new ArrayList<String>(3);
-    result.add(bashCommand);
-    result.add("-c");
-    result.add(buildDebugScriptCommandLine(cmd, debugout));
-    return result;
-  }
   
   /**
    * Method to return the location of user log directory.

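The three removed captureOutAndError overloads (including the @Deprecated pid-file variant) all delegated to the one surviving variant, which wraps the command in a shell so its streams land in files. A heavily simplified sketch of that wrapping idea; wrapForCapture is an invented name, and the real TaskLog additionally handles tail length and careful quoting:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class CaptureDemo {
      // Run the command under bash -c with stdout/stderr redirected to files.
      static List<String> wrapForCapture(List<String> cmd,
                                         String stdoutFile, String stderrFile) {
        StringBuilder shellCmd = new StringBuilder();
        for (String part : cmd) {
          shellCmd.append('\'').append(part).append("' "); // naive quoting
        }
        shellCmd.append("1> ").append(stdoutFile)
                .append(" 2> ").append(stderrFile);
        return new ArrayList<String>(
            Arrays.asList("bash", "-c", shellCmd.toString()));
      }

      public static void main(String[] args) {
        System.out.println(wrapForCapture(
            Arrays.asList("echo", "hello"), "stdout.log", "stderr.log"));
      }
    }
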
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java Mon Apr  1 16:47:16 2013
@@ -523,17 +523,5 @@ public abstract class TaskStatus impleme
     return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
   }
 
-  static TaskStatus readTaskStatus(DataInput in) throws IOException {
-    boolean isMap = in.readBoolean();
-    TaskStatus taskStatus = createTaskStatus(isMap);
-    taskStatus.readFields(in);
-    return taskStatus;
-  }
-  
-  static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) 
-  throws IOException {
-    out.writeBoolean(taskStatus.getIsMap());
-    taskStatus.write(out);
-  }
 }
 

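The removed helpers captured the usual tag-then-delegate Writable idiom: write a boolean discriminator, then the instance's own fields, so the reader can construct the right subclass before calling readFields(). A self-contained sketch with hypothetical types:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    abstract class DemoStatus {
      abstract boolean getIsMap();
      void write(DataOutput out) throws IOException { /* subclass fields */ }
      void readFields(DataInput in) throws IOException { /* subclass fields */ }
    }

    class MapStatus extends DemoStatus {
      boolean getIsMap() { return true; }
    }

    class ReduceStatus extends DemoStatus {
      boolean getIsMap() { return false; }
    }

    final class TaggedStatusIO {
      static void writeStatus(DataOutput out, DemoStatus s) throws IOException {
        out.writeBoolean(s.getIsMap()); // discriminator first
        s.write(out);                   // then the payload
      }

      static DemoStatus readStatus(DataInput in) throws IOException {
        DemoStatus s = in.readBoolean() ? new MapStatus() : new ReduceStatus();
        s.readFields(in);               // subclass chosen before deserializing
        return s;
      }
    }
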
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/IdentityMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/IdentityMapper.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/IdentityMapper.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/IdentityMapper.java Mon Apr  1 16:47:16 2013
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.MapReduc
 public class IdentityMapper<K, V>
     extends MapReduceBase implements Mapper<K, V, K, V> {
 
-  /** The identify function.  Input key/value pair is written directly to
+  /** The identity function.  Input key/value pair is written directly to
    * output.*/
   public void map(K key, V val,
                   OutputCollector<K, V> output, Reporter reporter)

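For context, typical old-API wiring of IdentityMapper (a sketch; the key/value classes here are arbitrary Writables):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.IdentityMapper;

    public final class IdentityMapperDemo {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Records pass through the map phase unchanged.
        conf.setMapperClass(IdentityMapper.class);
        conf.setMapOutputKeyClass(LongWritable.class);
        conf.setMapOutputValueClass(Text.class);
      }
    }
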
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Mon Apr  1 16:47:16 2013
@@ -23,11 +23,15 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -62,6 +66,8 @@ import com.google.common.base.Charsets;
 @InterfaceStability.Unstable
 class JobSubmitter {
   protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
+  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
+  private static final int SHUFFLE_KEY_LENGTH = 64;
   private FileSystem jtFs;
   private ClientProtocol submitClient;
   private String submitHostName;
@@ -359,6 +365,20 @@ class JobSubmitter {
       
       populateTokenCache(conf, job.getCredentials());
 
+      // generate a secret to authenticate shuffle transfers
+      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
+        KeyGenerator keyGen;
+        try {
+          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
+          keyGen.init(SHUFFLE_KEY_LENGTH);
+        } catch (NoSuchAlgorithmException e) {
+          throw new IOException("Error generating shuffle secret key", e);
+        }
+        SecretKey shuffleKey = keyGen.generateKey();
+        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
+            job.getCredentials());
+      }
+
       copyAndConfigureFiles(job, submitJobDir);
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
       

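The new block generates the per-job shuffle secret only when the credentials do not already hold one. A standalone version of the same javax.crypto sequence (the class name and printout are illustrative; the algorithm and 64-bit key length come from the constants in the hunk):

    import java.security.NoSuchAlgorithmException;
    import javax.crypto.KeyGenerator;
    import javax.crypto.SecretKey;

    public final class ShuffleKeyGenDemo {
      public static void main(String[] args) throws NoSuchAlgorithmException {
        KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA1");
        keyGen.init(64); // key length in bits, per SHUFFLE_KEY_LENGTH
        SecretKey shuffleKey = keyGen.generateKey();
        System.out.println(shuffleKey.getEncoded().length * 8 + "-bit key");
      }
    }
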
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Mon Apr  1 16:47:16 2013
@@ -63,6 +63,9 @@ public interface MRJobConfig {
 
   public static final String SPLIT_FILE = "mapreduce.job.splitfile";
 
+  public static final String SPLIT_METAINFO_MAXSIZE = "mapreduce.job.split.metainfo.maxsize";
+  public static final long DEFAULT_SPLIT_METAINFO_MAXSIZE = 10000000L;
+
   public static final String NUM_MAPS = "mapreduce.job.maps";
 
   public static final String MAX_TASK_FAILURES_PER_TRACKER = "mapreduce.job.maxtaskfailures.per.tracker";
@@ -660,5 +663,13 @@ public interface MRJobConfig {
   
   public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
       "^mapreduce\\.workflow\\.adjacency\\..+";
+
+  /**
+   * The maximum number of application attempts.
+   * It is an application-specific setting.
+   */
+  public static final String MR_AM_MAX_ATTEMPTS = "mapreduce.am.max-attempts";
+
+  public static final int DEFAULT_MR_AM_MAX_ATTEMPTS = 1;
   
 }

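Each new constant pairs a key with a default so callers can read it in one line. A short example of that usual pattern (demo class name is mine):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public final class MRJobConfigDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        long maxMetaInfo = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
            MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
        int maxAttempts = conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
            MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
        System.out.println(maxMetaInfo + " " + maxAttempts);
      }
    }
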
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Mon Apr  1 16:47:16 2013
@@ -295,6 +295,7 @@ public class JobHistoryParser implements
     attemptInfo.shuffleFinishTime = event.getFinishTime();
     attemptInfo.sortFinishTime = event.getFinishTime();
     attemptInfo.mapFinishTime = event.getFinishTime();
+    attemptInfo.counters = event.getCounters();
     if(TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.status))
     {
       //this is a successful task
@@ -347,6 +348,7 @@ public class JobHistoryParser implements
     taskInfo.finishTime = event.getFinishTime();
     taskInfo.error = StringInterner.weakIntern(event.getError());
     taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
+    taskInfo.counters = event.getCounters();
     info.errorInfo = "Task " + taskInfo.taskId +" failed " +
     taskInfo.attemptsMap.size() + " times ";
   }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java Mon Apr  1 16:47:16 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.jobh
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -36,8 +37,24 @@ import org.apache.avro.util.Utf8;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
-  private TaskAttemptUnsuccessfulCompletion datum =
-    new TaskAttemptUnsuccessfulCompletion();
+
+  private TaskAttemptUnsuccessfulCompletion datum = null;
+
+  private TaskAttemptID attemptId;
+  private TaskType taskType;
+  private String status;
+  private long finishTime;
+  private String hostname;
+  private int port;
+  private String rackName;
+  private String error;
+  private Counters counters;
+  int[][] allSplits;
+  int[] clockSplits;
+  int[] cpuUsages;
+  int[] vMemKbytes;
+  int[] physMemKbytes;
+  private static final Counters EMPTY_COUNTERS = new Counters();
 
   /** 
    * Create an event to record the unsuccessful completion of attempts
@@ -49,6 +66,7 @@ public class TaskAttemptUnsuccessfulComp
   * @param port rpc port for the tracker
    * @param rackName Name of the rack where the attempt executed
    * @param error Error string
+   * @param counters Counters for the attempt
    * @param allSplits the "splits", or a pixelated graph of various
    *        measurable worker node state variables against progress.
    *        Currently there are four; wallclock time, CPU time,
@@ -58,31 +76,25 @@ public class TaskAttemptUnsuccessfulComp
        (TaskAttemptID id, TaskType taskType,
         String status, long finishTime,
         String hostname, int port, String rackName,
-        String error, int[][] allSplits) {
-    datum.taskid = new Utf8(id.getTaskID().toString());
-    datum.taskType = new Utf8(taskType.name());
-    datum.attemptId = new Utf8(id.toString());
-    datum.finishTime = finishTime;
-    datum.hostname = new Utf8(hostname);
-    if (rackName != null) {
-      datum.rackname = new Utf8(rackName);
-    }
-    datum.port = port;
-    datum.error = new Utf8(error);
-    datum.status = new Utf8(status);
-
-    datum.clockSplits 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
-    datum.cpuUsages 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
-    datum.vMemKbytes 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
-    datum.physMemKbytes 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+        String error, Counters counters, int[][] allSplits) {
+    this.attemptId = id;
+    this.taskType = taskType;
+    this.status = status;
+    this.finishTime = finishTime;
+    this.hostname = hostname;
+    this.port = port;
+    this.rackName = rackName;
+    this.error = error;
+    this.counters = counters;
+    this.allSplits = allSplits;
+    this.clockSplits =
+        ProgressSplitsBlock.arrayGetWallclockTime(allSplits);
+    this.cpuUsages =
+        ProgressSplitsBlock.arrayGetCPUTime(allSplits);
+    this.vMemKbytes =
+        ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
+    this.physMemKbytes =
+        ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
   }
 
   /** 
@@ -103,42 +115,109 @@ public class TaskAttemptUnsuccessfulComp
        (TaskAttemptID id, TaskType taskType,
         String status, long finishTime, 
         String hostname, String error) {
-    this(id, taskType, status, finishTime, hostname, -1, "", error, null);
+    this(id, taskType, status, finishTime, hostname, -1, "",
+        error, EMPTY_COUNTERS, null);
+  }
+  
+  public TaskAttemptUnsuccessfulCompletionEvent
+      (TaskAttemptID id, TaskType taskType,
+       String status, long finishTime,
+       String hostname, int port, String rackName,
+       String error, int[][] allSplits) {
+    this(id, taskType, status, finishTime, hostname, port,
+        rackName, error, EMPTY_COUNTERS, allSplits);
   }
 
   TaskAttemptUnsuccessfulCompletionEvent() {}
 
-  public Object getDatum() { return datum; }
-  public void setDatum(Object datum) {
-    this.datum = (TaskAttemptUnsuccessfulCompletion)datum;
+  public Object getDatum() {
+    if (datum == null) {
+      datum = new TaskAttemptUnsuccessfulCompletion();
+      datum.taskid = new Utf8(attemptId.getTaskID().toString());
+      datum.taskType = new Utf8(taskType.name());
+      datum.attemptId = new Utf8(attemptId.toString());
+      datum.finishTime = finishTime;
+      datum.hostname = new Utf8(hostname);
+      if (rackName != null) {
+        datum.rackname = new Utf8(rackName);
+      }
+      datum.port = port;
+      datum.error = new Utf8(error);
+      datum.status = new Utf8(status);
+
+      datum.counters = EventWriter.toAvro(counters);
+
+      datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetWallclockTime(allSplits));
+      datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetCPUTime(allSplits));
+      datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetVMemKbytes(allSplits));
+      datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetPhysMemKbytes(allSplits));
+    }
+    return datum;
+  }
+  
+  
+  
+  public void setDatum(Object odatum) {
+    this.datum =
+        (TaskAttemptUnsuccessfulCompletion)odatum;
+    this.attemptId =
+        TaskAttemptID.forName(datum.attemptId.toString());
+    this.taskType =
+        TaskType.valueOf(datum.taskType.toString());
+    this.finishTime = datum.finishTime;
+    this.hostname = datum.hostname.toString();
+    this.rackName = datum.rackname == null ? null : datum.rackname.toString();
+    this.port = datum.port;
+    this.status = datum.status.toString();
+    this.error = datum.error.toString();
+    this.counters =
+        EventReader.fromAvro(datum.counters);
+    this.clockSplits =
+        AvroArrayUtils.fromAvro(datum.clockSplits);
+    this.cpuUsages =
+        AvroArrayUtils.fromAvro(datum.cpuUsages);
+    this.vMemKbytes =
+        AvroArrayUtils.fromAvro(datum.vMemKbytes);
+    this.physMemKbytes =
+        AvroArrayUtils.fromAvro(datum.physMemKbytes);
   }
 
   /** Get the task id */
-  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
+  public TaskID getTaskId() {
+    return attemptId.getTaskID();
+  }
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(datum.taskType.toString());
+    return taskType;
   }
   /** Get the attempt id */
   public TaskAttemptID getTaskAttemptId() {
-    return TaskAttemptID.forName(datum.attemptId.toString());
+    return attemptId;
   }
   /** Get the finish time */
-  public long getFinishTime() { return datum.finishTime; }
+  public long getFinishTime() { return finishTime; }
   /** Get the name of the host where the attempt executed */
-  public String getHostname() { return datum.hostname.toString(); }
+  public String getHostname() { return hostname; }
   /** Get the rpc port for the host where the attempt executed */
-  public int getPort() { return datum.port; }
+  public int getPort() { return port; }
   
   /** Get the rack name of the node where the attempt ran */
   public String getRackName() {
-    return datum.rackname == null ? null : datum.rackname.toString();
+    return rackName;
   }
   
   /** Get the error string */
-  public String getError() { return datum.error.toString(); }
+  public String getError() { return error; }
   /** Get the task status */
-  public String getTaskStatus() { return datum.status.toString(); }
+  public String getTaskStatus() {
+    return status;
+  }
+  /** Get the counters */
+  Counters getCounters() { return counters; }
   /** Get the event type */
   public EventType getEventType() {
     // Note that the task type can be setup/map/reduce/cleanup but the 
@@ -157,16 +236,16 @@ public class TaskAttemptUnsuccessfulComp
 
 
   public int[] getClockSplits() {
-    return AvroArrayUtils.fromAvro(datum.clockSplits);
+    return clockSplits;
   }
   public int[] getCpuUsages() {
-    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+    return cpuUsages;
   }
   public int[] getVMemKbytes() {
-    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+    return vMemKbytes;
   }
   public int[] getPhysMemKbytes() {
-    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+    return physMemKbytes;
   }
 
 }

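The rewrite above replaces an eagerly built Avro record with plain Java fields plus a build-on-demand getDatum()/setDatum() pair, so events that are never serialized skip the Avro conversion entirely. The idiom in miniature, with hypothetical types:

    // Hypothetical miniature of the lazy-datum idiom used above.
    final class LazyEventDemo {
      private DemoAvroRecord datum;       // built only when first requested
      private String hostname;
      private long finishTime;

      LazyEventDemo(String hostname, long finishTime) {
        this.hostname = hostname;
        this.finishTime = finishTime;
      }

      DemoAvroRecord getDatum() {
        if (datum == null) {              // first call pays the conversion cost
          datum = new DemoAvroRecord(hostname, finishTime);
        }
        return datum;
      }

      void setDatum(DemoAvroRecord d) {   // hydrate fields from a parsed record
        this.datum = d;
        this.hostname = d.hostname;
        this.finishTime = d.finishTime;
      }
    }

    final class DemoAvroRecord {
      final String hostname;
      final long finishTime;
      DemoAvroRecord(String hostname, long finishTime) {
        this.hostname = hostname;
        this.finishTime = finishTime;
      }
    }
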

