hadoop-common-commits mailing list archives

From: yhema...@apache.org
Subject: svn commit: r723732 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Fri, 05 Dec 2008 13:47:36 GMT
Author: yhemanth
Date: Fri Dec  5 05:47:35 2008
New Revision: 723732

URL: http://svn.apache.org/viewvc?rev=723732&view=rev
Log:
HADOOP-4623. Maintain running tasks even if speculative execution is off. Contributed by Amar Kamat.
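In effect, the patch drops the hasSpeculativeMaps/hasSpeculativeReduces guard around the running-task bookkeeping in JobInProgress, so the running-map cache and running-reduce list are kept up to date for every job, not only for jobs with speculative execution enabled. As a condensed sketch of the resulting control flow (field and method names taken from the JobInProgress.java hunk below; not a verbatim copy of the committed code):

  // Bookkeeping now happens unconditionally; before this change the whole body
  // was wrapped in "if (hasSpeculativeReduces) { ... }".
  private synchronized void retireReduce(TaskInProgress tip) {
    if (runningReduces == null) {
      LOG.warn("Running list for reducers missing!! Job details are missing.");
      return;
    }
    runningReduces.remove(tip);
  }

The map-side methods (scheduleMap/retireMap) change the same way, and the new TestJobInProgress.testRunningTaskCount() exercises the structures with speculation both on and off.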

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Dec  5 05:47:35 2008
@@ -285,6 +285,9 @@
     HADOOP-4576. Show pending job count instead of task count in the UI per
     queue in capacity scheduler. (Sreekanth Ramakrishnan via yhemanth)
 
+    HADOOP-4623. Maintain running tasks even if speculative execution is off.
+    (Amar Kamat via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Dec  5 05:47:35 2008
@@ -1282,35 +1282,33 @@
    * @param tip the tip that needs to be retired
    */
   private synchronized void retireMap(TaskInProgress tip) {
-    // Since a list for running maps is maintained if speculation is 'ON'
-    if (hasSpeculativeMaps) {
-      if (runningMapCache == null) {
-        LOG.warn("Running cache for maps missing!! "
-                 + "Job details are missing.");
-        return;
-      }
-      String[] splitLocations = tip.getSplitLocations();
+    if (runningMapCache == null) {
+      LOG.warn("Running cache for maps missing!! "
+               + "Job details are missing.");
+      return;
+    }
+    
+    String[] splitLocations = tip.getSplitLocations();
 
-      // Remove the TIP from the list for running non-local maps
-      if (splitLocations.length == 0) {
-        nonLocalRunningMaps.remove(tip);
-        return;
-      }
+    // Remove the TIP from the list for running non-local maps
+    if (splitLocations.length == 0) {
+      nonLocalRunningMaps.remove(tip);
+      return;
+    }
 
-      // Remove from the running map caches
-      for(String host: splitLocations) {
-        Node node = jobtracker.getNode(host);
-        
-        for (int j = 0; j < maxLevel; ++j) {
-          Set<TaskInProgress> hostMaps = runningMapCache.get(node);
-          if (hostMaps != null) {
-            hostMaps.remove(tip);
-            if (hostMaps.size() == 0) {
-              runningMapCache.remove(node);
-            }
+    // Remove from the running map caches
+    for(String host: splitLocations) {
+      Node node = jobtracker.getNode(host);
+
+      for (int j = 0; j < maxLevel; ++j) {
+        Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+        if (hostMaps != null) {
+          hostMaps.remove(tip);
+          if (hostMaps.size() == 0) {
+            runningMapCache.remove(node);
           }
-          node = node.getParent();
         }
+        node = node.getParent();
       }
     }
   }
@@ -1321,15 +1319,12 @@
    * @param tip the tip that needs to be retired
    */
   private synchronized void retireReduce(TaskInProgress tip) {
-    // Since a list for running reduces is maintained if speculation is 'ON'
-    if (hasSpeculativeReduces) {
-      if (runningReduces == null) {
-        LOG.warn("Running list for reducers missing!! "
-                 + "Job details are missing.");
-        return;
-      }
-      runningReduces.remove(tip);
+    if (runningReduces == null) {
+      LOG.warn("Running list for reducers missing!! "
+               + "Job details are missing.");
+      return;
     }
+    runningReduces.remove(tip);
   }
 
   /**
@@ -1338,34 +1333,31 @@
    */
   private synchronized void scheduleMap(TaskInProgress tip) {
     
-    // Since a running list is maintained only if speculation is 'ON'
-    if (hasSpeculativeMaps) {
-      if (runningMapCache == null) {
-        LOG.warn("Running cache for maps is missing!! " 
-                 + "Job details are missing.");
-        return;
-      }
-      String[] splitLocations = tip.getSplitLocations();
+    if (runningMapCache == null) {
+      LOG.warn("Running cache for maps is missing!! " 
+               + "Job details are missing.");
+      return;
+    }
+    String[] splitLocations = tip.getSplitLocations();
 
-      // Add the TIP to the list of non-local running TIPs
-      if (splitLocations.length == 0) {
-        nonLocalRunningMaps.add(tip);
-        return;
-      }
+    // Add the TIP to the list of non-local running TIPs
+    if (splitLocations.length == 0) {
+      nonLocalRunningMaps.add(tip);
+      return;
+    }
 
-      for(String host: splitLocations) {
-        Node node = jobtracker.getNode(host);
-      
-        for (int j = 0; j < maxLevel; ++j) {
-          Set<TaskInProgress> hostMaps = runningMapCache.get(node);
-          if (hostMaps == null) {
-            // create a cache if needed
-            hostMaps = new LinkedHashSet<TaskInProgress>();
-            runningMapCache.put(node, hostMaps);
-          }
-          hostMaps.add(tip);
-          node = node.getParent();
+    for(String host: splitLocations) {
+      Node node = jobtracker.getNode(host);
+
+      for (int j = 0; j < maxLevel; ++j) {
+        Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+        if (hostMaps == null) {
+          // create a cache if needed
+          hostMaps = new LinkedHashSet<TaskInProgress>();
+          runningMapCache.put(node, hostMaps);
         }
+        hostMaps.add(tip);
+        node = node.getParent();
       }
     }
   }
@@ -1375,15 +1367,12 @@
    * @param tip the tip that needs to be scheduled as running
    */
   private synchronized void scheduleReduce(TaskInProgress tip) {
-    // Since a list for running reduces is maintained if speculation is 'ON'
-    if (hasSpeculativeReduces) {
-      if (runningReduces == null) {
-        LOG.warn("Running cache for reducers missing!! "
-                 + "Job details are missing.");
-        return;
-      }
-      runningReduces.add(tip);
+    if (runningReduces == null) {
+      LOG.warn("Running cache for reducers missing!! "
+               + "Job details are missing.");
+      return;
     }
+    runningReduces.add(tip);
   }
   
   /**

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCollect.java Fri Dec  5 05:47:35 2008
@@ -19,7 +19,7 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.ThreadedMapBenchmark.RandomInputFormat;
+import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat;
 
 import junit.framework.TestCase;
 import java.io.*;

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java Fri Dec  5 05:47:35 2008
@@ -3,25 +3,37 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.UtilsForTests;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.net.Node;
 
 import junit.framework.TestCase;
 
 public class TestJobInProgress extends TestCase {
+  static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
 
   private MiniMRCluster mrCluster;
 
   private MiniDFSCluster dfsCluster;
   JobTracker jt;
+  private static Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"), "jip-testing");
+  private static int numSlaves = 4;
 
   public static class FailMapTaskJob extends MapReduceBase implements
       Mapper<LongWritable, Text, Text, IntWritable> {
@@ -64,10 +76,9 @@
   protected void setUp() throws Exception {
     // TODO Auto-generated method stub
     super.setUp();
-    final int taskTrackers = 4;
     Configuration conf = new Configuration();
-    dfsCluster = new MiniDFSCluster(conf, 4, true, null);
-    mrCluster = new MiniMRCluster(taskTrackers, dfsCluster.getFileSystem()
+    dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+    mrCluster = new MiniMRCluster(numSlaves, dfsCluster.getFileSystem()
         .getUri().toString(), 1);
     jt = mrCluster.getJobTrackerRunner().getJobTracker();
   }
@@ -82,6 +93,97 @@
     checkTaskCounts();
   }
 
+  /**
+   * Test if running tasks are correctly maintained for various types of jobs
+   */
+  private void testRunningTaskCount(boolean speculation, boolean locality)
+  throws Exception {
+    LOG.info("Testing running jobs with speculation : " + speculation 
+             + ", locality : " + locality);
+    // cleanup
+    dfsCluster.getFileSystem().delete(TEST_DIR, true);
+    
+    final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
+    final Path redSignalFile = new Path(TEST_DIR, "reduce-signal");
+    
+    // configure a waiting job with 2 maps and 2 reducers
+    JobConf job = 
+      configure(UtilsForTests.WaitingMapper.class, IdentityReducer.class, 1, 1,
+                locality);
+    job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
+    job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
+    
+    // test jobs with speculation
+    job.setSpeculativeExecution(speculation);
+    JobClient jc = new JobClient(job);
+    RunningJob running = jc.submitJob(job);
+    JobTracker jobtracker = mrCluster.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jobtracker.getJob(running.getID());
+    LOG.info("Running job " + jip.getJobID());
+    
+    // wait
+    LOG.info("Waiting for job " + jip.getJobID() + " to be ready");
+    waitTillReady(jip, job);
+    
+    // check if the running structures are populated
+    Set<TaskInProgress> uniqueTasks = new HashSet<TaskInProgress>();
+    for (Map.Entry<Node, Set<TaskInProgress>> s : 
+           jip.getRunningMapCache().entrySet()) {
+      uniqueTasks.addAll(s.getValue());
+    }
+    
+    // add non local map tasks
+    uniqueTasks.addAll(jip.getNonLocalRunningMaps());
+    
+    assertEquals("Running map count doesnt match for jobs with speculation " 
+                 + speculation + ", and locality " + locality,
+                 jip.runningMaps(), uniqueTasks.size());
+
+    assertEquals("Running reducer count doesnt match for jobs with speculation "
+                 + speculation + ", and locality " + locality,
+                 jip.runningReduces(), jip.getRunningReduces().size());
+    
+    // signal the tasks
+    LOG.info("Signaling the tasks");
+    UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(),
+                              mapSignalFile.toString(), 
+                              redSignalFile.toString(), numSlaves);
+    
+    // wait for the job to complete
+    LOG.info("Waiting for job " + jip.getJobID() + " to be complete");
+    UtilsForTests.waitTillDone(jc);
+    
+    // cleanup
+    dfsCluster.getFileSystem().delete(TEST_DIR, true);
+  }
+  
+  // wait for the job to start
+  private void waitTillReady(JobInProgress jip, JobConf job) {
+    // wait for all the maps to get scheduled
+    while (jip.runningMaps() < job.getNumMapTasks()) {
+      UtilsForTests.waitFor(10);
+    }
+    
+    // wait for all the reducers to get scheduled
+    while (jip.runningReduces() < job.getNumReduceTasks()) {
+      UtilsForTests.waitFor(10);
+    }
+  }
+  
+  public void testRunningTaskCount() throws Exception {
+    // test with spec = false and locality=true
+    testRunningTaskCount(false, true);
+    
+    // test with spec = true and locality=true
+    testRunningTaskCount(true, true);
+    
+    // test with spec = false and locality=false
+    testRunningTaskCount(false, false);
+    
+    // test with spec = true and locality=false
+    testRunningTaskCount(true, false);
+  }
+  
   @Override
   protected void tearDown() throws Exception {
     mrCluster.shutdown();
@@ -90,11 +192,18 @@
   }
   
 
-  @SuppressWarnings("unchecked")
   void launchTask(Class MapClass,Class ReduceClass) throws Exception{
+    JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
+    try {
+      JobClient.runJob(job);
+    } catch (IOException ioe) {}
+  }
+  
+  @SuppressWarnings("unchecked")
+  JobConf configure(Class MapClass,Class ReduceClass, int maps, int reducers,
+                    boolean locality) 
+  throws Exception {
     JobConf jobConf = mrCluster.createJobConf();
-
-    JobClient jc = new JobClient(jobConf);
     final Path inDir = new Path("./failjob/input");
     final Path outDir = new Path("./failjob/output");
     String input = "Test failing job.\n One more line";
@@ -109,7 +218,11 @@
     file.writeBytes(input);
     file.close();
     jobConf.setJobName("failmaptask");
-    jobConf.setInputFormat(TextInputFormat.class);
+    if (locality) {
+      jobConf.setInputFormat(TextInputFormat.class);
+    } else {
+      jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
+    }
     jobConf.setOutputKeyClass(Text.class);
     jobConf.setOutputValueClass(Text.class);
     jobConf.setMapperClass(MapClass);
@@ -117,14 +230,9 @@
     jobConf.setReducerClass(ReduceClass);
     FileInputFormat.setInputPaths(jobConf, inDir);
     FileOutputFormat.setOutputPath(jobConf, outDir);
-    jobConf.setNumMapTasks(10);
-    jobConf.setNumReduceTasks(5);
-    RunningJob job = null;
-    try {
-      job = JobClient.runJob(jobConf);
-    } catch (IOException e) {
-    }
-
+    jobConf.setNumMapTasks(maps);
+    jobConf.setNumReduceTasks(reducers);
+    return jobConf; 
   }
 
   void checkTaskCounts() {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgressListener.java Fri Dec  5 05:47:35 2008
@@ -43,9 +43,8 @@
                                Path inDir, Path outputDir,
                                String mapSignalFile, String redSignalFile) 
   throws IOException {
-    TestJobTrackerRestart.configureWaitingJobConf(conf, inDir, outputDir, 
-                                                  m, r, "job-listener-test", 
-                                                  mapSignalFile, redSignalFile);
+    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir,  m, r, 
+        "job-listener-test", mapSignalFile, redSignalFile);
     return conf; 
   }
   
@@ -84,11 +83,10 @@
     // Write the input file
     Path inDir = new Path(testDir, "input");
     Path shareDir = new Path(testDir, "share");
-    String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
-    String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
-    TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, 
-                                         new Path(inDir + "/file"), 
-                                         (short)1);
+    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
+    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
+    UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file"), 
+                            (short)1);
     
     JobQueueJobInProgressListener myListener = 
       new JobQueueJobInProgressListener();
@@ -187,16 +185,15 @@
                  2, queue.length);
     
     // signal the maps to complete
-    TestJobTrackerRestart.signalTasks(dfs, fileSys, true, 
-                                      mapSignalFile, redSignalFile);
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
     
     // check if job completion leaves the queue sane
     while (rJob2.getJobState() != JobStatus.SUCCEEDED) {
-      TestJobTrackerRestart.waitFor(10);
+      UtilsForTests.waitFor(10);
     }
     
     while (rJob1.getJobState() != JobStatus.SUCCEEDED) {
-      TestJobTrackerRestart.waitFor(10);
+      UtilsForTests.waitFor(10);
     }
     
     assertTrue("Job completion garbles the queue", 
@@ -316,7 +313,7 @@
     
     // wait for the job to be running
     while (rJob.getJobState() != JobStatus.RUNNING) {
-      TestJobTrackerRestart.waitFor(10);
+      UtilsForTests.waitFor(10);
     }
     
     LOG.info("Job " +  rJob.getID().toString() + " started running");
@@ -326,7 +323,7 @@
                 myListener.contains(rJob.getID(), true));
     
     while (rJob.getJobState() != JobStatus.SUCCEEDED) {
-      TestJobTrackerRestart.waitFor(10);
+      UtilsForTests.waitFor(10);
     }
     
     // check if the job success was notified

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java Fri Dec  5 05:47:35 2008
@@ -44,20 +44,23 @@
   private MiniDFSCluster dfsCluster;
   private JobConf jc;
   private static final String JOB_SCHEDULING_INFO = "TESTSCHEDULINGINFO";
-  private static final Path TEST_DIR = new Path("job-queue-info-testing");
+  private static final Path TEST_DIR = 
+    new Path(System.getProperty("test.build.data","/tmp"), 
+             "job-queue-info-testing");
+  private static final Path IN_DIR = new Path(TEST_DIR, "input");
+  private static final Path SHARE_DIR = new Path(TEST_DIR, "share");
+  private static final Path OUTPUT_DIR = new Path(TEST_DIR, "output");
   
+  static String getSignalFile() {
+    return (new Path(SHARE_DIR, "signal")).toString();
+  }
+
   // configure a waiting job with 2 maps
   private JobConf configureWaitingJob(JobConf conf) throws IOException {
-    Path inDir = new Path(TEST_DIR, "input");
-    Path shareDir = new Path(TEST_DIR, "share");
-    Path outputDir = new Path(TEST_DIR, "output");
-    String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
-    String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
-    JobPriority[] priority = new JobPriority[] {JobPriority.NORMAL};
-    return TestJobTrackerRestart.getJobs(conf, priority, 
-                                         new int[] {2}, new int[] {0}, 
-                                         outputDir, inDir, 
-                                         mapSignalFile, redSignalFile)[0];
+    
+    UtilsForTests.configureWaitingJobConf(conf, IN_DIR, OUTPUT_DIR, 2, 0, 
+        "test-job-queue-info", getSignalFile(), getSignalFile());
+    return conf;
   }
 
   public static class TestTaskScheduler extends LimitTasksPerJobTaskScheduler {
@@ -113,12 +116,12 @@
     conf.setJobName("test-job-queue-info-test");
     
     // clear the signal file if any
-    TestJobTrackerRestart.cleanUp(fileSys, TEST_DIR);
+    fileSys.delete(SHARE_DIR, true);
     
     RunningJob rJob = jc.submitJob(conf);
     
     while (rJob.getJobState() != JobStatus.RUNNING) {
-      TestJobTrackerRestart.waitFor(10);
+      UtilsForTests.waitFor(10);
     }
     
     int numberOfJobs = 0;
@@ -135,5 +138,8 @@
       }
     }
     assertEquals(1, numberOfJobs);
+    
+    UtilsForTests.signalTasks(dfsCluster, fileSys, getSignalFile(), 
+                              getSignalFile(), 4);
   }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Dec  5 05:47:35 2008
@@ -20,16 +20,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
-import org.apache.hadoop.mapred.ThreadedMapBenchmark.RandomInputFormat;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.UtilsForTests;
 
 import junit.framework.TestCase;
 import java.io.*;
-import java.util.Iterator;
 
 /** 
  * TestJobTrackerRestart checks if the jobtracker can restart. JobTracker 
@@ -37,27 +31,15 @@
  * recover previosuly submitted jobs.
  */
 public class TestJobTrackerRestart extends TestCase {
-  final static Object waitLock = new Object();
-  final Path testDir = new Path("/jt-restart-testing");
+  final Path testDir = 
+    new Path(System.getProperty("test.build.data","/tmp"), 
+             "jt-restart-testing");
   final Path inDir = new Path(testDir, "input");
   final Path shareDir = new Path(testDir, "share");
   final Path outputDir = new Path(testDir, "output");
   private static int numJobsSubmitted = 0;
   
   /**
-   * Gets job status from the jobtracker given the jobclient and the job id
-   */
-  static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException {
-    JobStatus[] statuses = jc.getAllJobs();
-    for (JobStatus jobStatus : statuses) {
-      if (jobStatus.getJobID().equals(id)) {
-        return jobStatus;
-      }
-    }
-    return null;
-  }
-
-  /**
    * Return the job conf configured with the priorities and mappers as passed.
    * @param conf The default conf
    * @param priorities priorities for the jobs
@@ -70,7 +52,7 @@
    * @return a array of jobconfs configured as needed
    * @throws IOException
    */
-  static JobConf[] getJobs(JobConf conf, JobPriority[] priorities, 
+  private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities, 
                            int[] numMaps, int[] numReds,
                            Path outputDir, Path inDir,
                            String mapSignalFile, String reduceSignalFile) 
@@ -79,81 +61,18 @@
     for (int i = 0; i < jobs.length; ++i) {
       jobs[i] = new JobConf(conf);
       Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
-      configureWaitingJobConf(jobs[i], inDir, newOutputDir, 
-                              numMaps[i], numReds[i], "jt-restart-test-job", 
-                              mapSignalFile, reduceSignalFile);
+      UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir, 
+          numMaps[i], numReds[i], "jt-restart-test-job", mapSignalFile, 
+          reduceSignalFile);
       jobs[i].setJobPriority(priorities[i]);
     }
     return jobs;
   }
 
   /**
-   * A utility that waits for specified amount of time
-   */
-  static void waitFor(long duration) {
-    try {
-      synchronized (waitLock) {
-        waitLock.wait(duration);
-      }
-    } catch (InterruptedException ie) {}
-  }
-  
-  /**
-   * Wait for the jobtracker to be RUNNING.
-   */
-  static void waitForJobTracker(JobClient jobClient) {
-    while (true) {
-      try {
-        ClusterStatus status = jobClient.getClusterStatus();
-        while (status.getJobTrackerState() != JobTracker.State.RUNNING) {
-          waitFor(100);
-          status = jobClient.getClusterStatus();
-        }
-        break; // means that the jt is ready
-      } catch (IOException ioe) {}
-    }
-  }
-  
-  /**
-   * Signal the maps/reduces to start.
-   */
-  static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, 
-                          boolean isMap, String mapSignalFile, 
-                          String reduceSignalFile)
-  throws IOException {
-    //  signal the maps to complete
-    TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), fileSys.getConf(),
-                                         isMap 
-                                         ? new Path(mapSignalFile)
-                                         : new Path(reduceSignalFile), 
-                                         (short)1);
-  }
-  
-  /**
-   * Waits until all the jobs at the jobtracker complete.
-   */
-  static void waitTillDone(JobClient jobClient) throws IOException {
-    // Wait for the last job to complete
-    while (true) {
-      boolean shouldWait = false;
-      for (JobStatus jobStatuses : jobClient.getAllJobs()) {
-        if (jobStatuses.getRunState() == JobStatus.RUNNING) {
-          shouldWait = true;
-          break;
-        }
-      }
-      if (shouldWait) {
-        waitFor(1000);
-      } else {
-        break;
-      }
-    }
-  }
-  
-  /**
    * Clean up the signals.
    */
-  static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
+  private static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
     // Delete the map signal file
     fileSys.delete(new Path(getMapSignalFile(dir)), false);
     // Delete the reduce signal file
@@ -237,30 +156,30 @@
     }
 
     // Make sure that the master job is 50% completed
-    while (getJobStatus(jobClient, id).mapProgress() < 0.5f) {
-      waitFor(100);
+    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
+      UtilsForTests.waitFor(100);
     }
 
     // Note the data that needs to be tested upon restart
-    long jobStartTime = getJobStatus(jobClient, id).getStartTime();
+    long jobStartTime = UtilsForTests.getJobStatus(jobClient, id).getStartTime();
 
     // Kill the jobtracker
     mr.stopJobTracker();
 
     // Signal the maps to complete
-    signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
-                getReduceSignalFile(shareDir));
+    UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
+                              getReduceSignalFile(shareDir));
 
     // Signal the reducers to complete
-    signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
-                getReduceSignalFile(shareDir));
+    UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
+                              getReduceSignalFile(shareDir));
     
     // Enable recovery on restart
     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
                                       true);
 
     //  Wait for a minute before submitting a job
-    waitFor(60 * 1000);
+    UtilsForTests.waitFor(60 * 1000);
     
     // Restart the jobtracker
     mr.startJobTracker();
@@ -268,11 +187,11 @@
     // Check if the jobs are still running
 
     // Wait for the JT to be ready
-    waitForJobTracker(jobClient);
+    UtilsForTests.waitForJobTracker(jobClient);
 
     // Check if the job recovered
     assertEquals("Restart failed as previously submitted job was missing", 
-                 true, getJobStatus(jobClient, id) != null);
+                 true, UtilsForTests.getJobStatus(jobClient, id) != null);
 
     // check if the job's priority got changed
     assertEquals("Restart failed as job's priority did not match", 
@@ -280,7 +199,7 @@
 
     
 
-    waitTillDone(jobClient);
+    UtilsForTests.waitTillDone(jobClient);
 
     // Check if the jobs are in order .. the order is 1->3->2
     JobStatus[] newStatuses = jobClient.getAllJobs();
@@ -317,7 +236,8 @@
 
     // Check if the start time was recovered
     assertTrue("Previously submitted job's start time has changed", 
-               getJobStatus(jobClient, id).getStartTime() == jobStartTime);
+               UtilsForTests.getJobStatus(jobClient, id).getStartTime() 
+               == jobStartTime);
 
     // Test history files
     testJobHistoryFiles(id, jobs[masterJob]);
@@ -354,8 +274,8 @@
     JobID id = job.getID();
     
     //  make sure that the job is 50% completed
-    while (getJobStatus(jobClient, id).mapProgress() < 0.5f) {
-      waitFor(100);
+    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
+      UtilsForTests.waitFor(100);
     }
     
     mr.stopJobTracker();
@@ -365,22 +285,22 @@
                                       false);
     
     // Wait for a minute before submitting a job
-    waitFor(60 * 1000);
+    UtilsForTests.waitFor(60 * 1000);
     
     mr.startJobTracker();
     
     // Signal the tasks
-    signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
-                getReduceSignalFile(shareDir));
+    UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
+                              getReduceSignalFile(shareDir));
     
     // Wait for the JT to be ready
-    waitForJobTracker(jobClient);
+    UtilsForTests.waitForJobTracker(jobClient);
     
-    waitTillDone(jobClient);
+    UtilsForTests.waitTillDone(jobClient);
     
     // The submitted job should not exist
     assertTrue("Submitted job was detected with recovery disabled", 
-               getJobStatus(jobClient, id) == null);
+               UtilsForTests.getJobStatus(jobClient, id) == null);
   }
 
   /** Tests a job on jobtracker with restart-recovery turned on.
@@ -455,7 +375,7 @@
     
     //  make sure that atleast on reducer is spawned
     while (jobClient.getClusterStatus().getReduceTasks() == 0) {
-      waitFor(100);
+      UtilsForTests.waitFor(100);
     }
     
     while(true) {
@@ -464,7 +384,7 @@
         mr.getMapTaskCompletionEventsUpdates(0, id, numMaps)
           .getMapTaskCompletionEvents();
       if (trackerEvents.length < numMaps / 2) {
-        waitFor(1000);
+        UtilsForTests.waitFor(1000);
       } else {
         break;
       }
@@ -483,22 +403,22 @@
                                       true);
     
     //  Wait for a minute before submitting a job
-    waitFor(60 * 1000);
+    UtilsForTests.waitFor(60 * 1000);
     
     mr.startJobTracker();
     
     // Signal the map tasks
-    signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
-                getReduceSignalFile(shareDir));
+    UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
+                              getReduceSignalFile(shareDir));
     
     // Wait for the JT to be ready
-    waitForJobTracker(jobClient);
+    UtilsForTests.waitForJobTracker(jobClient);
     
     int numToMatch = mr.getNumEventsRecovered() / 2;
     
     //  make sure that the maps are completed
-    while (getJobStatus(jobClient, id).mapProgress() < 1.0f) {
-      waitFor(100);
+    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) {
+      UtilsForTests.waitFor(100);
     }
     
     // Get the new jobtrackers events
@@ -514,7 +434,7 @@
         mr.getMapTaskCompletionEventsUpdates(0, id, 2 * numMaps)
           .getMapTaskCompletionEvents();
       if (trackerEvents.length < jtEvents.length) {
-        waitFor(1000);
+        UtilsForTests.waitFor(1000);
       } else {
         break;
       }
@@ -528,10 +448,10 @@
     testTaskReports(prevSetupReports, afterSetupReports, 1);
     
     //  Signal the reduce tasks
-    signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
-                getReduceSignalFile(shareDir));
+    UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
+                              getReduceSignalFile(shareDir));
     
-    waitTillDone(jobClient);
+    UtilsForTests.waitTillDone(jobClient);
     
     testTaskCompletionEvents(jtEvents, trackerEvents, true, 2 * numMaps);
     
@@ -653,9 +573,8 @@
       }
 
       // Write the input file
-      TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, 
-                                           new Path(inDir + "/file"), 
-                                           (short)1);
+      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
+                              new Path(inDir + "/file"), (short)1);
 
       dfs.startDataNodes(conf, 1, true, null, null, null, null);
       dfs.waitActive();
@@ -695,140 +614,14 @@
     }
   }
 
-  static String getMapSignalFile(Path dir) {
-    return dir.suffix("/jt-restart-map-signal").toString();
-  }
-
-  static String getReduceSignalFile(Path dir) {
-    return dir.suffix("/jt-restart-reduce-signal").toString();
-  }
-  
-  /** 
-   * Map is a Mapper that just waits for a file to be created on the dfs. The 
-   * file creation is a signal to the mappers and hence acts as a waiting job. 
-   * Only the later half of the maps wait for the signal while the rest 
-   * complete immediately.
-   */
-
-  static class HalfWaitingMapper 
-  extends MapReduceBase 
-  implements Mapper<WritableComparable, Writable, 
-                    WritableComparable, Writable> {
-
-    FileSystem fs = null;
-    Path signal;
-    int id = 0;
-    int totalMaps = 0;
-
-    /** The waiting function.  The map exits once it gets a signal. Here the 
-     * signal is the file existence. 
-     */
-    public void map(WritableComparable key, Writable val, 
-                    OutputCollector<WritableComparable, Writable> output,
-                    Reporter reporter)
-    throws IOException {
-      if (id > totalMaps / 2) {
-        if (fs != null) {
-          while (!fs.exists(signal)) {
-            try {
-              reporter.progress();
-              synchronized (this) {
-                this.wait(1000); // wait for 1 sec
-              }
-            } catch (InterruptedException ie) {
-              System.out.println("Interrupted while the map was waiting for "
-                                 + " the signal.");
-              break;
-            }
-          }
-        } else {
-          throw new IOException("Could not get the DFS!!");
-        }
-      }
-    }
-
-    public void configure(JobConf conf) {
-      try {
-        String taskId = conf.get("mapred.task.id");
-        id = Integer.parseInt(taskId.split("_")[4]);
-        totalMaps = Integer.parseInt(conf.get("mapred.map.tasks"));
-        fs = FileSystem.get(conf);
-        signal = new Path(conf.get("test.mapred.map.waiting.target"));
-      } catch (IOException ioe) {
-        System.out.println("Got an exception while obtaining the filesystem");
-      }
-    }
+  private static String getMapSignalFile(Path dir) {
+    return (new Path(dir, "jt-restart-map-signal")).toString();
   }
-  
-  /** 
-   * Reduce that just waits for a file to be created on the dfs. The 
-   * file creation is a signal to the reduce.
-   */
 
-  static class WaitingReducer extends MapReduceBase 
-  implements Reducer<WritableComparable, Writable, 
-                     WritableComparable, Writable> {
-
-    FileSystem fs = null;
-    Path signal;
-    
-    /** The waiting function.  The reduce exits once it gets a signal. Here the
-     * signal is the file existence. 
-     */
-    public void reduce(WritableComparable key, Iterator<Writable> val, 
-                       OutputCollector<WritableComparable, Writable> output,
-                       Reporter reporter)
-    throws IOException {
-      if (fs != null) {
-        while (!fs.exists(signal)) {
-          try {
-            reporter.progress();
-            synchronized (this) {
-              this.wait(1000); // wait for 1 sec
-            }
-          } catch (InterruptedException ie) {
-            System.out.println("Interrupted while the map was waiting for the"
-                               + " signal.");
-            break;
-          }
-        }
-      } else {
-        throw new IOException("Could not get the DFS!!");
-      }
-    }
-
-    public void configure(JobConf conf) {
-      try {
-        fs = FileSystem.get(conf);
-        signal = new Path(conf.get("test.mapred.reduce.waiting.target"));
-      } catch (IOException ioe) {
-        System.out.println("Got an exception while obtaining the filesystem");
-      }
-    }
+  private static String getReduceSignalFile(Path dir) {
+    return (new Path(dir, "jt-restart-reduce-signal")).toString();
   }
   
-  static void configureWaitingJobConf(JobConf jobConf, Path inDir,
-                                      Path outputPath, int numMaps, int numRed,
-                                      String jobName, String mapSignalFilename,
-                                      String redSignalFilename)
-  throws IOException {
-    jobConf.setJobName(jobName);
-    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
-    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
-    FileInputFormat.setInputPaths(jobConf, inDir);
-    FileOutputFormat.setOutputPath(jobConf, outputPath);
-    jobConf.setMapperClass(HalfWaitingMapper.class);
-    jobConf.setReducerClass(IdentityReducer.class);
-    jobConf.setOutputKeyClass(BytesWritable.class);
-    jobConf.setOutputValueClass(BytesWritable.class);
-    jobConf.setInputFormat(RandomInputFormat.class);
-    jobConf.setNumMapTasks(numMaps);
-    jobConf.setNumReduceTasks(numRed);
-    jobConf.setJar("build/test/testjar/testjob.jar");
-    jobConf.set("test.mapred.map.waiting.target", mapSignalFilename);
-    jobConf.set("test.mapred.reduce.waiting.target", redSignalFilename);
-  }
-
   public static void main(String[] args) throws IOException {
     new TestJobTrackerRestart().testJobTrackerRestart();
   }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Fri Dec  5 05:47:35 2008
@@ -35,13 +35,13 @@
   final Path shareDir = new Path(testDir, "share");
   final Path outputDir = new Path(testDir, "output");
   
-  private JobConf configureJob(JobConf conf, int[] maps, int[] reduces,
+  private JobConf configureJob(JobConf conf, int maps, int reduces,
                                String mapSignal, String redSignal) 
   throws IOException {
-    JobPriority[] priority = new JobPriority[] {JobPriority.NORMAL};
-    return TestJobTrackerRestart.getJobs(conf, priority, 
-                                         maps, reduces, outputDir, inDir, 
-                                         mapSignal, redSignal)[0];
+    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
+        maps, reduces, "test-jobtracker-restart-with-lost-tt", 
+        mapSignal, redSignal);
+    return conf;
   }
   
   public void testRecoveryWithLostTracker(MiniDFSCluster dfs,
@@ -51,15 +51,14 @@
     JobConf jobConf = mr.createJobConf();
     int numMaps = 50;
     int numReds = 1;
-    String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
-    String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
+    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
+    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
     
     // Configure the jobs
-    JobConf job = configureJob(jobConf, new int[] {numMaps}, 
-                               new int[] {numReds}, 
+    JobConf job = configureJob(jobConf, numMaps, numReds, 
                                mapSignalFile, redSignalFile);
       
-    TestJobTrackerRestart.cleanUp(fileSys, shareDir);
+    fileSys.delete(shareDir, true);
     
     // Submit a master job   
     JobClient jobClient = new JobClient(job);
@@ -70,17 +69,16 @@
     mr.initializeJob(id);
     
     // Make sure that the master job is 50% completed
-    while (TestJobTrackerRestart.getJobStatus(jobClient, id).mapProgress() 
+    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
            < 0.5f) {
-      TestJobTrackerRestart.waitFor(100);
+      UtilsForTests.waitFor(100);
     }
 
     // Kill the jobtracker
     mr.stopJobTracker();
 
     // Signal the maps to complete
-    TestJobTrackerRestart.signalTasks(dfs, fileSys, true, 
-                                      mapSignalFile, redSignalFile);
+    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
     
     // Enable recovery on restart
     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
@@ -90,7 +88,7 @@
     mr.stopTaskTracker(1);
     
     // Wait for a minute before submitting a job
-    TestJobTrackerRestart.waitFor(60 * 1000);
+    UtilsForTests.waitFor(60 * 1000);
     
     // Restart the jobtracker
     mr.startJobTracker();
@@ -98,13 +96,13 @@
     // Check if the jobs are still running
     
     // Wait for the JT to be ready
-    TestJobTrackerRestart.waitForJobTracker(jobClient);
+    UtilsForTests.waitForJobTracker(jobClient);
 
     // Signal the reducers to complete
-    TestJobTrackerRestart.signalTasks(dfs, fileSys, false, 
-                                      mapSignalFile, redSignalFile);
+    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
+                              redSignalFile);
     
-    TestJobTrackerRestart.waitTillDone(jobClient);
+    UtilsForTests.waitTillDone(jobClient);
 
     // Check if the tasks on the lost tracker got re-executed
     assertTrue("Tracker killed while the jobtracker was down did not get lost "
@@ -134,9 +132,8 @@
       }
 
       // Write the input file
-      TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, 
-                                           new Path(inDir + "/file"), 
-                                           (short)1);
+      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
+                              new Path(inDir + "/file"), (short)1);
 
       dfs.startDataNodes(conf, 1, true, null, null, null, null);
       dfs.waitActive();
@@ -173,4 +170,4 @@
   public static void main(String[] args) throws IOException {
     new TestJobTrackerRestartWithLostTracker().testRestartWithLostTracker();
   }
-}
\ No newline at end of file
+}

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleLevelCaching.java Fri Dec  5 05:47:35 2008
@@ -92,8 +92,8 @@
       if (!fileSys.mkdirs(inDir)) {
         throw new IOException("Mkdirs failed to create " + inDir.toString());
       }
-      TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, 
-    		                               new Path(inDir + "/file"), (short)1);
+      UtilsForTests.writeFile(dfs.getNameNode(), conf, 
+    		                        new Path(inDir + "/file"), (short)1);
       namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + 
                  (dfs.getFileSystem()).getUri().getPort();
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Fri Dec  5 05:47:35 2008
@@ -23,14 +23,9 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
@@ -118,12 +113,12 @@
       if (!fileSys.mkdirs(inDir)) {
         throw new IOException("Mkdirs failed to create " + inDir.toString());
       }
-      writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
+      UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
       dfs.startDataNodes(conf, 2, true, null, rack2, hosts2, null);
       dfs.waitActive();
 
-      writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
-      writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
+      UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
+      UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
       
       namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + 
                  (dfs.getFileSystem()).getUri().getPort(); 
@@ -166,19 +161,6 @@
       }
     }
   }
-  static void writeFile(NameNode namenode, Configuration conf, Path name, 
-      short replication) throws IOException {
-    FileSystem fileSys = FileSystem.get(conf);
-    SequenceFile.Writer writer = 
-      SequenceFile.createWriter(fileSys, conf, name, 
-                                BytesWritable.class, BytesWritable.class,
-                                CompressionType.NONE);
-    writer.append(new BytesWritable(), new BytesWritable());
-    writer.close();
-    fileSys.setReplication(name, replication);
-    DFSTestUtil.waitReplication(fileSys, name, replication);
-  }
-
   static RunningJob launchJob(JobConf jobConf, Path inDir, Path outputPath, 
                               int numMaps, String jobName) throws IOException {
     jobConf.setJobName(jobName);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/ThreadedMapBenchmark.java Fri Dec  5 05:47:35 2008
@@ -67,59 +67,6 @@
   static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
   
   /**
-   * A custom input format that creates virtual inputs of a single string
-   * for each map. Using {@link RandomWriter} code. 
-   */
-  public static class RandomInputFormat implements InputFormat<Text, Text> {
-    
-    public InputSplit[] getSplits(JobConf job, 
-                                  int numSplits) throws IOException {
-      InputSplit[] result = new InputSplit[numSplits];
-      Path outDir = FileOutputFormat.getOutputPath(job);
-      for(int i=0; i < result.length; ++i) {
-        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i),
-                                  0, 1, (String[])null);
-      }
-      return result;
-    }
-
-    static class RandomRecordReader implements RecordReader<Text, Text> {
-      Path name;
-      public RandomRecordReader(Path p) {
-        name = p;
-      }
-      public boolean next(Text key, Text value) {
-        if (name != null) {
-          key.set(name.getName());
-          name = null;
-          return true;
-        }
-        return false;
-      }
-      public Text createKey() {
-        return new Text();
-      }
-      public Text createValue() {
-        return new Text();
-      }
-      public long getPos() {
-        return 0;
-      }
-      public void close() {}
-      public float getProgress() {
-        return 0.0f;
-      }
-    }
-
-    public RecordReader<Text, Text> getRecordReader(InputSplit split,
-                                                    JobConf job, 
-                                                    Reporter reporter) 
-    throws IOException {
-      return new RandomRecordReader(((FileSplit) split).getPath());
-    }
-  }
-
-  /**
    * Generates random input data of given size with keys and values of given 
    * sizes. By default it generates 128mb input data with 10 byte keys and 10 
    * byte values.
@@ -195,7 +142,7 @@
     JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
     job.setJobName("threaded-map-benchmark-random-writer");
     job.setJarByClass(ThreadedMapBenchmark.class);
-    job.setInputFormat(RandomInputFormat.class);
+    job.setInputFormat(UtilsForTests.RandomInputFormat.class);
     job.setOutputFormat(SequenceFileOutputFormat.class);
     
     job.setMapperClass(Map.class);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=723732&r1=723731&r2=723732&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Dec  5 05:47:35 2008
@@ -20,18 +20,25 @@
 
 import java.text.DecimalFormat;
 import java.io.*;
-import java.net.*;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Enumeration;
 import java.util.Iterator;
-import java.util.List;
-import java.util.jar.*;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 
 /** 
  * Utilities used in unit test.
@@ -44,6 +51,7 @@
   final static long GB = 1024L * MB;
   final static long TB = 1024L * GB;
   final static long PB = 1024L * TB;
+  final static Object waitLock = new Object();
 
   static DecimalFormat dfm = new DecimalFormat("####.000");
   static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
@@ -189,4 +197,335 @@
     }
     return new String(space, 0, len);
   }
+  
+  /**
+   * Gets job status from the jobtracker given the jobclient and the job id
+   */
+  static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException {
+    JobStatus[] statuses = jc.getAllJobs();
+    for (JobStatus jobStatus : statuses) {
+      if (jobStatus.getJobID().equals(id)) {
+        return jobStatus;
+      }
+    }
+    return null;
+  }
+  
+  /**
+   * A utility that waits for specified amount of time
+   */
+  static void waitFor(long duration) {
+    try {
+      synchronized (waitLock) {
+        waitLock.wait(duration);
+      }
+    } catch (InterruptedException ie) {}
+  }
+  
+  /**
+   * Wait for the jobtracker to be RUNNING.
+   */
+  static void waitForJobTracker(JobClient jobClient) {
+    while (true) {
+      try {
+        ClusterStatus status = jobClient.getClusterStatus();
+        while (status.getJobTrackerState() != JobTracker.State.RUNNING) {
+          waitFor(100);
+          status = jobClient.getClusterStatus();
+        }
+        break; // means that the jt is ready
+      } catch (IOException ioe) {}
+    }
+  }
+  
+  /**
+   * Waits until all the jobs at the jobtracker complete.
+   */
+  static void waitTillDone(JobClient jobClient) throws IOException {
+    // Wait for the last job to complete
+    while (true) {
+      boolean shouldWait = false;
+      for (JobStatus jobStatuses : jobClient.getAllJobs()) {
+        if (jobStatuses.getRunState() == JobStatus.RUNNING) {
+          shouldWait = true;
+          break;
+        }
+      }
+      if (shouldWait) {
+        waitFor(1000);
+      } else {
+        break;
+      }
+    }
+  }
+  
+  /**
+   * Configure a waiting job
+   */
+  static void configureWaitingJobConf(JobConf jobConf, Path inDir,
+                                      Path outputPath, int numMaps, int numRed,
+                                      String jobName, String mapSignalFilename,
+                                      String redSignalFilename)
+  throws IOException {
+    jobConf.setJobName(jobName);
+    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    FileOutputFormat.setOutputPath(jobConf, outputPath);
+    jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setInputFormat(RandomInputFormat.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(numRed);
+    jobConf.setJar("build/test/testjar/testjob.jar");
+    jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
+    jobConf.set(getTaskSignalParameter(false), redSignalFilename);
+  }
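
As a hedged sketch of how a test might drive this (the MiniDFSCluster 'dfs', its FileSystem 'fileSys' and the MiniMRCluster 'mr' are assumed to exist, and the paths are made up); note that the second setInputFormat() call above is the one that takes effect, so the maps read the synthetic RandomInputFormat splits:

    Path inDir = new Path("/testing/wait/input");
    Path outDir = new Path("/testing/wait/output");
    Path signalDir = new Path("/testing/wait/signals");
    JobConf jobConf = mr.createJobConf();
    UtilsForTests.configureWaitingJobConf(jobConf, inDir, outDir, 10, 1,
        "test-waiting-job",
        UtilsForTests.getMapSignalFile(signalDir),
        UtilsForTests.getReduceSignalFile(signalDir));
    RunningJob job = new JobClient(jobConf).submitJob(jobConf);
    // ... assertions against the half-waiting job go here ...
    UtilsForTests.signalTasks(dfs, fileSys,        // signalTasks() is defined later in this file
        UtilsForTests.getMapSignalFile(signalDir),
        UtilsForTests.getReduceSignalFile(signalDir), 1);
    job.waitForCompletion();
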
+
+  /**
+   * Commonly used map and reduce classes 
+   */
+  
+  /** 
+   * A Mapper that just waits for a file to be created on the dfs. The file's 
+   * creation is the signal the mappers wait on, so a job using it is a waiting job. 
+   */
+
+  static class WaitingMapper 
+  extends MapReduceBase 
+  implements Mapper<WritableComparable, Writable, 
+                    WritableComparable, Writable> {
+
+    FileSystem fs = null;
+    Path signal;
+    int id = 0;
+    int totalMaps = 0;
+
+    /**
+     * Checks if the map task needs to wait. By default all the maps will wait.
+     * This method needs to be overridden to make a custom waiting mapper. 
+     */
+    public boolean shouldWait(int id) {
+      return true;
+    }
+    
+    /**
+     * Returns a signal file on which the map task should wait. By default all 
+     * the maps wait on a single file passed as test.mapred.map.waiting.target.
+     * This method needs to be overridden to make a custom waiting mapper.
+     */
+    public Path getSignalFile(int id) {
+      return signal;
+    }
+    
+    /** The waiting function. The map exits once it gets the signal, i.e. once 
+     * the signal file exists. 
+     */
+    public void map(WritableComparable key, Writable val, 
+                    OutputCollector<WritableComparable, Writable> output,
+                    Reporter reporter)
+    throws IOException {
+      if (shouldWait(id)) {
+        if (fs != null) {
+          while (!fs.exists(getSignalFile(id))) {
+            try {
+              reporter.progress();
+              synchronized (this) {
+                this.wait(1000); // wait for 1 sec
+              }
+            } catch (InterruptedException ie) {
+              System.out.println("Interrupted while the map was waiting for "
+                                 + " the signal.");
+              break;
+            }
+          }
+        } else {
+          throw new IOException("Could not get the DFS!!");
+        }
+      }
+    }
+
+    public void configure(JobConf conf) {
+      try {
+        String taskId = conf.get("mapred.task.id");
+        id = Integer.parseInt(taskId.split("_")[4]);
+        totalMaps = Integer.parseInt(conf.get("mapred.map.tasks"));
+        fs = FileSystem.get(conf);
+        signal = new Path(conf.get(getTaskSignalParameter(true)));
+      } catch (IOException ioe) {
+        System.out.println("Got an exception while obtaining the filesystem");
+      }
+    }
+  }
+  
+  /** Only the latter half of the maps wait for the signal while the rest 
+   * complete immediately.
+   */
+  static class HalfWaitingMapper extends WaitingMapper {
+    @Override
+    public boolean shouldWait(int id) {
+      return id >= (totalMaps / 2);
+    }
+  }
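
Following the same pattern, a hypothetical variant (not in this patch) could make only the first map wait, which is handy when a test needs exactly one long-running map:

    /** Hypothetical example: only map 0 waits for the signal. */
    static class FirstWaitingMapper extends WaitingMapper {
      @Override
      public boolean shouldWait(int id) {
        return id == 0;
      }
    }
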
+  
+  /** 
+   * A Reducer that just waits for a file to be created on the dfs. The file's 
+   * creation is the signal the reduce tasks wait on.
+   */
+
+  static class WaitingReducer extends MapReduceBase 
+  implements Reducer<WritableComparable, Writable, 
+                     WritableComparable, Writable> {
+
+    FileSystem fs = null;
+    Path signal;
+    
+    /** The waiting function. The reduce exits once it gets the signal, i.e. 
+     * once the signal file exists. 
+     */
+    public void reduce(WritableComparable key, Iterator<Writable> val, 
+                       OutputCollector<WritableComparable, Writable> output,
+                       Reporter reporter)
+    throws IOException {
+      if (fs != null) {
+        while (!fs.exists(signal)) {
+          try {
+            reporter.progress();
+            synchronized (this) {
+              this.wait(1000); // wait for 1 sec
+            }
+          } catch (InterruptedException ie) {
+            System.out.println("Interrupted while the map was waiting for the"
+                               + " signal.");
+            break;
+          }
+        }
+      } else {
+        throw new IOException("Could not get the DFS!!");
+      }
+    }
+
+    public void configure(JobConf conf) {
+      try {
+        fs = FileSystem.get(conf);
+        signal = new Path(conf.get(getTaskSignalParameter(false)));
+      } catch (IOException ioe) {
+        System.out.println("Got an exception while obtaining the filesystem");
+      }
+    }
+  }
+  
+  static String getTaskSignalParameter(boolean isMap) {
+    return isMap 
+           ? "test.mapred.map.waiting.target" 
+           : "test.mapred.reduce.waiting.target";
+  }
+  
+  /**
+   * Signal both the maps and the reduces to start.
+   */
+  static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, 
+                          String mapSignalFile, 
+                          String reduceSignalFile, int replication) 
+  throws IOException {
+    writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile), 
+              (short)replication);
+    writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile), 
+              (short)replication);
+  }
+  
+  /**
+   * Signal either the maps or the reduces to start, depending on isMap.
+   */
+  static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, 
+                          boolean isMap, String mapSignalFile, 
+                          String reduceSignalFile)
+  throws IOException {
+    // write the signal file that the maps or the reduces are waiting on
+    writeFile(dfs.getNameNode(), fileSys.getConf(),
+              isMap 
+              ? new Path(mapSignalFile)
+              : new Path(reduceSignalFile), (short)1);
+  }
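
As a hypothetical usage sketch ('dfs', 'fileSys' and the signal file names are assumed), the boolean overload lets a test release the maps first and the reduces later:

    // Let the maps finish while the reduces keep waiting ...
    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignal, reduceSignal);
    // ... run assertions against the maps-done/reduces-waiting state, then release the reduces.
    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignal, reduceSignal);
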
+  
+  static String getSignalFile(Path dir) {
+    return (new Path(dir, "signal")).toString();
+  }
+  
+  static String getMapSignalFile(Path dir) {
+    return (new Path(dir, "map-signal")).toString();
+  }
+
+  static String getReduceSignalFile(Path dir) {
+    return (new Path(dir, "reduce-signal")).toString();
+  }
+  
+  static void writeFile(NameNode namenode, Configuration conf, Path name, 
+      short replication) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+    SequenceFile.Writer writer = 
+      SequenceFile.createWriter(fileSys, conf, name, 
+                                BytesWritable.class, BytesWritable.class,
+                                CompressionType.NONE);
+    writer.append(new BytesWritable(), new BytesWritable());
+    writer.close();
+    fileSys.setReplication(name, replication);
+    DFSTestUtil.waitReplication(fileSys, name, replication);
+  }
+  
+  // Input formats
+  /**
+   * A custom input format that creates a virtual input of a single string
+   * for each map, based on the {@link RandomWriter} code. 
+   */
+  public static class RandomInputFormat implements InputFormat<Text, Text> {
+    
+    public InputSplit[] getSplits(JobConf job, 
+                                  int numSplits) throws IOException {
+      InputSplit[] result = new InputSplit[numSplits];
+      Path outDir = FileOutputFormat.getOutputPath(job);
+      for(int i=0; i < result.length; ++i) {
+        result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i),
+                                  0, 1, (String[])null);
+      }
+      return result;
+    }
+
+    static class RandomRecordReader implements RecordReader<Text, Text> {
+      Path name;
+      public RandomRecordReader(Path p) {
+        name = p;
+      }
+      public boolean next(Text key, Text value) {
+        if (name != null) {
+          key.set(name.getName());
+          name = null;
+          return true;
+        }
+        return false;
+      }
+      public Text createKey() {
+        return new Text();
+      }
+      public Text createValue() {
+        return new Text();
+      }
+      public long getPos() {
+        return 0;
+      }
+      public void close() {}
+      public float getProgress() {
+        return 0.0f;
+      }
+    }
+
+    public RecordReader<Text, Text> getRecordReader(InputSplit split,
+                                                    JobConf job, 
+                                                    Reporter reporter) 
+    throws IOException {
+      return new RandomRecordReader(((FileSplit) split).getPath());
+    }
+  }
 }
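
For completeness, a hypothetical snippet (not part of the patch) showing RandomInputFormat in isolation: getSplits() derives its dummy split paths from the job's output directory, so an output path must be set, and each map then receives exactly one synthetic record:

    JobConf conf = new JobConf();
    conf.setInputFormat(UtilsForTests.RandomInputFormat.class);
    conf.setNumMapTasks(4);                                    // four dummy splits, one per map
    FileOutputFormat.setOutputPath(conf, new Path("/testing/out"));
    // Each map now gets a single <Text, Text> record whose key is e.g. "dummy-split-0".
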


