hadoop-mapreduce-commits mailing list archives

From: ste...@apache.org
Subject: svn commit: r885145 [19/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Date: Sat, 28 Nov 2009 20:26:22 GMT
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JobTrackerMetricsInst.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,8 +22,6 @@
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 
 class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
   private final MetricsRecord metricsRecord;
@@ -45,6 +43,27 @@
   private int numBlackListedMapSlots = 0;
   private int numBlackListedReduceSlots = 0;
 
+  private int numReservedMapSlots = 0;
+  private int numReservedReduceSlots = 0;
+  private int numOccupiedMapSlots = 0;
+  private int numOccupiedReduceSlots = 0;
+  
+  private int numJobsFailed = 0;
+  private int numJobsKilled = 0;
+  
+  private int numJobsPreparing = 0;
+  private int numJobsRunning = 0;
+  
+  private int numRunningMaps = 0;
+  private int numRunningReduces = 0;
+  
+  private int numMapTasksKilled = 0;
+  private int numReduceTasksKilled = 0;
+
+  private int numTrackers = 0;
+  private int numTrackersBlackListed = 0;
+  private int numTrackersDecommissioned = 0;
+  
   public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
     super(tracker, conf);
     String sessionId = conf.getSessionId();
@@ -78,6 +97,28 @@
       metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
       metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
       metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+      
+      metricsRecord.incrMetric("reserved_map_slots", numReservedMapSlots);
+      metricsRecord.incrMetric("reserved_reduce_slots", numReservedReduceSlots);
+      metricsRecord.incrMetric("occupied_map_slots", numOccupiedMapSlots);
+      metricsRecord.incrMetric("occupied_reduce_slots", numOccupiedReduceSlots);
+      
+      metricsRecord.incrMetric("jobs_failed", numJobsFailed);
+      metricsRecord.incrMetric("jobs_killed", numJobsKilled);
+      
+      metricsRecord.incrMetric("jobs_preparing", numJobsPreparing);
+      metricsRecord.incrMetric("jobs_running", numJobsRunning);
+      
+      metricsRecord.incrMetric("running_maps", numRunningMaps);
+      metricsRecord.incrMetric("running_reduces", numRunningReduces);
+      
+      metricsRecord.incrMetric("maps_killed", numMapTasksKilled);
+      metricsRecord.incrMetric("reduces_killed", numReduceTasksKilled);
+
+      metricsRecord.incrMetric("trackers", numTrackers);
+      metricsRecord.incrMetric("trackers_blacklisted", numTrackersBlackListed);
+      metricsRecord.setMetric("trackers_decommissioned", 
+          numTrackersDecommissioned);
 
       numMapTasksLaunched = 0;
       numMapTasksCompleted = 0;
@@ -91,6 +132,26 @@
       numWaitingReduces = 0;
       numBlackListedMapSlots = 0;
       numBlackListedReduceSlots = 0;
+      
+      numReservedMapSlots = 0;
+      numReservedReduceSlots = 0;
+      numOccupiedMapSlots = 0;
+      numOccupiedReduceSlots = 0;
+      
+      numJobsFailed = 0;
+      numJobsKilled = 0;
+      
+      numJobsPreparing = 0;
+      numJobsRunning = 0;
+      
+      numRunningMaps = 0;
+      numRunningReduces = 0;
+      
+      numMapTasksKilled = 0;
+      numReduceTasksKilled = 0;
+
+      numTrackers = 0;
+      numTrackersBlackListed = 0;
     }
     metricsRecord.update();
 
@@ -166,12 +227,12 @@
   }
 
   @Override
-  public void setMapSlots(int slots) {
+  public synchronized void setMapSlots(int slots) {
     numMapSlots = slots;
   }
 
   @Override
-  public void setReduceSlots(int slots) {
+  public synchronized void setReduceSlots(int slots) {
     numReduceSlots = slots;
   }
 
@@ -194,4 +255,154 @@
   public synchronized void decBlackListedReduceSlots(int slots){
     numBlackListedReduceSlots -= slots;
   }
+
+  @Override
+  public synchronized void addReservedMapSlots(int slots)
+  { 
+    numReservedMapSlots += slots;
+  }
+
+  @Override
+  public synchronized void decReservedMapSlots(int slots)
+  {
+    numReservedMapSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addReservedReduceSlots(int slots)
+  {
+    numReservedReduceSlots += slots;
+  }
+
+  @Override
+  public synchronized void decReservedReduceSlots(int slots)
+  {
+    numReservedReduceSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addOccupiedMapSlots(int slots)
+  {
+    numOccupiedMapSlots += slots;
+  }
+
+  @Override
+  public synchronized void decOccupiedMapSlots(int slots)
+  {
+    numOccupiedMapSlots -= slots;
+  }
+
+  @Override
+  public synchronized void addOccupiedReduceSlots(int slots)
+  {
+    numOccupiedReduceSlots += slots;
+  }
+
+  @Override
+  public synchronized void decOccupiedReduceSlots(int slots)
+  {
+    numOccupiedReduceSlots -= slots;
+  }
+
+  @Override
+  public synchronized void failedJob(JobConf conf, JobID id) 
+  {
+    numJobsFailed++;
+  }
+
+  @Override
+  public synchronized void killedJob(JobConf conf, JobID id) 
+  {
+    numJobsKilled++;
+  }
+
+  @Override
+  public synchronized void addPrepJob(JobConf conf, JobID id) 
+  {
+    numJobsPreparing++;
+  }
+
+  @Override
+  public synchronized void decPrepJob(JobConf conf, JobID id) 
+  {
+    numJobsPreparing--;
+  }
+
+  @Override
+  public synchronized void addRunningJob(JobConf conf, JobID id) 
+  {
+    numJobsRunning++;
+  }
+
+  @Override
+  public synchronized void decRunningJob(JobConf conf, JobID id) 
+  {
+    numJobsRunning--;
+  }
+
+  @Override
+  public synchronized void addRunningMaps(int task)
+  {
+    numRunningMaps += task;
+  }
+
+  @Override
+  public synchronized void decRunningMaps(int task) 
+  {
+    numRunningMaps -= task;
+  }
+
+  @Override
+  public synchronized void addRunningReduces(int task)
+  {
+    numRunningReduces += task;
+  }
+
+  @Override
+  public synchronized void decRunningReduces(int task)
+  {
+    numRunningReduces -= task;
+  }
+
+  @Override
+  public synchronized void killedMap(TaskAttemptID taskAttemptID)
+  {
+    numMapTasksKilled++;
+  }
+
+  @Override
+  public synchronized void killedReduce(TaskAttemptID taskAttemptID)
+  {
+    numReduceTasksKilled++;
+  }
+
+  @Override
+  public synchronized void addTrackers(int trackers)
+  {
+    numTrackers += trackers;
+  }
+
+  @Override
+  public synchronized void decTrackers(int trackers)
+  {
+    numTrackers -= trackers;
+  }
+
+  @Override
+  public synchronized void addBlackListedTrackers(int trackers)
+  {
+    numTrackersBlackListed += trackers;
+  }
+
+  @Override
+  public synchronized void decBlackListedTrackers(int trackers)
+  {
+    numTrackersBlackListed -= trackers;
+  }
+
+  @Override
+  public synchronized void setDecommissionedTrackers(int trackers)
+  {
+    numTrackersDecommissioned = trackers;
+  }  
 }

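The new counters in JobTrackerMetricsInst follow an accumulate-flush-reset pattern: instrumentation callbacks bump a field under the instance lock, doUpdates() pushes the accumulated deltas into the metrics record, and the fields are zeroed for the next interval. A minimal standalone sketch of that pattern, using a hypothetical MetricsSink interface in place of the real org.apache.hadoop.metrics API:

// Hypothetical sink; stands in for MetricsRecord.incrMetric()/update().
interface MetricsSink {
  void incr(String name, int delta);
  void flush();
}

class SlotMetrics {
  private int reservedMapSlots = 0;
  private int reservedReduceSlots = 0;

  // Mutators are synchronized, mirroring the instrumentation methods above.
  public synchronized void addReservedMapSlots(int slots)    { reservedMapSlots += slots; }
  public synchronized void decReservedMapSlots(int slots)    { reservedMapSlots -= slots; }
  public synchronized void addReservedReduceSlots(int slots) { reservedReduceSlots += slots; }
  public synchronized void decReservedReduceSlots(int slots) { reservedReduceSlots -= slots; }

  // Periodic update: emit the deltas accumulated since the last call, then
  // reset, as doUpdates() does for the new counters in the patch.
  public synchronized void doUpdates(MetricsSink sink) {
    sink.incr("reserved_map_slots", reservedMapSlots);
    sink.incr("reserved_reduce_slots", reservedReduceSlots);
    reservedMapSlots = 0;
    reservedReduceSlots = 0;
    sink.flush();
  }
}

Keeping both the mutators and doUpdates() synchronized is what makes the reset safe against concurrent increments.
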
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmManager.java Sat Nov 28 20:26:01 2009
@@ -33,9 +33,9 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-import org.apache.hadoop.util.ProcessTree;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.util.ProcessTree;
 
 class JvmManager {
 
@@ -196,7 +196,7 @@
         context.task = task;
         // If we are returning the same task as which the JVM was launched
         // we don't initialize task once again.
-        if (!jvmRunner.env.conf.get("mapred.task.id").equals(
+        if (!jvmRunner.env.conf.get(JobContext.TASK_ATTEMPT_ID).equals(
             task.getTaskID().toString())) {
           try {
             tracker.getTaskController().initializeTask(context);
@@ -447,7 +447,7 @@
           if (initalContext != null && initalContext.env != null) {
             initalContext.pid = jvmIdToPid.get(jvmId);
             initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
-                .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
+                .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
                     ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
 
             // Destroy the task jvm

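The JvmManager hunk above mostly swaps bare configuration strings ("mapred.task.id", "mapred.tasktracker.tasks.sleeptime-before-sigkill") for named constants (JobContext.TASK_ATTEMPT_ID, TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL). A small sketch of the idea, with an illustrative constant name rather than the real TTConfig field, and an assumed default value:

import org.apache.hadoop.conf.Configuration;

class SigkillConfig {
  // Illustrative constant; the real key lives in TTConfig in this branch.
  static final String SLEEP_BEFORE_SIGKILL_KEY =
      "mapred.tasktracker.tasks.sleeptime-before-sigkill";
  static final long DEFAULT_SLEEP_BEFORE_SIGKILL = 5000L; // assumed default

  static long sleepTimeBeforeSigkill(Configuration conf) {
    // One named constant instead of a string literal scattered through the code.
    return conf.getLong(SLEEP_BEFORE_SIGKILL_KEY, DEFAULT_SLEEP_BEFORE_SIGKILL);
  }
}

Centralizing the key in one constant means a rename like the one in this commit touches a single definition instead of every call site.
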
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmTask.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmTask.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/JvmTask.java Sat Nov 28 20:26:01 2009
@@ -23,7 +23,15 @@
 import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 
-class JvmTask implements Writable {
+/**
+ * Task abstraction that can be serialized, implements the writable interface
+ * 
+ * <FRAMEWORK-USE-ONLY>
+ * This method is intended only for use by the Map/Reduce framework and not
+ * for external users
+ *
+ */
+public class JvmTask implements Writable {
   Task t;
   boolean shouldDie;
   public JvmTask(Task t, boolean shouldDie) {

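JvmTask becomes public and is documented as framework-only; it remains a plain Writable. For readers unfamiliar with that interface, a minimal Writable sketch (deliberately simpler than the real JvmTask, which also serializes the wrapped Task):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Minimal Writable: a flag plus a string, (de)serialized symmetrically.
public class FlaggedMessage implements Writable {
  private boolean shouldDie;
  private String message = "";

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeBoolean(shouldDie);
    out.writeUTF(message);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    shouldDie = in.readBoolean();
    message = in.readUTF();
  }
}
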
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueLineRecordReader.java Sat Nov 28 20:26:01 2009
@@ -27,7 +27,7 @@
 /**
  * This class treats a line in the input as a key/value pair separated by a 
  * separator character. The separator can be specified in config file 
- * under the attribute name key.value.separator.in.input.line. The default
+ * under the attribute name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
  * separator is the tab character ('\t').
  * 
  * @deprecated Use 
@@ -61,7 +61,7 @@
     lineRecordReader = new LineRecordReader(job, split);
     dummyKey = lineRecordReader.createKey();
     innerValue = lineRecordReader.createValue();
-    String sepStr = job.get("key.value.separator.in.input.line", "\t");
+    String sepStr = job.get("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
     this.separator = (byte) sepStr.charAt(0);
   }
 

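The separator property is renamed from key.value.separator.in.input.line to mapreduce.input.keyvaluelinerecordreader.key.value.separator, with the tab character still the default. A job that wants a comma-separated key/value split would set the new key roughly like this (a sketch, not taken from the patch):

import org.apache.hadoop.mapred.JobConf;

public class SeparatorConfigExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // New-style key introduced by this patch; the default remains '\t'.
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
    System.out.println(
        conf.get("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t"));
  }
}
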
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/KeyValueTextInputFormat.java Sat Nov 28 20:26:01 2009
@@ -23,7 +23,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 
 /**
  * An {@link InputFormat} for plain text files. Files are broken into lines.
@@ -46,7 +48,11 @@
   }
   
   protected boolean isSplitable(FileSystem fs, Path file) {
-    return compressionCodecs.getCodec(file) == null;
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
   
   public RecordReader<Text, Text> getRecordReader(InputSplit genericSplit,

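isSplitable() no longer refuses to split every compressed file; a file is now splittable when it is uncompressed or when its codec implements SplittableCompressionCodec. The decision reduces to an instanceof check, shown here in isolation:

import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

final class SplitDecision {
  // Mirrors the new isSplitable() logic: no codec means splittable,
  // otherwise only if the codec can decompress from a mid-file offset.
  static boolean isSplittable(CompressionCodec codec) {
    if (codec == null) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }
}
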
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java Sat Nov 28 20:26:01 2009
@@ -20,28 +20,24 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that limits the maximum number of tasks
  * running for a job. The limit is set by means of the
- * <code>mapred.jobtracker.scheduler.maxRunningTasksPerJob</code>
- * property.
+ * {@link JTConfig#JT_RUNNINGTASKS_PER_JOB} property.
  */
 class LimitTasksPerJobTaskScheduler extends JobQueueTaskScheduler {
   
   private static final Log LOG = LogFactory.getLog(
     "org.apache.hadoop.mapred.TaskLimitedJobQueueTaskScheduler");
   
-  public static final String MAX_TASKS_PER_JOB_PROPERTY = 
-    "mapred.jobtracker.taskScheduler.maxRunningTasksPerJob";
-  
   private long maxTasksPerJob;
   
   public LimitTasksPerJobTaskScheduler() {
@@ -60,9 +56,10 @@
   @Override
   public synchronized void setConf(Configuration conf) {
     super.setConf(conf);
-    maxTasksPerJob = conf.getLong(MAX_TASKS_PER_JOB_PROPERTY ,Long.MAX_VALUE);
+    maxTasksPerJob = 
+      conf.getLong(JTConfig.JT_RUNNINGTASKS_PER_JOB, Long.MAX_VALUE);
     if (maxTasksPerJob <= 0) {
-      String msg = MAX_TASKS_PER_JOB_PROPERTY +
+      String msg = JTConfig.JT_RUNNINGTASKS_PER_JOB +
         " is set to zero or a negative value. Aborting.";
       LOG.fatal(msg);
       throw new RuntimeException (msg);

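The scheduler's per-job task limit moves from the locally defined mapred.jobtracker.taskScheduler.maxRunningTasksPerJob property to JTConfig.JT_RUNNINGTASKS_PER_JOB, keeping the read-then-validate behaviour. A compact sketch of that pattern, with an illustrative key name standing in for the JTConfig constant:

import org.apache.hadoop.conf.Configuration;

final class LimitReader {
  // Illustrative key; the patch reads JTConfig.JT_RUNNINGTASKS_PER_JOB.
  static final String MAX_TASKS_KEY = "example.scheduler.max.running.tasks.per.job";

  static long readMaxTasksPerJob(Configuration conf) {
    long maxTasksPerJob = conf.getLong(MAX_TASKS_KEY, Long.MAX_VALUE);
    if (maxTasksPerJob <= 0) {
      // Fail fast on a nonsensical limit, as setConf() does above.
      throw new RuntimeException(
          MAX_TASKS_KEY + " is set to zero or a negative value. Aborting.");
    }
    return maxTasksPerJob;
  }
}
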
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LineRecordReader.java Sat Nov 28 20:26:01 2009
@@ -25,12 +25,15 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
@@ -50,6 +53,7 @@
   private long end;
   private LineReader in;
   private FSDataInputStream fileIn;
+  private final Seekable filePosition;
   int maxLineLength;
   private CompressionCodec codec;
   private Decompressor decompressor;
@@ -73,8 +77,8 @@
 
   public LineRecordReader(Configuration job, 
                           FileSplit split) throws IOException {
-    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
-                                    Integer.MAX_VALUE);
+    this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
+      LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
     start = split.getStart();
     end = start + split.getLength();
     final Path file = split.getPath();
@@ -82,20 +86,33 @@
     codec = compressionCodecs.getCodec(file);
 
     // open the file and seek to the start of the split
-    FileSystem fs = file.getFileSystem(job);
-    fileIn = fs.open(split.getPath());
+    final FileSystem fs = file.getFileSystem(job);
+    fileIn = fs.open(file);
     if (isCompressedInput()) {
       decompressor = CodecPool.getDecompressor(codec);
-      in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+      if (codec instanceof SplittableCompressionCodec) {
+        final SplitCompressionInputStream cIn =
+          ((SplittableCompressionCodec)codec).createInputStream(
+            fileIn, decompressor, start, end,
+            SplittableCompressionCodec.READ_MODE.BYBLOCK);
+        in = new LineReader(cIn, job);
+        start = cIn.getAdjustedStart();
+        end = cIn.getAdjustedEnd();
+        filePosition = cIn; // take pos from compressed stream
+      } else {
+        in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
+        filePosition = fileIn;
+      }
     } else {
       fileIn.seek(start);
       in = new LineReader(fileIn, job);
+      filePosition = fileIn;
     }
     // If this is not the first split, we always throw away first record
     // because we always (except the last split) read one extra line in
     // next() method.
     if (start != 0) {
-      start += in.readLine(new Text(), 0, maxBytesToConsume());
+      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
     }
     this.pos = start;
   }
@@ -107,17 +124,19 @@
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;    
+    filePosition = null;
   }
 
   public LineRecordReader(InputStream in, long offset, long endOffset, 
                           Configuration job) 
     throws IOException{
-    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
-                                    Integer.MAX_VALUE);
+    this.maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
+      LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
     this.in = new LineReader(in, job);
     this.start = offset;
     this.pos = offset;
     this.end = endOffset;    
+    filePosition = null;
   }
   
   public LongWritable createKey() {
@@ -128,16 +147,20 @@
     return new Text();
   }
   
-  private boolean isCompressedInput() { return (codec != null); }
-  
-  private int maxBytesToConsume() {
-    return (isCompressedInput()) ? Integer.MAX_VALUE
-                           : (int) Math.min(Integer.MAX_VALUE, (end - start));
+  private boolean isCompressedInput() {
+    return (codec != null);
+  }
+
+  private int maxBytesToConsume(long pos) {
+    return isCompressedInput()
+      ? Integer.MAX_VALUE
+      : (int) Math.min(Integer.MAX_VALUE, end - pos);
   }
+
   private long getFilePosition() throws IOException {
     long retVal;
-    if (isCompressedInput()) {
-      retVal = fileIn.getPos();
+    if (isCompressedInput() && null != filePosition) {
+      retVal = filePosition.getPos();
     } else {
       retVal = pos;
     }
@@ -155,7 +178,7 @@
       key.set(pos);
 
       int newSize = in.readLine(value, maxLineLength,
-                                Math.max(maxBytesToConsume(), maxLineLength));
+          Math.max(maxBytesToConsume(pos), maxLineLength));
       if (newSize == 0) {
         return false;
       }

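LineRecordReader now keeps a Seekable filePosition: when a SplittableCompressionCodec is in use, progress is read from the compressed stream (whose start and end are adjusted to block boundaries); for the offset-based constructors the field stays null and the reader falls back to its own pos counter. The position bookkeeping in isolation, as a sketch with stand-in types rather than the real Hadoop streams:

import java.io.IOException;

// Sketch of the bookkeeping added above: when the input is compressed and a
// position-aware stream is available, ask the stream; otherwise use the byte
// offset the reader tracks itself.
final class PositionTracker {
  interface SeekablePos { long getPos() throws IOException; } // stands in for o.a.h.fs.Seekable

  private final SeekablePos filePosition; // null for the offset-only constructors
  private final boolean compressedInput;
  private long pos;                       // logical position in the uncompressed view

  PositionTracker(SeekablePos filePosition, boolean compressedInput, long start) {
    this.filePosition = filePosition;
    this.compressedInput = compressedInput;
    this.pos = start;
  }

  long getFilePosition() throws IOException {
    if (compressedInput && filePosition != null) {
      return filePosition.getPos();
    }
    return pos;
  }

  void advance(long bytes) { pos += bytes; }
}
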
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Sat Nov 28 20:26:01 2009
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.hadoop.mapred;
 
 import java.io.BufferedWriter;
@@ -29,6 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -42,8 +43,8 @@
  * JVM and killing it when needed, and also initializing and
  * finalizing the task environment. 
  * <p> The setuid executable is launched using the command line:</p>
- * <p>task-controller user-name command command-args, where</p>
- * <p>user-name is the name of the owner who submits the job</p>
+ * <p>task-controller mapreduce.job.user.name command command-args, where</p>
+ * <p>mapreduce.job.user.name is the name of the owner who submits the job</p>
  * <p>command is one of the cardinal value of the 
  * {@link LinuxTaskController.TaskCommands} enumeration</p>
  * <p>command-args depends on the command being launched.</p>
@@ -80,11 +81,14 @@
    * List of commands that the setuid script will execute.
    */
   enum TaskCommands {
+    INITIALIZE_USER,
     INITIALIZE_JOB,
+    INITIALIZE_DISTRIBUTEDCACHE,
     LAUNCH_TASK_JVM,
     INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
     KILL_TASK_JVM,
+    RUN_DEBUG_SCRIPT,
   }
 
   /**
@@ -117,10 +121,12 @@
     sb.append(cmdLine);
     // write the command to a file in the
     // task specific cache directory
-    writeCommand(sb.toString(), getTaskCacheDirectory(context));
+    writeCommand(sb.toString(), getTaskCacheDirectory(context, 
+        context.env.workDir));
     
     // Call the taskcontroller with the right parameters.
-    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
+    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, 
+        context.env.workDir);
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
                                     TaskCommands.LAUNCH_TASK_JVM, 
                                     env.conf.getUser(),
@@ -147,7 +153,23 @@
       logOutput(shExec.getOutput());
     }
   }
-
+  
+  /**
+   * Launch the debug script process that will run as the owner of the job.
+   * 
+   * This method launches the task debug script process by executing a setuid
+   * executable that will switch to the user and run the task. 
+   */
+  @Override
+  void runDebugScript(DebugScriptContext context) throws IOException {
+    String debugOut = FileUtil.makeShellPath(context.stdout);
+    String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
+    writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
+    // Call the taskcontroller with the right parameters.
+    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
+    runCommand(TaskCommands.RUN_DEBUG_SCRIPT, context.task.getUser(), 
+        launchTaskJVMArgs, context.workDir, null);
+  }
   /**
    * Helper method that runs a LinuxTaskController command
    * 
@@ -190,7 +212,7 @@
    * @param context
    * @return Argument to be used while launching Task VM
    */
-  private List<String> buildInitializeTaskArgs(TaskControllerContext context) {
+  private List<String> buildInitializeTaskArgs(TaskExecContext context) {
     List<String> commandArgs = new ArrayList<String>(3);
     String taskId = context.task.getTaskID().toString();
     String jobId = getJobId(context);
@@ -221,7 +243,7 @@
     }
   }
 
-  private String getJobId(TaskControllerContext context) {
+  private String getJobId(TaskExecContext context) {
     String taskId = context.task.getTaskID().toString();
     TaskAttemptID tId = TaskAttemptID.forName(taskId);
     String jobId = tId.getJobID().toString();
@@ -235,31 +257,34 @@
    * @param context
    * @return Argument to be used while launching Task VM
    */
-  private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
+  private List<String> buildLaunchTaskArgs(TaskExecContext context, 
+      File workDir) {
     List<String> commandArgs = new ArrayList<String>(3);
     LOG.debug("getting the task directory as: " 
-        + getTaskCacheDirectory(context));
+        + getTaskCacheDirectory(context, workDir));
     LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context)), 
+        new File(getTaskCacheDirectory(context, workDir)), 
         context) );
     commandArgs.add(getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context)), 
+        new File(getTaskCacheDirectory(context, workDir)), 
         context));
     commandArgs.addAll(buildInitializeTaskArgs(context));
     return commandArgs;
   }
 
   // Get the directory from the list of directories configured
-  // in mapred.local.dir chosen for storing data pertaining to
+  // in Configs.LOCAL_DIR chosen for storing data pertaining to
   // this task.
   private String getDirectoryChosenForTask(File directory,
-      TaskControllerContext context) {
+      TaskExecContext context) {
     String jobId = getJobId(context);
     String taskId = context.task.getTaskID().toString();
     for (String dir : mapredLocalDirs) {
       File mapredDir = new File(dir);
-      File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir(
-          jobId, taskId, context.task.isTaskCleanupTask())).getParentFile();
+      File taskDir =
+          new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
+              .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
+              .getParentFile();
       if (directory.equals(taskDir)) {
         return dir;
       }
@@ -276,13 +301,13 @@
    * <br/>
    * For launching following is command line argument:
    * <br/>
-   * {@code user-name command tt-root job_id task_id} 
+   * {@code mapreduce.job.user.name command tt-root job_id task_id} 
    * <br/>
    * For terminating/killing task jvm.
-   * {@code user-name command tt-root task-pid}
+   * {@code mapreduce.job.user.name command tt-root task-pid}
    * 
    * @param command command to be executed.
-   * @param userName user name
+   * @param userName mapreduce.job.user.name
    * @param cmdArgs list of extra arguments
    * @param workDir working directory for the task-controller
    * @param env JVM environment variables.
@@ -318,12 +343,13 @@
   }
   
   // Return the task specific directory under the cache.
-  private String getTaskCacheDirectory(TaskControllerContext context) {
+  private String getTaskCacheDirectory(TaskExecContext context, 
+      File workDir) {
     // In the case of JVM reuse, the task specific directory
     // is different from what is set with respect with
     // env.workDir. Hence building this from the taskId everytime.
     String taskId = context.task.getTaskID().toString();
-    File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
+    File cacheDirForJob = workDir.getParentFile().getParentFile();
     if(context.task.isTaskCleanupTask()) {
       taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
     }
@@ -341,6 +367,9 @@
     PrintWriter pw = null;
     String commandFile = directory + File.separator + COMMAND_FILE;
     LOG.info("Writing commands to " + commandFile);
+    LOG.info("--------Commands Begin--------");
+    LOG.info(cmdLine);
+    LOG.info("--------Commands End--------");
     try {
       FileWriter fw = new FileWriter(commandFile);
       BufferedWriter bw = new BufferedWriter(fw);
@@ -378,6 +407,24 @@
         buildInitializeJobCommandArgs(context), context.workDir, null);
   }
 
+  @Override
+  public void initializeDistributedCache(InitializationContext context)
+      throws IOException {
+    LOG.debug("Going to initialize distributed cache for " + context.user
+        + " on the TT");
+    runCommand(TaskCommands.INITIALIZE_DISTRIBUTEDCACHE, context.user,
+        new ArrayList<String>(), context.workDir, null);
+  }
+
+  @Override
+  public void initializeUser(InitializationContext context)
+      throws IOException {
+    LOG.debug("Going to initialize user directories for " + context.user
+        + " on the TT");
+    runCommand(TaskCommands.INITIALIZE_USER, context.user,
+        new ArrayList<String>(), context.workDir, null);
+  }
+
   /**
    * API which builds the command line to be pass to LinuxTaskController
    * binary to terminate/kill the task. See 

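LinuxTaskController gains new setuid commands (INITIALIZE_USER, INITIALIZE_DISTRIBUTEDCACHE, RUN_DEBUG_SCRIPT), all funneled through the same runCommand helper with a user name and an argument list. A stripped-down sketch of assembling such a command line; the binary path and the ordinal encoding of the command are assumptions here, not taken from the patch:

import java.util.ArrayList;
import java.util.List;

final class TaskControllerCommandLine {
  // Mirrors the shape of LinuxTaskController.TaskCommands; names are illustrative.
  enum Command { INITIALIZE_USER, INITIALIZE_JOB, RUN_DEBUG_SCRIPT }

  // Builds "<binary> <user> <command> <extra args...>", the layout the javadoc
  // above describes for the setuid executable. Passing the command as its enum
  // ordinal is an assumption in this sketch.
  static List<String> build(String binary, String user, Command cmd, List<String> extraArgs) {
    List<String> argv = new ArrayList<String>();
    argv.add(binary);
    argv.add(user);
    argv.add(Integer.toString(cmd.ordinal()));
    argv.addAll(extraArgs);
    return argv;
  }
}
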
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Sat Nov 28 20:26:01 2009
@@ -28,6 +28,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -35,14 +37,21 @@
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.server.jobtracker.State;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
-class LocalJobRunner implements JobSubmissionProtocol {
+public class LocalJobRunner implements ClientProtocol {
   public static final Log LOG =
     LogFactory.getLog(LocalJobRunner.class);
 
@@ -57,9 +66,10 @@
   private static final String jobDir =  "localRunner/";
   
   public long getProtocolVersion(String protocol, long clientVersion) {
-    return JobSubmissionProtocol.versionID;
+    return ClientProtocol.versionID;
   }
   
+  @SuppressWarnings("unchecked")
   static RawSplit[] getRawSplits(JobContext jContext, JobConf job)
       throws Exception {
     JobConf jobConf = jContext.getJobConf();
@@ -120,10 +130,11 @@
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public Job(JobID jobid, JobConf conf) throws IOException {
+    public Job(JobID jobid) throws IOException {
       this.systemJobDir = new Path(getSystemDir(), jobid.toString());
       this.systemJobFile = new Path(systemJobDir, "job.xml");
       this.id = jobid;
+      JobConf conf = new JobConf(systemJobFile);
       this.localFs = FileSystem.getLocal(conf);
       this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
       this.localJobFile = new Path(this.localJobDir, id + ".xml");
@@ -135,7 +146,7 @@
       this.taskDistributedCacheManager = 
           trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
       taskDistributedCacheManager.setup(
-          new LocalDirAllocator("mapred.local.dir"), 
+          new LocalDirAllocator(MRConfig.LOCAL_DIR), 
           new File(systemJobDir.toString()),
           "archive");
       
@@ -188,7 +199,7 @@
     @Override
     public void run() {
       JobID jobId = profile.getJobID();
-      JobContext jContext = new JobContext(conf, jobId);
+      JobContext jContext = new JobContextImpl(job, jobId);
       OutputCommitter outputCommitter = job.getOutputCommitter();
       try {
         // split input into minimum number of splits
@@ -299,7 +310,7 @@
           }
         }
         // delete the temporary directory in output directory
-        outputCommitter.cleanupJob(jContext);
+        outputCommitter.commitJob(jContext);
         status.setCleanupProgress(1.0f);
 
         if (killed) {
@@ -312,7 +323,8 @@
 
       } catch (Throwable t) {
         try {
-          outputCommitter.cleanupJob(jContext);
+          outputCommitter.abortJob(jContext, 
+            org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
         } catch (IOException ioe) {
           LOG.info("Error cleaning up job:" + id);
         }
@@ -415,16 +427,26 @@
       LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
     }
     
+    public synchronized void fatalError(TaskAttemptID taskId, String msg) 
+    throws IOException {
+      LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
+    }
+    
     public MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId, 
         int fromEventId, int maxLocs, TaskAttemptID id) throws IOException {
-      return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
-                                               false);
+      return new MapTaskCompletionEventsUpdate(
+        org.apache.hadoop.mapred.TaskCompletionEvent.EMPTY_ARRAY, false);
     }
     
   }
 
+  public LocalJobRunner(Configuration conf) throws IOException {
+    this(new JobConf(conf));
+  }
+
+  @Deprecated
   public LocalJobRunner(JobConf conf) throws IOException {
-    this.fs = FileSystem.get(conf);
+    this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
     myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
   }
@@ -432,118 +454,144 @@
   // JobSubmissionProtocol methods
 
   private static int jobid = 0;
-  public synchronized JobID getNewJobId() {
-    return new JobID("local", ++jobid);
+  public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() {
+    return new org.apache.hadoop.mapreduce.JobID("local", ++jobid);
   }
 
-  public JobStatus submitJob(JobID jobid) throws IOException {
-    return new Job(jobid, this.conf).status;
+  public org.apache.hadoop.mapreduce.JobStatus submitJob(
+      org.apache.hadoop.mapreduce.JobID jobid) 
+      throws IOException {
+    return new Job(JobID.downgrade(jobid)).status;
   }
 
-  public void killJob(JobID id) {
+  public void killJob(org.apache.hadoop.mapreduce.JobID id) {
     jobs.get(id).killed = true;
     jobs.get(id).interrupt();
   }
 
-  public void setJobPriority(JobID id, String jp) throws IOException {
+  public void setJobPriority(org.apache.hadoop.mapreduce.JobID id,
+      String jp) throws IOException {
     throw new UnsupportedOperationException("Changing job priority " +
                       "in LocalJobRunner is not supported.");
   }
   
   /** Throws {@link UnsupportedOperationException} */
-  public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
+  public boolean killTask(org.apache.hadoop.mapreduce.TaskAttemptID taskId,
+      boolean shouldFail) throws IOException {
     throw new UnsupportedOperationException("Killing tasks in " +
     "LocalJobRunner is not supported");
   }
 
-  public JobProfile getJobProfile(JobID id) {
-    Job job = jobs.get(id);
-    if(job != null)
-      return job.getProfile();
-    else 
-      return null;
-  }
-
-  public TaskReport[] getMapTaskReports(JobID id) {
-    return new TaskReport[0];
-  }
-  public TaskReport[] getReduceTaskReports(JobID id) {
-    return new TaskReport[0];
-  }
-  public TaskReport[] getCleanupTaskReports(JobID id) {
-    return new TaskReport[0];
-  }
-  public TaskReport[] getSetupTaskReports(JobID id) {
-    return new TaskReport[0];
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
+      org.apache.hadoop.mapreduce.JobID id, TaskType type) {
+    return new org.apache.hadoop.mapreduce.TaskReport[0];
   }
 
-  public JobStatus getJobStatus(JobID id) {
-    Job job = jobs.get(id);
+  public org.apache.hadoop.mapreduce.JobStatus getJobStatus(
+      org.apache.hadoop.mapreduce.JobID id) {
+    Job job = jobs.get(JobID.downgrade(id));
     if(job != null)
       return job.status;
     else 
       return null;
   }
   
-  public Counters getJobCounters(JobID id) {
-    Job job = jobs.get(id);
-    return job.currentCounters;
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(
+      org.apache.hadoop.mapreduce.JobID id) {
+    Job job = jobs.get(JobID.downgrade(id));
+    return new org.apache.hadoop.mapreduce.Counters(job.currentCounters);
   }
 
   public String getFilesystemName() throws IOException {
     return fs.getUri().toString();
   }
   
-  public ClusterStatus getClusterStatus(boolean detailed) {
-    return new ClusterStatus(1, 0, 0, map_tasks, reduce_tasks, 1, 1, 
-                             JobTracker.State.RUNNING);
+  public ClusterMetrics getClusterMetrics() {
+    return new ClusterMetrics(map_tasks, reduce_tasks, map_tasks, reduce_tasks,
+      0, 0, 1, 1, jobs.size(), 1, 0, 0);
+  }
+
+  public State getJobTrackerState() throws IOException, InterruptedException {
+    return State.RUNNING;
   }
 
-  public JobStatus[] jobsToComplete() {return null;}
+  public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
+    return 0;
+  }
 
-  public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid
+  /** 
+   * Get all active trackers in cluster. 
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getActiveTrackers() 
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  /** 
+   * Get all blacklisted trackers in cluster. 
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getBlacklistedTrackers() 
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(
+      org.apache.hadoop.mapreduce.JobID jobid
       , int fromEventId, int maxEvents) throws IOException {
     return TaskCompletionEvent.EMPTY_ARRAY;
   }
   
-  public JobStatus[] getAllJobs() {return null;}
+  public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() {return null;}
 
   
   /**
    * Returns the diagnostic information for a particular task in the given job.
    * To be implemented
    */
-  public String[] getTaskDiagnostics(TaskAttemptID taskid)
-  		throws IOException{
+  public String[] getTaskDiagnostics(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskid) throws IOException{
 	  return new String [0];
   }
 
   /**
-   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
    */
   public String getSystemDir() {
-    Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
+    Path sysDir = new Path(
+      conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));  
     return fs.makeQualified(sysDir).toString();
   }
 
+  public String getJobHistoryDir() {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String queueName) throws IOException {
+    return null;
+  }
+
   @Override
-  public JobStatus[] getJobsFromQueue(String queue) throws IOException {
+  public QueueInfo[] getRootQueues() throws IOException {
     return null;
   }
 
   @Override
-  public JobQueueInfo[] getQueues() throws IOException {
+  public QueueInfo[] getQueues() throws IOException {
     return null;
   }
 
 
   @Override
-  public JobQueueInfo getQueueInfo(String queue) throws IOException {
+  public QueueInfo getQueue(String queue) throws IOException {
     return null;
   }
 
   @Override
-  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException{
+  public org.apache.hadoop.mapreduce.QueueAclsInfo[] 
+      getQueueAclsForCurrentUser() throws IOException{
     return null;
   }
 }

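LocalJobRunner now implements org.apache.hadoop.mapreduce.protocol.ClientProtocol while its internal job table stays keyed on the old org.apache.hadoop.mapred.JobID, so new-API IDs are converted at the boundary with JobID.downgrade(). The bridging pattern on its own, as a sketch:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.mapred.JobID;

// Internal bookkeeping stays keyed on the old JobID; public protocol methods
// accept the new org.apache.hadoop.mapreduce.JobID and downgrade at the edge.
final class JobRegistry<J> {
  private final Map<JobID, J> jobs = new HashMap<JobID, J>();

  void register(JobID oldId, J job) {
    jobs.put(oldId, job);
  }

  J lookup(org.apache.hadoop.mapreduce.JobID newId) {
    return jobs.get(JobID.downgrade(newId));
  }
}
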
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapOutputFile.java Sat Nov 28 20:26:01 2009
@@ -23,21 +23,30 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
 
 /**
  * Manipulate the working area for the transient store for maps and reduces.
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of 
+ * these methods are from child space and see mapreduce.cluster.local.dir as 
+ * taskTracker/jobCache/jobId/attemptId
+ * 
+ * <FRAMEWORK-USE-ONLY>
+ * This method is intended only for use by the Map/Reduce framework and not
+ * for external users
  */ 
-class MapOutputFile {
+public class MapOutputFile {
 
   private JobConf conf;
 
   static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
 
-  MapOutputFile() {
+  public MapOutputFile() {
   }
 
   private LocalDirAllocator lDirAlloc = 
-                            new LocalDirAllocator("mapred.local.dir");
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
   
   /**
    * Return the path to local map output file created earlier
@@ -165,7 +174,8 @@
    * @return path
    * @throws IOException
    */
-  public Path getInputFileForWrite(TaskID mapId, long size)
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, 
+                                   long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(String.format(
         REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
@@ -185,4 +195,5 @@
       this.conf = new JobConf(conf);
     }
   }
+  
 }

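MapOutputFile switches its LocalDirAllocator context from the literal "mapred.local.dir" to MRConfig.LOCAL_DIR and keeps the "%s/map_%d.out" naming for reduce-side input files. The name construction in isolation (the output-directory argument is assumed; the real code passes TaskTracker.OUTPUT):

// One reduce-input file per map, named by the map's numeric id.
final class ReduceInputNaming {
  static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";

  static String reduceInputFileName(String outputDir, int mapId) {
    return String.format(REDUCE_INPUT_FILE_FORMAT_STRING, outputDir, mapId);
  }
}
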
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapReducePolicyProvider.java Sat Nov 28 20:26:01 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.Service;
@@ -30,7 +31,7 @@
       new Service("security.inter.tracker.protocol.acl", 
                   InterTrackerProtocol.class),
       new Service("security.job.submission.protocol.acl",
-                  JobSubmissionProtocol.class),
+                  ClientProtocol.class),
       new Service("security.task.umbilical.protocol.acl", 
                   TaskUmbilicalProtocol.class),
       new Service("security.refresh.policy.protocol.acl", 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTask.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTask.java Sat Nov 28 20:26:01 2009
@@ -53,6 +53,9 @@
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.util.IndexedSortable;
@@ -60,6 +63,7 @@
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /** A Map task. */
 class MapTask extends Task {
@@ -113,9 +117,9 @@
         && (conf.getKeepTaskFilesPattern() != null || conf
             .getKeepFailedTaskFiles())) {
       Path localSplit =
-          new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite(
-              TaskTracker.getLocalSplitFile(getJobID().toString(), getTaskID()
-                  .toString()), conf);
+          new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite(
+              TaskTracker.getLocalSplitFile(conf.getUser(), getJobID()
+                  .toString(), getTaskID().toString()), conf);
       LOG.debug("Writing local split to " + localSplit);
       DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
       Text.writeString(out, splitClass);
@@ -290,6 +294,7 @@
   @Override
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException, ClassNotFoundException, InterruptedException {
+    this.umbilical = umbilical;
 
     if (isMapTask()) {
       mapPhase = getProgress().addPhase("map", 0.667f);
@@ -353,7 +358,7 @@
     RecordReader<INKEY,INVALUE> in = isSkipping() ? 
         new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
         new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
-    job.setBoolean("mapred.skip.on", isSkipping());
+    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
 
 
     int numReduceTasks = conf.getNumReduceTasks();
@@ -388,9 +393,9 @@
   private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
     if (inputSplit instanceof FileSplit) {
       FileSplit fileSplit = (FileSplit) inputSplit;
-      job.set("map.input.file", fileSplit.getPath().toString());
-      job.setLong("map.input.start", fileSplit.getStart());
-      job.setLong("map.input.length", fileSplit.getLength());
+      job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
+      job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
+      job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
     }
   }
 
@@ -488,6 +493,43 @@
     }
   }
 
+  private class NewDirectOutputCollector<K,V>
+  extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
+    private final org.apache.hadoop.mapreduce.RecordWriter out;
+
+    private final TaskReporter reporter;
+
+    private final Counters.Counter mapOutputRecordCounter;
+    
+    @SuppressWarnings("unchecked")
+    NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
+        JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) 
+    throws IOException, ClassNotFoundException, InterruptedException {
+      this.reporter = reporter;
+      out = outputFormat.getRecordWriter(taskContext);
+      mapOutputRecordCounter = 
+        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void write(K key, V value) 
+    throws IOException, InterruptedException {
+      reporter.progress();
+      out.write(key, value);
+      mapOutputRecordCounter.increment(1);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) 
+    throws IOException,InterruptedException {
+      reporter.progress();
+      if (out != null) {
+        out.close(context);
+      }
+    }
+  }
+  
   private class NewOutputCollector<K,V>
     extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
     private final MapOutputCollector<K,V> collector;
@@ -543,7 +585,8 @@
                              InterruptedException {
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
+      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
+                                                                  getTaskID());
     // make a mapper
     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
@@ -568,49 +611,36 @@
       new NewTrackingRecordReader<INKEY,INVALUE>
           (inputFormat.createRecordReader(split, taskContext), reporter);
     
-    job.setBoolean("mapred.skip.on", isSkipping());
+    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
     org.apache.hadoop.mapreduce.RecordWriter output = null;
-    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
-         mapperContext = null;
-    try {
-      Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
-        org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
-        (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
-                     Configuration.class,
-                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
-                     org.apache.hadoop.mapreduce.RecordReader.class,
-                     org.apache.hadoop.mapreduce.RecordWriter.class,
-                     org.apache.hadoop.mapreduce.OutputCommitter.class,
-                     org.apache.hadoop.mapreduce.StatusReporter.class,
-                     org.apache.hadoop.mapreduce.InputSplit.class});
-
-      // get an output object
-      if (job.getNumReduceTasks() == 0) {
-        output = outputFormat.getRecordWriter(taskContext);
-      } else {
-        output = new NewOutputCollector(taskContext, job, umbilical, reporter);
-      }
+    
+    // get an output object
+    if (job.getNumReduceTasks() == 0) {
+      output = 
+        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
+    } else {
+      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
+    }
 
-      mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
-                                                     input, output, committer,
-                                                     reporter, split);
+    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
+    mapContext = 
+      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
+          input, output, 
+          committer, 
+          reporter, split);
 
-      input.initialize(split, mapperContext);
-      mapper.run(mapperContext);
-      mapPhase.complete();
-      setPhase(TaskStatus.Phase.SORT);
-      statusUpdate(umbilical);
-      input.close();
-      output.close(mapperContext);
-    } catch (NoSuchMethodException e) {
-      throw new IOException("Can't find Context constructor", e);
-    } catch (InstantiationException e) {
-      throw new IOException("Can't create Context", e);
-    } catch (InvocationTargetException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    } catch (IllegalAccessException e) {
-      throw new IOException("Can't invoke Context constructor", e);
-    }
+    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
+        mapperContext = 
+          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
+              mapContext);
+
+    input.initialize(split, mapperContext);
+    mapper.run(mapperContext);
+    mapPhase.complete();
+    setPhase(TaskStatus.Phase.SORT);
+    statusUpdate(umbilical);
+    input.close();
+    output.close(mapperContext);
   }
 
   interface MapOutputCollector<K, V> {
@@ -742,21 +772,21 @@
       indexCacheList = new ArrayList<SpillRecord>();
       
       //sanity checks
-      final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
-      final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
-      final int sortmb = job.getInt("io.sort.mb", 100);
+      final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT,(float)0.8);
+      final float recper = job.getFloat(JobContext.MAP_SORT_RECORD_PERCENT,(float)0.05);
+      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
       if (spillper > (float)1.0 || spillper < (float)0.0) {
-        throw new IOException("Invalid \"io.sort.spill.percent\": " + spillper);
+        throw new IOException("Invalid \"mapreduce.map.sort.spill.percent\": " + spillper);
       }
       if (recper > (float)1.0 || recper < (float)0.01) {
-        throw new IOException("Invalid \"io.sort.record.percent\": " + recper);
+        throw new IOException("Invalid \"mapreduce.map.sort.record.percent\": " + recper);
       }
       if ((sortmb & 0x7FF) != sortmb) {
-        throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
+        throw new IOException("Invalid \"mapreduce.task.mapreduce.task.io.sort.mb\": " + sortmb);
       }
       sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
             QuickSort.class, IndexedSorter.class), job);
-      LOG.info("io.sort.mb = " + sortmb);
+      LOG.info("mapreduce.task.mapreduce.task.io.sort.mb = " + sortmb);
       // buffers and accounting
       int maxMemUsage = sortmb << 20;
       int recordCapacity = (int)(maxMemUsage * recper);
@@ -802,7 +832,7 @@
       } else {
         combineCollector = null;
       }
-      minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
+      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPISS, 3);
       spillThread.setDaemon(true);
       spillThread.setName("SpillThread");
       spillLock.lock();
@@ -1188,8 +1218,13 @@
             try {
               spillLock.unlock();
               sortAndSpill();
-            } catch (Throwable e) {
+            } catch (Exception e) {
               sortSpillException = e;
+            } catch (Throwable t) {
+              sortSpillException = t;
+              String logMsg = "Task " + getTaskID() + " failed : " 
+                              + StringUtils.stringifyException(t);
+              reportFatalError(getTaskID(), t, logMsg);
             } finally {
               spillLock.lock();
               if (bufend < bufindex && bufindex < bufstart) {
@@ -1532,7 +1567,7 @@
             }
           }
 
-          int mergeFactor = job.getInt("io.sort.factor", 100);
+          int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);
           // sort the segments only if there are intermediate merges
           boolean sortSegments = segmentList.size() > mergeFactor;
           //merge
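
The hunks above rename the map-side sort settings to their mapreduce.* equivalents and reference them through JobContext constants. A minimal sketch (not part of this patch) of tuning those settings through the new constants, assuming they are exposed on org.apache.hadoop.mapreduce.JobContext as the unqualified references in MapTask suggest; the numeric values are purely illustrative:

  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapreduce.JobContext;

  public class SortConfigSketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf();
      // New-style keys, addressed through the constants used in the patch.
      conf.setInt(JobContext.IO_SORT_MB, 200);                 // was "io.sort.mb"
      conf.setFloat(JobContext.MAP_SORT_SPILL_PERCENT, 0.80f); // was "io.sort.spill.percent"
      conf.setInt(JobContext.IO_SORT_FACTOR, 100);             // was "io.sort.factor"

      // Same sanity check the task applies: the buffer size must fit in 11 bits.
      int sortmb = conf.getInt(JobContext.IO_SORT_MB, 100);
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IllegalArgumentException("Invalid sort buffer size: " + sortmb);
      }
      System.out.println("map sort buffer: " + sortmb + " MB");
    }
  }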

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java Sat Nov 28 20:26:01 2009
@@ -27,8 +27,12 @@
  * A class that represents the communication between the tasktracker and child
  * tasks w.r.t the map task completion events. It also indicates whether the 
  * child task should reset its events index.
+ *
+ * <FRAMEWORK-USE-ONLY>
+ * This class is intended only for use by the Map/Reduce framework and not
+ * for external users.
  */
-class MapTaskCompletionEventsUpdate implements Writable {
+public class MapTaskCompletionEventsUpdate implements Writable {
   TaskCompletionEvent[] events;
   boolean reset;
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskStatus.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskStatus.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MapTaskStatus.java Sat Nov 28 20:26:01 2009
@@ -105,4 +105,11 @@
     super.write(out);
     out.writeLong(mapFinishTime);
   }
+
+  @Override
+  public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+    throw new UnsupportedOperationException
+                ("addFetchFailedMap() not supported for MapTask");
+  }
+
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Mapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Mapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Mapper.java Sat Nov 28 20:26:01 2009
@@ -81,8 +81,8 @@
  *       private int noRecords = 0;
  *       
  *       public void configure(JobConf job) {
- *         mapTaskId = job.get("mapred.task.id");
- *         inputFile = job.get("map.input.file");
+ *         mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID);
+ *         inputFile = job.get(JobContext.MAP_INPUT_FILE);
  *       }
  *       
  *       public void map(K key, V val,
@@ -145,8 +145,8 @@
    * takes an insignificant amount of time to process individual key/value 
    * pairs, this is crucial since the framework might assume that the task has 
    * timed-out and kill that task. The other way of avoiding this is to set 
-   * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout">
-   * mapred.task.timeout</a> to a high-enough value (or even zero for no 
+   * <a href="{@docRoot}/../mapred-default.html#mapreduce.task.timeout">
+   * mapreduce.task.timeout</a> to a high-enough value (or even zero for no 
    * time-outs).</p>
    * 
    * @param key the input key.
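
The javadoc fix above points long-running maps at the renamed mapreduce.task.timeout key. As a complement, a minimal sketch (not part of this patch; the class and the per-record work are illustrative) of the other option the javadoc describes: reporting progress from an old-API map() so the framework does not presume the task has hung.

  import java.io.IOException;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.Mapper;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reporter;

  public class SlowRecordMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, LongWritable> {
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
      // ... expensive per-record work would go here ...

      // Tell the framework the attempt is alive so it is not killed after
      // mapreduce.task.timeout milliseconds of apparent inactivity.
      reporter.progress();
      output.collect(value, key);
    }
  }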

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MergeSorter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MergeSorter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MergeSorter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/MergeSorter.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Merger.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Merger.java Sat Nov 28 20:26:01 2009
@@ -36,16 +36,21 @@
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 
-class Merger {  
+/**
+ * Merger is a utility class used by the Map and Reduce tasks for merging
+ * both their memory and disk segments
+ */
+public class Merger {  
   private static final Log LOG = LogFactory.getLog(Merger.class);
 
   // Local directories
   private static LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator("mapred.local.dir");
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
 
   public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
@@ -60,9 +65,32 @@
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter).merge(keyClass, valueClass,
+                           reporter, null).merge(keyClass, valueClass,
                                            mergeFactor, tmpDir,
-                                           readsCounter, writesCounter, mergePhase);
+                                           readsCounter, writesCounter, 
+                                           mergePhase);
+  }
+
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass, 
+                            CompressionCodec codec,
+                            Path[] inputs, boolean deleteInputs, 
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator,
+                            Progressable reporter,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter,
+                            Counters.Counter mergedMapOutputsCounter,
+                            Progress mergePhase)
+  throws IOException {
+    return 
+      new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
+                           reporter, mergedMapOutputsCounter).merge(
+                                           keyClass, valueClass,
+                                           mergeFactor, tmpDir,
+                                           readsCounter, writesCounter,
+                                           mergePhase);
   }
   
   public static <K extends Object, V extends Object>
@@ -76,7 +104,8 @@
                             Progress mergePhase)
       throws IOException {
     return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
-                 comparator, reporter, false, readsCounter, writesCounter, mergePhase);
+                 comparator, reporter, false, readsCounter, writesCounter,
+                 mergePhase);
   }
 
   public static <K extends Object, V extends Object>
@@ -116,7 +145,7 @@
                                                mergePhase);
   }
 
-  static <K extends Object, V extends Object>
+  public static <K extends Object, V extends Object>
     RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class<K> keyClass, Class<V> valueClass,
                             List<Segment<K, V>> segments,
@@ -160,7 +189,7 @@
   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
                  Progressable progressable, Configuration conf) 
   throws IOException {
-    long progressBar = conf.getLong("mapred.merge.recordsBeforeProgress",
+    long progressBar = conf.getLong(JobContext.RECORDS_BEFORE_PROGRESS,
         10000);
     long recordCtr = 0;
     while(records.next()) {
@@ -184,14 +213,33 @@
     long segmentOffset = 0;
     long segmentLength = -1;
     
+    Counters.Counter mapOutputsCounter = null;
+
     public Segment(Configuration conf, FileSystem fs, Path file,
-                   CompressionCodec codec, boolean preserve) throws IOException {
-      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+                   CompressionCodec codec, boolean preserve)
+    throws IOException {
+      this(conf, fs, file, codec, preserve, null);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve,
+                   Counters.Counter mergedMapOutputsCounter)
+    throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, 
+           mergedMapOutputsCounter);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   long segmentOffset, long segmentLength,
+                   CompressionCodec codec,
+                   boolean preserve) throws IOException {
+      this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
     }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
         long segmentOffset, long segmentLength, CompressionCodec codec,
-        boolean preserve) throws IOException {
+        boolean preserve, Counters.Counter mergedMapOutputsCounter)
+    throws IOException {
       this.conf = conf;
       this.fs = fs;
       this.file = file;
@@ -200,13 +248,22 @@
 
       this.segmentOffset = segmentOffset;
       this.segmentLength = segmentLength;
+      
+      this.mapOutputsCounter = mergedMapOutputsCounter;
     }
     
     public Segment(Reader<K, V> reader, boolean preserve) {
+      this(reader, preserve, null);
+    }
+    
+    public Segment(Reader<K, V> reader, boolean preserve, 
+                   Counters.Counter mapOutputsCounter) {
       this.reader = reader;
       this.preserve = preserve;
       
       this.segmentLength = reader.getLength();
+      
+      this.mapOutputsCounter = mapOutputsCounter;
     }
 
     void init(Counters.Counter readsCounter) throws IOException {
@@ -215,6 +272,10 @@
         in.seek(segmentOffset);
         reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
       }
+      
+      if (mapOutputsCounter != null) {
+        mapOutputsCounter.increment(1);
+      }
     }
     
     boolean inMemory() {
@@ -228,7 +289,7 @@
       return value;
     }
 
-    long getLength() { 
+    public long getLength() { 
       return (reader == null) ?
         segmentLength : reader.getLength();
     }
@@ -333,6 +394,15 @@
                       CompressionCodec codec, RawComparator<K> comparator,
                       Progressable reporter) 
     throws IOException {
+      this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null);
+    }
+    
+    public MergeQueue(Configuration conf, FileSystem fs, 
+                      Path[] inputs, boolean deleteInputs, 
+                      CompressionCodec codec, RawComparator<K> comparator,
+                      Progressable reporter, 
+                      Counters.Counter mergedMapOutputsCounter) 
+    throws IOException {
       this.conf = conf;
       this.fs = fs;
       this.codec = codec;
@@ -340,7 +410,11 @@
       this.reporter = reporter;
       
       for (Path file : inputs) {
-        segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs));
+        LOG.debug("MergeQ: adding: " + file);
+        segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs, 
+                                       (file.toString().endsWith(
+                                           Task.MERGED_OUTPUT_PREFIX) ? 
+                                        null : mergedMapOutputsCounter)));
       }
       
       // Sort segments on file-lengths
@@ -673,7 +747,7 @@
      * calculating mergeProgress. This simulates the above merge() method and
      * tries to obtain the number of bytes that are going to be merged in all
      * merges(assuming that there is no combiner called while merging).
-     * @param factor io.sort.factor
+     * @param factor mapreduce.task.io.sort.factor
      * @param inMem  number of segments in memory to be merged
      */
     long computeBytesInMerges(int factor, int inMem) {
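
The new Merger.merge() overload and Segment constructors thread a merged-map-outputs counter through the merge: Segment.init() increments it once per segment when the counter is non-null, and MergeQueue passes null for inputs whose names carry the merged-output prefix so intermediate merge results are not counted twice. A standalone sketch of that selection rule (not framework code; the prefix value and counter names are hypothetical stand-ins, since Task.MERGED_OUTPUT_PREFIX is package-private):

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.Counters;

  public class MergedOutputCounterSketch {
    // Hypothetical stand-in for Task.MERGED_OUTPUT_PREFIX.
    private static final String MERGED_OUTPUT_PREFIX = ".merged";

    public static void main(String[] args) {
      Counters counters = new Counters();
      Counters.Counter mergedMapOutputs =
          counters.findCounter("SketchGroup", "MERGED_MAP_OUTPUTS");

      Path[] inputs = {
          new Path("/tmp/map_0.out"),
          new Path("/tmp/map_1.out"),
          new Path("/tmp/intermediate" + MERGED_OUTPUT_PREFIX)  // from an earlier merge pass
      };

      for (Path file : inputs) {
        // The same ternary MergeQueue uses when building its segments.
        Counters.Counter forThisSegment =
            file.toString().endsWith(MERGED_OUTPUT_PREFIX) ? null : mergedMapOutputs;
        if (forThisSegment != null) {
          forThisSegment.increment(1);  // mirrors the increment in Segment.init()
        }
      }
      System.out.println("merged map outputs counted: " + mergedMapOutputs.getCounter());
    }
  }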

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java Sat Nov 28 20:26:01 2009
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -62,13 +63,18 @@
   static private final String ERROR_PATTERN = "ERROR";
 
   /* Configuration keys */
-  static final String HEALTH_CHECK_SCRIPT_PROPERTY = "mapred.healthChecker.script.path";
+  static final String HEALTH_CHECK_SCRIPT_PROPERTY = 
+    TTConfig.TT_HEALTH_CHECKER_SCRIPT_PATH;
 
-  static final String HEALTH_CHECK_INTERVAL_PROPERTY = "mapred.healthChecker.interval";
+  static final String HEALTH_CHECK_INTERVAL_PROPERTY = 
+    TTConfig.TT_HEALTH_CHECKER_INTERVAL;
 
-  static final String HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY = "mapred.healthChecker.script.timeout";
+  static final String HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY = 
+    TTConfig.TT_HEALTH_CHECKER_SCRIPT_TIMEOUT;
 
-  static final String HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY = "mapred.healthChecker.script.args";
+  static final String HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY = 
+    TTConfig.TT_HEALTH_CHECKER_SCRIPT_ARGS;
+  
   /* end of configuration keys */
   /** Time out error message */
   static final String NODE_HEALTH_SCRIPT_TIMED_OUT_MSG = "Node health script timed out";
@@ -211,12 +217,14 @@
    * Method which initializes the values for the script path and interval time.
    */
   private void initialize(Configuration conf) {
-    this.nodeHealthScript = conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
-    this.intervalTime = conf.getLong(HEALTH_CHECK_INTERVAL_PROPERTY,
+    this.nodeHealthScript = 
+        conf.get(TTConfig.TT_HEALTH_CHECKER_SCRIPT_PATH);
+    this.intervalTime = conf.getLong(TTConfig.TT_HEALTH_CHECKER_INTERVAL,
         DEFAULT_HEALTH_CHECK_INTERVAL);
-    this.scriptTimeout = conf.getLong(HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY,
+    this.scriptTimeout = conf.getLong(
+        TTConfig.TT_HEALTH_CHECKER_SCRIPT_TIMEOUT,
         DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL);
-    String[] args = conf.getStrings(HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY,
+    String[] args = conf.getStrings(TTConfig.TT_HEALTH_CHECKER_SCRIPT_ARGS,
         new String[] {});
     timer = new NodeHealthMonitorExecutor(args);
   }
@@ -323,7 +331,8 @@
    * @return true if node health monitoring service can be started.
    */
   static boolean shouldRun(Configuration conf) {
-    String nodeHealthScript = conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+    String nodeHealthScript = 
+      conf.get(TTConfig.TT_HEALTH_CHECKER_SCRIPT_PATH);
     if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) {
       return false;
     }
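
With the health-checker keys now read from TTConfig, a minimal configuration sketch (not part of this patch; the script path, argument and timings are illustrative). Note that shouldRun() only enables the service when the script-path key is set:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;

  public class HealthCheckerConfigSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // The script path is the on/off switch for node health monitoring.
      conf.set(TTConfig.TT_HEALTH_CHECKER_SCRIPT_PATH, "/usr/local/bin/node_health.sh");
      // How often to run the script, and how long before declaring a timeout.
      conf.setLong(TTConfig.TT_HEALTH_CHECKER_INTERVAL, 60 * 1000L);
      conf.setLong(TTConfig.TT_HEALTH_CHECKER_SCRIPT_TIMEOUT, 10 * 1000L);
      // Optional arguments passed to the script.
      conf.setStrings(TTConfig.TT_HEALTH_CHECKER_SCRIPT_ARGS, "-v");
    }
  }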

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/OutputCommitter.java Sat Nov 28 20:26:01 2009
@@ -71,10 +71,38 @@
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
+   * @deprecated Use {@link #commitJob(JobContext)} or 
+   *                 {@link #abortJob(JobContext, int)} instead.
    */
-  public abstract void cleanupJob(JobContext jobContext) throws IOException;
+  @Deprecated
+  public void cleanupJob(JobContext jobContext) throws IOException { }
 
   /**
+   * For committing a job's output after successful job completion. Note that this
+   * is invoked for jobs with a final runstate of SUCCESSFUL.
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException 
+   */
+  public void commitJob(JobContext jobContext) throws IOException {
+    cleanupJob(jobContext);
+  }
+  
+  /**
+   * For aborting an unsuccessful job's output. Note that this is invoked for 
+   * jobs with a final runstate of {@link JobStatus#FAILED} or 
+   * {@link JobStatus#KILLED}.
+   * 
+   * @param jobContext Context of the job whose output is being written.
+   * @param status final runstate of the job
+   * @throws IOException
+   */
+  public void abortJob(JobContext jobContext, int status) 
+  throws IOException {
+    cleanupJob(jobContext);
+  }
+  
+  /**
    * Sets up output for the task.
    * 
    * @param taskContext Context of the task whose output is being written.
@@ -128,8 +156,12 @@
    * This method implements the new interface by calling the old method. Note
    * that the input types are different between the new and old apis and this
    * is a bridge between the two.
+   * @deprecated Use {@link #commitJob(org.apache.hadoop.mapreduce.JobContext)}
+   *             or {@link #abortJob(org.apache.hadoop.mapreduce.JobContext, org.apache.hadoop.mapreduce.JobStatus.State)}
+   *             instead.
    */
   @Override
+  @Deprecated
   public final void cleanupJob(org.apache.hadoop.mapreduce.JobContext context
                                ) throws IOException {
     cleanupJob((JobContext) context);
@@ -141,6 +173,33 @@
    * is a bridge between the two.
    */
   @Override
+  public final void commitJob(org.apache.hadoop.mapreduce.JobContext context
+                             ) throws IOException {
+    commitJob((JobContext) context);
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final void abortJob(org.apache.hadoop.mapreduce.JobContext context, 
+                             org.apache.hadoop.mapreduce.JobStatus.State runState) 
+  throws IOException {
+    int state = JobStatus.getOldNewJobRunState(runState);
+    if (state != JobStatus.FAILED && state != JobStatus.KILLED) {
+      throw new IOException ("Invalid job run state : " + runState.name());
+    }
+    abortJob((JobContext) context, state);
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
   public final 
   void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
                  ) throws IOException {
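
Because cleanupJob() is now a deprecated no-op that commitJob() and abortJob() fall back to, existing committers keep working unchanged, but new code should override the two new hooks instead. A hypothetical old-API committer illustrating the migration (the class name and method bodies are illustrative, not framework code):

  import java.io.IOException;
  import org.apache.hadoop.mapred.JobContext;
  import org.apache.hadoop.mapred.OutputCommitter;
  import org.apache.hadoop.mapred.TaskAttemptContext;

  public class AuditingOutputCommitter extends OutputCommitter {
    @Override
    public void setupJob(JobContext jobContext) throws IOException { }

    @Override
    public void commitJob(JobContext jobContext) throws IOException {
      // Invoked only when the job's final runstate is SUCCESSFUL:
      // promote temporary output to its final location here.
    }

    @Override
    public void abortJob(JobContext jobContext, int status) throws IOException {
      // Invoked for FAILED or KILLED jobs: clean up partial output here.
    }

    @Override
    public void setupTask(TaskAttemptContext taskContext) throws IOException { }

    @Override
    public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
      return false;
    }

    @Override
    public void commitTask(TaskAttemptContext taskContext) throws IOException { }

    @Override
    public void abortTask(TaskAttemptContext taskContext) throws IOException { }
  }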

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/OutputLogFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/OutputLogFilter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/OutputLogFilter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/OutputLogFilter.java Sat Nov 28 20:26:01 2009
@@ -27,9 +27,14 @@
  * This can be used to list paths of output directory as follows:
  *   Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
  *                                   new OutputLogFilter()));
+ * @deprecated Use 
+ *   {@link org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputLogFilter} 
+ *   instead.
  */
 public class OutputLogFilter implements PathFilter {
+  private static final PathFilter LOG_FILTER = 
+    new Utils.OutputFileUtils.OutputLogFilter();
   public boolean accept(Path path) {
-    return !(path.toString().contains("_logs"));
+    return LOG_FILTER.accept(path);
   }
 }
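
With OutputLogFilter reduced to a thin wrapper, the listing idiom from the class javadoc is written against the replacement filter, assuming Utils.OutputFileUtils.OutputLogFilter is publicly accessible as the deprecation note indicates. A minimal sketch (the output directory comes from the command line):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.FileUtil;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.Utils;

  public class ListJobOutput {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      Path outDir = new Path(args[0]);  // job output directory
      // Same listing as the javadoc above, using the replacement filter.
      Path[] fileList = FileUtil.stat2Paths(
          fs.listStatus(outDir, new Utils.OutputFileUtils.OutputLogFilter()));
      for (Path p : fileList) {
        System.out.println(p);
      }
    }
  }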

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Queue.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Queue.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Queue.java Sat Nov 28 20:26:01 2009
@@ -17,32 +17,35 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.QueueState;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
 
 /**
  * A class for storing the properties of a job queue.
  */
-class Queue {
+class Queue implements Comparable<Queue> {
 
   private static final Log LOG = LogFactory.getLog(Queue.class);
 
   //Queue name
-  private String name;
+  private String name = null;
 
   //acls list
   private Map<String, AccessControlList> acls;
 
   //Queue State
-  private QueueState state;
+  private QueueState state = QueueState.RUNNING;
 
   // An Object that can be used by schedulers to fill in
   // arbitrary scheduling information. The toString method
@@ -50,26 +53,9 @@
   // get a String that can be displayed on UI.
   private Object schedulingInfo;
 
-  /**
-   * Enum representing queue state
-   */
-  static enum QueueState {
-
-    STOPPED("stopped"), RUNNING("running");
-    private final String stateName;
+  private Set<Queue> children;
 
-    QueueState(String stateName) {
-      this.stateName = stateName;
-    }
-
-    /**
-     * @return the stateName
-     */
-    public String getStateName() {
-      return stateName;
-    }
-
-  }
+  private Properties props;
 
   /**
    * Enum representing an operation that can be performed on a queue.
@@ -100,6 +86,14 @@
   }
 
   /**
+   * The default constructor is useful when creating the hierarchy.
+   * The fields are populated using mutator methods.
+   */
+  Queue() {
+  }
+
+  /**
    * Create a job queue
    * @param name name of the queue
    * @param acls ACLs for the queue
@@ -127,7 +121,7 @@
   void setName(String name) {
     this.name = name;
   }
-  
+
   /**
    * Return the ACLs for the queue
    * 
@@ -182,4 +176,232 @@
   void setSchedulingInfo(Object schedulingInfo) {
     this.schedulingInfo = schedulingInfo;
   }
+
+  /**
+   * Copy the scheduling information from the sourceQueue into this queue
+   * recursively.
+   * 
+   * @param sourceQueue the queue to copy the scheduling information from
+   */
+  void copySchedulingInfo(Queue sourceQueue) {
+    // First update the children queues recursively.
+    Set<Queue> destChildren = getChildren();
+    if (destChildren != null) {
+      Iterator<Queue> itr1 = destChildren.iterator();
+      Iterator<Queue> itr2 = sourceQueue.getChildren().iterator();
+      while (itr1.hasNext()) {
+        itr1.next().copySchedulingInfo(itr2.next());
+      }
+    }
+
+    // Now, copy the information for the root-queue itself
+    setSchedulingInfo(sourceQueue.getSchedulingInfo());
+  }
+
+  /**
+   * Add a child queue to this queue.
+   *
+   * @param child the queue to add
+   */
+  void addChild(Queue child) {
+    if(children == null) {
+      children = new TreeSet<Queue>();
+    }
+
+    children.add(child);
+  }
+
+  /**
+   * @return the set of child queues, or null if this is a leaf queue
+   */
+  Set<Queue> getChildren() {
+    return children;
+  }
+
+  /**
+   * Set the scheduling properties for this queue.
+   *
+   * @param props the queue properties
+   */
+  void setProperties(Properties props) {
+    this.props = props;
+  }
+
+  /**
+   * @return the scheduling properties of this queue
+   */
+  Properties getProperties() {
+    return this.props;
+  }
+
+  /**
+   * This method helps in traversing the tree hierarchy.
+   *
+   * Returns a map of all inner queues, i.e. nodes which have children,
+   * below this level.
+   *
+   * In case children is null, returns an empty map. This helps when
+   * creating the union of inner and leaf queues.
+   *
+   * @return a map of queue name to inner Queue
+   */
+  Map<String,Queue> getInnerQueues() {
+    Map<String,Queue> l = new HashMap<String,Queue>();
+
+    //If there are no children, return an empty map.
+    //This check is required for the root node.
+    if(children == null) {
+      return l;
+    }
+
+    //Check each child to see whether it is itself a parent.
+    for(Queue child:children) {
+      //If a child is itself a parent, add it along with its inner queues.
+      if(child.getChildren() != null && child.getChildren().size() > 0) {
+        l.put(child.getName(),child);
+        l.putAll(child.getInnerQueues());
+      }
+    }
+    return l;
+  }
+
+  /**
+   * This method helps in maintaining a single
+   * data structure across the QueueManager.
+   *
+   * With it, maintaining just the list of root queues
+   * is sufficient.
+   *
+   * Never returns null.
+   * Adds this queue itself if it is a leaf node.
+   *
+   * @return a map of queue name to leaf Queue
+   */
+  Map<String,Queue> getLeafQueues() {
+    Map<String,Queue> l = new HashMap<String,Queue>();
+    if(children == null) {
+      l.put(name,this);
+      return l;
+    }
+
+    for(Queue child:children) {
+      l.putAll(child.getLeafQueues());
+    }
+    return l;
+  }
+
+
+  @Override
+  public int compareTo(Queue queue) {
+    return name.compareTo(queue.getName());
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if(o == this) {
+      return true;
+    }
+    if(! (o instanceof Queue)) {
+      return false;
+    }
+    
+    return ((Queue)o).getName().equals(name);
+  }
+
+  @Override
+  public String toString() {
+    return this.getName();
+  }
+
+  @Override
+  public int hashCode() {
+    return this.getName().hashCode();
+  }
+
+  /**
+   * Return the hierarchy of {@link JobQueueInfo} objects
+   * under this Queue.
+   *
+   * @return the {@link JobQueueInfo} for this queue, with its children populated
+   */
+  JobQueueInfo getJobQueueInfo() {
+    JobQueueInfo queueInfo = new JobQueueInfo();
+    queueInfo.setQueueName(name);
+    LOG.debug("created jobQInfo " + queueInfo.getQueueName());
+    queueInfo.setQueueState(state.getStateName());
+    if (schedulingInfo != null) {
+      queueInfo.setSchedulingInfo(schedulingInfo.toString());
+    }
+
+    if (props != null) {
+      //Create deep copy of properties.
+      Properties newProps = new Properties();
+      for (Object key : props.keySet()) {
+        newProps.setProperty(key.toString(), props.getProperty(key.toString()));
+      }
+      queueInfo.setProperties(newProps);
+    }
+
+    if (children != null && children.size() > 0) {
+      List<JobQueueInfo> list = new ArrayList<JobQueueInfo>();
+      for (Queue child : children) {
+        list.add(child.getJobQueueInfo());
+      }
+      queueInfo.setChildren(list);
+    }
+    return queueInfo;
+  }
+
+  /**
+   * For each node, validate whether the current node hierarchy is the same
+   * as in newState, recursively checking the child nodes.
+   *
+   * @param newState the queue hierarchy to compare against
+   * @return true if the hierarchies match, false otherwise
+   */
+  boolean isHierarchySameAs(Queue newState) {
+    if(newState == null) {
+      return false;
+    }
+    //First check if names are equal
+    if(!(name.equals(newState.getName())) ) {
+      LOG.info(" current name " + name + " not equal to " + newState.getName());
+      return false;
+    }
+
+    if (children == null || children.size() == 0) {
+      if(newState.getChildren() != null && newState.getChildren().size() > 0) {
+        LOG.info( newState + " has added children in refresh ");
+        return false;
+      }
+    } else if(children.size() > 0) {
+      //check for the individual children and then see if all of them
+      //are updated.
+      if (newState.getChildren() == null) {
+        LOG.fatal("In the current state, queue " + getName() + " has "
+            + children.size() + " but the new state has none!");
+        return false;
+      }
+      int childrenSize = children.size();
+      int newChildrenSize = newState.getChildren().size();
+      if (childrenSize != newChildrenSize) {
+        LOG.fatal("Number of children for queue " + newState.getName()
+            + " in newState is " + newChildrenSize + " which is not equal to "
+            + childrenSize + " in the current state.");
+        return false;
+      }
+      //Children are pre-sorted as they are stored in a TreeSet,
+      //hence the order should be the same.
+      Iterator<Queue> itr1 = children.iterator();
+      Iterator<Queue> itr2 = newState.getChildren().iterator();
+
+      while(itr1.hasNext()) {
+        Queue q = itr1.next();
+        Queue newq = itr2.next();
+        if(! (q.isHierarchySameAs(newq)) ) {
+          LOG.info(" Queue " + q.getName() + " not equal to " + newq.getName());
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 }
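
To make the new hierarchy helpers concrete, a hypothetical, package-local sketch; it would have to live in org.apache.hadoop.mapred because setName(), addChild(), getLeafQueues() and the other helpers are package-private, and it is not framework code:

  package org.apache.hadoop.mapred;

  public class QueueHierarchySketch {
    public static void main(String[] args) {
      Queue root = new Queue();
      root.setName("root");

      Queue research = new Queue();
      research.setName("root.research");
      Queue marketing = new Queue();
      marketing.setName("root.marketing");

      root.addChild(research);
      root.addChild(marketing);

      // Leaf queues are the two children; there are no inner queues below
      // root because neither child has children of its own.
      System.out.println("leaves = " + root.getLeafQueues().keySet());
      System.out.println("inner  = " + root.getInnerQueues().keySet());

      // isHierarchySameAs() compares names and structure recursively; this is
      // how a refreshed queue configuration is validated against the current one.
      System.out.println("same hierarchy? " + root.isHierarchySameAs(root));
    }
  }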


