hadoop-mapreduce-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r885145 [29/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Date: Sat, 28 Nov 2009 20:26:22 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java Sat Nov 28 20:26:01 2009
@@ -38,10 +38,12 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.StaticMapping;
@@ -75,8 +77,8 @@
     TestSetup setup = new TestSetup(new TestSuite(TestJobInProgress.class)) {
       protected void setUp() throws Exception {
         JobConf conf = new JobConf();
-        conf.set("mapred.job.tracker", "localhost:0");
-        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+        conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+        conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
         conf.setClass("topology.node.switch.mapping.impl", 
             StaticMapping.class, DNSToSwitchMapping.class);
         jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
@@ -99,14 +101,14 @@
     }
 
     @Override
-    JobClient.RawSplit[] createSplits() {
+    Job.RawSplit[] createSplits() {
       // Set all splits to reside on one host. This will ensure that 
       // one tracker gets data local, one gets rack local and two others
       // get non-local maps
-      RawSplit[] splits = new RawSplit[numMapTasks];
+      Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
       String[] splitHosts0 = new String[] { hosts[0] };
       for (int i = 0; i < numMapTasks; i++) {
-        splits[i] = new RawSplit();
+        splits[i] = new Job.RawSplit();
         splits[i].setDataLength(0);
         splits[i].setLocations(splitHosts0);
       }
@@ -165,7 +167,7 @@
     conf.setNumReduceTasks(numReds);
     conf.setSpeculativeExecution(false);
     conf.setBoolean(
-        "mapred.committer.job.setup.cleanup.needed", false);
+        JobContext.SETUP_CLEANUP_NEEDED, false);
     MyFakeJobInProgress job1 = new MyFakeJobInProgress(conf, jobTracker);
     job1.initTasks();
 

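A pattern repeated throughout this commit replaces raw configuration-key strings with compile-checked constants from JTConfig. A minimal sketch of the idea, assuming the jars from this branch are on the classpath:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

public class JTConfigExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Before (old code): a raw string key, silently ignored if misspelled.
    conf.set("mapred.job.tracker", "localhost:0");
    // After: the JTConfig constant used throughout this commit; the constant's
    // underlying key name is defined in the branch and may differ from the old one.
    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
    System.out.println(conf.get(JTConfig.JT_IPC_ADDRESS));
  }
}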
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgressListener.java Sat Nov 28 20:26:01 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -59,7 +60,7 @@
       @Override
       protected void setUp() throws Exception {
         conf = new JobConf();   
-        conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
+        conf.setClass(JTConfig.JT_TASK_SCHEDULER, MyScheduler.class,
                       TaskScheduler.class);
         mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
         jobtracker = mr.getJobTrackerRunner().getJobTracker();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java Sat Nov 28 20:26:01 2009
@@ -37,25 +37,93 @@
   public void testJobFailAndKill() throws IOException {
     MiniMRCluster mr = null;
     try {
-      mr = new MiniMRCluster(2, "file:///", 3);
+      JobConf jtConf = new JobConf();
+      jtConf.set("mapred.jobtracker.instrumentation", 
+          JTInstrumentation.class.getName());
+      mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
+      JTInstrumentation instr = (JTInstrumentation) 
+        mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
 
       // run the TCs
       JobConf conf = mr.createJobConf();
-
+      
       Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
       Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
       RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
      // Check that the job failed
       assertEquals(job.getJobState(), JobStatus.FAILED);
+      assertTrue(instr.verifyJob());
+      assertEquals(1, instr.failed);
+      instr.reset();
 
+      
       job = UtilsForTests.runJobKill(conf, inDir, outDir);
      // Check that the job was killed
       assertTrue(job.isComplete());
       assertEquals(job.getJobState(), JobStatus.KILLED);
+      assertTrue(instr.verifyJob());
+      assertEquals(1, instr.killed);
     } finally {
       if (mr != null) {
         mr.shutdown();
       }
     }
   }
+  
+  static class JTInstrumentation extends JobTrackerInstrumentation {
+    volatile int failed;
+    volatile int killed;
+    volatile int addPrep;
+    volatile int decPrep;
+    volatile int addRunning;
+    volatile int decRunning;
+
+    void reset() {
+      failed = 0;
+      killed = 0;
+      addPrep = 0;
+      decPrep = 0;
+      addRunning = 0;
+      decRunning = 0;
+    }
+
+    boolean verifyJob() {
+      return addPrep==1 && decPrep==1 && addRunning==1 && decRunning==1;
+    }
+
+    public JTInstrumentation(JobTracker jt, JobConf conf) {
+      super(jt, conf);
+    }
+
+    public synchronized void addPrepJob(JobConf conf, JobID id) 
+    {
+      addPrep++;
+    }
+    
+    public synchronized void decPrepJob(JobConf conf, JobID id) 
+    {
+      decPrep++;
+    }
+
+    public synchronized void addRunningJob(JobConf conf, JobID id) 
+    {
+      addRunning++;
+    }
+
+    public synchronized void decRunningJob(JobConf conf, JobID id) 
+    {
+      decRunning++;
+    }
+    
+    public synchronized void failedJob(JobConf conf, JobID id) 
+    {
+      failed++;
+    }
+
+    public synchronized void killedJob(JobConf conf, JobID id) 
+    {
+      killed++;
+    }
+  }
+  
 }

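The JTInstrumentation class added above is a counting hook: each JobTracker lifecycle callback increments a counter, and the test asserts on the totals after wiring the class in through the "mapred.jobtracker.instrumentation" key. The shape of that pattern in isolation (class names here are illustrative stand-ins, not Hadoop APIs):

// Hypothetical base class standing in for JobTrackerInstrumentation.
abstract class LifecycleHook {
  public void jobFailed() {}
  public void jobKilled() {}
}

// Counting subclass, mirroring JTInstrumentation above.
class CountingHook extends LifecycleHook {
  volatile int failed, killed;
  @Override public synchronized void jobFailed() { failed++; }
  @Override public synchronized void jobKilled() { killed++; }
  void reset() { failed = killed = 0; }
}

public class CountingHookDemo {
  public static void main(String[] args) {
    CountingHook hook = new CountingHook();
    hook.jobFailed();
    System.out.println("failed=" + hook.failed + " killed=" + hook.killed);
    hook.reset();
  }
}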
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobName.java Sat Nov 28 20:26:01 2009
@@ -61,7 +61,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
@@ -95,7 +95,7 @@
 
     Path[] outputFiles = FileUtil.stat2Paths(
                            getFileSystem().listStatus(getOutputDir(),
-                           new OutputLogFilter()));
+                           new Utils.OutputFileUtils.OutputFilesFilter()));
     assertEquals(1, outputFiles.length);
     InputStream is = getFileSystem().open(outputFiles[0]);
     BufferedReader reader = new BufferedReader(new InputStreamReader(is));

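Both hunks above replace the deprecated OutputLogFilter with Utils.OutputFileUtils.OutputFilesFilter when listing a job's output. A minimal sketch of the listing idiom, assuming an output directory already populated by a job:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Utils;

public class ListOutputs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    Path outputDir = new Path("/tmp/job-output"); // placeholder path
    // The filter keeps the part-* data files and skips entries such as _logs.
    Path[] outputs = FileUtil.stat2Paths(
        fs.listStatus(outputDir, new Utils.OutputFileUtils.OutputFilesFilter()));
    for (Path p : outputs) {
      System.out.println(p);
    }
  }
}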
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java Sat Nov 28 20:26:01 2009
@@ -18,23 +18,19 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.List;
 
-import javax.security.auth.login.LoginException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 import junit.framework.TestCase;
@@ -88,9 +84,9 @@
     dfsCluster = new MiniDFSCluster(conf, 4, true, null);
 
     jc = new JobConf();
-    jc.setClass("mapred.jobtracker.taskScheduler", TestTaskScheduler.class,
+    jc.setClass(JTConfig.JT_TASK_SCHEDULER, TestTaskScheduler.class,
         TaskScheduler.class);
-    jc.setLong("mapred.jobtracker.taskScheduler.maxRunningTasksPerJob", 10L);
+    jc.setLong(JTConfig.JT_RUNNINGTASKS_PER_JOB, 10L);
     mrCluster = new MiniMRCluster(0, 0, taskTrackers, dfsCluster
         .getFileSystem().getUri().toString(), 1, null, null, null, jc);
   }
@@ -109,7 +105,7 @@
     assertNotNull(queueInfos);
     assertEquals(1, queueInfos.length);
     assertEquals("default", queueInfos[0].getQueueName());
-    assertEquals(Queue.QueueState.RUNNING.getStateName(),
+    assertEquals(QueueState.RUNNING.getStateName(),
                   queueInfos[0].getQueueState());
     JobConf conf = mrCluster.createJobConf();
     FileSystem fileSys = dfsCluster.getFileSystem();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Sat Nov 28 20:26:01 2009
@@ -46,8 +46,9 @@
     private FakeTaskTrackerManager taskTrackerManager;
     
     public FakeJobInProgress(JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf, null);
+        FakeTaskTrackerManager taskTrackerManager, JobTracker jt) 
+          throws IOException {
+      super(new JobID("test", ++jobCounter), jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP, 
@@ -237,6 +238,11 @@
         public boolean getIsMap() {
           return t.isMapTask();
         }
+
+        @Override
+        public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+          // no-op: this fake task status ignores fetch failures
+        }
       };
       status.setRunState(TaskStatus.State.RUNNING);
       trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
@@ -276,7 +282,8 @@
                          int numJobs, int state)
     throws IOException {
     for (int i = 0; i < numJobs; i++) {
-      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+      JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
+          UtilsForTests.getJobTracker());
       job.getStatus().setRunState(state);
       taskTrackerManager.submitJob(job);
     }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobRetire.java Sat Nov 28 20:26:01 2009
@@ -24,6 +24,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
  * Test that job retirement works as expected. 
@@ -38,8 +39,8 @@
     try {
       JobConf conf = new JobConf();
 
-      conf.setBoolean("mapred.job.tracker.retire.jobs", true);
-      conf.setLong("mapred.job.tracker.retiredjobs.cache.size", 1);
+      conf.setBoolean(JTConfig.JT_RETIREJOBS, true);
+      conf.setLong(JTConfig.JT_RETIREJOB_CACHE_SIZE, 1);
       mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0);
       JobConf jobConf = mr.createJobConf();
       JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
@@ -81,7 +82,7 @@
     assertNotNull("Job is not in cache", jobtracker.getJobStatus(id));
     
     // get the job conf filename
-    String name = JobHistory.JobInfo.getLocalJobFilePath(id);
+    String name = jobtracker.getLocalJobFilePath(id);
     File file = new File(name);
  
     assertFalse("JobConf file not deleted", file.exists());

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobStatusPersistency.java Sat Nov 28 20:26:01 2009
@@ -26,6 +26,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
   static final Path TEST_DIR = 
@@ -77,8 +78,8 @@
 
   public void testPersistency() throws Exception {
     Properties config = new Properties();
-    config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
-    config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
+    config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS, "true");
+    config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
     stopCluster();
     startCluster(false, config);
     JobID jobId = runJob();
@@ -117,9 +118,9 @@
     fs.delete(TEST_DIR, true);
     
     Properties config = new Properties();
-    config.setProperty("mapred.job.tracker.persist.jobstatus.active", "true");
-    config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
-    config.setProperty("mapred.job.tracker.persist.jobstatus.dir", 
+    config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS, "true");
+    config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, "1");
+    config.setProperty(JTConfig.JT_PERSIST_JOBSTATUS_DIR, 
                        fs.makeQualified(TEST_DIR).toString());
     stopCluster();
     startCluster(false, config);

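The persistency settings above move onto JTConfig constants as well. A sketch of enabling persistent job status on a JobConf, using the same values the test passes (the DFS directory is a placeholder):

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

public class PersistencyConfig {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setBoolean(JTConfig.JT_PERSIST_JOBSTATUS, true);       // keep status of completed jobs
    conf.setInt(JTConfig.JT_PERSIST_JOBSTATUS_HOURS, 1);        // retain it for one hour
    conf.set(JTConfig.JT_PERSIST_JOBSTATUS_DIR, "/jobstatus");  // placeholder DFS directory
    System.out.println(conf.get(JTConfig.JT_PERSIST_JOBSTATUS_DIR));
  }
}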
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Sat Nov 28 20:26:01 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
  * A JUnit test to test Job System Directory with Mini-DFS.
@@ -82,11 +83,12 @@
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);
-    conf.set("mapred.system.dir", "/tmp/subru/mapred/system");
+    conf.set(JTConfig.JT_SYSTEM_DIR, "/tmp/subru/mapred/system");
     JobClient jobClient = new JobClient(conf);
     RunningJob job = jobClient.runJob(conf);
     // Checking that the Job Client system dir is not used
-    assertFalse(FileSystem.get(conf).exists(new Path(conf.get("mapred.system.dir")))); 
+    assertFalse(FileSystem.get(conf).exists(
+      new Path(conf.get(JTConfig.JT_SYSTEM_DIR)))); 
     // Check if the Job Tracker system dir is propagated to the client
     String sysDir = jobClient.getSystemDir().toString();
     System.out.println("Job sys dir -->" + sysDir);
@@ -121,7 +123,7 @@
       final int taskTrackers = 4;
 
       JobConf conf = new JobConf();
-      conf.set("mapred.system.dir", "/tmp/custom/mapred/system");
+      conf.set(JTConfig.JT_SYSTEM_DIR, "/tmp/custom/mapred/system");
       dfs = new MiniDFSCluster(conf, 4, true, null);
       fileSys = dfs.getFileSystem();
       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java Sat Nov 28 20:26:01 2009
@@ -189,7 +189,7 @@
     Text value = reader.createValue();
     while (reader.next(key, value)) {
       result.add(value);
-      value = (Text) reader.createValue();
+      value = reader.createValue();
     }
     return result;
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java Sat Nov 28 20:26:01 2009
@@ -24,6 +24,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapred.TestRackAwareTaskPlacement.MyFakeJobInProgress;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
  * A JUnit test to test that killing completed jobs does not move them
@@ -41,9 +42,9 @@
   @Override
   protected void setUp() throws Exception {
     JobConf conf = new JobConf();
-    conf.set("mapred.job.tracker", "localhost:0");
-    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
-    conf.setLong("mapred.tasktracker.expiry.interval", 1000);
+    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+    conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+    conf.setLong(JTConfig.JT_TRACKER_EXPIRY_INTERVAL, 1000);
     jobTracker = new FakeJobTracker(conf, (clock = new FakeClock()), trackers);
   }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestKillSubProcesses.java Sat Nov 28 20:26:01 2009
@@ -33,13 +33,13 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree;
 
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-import org.apache.hadoop.util.TestProcfsBasedProcessTree;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLazyOutput.java Sat Nov 28 20:26:01 2009
@@ -56,7 +56,7 @@
     private String id;
 
     public void configure(JobConf job) {
-      id = job.get("mapred.task.id");
+      id = job.get(JobContext.TASK_ATTEMPT_ID);
     }
 
     public void map(LongWritable key, Text val,
@@ -74,7 +74,7 @@
     private String id;
 
     public void configure(JobConf job) {
-      id = job.get("mapred.task.id");
+      id = job.get(JobContext.TASK_ATTEMPT_ID);
     }
 
     /** Writes all keys and values directly to output. */
@@ -157,7 +157,7 @@
 
       Path[] fileList = 
         FileUtil.stat2Paths(fileSys.listStatus(output1,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test1 File list[" + i + "]" + ": "+ fileList[i]);
       }
@@ -169,7 +169,7 @@
 
       fileList =
         FileUtil.stat2Paths(fileSys.listStatus(output2,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test2 File list[" + i + "]" + ": "+ fileList[i]);
       }
@@ -182,7 +182,7 @@
 
       fileList =
         FileUtil.stat2Paths(fileSys.listStatus(output3,
-            new OutputLogFilter()));
+            new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         System.out.println("Test3 File list[" + i + "]" + ": "+ fileList[i]);
       }

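TestLazyOutput picks up two of the commit-wide renames: the task attempt id is read through JobContext.TASK_ATTEMPT_ID instead of the literal "mapred.task.id", and output listing uses OutputFilesFilter. A sketch of the id lookup, assuming JobContext exposes TASK_ATTEMPT_ID as it does in this branch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;

public class TaskIdLookup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // In a running task the framework sets this key; we set it here only to
    // demonstrate reading it through the constant.
    conf.set(JobContext.TASK_ATTEMPT_ID, "attempt_200907082313_0424_m_000000_0");
    System.out.println(conf.get(JobContext.TASK_ATTEMPT_ID));
  }
}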
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLimitTasksPerJobTaskScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLimitTasksPerJobTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLimitTasksPerJobTaskScheduler.java Sat Nov 28 20:26:01 2009
@@ -22,6 +22,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.mapred.TestJobQueueTaskScheduler.FakeTaskTrackerManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 public class TestLimitTasksPerJobTaskScheduler extends TestCase {
   protected JobConf jobConf;
@@ -53,7 +54,7 @@
   }
 
   public void testMaxRunningTasksPerJob() throws IOException {
-    jobConf.setLong(LimitTasksPerJobTaskScheduler.MAX_TASKS_PER_JOB_PROPERTY,
+    jobConf.setLong(JTConfig.JT_RUNNINGTASKS_PER_JOB,
         4L);
     scheduler.setConf(jobConf);
     TestJobQueueTaskScheduler.submitJobs(taskTrackerManager, jobConf, 
@@ -88,7 +89,7 @@
   
   public void testMaxRunningTasksPerJobWithInterleavedTrackers()
       throws IOException {
-    jobConf.setLong(LimitTasksPerJobTaskScheduler.MAX_TASKS_PER_JOB_PROPERTY,
+    jobConf.setLong(JTConfig.JT_RUNNINGTASKS_PER_JOB,
         4L);
     scheduler.setConf(jobConf);
     TestJobQueueTaskScheduler.submitJobs(taskTrackerManager, jobConf, 2, JobStatus.RUNNING);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Sat Nov 28 20:26:01 2009
@@ -22,17 +22,12 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import javax.security.auth.login.LoginException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
-import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.MRConfig;
 
 /**
  * Test to verify localization of a job and localization of a task on a
@@ -46,7 +41,8 @@
       LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
 
   private File configFile;
-  private MyLinuxTaskController taskController;
+
+  private static String taskTrackerSpecialGroup;
 
   @Override
   protected void setUp()
@@ -65,9 +61,21 @@
         ClusterWithLinuxTaskController.createTaskControllerConf(path,
             localDirs);
     String execPath = path + "/task-controller";
-    taskController.setTaskControllerExe(execPath);
+    ((MyLinuxTaskController) taskController).setTaskControllerExe(execPath);
+    taskTrackerSpecialGroup = getFilePermissionAttrs(execPath)[2];
     taskController.setConf(trackerFConf);
     taskController.setup();
+
+    tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+        taskController));
+
+    // Rewrite conf so as to reflect task's correct user name.
+    String ugi =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    JobConf jobConf = new JobConf(task.getConf());
+    jobConf.setUser(ugi.split(",")[0]);
+    uploadJobConf(jobConf);
+    task.setConf(jobConf);
   }
 
   @Override
@@ -89,77 +97,114 @@
   }
 
   /**
+   * Test the localization of a user on the TT when {@link LinuxTaskController}
+   * is in use.
+   */
+  @Override
+  public void testUserLocalization()
+      throws IOException {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    super.testJobLocalization();
+  }
+
+  @Override
+  protected void checkUserLocalization()
+      throws IOException {
+    // Check the directory structure and permissions
+    for (String dir : localDirs) {
+
+      File localDir = new File(dir);
+      assertTrue(MRConfig.LOCAL_DIR + localDir + " isn't created!",
+          localDir.exists());
+
+      File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+      assertTrue("taskTracker sub-dir in the local-dir " + localDir
+          + "is not created!", taskTrackerSubDir.exists());
+
+      File userDir = new File(taskTrackerSubDir, task.getUser());
+      assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+          + "is not created!", userDir.exists());
+      checkFilePermissions(userDir.getAbsolutePath(), "dr-xrws---", task
+          .getUser(), taskTrackerSpecialGroup);
+
+      File jobCache = new File(userDir, TaskTracker.JOBCACHE);
+      assertTrue("jobcache in the userDir " + userDir + " isn't created!",
+          jobCache.exists());
+      checkFilePermissions(jobCache.getAbsolutePath(), "dr-xrws---", task
+          .getUser(), taskTrackerSpecialGroup);
+
+      // Verify the distributed cache dir.
+      File distributedCacheDir =
+          new File(localDir, TaskTracker
+              .getDistributedCacheDir(task.getUser()));
+      assertTrue("distributed cache dir " + distributedCacheDir
+          + " doesn't exists!", distributedCacheDir.exists());
+      checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+          "dr-xrws---", task.getUser(), taskTrackerSpecialGroup);
+    }
+  }
+
+  /**
    * Test job localization with {@link LinuxTaskController}. Also check the
    * permissions and file ownership of the job related files.
    */
   @Override
   public void testJobLocalization()
-      throws IOException,
-      LoginException {
+      throws IOException {
 
     if (!ClusterWithLinuxTaskController.shouldRun()) {
       return;
     }
 
-    // Do job localization
-    JobConf localizedJobConf = tracker.localizeJobFiles(task);
-
-    String ugi =
-        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
-    localizedJobConf.setUser(ugi.split(",")[0]);
+    super.testJobLocalization();
+  }
 
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext context = new JobInitializationContext();
-    context.jobid = jobId;
-    context.user = localizedJobConf.getUser();
-    context.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-
-    // /////////// The method being tested
-    taskController.initializeJob(context);
-    // ///////////
-
-    UserGroupInformation taskTrackerugi =
-        UserGroupInformation.login(localizedJobConf);
-    for (String localDir : trackerFConf.getStrings("mapred.local.dir")) {
+  @Override
+  protected void checkJobLocalization()
+      throws IOException {
+    for (String localDir : trackerFConf.getStrings(MRConfig.LOCAL_DIR)) {
       File jobDir =
-          new File(localDir, TaskTracker.getLocalJobDir(jobId.toString()));
+          new File(localDir, TaskTracker.getLocalJobDir(task.getUser(), jobId
+              .toString()));
       // check the private permissions on the job directory
-      checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---",
-          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+      checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---", task
+          .getUser(), taskTrackerSpecialGroup);
     }
 
     // check the private permissions of various directories
     List<Path> dirs = new ArrayList<Path>();
     Path jarsDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(jobId
-            .toString()), trackerFConf);
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(task.getUser(),
+            jobId.toString()), trackerFConf);
     dirs.add(jarsDir);
     dirs.add(new Path(jarsDir, "lib"));
     for (Path dir : dirs) {
       checkFilePermissions(dir.toUri().getPath(), "dr-xrws---",
-          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+          task.getUser(), taskTrackerSpecialGroup);
     }
 
     // job-work dir needs user writable permissions
     Path jobWorkDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId
-            .toString()), trackerFConf);
-    checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---",
-        localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+        lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(),
+            jobId.toString()), trackerFConf);
+    checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---", task
+        .getUser(), taskTrackerSpecialGroup);
 
     // check the private permissions of various files
     List<Path> files = new ArrayList<Path>();
-    files.add(lDirAlloc.getLocalPathToRead(TaskTracker
-        .getLocalJobConfFile(jobId.toString()), trackerFConf));
-    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
-        .toString()), trackerFConf));
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalJobConfFile(
+        task.getUser(), jobId.toString()), trackerFConf));
+    files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task
+        .getUser(), jobId.toString()), trackerFConf));
     files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
     files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
     for (Path file : files) {
-      checkFilePermissions(file.toUri().getPath(), "-r-xrwx---",
-          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+      checkFilePermissions(file.toUri().getPath(), "-r-xrwx---", task
+          .getUser(), taskTrackerSpecialGroup);
     }
   }
 
@@ -169,75 +214,50 @@
    */
   @Override
   public void testTaskLocalization()
-      throws IOException,
-      LoginException {
+      throws IOException {
 
     if (!ClusterWithLinuxTaskController.shouldRun()) {
       return;
     }
 
-    JobConf localizedJobConf = tracker.localizeJobFiles(task);
-    String ugi =
-        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
-    localizedJobConf.setUser(ugi.split(",")[0]);
-
-    // Now initialize the job via task-controller so as to set
-    // ownership/permissions of jars, job-work-dir
-    JobInitializationContext jobContext = new JobInitializationContext();
-    jobContext.jobid = jobId;
-    jobContext.user = localizedJobConf.getUser();
-    jobContext.workDir =
-        new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-    taskController.initializeJob(jobContext);
-
-    TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
-    tip.setJobConf(localizedJobConf);
-
-    // localize the task.
-    tip.localizeTask(task);
-    TaskRunner runner = task.createRunner(tracker, tip);
-    runner.setupChildTaskConfiguration(lDirAlloc);
-    Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
-            .getJobID().toString(), task.getTaskID().toString(), task
-            .isTaskCleanupTask()), trackerFConf);
-    TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
-        localizedJobConf);
-    File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
-
-    // Initialize task
-    TaskControllerContext taskContext =
-        new TaskController.TaskControllerContext();
-    taskContext.env =
-        new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
-            .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
-    taskContext.task = task;
-    // /////////// The method being tested
-    taskController.initializeTask(taskContext);
-    // ///////////
+    super.testTaskLocalization();
+  }
 
+  @Override
+  protected void checkTaskLocalization()
+      throws IOException {
     // check the private permissions of various directories
     List<Path> dirs = new ArrayList<Path>();
-    dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(jobId
-        .toString(), taskId.toString()), trackerFConf));
-    dirs.add(workDir);
-    dirs.add(new Path(workDir, "tmp"));
-    dirs.add(new Path(logFiles[1].getParentFile().getAbsolutePath()));
-    UserGroupInformation taskTrackerugi =
-        UserGroupInformation.login(localizedJobConf);
+    dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task
+        .getUser(), jobId.toString(), taskId.toString()), trackerFConf));
+    dirs.add(attemptWorkDir);
+    dirs.add(new Path(attemptWorkDir, "tmp"));
+    dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath()));
     for (Path dir : dirs) {
       checkFilePermissions(dir.toUri().getPath(), "drwxrws---",
-          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+          task.getUser(), taskTrackerSpecialGroup);
     }
 
     // check the private permissions of various files
     List<Path> files = new ArrayList<Path>();
     files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
-        .getJobID().toString(), task.getTaskID().toString(), task
-        .isTaskCleanupTask()), trackerFConf));
+        .getUser(), task.getJobID().toString(), task.getTaskID().toString(),
+        task.isTaskCleanupTask()), trackerFConf));
     for (Path file : files) {
-      checkFilePermissions(file.toUri().getPath(), "-rwxrwx---",
-          localizedJobConf.getUser(), taskTrackerugi.getGroupNames()[0]);
+      checkFilePermissions(file.toUri().getPath(), "-rwxrwx---", task
+          .getUser(), taskTrackerSpecialGroup);
+    }
+  }
+
+  /**
+   * Test cleanup of task files with {@link LinuxTaskController}.
+   */
+  @Override
+  public void testTaskCleanup()
+      throws IOException {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
     }
+    super.testTaskCleanup();
   }
 }

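The localization-test refactor above converts copy-pasted test bodies into a template method: the public test delegates to the superclass, which performs the localization and then calls an overridable check hook (checkUserLocalization, checkJobLocalization, checkTaskLocalization). The shape of that refactor, with simplified stand-in names rather than the actual test classes:

// Base test: owns the shared localization steps and defines the hook.
abstract class BaseLocalizationTest {
  public void testJobLocalization() throws Exception {
    localizeJob();           // shared setup done once, in one place
    checkJobLocalization();  // controller-specific verification
  }
  void localizeJob() throws Exception { /* shared localization steps */ }
  protected abstract void checkJobLocalization() throws Exception;
}

// LinuxTaskController variant: only the verification differs.
class LinuxTaskControllerTest extends BaseLocalizationTest {
  @Override
  protected void checkJobLocalization() {
    // e.g. verify "dr-xrws---" permissions owned by the task user
  }
}

public class TemplateDemo {
  public static void main(String[] args) throws Exception {
    new LinuxTaskControllerTest().testJobLocalization();
  }
}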
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestLostTracker.java Sat Nov 28 20:26:01 2009
@@ -24,6 +24,7 @@
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
  * A test to verify JobTracker's resilience to lost task trackers. 
@@ -43,9 +44,9 @@
   @Override
   protected void setUp() throws Exception {
     JobConf conf = new JobConf();
-    conf.set("mapred.job.tracker", "localhost:0");
-    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
-    conf.setLong("mapred.tasktracker.expiry.interval", 1000);
+    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+    conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
+    conf.setLong(JTConfig.JT_TRACKER_EXPIRY_INTERVAL, 1000);
     jobTracker = new FakeJobTracker(conf, (clock = new FakeClock()), trackers);
     jobTracker.startExpireTrackersThread();
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRServerPorts.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRServerPorts.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRServerPorts.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRServerPorts.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,8 @@
 import org.apache.hadoop.hdfs.TestHDFSServerPorts;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -61,15 +63,15 @@
    */
   private JobTracker startJobTracker(JobConf conf, JTRunner runner) 
   throws IOException, LoginException {
-    conf.set("mapred.job.tracker", "localhost:0");
-    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+    conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
     JobTracker jt = null;
     try {
       jt = JobTracker.startTracker(conf);
       runner.setJobTracker(jt);
       runner.start();
-      conf.set("mapred.job.tracker", "localhost:" + jt.getTrackerPort());
-      conf.set("mapred.job.tracker.http.address", 
+      conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:" + jt.getTrackerPort());
+      conf.set(JTConfig.JT_HTTP_ADDRESS, 
                             "0.0.0.0:" + jt.getInfoPort());
     } catch(InterruptedException e) {
       throw new IOException(e.getLocalizedMessage());
@@ -134,23 +136,23 @@
 
       // start job tracker on the same port as name-node
       JobConf conf2 = new JobConf(hdfs.getConfig());
-      conf2.set("mapred.job.tracker",
+      conf2.set(JTConfig.JT_IPC_ADDRESS,
                 FileSystem.getDefaultUri(hdfs.getConfig()).toString());
-      conf2.set("mapred.job.tracker.http.address",
+      conf2.set(JTConfig.JT_HTTP_ADDRESS,
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       boolean started = canStartJobTracker(conf2);
       assertFalse(started); // should fail
 
       // bind http server to the same port as name-node
-      conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
-      conf2.set("mapred.job.tracker.http.address",
+      conf2.set(JTConfig.JT_IPC_ADDRESS, TestHDFSServerPorts.NAME_NODE_HOST + 0);
+      conf2.set(JTConfig.JT_HTTP_ADDRESS,
         hdfs.getConfig().get("dfs.http.address"));
       started = canStartJobTracker(conf2);
       assertFalse(started); // should fail again
 
       // both ports are different from the name-node ones
-      conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
-      conf2.set("mapred.job.tracker.http.address",
+      conf2.set(JTConfig.JT_IPC_ADDRESS, TestHDFSServerPorts.NAME_NODE_HOST + 0);
+      conf2.set(JTConfig.JT_HTTP_ADDRESS,
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       started = canStartJobTracker(conf2);
       assertTrue(started); // should start now
@@ -179,25 +181,25 @@
       jt = startJobTracker(conf2, runner);
 
       // start job tracker on the same port as name-node
-      conf2.set("mapred.task.tracker.report.address",
+      conf2.set(TTConfig.TT_REPORT_ADDRESS,
                 FileSystem.getDefaultUri(hdfs.getConfig()).toString());
-      conf2.set("mapred.task.tracker.http.address",
+      conf2.set(TTConfig.TT_HTTP_ADDRESS,
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       boolean started = canStartTaskTracker(conf2);
       assertFalse(started); // should fail
 
       // bind http server to the same port as name-node
-      conf2.set("mapred.task.tracker.report.address",
+      conf2.set(TTConfig.TT_REPORT_ADDRESS,
         TestHDFSServerPorts.NAME_NODE_HOST + 0);
-      conf2.set("mapred.task.tracker.http.address",
+      conf2.set(TTConfig.TT_HTTP_ADDRESS,
         hdfs.getConfig().get("dfs.http.address"));
       started = canStartTaskTracker(conf2);
       assertFalse(started); // should fail again
 
       // both ports are different from the name-node ones
-      conf2.set("mapred.task.tracker.report.address",
+      conf2.set(TTConfig.TT_REPORT_ADDRESS,
         TestHDFSServerPorts.NAME_NODE_HOST + 0);
-      conf2.set("mapred.task.tracker.http.address",
+      conf2.set(TTConfig.TT_HTTP_ADDRESS,
         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
       started = canStartTaskTracker(conf2);
       assertTrue(started); // should start now

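The TaskTracker-side keys in TestMRServerPorts get the same treatment through TTConfig. A sketch mirroring the JobTracker example earlier, using the constants introduced above with ephemeral ports as the test does:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;

public class TTConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(TTConfig.TT_REPORT_ADDRESS, "127.0.0.1:0"); // RPC report address
    conf.set(TTConfig.TT_HTTP_ADDRESS, "0.0.0.0:0");     // HTTP server address
    System.out.println(conf.get(TTConfig.TT_REPORT_ADDRESS));
  }
}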
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Sat Nov 28 20:26:01 2009
@@ -31,7 +31,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -43,6 +42,7 @@
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
  * Tests the use of the
@@ -80,8 +80,8 @@
     @Override
     public void setup(Context context) throws IOException {
       Configuration conf = context.getConfiguration();
-      Path[] files = DistributedCache.getLocalCacheFiles(conf);
-      Path[] archives = DistributedCache.getLocalCacheArchives(conf);
+      Path[] files = context.getLocalCacheFiles();
+      Path[] archives = context.getLocalCacheArchives();
       FileSystem fs = LocalFileSystem.get(conf);
 
       // Check that 2 files and 2 archives are present
@@ -113,7 +113,7 @@
       // (The symlinks exist in "localRunner/" for local Jobtrackers,
       // but the user has no way to get at them.)
       if (!"local".equals(
-          context.getConfiguration().get("mapred.job.tracker"))) {
+          context.getConfiguration().get(JTConfig.JT_IPC_ADDRESS))) {
         File symlinkFile = new File("distributed.first.symlink");
         TestCase.assertTrue(symlinkFile.exists());
         TestCase.assertEquals(1, symlinkFile.length());
@@ -121,7 +121,7 @@
     }
   }
 
-  private void testWithConf(JobConf conf) throws IOException,
+  private void testWithConf(Configuration conf) throws IOException,
       InterruptedException, ClassNotFoundException, URISyntaxException {
     // Create a temporary file of length 1.
     Path first = createTempFile("distributed.first", "x");
@@ -133,20 +133,19 @@
     Path fourth =
         makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4);
 
-    // Creates the Job Configuration
-    DistributedCache.addCacheFile(
-        new URI(first.toUri().toString() + "#distributed.first.symlink"),
-        conf);
-    DistributedCache.addFileToClassPath(second, conf);
-    DistributedCache.addArchiveToClassPath(third, conf);
-    DistributedCache.addCacheArchive(fourth.toUri(), conf);
-    DistributedCache.createSymlink(conf);
 
-    conf.setMaxMapAttempts(1); // speed up failures
     Job job = new Job(conf);
     job.setMapperClass(DistributedCacheChecker.class);
     job.setOutputFormatClass(NullOutputFormat.class);
     FileInputFormat.setInputPaths(job, first);
+    // Creates the Job Configuration
+    job.addCacheFile(
+      new URI(first.toUri().toString() + "#distributed.first.symlink"));
+    job.addFileToClassPath(second);
+    job.addArchiveToClassPath(third);
+    job.addCacheArchive(fourth.toUri());
+    job.createSymlink();
+    job.setMaxMapAttempts(1); // speed up failures
 
     job.submit();
     assertTrue(job.waitForCompletion(false));
@@ -154,8 +153,8 @@
 
   /** Tests using the local job runner. */
   public void testLocalJobRunner() throws Exception {
-    JobConf c = new JobConf();
-    c.set("mapred.job.tracker", "local");
+    Configuration c = new Configuration();
+    c.set(JTConfig.JT_IPC_ADDRESS, "local");
     c.set("fs.default.name", "file:///");
     testWithConf(c);
   }

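The distributed-cache setup moves off the static DistributedCache helpers and onto instance methods of Job, so cache entries are configured on the job being submitted. A condensed sketch of the new submission path (paths and URIs are placeholders):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class CacheSetup {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    job.addCacheFile(new URI("/cache/first.txt#first.symlink")); // file plus symlink name
    job.addFileToClassPath(new Path("/cache/second.jar"));       // onto the task classpath
    job.addArchiveToClassPath(new Path("/cache/third.zip"));
    job.addCacheArchive(new URI("/cache/fourth.jar"));
    job.createSymlink();       // materialize the #symlink names in the work dir
    job.setMaxMapAttempts(1);  // fail fast, as the test does
  }
}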
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapCollection.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapCollection.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapCollection.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapCollection.java Sat Nov 28 20:26:01 2009
@@ -246,9 +246,9 @@
       boolean pedantic) throws Exception {
     JobConf conf = new JobConf(new Configuration(), SpillMapper.class);
 
-    conf.setInt("io.sort.mb", ioSortMB);
-    conf.set("io.sort.record.percent", Float.toString(recPer));
-    conf.set("io.sort.spill.percent", Float.toString(spillPer));
+    conf.setInt(JobContext.IO_SORT_MB, ioSortMB);
+    conf.set(JobContext.MAP_SORT_RECORD_PERCENT, Float.toString(recPer));
+    conf.set(JobContext.MAP_SORT_SPILL_PERCENT, Float.toString(spillPer));
 
     conf.setInt("test.keywritable.length", keylen);
     conf.setInt("test.valwritable.length", vallen);
@@ -280,7 +280,7 @@
   }
 
   public void testLargeRecords() throws Exception {
-    // maps emitting records larger than io.sort.mb
+    // maps emitting records larger than mapreduce.task.io.sort.mb
     runTest("largerec", 100, 1024*1024, 5, false);
     runTest("largekeyzeroval", 1024*1024, 0, 5, false);
   }

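The spill-tuning keys follow suit: io.sort.mb and friends become JobContext constants (the updated comment gives the new key name, mapreduce.task.io.sort.mb). A sketch of setting the three knobs the test varies, with illustrative values:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.JobContext;

public class SpillTuning {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    conf.setInt(JobContext.IO_SORT_MB, 1);                               // collection buffer, in MB
    conf.set(JobContext.MAP_SORT_RECORD_PERCENT, Float.toString(0.05f)); // buffer share for record metadata
    conf.set(JobContext.MAP_SORT_SPILL_PERCENT, Float.toString(0.80f));  // fill level that triggers a spill
    System.out.println(conf.get(JobContext.IO_SORT_MB));
  }
}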
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapOutputType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapOutputType.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapOutputType.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapOutputType.java Sat Nov 28 20:26:01 2009
@@ -81,7 +81,7 @@
     Path outDir = new Path(testdir, "out");
     FileSystem fs = FileSystem.get(conf);
     fs.delete(testdir, true);
-    conf.setInt("io.sort.mb", 1);
+    conf.setInt(JobContext.IO_SORT_MB, 1);
     conf.setInputFormat(SequenceFileInputFormat.class);
     FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Sat Nov 28 20:26:01 2009
@@ -27,9 +27,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
  *  Validates map phase progress.
@@ -133,18 +134,19 @@
     job.setNumReduceTasks(0);
     TaskAttemptID taskId = TaskAttemptID.forName(
                                   "attempt_200907082313_0424_m_000000_0");
-    job.setClass("mapreduce.outputformat.class",
+    job.setClass("mapreduce.job.outputformat.class",
                  NullOutputFormat.class, OutputFormat.class);
-    job.set("mapred.input.dir", TEST_ROOT_DIR);
+    job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+            TEST_ROOT_DIR);
     jobId = taskId.getJobID();
     
-    JobContext jContext = new JobContext(job, jobId);
-    RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
+    JobContext jContext = new JobContextImpl(job, jobId);
+    Job.RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
 
     job.setUseNewMapper(true); // use new api
     for (int i = 0; i < rawSplits.length; i++) {// rawSplits.length is 1
       map = new TestMapTask(
-          job.get("mapred.system.dir", "/tmp/hadoop/mapred/system") +
+          job.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system") +
           jobId + "job.xml",  
           taskId, i,
           rawSplits[i].getClassName(),

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapRed.java Sat Nov 28 20:26:01 2009
@@ -28,9 +28,8 @@
 import java.util.Iterator;
 import java.util.Random;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +42,13 @@
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 /**********************************************************
  * MapredLoadTest generates a bunch of work that exercises
@@ -77,7 +83,7 @@
  * 7) A mapred job integrates all the count files into a single one.
  *
  **********************************************************/
-public class TestMapRed extends TestCase {
+public class TestMapRed extends Configured implements Tool {
   /**
    * Modified to make it a junit test.
    * The RandomGen Job does the actual work of creating
@@ -248,6 +254,7 @@
      }
   **/
 
+  @Test
   public void testMapred() throws Exception {
     launch();
   }
@@ -312,6 +319,7 @@
     }
   }
 
+  @Test
   public void testPartitioner() throws Exception {
     JobConf conf = new JobConf(TestMapRed.class);
     conf.setPartitionerClass(BadPartitioner.class);
@@ -361,6 +369,7 @@
     public void close() { }
   }
 
+  @Test
   public void testNullKeys() throws Exception {
     JobConf conf = new JobConf(TestMapRed.class);
     FileSystem fs = FileSystem.getLocal(conf);
@@ -452,7 +461,8 @@
       fs.delete(testdir, true);
     }
   }
-    
+  
+  @Test 
   public void testCompression() throws Exception {
     EnumSet<SequenceFile.CompressionType> seq =
       EnumSet.allOf(SequenceFile.CompressionType.class);
@@ -468,11 +478,19 @@
   /**
    * 
    */
-  public static void launch() throws Exception {
+  public void launch() throws Exception {
     //
     // Generate distribution of ints.  This is the answer key.
     //
-    JobConf conf = new JobConf(TestMapRed.class);
+    JobConf conf = null;
+    // Use the configuration injected via the Configured interface when one
+    // is set; this happens when the test case is run from the command line.
+    if (getConf() == null) {
+      conf = new JobConf();
+    } else {
+      conf = new JobConf(getConf());
+    }
+    conf.setJarByClass(TestMapRed.class);
     int countsToGo = counts;
     int dist[] = new int[range];
     for (int i = 0; i < range; i++) {
@@ -734,23 +752,15 @@
    * Launches all the tasks in order.
    */
   public static void main(String[] argv) throws Exception {
-    if (argv.length < 2) {
-      System.err.println("Usage: TestMapRed <range> <counts>");
-      System.err.println();
-      System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
-      return;
-    }
-
-    int i = 0;
-    range = Integer.parseInt(argv[i++]);
-    counts = Integer.parseInt(argv[i++]);
-    launch();
+    int res = ToolRunner.run(new TestMapRed(), argv);
+    System.exit(res);
   }
-    
+  @Test  
   public void testSmallInput(){
     runJob(100);
   }
-
+  
+  @Test
   public void testBiggerInput(){
     runJob(1000);
   }
@@ -763,7 +773,7 @@
       Path outDir = new Path(testdir, "out");
       FileSystem fs = FileSystem.get(conf);
       fs.delete(testdir, true);
-      conf.setInt("io.sort.mb", 1);
+      conf.setInt(JobContext.IO_SORT_MB, 1);
       conf.setInputFormat(SequenceFileInputFormat.class);
       FileInputFormat.setInputPaths(conf, inDir);
       FileOutputFormat.setOutputPath(conf, outDir);
@@ -797,7 +807,24 @@
 
       JobClient.runJob(conf);
     } catch (Exception e) {
-      fail("Threw exception:" + e);
+      assertTrue("Threw exception: " + e, false);
     }
   }
+
+  @Override
+  public int run(String[] argv) throws Exception {
+    if (argv.length < 2) {
+      System.err.println("Usage: TestMapRed <range> <counts>");
+      System.err.println();
+    System.err.println("Note: a good test will have a " +
+        "<counts> value that is substantially larger than the <range>");
+      return -1;
+    }
+
+    int i = 0;
+    range = Integer.parseInt(argv[i++]);
+    counts = Integer.parseInt(argv[i++]);
+    launch();
+    return 0;
+  }
 }

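The TestMapRed change follows the standard Tool/ToolRunner recipe: the
class extends Configured, puts its argument handling in run(), and lets
ToolRunner parse generic options before invoking it. A self-contained
sketch of the recipe (the class name and output below are illustrative,
not part of the patch):

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class ToolSketch extends Configured implements Tool {
      @Override
      public int run(String[] args) throws Exception {
        // getConf() returns the Configuration that ToolRunner injected
        // after parsing generic options such as -D key=value.
        System.out.println("got " + args.length + " args, conf is "
            + (getConf() == null ? "unset" : "set"));
        return 0;
      }

      public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options, calls setConf(), then run().
        System.exit(ToolRunner.run(new ToolSketch(), args));
      }
    }
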
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapredHeartbeat.java Sat Nov 28 20:26:01 2009
@@ -21,7 +21,14 @@
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.RandomWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 
 public class TestMapredHeartbeat extends TestCase {
   public void testJobDirCleanup() throws IOException {
@@ -42,7 +49,7 @@
       
       // test configured heartbeat interval
       taskTrackers = 5;
-      conf.setInt("mapred.heartbeats.in.second", 1);
+      conf.setInt(JTConfig.JT_HEARTBEATS_IN_SECOND, 1);
       mr = new MiniMRCluster(taskTrackers, "file:///", 3, 
           null, null, conf);
       jc = new JobClient(mr.createJobConf());
@@ -55,7 +62,7 @@
       
       // test configured heartbeat interval is capped with min value
       taskTrackers = 5;
-      conf.setInt("mapred.heartbeats.in.second", 10);
+      conf.setInt(JTConfig.JT_HEARTBEATS_IN_SECOND, 10);
       mr = new MiniMRCluster(taskTrackers, "file:///", 3, 
           null, null, conf);
       jc = new JobClient(mr.createJobConf());
@@ -68,6 +75,37 @@
       if (mr != null) { mr.shutdown(); }
     }
   }
+  
+  public void testOutOfBandHeartbeats() throws Exception {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      
+      int taskTrackers = 1;
+      JobConf jobConf = new JobConf();
+      jobConf.setFloat(JTConfig.JT_HEARTBEATS_SCALING_FACTOR, 30.0f);
+      jobConf.setBoolean(TTConfig.TT_OUTOFBAND_HEARBEAT, true);
+      mr = new MiniMRCluster(taskTrackers, 
+                             dfs.getFileSystem().getUri().toString(), 3, 
+                             null, null, jobConf);
+      long start = System.currentTimeMillis();
+      TestMiniMRDFSSort.runRandomWriter(mr.createJobConf(), new Path("rw"));
+      long end = System.currentTimeMillis();
+      
+      final int expectedRuntimeSecs = 120;
+      final int runTimeSecs = (int)((end-start) / 1000); 
+      System.err.println("Runtime is " + runTimeSecs);
+      assertTrue("Actual runtime " + runTimeSecs + "s not less than expected " +
+                 "runtime of " + expectedRuntimeSecs + "s!",
+                 runTimeSecs <= expectedRuntimeSecs);
+    } finally {
+      if (mr != null) { mr.shutdown(); }
+      if (dfs != null) { dfs.shutdown(); }
+    }
+  }
+  
 }
 
 

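For reference, a sketch of the heartbeat knobs the new test exercises.
The constant names are taken from the patch; the string keys behind them
and their exact semantics are assumed from context:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
    import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;

    public class HeartbeatConfigSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Target heartbeats the JobTracker should process per second; the
        // cluster derives the per-tracker interval from this value.
        conf.setInt(JTConfig.JT_HEARTBEATS_IN_SECOND, 1);
        // Scaling factor applied as the cluster grows (30.0f in the test).
        conf.setFloat(JTConfig.JT_HEARTBEATS_SCALING_FACTOR, 30.0f);
        // Allow a tracker to heartbeat immediately on task completion
        // rather than waiting out the interval. Note the constant really
        // is spelled HEARBEAT in this branch.
        conf.setBoolean(TTConfig.TT_OUTOFBAND_HEARBEAT, true);
      }
    }
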
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapredSystemDir.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapredSystemDir.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapredSystemDir.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMapredSystemDir.java Sat Nov 28 20:26:01 2009
@@ -24,12 +24,13 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.*;
 
 /**
- * Test if JobTracker is resilient to garbage in mapred.system.dir.
+ * Test if JobTracker is resilient to garbage in {@link JTConfig#JT_SYSTEM_DIR}
  */
 public class TestMapredSystemDir extends TestCase {
   private static final Log LOG = LogFactory.getLog(TestMapredSystemDir.class);
@@ -55,7 +56,7 @@
       dfs = new MiniDFSCluster(conf, 1, true, null);
       FileSystem fs = dfs.getFileSystem();
       
-      // create mapred.system.dir
+      // create the directory configured via JTConfig.JT_SYSTEM_DIR
       Path mapredSysDir = new Path("/mapred");
       fs.mkdirs(mapredSysDir);
       fs.setPermission(mapredSysDir, new FsPermission(SYSTEM_DIR_PERMISSION));
@@ -69,7 +70,7 @@
                              1, null, null, MR_UGI, new JobConf(mrConf));
       JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
       
-      // add garbage to mapred.system.dir
+      // add garbage to the JTConfig.JT_SYSTEM_DIR directory
       Path garbage = new Path(jobtracker.getSystemDir(), "garbage");
       fs.mkdirs(garbage);
       fs.setPermission(garbage, new FsPermission(SYSTEM_DIR_PERMISSION));
@@ -77,7 +78,7 @@
       
       // stop the jobtracker
       mr.stopJobTracker();
-      mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
+      mr.getJobTrackerConf().setBoolean(JTConfig.JT_RESTART_ENABLED, 
                                         false);
       // start jobtracker but dont wait for it to be up
       mr.startJobTracker(false);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRChildTask.java Sat Nov 28 20:26:01 2009
@@ -80,7 +80,7 @@
                                            makeQualified(localFs).toString());
      }
      public void configure(JobConf job) {
-       tmpDir = new Path(job.get("mapred.child.tmp", "./tmp"));
+       tmpDir = new Path(job.get(JobContext.TASK_TEMP_DIR, "./tmp"));
        try {
          localFs = FileSystem.getLocal(job);
        } catch (IOException ioe) {
@@ -145,13 +145,44 @@
     JobClient.runJob(conf);
     outFs.delete(outDir, true);
 
+    final String DEFAULT_ABS_TMP_PATH = "/tmp";
+    final String DEFAULT_REL_TMP_PATH = "../temp";
+
+    String absoluteTempPath = null;
+    String relativeTempPath = null;
+
+    for (String key : new String[] { "test.temp.dir", "test.tmp.dir" }) {
+      String p = conf.get(key);
+      if (p == null || p.isEmpty()) {
+        continue;
+      }
+      if (new Path(p).isAbsolute()) {
+        if (absoluteTempPath == null) {
+          absoluteTempPath = p;
+        }
+      } else {
+        if (relativeTempPath == null) {
+          relativeTempPath = p;
+        }
+      }
+    }
+
+    if (absoluteTempPath == null) {
+      absoluteTempPath = DEFAULT_ABS_TMP_PATH;
+    }
+    if (relativeTempPath == null) {
+      relativeTempPath = DEFAULT_REL_TMP_PATH;
+    }
+
     // Launch job by giving relative path to temp dir.
-    conf.set("mapred.child.tmp", "../temp");
+    LOG.info("Testing with relative temp dir = "+relativeTempPath);
+    conf.set("mapred.child.tmp", relativeTempPath);
     JobClient.runJob(conf);
     outFs.delete(outDir, true);
 
     // Launch job by giving absolute path to temp dir
-    conf.set("mapred.child.tmp", "/tmp");
+    LOG.info("Testing with absolute temp dir = "+absoluteTempPath);
+    conf.set("mapred.child.tmp", absoluteTempPath);
     JobClient.runJob(conf);
     outFs.delete(outDir, true);
   }
@@ -312,7 +343,7 @@
   /**
    * Tests task's temp directory.
    * 
-   * In this test, we give different values to mapred.child.tmp
+   * In this test, we give different values to mapreduce.task.tmp.dir
    * both relative and absolute. And check whether the temp directory 
    * is created. We also check whether java.io.tmpdir value is same as 
    * the directory specified. We create a temp file and check if is is 

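The temp-dir assertions depend on how the task runtime resolves
mapred.child.tmp: relative values are taken against the task's working
directory, absolute values are used as given. A sketch of that rule,
paraphrased from the test's javadoc rather than lifted from the task
runtime itself:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class ChildTmpSketch {
      // Relative child tmp dirs hang off the task working directory;
      // absolute ones stand alone (assumed behaviour).
      static Path resolveChildTmp(JobConf job, Path taskWorkDir) {
        String childTmp = job.get("mapred.child.tmp", "./tmp");
        Path tmp = new Path(childTmp);
        return tmp.isAbsolute() ? tmp : new Path(taskWorkDir, childTmp);
      }

      public static void main(String[] args) {
        JobConf job = new JobConf();
        job.set("mapred.child.tmp", "../temp");
        // Prints a path under the attempt dir, e.g. /local/attempt_0/../temp
        System.out.println(resolveChildTmp(job, new Path("/local/attempt_0")));
      }
    }
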
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java Sat Nov 28 20:26:01 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
  * A JUnit test to test Mini Map-Reduce Cluster with multiple directories
@@ -56,7 +57,7 @@
       file.close();
     }
     FileSystem.setDefaultUri(conf, fileSys);
-    conf.set("mapred.job.tracker", jobTracker);
+    conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
     conf.setJobName("wordcount");
     conf.setInputFormat(TextInputFormat.class);
     
@@ -79,7 +80,7 @@
     {
       Path[] parents = FileUtil.stat2Paths(fs.listStatus(outDir.getParent()));
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-              new OutputLogFilter()));
+              new Utils.OutputFileUtils.OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         BufferedReader file = 
           new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
@@ -112,14 +113,14 @@
       file.close();
     }
     FileSystem.setDefaultUri(conf, uri);
-    conf.set("mapred.job.tracker", jobTracker);
+    conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
     conf.setJobName("wordcount");
     conf.setInputFormat(TextInputFormat.class);
 
     // the keys are counts
     conf.setOutputValueClass(IntWritable.class);
     // the values are the messages
-    conf.set("mapred.output.key.class", "testjar.ExternalWritable");
+    conf.set(JobContext.OUTPUT_KEY_CLASS, "testjar.ExternalWritable");
 
     FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
@@ -134,7 +135,8 @@
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                 new OutputLogFilter()));
+                                 new Utils.OutputFileUtils
+                                          .OutputFilesFilter()));
     for (int i = 0; i < fileList.length; ++i) {
       BufferedReader file = new BufferedReader(new InputStreamReader(
                                                                      fs.open(fileList[i])));

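The listing idiom changes from OutputLogFilter to
Utils.OutputFileUtils.OutputFilesFilter, which keeps only real output
files when enumerating a job's output directory. A small sketch of the
idiom as it appears in the patch (the filter's exact exclusions, such as
log and hidden side files, are assumed):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Utils;

    public class OutputListingSketch {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        FileSystem fs = FileSystem.getLocal(conf);
        // Hypothetical output directory, for illustration only.
        Path outDir = new Path("build/test/wordcount/output");
        Path[] outputs = FileUtil.stat2Paths(fs.listStatus(outDir,
            new Utils.OutputFileUtils.OutputFilesFilter()));
        for (Path p : outputs) {
          System.out.println(p);
        }
      }
    }
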
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Sat Nov 28 20:26:01 2009
@@ -71,12 +71,12 @@
     return setup;
   }
 
-  private static void runRandomWriter(JobConf job, Path sortInput) 
+  public static void runRandomWriter(JobConf job, Path sortInput) 
   throws Exception {
     // Scale down the default settings for RandomWriter for the test-case
     // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP
-    job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
-    job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
+    job.setInt(RandomWriter.BYTES_PER_MAP, RW_BYTES_PER_MAP);
+    job.setInt(RandomWriter.MAPS_PER_HOST, RW_MAPS_PER_HOST);
     String[] rwArgs = {sortInput.toString()};
     
     // Run RandomWriter
@@ -86,8 +86,8 @@
   private static void runSort(JobConf job, Path sortInput, Path sortOutput) 
   throws Exception {
 
-    job.setInt("mapred.job.reuse.jvm.num.tasks", -1);
-    job.setInt("io.sort.mb", 1);
+    job.setInt(JobContext.JVM_NUMTASKS_TORUN, -1);
+    job.setInt(JobContext.IO_SORT_MB, 1);
     job.setNumMapTasks(12);
 
     // Setup command-line arguments to 'sort'
@@ -140,7 +140,7 @@
                                       boolean reuse) throws IOException {
     // setup a map-only job that reads the input and only sets the counters
     // based on how many times the jvm was reused.
-    job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1);
+    job.setInt(JobContext.JVM_NUMTASKS_TORUN, reuse ? -1 : 1);
     FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
     job.setInputFormat(SequenceFileInputFormat.class);
     job.setOutputFormat(NullOutputFormat.class);

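runSort turns on unbounded JVM reuse through JobContext.JVM_NUMTASKS_TORUN.
A sketch of the knob's semantics as documented for the old
mapred.job.reuse.jvm.num.tasks key, with the constant's location assumed
from the unqualified use in the patch:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobContext;

    public class JvmReuseSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // 1 (default): a fresh child JVM per task.
        // n > 1: up to n tasks of the same job per JVM.
        // -1: no limit; reuse a JVM for any number of the job's tasks.
        job.setInt(JobContext.JVM_NUMTASKS_TORUN, -1);
      }
    }
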
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Sat Nov 28 20:26:01 2009
@@ -27,6 +27,8 @@
 import java.util.Arrays;
 import java.util.List;
 
+import javax.security.auth.login.LoginException;
+
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
@@ -39,7 +41,10 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS.
@@ -102,7 +107,8 @@
     {
       
       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
-                                   new OutputLogFilter()));
+                                   new Utils.OutputFileUtils
+                                            .OutputFilesFilter()));
       for(int i=0; i < fileList.length; ++i) {
         LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
         BufferedReader file = 
@@ -118,55 +124,108 @@
     }
     return result.toString();
   }
-  
+
   /**
    * Make sure that there are exactly the directories that we expect to find.
+   * 
+   * <br/>
+   * <br/>
+   * 
+   * For example, to check the existence of *only* the directories for
+   * user1's tasks job1-attempt1, job1-attempt2, job2-attempt1 and
+   * job3-attempt3, we pass user1 as user, {job1, job1, job2, job3} as jobIds
+   * and {attempt1, attempt2, attempt1, attempt3} as taskDirs.
+   * 
    * @param mr the map-reduce cluster
+   * @param user the name of the job-owner
+   * @param jobIds the list of jobs
    * @param taskDirs the task ids that should be present
    */
-  static void checkTaskDirectories(MiniMRCluster mr,
-                                           String[] jobIds,
-                                           String[] taskDirs) {
+  static void checkTaskDirectories(MiniMRCluster mr, String user,
+      String[] jobIds, String[] taskDirs) {
+
     mr.waitUntilIdle();
     int trackers = mr.getNumTaskTrackers();
-    List<String> neededDirs = new ArrayList<String>(Arrays.asList(taskDirs));
-    boolean[] found = new boolean[taskDirs.length];
-    for(int i=0; i < trackers; ++i) {
-      int numNotDel = 0;
+
+    List<String> observedJobDirs = new ArrayList<String>();
+    List<String> observedFilesInsideJobDir = new ArrayList<String>();
+
+    for (int i = 0; i < trackers; ++i) {
+
+      // Verify that mapred-local-dir and its direct contents are valid
       File localDir = new File(mr.getTaskTrackerLocalDir(i));
-      LOG.debug("Tracker directory: " + localDir);
-      File trackerDir = new File(localDir, TaskTracker.SUBDIR);
-      assertTrue("local dir " + localDir + " does not exist.", 
-                 localDir.isDirectory());
-      assertTrue("task tracker dir " + trackerDir + " does not exist.", 
-                 trackerDir.isDirectory());
-      String contents[] = localDir.list();
-      String trackerContents[] = trackerDir.list();
-      for(int j=0; j < contents.length; ++j) {
-        System.out.println("Local " + localDir + ": " + contents[j]);
-      }
-      for(int j=0; j < trackerContents.length; ++j) {
-        System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
-      }
-      for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
-        String name = contents[fileIdx];
-        if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) {
-          LOG.debug("Looking at " + name);
-          assertTrue("Spurious directory " + name + " found in " +
-                     localDir, false);
+      assertTrue("Local dir " + localDir + " does not exist.", localDir
+          .isDirectory());
+      LOG.info("Verifying contents of " + MRConfig.LOCAL_DIR + " "
+          + localDir.getAbsolutePath());
+
+      // Verify contents(user-dir) of tracker-sub-dir
+      File trackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+      if (trackerSubDir.isDirectory()) {
+
+        // Verify contents of user-dir and populate the job-dirs/attempt-dirs
+        // lists
+        File userDir = new File(trackerSubDir, user);
+        if (userDir.isDirectory()) {
+          LOG.info("Verifying contents of user-dir "
+              + userDir.getAbsolutePath());
+          verifyContents(new String[] { TaskTracker.JOBCACHE,
+              TaskTracker.DISTCACHEDIR }, userDir.list());
+
+          File jobCacheDir =
+              new File(localDir, TaskTracker.getJobCacheSubdir(user));
+          String[] jobDirs = jobCacheDir.list();
+          observedJobDirs.addAll(Arrays.asList(jobDirs));
+
+          for (String jobDir : jobDirs) {
+            String[] attemptDirs = new File(jobCacheDir, jobDir).list();
+            observedFilesInsideJobDir.addAll(Arrays.asList(attemptDirs));
+          }
         }
       }
-      for (int idx = 0; idx < neededDirs.size(); ++idx) {
-        String name = neededDirs.get(idx);
-        if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE),
-                              jobIds[idx]), name).isDirectory()) {
-          found[idx] = true;
-          numNotDel++;
-        }  
+    }
+
+    // Now verify that only expected job-dirs and attempt-dirs are present.
+    LOG.info("Verifying the list of job directories");
+    verifyContents(jobIds, observedJobDirs.toArray(new String[observedJobDirs
+        .size()]));
+    LOG.info("Verifying the list of task directories");
+    // All taskDirs should be present in the observed list. Other files like
+    // job.xml etc may be present too, we are not checking them here.
+    for (int j = 0; j < taskDirs.length; j++) {
+      assertTrue(
+          "Expected task-directory " + taskDirs[j] + " is not present!",
+          observedFilesInsideJobDir.contains(taskDirs[j]));
+    }
+  }
+
+  /**
+   * Check the list of expectedFiles against the list of observedFiles and make
+   * sure they both are the same. Duplicates can be present in either of the
+   * lists and all duplicate entries are treated as a single entity.
+   * 
+   * @param expectedFiles
+   * @param observedFiles
+   */
+  private static void verifyContents(String[] expectedFiles,
+      String[] observedFiles) {
+    boolean[] foundExpectedFiles = new boolean[expectedFiles.length];
+    boolean[] validObservedFiles = new boolean[observedFiles.length];
+    for (int j = 0; j < observedFiles.length; ++j) {
+      for (int k = 0; k < expectedFiles.length; ++k) {
+        if (expectedFiles[k].equals(observedFiles[j])) {
+          foundExpectedFiles[k] = true;
+          validObservedFiles[j] = true;
+        }
       }
     }
-    for(int i=0; i< found.length; i++) {
-      assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
+    for (int j = 0; j < foundExpectedFiles.length; j++) {
+      assertTrue("Expected file " + expectedFiles[j] + " not found",
+          foundExpectedFiles[j]);
+    }
+    for (int j = 0; j < validObservedFiles.length; j++) {
+      assertTrue("Unexpected file " + observedFiles[j] + " found",
+          validObservedFiles[j]);
     }
   }
 
@@ -177,7 +236,16 @@
         NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
     double error = Math.abs(Math.PI - estimate);
     assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-    checkTaskDirectories(mr, new String[]{}, new String[]{});
+    String userName = jobconf.getUser();
+    if (userName == null) {
+      try {
+        userName = UnixUserGroupInformation.login(jobconf).getUserName();
+      } catch (LoginException le) {
+        throw new IOException("Cannot get the login username : "
+            + StringUtils.stringifyException(le));
+      }
+    }
+    checkTaskDirectories(mr, userName, new String[] {}, new String[] {});
   }
 
   public static void runWordCount(MiniMRCluster mr, JobConf jobConf) 
@@ -198,8 +266,17 @@
     JobID jobid = result.job.getID();
     TaskAttemptID taskid = new TaskAttemptID(
         new TaskID(jobid, TaskType.MAP, 1),0);
-    checkTaskDirectories(mr, new String[]{jobid.toString()}, 
-                         new String[]{taskid.toString()});
+    String userName = jobConf.getUser();
+    if (userName == null) {
+      try {
+        userName = UnixUserGroupInformation.login(jobConf).getUserName();
+      } catch (LoginException le) {
+        throw new IOException("Cannot get the login username : "
+            + StringUtils.stringifyException(le));
+      }
+    }
+    checkTaskDirectories(mr, userName, new String[] { jobid.toString() },
+        new String[] { taskid.toString() });
     // test with maps=0
     jobConf = mr.createJobConf();
     input = "owen is oom";

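Because verifyContents above treats duplicates as a single entity and
rejects both missing and spurious names, it is comparing the two arrays
as sets. An equivalent set-based formulation, shown only to make the
contract explicit:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class VerifyContentsSketch {
      // Same contract as verifyContents above: equal as sets, with
      // duplicates collapsed; a mismatch either way is a failure.
      static void verifyContents(String[] expected, String[] observed) {
        Set<String> e = new HashSet<String>(Arrays.asList(expected));
        Set<String> o = new HashSet<String>(Arrays.asList(observed));
        if (!e.equals(o)) {
          throw new AssertionError("expected " + e + " but observed " + o);
        }
      }

      public static void main(String[] args) {
        verifyContents(new String[] { "jobcache", "distcache" },
                       new String[] { "distcache", "jobcache", "jobcache" });
      }
    }
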
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Sat Nov 28 20:26:01 2009
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
 import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 /**
  * This test checks whether the task caches are created and used properly.
@@ -106,7 +107,7 @@
       JobConf jc = new JobConf();
       // cache-level = level (unshared levels) + 1(topmost shared node i.e /a) 
       //               + 1 (for host)
-      jc.setInt("mapred.task.cache.levels", level + 2);
+      jc.setInt(JTConfig.JT_TASKCACHE_LEVELS, level + 2);
       mr = new MiniMRCluster(taskTrackers, namenode, 1, new String[] {rack2}, 
     		                 new String[] {"host2.com"}, jc);
 

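The cache-level arithmetic in that comment is worth spelling out: with
"level" unshared rack levels below the single shared top node /a, plus
the host leaf, the tracker topology is level + 2 deep. A tiny sketch of
the same computation:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    public class CacheLevelSketch {
      public static void main(String[] args) {
        int level = 2;                // unshared rack levels under /a
        int cacheLevels = level + 2;  // + shared node "/a" + host = 4
        JobConf jc = new JobConf();
        jc.setInt(JTConfig.JT_TASKCACHE_LEVELS, cacheLevels);
      }
    }
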
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -75,7 +75,7 @@
     FileSystem fs = FileSystem.getLocal(job);
     String name = "part-00000";
     //pretend that we have input file with 1/2/3 as the suffix
-    job.set("map.input.file", "1/2/3");
+    job.set(JobContext.MAP_INPUT_FILE, "1/2/3");
     // we use the last two legs of the input file as the output file
     job.set("mapred.outputformat.numOfTrailingLegs", "2");
     MultipleTextOutputFormat<Text, Text> theOutputFormat = new MultipleTextOutputFormat<Text, Text>();
@@ -86,7 +86,7 @@
   
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
-    job.set("mapred.task.id", attempt);
+    job.set(JobContext.TASK_ATTEMPT_ID, attempt);
     FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);

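The numOfTrailingLegs setting makes MultipleOutputFormat derive output
file names from the trailing legs of map.input.file, so input 1/2/3 with
two trailing legs lands under 2/3. A hedged sketch that exercises this
directly; the subclass exists only to reach the protected helper, and
the expected output is inferred from the test above:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;

    public class TrailingLegsSketch
        extends MultipleTextOutputFormat<Text, Text> {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        job.set("map.input.file", "1/2/3");  // the patch sets this key via
                                             // JobContext.MAP_INPUT_FILE
        job.set("mapred.outputformat.numOfTrailingLegs", "2");
        TrailingLegsSketch f = new TrailingLegsSketch();
        // Expected to print "2/3" (assumed behaviour of the helper).
        System.out.println(
            f.getInputFileBasedOutputFileName(job, "part-00000"));
      }
    }
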

