hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r785392 [3/5] - in /hadoop/core/branches/HADOOP-4687/mapred: conf/ lib/ src/c++/ src/c++/task-controller/ src/contrib/dynamic-scheduler/ src/contrib/sqoop/ src/contrib/streaming/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ s...
Date Tue, 16 Jun 2009 20:54:28 GMT
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/Merger.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/Merger.java Tue Jun 16 20:54:24 2009
@@ -55,13 +55,14 @@
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
                             Counters.Counter readsCounter,
-                            Counters.Counter writesCounter)
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
                            reporter).merge(keyClass, valueClass,
                                            mergeFactor, tmpDir,
-                                           readsCounter, writesCounter);
+                                           readsCounter, writesCounter, mergePhase);
   }
   
   public static <K extends Object, V extends Object>
@@ -71,10 +72,11 @@
                             int mergeFactor, Path tmpDir,
                             RawComparator<K> comparator, Progressable reporter,
                             Counters.Counter readsCounter,
-                            Counters.Counter writesCounter)
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
       throws IOException {
     return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
-                 comparator, reporter, false, readsCounter, writesCounter);
+                 comparator, reporter, false, readsCounter, writesCounter, mergePhase);
   }
 
   public static <K extends Object, V extends Object>
@@ -85,12 +87,33 @@
                             RawComparator<K> comparator, Progressable reporter,
                             boolean sortSegments,
                             Counters.Counter readsCounter,
-                            Counters.Counter writesCounter)
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
                            sortSegments).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
-                                               readsCounter, writesCounter);
+                                               readsCounter, writesCounter,
+                                               mergePhase);
+  }
+
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass,
+                            CompressionCodec codec,
+                            List<Segment<K, V>> segments,
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator, Progressable reporter,
+                            boolean sortSegments,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
+      throws IOException {
+    return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                           sortSegments, codec).merge(keyClass, valueClass,
+                                               mergeFactor, tmpDir,
+                                               readsCounter, writesCounter,
+                                               mergePhase);
   }
 
   static <K extends Object, V extends Object>
@@ -101,15 +124,38 @@
                             RawComparator<K> comparator, Progressable reporter,
                             boolean sortSegments,
                             Counters.Counter readsCounter,
-                            Counters.Counter writesCounter)
+                            Counters.Counter writesCounter,
+                            Progress mergePhase)
       throws IOException {
     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
                            sortSegments).merge(keyClass, valueClass,
                                                mergeFactor, inMemSegments,
                                                tmpDir,
-                                               readsCounter, writesCounter);
+                                               readsCounter, writesCounter,
+                                               mergePhase);
   }
 
+
+  static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                          Class<K> keyClass, Class<V> valueClass,
+                          CompressionCodec codec,
+                          List<Segment<K, V>> segments,
+                          int mergeFactor, int inMemSegments, Path tmpDir,
+                          RawComparator<K> comparator, Progressable reporter,
+                          boolean sortSegments,
+                          Counters.Counter readsCounter,
+                          Counters.Counter writesCounter,
+                          Progress mergePhase)
+    throws IOException {
+  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
+                         sortSegments, codec).merge(keyClass, valueClass,
+                                             mergeFactor, inMemSegments,
+                                             tmpDir,
+                                             readsCounter, writesCounter,
+                                             mergePhase);
+}
+
   public static <K extends Object, V extends Object>
   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
                  Progressable progressable, Configuration conf) 
@@ -235,6 +281,20 @@
     }
   }
   
+  // Whether the final merge is counted as part of the sort phase.
+  // This is true in a map task and false in a reduce task; it is
+  // used in calculating mergeProgress.
+  static boolean includeFinalMerge = false;
+  
+  /**
+   * Sets the boolean variable includeFinalMerge to true. Called from
+   * the map task before calling merge() so that the map task's final
+   * merge is also counted as part of the sort phase.
+   */
+  static void considerFinalMergeForProgress() {
+    includeFinalMerge = true;
+  }
+  
   private static class MergeQueue<K extends Object, V extends Object> 
   extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
     Configuration conf;
@@ -306,6 +366,13 @@
       }
     }
 
+    public MergeQueue(Configuration conf, FileSystem fs,
+        List<Segment<K, V>> segments, RawComparator<K> comparator,
+        Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+      this(conf, fs, segments, comparator, reporter, sortSegments);
+      this.codec = codec;
+    }
+
     public void close() throws IOException {
       Segment<K, V> segment;
       while((segment = pop()) != null) {
@@ -386,24 +453,41 @@
     public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
                                      int factor, Path tmpDir,
                                      Counters.Counter readsCounter,
-                                     Counters.Counter writesCounter)
+                                     Counters.Counter writesCounter,
+                                     Progress mergePhase)
         throws IOException {
       return merge(keyClass, valueClass, factor, 0, tmpDir,
-                   readsCounter, writesCounter);
+                   readsCounter, writesCounter, mergePhase);
     }
 
     RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
                                      int factor, int inMem, Path tmpDir,
                                      Counters.Counter readsCounter,
-                                     Counters.Counter writesCounter)
+                                     Counters.Counter writesCounter,
+                                     Progress mergePhase)
         throws IOException {
       LOG.info("Merging " + segments.size() + " sorted segments");
-      
-      //create the MergeStreams from the sorted map created in the constructor
-      //and dump the final output to a file
+
+      /*
+       * If there are in-memory segments, they come first in the segments
+       * list, followed by the sorted disk segments. Otherwise (if there
+       * are only disk segments), the segments are sorted whenever there
+       * are more than factor segments in the list.
+       */
       int numSegments = segments.size();
       int origFactor = factor;
       int passNo = 1;
+      if (mergePhase != null) {
+        mergeProgress = mergePhase;
+      }
+
+      long totalBytes = computeBytesInMerges(factor, inMem);
+      if (totalBytes != 0) {
+        progPerByte = 1.0f / (float)totalBytes;
+      }
+      
+      //create the MergeStreams from the sorted map created in the constructor
+      //and dump the final output to a file
       do {
         //get the factor for this pass of merge. We assume in-memory segments
         //are the first entries in the segment list and that the pass factor
@@ -429,9 +513,9 @@
             long startPos = segment.getPosition();
             boolean hasNext = segment.nextRawKey();
             long endPos = segment.getPosition();
-            startBytes += endPos - startPos;
             
             if (hasNext) {
+              startBytes += endPos - startPos;
               segmentsToMerge.add(segment);
               segmentsConsidered++;
             }
@@ -460,34 +544,39 @@
         //if we have fewer segments remaining, then just return the
         //iterator, else do another single-level merge
         if (numSegments <= factor) {
-          // Reset totalBytesProcessed to track the progress of the final merge.
-          // This is considered the progress of the reducePhase, the 3rd phase
-          // of reduce task. Currently totalBytesProcessed is not used in sort
-          // phase of reduce task(i.e. when intermediate merges happen).
-          totalBytesProcessed = startBytes;
-          
-          //calculate the length of the remaining segments. Required for 
-          //calculating the merge progress
-          long totalBytes = 0;
-          for (int i = 0; i < segmentsToMerge.size(); i++) {
-            totalBytes += segmentsToMerge.get(i).getLength();
+          if (!includeFinalMerge) { // for reduce task
+
+            // Reset totalBytesProcessed and recalculate totalBytes from the
+            // remaining segments to track the progress of the final merge.
+            // Final merge is considered as the progress of the reducePhase,
+            // the 3rd phase of reduce task.
+            totalBytesProcessed = 0;
+            totalBytes = 0;
+            for (int i = 0; i < segmentsToMerge.size(); i++) {
+              totalBytes += segmentsToMerge.get(i).getLength();
+            }
           }
           if (totalBytes != 0) //being paranoid
             progPerByte = 1.0f / (float)totalBytes;
           
+          totalBytesProcessed += startBytes;         
           if (totalBytes != 0)
             mergeProgress.set(totalBytesProcessed * progPerByte);
           else
             mergeProgress.set(1.0f); // Last pass and no segments left - we're done
           
           LOG.info("Down to the last merge-pass, with " + numSegments + 
-                   " segments left of total size: " + totalBytes + " bytes");
+                   " segments left of total size: " +
+                   (totalBytes - totalBytesProcessed) + " bytes");
           return this;
         } else {
           LOG.info("Merging " + segmentsToMerge.size() + 
                    " intermediate segments out of a total of " + 
                    (segments.size()+segmentsToMerge.size()));
           
+          long bytesProcessedInPrevMerges = totalBytesProcessed;
+          totalBytesProcessed += startBytes;
+
           //we want to spread the creation of temp files on multiple disks if 
           //available under the space constraints
           long approxOutputSize = 0; 
@@ -516,9 +605,27 @@
           // Add the newly create segment to the list of segments to be merged
           Segment<K, V> tempSegment = 
             new Segment<K, V>(conf, fs, outputFile, codec, false);
-          segments.add(tempSegment);
+
+          // Insert new merged segment into the sorted list
+          int pos = Collections.binarySearch(segments, tempSegment,
+                                             segmentComparator);
+          if (pos < 0) {
+            // binary search failed, so the insertion position is -pos-1
+            pos = -pos-1;
+          }
+          segments.add(pos, tempSegment);
           numSegments = segments.size();
-          Collections.sort(segments, segmentComparator);
+          
+          // Subtract from totalBytes the difference between the expected
+          // size of the new segment (inputBytesOfThisMerge) and its actual
+          // size. Expected and actual sizes match (almost) when no combiner
+          // runs during the merge.
+          long inputBytesOfThisMerge = totalBytesProcessed -
+                                       bytesProcessedInPrevMerges;
+          totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+          if (totalBytes != 0) {
+            progPerByte = 1.0f / (float)totalBytes;
+          }
           
           passNo++;
         }
@@ -560,6 +667,59 @@
       }
       return subList;
     }
+    
+    /**
+     * Compute the expected number of input bytes across all merges; the
+     * result is used in calculating mergeProgress. This simulates the
+     * merge() method above and estimates the bytes that will be merged in
+     * every pass (assuming no combiner is called while merging).
+     * @param factor io.sort.factor
+     * @param inMem  number of segments in memory to be merged
+     */
+    long computeBytesInMerges(int factor, int inMem) {
+      int numSegments = segments.size();
+      List<Long> segmentSizes = new ArrayList<Long>(numSegments);
+      long totalBytes = 0;
+      int n = numSegments - inMem;
+      // factor for 1st pass
+      int f = getPassFactor(factor, 1, n) + inMem;
+      n = numSegments;
+ 
+      for (int i = 0; i < numSegments; i++) {
+        // Empty segments are not handled here, on the assumption that they
+        // barely affect the calculation of mergeProgress.
+        segmentSizes.add(segments.get(i).getLength());
+      }
+      
+      // If includeFinalMerge is true, let the while loop below run for one
+      // extra iteration. This includes the final merge in the computation
+      // of the expected input bytes of all merges.
+      boolean considerFinalMerge = includeFinalMerge;
+      
+      while (n > f || considerFinalMerge) {
+        if (n <= f) {
+          considerFinalMerge = false;
+        }
+        long mergedSize = 0;
+        f = Math.min(f, segmentSizes.size());
+        for (int j = 0; j < f; j++) {
+          mergedSize += segmentSizes.remove(0);
+        }
+        totalBytes += mergedSize;
+        
+        // insert new size into the sorted list
+        int pos = Collections.binarySearch(segmentSizes, mergedSize);
+        if (pos < 0) {
+          pos = -pos-1;
+        }
+        segmentSizes.add(pos, mergedSize);
+        
+        n -= (f-1);
+        f = factor;
+      }
+
+      return totalBytes;
+    }
 
     public Progress getProgress() {
       return mergeProgress;

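The core of the progress change above is computeBytesInMerges(), which dry-runs the merge passes to predict how many bytes will be read in total. The following is a minimal standalone sketch of that simulation, not the committed code: the first-pass factor is simplified here, whereas the real method derives it from getPassFactor() and the in-memory segment count.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative simulation of multi-pass merging: each pass merges the f
// smallest segments, re-inserts the merged result into the sorted list,
// and accumulates the bytes read, mirroring computeBytesInMerges() above.
public class MergeBytesSimulation {
  static long expectedMergeBytes(List<Long> sizes, int factor,
                                 boolean includeFinalMerge) {
    List<Long> segmentSizes = new ArrayList<Long>(sizes);
    Collections.sort(segmentSizes);
    long totalBytes = 0;
    int n = segmentSizes.size();
    int f = factor; // simplified; the real code uses getPassFactor() for pass 1
    boolean considerFinalMerge = includeFinalMerge;
    while (n > f || considerFinalMerge) {
      if (n <= f) {
        considerFinalMerge = false; // this extra pass is the final merge
      }
      long mergedSize = 0;
      int passFactor = Math.min(f, segmentSizes.size());
      for (int j = 0; j < passFactor; j++) {
        mergedSize += segmentSizes.remove(0); // consume the smallest segments
      }
      totalBytes += mergedSize;
      int pos = Collections.binarySearch(segmentSizes, mergedSize);
      if (pos < 0) {
        pos = -pos - 1; // binarySearch encodes the insertion point
      }
      segmentSizes.add(pos, mergedSize); // keep the list sorted
      n -= (passFactor - 1);
      f = factor;
    }
    return totalBytes;
  }

  public static void main(String[] args) {
    // Five 100-byte segments, factor 3: the first pass merges three segments
    // (300 bytes read); the final merge then reads 100+100+300 = 500 bytes,
    // so 800 bytes are expected in total when the final merge is included.
    List<Long> sizes = new ArrayList<Long>();
    for (int i = 0; i < 5; i++) {
      sizes.add(100L);
    }
    System.out.println(expectedMergeBytes(sizes, 3, true)); // prints 800
  }
}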
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jun 16 20:54:24 2009
@@ -241,7 +241,7 @@
     }
     
     public void informReduceProgress() {
-      reducePhase.set(super.in.getProgress().get()); // update progress
+      reducePhase.set(super.in.getProgress().getProgress()); // update progress
       reporter.progress();
     }
   }
@@ -392,7 +392,7 @@
           job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
           !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
           new Path(getTaskID().toString()), job.getOutputKeyComparator(),
-          reporter, spilledRecordsCounter, null)
+          reporter, spilledRecordsCounter, null, sortPhase)
       : reduceCopier.createKVIterator(job, rfs, reporter);
         
     // free up the data structures
@@ -522,6 +522,28 @@
                      Class<INVALUE> valueClass
                      ) throws IOException,InterruptedException, 
                               ClassNotFoundException {
+    // wrap value iterator to report progress.
+    final RawKeyValueIterator rawIter = rIter;
+    rIter = new RawKeyValueIterator() {
+      public void close() throws IOException {
+        rawIter.close();
+      }
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+      public boolean next() throws IOException {
+        boolean ret = rawIter.next();
+        reducePhase.set(rawIter.getProgress().getProgress());
+        reporter.progress();
+        return ret;
+      }
+    };
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
       new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
@@ -1786,9 +1808,7 @@
       InMemFSMergeThread inMemFSMergeThread = null;
       GetMapEventsThread getMapEventsThread = null;
       
-      for (int i = 0; i < numMaps; i++) {
-        copyPhase.addPhase();       // add sub-phase per file
-      }
+      copyPhase.addPhases(numMaps); // add sub-phase per file
       
       copiers = new ArrayList<MapOutputCopier>(numCopiers);
       
@@ -2243,6 +2263,10 @@
       // segments required to vacate memory
       List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
       long inMemToDiskBytes = 0;
+      // sortPhaseFinished will be set to true if we call merge() separately
+      // here to vacate memory (i.e. there will not be any intermediate
+      // merges; in other words, only the final merge is pending).
+      boolean sortPhaseFinished = false;
       if (mapOutputsFilesInMemory.size() > 0) {
         TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
         inMemToDiskBytes = createInMemorySegments(memDiskSegments,
@@ -2250,12 +2274,20 @@
         final int numMemDiskSegments = memDiskSegments.size();
         if (numMemDiskSegments > 0 &&
               ioSortFactor > mapOutputFilesOnDisk.size()) {
+          // As we have fewer than ioSortFactor files on disk now, after this
+          // merge of the in-memory segments we will have at most ioSortFactor
+          // files on disk. So only the final merge (feeding the reducers
+          // directly) will be pending, i.e. the reduce phase will be pending.
+          sortPhaseFinished = true;
+          
           // must spill to disk, but can't retain in-mem for intermediate merge
           final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
                             reduceTask.getTaskID(), inMemToDiskBytes);
           final RawKeyValueIterator rIter = Merger.merge(job, fs,
               keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-              tmpDir, comparator, reporter, spilledRecordsCounter, null);
+              tmpDir, comparator, reporter, spilledRecordsCounter, null,
+              sortPhase);
+
           final Writer writer = new Writer(job, fs, outputPath,
               keyClass, valueClass, codec, null);
           try {
@@ -2311,10 +2343,12 @@
         final int numInMemSegments = memDiskSegments.size();
         diskSegments.addAll(0, memDiskSegments);
         memDiskSegments.clear();
+        Progress mergePhase = (sortPhaseFinished) ? null : sortPhase; 
         RawKeyValueIterator diskMerge = Merger.merge(
-            job, fs, keyClass, valueClass, diskSegments,
+            job, fs, keyClass, valueClass, codec, diskSegments,
             ioSortFactor, 0 == numInMemSegments ? 0 : numInMemSegments - 1,
-            tmpDir, comparator, reporter, false, spilledRecordsCounter, null);
+            tmpDir, comparator, reporter, false, spilledRecordsCounter, null,
+            mergePhase);
         diskSegments.clear();
         if (0 == finalSegments.size()) {
           return diskMerge;
@@ -2324,7 +2358,7 @@
       }
       return Merger.merge(job, fs, keyClass, valueClass,
                    finalSegments, finalSegments.size(), tmpDir,
-                   comparator, reporter, spilledRecordsCounter, null);
+                   comparator, reporter, spilledRecordsCounter, null, null);
     }
 
     class RawKVIteratorReader extends IFile.Reader<K,V> {
@@ -2465,7 +2499,7 @@
                                   codec, mapFiles.toArray(new Path[mapFiles.size()]), 
                                   true, ioSortFactor, tmpDir, 
                                   conf.getOutputKeyComparator(), reporter,
-                                  spilledRecordsCounter, null);
+                                  spilledRecordsCounter, null, null);
               
               Merger.writeFile(iter, writer, reporter, conf);
               writer.close();
@@ -2562,7 +2596,7 @@
                                inMemorySegments, inMemorySegments.size(),
                                new Path(reduceTask.getTaskID().toString()),
                                conf.getOutputKeyComparator(), reporter,
-                               spilledRecordsCounter, null);
+                               spilledRecordsCounter, null, null);
           
           if (combinerRunner == null) {
             Merger.writeFile(rIter, writer, reporter, conf);

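The anonymous RawKeyValueIterator added in ReduceTask above is a plain decorator: every next() call forwards to the wrapped iterator and then publishes progress for the reduce phase. The following is an illustrative generic sketch of the same pattern, with a hypothetical ProgressSink standing in for the reducePhase/reporter pair.

import java.util.Iterator;

// Hypothetical stand-in for the Progress object updated in the commit.
interface ProgressSink {
  void set(float progress);
}

// Decorator that forwards to a delegate iterator and reports the fraction
// of items consumed after each next(), as the wrapper above does.
class ProgressReportingIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final ProgressSink sink;
  private final long total;
  private long consumed = 0;

  ProgressReportingIterator(Iterator<T> delegate, long total,
                            ProgressSink sink) {
    this.delegate = delegate;
    this.total = total;
    this.sink = sink;
  }

  public boolean hasNext() {
    return delegate.hasNext();
  }

  public T next() {
    T item = delegate.next(); // forward first, then report
    consumed++;
    if (total > 0) {
      sink.set((float) consumed / total);
    }
    return item;
  }

  public void remove() {
    delegate.remove();
  }
}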
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Tue Jun 16 20:54:24 2009
@@ -88,6 +88,18 @@
   }
 
   @Override
+  public long getMapFinishTime() {
+    throw new UnsupportedOperationException(
+        "getMapFinishTime() not supported for ReduceTask");
+  }
+
+  @Override
+  void setMapFinishTime(long shuffleFinishTime) {
+    throw new UnsupportedOperationException(
+        "setMapFinishTime() not supported for ReduceTask");
+  }
+
+  @Override
   public List<TaskAttemptID> getFetchFailedMaps() {
     return failedFetchTasks;
   }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ResourceEstimator.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ResourceEstimator.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/ResourceEstimator.java Tue Jun 16 20:54:24 2009
@@ -82,7 +82,10 @@
    * @return estimated length of this job's average map output
    */
   long getEstimatedMapOutputSize() {
-    long estimate = getEstimatedTotalMapOutputSize()  / job.desiredMaps();
+    long estimate = 0L;
+    if (job.desiredMaps() > 0) {
+      estimate = getEstimatedTotalMapOutputSize()  / job.desiredMaps();
+    }
     return estimate;
   }
 

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskController.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskController.java Tue Jun 16 20:54:24 2009
@@ -19,10 +19,12 @@
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
@@ -38,6 +40,8 @@
   
   private Configuration conf;
   
+  public static final Log LOG = LogFactory.getLog(TaskController.class);
+  
   public Configuration getConf() {
     return conf;
   }
@@ -63,13 +67,29 @@
                                       throws IOException;
   
   /**
-   * Kill a task JVM
+   * Top-level method for cleaning up a task JVM.
+   *
+   * The current implementation does the following:
+   * <ol>
+   * <li>Sends a graceful terminate signal to the task JVM, allowing its
+   * sub-processes to clean up.</li>
+   * <li>Waits for a stipulated period.</li>
+   * <li>Sends a forceful kill signal to the task JVM, terminating all its
+   * sub-processes.</li>
+   * </ol>
    * 
-   * This method defines how a JVM launched to execute one or more
-   * tasks will be killed.
-   * @param context
+   * @param context the task for which the kill signal has to be sent.
    */
-  abstract void killTaskJVM(TaskControllerContext context);
+  final void destroyTaskJVM(TaskControllerContext context) {
+    terminateTask(context);
+    try {
+      Thread.sleep(context.sleeptimeBeforeSigkill);
+    } catch (InterruptedException e) {
+      LOG.warn("Sleep interrupted : " + 
+          StringUtils.stringifyException(e));
+    }
+    killTask(context);
+  }
   
   /**
    * Perform initializing actions required before a task can run.
@@ -110,4 +130,20 @@
    * @param tip  Task of job for which localization happens.
    */
   abstract void initializeJob(JobID jobId);
+  
+  /**
+   * Sends a graceful terminate signal to the task JVM and its sub-processes.
+   *   
+   * @param context task context
+   */
+  abstract void terminateTask(TaskControllerContext context);
+  
+  /**
+   * Sends a KILL signal to forcefully terminate the task JVM and its
+   * sub-processes.
+   * 
+   * @param context task context
+   */
+  abstract void killTask(TaskControllerContext context);
 }

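destroyTaskJVM() above fixes the terminate/wait/kill sequence in the base class, leaving only signal delivery (terminateTask/killTask) to subclasses. The same sequence, expressed against the plain java.lang.Process API (Java 8+), is sketched below; it is illustrative only and not the committed code.

import java.util.concurrent.TimeUnit;

// Illustrative terminate/wait/kill sequence analogous to destroyTaskJVM():
// graceful signal first, a bounded grace period, then a forceful kill.
public class GracefulKill {
  static void destroy(Process proc, long gracePeriodMillis)
      throws InterruptedException {
    proc.destroy(); // graceful terminate (SIGTERM on Unix)
    if (!proc.waitFor(gracePeriodMillis, TimeUnit.MILLISECONDS)) {
      proc.destroyForcibly(); // forceful kill (SIGKILL on Unix)
    }
  }
}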
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Jun 16 20:54:24 2009
@@ -30,6 +30,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -731,7 +732,10 @@
    * Get the split locations 
    */
   public String[] getSplitLocations() {
-    return rawSplit.getLocations();
+    if (isMapTask() && !jobSetup && !jobCleanup) {
+      return rawSplit.getLocations();
+    }
+    return new String[0];
   }
   
   /**
@@ -916,10 +920,17 @@
     // create the task
     Task t = null;
     if (isMapTask()) {
-      LOG.debug("attempt "+  numTaskFailures   +
-          " sending skippedRecords "+failedRanges.getIndicesCount());
-      t = new MapTask(jobFile, taskid, partition, 
-          rawSplit.getClassName(), rawSplit.getBytes());
+      LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+          + failedRanges.getIndicesCount());
+      String splitClass = null;
+      BytesWritable split;
+      if (!jobSetup && !jobCleanup) {
+        splitClass = rawSplit.getClassName();
+        split = rawSplit.getBytes();
+      } else {
+        split = new BytesWritable();
+      }
+      t = new MapTask(jobFile, taskid, partition, splitClass, split);
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
@@ -1029,7 +1040,7 @@
    * Gets the Node list of input split locations sorted in rack order.
    */ 
   public String getSplitNodes() {
-    if ( rawSplit == null) {
+    if (!isMapTask() || jobSetup || jobCleanup) {
       return "";
     }
     String[] splits = rawSplit.getLocations();

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Jun 16 20:54:24 2009
@@ -51,19 +51,26 @@
   private List<TaskAttemptID> tasksToBeRemoved;
 
   public TaskMemoryManagerThread(TaskTracker taskTracker) {
+    this(taskTracker.getTotalMemoryAllottedForTasksOnTT() * 1024 * 1024L,
+            taskTracker.getJobConf().getLong(
+                "mapred.tasktracker.taskmemorymanager.monitoring-interval", 
+                5000L));         
     this.taskTracker = taskTracker;
+  }
+
+  // Mainly for test purposes. Note that the taskTracker variable is
+  // not set here.
+  TaskMemoryManagerThread(long maxMemoryAllowedForAllTasks,
+                            long monitoringInterval) {
     setName(this.getClass().getName());
 
     processTreeInfoMap = new HashMap<TaskAttemptID, ProcessTreeInfo>();
     tasksToBeAdded = new HashMap<TaskAttemptID, ProcessTreeInfo>();
     tasksToBeRemoved = new ArrayList<TaskAttemptID>();
 
-    maxMemoryAllowedForAllTasks =
-        taskTracker.getTotalVirtualMemoryOnTT()
-            - taskTracker.getReservedVirtualMemory();
+    this.maxMemoryAllowedForAllTasks = maxMemoryAllowedForAllTasks;
 
-    monitoringInterval = taskTracker.getJobConf().getLong(
-        "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
+    this.monitoringInterval = monitoringInterval;
   }
 
   public void addTask(TaskAttemptID tid, long memLimit) {
@@ -198,23 +205,15 @@
           ptInfo.setProcessTree(pTree); // update ptInfo with proces-tree of
                                         // updated state
           long currentMemUsage = pTree.getCumulativeVmem();
+          // as processes begin with an age 1, we want to see if there 
+          // are processes more than 1 iteration old.
+          long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
           long limit = ptInfo.getMemLimit();
           LOG.info("Memory usage of ProcessTree " + pId + " :"
               + currentMemUsage + "bytes. Limit : " + limit + "bytes");
 
-          if (limit > taskTracker.getLimitMaxVMemPerTask()) {
-            // TODO: With monitoring enabled and no scheduling based on
-            // memory,users can seriously hijack the system by specifying memory
-            // requirements well above the cluster wide limit. Ideally these
-            // jobs should have been rejected by JT/scheduler. Because we can't
-            // do that, in the minimum we should fail the tasks and hence the
-            // job.
-            LOG.warn("Task " + tid
-                + " 's maxVmemPerTask is greater than TT's limitMaxVmPerTask");
-          }
-
-          if (limit != JobConf.DISABLED_MEMORY_LIMIT
-              && currentMemUsage > limit) {
+          if (isProcessTreeOverLimit(tid.toString(), currentMemUsage, 
+                                      curMemUsageOfAgedProcesses, limit)) {
             // Task (the root process) is still alive and overflowing memory.
             // Clean up.
             String msg =
@@ -242,12 +241,11 @@
         }
       }
 
-      LOG.debug("Memory still in usage across all tasks : " + memoryStillInUsage
-          + "bytes. Total limit : " + maxMemoryAllowedForAllTasks);
-
       if (memoryStillInUsage > maxMemoryAllowedForAllTasks) {
-        LOG.warn("The total memory usage is still overflowing TTs limits."
-            + " Trying to kill a few tasks with the least progress.");
+        LOG.warn("The total memory in usage " + memoryStillInUsage
+            + " is still overflowing TTs limits "
+            + maxMemoryAllowedForAllTasks
+            + ". Trying to kill a few tasks with the least progress.");
         killTasksWithLeastProgress(memoryStillInUsage);
       }
     
@@ -264,6 +262,65 @@
     }
   }
 
+  /**
+   * Check whether a task's process tree's current memory usage is over limit.
+   * 
+   * When a Java process exec's a program, it can momentarily account for
+   * double its memory, because the JVM does a fork()+exec() and fork()
+   * creates a copy of the parent's memory. If the monitoring thread
+   * samples the memory used by the task tree at that instant, it could
+   * conclude the tree is over its limit and kill it, through no fault of
+   * the process itself.
+   * 
+   * We counter this problem with a heuristic check:
+   * - if a process tree exceeds twice the memory limit, it is killed
+   * immediately;
+   * - if the processes in the tree that are older than the monitoring
+   * interval exceed the memory limit, the tree is killed. Otherwise it is
+   * given the benefit of the doubt for one more iteration.
+   * 
+   * @param tId Task Id for the task tree
+   * @param currentMemUsage Memory usage of a task tree
+   * @param curMemUsageOfAgedProcesses Memory usage of processes older than
+   *                                    an iteration in a task tree
+   * @param limit The limit specified for the task
+   * @return true if the memory usage is more than twice the specified limit,
+   *              or if processes in the tree, older than this thread's 
+   *              monitoring interval, exceed the memory limit; false
+   *              otherwise.
+   */
+  boolean isProcessTreeOverLimit(String tId, 
+                                  long currentMemUsage, 
+                                  long curMemUsageOfAgedProcesses, 
+                                  long limit) {
+    boolean isOverLimit = false;
+    
+    if (currentMemUsage > (2*limit)) {
+      LOG.warn("Process tree for task: " + tId + " running over twice " +
+                "the configured limit. Limit=" + limit + 
+                ", current usage = " + currentMemUsage);
+      isOverLimit = true;
+    } else if (curMemUsageOfAgedProcesses > limit) {
+      LOG.warn("Process tree for task: " + tId + " has processes older than 1 " +
+          "iteration running over the configured limit. Limit=" + limit + 
+          ", current usage = " + curMemUsageOfAgedProcesses);
+      isOverLimit = true;
+    }
+
+    return isOverLimit; 
+  }
+
+  // method provided just for easy testing purposes
+  boolean isProcessTreeOverLimit(ProcfsBasedProcessTree pTree, 
+                                    String tId, long limit) {
+    long currentMemUsage = pTree.getCumulativeVmem();
+    // as processes begin with an age 1, we want to see if there are processes
+    // more than 1 iteration old.
+    long curMemUsageOfAgedProcesses = pTree.getCumulativeVmem(1);
+    return isProcessTreeOverLimit(tId, currentMemUsage, 
+                                  curMemUsageOfAgedProcesses, limit);
+  }
+
   private void killTasksWithLeastProgress(long memoryStillInUsage) {
 
     List<TaskAttemptID> tasksToKill = new ArrayList<TaskAttemptID>();

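The heuristic in isProcessTreeOverLimit() reduces to two comparisons on three numbers. A minimal restatement follows, with hand-picked hypothetical values; it is illustrative and not the committed code.

// Illustrative restatement of the isProcessTreeOverLimit() heuristic above:
// kill only when usage is more than twice the limit, or when the usage of
// processes older than one monitoring iteration already exceeds the limit.
public class OverLimitCheck {
  static boolean isOverLimit(long currentUsage, long agedUsage, long limit) {
    if (currentUsage > 2 * limit) {
      return true;  // far over limit: no fork()+exec() spike explains this
    }
    if (agedUsage > limit) {
      return true;  // long-lived processes are themselves over the limit
    }
    return false;   // possibly a transient fork()+exec() double-count; wait
  }

  public static void main(String[] args) {
    long limit = 1000000L; // hypothetical limit, in bytes
    System.out.println(isOverLimit(1900000L, 800000L, limit));  // false: spike tolerated
    System.out.println(isOverLimit(1900000L, 1200000L, limit)); // true: aged usage over limit
    System.out.println(isOverLimit(2100000L, 0L, limit));       // true: more than twice the limit
  }
}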
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Jun 16 20:54:24 2009
@@ -101,6 +101,7 @@
   
   @Override
   public final void run() {
+    String errorInfo = "Child Error";
     try {
       
       //before preparing the job localize 
@@ -368,7 +369,7 @@
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid.toString());                      // pass task identifier
 
-      tracker.addToMemoryManager(t.getTaskID(), conf);
+      tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
@@ -399,6 +400,45 @@
         ldLibraryPath.append(oldLdLibraryPath);
       }
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+      
+      // add the env variables passed by the user
+      String mapredChildEnv = conf.get("mapred.child.env");
+      if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+        String childEnvs[] = mapredChildEnv.split(",");
+        for (String cEnv : childEnvs) {
+          try {
+            String[] parts = cEnv.split("="); // split on '='
+            String value = env.get(parts[0]);
+            if (value != null) {
+              // replace $env with the child's env constructed by tt's
+              // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
+              value = parts[1].replace("$" + parts[0], value);
+            } else {
+              // this key is not configured by the tt for the child .. get it 
+              // from the tt's env
+              // example PATH=$PATH:/tmp
+              value = System.getenv(parts[0]);
+              if (value != null) {
+                // the env key is present in the tt's env
+                value = parts[1].replace("$" + parts[0], value);
+              } else {
+                // the env key is not present anywhere .. simply set it
+                // example X=$X:/tmp or X=/tmp
+                value = parts[1].replace("$" + parts[0], "");
+              }
+            }
+            env.put(parts[0], value);
+          } catch (Throwable t) {
+            // set the error msg
+            errorInfo = "Invalid User environment settings : " + mapredChildEnv 
+                        + ". Failed to parse user-passed environment param."
+                        + " Expecting : env1=value1,env2=value2...";
+            LOG.warn(errorInfo);
+            throw t;
+          }
+        }
+      }
+
       jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
               workDir, env, conf));
@@ -425,9 +465,10 @@
         LOG.fatal(t.getTaskID()+" reporting FSError", ie);
       }
     } catch (Throwable throwable) {
-      LOG.warn(t.getTaskID()+" Child Error", throwable);
+      LOG.warn(t.getTaskID() + errorInfo, throwable);
+      Throwable causeThrowable = new Throwable(errorInfo, throwable);
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      throwable.printStackTrace(new PrintStream(baos));
+      causeThrowable.printStackTrace(new PrintStream(baos));
       try {
         tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString());
       } catch (IOException e) {

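The mapred.child.env block above splits the property on commas and '=' and expands a single $KEY reference against the environment being built, falling back to the TaskTracker's own environment and then to the empty string. A compact sketch of that resolution order follows; the class name is hypothetical, since the commit inlines this logic in run().

import java.util.HashMap;
import java.util.Map;

// Illustrative parser for "env1=value1,env2=value2" specs in which a value
// may reference the key's existing value as $KEY, as in the commit above.
public class ChildEnvParser {
  static void applyChildEnv(String spec, Map<String, String> env) {
    if (spec == null || spec.length() == 0) {
      return;
    }
    for (String pair : spec.split(",")) {
      String[] parts = pair.split("="); // KEY=VALUE, as in the commit
      String existing = env.get(parts[0]);
      if (existing == null) {
        existing = System.getenv(parts[0]); // fall back to the parent env
      }
      if (existing == null) {
        existing = ""; // $KEY expands to nothing when unset everywhere
      }
      env.put(parts[0], parts[1].replace("$" + parts[0], existing));
    }
  }

  public static void main(String[] args) {
    Map<String, String> env = new HashMap<String, String>();
    env.put("LD_LIBRARY_PATH", "/opt/hadoop/lib");
    applyChildEnv("LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp,X=/scratch", env);
    System.out.println(env.get("LD_LIBRARY_PATH")); // /opt/hadoop/lib:/tmp
    System.out.println(env.get("X"));               // /scratch
  }
}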
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Jun 16 20:54:24 2009
@@ -148,6 +148,24 @@
   void setShuffleFinishTime(long shuffleFinishTime) {}
 
   /**
+   * Get the map phase finish time for the task. If the map finish time
+   * was not set because the sort phase ended within the same heartbeat
+   * interval, it is set to the finish time of the next phase (the sort
+   * phase) when that is set.
+   * @return 0 if neither mapFinishTime nor sortFinishTime is set;
+   * otherwise an approximate map finish time.
+   */
+  public long getMapFinishTime() {
+    return 0;
+  }
+  
+  /**
+   * Set map phase finish time. 
+   * @param mapFinishTime 
+   */
+  void setMapFinishTime(long mapFinishTime) {}
+
+  /**
    * Get the sort finish time for the task. If the sort finish time was not
    * set due to the sort and reduce phases finishing in the same heartbeat
    * interval, it is set to the finish time, when the finish time is set.
@@ -197,12 +215,17 @@
     if (oldPhase != phase){
       // sort phase started
       if (phase == TaskStatus.Phase.SORT){
-        setShuffleFinishTime(System.currentTimeMillis());
+        if (oldPhase == TaskStatus.Phase.MAP) {
+          setMapFinishTime(System.currentTimeMillis());
+        }
+        else {
+          setShuffleFinishTime(System.currentTimeMillis());
+        }
       }else if (phase == TaskStatus.Phase.REDUCE){
         setSortFinishTime(System.currentTimeMillis());
       }
+      this.phase = phase;
     }
-    this.phase = phase; 
   }
 
   boolean inTaskCleanupPhase() {

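The setPhase() change above records a timestamp only on an actual transition and distinguishes MAP to SORT (map finish time) from SHUFFLE to SORT (shuffle finish time). A minimal sketch of that bookkeeping follows, with a stand-in Phase enum; it is illustrative, not the committed code.

// Illustrative phase-transition bookkeeping mirroring setPhase() above.
public class PhaseTransitions {
  enum Phase { MAP, SHUFFLE, SORT, REDUCE }

  private Phase phase = Phase.MAP;
  private long mapFinishTime, shuffleFinishTime, sortFinishTime;

  void setPhase(Phase newPhase) {
    if (phase == newPhase) {
      return; // no transition, so no timestamp is recorded
    }
    long now = System.currentTimeMillis();
    if (newPhase == Phase.SORT) {
      if (phase == Phase.MAP) {
        mapFinishTime = now;      // map task entering its sort phase
      } else {
        shuffleFinishTime = now;  // reduce task entering its sort phase
      }
    } else if (newPhase == Phase.REDUCE) {
      sortFinishTime = now;
    }
    phase = newPhase;
  }
}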
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jun 16 20:54:24 2009
@@ -200,58 +200,10 @@
   private TaskMemoryManagerThread taskMemoryManager;
   private boolean taskMemoryManagerEnabled = true;
   private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
-  private long totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
-  private long reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
-  private long reservedPmem = JobConf.DISABLED_MEMORY_LIMIT;
-
-  // Cluster wide default value for max-vm per task
-  private long defaultMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
-  // Cluster wide upper limit on max-vm per task
-  private long limitMaxVmPerTask = JobConf.DISABLED_MEMORY_LIMIT;
-
-  /**
-   * Configuration property to specify the amount of virtual memory that has to
-   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
-   * virtual memory should be a part of the total virtual memory available on
-   * the TaskTracker. TaskTracker obtains the total virtual memory available on
-   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
-   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
-   * MemoryCalculatorPlugin implementation.
-   * 
-   * <p>
-   * 
-   * The reserved virtual memory and the total virtual memory values are
-   * reported by the TaskTracker as part of heart-beat so that they can
-   * considered by a scheduler.
-   * 
-   * <p>
-   * 
-   * These two values are also used by the TaskTracker for tracking tasks'
-   * memory usage. Memory management functionality on a TaskTracker is disabled
-   * if this property is not set, if it more than the total virtual memory
-   * reported by MemoryCalculatorPlugin, or if either of the values is negative.
-   */
-  static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
-      "mapred.tasktracker.vmem.reserved";
-
-  /**
-   * Configuration property to specify the amount of physical memory that has to
-   * be reserved by the TaskTracker for system usage (OS, TT etc). The reserved
-   * physical memory should be a part of the total physical memory available on
-   * the TaskTracker. TaskTracker obtains the total physical memory available on
-   * the system by using a {@link MemoryCalculatorPlugin}. The total physical
-   * memory is set to {@link JobConf#DISABLED_MEMORY_LIMIT} on systems lacking a
-   * MemoryCalculatorPlugin implementation.
-   * 
-   * <p>
-   * 
-   * The reserved virtual memory and the total virtual memory values are
-   * reported by the TaskTracker as part of heart-beat so that they can
-   * considered by a scheduler.
-   * 
-   */
-  static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
-      "mapred.tasktracker.pmem.reserved";
+  private long totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
@@ -1247,14 +1199,14 @@
       long freeDiskSpace = getFreeSpace();
       long totVmem = getTotalVirtualMemoryOnTT();
       long totPmem = getTotalPhysicalMemoryOnTT();
-      long rsrvdVmem = getReservedVirtualMemory();
-      long rsrvdPmem = getReservedPhysicalMemory();
 
       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
       status.getResourceStatus().setTotalVirtualMemory(totVmem);
       status.getResourceStatus().setTotalPhysicalMemory(totPmem);
-      status.getResourceStatus().setReservedVirtualMemory(rsrvdVmem);
-      status.getResourceStatus().setReservedPhysicalMemory(rsrvdPmem);
+      status.getResourceStatus().setMapSlotMemorySizeOnTT(
+          mapSlotMemorySizeOnTT);
+      status.getResourceStatus().setReduceSlotMemorySizeOnTT(
+          reduceSlotSizeMemoryOnTT);
     }
       
     //
@@ -1317,53 +1269,11 @@
    * @return total size of physical memory.
    */
   long getTotalPhysicalMemoryOnTT() {
-    return totalPmemOnTT;
-  }
-
-  /**
-   * Return the amount of virtual memory reserved on the TaskTracker for system
-   * usage (OS, TT etc).
-   */
-  long getReservedVirtualMemory() {
-    return reservedVirtualMemory;
-  }
-
-  /**
-   * Return the amount of physical memory reserved on the TaskTracker for system
-   * usage (OS, TT etc).
-   */
-  long getReservedPhysicalMemory() {
-    return reservedPmem;
+    return totalPhysicalMemoryOnTT;
   }
 
-  /**
-   * Return the limit on the maxVMemPerTask on this TaskTracker
-   * @return limitMaxVmPerTask
-   */
-  long getLimitMaxVMemPerTask() {
-    return limitMaxVmPerTask;
-  }
-
-  /**
-   * Obtain the virtual memory allocated for a TIP.
-   * 
-   * If the TIP's job has a configured value for the max-virtual memory, that
-   * will be returned. Else, the cluster-wide default maxvirtual memory for
-   * tasks is returned.
-   * 
-   * @param conf
-   * @return the virtual memory allocated for the TIP.
-   */
-  long getVirtualMemoryForTask(JobConf conf) {
-    long vMemForTask =
-        normalizeMemoryConfigValue(conf.getMaxVirtualMemoryForTask());
-    if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      vMemForTask =
-          normalizeMemoryConfigValue(fConf.getLong(
-              JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-              JobConf.DISABLED_MEMORY_LIMIT));
-    }
-    return vMemForTask;
+  long getTotalMemoryAllottedForTasksOnTT() {
+    return totalMemoryAllottedForTasks;
   }
 
   /**
@@ -1637,7 +1547,6 @@
 
   private TaskLauncher mapLauncher;
   private TaskLauncher reduceLauncher;
-      
   public JvmManager getJvmManagerInstance() {
     return jvmManager;
   }
@@ -1775,10 +1684,12 @@
     }
   }
   
-  void addToMemoryManager(TaskAttemptID attemptId, 
+  void addToMemoryManager(TaskAttemptID attemptId, boolean isMap, 
                           JobConf conf) {
     if (isTaskMemoryManagerEnabled()) {
-      taskMemoryManager.addTask(attemptId, getVirtualMemoryForTask(conf));
+      taskMemoryManager.addTask(attemptId, isMap ? conf
+          .getMemoryForMapTask() * 1024 * 1024L : conf
+          .getMemoryForReduceTask() * 1024 * 1024L);
     }
   }
 
@@ -3119,33 +3030,35 @@
             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
-      totalPmemOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
-      if (totalPmemOnTT <= 0) {
+      totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+      if (totalPhysicalMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalPmem could not be calculated. "
             + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
-        totalPmemOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+        totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
     }
 
-    reservedVirtualMemory =
-        normalizeMemoryConfigValue(fConf.getLong(
-            TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    reservedPmem =
-        normalizeMemoryConfigValue(fConf.getLong(
-            TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    defaultMaxVmPerTask =
-        normalizeMemoryConfigValue(fConf.getLong(
-            JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    limitMaxVmPerTask =
-        normalizeMemoryConfigValue(fConf.getLong(
-            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
+    mapSlotMemorySizeOnTT =
+        fConf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT);
+    reduceSlotSizeMemoryOnTT =
+        fConf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT);
+    totalMemoryAllottedForTasks =
+        maxCurrentMapTasks * mapSlotMemorySizeOnTT + maxCurrentReduceTasks
+            * reduceSlotSizeMemoryOnTT;
+    if (totalMemoryAllottedForTasks < 0) {
+      totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+    }
+    if (totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT) {
+      LOG.info("totalMemoryAllottedForTasks > totalPhysicalMemoryOnTT."
+          + " Thrashing might happen.");
+    } else if (totalMemoryAllottedForTasks > totalVirtualMemoryOnTT) {
+      LOG.info("totalMemoryAllottedForTasks > totalVirtualMemoryOnTT."
+          + " Thrashing might happen.");
+    }
 
     // start the taskMemoryManager thread only if enabled
     setTaskMemoryManagerEnabledFlag();
@@ -3164,55 +3077,12 @@
       return;
     }
 
-    // /// Missing configuration
-    StringBuilder mesg = new StringBuilder();
-
-    long totalVmemOnTT = getTotalVirtualMemoryOnTT();
-    if (totalVmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's totalVmem could not be calculated.\n");
+    if (totalMemoryAllottedForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
       taskMemoryManagerEnabled = false;
-    }
-
-    long reservedVmem = getReservedVirtualMemory();
-    if (reservedVmem == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's reservedVmem is not configured.\n");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's defaultMaxVmPerTask is not configured.\n");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (limitMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
-      mesg.append("TaskTracker's limitMaxVmPerTask is not configured.\n");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (!taskMemoryManagerEnabled) {
-      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
-      return;
-    }
-    // ///// End of missing configuration
-
-    // ///// Mis-configuration
-    if (defaultMaxVmPerTask > limitMaxVmPerTask) {
-      mesg.append("defaultMaxVmPerTask is mis-configured. "
-          + "It shouldn't be greater than limitMaxVmPerTask. ");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (reservedVmem > totalVmemOnTT) {
-      mesg.append("reservedVmemOnTT is mis-configured. "
-          + "It shouldn't be greater than totalVmemOnTT");
-      taskMemoryManagerEnabled = false;
-    }
-
-    if (!taskMemoryManagerEnabled) {
-      LOG.warn(mesg.toString() + "TaskMemoryManager is disabled.");
+      LOG.warn("TaskTracker's totalMemoryAllottedForTasks is -1."
+          + " TaskMemoryManager is disabled.");
       return;
     }
-    // ///// End of mis-configuration
 
     taskMemoryManagerEnabled = true;
   }

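totalMemoryAllottedForTasks above is simply slots times per-slot memory, with a negative sum collapsing to DISABLED_MEMORY_LIMIT. A worked example with hypothetical slot counts and sizes, illustrative only:

// Illustrative computation matching the TaskTracker change above; values
// are hypothetical and in MB, as read from the cluster-wide map/reduce
// memory properties.
public class SlotMemoryExample {
  static final long DISABLED_MEMORY_LIMIT = -1L;

  static long totalAllotted(int mapSlots, long mapSlotMB,
                            int reduceSlots, long reduceSlotMB) {
    long total = mapSlots * mapSlotMB + reduceSlots * reduceSlotMB;
    return total < 0 ? DISABLED_MEMORY_LIMIT : total; // disabled when negative
  }

  public static void main(String[] args) {
    // 4 map slots x 1024 MB + 2 reduce slots x 2048 MB = 8192 MB; the
    // TaskMemoryManagerThread constructor above later scales this by
    // 1024 * 1024 to get bytes.
    System.out.println(totalAllotted(4, 1024L, 2, 2048L)); // 8192
    // Both properties unset (-1 each): the sum is negative, so memory
    // monitoring stays disabled.
    System.out.println(totalAllotted(4, -1L, 2, -1L));     // -1
  }
}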
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Jun 16 20:54:24 2009
@@ -55,16 +55,16 @@
   static class ResourceStatus implements Writable {
     
     private long totalVirtualMemory;
-    private long reservedVirtualMemory;
     private long totalPhysicalMemory;
-    private long reservedPhysicalMemory;
+    private long mapSlotMemorySizeOnTT;
+    private long reduceSlotMemorySizeOnTT;
     private long availableSpace;
     
     ResourceStatus() {
       totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
-      reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
       totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
-      reservedPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
+      mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+      reduceSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       availableSpace = Long.MAX_VALUE;
     }
 
@@ -90,24 +90,6 @@
     }
 
     /**
-     * Set the amount of virtual memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
-     * 
-     * @param reservedVmem amount of virtual memory reserved in bytes.
-     */
-    void setReservedVirtualMemory(long reservedVmem) {
-      reservedVirtualMemory = reservedVmem;
-    }
-
-    /**
-     * Get the amount of virtual memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
-     */
-    long getReservedTotalMemory() {
-      return reservedVirtualMemory;
-    }
-
-    /**
      * Set the maximum amount of physical memory on the tasktracker.
      * 
      * @param totalRAM maximum amount of physical memory on the tasktracker in
@@ -130,23 +112,49 @@
     }
 
     /**
-     * Set the amount of physical memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
+     * Set the memory size of each map slot on this TT. The JT uses this to
+     * charge extra slots to jobs whose tasks use more memory.
+     * 
+     * @param mem the memory size of each map slot on this TT
+     */
+    void setMapSlotMemorySizeOnTT(long mem) {
+      mapSlotMemorySizeOnTT = mem;
+    }
+
+    /**
+     * Get the memory size of each map slot on this TT. See
+     * {@link #setMapSlotMemorySizeOnTT(long)}
      * 
-     * @param reservedPmem amount of physical memory reserved in bytes.
+     * @return the memory size of each map slot on this TT
      */
-    void setReservedPhysicalMemory(long reservedPmem) {
-      reservedPhysicalMemory = reservedPmem;
+    long getMapSlotMemorySizeOnTT() {
+      return mapSlotMemorySizeOnTT;
     }
 
     /**
-     * Get the amount of physical memory reserved on the TaskTracker for system
-     * usage (OS, TT etc).
+     * Set the memory size of each reduce slot on this TT. The JT uses this
+     * to charge extra slots to jobs whose tasks use more memory.
+     * 
+     * @param mem the memory size of each reduce slot on this TT
      */
-    long getReservedPhysicalMemory() {
-      return reservedPhysicalMemory;
+    void setReduceSlotMemorySizeOnTT(long mem) {
+      reduceSlotMemorySizeOnTT = mem;
     }
 
+    /**
+     * Get the memory size of each reduce slot on this TT. See
+     * {@link #setReduceSlotMemorySizeOnTT(long)}
+     * 
+     * @return the memory size of each reduce slot on this TT
+     */
+    long getReduceSlotMemorySizeOnTT() {
+      return reduceSlotMemorySizeOnTT;
+    }
+
+    /**
+     * Set the available disk space on the TT.
+     * @param availSpace the available disk space on the TT
+     */
     void setAvailableSpace(long availSpace) {
       availableSpace = availSpace;
     }
@@ -161,17 +169,17 @@
     
     public void write(DataOutput out) throws IOException {
       WritableUtils.writeVLong(out, totalVirtualMemory);
-      WritableUtils.writeVLong(out, reservedVirtualMemory);
       WritableUtils.writeVLong(out, totalPhysicalMemory);
-      WritableUtils.writeVLong(out, reservedPhysicalMemory);
+      WritableUtils.writeVLong(out, mapSlotMemorySizeOnTT);
+      WritableUtils.writeVLong(out, reduceSlotMemorySizeOnTT);
       WritableUtils.writeVLong(out, availableSpace);
     }
     
     public void readFields(DataInput in) throws IOException {
       totalVirtualMemory = WritableUtils.readVLong(in);
-      reservedVirtualMemory = WritableUtils.readVLong(in);
       totalPhysicalMemory = WritableUtils.readVLong(in);
-      reservedPhysicalMemory = WritableUtils.readVLong(in);
+      mapSlotMemorySizeOnTT = WritableUtils.readVLong(in);
+      reduceSlotMemorySizeOnTT = WritableUtils.readVLong(in);
       availableSpace = WritableUtils.readVLong(in);
     }
   }
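
With the reserved-memory fields gone, the per-slot sizes above are what the
JT consults when it charges a high-memory job more than one slot per task.
A minimal sketch of that rounding arithmetic, with illustrative literals in
place of a real ResourceStatus (in real code the slot size would come from
getMapSlotMemorySizeOnTT()):

    // Sketch: converting a job's per-task memory requirement into a slot
    // count. Values are illustrative; units just need to match.
    long slotSize = 1024L;       // memory per map slot on this TT
    long jobMapMemory = 1536L;   // what one map task of the job needs

    // Round up: any fraction of an extra slot costs the whole slot.
    int slotsPerTask = (int) Math.ceil((double) jobMapMemory / slotSize);
    // slotsPerTask == 2: the task occupies two map slots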

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java Tue Jun 16 20:54:24 2009
@@ -22,167 +22,62 @@
 import java.io.IOException;
 import java.util.ArrayList;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
 
-/** This class encapsulates a MapReduce job and its dependency. It monitors 
- *  the states of the depending jobs and updates the state of this job.
- *  A job starts in the WAITING state. If it does not have any depending jobs, or
- *  all of the depending jobs are in SUCCESS state, then the job state will become
- *  READY. If any depending jobs fail, the job will fail too. 
- *  When in READY state, the job can be submitted to Hadoop for execution, with
- *  the state changing into RUNNING state. From RUNNING state, the job can get into 
- *  SUCCESS or FAILED state, depending the status of the job execution.
- *  
+/** 
+ * @deprecated Use {@link ControlledJob} instead.  
  */
+@Deprecated
+public class Job extends ControlledJob {
+  static final Log LOG = LogFactory.getLog(Job.class);
 
-public class Job {
-
-  // A job will be in one of the following states
   final public static int SUCCESS = 0;
   final public static int WAITING = 1;
   final public static int RUNNING = 2;
   final public static int READY = 3;
   final public static int FAILED = 4;
   final public static int DEPENDENT_FAILED = 5;
-	
-	
-  private JobConf theJobConf;
-  private int state;
-  private String jobID; 		// assigned and used by JobControl class
-  private JobID mapredJobID; // the job ID assigned by map/reduce
-  private String jobName;		// external name, assigned/used by client app
-  private String message;		// some info for human consumption, 
-  // e.g. the reason why the job failed
-  private ArrayList<Job> dependingJobs;	// the jobs the current job depends on
-	
-  private JobClient jc = null;		// the map reduce job client
-	
+
   /** 
    * Construct a job.
    * @param jobConf a mapred job configuration representing a job to be executed.
    * @param dependingJobs an array of jobs the current job depends on
    */
-  public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
-    this.theJobConf = jobConf;
-    this.dependingJobs = dependingJobs;
-    this.state = Job.WAITING;
-    this.jobID = "unassigned";
-    this.mapredJobID = null; //not yet assigned 
-    this.jobName = "unassigned";
-    this.message = "just initialized";
-    this.jc = new JobClient(jobConf);
-  }
-  
-  /**
-   * Construct a job.
-   * 
-   * @param jobConf mapred job configuration representing a job to be executed.
-   * @throws IOException
-   */
-  public Job(JobConf jobConf) throws IOException {
-    this(jobConf, null);
-  }
-	
-  @Override
-  public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("job name:\t").append(this.jobName).append("\n");
-    sb.append("job id:\t").append(this.jobID).append("\n");
-    sb.append("job state:\t").append(this.state).append("\n");
-    sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned" 
-        : this.mapredJobID).append("\n");
-    sb.append("job message:\t").append(this.message).append("\n");
-		
-    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
-      sb.append("job has no depending job:\t").append("\n");
-    } else {
-      sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
-      for (int i = 0; i < this.dependingJobs.size(); i++) {
-        sb.append("\t depending job ").append(i).append(":\t");
-        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
-      }
-    }
-    return sb.toString();
-  }
-	
-  /**
-   * @return the job name of this job
-   */
-  public String getJobName() {
-    return this.jobName;
-  }
-	
-  /**
-   * Set the job name for  this job.
-   * @param jobName the job name
-   */
-  public void setJobName(String jobName) {
-    this.jobName = jobName;
+  public Job(JobConf jobConf, ArrayList<?> dependingJobs) throws IOException {
+    super(new org.apache.hadoop.mapreduce.Job(jobConf), 
+          (ArrayList<ControlledJob>) dependingJobs);
   }
-	
-  /**
-   * @return the job ID of this job assigned by JobControl
-   */
-  public String getJobID() {
-    return this.jobID;
-  }
-	
-  /**
-   * Set the job ID for  this job.
-   * @param id the job ID
-   */
-  public void setJobID(String id) {
-    this.jobID = id;
-  }
-	
-  /**
-   * @return the mapred ID of this job
-   * @deprecated use {@link #getAssignedJobID()} instead
-   */
-  @Deprecated
-  public String getMapredJobID() {
-    return this.mapredJobID.toString();
-  }
-	
-  /**
-   * Set the mapred ID for this job.
-   * @param mapredJobID the mapred job ID for this job.
-   * @deprecated use {@link #setAssignedJobID(JobID)} instead
-   */
-  @Deprecated
-  public void setMapredJobID(String mapredJobID) {
-    this.mapredJobID = JobID.forName(mapredJobID);
+
+  public Job(JobConf conf) throws IOException {
+    super(conf);
   }
-	
+
   /**
    * @return the mapred ID of this job as assigned by the 
    * mapred framework.
    */
   public JobID getAssignedJobID() {
-    return this.mapredJobID;
+    return (JobID)super.getMapredJobID();
   }
-  
+
   /**
-   * Set the mapred ID for this job as assigned by the 
-   * mapred framework.
-   * @param mapredJobID the mapred job ID for this job.
+   * @deprecated setAssignedJobID should not be called;
+   * the job ID is set by the framework.
    */
   public void setAssignedJobID(JobID mapredJobID) {
-    this.mapredJobID = mapredJobID;
+    // do nothing
   }
-  
+
   /**
    * @return the mapred job conf of this job
    */
   public synchronized JobConf getJobConf() {
-    return this.theJobConf;
+    return new JobConf(super.getJob().getConfiguration());
   }
 
 
@@ -191,197 +86,55 @@
    * @param jobConf the mapred job conf for this job.
    */
   public synchronized void setJobConf(JobConf jobConf) {
-    this.theJobConf = jobConf;
+    try {
+      super.setJob(new org.apache.hadoop.mapreduce.Job(jobConf));
+    } catch (IOException ioe) { 
+      LOG.info("Exception" + ioe);
+    }
   }
 
   /**
    * @return the state of this job
    */
   public synchronized int getState() {
-    return this.state;
-  }
-	
-  /**
-   * Set the state for this job.
-   * @param state the new state for this job.
-   */
-  protected synchronized void setState(int state) {
-    this.state = state;
-  }
-	
-  /**
-   * @return the message of this job
-   */
-  public synchronized String getMessage() {
-    return this.message;
-  }
-
-  /**
-   * Set the message for this job.
-   * @param message the message for this job.
-   */
-  public synchronized void setMessage(String message) {
-    this.message = message;
-  }
-
-
-  /**
-   * @return the job client of this job
-   */
-  public JobClient getJobClient(){
-          return this.jc;
-  }
-
-  /**
-   * @return the depending jobs of this job
-   */
-  public ArrayList<Job> getDependingJobs() {
-    return this.dependingJobs;
-  }
-  
-  /**
-   * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job 
-   * is waiting to run, not during or afterwards.
-   * 
-   * @param dependingJob Job that this Job depends on.
-   * @return <tt>true</tt> if the Job was added.
-   */
-  public synchronized boolean addDependingJob(Job dependingJob) {
-    if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
-      if (this.dependingJobs == null) {
-        this.dependingJobs = new ArrayList<Job>();
-      }
-      return this.dependingJobs.add(dependingJob);
-    } else {
-      return false;
+    State state = super.getJobState();
+    if (state == State.SUCCESS) {
+      return SUCCESS;
+    } 
+    if (state == State.WAITING) {
+      return WAITING;
+    }
+    if (state == State.RUNNING) {
+      return RUNNING;
+    }
+    if (state == State.READY) {
+      return READY;
     }
+    if (state == State.FAILED) {
+      return FAILED;
+    }
+    if (state == State.DEPENDENT_FAILED) {
+      return DEPENDENT_FAILED;
+    }
+    return -1; // no mapping for this state
   }
-	
-  /**
-   * @return true if this job is in a complete state
-   */
-  public synchronized boolean isCompleted() {
-    return this.state == Job.FAILED || 
-      this.state == Job.DEPENDENT_FAILED ||
-      this.state == Job.SUCCESS;
-  }
-	
-  /**
-   * @return true if this job is in READY state
-   */
-  public synchronized boolean isReady() {
-    return this.state == Job.READY;
-  }
-	
+  
   /**
-   * Check the state of this running job. The state may 
-   * remain the same, become SUCCESS or FAILED.
+   * @return the job client of this job
    */
-  private void checkRunningState() {
-    RunningJob running = null;
+  public JobClient getJobClient() {
     try {
-      running = jc.getJob(this.mapredJobID);
-      if (running.isComplete()) {
-        if (running.isSuccessful()) {
-          this.state = Job.SUCCESS;
-        } else {
-          this.state = Job.FAILED;
-          this.message = "Job failed!";
-          try {
-            running.killJob();
-          } catch (IOException e1) {
-
-          }
-          try {
-            this.jc.close();
-          } catch (IOException e2) {
-
-          }
-        }
-      }
-
+      return new JobClient(super.getJob().getConfiguration());
     } catch (IOException ioe) {
-      this.state = Job.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-      try {
-        if (running != null)
-          running.killJob();
-      } catch (IOException e1) {
-
-      }
-      try {
-        this.jc.close();
-      } catch (IOException e1) {
-
-      }
+      return null;
     }
   }
-	
-  /**
-   * Check and update the state of this job. The state changes  
-   * depending on its current state and the states of the depending jobs.
-   */
-   synchronized int checkState() {
-    if (this.state == Job.RUNNING) {
-      checkRunningState();
-    }
-    if (this.state != Job.WAITING) {
-      return this.state;
-    }
-    if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
-      this.state = Job.READY;
-      return this.state;
-    }
-    Job pred = null;
-    int n = this.dependingJobs.size();
-    for (int i = 0; i < n; i++) {
-      pred = this.dependingJobs.get(i);
-      int s = pred.checkState();
-      if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
-        break; // a pred is still not completed, continue in WAITING
-        // state
-      }
-      if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
-        this.state = Job.DEPENDENT_FAILED;
-        this.message = "depending job " + i + " with jobID "
-          + pred.getJobID() + " failed. " + pred.getMessage();
-        break;
-      }
-      // pred must be in success state
-      if (i == n - 1) {
-        this.state = Job.READY;
-      }
-    }
 
-    return this.state;
-  }
-	
   /**
-   * Submit this job to mapred. The state becomes RUNNING if submission 
-   * is successful, FAILED otherwise.  
+   * @return the depending jobs of this job
    */
-  protected synchronized void submit() {
-    try {
-      if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
-        FileSystem fs = FileSystem.get(theJobConf);
-        Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
-        for (int i = 0; i < inputPaths.length; i++) {
-          if (!fs.exists(inputPaths[i])) {
-            try {
-              fs.mkdirs(inputPaths[i]);
-            } catch (IOException e) {
-
-            }
-          }
-        }
-      }
-      RunningJob running = jc.submitJob(theJobConf);
-      this.mapredJobID = running.getID();
-      this.state = Job.RUNNING;
-    } catch (IOException ioe) {
-      this.state = Job.FAILED;
-      this.message = StringUtils.stringifyException(ioe);
-    }
+  public ArrayList<Job> getDependingJobs() {
+    return JobControl.castToJobList(super.getDependentJobs());
   }
-	
+
 }
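
Because the old integer state constants survive on this shim and getState()
maps the new State enum back onto them, existing callers keep compiling and
running unchanged. A minimal usage sketch under those assumptions:

    // Sketch: legacy client code that still works against this shim.
    // IOException handling is omitted; "step-1" is a placeholder name.
    JobConf conf = new JobConf();
    conf.setJobName("step-1");
    Job step = new Job(conf);          // now delegates to ControlledJob
    if (step.getState() == Job.WAITING) {
      // getState() maps the new State enum back to the old ints;
      // an unmapped state comes back as -1
    }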

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java Tue Jun 16 20:54:24 2009
@@ -20,279 +20,97 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Hashtable;
-import java.util.Map;
 
-/** This class encapsulates a set of MapReduce jobs and its dependency. It tracks 
- *  the states of the jobs by placing them into different tables according to their 
- *  states. 
- *  
- *  This class provides APIs for the client app to add a job to the group and to get 
- *  the jobs in the group in different states. When a 
- *  job is added, an ID unique to the group is assigned to the job. 
- *  
- *  This class has a thread that submits jobs when they become ready, monitors the
- *  states of the running jobs, and updates the states of jobs based on the state changes 
- *  of their depending jobs states. The class provides APIs for suspending/resuming
- *  the thread,and for stopping the thread.
- *  
- */
-public class JobControl implements Runnable{
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+
+/**
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl} instead.
+ */
+@Deprecated
+public class JobControl extends 
+    org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl {
 
-  // The thread can be in one of the following state
-  private static final int RUNNING = 0;
-  private static final int SUSPENDED = 1;
-  private static final int STOPPED = 2;
-  private static final int STOPPING = 3;
-  private static final int READY = 4;
-	
-  private int runnerState;			// the thread state
-	
-  private Map<String, Job> waitingJobs;
-  private Map<String, Job> readyJobs;
-  private Map<String, Job> runningJobs;
-  private Map<String, Job> successfulJobs;
-  private Map<String, Job> failedJobs;
-	
-  private long nextJobID;
-  private String groupName;
-	
   /** 
    * Construct a job control for a group of jobs.
    * @param groupName a name identifying this group
    */
   public JobControl(String groupName) {
-    this.waitingJobs = new Hashtable<String, Job>();
-    this.readyJobs = new Hashtable<String, Job>();
-    this.runningJobs = new Hashtable<String, Job>();
-    this.successfulJobs = new Hashtable<String, Job>();
-    this.failedJobs = new Hashtable<String, Job>();
-    this.nextJobID = -1;
-    this.groupName = groupName;
-    this.runnerState = JobControl.READY;
+    super(groupName);
   }
-	
-  private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
-    ArrayList<Job> retv = new ArrayList<Job>();
-    synchronized (jobs) {
-      for (Job job : jobs.values()) {
-        retv.add(job);
-      }
+  
+  static ArrayList<Job> castToJobList(ArrayList<ControlledJob> cjobs) {
+    ArrayList<Job> ret = new ArrayList<Job>();
+    for (ControlledJob job : cjobs) {
+      ret.add((Job)job);
     }
-    return retv;
+    return ret;
   }
-	
+  
   /**
    * @return the jobs in the waiting state
    */
   public ArrayList<Job> getWaitingJobs() {
-    return JobControl.toArrayList(this.waitingJobs);
+    return castToJobList(super.getWaitingJobList());
   }
 	
   /**
    * @return the jobs in the running state
    */
   public ArrayList<Job> getRunningJobs() {
-    return JobControl.toArrayList(this.runningJobs);
+    return castToJobList(super.getRunningJobList());
   }
 	
   /**
    * @return the jobs in the ready state
    */
   public ArrayList<Job> getReadyJobs() {
-    return JobControl.toArrayList(this.readyJobs);
+    return castToJobList(super.getReadyJobsList());
   }
 	
   /**
    * @return the jobs in the success state
    */
   public ArrayList<Job> getSuccessfulJobs() {
-    return JobControl.toArrayList(this.successfulJobs);
+    return castToJobList(super.getSuccessfulJobList());
   }
 	
   public ArrayList<Job> getFailedJobs() {
-    return JobControl.toArrayList(this.failedJobs);
-  }
-	
-  private String getNextJobID() {
-    nextJobID += 1;
-    return this.groupName + this.nextJobID;
-  }
-	
-  private static void addToQueue(Job aJob, Map<String, Job> queue) {
-    synchronized(queue) {
-      queue.put(aJob.getJobID(), aJob);
-    }		
-  }
-	
-  private void addToQueue(Job aJob) {
-    Map<String, Job> queue = getQueue(aJob.getState());
-    addToQueue(aJob, queue);	
-  }
-	
-  private Map<String, Job> getQueue(int state) {
-    Map<String, Job> retv = null;
-    if (state == Job.WAITING) {
-      retv = this.waitingJobs;
-    } else if (state == Job.READY) {
-      retv = this.readyJobs;
-    } else if (state == Job.RUNNING) {
-      retv = this.runningJobs;
-    } else if (state == Job.SUCCESS) {
-      retv = this.successfulJobs;
-    } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
-      retv = this.failedJobs;
-    } 
-    return retv;
+    return castToJobList(super.getFailedJobList());
   }
 
   /**
-   * Add a new job.
-   * @param aJob the new job
-   */
-  synchronized public String addJob(Job aJob) {
-    String id = this.getNextJobID();
-    aJob.setJobID(id);
-    aJob.setState(Job.WAITING);
-    this.addToQueue(aJob);
-    return id;	
-  }
-	
-  /**
    * Add a collection of jobs
    * 
    * @param jobs
    */
-  public void addJobs(Collection<Job> jobs) {
+  public void addJobs(Collection <Job> jobs) {
     for (Job job : jobs) {
       addJob(job);
     }
   }
-	
+
   /**
    * @return the thread state
    */
   public int getState() {
-    return this.runnerState;
-  }
-	
-  /**
-   * set the thread state to STOPPING so that the 
-   * thread will stop when it wakes up.
-   */
-  public void stop() {
-    this.runnerState = JobControl.STOPPING;
-  }
-	
-  /**
-   * suspend the running thread
-   */
-  public void suspend () {
-    if (this.runnerState == JobControl.RUNNING) {
-      this.runnerState = JobControl.SUSPENDED;
-    }
-  }
-	
-  /**
-   * resume the suspended thread
-   */
-  public void resume () {
-    if (this.runnerState == JobControl.SUSPENDED) {
-      this.runnerState = JobControl.RUNNING;
+    ThreadState state = super.getThreadState();
+    if (state == ThreadState.RUNNING) {
+      return 0;   // RUNNING
+    }
+    if (state == ThreadState.SUSPENDED) {
+      return 1;   // SUSPENDED
     }
-  }
-	
-  synchronized private void checkRunningJobs() {
-		
-    Map<String, Job> oldJobs = null;
-    oldJobs = this.runningJobs;
-    this.runningJobs = new Hashtable<String, Job>();
-		
-    for (Job nextJob : oldJobs.values()) {
-      int state = nextJob.checkState();
-      /*
-        if (state != Job.RUNNING) {
-        System.out.println("The state of the running job " +
-        nextJob.getJobName() + " has changed to: " + nextJob.getState());
-        }
-      */
-      this.addToQueue(nextJob);
+    if (state == ThreadState.STOPPED) {
+      return 2;   // STOPPED
     }
-  }
-	
-  synchronized private void checkWaitingJobs() {
-    Map<String, Job> oldJobs = null;
-    oldJobs = this.waitingJobs;
-    this.waitingJobs = new Hashtable<String, Job>();
-		
-    for (Job nextJob : oldJobs.values()) {
-      int state = nextJob.checkState();
-      /*
-        if (state != Job.WAITING) {
-        System.out.println("The state of the waiting job " +
-        nextJob.getJobName() + " has changed to: " + nextJob.getState());
-        }
-      */
-      this.addToQueue(nextJob);
+    if (state == ThreadState.STOPPING) {
+      return 3;   // STOPPING
     }
-  }
-	
-  synchronized private void startReadyJobs() {
-    Map<String, Job> oldJobs = null;
-    oldJobs = this.readyJobs;
-    this.readyJobs = new Hashtable<String, Job>();
-		
-    for (Job nextJob : oldJobs.values()) {
-      //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
-      nextJob.submit();
-      //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
-      this.addToQueue(nextJob);
-    }	
-  }
-	
-  synchronized public boolean allFinished() {
-    return this.waitingJobs.size() == 0 &&
-      this.readyJobs.size() == 0 &&
-      this.runningJobs.size() == 0;
-  }
-	
-  /**
-   *  The main loop for the thread.
-   *  The loop does the following:
-   *  	Check the states of the running jobs
-   *  	Update the states of waiting jobs
-   *  	Submit the jobs in ready state
-   */
-  public void run() {
-    this.runnerState = JobControl.RUNNING;
-    while (true) {
-      while (this.runnerState == JobControl.SUSPENDED) {
-        try {
-          Thread.sleep(5000);
-        }
-        catch (Exception e) {
-					
-        }
-      }
-      checkRunningJobs();	
-      checkWaitingJobs();		
-      startReadyJobs();		
-      if (this.runnerState != JobControl.RUNNING && 
-          this.runnerState != JobControl.SUSPENDED) {
-        break;
-      }
-      try {
-        Thread.sleep(5000);
-      }
-      catch (Exception e) {
-				
-      }
-      if (this.runnerState != JobControl.RUNNING && 
-          this.runnerState != JobControl.SUSPENDED) {
-        break;
-      }
+    if (state == ThreadState.READY) {
+      return 4;   // READY
     }
-    this.runnerState = JobControl.STOPPED;
+    return -1;  // no mapping for this state
   }
 
 }
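
The shim still implements Runnable through its new superclass, so the usual
driver pattern is unchanged: run the group on its own thread and poll until
everything completes. A minimal sketch (job wiring and InterruptedException
handling omitted; "step" is assumed to be a previously constructed
org.apache.hadoop.mapred.jobcontrol.Job):

    // Sketch: driving the deprecated JobControl shim.
    JobControl control = new JobControl("demo-group");
    control.addJob(step);
    new Thread(control).start();   // run() is inherited from the new class
    while (!control.allFinished()) {
      Thread.sleep(5000);          // same poll interval the old run() used
    }
    control.stop();                // ask the runner thread to exit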


