hadoop-mapreduce-commits mailing list archives

From ste...@apache.org
Subject svn commit: r808036 [2/4] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/data_join/ src/contrib/dynamic-scheduler/ src/contrib/eclipse-plugin/ src/...
Date Wed, 26 Aug 2009 15:01:34 GMT
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/vaidya/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:01:29 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/vaidya:713112
 /hadoop/core/trunk/src/contrib/vaidya:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/vaidya:804974-805826
+/hadoop/mapreduce/trunk/src/contrib/vaidya:804974-807678

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/docs/src/documentation/content/xdocs/commands_manual.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/docs/src/documentation/content/xdocs/commands_manual.xml?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/docs/src/documentation/content/xdocs/commands_manual.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/docs/src/documentation/content/xdocs/commands_manual.xml Wed Aug 26 15:01:29 2009
@@ -596,8 +596,20 @@
 					Runs the MapReduce job Tracker node.
 				</p> 
 				<p>
-					<code>Usage: hadoop jobtracker</code>
-				</p>
+					<code>Usage: hadoop jobtracker [-dumpConfiguration]</code>
+					</p>
+          <table>
+          <tr>
+          <th>COMMAND_OPTION</th><th> Description</th>
+          </tr>
+          <tr>
+          <td><code>-dumpConfiguration</code></td>
+          <td> Dumps the configuration used by the JobTracker, along with the
+          queue configuration, in JSON format to standard output, and then
+          exits.</td>
+          </tr>
+          </table>
+				
 			</section>
 			
 			<section>

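The -dumpConfiguration option documented above is backed by Configuration.dumpConfiguration(Configuration, Writer), the same API the JobTracker.java change further down uses. A minimal sketch of invoking that API directly; the class name is hypothetical and the queue-configuration portion the real command also emits is omitted:

    import java.io.PrintWriter;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    public class DumpJobTrackerConf {
      public static void main(String[] args) throws Exception {
        // Write the effective JobTracker configuration as JSON to standard output,
        // which is what "hadoop jobtracker -dumpConfiguration" does in this patch.
        PrintWriter out = new PrintWriter(System.out);
        Configuration.dumpConfiguration(new JobConf(), out);
        out.flush();
      }
    }
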
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:01:29 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/examples:713112
 /hadoop/core/trunk/src/examples:776175-784663
-/hadoop/mapreduce/trunk/src/examples:804974-805826
+/hadoop/mapreduce/trunk/src/examples:804974-807678

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/ExampleDriver.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/ExampleDriver.java Wed Aug 26 15:01:29 2009
@@ -50,8 +50,7 @@
       "A map/reduce program that writes 10GB of random textual data per node.");
       pgd.addClass("sort", Sort.class, "A map/reduce program that sorts the data written by the random writer.");
 
-      //Pi computation examples
-      pgd.addClass("pi", PiEstimator.class, PiEstimator.DESCRIPTION);
+      pgd.addClass("pi", QuasiMonteCarlo.class, QuasiMonteCarlo.DESCRIPTION);
       pgd.addClass("bbp", BaileyBorweinPlouffe.class, BaileyBorweinPlouffe.DESCRIPTION);
       pgd.addClass("distbbp", DistBbp.class, DistBbp.DESCRIPTION);
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Join.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Join.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Join.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/Join.java Wed Aug 26 15:01:29 2009
@@ -27,19 +27,23 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.join.*;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.join.*;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 /**
- * This is the trivial map/reduce program that does absolutely nothing
- * other than use the framework to fragment and sort the input values.
+ * Given a set of sorted datasets keyed with the same class and yielding
+ * equal partitions, it is possible to join those datasets before the map
+ * phase. This example demonstrates how to do so.
  *
  * To run: bin/hadoop jar build/hadoop-examples.jar join
- *            [-m <i>maps</i>] [-r <i>reduces</i>]
+ *            [-r <i>reduces</i>]
  *            [-inFormat <i>input format class</i>] 
  *            [-outFormat <i>output format class</i>] 
  *            [-outKey <i>output key class</i>] 
@@ -50,7 +54,7 @@
 public class Join extends Configured implements Tool {
 
   static int printUsage() {
-    System.out.println("join [-m <maps>] [-r <reduces>] " +
+    System.out.println("join [-r <reduces>] " +
                        "[-inFormat <input format class>] " +
                        "[-outFormat <output format class>] " + 
                        "[-outKey <output key class>] " +
@@ -58,7 +62,7 @@
                        "[-joinOp <inner|outer|override>] " +
                        "[input]* <input> <output>");
     ToolRunner.printGenericCommandUsage(System.out);
-    return -1;
+    return 2;
   }
 
   /**
@@ -67,23 +71,24 @@
   * @throws IOException When there are communication problems with the 
   *                     job tracker.
    */
+  @SuppressWarnings("unchecked")
   public int run(String[] args) throws Exception {
-    JobConf jobConf = new JobConf(getConf(), Sort.class);
-    jobConf.setJobName("join");
-
-    jobConf.setMapperClass(IdentityMapper.class);        
-    jobConf.setReducerClass(IdentityReducer.class);
-
-    JobClient client = new JobClient(jobConf);
+    Configuration conf = getConf();
+    JobClient client = new JobClient(conf);
     ClusterStatus cluster = client.getClusterStatus();
-    int num_maps = cluster.getTaskTrackers() * 
-                   jobConf.getInt("test.sort.maps_per_host", 10);
     int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
-    String sort_reduces = jobConf.get("test.sort.reduces_per_host");
-    if (sort_reduces != null) {
+    String join_reduces = conf.get("mapreduce.join.reduces_per_host");
+    if (join_reduces != null) {
        num_reduces = cluster.getTaskTrackers() * 
-                       Integer.parseInt(sort_reduces);
+                       Integer.parseInt(join_reduces);
     }
+    Job job = new Job(conf);
+    job.setJobName("join");
+    job.setJarByClass(Sort.class);
+
+    job.setMapperClass(Mapper.class);        
+    job.setReducerClass(Reducer.class);
+
     Class<? extends InputFormat> inputFormatClass = 
       SequenceFileInputFormat.class;
     Class<? extends OutputFormat> outputFormatClass = 
@@ -94,9 +99,7 @@
     List<String> otherArgs = new ArrayList<String>();
     for(int i=0; i < args.length; ++i) {
       try {
-        if ("-m".equals(args[i])) {
-          num_maps = Integer.parseInt(args[++i]);
-        } else if ("-r".equals(args[i])) {
+        if ("-r".equals(args[i])) {
           num_reduces = Integer.parseInt(args[++i]);
         } else if ("-inFormat".equals(args[i])) {
           inputFormatClass = 
@@ -126,37 +129,37 @@
     }
 
     // Set user-supplied (possibly default) job configs
-    jobConf.setNumMapTasks(num_maps);
-    jobConf.setNumReduceTasks(num_reduces);
+    job.setNumReduceTasks(num_reduces);
 
     if (otherArgs.size() < 2) {
       System.out.println("ERROR: Wrong number of parameters: ");
       return printUsage();
     }
 
-    FileOutputFormat.setOutputPath(jobConf, 
+    FileOutputFormat.setOutputPath(job, 
       new Path(otherArgs.remove(otherArgs.size() - 1)));
     List<Path> plist = new ArrayList<Path>(otherArgs.size());
     for (String s : otherArgs) {
       plist.add(new Path(s));
     }
 
-    jobConf.setInputFormat(CompositeInputFormat.class);
-    jobConf.set("mapred.join.expr", CompositeInputFormat.compose(
-          op, inputFormatClass, plist.toArray(new Path[0])));
-    jobConf.setOutputFormat(outputFormatClass);
+    job.setInputFormatClass(CompositeInputFormat.class);
+    job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, 
+      CompositeInputFormat.compose(op, inputFormatClass,
+      plist.toArray(new Path[0])));
+    job.setOutputFormatClass(outputFormatClass);
 
-    jobConf.setOutputKeyClass(outputKeyClass);
-    jobConf.setOutputValueClass(outputValueClass);
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
 
     Date startTime = new Date();
     System.out.println("Job started: " + startTime);
-    JobClient.runJob(jobConf);
+    int ret = job.waitForCompletion(true) ? 0 : 1 ;
     Date end_time = new Date();
     System.out.println("Job ended: " + end_time);
     System.out.println("The job took " + 
         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
-    return 0;
+    return ret;
   }
 
   public static void main(String[] args) throws Exception {

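For readers following the API migration above, here is a condensed sketch of the same new-API wiring: a map-side join of pre-sorted, identically partitioned inputs via CompositeInputFormat. The class name and input/output paths are placeholders, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
    import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class JoinSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        job.setJobName("join-sketch");
        job.setJarByClass(JoinSketch.class);

        // Identity map/reduce: the join itself happens inside the input format.
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);

        // Compose an inner join over two (placeholder) sorted SequenceFile inputs.
        job.setInputFormatClass(CompositeInputFormat.class);
        job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
            CompositeInputFormat.compose("inner", SequenceFileInputFormat.class,
                new Path("/data/a"), new Path("/data/b")));

        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(TupleWritable.class);
        FileOutputFormat.setOutputPath(job, new Path("/data/joined"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
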
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomTextWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomTextWriter.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomTextWriter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/examples/org/apache/hadoop/examples/RandomTextWriter.java Wed Aug 26 15:01:29 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Formatter;
 import java.util.List;
 import java.util.Random;
 
@@ -96,8 +97,16 @@
     private int wordsInKeyRange;
     private int minWordsInValue;
     private int wordsInValueRange;
-    private Random random = new Random();
-    
+
+    private final Random random = new Random();
+    private final Text keyWords = new Text();
+    private final Text valueWords = new Text();
+    private final String STATUS_MSG = "wrote record %d. %d bytes left.";
+    private final Formatter statusFormat = new Formatter(new StringBuilder());
+
+    private Counter byteCounter;
+    private Counter recordCounter;
+
     /**
      * Save the configuration value that we need to write the data.
      */
@@ -115,6 +124,8 @@
       wordsInValueRange = 
         (conf.getInt("test.randomtextwrite.max_words_value", 100) - 
          minWordsInValue);
+      byteCounter = context.getCounter(Counters.BYTES_WRITTEN);
+      recordCounter = context.getCounter(Counters.RECORDS_WRITTEN);
     }
     
     /**
@@ -125,38 +136,39 @@
       int itemCount = 0;
       while (numBytesToWrite > 0) {
         // Generate the key/value 
-        int noWordsKey = minWordsInKey + 
+        final int noWordsKey = minWordsInKey +
           (wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
-        int noWordsValue = minWordsInValue + 
+        final int noWordsValue = minWordsInValue +
           (wordsInValueRange != 0 ? random.nextInt(wordsInValueRange) : 0);
-        Text keyWords = generateSentence(noWordsKey);
-        Text valueWords = generateSentence(noWordsValue);
-        
+
+        int recordBytes = generateSentence(keyWords, noWordsKey);
+        recordBytes += generateSentence(valueWords, noWordsValue);
+        numBytesToWrite -= recordBytes;
+
         // Write the sentence 
         context.write(keyWords, valueWords);
-        
-        numBytesToWrite -= (keyWords.getLength() + valueWords.getLength());
-        
+
         // Update counters, progress etc.
-        context.getCounter(Counters.BYTES_WRITTEN).increment(
-                  keyWords.getLength() + valueWords.getLength());
-        context.getCounter(Counters.RECORDS_WRITTEN).increment(1);
-        if (++itemCount % 200 == 0) {
-          context.setStatus("wrote record " + itemCount + ". " + 
-                             numBytesToWrite + " bytes left.");
+        recordCounter.increment(1);
+        byteCounter.increment(recordBytes);
+
+        if (++itemCount % 1000 == 0) {
+          ((StringBuilder)statusFormat.out()).setLength(0);
+          context.setStatus(statusFormat.format(STATUS_MSG,
+                itemCount, numBytesToWrite).toString());
         }
       }
       context.setStatus("done with " + itemCount + " records.");
     }
     
-    private Text generateSentence(int noWords) {
-      StringBuffer sentence = new StringBuffer();
-      String space = " ";
+    private int generateSentence(Text txt, int noWords) {
+      txt.clear();
       for (int i=0; i < noWords; ++i) {
-        sentence.append(words[random.nextInt(words.length)]);
-        sentence.append(space);
+        final Text word = words[random.nextInt(words.length)];
+        txt.append(word.getBytes(), 0, word.getLength());
+        txt.append(SPACE, 0, SPACE.length);
       }
-      return new Text(sentence.toString());
+      return txt.getLength();
     }
   }
   
@@ -245,10 +257,12 @@
     System.exit(res);
   }
 
+  private static final byte[] SPACE = " ".getBytes();
+
   /**
    * A random list of 100 words from /usr/share/dict/words
    */
-  private static String[] words = {
+  private final static Text[] words = buildText(new String[] {
                                    "diurnalness", "Homoiousian",
                                    "spiranthic", "tetragynian",
                                    "silverhead", "ungreat",
@@ -749,5 +763,14 @@
                                    "sterilely", "unrealize",
                                    "unpatched", "hypochondriacism",
                                    "critically", "cheesecutter",
-                                  };
+                                  });
+
+  private static Text[] buildText(String[] str) {
+    Text[] ret = new Text[str.length];
+    for (int i = 0; i < str.length; ++i) {
+      ret[i] = new Text(str[i]);
+    }
+    return ret;
+  }
+
 }

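The RandomTextWriter change above replaces per-record Text and StringBuffer allocations with reusable Text buffers and raw byte appends. A standalone sketch of that reuse pattern, with a hypothetical class name and a shortened word list:

    import java.util.Random;

    import org.apache.hadoop.io.Text;

    public class TextReuseSketch {
      private static final byte[] SPACE = " ".getBytes();
      private static final Text[] WORDS = { new Text("diurnalness"), new Text("Homoiousian") };

      private final Random random = new Random();
      private final Text sentence = new Text(); // reused across calls instead of a new Text per record

      // Fills the reusable buffer with noWords random words and returns its length in bytes.
      int generateSentence(int noWords) {
        sentence.clear();
        for (int i = 0; i < noWords; ++i) {
          Text word = WORDS[random.nextInt(WORDS.length)];
          sentence.append(word.getBytes(), 0, word.getLength());
          sentence.append(SPACE, 0, SPACE.length);
        }
        return sentence.getLength();
      }

      public static void main(String[] args) {
        TextReuseSketch s = new TextReuseSketch();
        System.out.println(s.generateSentence(5) + " bytes: " + s.sentence);
      }
    }
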
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 26 15:01:29 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/java:713112
 /hadoop/core/trunk/src/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/java:804974-805826
+/hadoop/mapreduce/trunk/src/java:804974-807678

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/mapred-default.xml?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/mapred-default.xml Wed Aug 26 15:01:29 2009
@@ -341,13 +341,6 @@
 </property>
 
 <property>
-  <name>mapred.jobtracker.completeuserjobs.maximum</name>
-  <value>100</value>
-  <description>The maximum number of complete jobs per user to keep around 
-  before delegating them to the job history.</description>
-</property>
-
-<property>
   <name>mapred.job.tracker.retiredjobs.cache.size</name>
   <value>1000</value>
   <description>The number of retired job status to keep in the cache.
@@ -421,6 +414,22 @@
 </property>
 
 <property>
+  <name>mapred.map.child.log.level</name>
+  <value>INFO</value>
+  <description>The logging level for the map task. The allowed levels are:
+  OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
+  </description>
+</property>
+
+<property>
+  <name>mapred.reduce.child.log.level</name>
+  <value>INFO</value>
+  <description>The logging level for the reduce task. The allowed levels are:
+  OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
+  </description>
+</property>
+
+<property>
   <name>mapred.inmem.merge.threshold</name>
   <value>1000</value>
   <description>The threshold, in terms of the number of files 
@@ -887,33 +896,6 @@
 </property>
 
 <property>
-  <name>mapred.queue.names</name>
-  <value>default</value>
-  <description> Comma separated list of queues configured for this jobtracker.
-    Jobs are added to queues and schedulers can configure different 
-    scheduling properties for the various queues. To configure a property 
-    for a queue, the name of the queue must match the name specified in this 
-    value. Queue properties that are common to all schedulers are configured 
-    here with the naming convention, mapred.queue.$QUEUE-NAME.$PROPERTY-NAME,
-    for e.g. mapred.queue.default.submit-job-acl.
-    The number of queues configured in this parameter could depend on the
-    type of scheduler being used, as specified in 
-    mapred.jobtracker.taskScheduler. For example, the JobQueueTaskScheduler
-    supports only a single queue, which is the default configured here.
-    Before adding more queues, ensure that the scheduler you've configured
-    supports multiple queues.
-  </description>
-</property>
-
-<property>
-  <name>mapred.acls.enabled</name>
-  <value>false</value>
-  <description> Specifies whether ACLs are enabled, and should be checked
-    for various operations.
-  </description>
-</property>
-
-<property>
   <name>mapred.job.queue.name</name>
   <value>default</value>
   <description> Queue to which a job is submitted. This must match one of the

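A brief sketch of how a job could override the two new per-task log level properties added above; the property names come from the patch, the surrounding class is illustrative:

    import org.apache.hadoop.mapred.JobConf;

    public class TaskLogLevelSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Make map tasks more verbose while reduce tasks stay at the default INFO level.
        conf.set("mapred.map.child.log.level", "DEBUG");
        conf.set("mapred.reduce.child.log.level", "INFO");
        System.out.println("map level = " + conf.get("mapred.map.child.log.level", "INFO"));
      }
    }
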
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Child.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Child.java Wed Aug 26 15:01:29 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
@@ -27,7 +28,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -146,7 +146,7 @@
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
-        TaskRunner.setupWorkDir(job);
+        TaskRunner.setupWorkDir(job, new File(".").getAbsoluteFile());
 
         numTasksToExecute = job.getNumTasksToExecutePerJvm();
         assert(numTasksToExecute != 0);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobClient.java Wed Aug 26 15:01:29 2009
@@ -48,6 +48,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -567,15 +568,12 @@
                "Applications should implement Tool for the same.");
     }
 
-    // get all the command line arguments into the 
-    // jobconf passed in by the user conf
-    String files = null;
-    String libjars = null;
-    String archives = null;
-
-    files = job.get("tmpfiles");
-    libjars = job.get("tmpjars");
-    archives = job.get("tmparchives");
+    // Retrieve command line arguments placed into the JobConf
+    // by GenericOptionsParser.
+    String files = job.get("tmpfiles");
+    String libjars = job.get("tmpjars");
+    String archives = job.get("tmparchives");
+
     /*
      * set this user's id in job configuration, so later job files can be
      * accessed using this user's id
@@ -651,27 +649,7 @@
     }
     
     //  set the timestamps of the archives and files
-    URI[] tarchives = DistributedCache.getCacheArchives(job);
-    if (tarchives != null) {
-      StringBuffer archiveTimestamps = 
-        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
-      for (int i = 1; i < tarchives.length; i++) {
-        archiveTimestamps.append(",");
-        archiveTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tarchives[i])));
-      }
-      DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
-    }
-
-    URI[] tfiles = DistributedCache.getCacheFiles(job);
-    if (tfiles != null) {
-      StringBuffer fileTimestamps = 
-        new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
-      for (int i = 1; i < tfiles.length; i++) {
-        fileTimestamps.append(",");
-        fileTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tfiles[i])));
-      }
-      DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
-    }
+    TrackerDistributedCacheManager.determineTimestamps(job);
        
     String originalJarPath = job.getJar();
 
@@ -700,6 +678,7 @@
 
   }
 
+
   private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
     UnixUserGroupInformation ugi = null;
     try {
@@ -949,9 +928,16 @@
    */
   public static boolean isJobDirValid(Path jobDirPath, FileSystem fs) 
   throws IOException {
-    FileStatus[] contents = fs.listStatus(jobDirPath);
+    FileStatus[] contents = null;
+    
+    try {
+      contents = fs.listStatus(jobDirPath);
+    } catch(FileNotFoundException fnfe) {
+      return false;
+    }
+    
     int matchCount = 0;
-    if (contents != null && contents.length >=2) {
+    if (contents.length >=2) {
       for (FileStatus status : contents) {
         if ("job.xml".equals(status.getPath().getName())) {
           ++matchCount;

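The isJobDirValid change above accounts for FileSystem.listStatus throwing FileNotFoundException for a missing path rather than returning null. A small sketch of the resulting idiom; the helper name and its null-on-missing contract are illustrative:

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class ListStatusSketch {
      // Returns the directory listing, or null if the path does not exist.
      static FileStatus[] listIfPresent(FileSystem fs, Path dir) throws IOException {
        try {
          return fs.listStatus(dir);
        } catch (FileNotFoundException fnfe) {
          return null;
        }
      }
    }
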
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobConf.java Wed Aug 26 15:01:29 2009
@@ -43,6 +43,7 @@
 import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
+import org.apache.log4j.Level;
 
 /** 
  * A map/reduce job configuration.
@@ -305,9 +306,34 @@
     "mapred.reduce.child.env";
 
   /**
+   * Configuration key to set the logging {@link Level} for the map task.
+   *
+   * The allowed logging levels are:
+   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
+   */
+  public static final String MAPRED_MAP_TASK_LOG_LEVEL = 
+    "mapred.map.child.log.level";
+  
+  /**
+   * Configuration key to set the logging {@link Level} for the reduce task.
+   *
+   * The allowed logging levels are:
+   * OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE and ALL.
+   */
+  public static final String MAPRED_REDUCE_TASK_LOG_LEVEL = 
+    "mapred.reduce.child.log.level";
+  
+  /**
+   * Default logging level for map/reduce tasks.
+   */
+  public static final Level DEFAULT_LOG_LEVEL = Level.INFO;
+  
+  /**
    * Construct a map/reduce job configuration.
    */
-  public JobConf() {}
+  public JobConf() {
+    checkAndWarnDeprecation();
+  }
 
   /** 
    * Construct a map/reduce job configuration.
@@ -316,6 +342,7 @@
    */
   public JobConf(Class exampleClass) {
     setJarByClass(exampleClass);
+    checkAndWarnDeprecation();
   }
   
   /**
@@ -325,6 +352,7 @@
    */
   public JobConf(Configuration conf) {
     super(conf);
+    checkAndWarnDeprecation();
   }
 
 
@@ -354,6 +382,7 @@
   public JobConf(Path config) {
     super();
     addResource(config);
+    checkAndWarnDeprecation();
   }
 
   /** A new map/reduce configuration where the behavior of reading from the
@@ -366,6 +395,7 @@
    */
   public JobConf(boolean loadDefaults) {
     super(loadDefaults);
+    checkAndWarnDeprecation();
   }
 
   /**
@@ -1570,12 +1600,6 @@
 
   public long getMemoryForMapTask() {
     if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)+
-          " instead use  "+JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY + " and "
-          + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
-
       long val = getLong(
         MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
       return (val == DISABLED_MEMORY_LIMIT) ? val :
@@ -1592,11 +1616,6 @@
 
   public long getMemoryForReduceTask() {
     if (get(MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)+
-        " instead use  "+JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY + " and "
-        + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
       long val = getLong(
         MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
       return (val == DISABLED_MEMORY_LIMIT) ? val :
@@ -1808,8 +1827,17 @@
   }
 
   static String deprecatedString(String key) {
-    return "The variable " + key + " is no longer used";
+    return "The variable " + key + " is no longer used.";
   }
 
+  private void checkAndWarnDeprecation() {
+    if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
+      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
+                + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
+                + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
+    }
+  }
+  
+
 }
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobEndNotifier.java Wed Aug 26 15:01:29 2009
@@ -89,7 +89,9 @@
 
   public static void stopNotifier() {
     running = false;
-    thread.interrupt();
+    if (thread != null) {
+      thread.interrupt();
+    }
   }
 
   private static JobEndStatusInfo createNotification(JobConf conf,

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobHistory.java Wed Aug 26 15:01:29 2009
@@ -188,6 +188,9 @@
     }
 
     void moveToDone(final JobID id) {
+      if (disableHistory) {
+        return;
+      }
       final List<Path> paths = new ArrayList<Path>();
       final Path historyFile = fileManager.getHistoryFile(id);
       if (historyFile == null) {
@@ -218,19 +221,18 @@
                     new FsPermission(HISTORY_FILE_PERMISSION));
               }
             }
-
-            String historyFileDonePath = null;
-            if (historyFile != null) {
-              historyFileDonePath = new Path(DONE, 
-                  historyFile.getName()).toString();
-            }
-            jobTracker.historyFileCopied(id, historyFileDonePath);
-            
-            //purge the job from the cache
-            fileManager.purgeJob(id);
           } catch (Throwable e) {
             LOG.error("Unable to move history file to DONE folder.", e);
           }
+          String historyFileDonePath = null;
+          if (historyFile != null) {
+            historyFileDonePath = new Path(DONE, 
+                historyFile.getName()).toString();
+          }
+          jobTracker.retireJob(id, historyFileDonePath);
+          
+          //purge the job from the cache
+          fileManager.purgeJob(id);
         }
 
       });
@@ -2048,13 +2050,12 @@
       isRunning = true; 
       try {
         FileStatus[] historyFiles = DONEDIR_FS.listStatus(DONE);
+
         // delete if older than 30 days
-        if (historyFiles != null) {
-          for (FileStatus f : historyFiles) {
-            if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
-              DONEDIR_FS.delete(f.getPath(), true); 
-              LOG.info("Deleting old history file : " + f.getPath());
-            }
+        for (FileStatus f : historyFiles) {
+          if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
+            DONEDIR_FS.delete(f.getPath(), true); 
+            LOG.info("Deleting old history file : " + f.getPath());
           }
         }
       } catch (IOException ie) {

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Aug 26 15:01:29 2009
@@ -18,8 +18,11 @@
 package org.apache.hadoop.mapred;
 
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
+import java.io.Writer;
 import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.URLEncoder;
@@ -113,9 +116,6 @@
   }
 
   private long tasktrackerExpiryInterval;
-  private long retireJobInterval;
-  private long retireJobCheckInterval;
-
   // The interval after which one fault of a tracker will be discarded,
   // if there are no faults during this. 
   private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000;
@@ -152,7 +152,9 @@
   final static FsPermission SYSTEM_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0700); // rwx------
   
-  private static Clock clock;
+  private static Clock clock = null;
+  
+  static final Clock DEFAULT_CLOCK = new Clock();
 
   /**
    * A client tried to submit a job before the Job Tracker was ready.
@@ -166,25 +168,17 @@
     }
   }
 
-  /**
-   * The maximum no. of 'completed' (successful/failed/killed)
-   * jobs kept in memory per-user. 
-   */
-  int MAX_COMPLETE_USER_JOBS_IN_MEMORY;
-
-   /**
-    * The minimum time (in ms) that a job's information has to remain
-    * in the JobTracker's memory before it is retired.
-    */
-  int MIN_TIME_BEFORE_RETIRE;
-
-
   private int nextJobId = 1;
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
     
+  /**
+   * Returns JobTracker's clock. Note that the correct clock implementation will
+   * be obtained only when the JobTracker is initialized. If the JobTracker is
+   * not initialized then the default clock i.e {@link Clock} is returned. 
+   */
   static Clock getClock() {
-    return clock;
+    return clock == null ? DEFAULT_CLOCK : clock;
   }
   
   /**
@@ -197,18 +191,22 @@
    * @param conf configuration for the JobTracker.
    * @throws IOException
    */
-  public static JobTracker startTracker(JobConf conf
-                                        ) throws IOException,
-                                                 InterruptedException {
-    return startTracker(conf, new Clock());
+  public static JobTracker startTracker(JobConf conf) 
+  throws IOException, InterruptedException, LoginException {
+    return startTracker(conf, DEFAULT_CLOCK);
   }
 
   static JobTracker startTracker(JobConf conf, Clock clock) 
-  throws IOException, InterruptedException {
+  throws IOException, InterruptedException, LoginException {
+    return startTracker(conf, clock, generateNewIdentifier());
+  }
+
+  static JobTracker startTracker(JobConf conf, Clock clock, String identifier) 
+  throws IOException, InterruptedException, LoginException {
     JobTracker result = null;
     while (true) {
       try {
-        result = new JobTracker(conf, clock);
+        result = new JobTracker(conf, clock, identifier);
         startService(result);
         result.taskScheduler.setTaskTrackerManager(result);
         break;
@@ -218,6 +216,10 @@
         throw e;
       } catch (UnknownHostException e) {
         throw e;
+      } catch (AccessControlException ace) {
+        // in case of jobtracker not having right access
+        // bail out
+        throw ace;
       } catch (IOException e) {
         LOG.warn("Error starting tracker: " +
                 e, e);
@@ -429,29 +431,58 @@
     }
   }
 
-  synchronized void historyFileCopied(JobID jobid, String historyFile) {
-    JobStatus status = getJobStatus(jobid);
-    if (status != null) {
-      String trackingUrl = "";
-      if (historyFile != null) {
-        status.setHistoryFile(historyFile);
-        try {
-          trackingUrl = "http://" + getJobTrackerMachine() + ":" + 
-            getInfoPort() + "/jobdetailshistory.jsp?jobid=" + 
-            jobid + "&logFile=" + URLEncoder.encode(historyFile, "UTF-8");
-        } catch (UnsupportedEncodingException e) {
-          LOG.warn("Could not create trackingUrl", e);
+  synchronized void retireJob(JobID jobid, String historyFile) {
+    synchronized (jobs) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        JobStatus status = job.getStatus();
+        
+        //set the historyfile and update the tracking url
+        String trackingUrl = "";
+        if (historyFile != null) {
+          status.setHistoryFile(historyFile);
+          try {
+            trackingUrl = "http://" + getJobTrackerMachine() + ":" + 
+              getInfoPort() + "/jobdetailshistory.jsp?jobid=" + 
+              jobid + "&logFile=" + URLEncoder.encode(historyFile, "UTF-8");
+          } catch (UnsupportedEncodingException e) {
+            LOG.warn("Could not create trackingUrl", e);
+          }
+        }
+        status.setTrackingUrl(trackingUrl);
+        // clean up job files from the local disk
+        JobHistory.JobInfo.cleanupJob(job.getProfile().getJobID());
+
+        //this configuration is primarily for testing
+        //test cases can set this to false to validate job data structures on 
+        //job completion
+        boolean retireJob = 
+          conf.getBoolean("mapred.job.tracker.retire.jobs", true);
+
+        if (retireJob) {
+          //purge the job from memory
+          removeJobTasks(job);
+          jobs.remove(job.getProfile().getJobID());
+          for (JobInProgressListener l : jobInProgressListeners) {
+            l.jobRemoved(job);
+          }
+
+          String jobUser = job.getProfile().getUser();
+          LOG.info("Retired job with id: '" + 
+                   job.getProfile().getJobID() + "' of user '" +
+                   jobUser + "'");
+
+          //add the job status to retired cache
+          retireJobs.addToCache(job.getStatus());
         }
       }
-      status.setTrackingUrl(trackingUrl);
     }
   }
 
   ///////////////////////////////////////////////////////
   // Used to remove old finished Jobs that have been around for too long
   ///////////////////////////////////////////////////////
-  class RetireJobs implements Runnable {
-    int runCount = 0;
+  class RetireJobs {
     private final Map<JobID, JobStatus> jobIDStatusMap = 
       new HashMap<JobID, JobStatus>();
     private final LinkedList<JobStatus> jobStatusQ = 
@@ -478,74 +509,8 @@
     synchronized LinkedList<JobStatus> getAll() {
       return (LinkedList<JobStatus>) jobStatusQ.clone();
     }
-
-    /**
-     * The run method lives for the life of the JobTracker,
-     * and removes Jobs that are not still running, but which
-     * finished a long time ago.
-     */
-    public void run() {
-      while (true) {
-        ++runCount;
-        try {
-          Thread.sleep(retireJobCheckInterval);
-          List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
-          long now = clock.getTime();
-          long retireBefore = now - retireJobInterval;
-
-          synchronized (jobs) {
-            for(JobInProgress job: jobs.values()) {
-              if (job.getStatus().getRunState() != JobStatus.RUNNING &&
-                  job.getStatus().getRunState() != JobStatus.PREP &&
-                  (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now) &&
-                  (job.getFinishTime()  < retireBefore)) {
-                retiredJobs.add(job);
-              }
-            }
-          }
-          if (!retiredJobs.isEmpty()) {
-            synchronized (JobTracker.this) {
-              synchronized (jobs) {
-                synchronized (taskScheduler) {
-                  for (JobInProgress job: retiredJobs) {
-                    removeJobTasks(job);
-                    jobs.remove(job.getProfile().getJobID());
-                    for (JobInProgressListener l : jobInProgressListeners) {
-                      l.jobRemoved(job);
-                    }
-                    String jobUser = job.getProfile().getUser();
-                    synchronized (userToJobsMap) {
-                      ArrayList<JobInProgress> userJobs =
-                        userToJobsMap.get(jobUser);
-                      synchronized (userJobs) {
-                        userJobs.remove(job);
-                      }
-                      if (userJobs.isEmpty()) {
-                        userToJobsMap.remove(jobUser);
-                      }
-                    }
-                    LOG.info("Retired job with id: '" + 
-                             job.getProfile().getJobID() + "' of user '" +
-                             jobUser + "'");
-
-                    // clean up job files from the local disk
-                    JobHistory.JobInfo.cleanupJob(job.getProfile().getJobID());
-                    addToCache(job.getStatus());
-                  }
-                }
-              }
-            }
-          }
-        } catch (InterruptedException t) {
-          break;
-        } catch (Throwable t) {
-          LOG.error("Error in retiring job:\n" +
-                    StringUtils.stringifyException(t));
-        }
-      }
-    }
   }
-  
+
   enum ReasonForBlackListing {
     EXCEEDING_FAILURES,
     NODE_UNHEALTHY
@@ -1112,6 +1077,9 @@
           hasUpdates = true;
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
           initJob(jip);
+          if (!jip.inited()) {
+            throw new IOException("Failed to initialize job " + jip.getJobID());
+          }
         }
       }
       
@@ -1717,10 +1685,6 @@
   // All the known jobs.  (jobid->JobInProgress)
   Map<JobID, JobInProgress> jobs = new TreeMap<JobID, JobInProgress>();
 
-  // (user -> list of JobInProgress)
-  TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
-    new TreeMap<String, ArrayList<JobInProgress>>();
-    
   // (trackerID --> list of jobs to cleanup)
   Map<String, Set<JobID>> trackerToJobsToCleanup = 
     new HashMap<String, Set<JobID>>();
@@ -1777,7 +1741,6 @@
   ExpireTrackers expireTrackers = new ExpireTrackers();
   Thread expireTrackersThread = null;
   RetireJobs retireJobs = new RetireJobs();
-  Thread retireJobsThread = null;
   int retiredJobsCacheSize;
   ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
@@ -1833,7 +1796,8 @@
 
   private QueueManager queueManager;
 
-  JobTracker(JobConf conf) throws IOException,InterruptedException{
+  JobTracker(JobConf conf) 
+  throws IOException,InterruptedException, LoginException {
     this(conf, new Clock());
   }
   /**
@@ -1843,32 +1807,28 @@
    * @param clock clock to use
    * @throws IOException on problems initializing the tracker
    */
-  JobTracker(JobConf conf, Clock clock)
-          throws IOException, InterruptedException {
-    super(conf);
-    this.clock = clock;
-    // find the owner of the process
-    try {
-      mrOwner = UnixUserGroupInformation.login(conf);
-    } catch (LoginException e) {
-      throw new IOException(StringUtils.stringifyException(e));
-    }
+  JobTracker(JobConf conf, Clock clock) 
+  throws IOException, InterruptedException, LoginException {
+    this(conf, clock, generateNewIdentifier());
+  }
+
+  JobTracker(JobConf conf, Clock newClock, String jobtrackerIdentifier) 
+  throws IOException, InterruptedException, LoginException {
+    clock = newClock;
+    mrOwner = UnixUserGroupInformation.login(conf);
     supergroup = conf.get("mapred.permissions.supergroup", "supergroup");
     LOG.info("Starting jobtracker with owner as " + mrOwner.getUserName() 
              + " and supergroup as " + supergroup);
+    this.conf = conf;
+    setConf(conf);
 
     //
     // Grab some static constants
     //
     tasktrackerExpiryInterval = 
       conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
-    retireJobInterval = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
-    retireJobCheckInterval = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
     retiredJobsCacheSize = 
       conf.getInt("mapred.job.tracker.retiredjobs.cache.size", 1000);
-    // min time before retire
-    MIN_TIME_BEFORE_RETIRE = conf.getInt("mapred.jobtracker.retirejob.interval.min", 60000);
-    MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
     MAX_BLACKLISTS_PER_TRACKER = 
         conf.getInt("mapred.max.tracker.blacklists", 4);
     NUM_HEARTBEATS_IN_SECOND = 
@@ -1882,8 +1842,6 @@
 
     // This is a directory of temporary submission files.  We delete it
     // on startup, and can delete any files that we're done with
-    this.conf = conf;
-    JobConf jobConf = new JobConf(conf);
 
     initializeTaskMemoryRelatedConfig();
 
@@ -1893,6 +1851,7 @@
 
     Configuration queuesConf = new Configuration(this.conf);
     queueManager = new QueueManager(queuesConf);
+    this.trackerIdentifier = jobtrackerIdentifier;
     
     // Create the scheduler
     Class<? extends TaskScheduler> schedulerClass
@@ -1959,7 +1918,7 @@
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
     
-    trackerIdentifier = getDateFormat().format(new Date());
+    
 
     // Initialize instrumentation
     //this operation is synchronized to stop findbugs warning of inconsistent
@@ -1994,7 +1953,7 @@
     // start the recovery manager
     recoveryManager = new RecoveryManager();
     
-    while (true) {
+    while (!Thread.currentThread().isInterrupted()) {
       try {
         // if we haven't contacted the namenode go ahead and do it
         if (fs == null) {
@@ -2009,8 +1968,14 @@
           systemDir = new Path(getSystemDir());    
         }
         // Make sure that the backup data is preserved
-        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
-        // Check if the history is enabled .. as we cant have persistence with 
+        FileStatus[] systemDirData;
+        try {
+          systemDirData = fs.listStatus(this.systemDir);
+        } catch (FileNotFoundException fnfe) {
+          systemDirData = null;
+        }
+        
+        // Check if the history is enabled .. as we can't have persistence with 
         // history disabled
         if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
             && !JobHistory.isDisableHistory()
@@ -2037,12 +2002,14 @@
           break;
         }
         LOG.error("Mkdirs failed to create " + systemDir);
+      } catch (AccessControlException ace) {
+        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir 
+                 + ") because of permissions.");
+        LOG.warn("Manually delete the mapred.system.dir (" + systemDir 
+                 + ") and then start the JobTracker.");
+        LOG.warn("Bailing out ... ");
+        throw ace;
       } catch (IOException ie) {
-        if (ie instanceof RemoteException && 
-            AccessControlException.class.getName().equals(
-                ((RemoteException)ie).getClassName())) {
-          throw ie;
-        }
         LOG.info("problem cleaning system directory: " + systemDir + ": " + ie,
                 ie);
       }
@@ -2054,6 +2021,10 @@
       }
     }
     
+    if (Thread.currentThread().isInterrupted()) {
+      throw new IOException("Interrupted during startup");
+    }
+    
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
 
@@ -2085,6 +2056,10 @@
     return new SimpleDateFormat("yyyyMMddHHmm");
   }
 
+  private static String generateNewIdentifier() {
+    return getDateFormat().format(new Date());
+  }
+  
   static boolean validateIdentifier(String id) {
     try {
       // the jobtracker id should be 'date' parseable
@@ -2217,8 +2192,6 @@
     
     startExpireTrackersThread();
 
-    this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
-    this.retireJobsThread.start();
     expireLaunchingTaskThread.start();
 
     synchronized (this) {
@@ -2273,8 +2246,11 @@
    */
   @Override
   protected void innerClose() throws IOException {
-    JobEndNotifier.stopNotifier();
-    closeJobTracker();
+    try {
+      JobEndNotifier.stopNotifier();
+    } finally {
+      closeJobTracker();
+    }
   }
 
   /**
@@ -2302,7 +2278,6 @@
     }
 
     stopExpireTrackersThread();
-    retireThread("retirer", retireJobsThread);
     if (taskScheduler != null) {
       taskScheduler.terminate();
       taskScheduler = null;
@@ -2369,6 +2344,16 @@
     return "JobTracker";
   }
 
+  /**
+   * Get the current number of workers
+   *
+   * @return the number of task trackers
+   */
+  @Override
+  public int getLiveWorkerCount() {
+    return getNumResolvedTaskTrackers();
+  }
+
   ///////////////////////////////////////////////////////
   // Maintain lookup tables; called by JobInProgress
   // and TaskInProgress
@@ -2500,11 +2485,7 @@
   /**
    * Call {@link #removeTaskEntry(String)} for each of the
    * job's tasks.
-   * When the JobTracker is retiring the long-completed
-   * job, either because it has outlived {@link #retireJobInterval}
-   * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs 
-   * has been reached, we can afford to nuke all it's tasks; a little
-   * unsafe, but practically feasible. 
+   * When the job is retiring we can afford to nuke all its tasks
    * 
    * @param job the job about to be 'retired'
    */
@@ -2556,8 +2537,6 @@
     final JobTrackerInstrumentation metrics = getInstrumentation();
     metrics.finalizeJob(conf, id);
     
-    long now = clock.getTime();
-    
     // mark the job for cleanup at all the trackers
     addJobForCleanup(id);
 
@@ -2569,74 +2548,6 @@
         }
       }
     }
-    
-    // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
-    // in memory; information about the purged jobs is available via
-    // JobHistory.
-    synchronized (jobs) {
-      synchronized (taskScheduler) {
-        synchronized (userToJobsMap) {
-          String jobUser = job.getProfile().getUser();
-          if (!userToJobsMap.containsKey(jobUser)) {
-            userToJobsMap.put(jobUser, 
-                              new ArrayList<JobInProgress>());
-          }
-          ArrayList<JobInProgress> userJobs = 
-            userToJobsMap.get(jobUser);
-          synchronized (userJobs) {
-            // Add the currently completed 'job'
-            userJobs.add(job);
-
-            // Check if we need to retire some jobs of this user
-            while (userJobs.size() > 
-                   MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
-              JobInProgress rjob = userJobs.get(0);
-
-              // do not retire jobs that finished in the very recent past.
-              if (rjob.getFinishTime() + MIN_TIME_BEFORE_RETIRE > now) {
-                break;
-              }
-                
-              // Cleanup all datastructures
-              int rjobRunState = 
-                rjob.getStatus().getRunState();
-              if (rjobRunState == JobStatus.SUCCEEDED || 
-                  rjobRunState == JobStatus.FAILED ||
-                  rjobRunState == JobStatus.KILLED) {
-                // Ok, this call to removeTaskEntries
-                // is dangerous is some very very obscure
-                // cases; e.g. when rjob completed, hit
-                // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
-                // limit and yet some task (taskid)
-                // wasn't complete!
-                removeJobTasks(rjob);
-                  
-                userJobs.remove(0);
-                jobs.remove(rjob.getProfile().getJobID());
-                for (JobInProgressListener listener : jobInProgressListeners) {
-                  listener.jobRemoved(rjob);
-                }
-                  
-                LOG.info("Retired job with id: '" + 
-                         rjob.getProfile().getJobID() + "' of user: '" +
-                         jobUser + "'");
-                // clean up job files from the local disk
-                JobHistory.JobInfo.cleanupJob(rjob.getProfile().getJobID());
-                retireJobs.addToCache(rjob.getStatus());
-              } else {
-                // Do not remove jobs that aren't complete.
-                // Stop here, and let the next pass take
-                // care of purging jobs.
-                break;
-              }
-            }
-          }
-          if (userJobs.isEmpty()) {
-            userToJobsMap.remove(jobUser);
-          }
-        }
-      }
-    }
   }
 
   ///////////////////////////////////////////////////////
@@ -4368,20 +4279,40 @@
   public static void main(String argv[]
                           ) throws IOException, InterruptedException {
     StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
-    if (argv.length != 0) {
-      System.out.println("usage: JobTracker");
-      System.exit(-1);
-    }
-      
+    
     try {
-      JobTracker tracker = startTracker(new JobConf());
-      tracker.offerService();
+      if(argv.length == 0) {
+        JobTracker tracker = startTracker(new JobConf());
+        tracker.offerService();
+      }
+      else {
+        if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {
+          dumpConfiguration(new PrintWriter(System.out));
+        }
+        else {
+          System.out.println("usage: JobTracker [-dumpConfiguration]");
+          System.exit(-1);
+        }
+      }
     } catch (Throwable e) {
       LOG.fatal(StringUtils.stringifyException(e));
       System.exit(-1);
     }
   }
 
+  /**
+   * Dumps the configuration properties in JSON format.
+   * @param writer {@link Writer} object to which the output is written
+   * @throws IOException
+   */
+  private static void dumpConfiguration(Writer writer) throws IOException {
+    Configuration.dumpConfiguration(new JobConf(), writer);
+    writer.write("\n");
+    // get the QueueManager configuration properties
+    QueueManager.dumpConfiguration(writer);
+    writer.write("\n");
+  }
+
   @Override
   public JobQueueInfo[] getQueues() throws IOException {
     return queueManager.getJobQueueInfos();

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Aug 26 15:01:29 2009
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -26,15 +28,17 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.JobTrackerMetricsInst;
-import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
@@ -82,9 +86,16 @@
     return rawSplits;
   }
 
-  private class Job extends Thread
-    implements TaskUmbilicalProtocol {
-    private Path file;
+  private class Job extends Thread implements TaskUmbilicalProtocol {
+    // The job directory on the system: JobClient places job configurations here.
+    // This is analogous to JobTracker's system directory.
+    private Path systemJobDir;
+    private Path systemJobFile;
+    
+    // The job directory for the task.  Analogous to a task's job directory.
+    private Path localJobDir;
+    private Path localJobFile;
+
     private JobID id;
     private JobConf job;
 
@@ -92,10 +103,12 @@
     private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
 
     private JobProfile profile;
-    private Path localFile;
     private FileSystem localFs;
     boolean killed = false;
     
+    private TrackerDistributedCacheManager trackerDistributerdCacheManager;
+    private TaskDistributedCacheManager taskDistributedCacheManager;
+    
     // Counters summed over all the map/reduce tasks which
     // have successfully completed
     private Counters completedTaskCounters = new Counters();
@@ -108,15 +121,55 @@
     }
     
     public Job(JobID jobid, JobConf conf) throws IOException {
-      this.file = new Path(getSystemDir(), jobid + "/job.xml");
+      this.systemJobDir = new Path(getSystemDir(), jobid.toString());
+      this.systemJobFile = new Path(systemJobDir, "job.xml");
       this.id = jobid;
-
-      this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
       this.localFs = FileSystem.getLocal(conf);
+      this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
+      this.localJobFile = new Path(this.localJobDir, id + ".xml");
 
-      fs.copyToLocalFile(file, localFile);
-      this.job = new JobConf(localFile);
-      profile = new JobProfile(job.getUser(), id, file.toString(), 
+      // Manage the distributed cache.  If there are files to be copied,
+      // setup() updates the configuration, and localJobFile is re-written below.
+      this.trackerDistributerdCacheManager =
+          new TrackerDistributedCacheManager(conf);
+      this.taskDistributedCacheManager = 
+          trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
+      taskDistributedCacheManager.setup(
+          new LocalDirAllocator("mapred.local.dir"), 
+          new File(systemJobDir.toString()),
+          "archive");
+      
+      if (DistributedCache.getSymlink(conf)) {
+        // This is not supported largely because, 
+        // for a Child subprocess, the cwd in LocalJobRunner
+        // is not a fresh slate, but rather the user's working directory.
+        // This is further complicated because the logic in
+        // setupWorkDir only creates symlinks if there's a jarfile
+        // in the configuration.
+        LOG.warn("LocalJobRunner does not support " +
+            "symlinking into current working dir.");
+      }
+      // Setup the symlinks for the distributed cache.
+      TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());
+      
+      // Write out configuration file.  Instead of copying it from
+      // systemJobFile, we re-write it, since setup(), above, may have
+      // updated it.
+      OutputStream out = localFs.create(localJobFile);
+      try {
+        conf.writeXml(out);
+      } finally {
+        out.close();
+      }
+      this.job = new JobConf(localJobFile);
+      
+      // Job (the current object) is a Thread, so we wrap its class loader.
+      if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
+        setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
+                getContextClassLoader()));
+      }
+      
+      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
                                "http://localhost:8080/", job.getJobName());
       status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING, 
           profile.getUser(), profile.getJobName(), profile.getJobFile(), 
@@ -174,7 +227,7 @@
             TaskAttemptID mapId = new TaskAttemptID(
                 new TaskID(jobId, TaskType.MAP, i),0);  
             mapIds.add(mapId);
-            MapTask map = new MapTask(file.toString(),  
+            MapTask map = new MapTask(systemJobFile.toString(),  
                                       mapId, i,
                                       rawSplits[i].getClassName(),
                                       rawSplits[i].getBytes(), 1);
@@ -185,7 +238,7 @@
             mapOutput.setConf(localConf);
             mapOutputFiles.put(mapId, mapOutput);
 
-            map.setJobFile(localFile.toString());
+            map.setJobFile(localJobFile.toString());
             map.localizeConfiguration(localConf);
             map.setConf(localConf);
             map_tasks += 1;
@@ -202,7 +255,7 @@
           new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
         try {
           if (numReduceTasks > 0) {
-            ReduceTask reduce = new ReduceTask(file.toString(), 
+            ReduceTask reduce = new ReduceTask(systemJobFile.toString(), 
                 reduceId, 0, mapIds.size(), 1);
             JobConf localConf = new JobConf(job);
             TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
@@ -227,7 +280,7 @@
               }
             }
             if (!this.isInterrupted()) {
-              reduce.setJobFile(localFile.toString());
+              reduce.setJobFile(localJobFile.toString());
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
@@ -275,8 +328,11 @@
 
       } finally {
         try {
-          fs.delete(file.getParent(), true);  // delete submit dir
-          localFs.delete(localFile, true);              // delete local copy
+          fs.delete(systemJobFile.getParent(), true);  // delete submit dir
+          localFs.delete(localJobFile, true);              // delete local copy
+          // Cleanup distributed cache
+          taskDistributedCacheManager.release();
+          trackerDistributerdCacheManager.purgeCache();
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
         }
@@ -489,5 +545,5 @@
   @Override
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException{
     return null;
-}
+  }
 }
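
Instead of copying job.xml from the system directory, the local runner now re-serializes the possibly-updated JobConf. A small sketch of that write-then-reload pattern using public APIs (the path below is made up for illustration):

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    public class RewriteLocalJobConf {
      public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf();
        FileSystem localFs = FileSystem.getLocal(conf);
        Path localJobFile = new Path("/tmp/localrunner/job_local_0001.xml"); // assumed path
        OutputStream out = localFs.create(localJobFile);
        try {
          conf.writeXml(out);            // persist any in-memory updates (e.g. cache setup)
        } finally {
          out.close();
        }
        JobConf reloaded = new JobConf(localJobFile);  // tasks see the updated copy
        System.out.println("Reloaded " + reloaded.size() + " properties");
      }
    }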

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Wed Aug 26 15:01:29 2009
@@ -20,6 +20,7 @@
 import java.io.*;
 
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.log4j.Level;
 
 /** Runs a map task. */
 class MapTaskRunner extends TaskRunner {
@@ -62,4 +63,10 @@
     return jobConf.get(JobConf.MAPRED_MAP_TASK_ENV, super.getChildEnv(jobConf));
   }
 
+  @Override
+  public Level getLogLevel(JobConf jobConf) {
+    return Level.toLevel(jobConf.get(JobConf.MAPRED_MAP_TASK_LOG_LEVEL, 
+                                     JobConf.DEFAULT_LOG_LEVEL.toString()));
+  }
+
 }
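
With getLogLevel() overridden in both MapTaskRunner (here) and ReduceTaskRunner (further down), the child JVM's root logger level becomes a per-job, per-task-type setting. A hedged usage sketch, relying on the JobConf constants referenced by this patch:

    import org.apache.hadoop.mapred.JobConf;

    public class PerTaskLogLevel {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Values are log4j level names; unset keys fall back to JobConf.DEFAULT_LOG_LEVEL.
        job.set(JobConf.MAPRED_MAP_TASK_LOG_LEVEL, "DEBUG");
        job.set(JobConf.MAPRED_REDUCE_TASK_LOG_LEVEL, "WARN");
        // TaskRunner turns the chosen level into -Dhadoop.root.logger=<LEVEL>,TLA
        // on the child JVM command line (see the TaskRunner hunk below).
      }
    }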

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java Wed Aug 26 15:01:29 2009
@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Writer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -440,4 +442,18 @@
       this.queues.put(queue.getName(), queue);
     }
   }
+
+  /**
+   * Prints the configuration of the QueueManager in JSON format.
+   * This method should be updated whenever the
+   * QueueManager(Configuration) constructor is modified.
+   * @param writer {@link Writer} object to which the configuration properties
+   * are printed in JSON format
+   * @throws IOException
+   */
+  static void dumpConfiguration(Writer writer) throws IOException {
+    Configuration conf = new Configuration(false);
+    conf.addResource(QUEUE_CONF_FILE_NAME);
+    Configuration.dumpConfiguration(conf, writer);
+  }
 }
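
QueueManager.dumpConfiguration(Writer) builds an empty Configuration, adds only the queue resource, and dumps it as JSON. The equivalent written against public APIs (the resource name below is a placeholder; the real one comes from the package-private QUEUE_CONF_FILE_NAME):

    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import org.apache.hadoop.conf.Configuration;

    public class DumpQueueConf {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration(false);  // no default resources
        conf.addResource("mapred-queues.xml");          // assumed resource name
        Writer out = new OutputStreamWriter(System.out);
        Configuration.dumpConfiguration(conf, out);     // queue properties as JSON
        out.flush();
      }
    }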

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Aug 26 15:01:29 2009
@@ -2760,10 +2760,12 @@
                 maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
                   getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
               }
+              // send the full attempt ID of the reduce task so that the
+              // shuffle can be traced from map attempt to reduce attempt
               URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
                                       "/mapOutput?job=" + taskId.getJobID() +
                                       "&map=" + taskId + 
-                                      "&reduce=" + getPartition());
+                                      "&reduce=" + reduceTask.getTaskID());
               List<MapOutputLocation> loc = mapLocations.get(host);
               if (loc == null) {
                 loc = Collections.synchronizedList
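
The shuffle fetch URL now carries the reduce attempt ID instead of just the partition number, so a map-output transfer can be attributed to a specific reduce attempt in the client trace log. A sketch of the resulting URL (all identifiers below are made-up examples):

    import java.net.MalformedURLException;
    import java.net.URL;

    public class ShuffleUrlExample {
      public static void main(String[] args) throws MalformedURLException {
        String taskTrackerHttp = "http://tt-host:50060";                  // assumed tracker address
        String mapAttempt      = "attempt_200908261501_0001_m_000003_0";  // assumed map attempt
        String reduceAttempt   = "attempt_200908261501_0001_r_000000_0";  // assumed reduce attempt
        URL mapOutputLocation = new URL(taskTrackerHttp
            + "/mapOutput?job=job_200908261501_0001"
            + "&map=" + mapAttempt
            + "&reduce=" + reduceAttempt);  // previously just the partition number, e.g. "0"
        System.out.println(mapOutputLocation);
      }
    }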

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Wed Aug 26 15:01:29 2009
@@ -20,6 +20,7 @@
 import java.io.*;
 
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.log4j.Level;
 
 /** Runs a reduce task. */
 class ReduceTaskRunner extends TaskRunner {
@@ -68,4 +69,10 @@
                        super.getChildEnv(jobConf));
   }
 
+  @Override
+  public Level getLogLevel(JobConf jobConf) {
+    return Level.toLevel(jobConf.get(JobConf.MAPRED_REDUCE_TASK_LOG_LEVEL, 
+                                     JobConf.DEFAULT_LOG_LEVEL.toString()));
+  }
+
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Aug 26 15:01:29 2009
@@ -33,8 +33,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FSError;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
 
 /** Base class that runs a task in a separate process.  Tasks are run in a
  * separate process in order to isolate the map/reduce system code from bugs in
@@ -63,6 +65,7 @@
 
   
   private TaskTracker tracker;
+  private TaskDistributedCacheManager taskDistributedCacheManager;
 
   protected JobConf conf;
   JvmManager jvmManager;
@@ -100,18 +103,6 @@
    */
   public void close() throws IOException {}
 
-  private static String stringifyPathArray(Path[] p){
-    if (p == null){
-      return null;
-    }
-    StringBuffer str = new StringBuffer(p[0].toString());
-    for (int i = 1; i < p.length; i++){
-      str.append(",");
-      str.append(p[i].toString());
-    }
-    return str.toString();
-  }
-  
 
   /**
    * Get the java command line options for the child map/reduce tasks.
@@ -154,6 +145,13 @@
     return jobConf.get(JobConf.MAPRED_TASK_ENV);
   }
 
+  /**
+   * Get the log {@link Level} for the child map/reduce tasks.
+   * @param jobConf the job configuration
+   * @return the log-level for the child map/reduce tasks
+   */
+  public abstract Level getLogLevel(JobConf jobConf);
+  
   @Override
   public final void run() {
     String errorInfo = "Child Error";
@@ -165,11 +163,12 @@
       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
       File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
 
-      URI[] archives = DistributedCache.getCacheArchives(conf);
-      URI[] files = DistributedCache.getCacheFiles(conf);
       // We don't create any symlinks yet, so presence/absence of workDir
       // actually on the file system doesn't matter.
-      setupDistributedCache(lDirAlloc, workDir, archives, files);
+      taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager()
+          .newTaskDistributedCacheManager(conf);
+      taskDistributedCacheManager.setup(
+          lDirAlloc, workDir, TaskTracker.getDistributedCacheDir());
 
       // Set up the child task's configuration. After this call, no localization
       // of files should happen in the TaskTracker's process space. Any changes to
@@ -181,7 +180,8 @@
       }
 
       // Build classpath
-      List<String> classPaths = getClassPaths(conf, workDir, archives, files);
+      List<String> classPaths =
+          getClassPaths(conf, workDir, taskDistributedCacheManager);
 
       long logSize = TaskLog.getTaskLogLength(conf);
 
@@ -241,18 +241,8 @@
       }
     } finally {
       try{
-        URI[] archives = DistributedCache.getCacheArchives(conf);
-        URI[] files = DistributedCache.getCacheFiles(conf);
-        if (archives != null){
-          for (int i = 0; i < archives.length; i++){
-            DistributedCache.releaseCache(archives[i], conf);
-          }
-        }
-        if (files != null){
-          for(int i = 0; i < files.length; i++){
-            DistributedCache.releaseCache(files[i], conf);
-          }
-        }
+        taskDistributedCacheManager.release();
+
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
@@ -411,7 +401,7 @@
     vargs.add("-Dhadoop.log.dir=" + 
         new File(System.getProperty("hadoop.log.dir")
         ).getAbsolutePath());
-    vargs.add("-Dhadoop.root.logger=INFO,TLA");
+    vargs.add("-Dhadoop.root.logger=" + getLogLevel(conf).toString() + ",TLA");
     vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
     vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
 
@@ -462,7 +452,7 @@
   /**
    */
   private static List<String> getClassPaths(JobConf conf, File workDir,
-      URI[] archives, URI[] files)
+      TaskDistributedCacheManager taskDistributedCacheManager)
       throws IOException {
     // Accumulates class paths for child.
     List<String> classPaths = new ArrayList<String>();
@@ -473,7 +463,7 @@
     appendJobJarClasspaths(conf.getJar(), classPaths);
     
     // Distributed cache paths
-    appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+    classPaths.addAll(taskDistributedCacheManager.getClassPaths());
     
     // Include the working dir too
     classPaths.add(workDir.toString());
@@ -592,105 +582,6 @@
     return new File(workDir.toString());
   }
 
-  private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
-      URI[] archives, URI[] files) throws IOException {
-    FileStatus fileStatus;
-    FileSystem fileSystem;
-    Path localPath;
-    String baseDir;
-    if ((archives != null) || (files != null)) {
-      if (archives != null) {
-        String[] archivesTimestamps = 
-                             DistributedCache.getArchiveTimestamps(conf);
-        Path[] p = new Path[archives.length];
-        for (int i = 0; i < archives.length;i++){
-          fileSystem = FileSystem.get(archives[i], conf);
-          fileStatus = fileSystem.getFileStatus(
-                                    new Path(archives[i].getPath()));
-          String cacheId = DistributedCache.makeRelative(archives[i],conf);
-          String cachePath = TaskTracker.getDistributedCacheDir() + 
-                               Path.SEPARATOR + cacheId;
-          
-          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                    fileStatus.getLen(), conf);
-          baseDir = localPath.toString().replace(cacheId, "");
-          p[i] = DistributedCache.getLocalCache(archives[i], conf, 
-                                                new Path(baseDir),
-                                                fileStatus,
-                                                true, Long.parseLong(
-                                                      archivesTimestamps[i]),
-                                                new Path(workDir.
-                                                      getAbsolutePath()), 
-                                                false);
-          
-        }
-        DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
-      }
-      if ((files != null)) {
-        String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
-        Path[] p = new Path[files.length];
-        for (int i = 0; i < files.length;i++){
-          fileSystem = FileSystem.get(files[i], conf);
-          fileStatus = fileSystem.getFileStatus(
-                                    new Path(files[i].getPath()));
-          String cacheId = DistributedCache.makeRelative(files[i], conf);
-          String cachePath = TaskTracker.getDistributedCacheDir() +
-                               Path.SEPARATOR + cacheId;
-          
-          localPath = lDirAlloc.getLocalPathForWrite(cachePath,
-                                    fileStatus.getLen(), conf);
-          baseDir = localPath.toString().replace(cacheId, "");
-          p[i] = DistributedCache.getLocalCache(files[i], conf, 
-                                                new Path(baseDir),
-                                                fileStatus,
-                                                false, Long.parseLong(
-                                                         fileTimestamps[i]),
-                                                new Path(workDir.
-                                                      getAbsolutePath()), 
-                                                false);
-        }
-        DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
-      }
-    }
-  }
-
-  private static void appendDistributedCacheClasspaths(JobConf conf,
-      URI[] archives, URI[] files, List<String> classPaths)
-      throws IOException {
-    // Archive paths
-    Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
-    if (archiveClasspaths != null && archives != null) {
-      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
-      if (localArchives != null){
-        for (int i=0;i<archives.length;i++){
-          for(int j=0;j<archiveClasspaths.length;j++){
-            if (archives[i].getPath().equals(
-                                             archiveClasspaths[j].toString())){
-              classPaths.add(localArchives[i].toString());
-            }
-          }
-        }
-      }
-    }
-    
-    //file paths
-    Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
-    if (fileClasspaths!=null && files != null) {
-      Path[] localFiles = DistributedCache
-        .getLocalCacheFiles(conf);
-      if (localFiles != null) {
-        for (int i = 0; i < files.length; i++) {
-          for (int j = 0; j < fileClasspaths.length; j++) {
-            if (files[i].getPath().equals(
-                                          fileClasspaths[j].toString())) {
-              classPaths.add(localFiles[i].toString());
-            }
-          }
-        }
-      }
-    }
-  }
-
   private static void appendSystemClasspaths(List<String> classPaths) {
     for (String c : System.getProperty("java.class.path").split(
         SYSTEM_PATH_SEPARATOR)) {
@@ -724,12 +615,22 @@
     classPaths.add(jobCacheDir.toString());
   }
   
-  //Mostly for setting up the symlinks. Note that when we setup the distributed
-  //cache, we didn't create the symlinks. This is done on a per task basis
-  //by the currently executing task.
-  public static void setupWorkDir(JobConf conf) throws IOException {
-    File workDir = new File(".").getAbsoluteFile();
+  /**
+   * Creates distributed cache symlinks and a tmp directory, as appropriate.
+   * Note that the symlinks are not created when the distributed cache is
+   * set up; that is done here, on a per-task basis, by the currently
+   * executing task.
+   * 
+   * @param conf The job configuration.
+   * @param workDir Working directory, which is completely deleted.
+   */
+  public static void setupWorkDir(JobConf conf, File workDir) throws IOException {
+    LOG.debug("Fully deleting and re-creating " + workDir);
     FileUtil.fullyDelete(workDir);
+    if (!workDir.mkdir()) {
+      LOG.debug("Did not recreate " + workDir);
+    }
+    
     if (DistributedCache.getSymlink(conf)) {
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
@@ -738,48 +639,58 @@
       if (archives != null) {
         for (int i = 0; i < archives.length; i++) {
           String link = archives[i].getFragment();
-          if (link != null) {
-            link = workDir.toString() + Path.SEPARATOR + link;
-            File flink = new File(link);
-            if (!flink.exists()) {
-              FileUtil.symLink(localArchives[i].toString(), link);
-            }
-          }
+          String target = localArchives[i].toString();
+          symlink(workDir, target, link);
         }
       }
       if (files != null) {
         for (int i = 0; i < files.length; i++) {
           String link = files[i].getFragment();
-          if (link != null) {
-            link = workDir.toString() + Path.SEPARATOR + link;
-            File flink = new File(link);
-            if (!flink.exists()) {
-              FileUtil.symLink(localFiles[i].toString(), link);
-            }
-          }
+          String target = localFiles[i].toString();
+          symlink(workDir, target, link);
         }
       }
     }
-    File jobCacheDir = null;
+    
+    // For streaming, create extra symlinks (for all the files
+    // in the job cache dir) in the current working directory.
+    // Note that this is only executed if the configuration 
+    // points to a jar file.
     if (conf.getJar() != null) {
-      jobCacheDir = new File(
+      File jobCacheDir = new File(
           new Path(conf.getJar()).getParent().toString());
-    }
-
-    // create symlinks for all the files in job cache dir in current
-    // workingdir for streaming
-    try{
-      DistributedCache.createAllSymlink(conf, jobCacheDir,
-          workDir);
-    } catch(IOException ie){
-      // Do not exit even if symlinks have not been created.
-      LOG.warn(StringUtils.stringifyException(ie));
+      try{
+        TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir,
+            workDir);
+      } catch(IOException ie){
+        // Do not exit even if symlinks have not been created.
+        LOG.warn(StringUtils.stringifyException(ie));
+      }
     }
 
     createChildTmpDir(workDir, conf);
   }
 
   /**
+   * Utility method for creating a symlink and warning on errors.
+   * 
+   * If link is null, does nothing.
+   */
+  private static void symlink(File workDir, String target, String link)
+      throws IOException {
+    if (link != null) {
+      link = workDir.toString() + Path.SEPARATOR + link;
+      File flink = new File(link);
+      if (!flink.exists()) {
+        LOG.info(String.format("Creating symlink: %s <- %s", target, link));
+        if (0 != FileUtil.symLink(target, link)) {
+          LOG.warn(String.format("Failed to create symlink: %s <- %s", target, link));
+        }
+      }
+    }
+  }
+
+  /**
    * Kill the child process
    */
   public void kill() {
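
The refactored setupWorkDir(conf, workDir) and the new symlink() helper implement the usual distributed-cache convention: the URI fragment names the symlink created in the task's working directory. A hedged sketch of the job-side setup that drives it (URIs and hosts are made up; the DistributedCache import matches the one used in this patch):

    import java.net.URI;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;

    public class CacheSymlinkExample {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        // The fragment ("dict.txt") names the symlink created in the task's cwd.
        DistributedCache.addCacheFile(
            new URI("hdfs://namenode:8020/shared/dictionary.txt#dict.txt"), conf);
        DistributedCache.createSymlink(conf);  // sets the flag read by getSymlink(conf)
        // At task launch, TaskRunner.setupWorkDir(conf, workDir) links
        // <workDir>/dict.txt -> <localized cache copy of dictionary.txt>.
      }
    }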

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Aug 26 15:01:29 2009
@@ -51,7 +51,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -70,6 +69,7 @@
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -133,6 +133,7 @@
         ", bytes: %s" + // byte count
         ", op: %s" +    // operation
         ", cliID: %s" + // task id
+        ", reduceID: %s" + // reduce id
         ", duration: %s"; // duration
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
@@ -151,6 +152,8 @@
 
   Server taskReportServer = null;
   InterTrackerProtocol jobClient;
+  
+  private TrackerDistributedCacheManager distributedCacheManager;
     
   // last heartbeat response recieved
   short heartbeatResponseId = -1;
@@ -556,8 +559,11 @@
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    // Clear out temporary files that might be lying around
-    DistributedCache.purgeCache(this.fConf);
+    // Initialize DistributedCache and
+    // clear out temporary files that might be lying around
+    this.distributedCacheManager = 
+        new TrackerDistributedCacheManager(this.fConf);
+    this.distributedCacheManager.purgeCache();
     cleanupStorage();
 
     //mark as just started; this is used in heartbeats
@@ -3295,6 +3301,7 @@
     public void doGet(HttpServletRequest request, 
                       HttpServletResponse response
                       ) throws ServletException, IOException {
+      TaskAttemptID reduceAttId = null;
       String mapId = request.getParameter("map");
       String reduceId = request.getParameter("reduce");
       String jobId = request.getParameter("job");
@@ -3306,8 +3313,13 @@
       if (mapId == null || reduceId == null) {
         throw new IOException("map and reduce parameters are required");
       }
+      try {
+        reduceAttId = TaskAttemptID.forName(reduceId);
+      } catch (IllegalArgumentException e) {
+        throw new IOException("reduce attempt ID is malformed");
+      }
       ServletContext context = getServletContext();
-      int reduce = Integer.parseInt(reduceId);
+      int reduce = reduceAttId.getTaskID().getId();
       byte[] buffer = new byte[MAX_BYTES_TO_READ];
       // true iff IOException was caused by attempt to access input
       boolean isInputException = true;
@@ -3422,7 +3434,8 @@
           ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
                 request.getLocalAddr() + ":" + request.getLocalPort(),
                 request.getRemoteAddr() + ":" + request.getRemotePort(),
-                totalRead, "MAPRED_SHUFFLE", mapId, endTime-startTime));
+                totalRead, "MAPRED_SHUFFLE", mapId, reduceId,
+                endTime-startTime));
         }
       }
       outStream.close();
@@ -3543,7 +3556,26 @@
         maxMapSlots * mapSlotMemorySizeOnTT + maxReduceSlots
             * reduceSlotSizeMemoryOnTT;
     if (totalMemoryAllottedForTasks < 0) {
-      totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+      // Fall back to the old keys, which an administrator might still be
+      // using to configure memory monitoring on the TT.
+      long memoryAllotedForSlot = fConf.normalizeMemoryConfigValue(
+          fConf.getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, 
+              JobConf.DISABLED_MEMORY_LIMIT));
+      long limitVmPerTask = fConf.normalizeMemoryConfigValue(
+          fConf.getLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, 
+              JobConf.DISABLED_MEMORY_LIMIT));
+      if(memoryAllotedForSlot == JobConf.DISABLED_MEMORY_LIMIT) {
+        totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT; 
+      } else {
+        if(memoryAllotedForSlot > limitVmPerTask) {
+          LOG.info("DefaultMaxVmPerTask is mis-configured. " +
+              "It shouldn't be greater than task limits");
+          totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+        } else {
+          totalMemoryAllottedForTasks = (maxMapSlots + 
+              maxReduceSlots) *  (memoryAllotedForSlot/(1024 * 1024));
+        }
+      }
     }
     if (totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT) {
       LOG.info("totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT."
@@ -3620,6 +3652,9 @@
     healthChecker.start();
   }
 
+  TrackerDistributedCacheManager getTrackerDistributedCacheManager() {
+    return distributedCacheManager;
+  }
 
   /**
    * Thread that handles cleanup
@@ -3680,5 +3715,7 @@
       }
       LOG.debug("Task cleanup thread ending");
     }
+
   }
+
 }
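
On the TaskTracker side, the mapOutput servlet now parses the reduce parameter as a full attempt ID and recovers the partition from it, which is also what feeds the extra reduceID field in the client trace format. A small sketch of that parsing step (the attempt string is an assumed example):

    import org.apache.hadoop.mapred.TaskAttemptID;

    public class ReduceParamParse {
      public static void main(String[] args) {
        String reduceParam = "attempt_200908261501_0001_r_000002_0";    // assumed request parameter
        TaskAttemptID reduceAttId = TaskAttemptID.forName(reduceParam); // IllegalArgumentException if malformed
        int reduce = reduceAttId.getTaskID().getId();                   // partition number, here 2
        System.out.println(reduce);
      }
    }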

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java?rev=808036&r1=808035&r2=808036&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/ArrayListBackedIterator.java Wed Aug 26 15:01:29 2009
@@ -17,73 +17,28 @@
  */
 package org.apache.hadoop.mapred.join;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
 
 /**
  * This class provides an implementation of ResetableIterator. The
  * implementation uses an {@link java.util.ArrayList} to store elements
  * added to it, replaying them as requested.
  * Prefer {@link StreamBackedIterator}.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.join.ArrayListBackedIterator} instead
  */
-public class ArrayListBackedIterator<X extends Writable>
+@Deprecated
+public class ArrayListBackedIterator<X extends Writable> extends 
+    org.apache.hadoop.mapreduce.lib.join.ArrayListBackedIterator<X>
     implements ResetableIterator<X> {
 
-  private Iterator<X> iter;
-  private ArrayList<X> data;
-  private X hold = null;
-
   public ArrayListBackedIterator() {
-    this(new ArrayList<X>());
+    super();
   }
 
   public ArrayListBackedIterator(ArrayList<X> data) {
-    this.data = data;
-    this.iter = this.data.iterator();
-  }
-
-  public boolean hasNext() {
-    return iter.hasNext();
-  }
-
-  public boolean next(X val) throws IOException {
-    if (iter.hasNext()) {
-      WritableUtils.cloneInto(val, iter.next());
-      if (null == hold) {
-        hold = WritableUtils.clone(val, null);
-      } else {
-        WritableUtils.cloneInto(hold, val);
-      }
-      return true;
-    }
-    return false;
-  }
-
-  public boolean replay(X val) throws IOException {
-    WritableUtils.cloneInto(val, hold);
-    return true;
-  }
-
-  public void reset() {
-    iter = data.iterator();
-  }
-
-  public void add(X item) throws IOException {
-    data.add(WritableUtils.clone(item, null));
-  }
-
-  public void close() throws IOException {
-    iter = null;
-    data = null;
-  }
-
-  public void clear() {
-    data.clear();
-    reset();
+    super(data);
   }
-
 }
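
The old-API iterator is now a thin, deprecated wrapper over its new-API counterpart. A hedged usage sketch against the replacement class, assuming it keeps the same add/next/reset contract the wrapper relies on:

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.join.ArrayListBackedIterator;

    public class IteratorExample {
      public static void main(String[] args) throws IOException {
        ArrayListBackedIterator<Text> it = new ArrayListBackedIterator<Text>();
        it.add(new Text("a"));
        it.add(new Text("b"));
        Text val = new Text();
        while (it.next(val)) {   // clones each stored value into val
          System.out.println(val);
        }
        it.reset();              // rewind for another replay
        it.close();
      }
    }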


