hadoop-common-commits mailing list archives

From: cnaur...@apache.org
Subject: svn commit: r1495297 [27/46] - in /hadoop/common/branches/branch-1-win: ./ bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/ src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Date: Fri, 21 Jun 2013 06:37:39 GMT
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Jun 21 06:37:27 2013
@@ -18,14 +18,23 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,27 +44,26 @@ import org.apache.hadoop.filecache.Track
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
-class LocalJobRunner implements JobSubmissionProtocol {
+public class LocalJobRunner implements JobSubmissionProtocol {
   public static final Log LOG =
     LogFactory.getLog(LocalJobRunner.class);
+  
+  public static final String LOCAL_MAX_MAPS =
+      "mapreduce.local.map.tasks.maximum";
 
   private FileSystem fs;
   private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
   private JobConf conf;
-  private int map_tasks = 0;
+  private AtomicInteger map_tasks = new AtomicInteger(0);
   private int reduce_tasks = 0;
   final Random rand = new Random();
   private final TaskController taskController = new DefaultTaskController();
@@ -82,9 +90,15 @@ class LocalJobRunner implements JobSubmi
 
     private JobID id;
     private JobConf job;
+    
+    private int numMapTasks;
+    private float [] partialMapProgress;
+    private Counters [] mapCounters;
+    private Counters reduceCounters;
 
     private JobStatus status;
-    private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
+    private List<TaskAttemptID> mapIds = Collections.synchronizedList(
+        new ArrayList<TaskAttemptID>());
 
     private JobProfile profile;
     private FileSystem localFs;
@@ -92,13 +106,6 @@ class LocalJobRunner implements JobSubmi
     
     private TrackerDistributedCacheManager trackerDistributedCacheManager;
     private TaskDistributedCacheManager taskDistributedCacheManager;
-    
-    // Counters summed over all the map/reduce tasks which
-    // have successfully completed
-    private Counters completedTaskCounters = new Counters();
-    
-    // Current counters, including incomplete task(s)
-    private Counters currentCounters = new Counters();
 
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
@@ -168,6 +175,132 @@ class LocalJobRunner implements JobSubmi
       return profile;
     }
     
+    protected class MapTaskRunnable implements Runnable {
+      private final int taskId;
+      private final TaskSplitMetaInfo info;
+      private final JobID jobId;
+      private final JobConf localConf;
+
+      // This is a reference to a shared object passed in by the
+      // external context; this delivers state to the reducers regarding
+      // where to fetch mapper outputs.
+      private final Map<TaskAttemptID, MapOutputFile> mapOutputFiles;
+
+      public volatile Throwable storedException;
+
+      public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
+          Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+        this.info = info;
+        this.taskId = taskId;
+        this.mapOutputFiles = mapOutputFiles;
+        this.jobId = jobId;
+        this.localConf = new JobConf(job);
+      }
+
+      public void run() {
+        try {
+          TaskAttemptID mapId = new TaskAttemptID(new TaskID(
+              jobId, true, taskId), 0);
+          LOG.info("Starting task: " + mapId);
+          mapIds.add(mapId);
+          MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
+            info.getSplitIndex(), 1);
+          map.setUser(UserGroupInformation.getCurrentUser().
+              getShortUserName());
+          TaskRunner.setupChildMapredLocalDirs(map, localConf);
+
+          MapOutputFile mapOutput = new MapOutputFile();
+          mapOutput.setConf(localConf);
+          mapOutputFiles.put(mapId, mapOutput);
+
+          map.setJobFile(localJobFile.toString());
+          localConf.setUser(map.getUser());
+          map.localizeConfiguration(localConf);
+          map.setConf(localConf);
+          try {
+            map_tasks.getAndIncrement();
+            myMetrics.launchMap(mapId);
+            map.run(localConf, Job.this);
+            myMetrics.completeMap(mapId);
+          } finally {
+            map_tasks.getAndDecrement();
+          }
+
+          LOG.info("Finishing task: " + mapId);
+        } catch (Throwable e) {
+          this.storedException = e;
+        }
+      }
+    }
+
+    /**
+     * Create Runnables to encapsulate map tasks for use by the executor
+     * service.
+     * @param taskInfo Info about the map task splits
+     * @param jobId the job id
+     * @param mapOutputFiles a mapping from task attempts to output files
+     * @return a List of Runnables, one per map task.
+     */
+    protected List<MapTaskRunnable> getMapTaskRunnables(
+        TaskSplitMetaInfo [] taskInfo, JobID jobId,
+        Map<TaskAttemptID, MapOutputFile> mapOutputFiles) {
+
+      int numTasks = 0;
+      ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
+      for (TaskSplitMetaInfo task : taskInfo) {
+        list.add(new MapTaskRunnable(task, numTasks++, jobId,
+            mapOutputFiles));
+      }
+
+      return list;
+    }
+
+    /**
+     * Initialize the counters that will hold partial-progress from
+     * the various task attempts.
+     * @param numMaps the number of map tasks in this job.
+     */
+    private synchronized void initCounters(int numMaps) {
+      // Initialize state trackers for all map tasks.
+      this.partialMapProgress = new float[numMaps];
+      this.mapCounters = new Counters[numMaps];
+      for (int i = 0; i < numMaps; i++) {
+        this.mapCounters[i] = new Counters();
+      }
+
+      this.reduceCounters = new Counters();
+    }
+
+    /**
+     * Creates the executor service used to run map tasks.
+     *
+     * @param numMapTasks the total number of map tasks to be run
+     * @return an ExecutorService instance that handles map tasks
+     */
+    protected ExecutorService createMapExecutor(int numMapTasks) {
+
+      // Determine the size of the thread pool to use
+      int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
+      if (maxMapThreads < 1) {
+        throw new IllegalArgumentException(
+            "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
+      }
+      this.numMapTasks = numMapTasks;
+      maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
+      maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
+
+      initCounters(this.numMapTasks);
+
+      LOG.debug("Starting thread pool executor.");
+      LOG.debug("Max local threads: " + maxMapThreads);
+      LOG.debug("Map tasks to process: " + this.numMapTasks);
+
+      // Create a new executor service to drain the work queue.
+      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads);
+
+      return executor;
+    }
+    
     @SuppressWarnings("unchecked")
     @Override
     public void run() {
@@ -187,39 +320,41 @@ class LocalJobRunner implements JobSubmi
         status.setSetupProgress(1.0f);
 
         Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
-          new HashMap<TaskAttemptID, MapOutputFile>();
-        for (int i = 0; i < taskSplitMetaInfos.length; i++) {
-          if (!this.isInterrupted()) {
-            TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
-            mapIds.add(mapId);
-            MapTask map = new MapTask(systemJobFile.toString(),  
-                                      mapId, i,
-                                      taskSplitMetaInfos[i].getSplitIndex(), 1);
-            map.setUser(UserGroupInformation.getCurrentUser().
-                getShortUserName());
-            JobConf localConf = new JobConf(job);
-            TaskRunner.setupChildMapredLocalDirs(map, localConf);
+            Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
 
-            MapOutputFile mapOutput = new MapOutputFile();
-            mapOutput.setConf(localConf);
-            mapOutputFiles.put(mapId, mapOutput);
-
-            map.setJobFile(localJobFile.toString());
-            localConf.setUser(map.getUser());
-            map.localizeConfiguration(localConf);
-            map.setConf(localConf);
-            map_tasks += 1;
-            myMetrics.launchMap(mapId);
-            queueMetrics.launchMap(mapId);
-            map.run(localConf, this);
-            myMetrics.completeMap(mapId);
-            queueMetrics.completeMap(mapId);
-            map_tasks -= 1;
-            updateCounters(map);
-          } else {
-            throw new InterruptedException();
+        List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
+            jobId, mapOutputFiles);
+        
+        ExecutorService mapService = createMapExecutor(taskRunnables.size());
+        // Start populating the executor with work units.
+        // They may begin running immediately (in other threads).
+        for (Runnable r : taskRunnables) {
+          mapService.submit(r);
+        }
+
+        try {
+          mapService.shutdown(); // Instructs queue to drain.
+
+          // Wait for tasks to finish; do not use a time-based timeout.
+          // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
+          LOG.info("Waiting for map tasks");
+          mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException ie) {
+          // Cancel all threads.
+          mapService.shutdownNow();
+          throw ie;
+        }
+
+        LOG.info("Map task executor complete.");
+
+        // After waiting for the map tasks to complete, if any of these
+        // have thrown an exception, rethrow it now in the main thread context.
+        for (MapTaskRunnable r : taskRunnables) {
+          if (r.storedException != null) {
+            throw new Exception(r.storedException);
           }
         }
+        
         TaskAttemptID reduceId = 
           new TaskAttemptID(new TaskID(jobId, false, 0), 0);
         try {
@@ -230,6 +365,7 @@ class LocalJobRunner implements JobSubmi
             reduce.setUser(UserGroupInformation.getCurrentUser().
                  getShortUserName());
             JobConf localConf = new JobConf(job);
+            localConf.set("mapreduce.jobtracker.address", "local");
             TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
             // move map output to reduce input  
             for (int i = 0; i < mapIds.size(); i++) {
@@ -263,7 +399,6 @@ class LocalJobRunner implements JobSubmi
               myMetrics.completeReduce(reduce.getTaskID());
               queueMetrics.completeReduce(reduce.getTaskID());
               reduce_tasks -= 1;
-              updateCounters(reduce);
             } else {
               throw new InterruptedException();
             }
@@ -318,22 +453,53 @@ class LocalJobRunner implements JobSubmi
 
     public JvmTask getTask(JvmContext context) { return null; }
 
-    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, JvmContext context) 
-    throws IOException, InterruptedException {
+    public synchronized boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus, 
+        JvmContext context) throws IOException, InterruptedException {
+      // Serialize as we would if distributed in order to make deep copy
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+      TaskStatus.writeTaskStatus(dos, taskStatus);
+      dos.close();
+      taskStatus = TaskStatus.readTaskStatus(new DataInputStream(
+          new ByteArrayInputStream(baos.toByteArray())));
+      
       LOG.info(taskStatus.getStateString());
-      float taskIndex = mapIds.indexOf(taskId);
+      int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping
-        float numTasks = mapIds.size();
-        status.setMapProgress(taskIndex/numTasks + taskStatus.getProgress()/numTasks);
+        float numTasks = (float) this.numMapTasks;
+        partialMapProgress[taskIndex] = taskStatus.getProgress();
+        mapCounters[taskIndex] = taskStatus.getCounters();
+        float partialProgress = 0.0f;
+        for (float f : partialMapProgress) {
+          partialProgress += f;
+        }
+        status.setMapProgress(partialProgress / numTasks);
       } else {
+        reduceCounters = taskStatus.getCounters();
         status.setReduceProgress(taskStatus.getProgress());
       }
-      currentCounters = Counters.sum(completedTaskCounters, taskStatus.getCounters());
       
       // ignore phase
       
       return true;
     }
+    
+    /**
+     * Return the current values of the counters for this job,
+     * including tasks that are in progress.
+     */
+    public synchronized Counters getCurrentCounters() {
+      if (null == mapCounters) {
+        // Counters not yet initialized for job.
+        return new Counters();
+      }
+
+      Counters current = new Counters();
+      for (Counters c : mapCounters) {
+        current = Counters.sum(current, c);
+      }
+      current = Counters.sum(current, reduceCounters);
+      return current;
+    }
 
     /**
      * Task is reporting that it is in commit_pending
@@ -346,15 +512,6 @@ class LocalJobRunner implements JobSubmi
       statusUpdate(taskid, taskStatus, jvmContext);
     }
 
-    /**
-     * Updates counters corresponding to completed tasks.
-     * @param task A map or reduce task which has just been 
-     * successfully completed
-     */ 
-    private void updateCounters(Task task) {
-      completedTaskCounters.incrAllCounters(task.getCounters());
-    }
-
     public void reportDiagnosticInfo(TaskAttemptID taskid, String trace,
         JvmContext jvmContext) {
       // Ignore for now
@@ -376,14 +533,8 @@ class LocalJobRunner implements JobSubmi
     
     public void done(TaskAttemptID taskId, JvmContext jvmContext)
         throws IOException {
-      int taskIndex = mapIds.indexOf(taskId);
-      if (taskIndex >= 0) { // mapping
-        status.setMapProgress(1.0f);
-      } else {
-        status.setReduceProgress(1.0f);
-      }
     }
-
+    
     public synchronized void fsError(TaskAttemptID taskId, String message,
         JvmContext jvmContext) throws IOException {
       LOG.fatal("FSError: " + message + "from task: " + taskId);
@@ -427,8 +578,12 @@ class LocalJobRunner implements JobSubmi
   // JobSubmissionProtocol methods
 
   private static int jobid = 0;
+  // used for making sure that local jobs run in different jvms don't
+  // collide on staging or job directories
+  private int randid;
+
   public synchronized JobID getNewJobId() {
-    return new JobID("local", ++jobid);
+    return new JobID("local" + randid, ++jobid);
   }
 
   public JobStatus submitJob(JobID jobid, String jobSubmitDir, 
@@ -486,7 +641,7 @@ class LocalJobRunner implements JobSubmi
   
   public Counters getJobCounters(JobID id) {
     Job job = jobs.get(id);
-    return job.currentCounters;
+    return job.getCurrentCounters();
   }
 
   public String getFilesystemName() throws IOException {
@@ -494,7 +649,8 @@ class LocalJobRunner implements JobSubmi
   }
   
   public ClusterStatus getClusterStatus(boolean detailed) {
-    return new ClusterStatus(1, 0, 0, 0, map_tasks, reduce_tasks, 1, 1, 
+    int numMapTasks = map_tasks.get();
+    return new ClusterStatus(1, 0, 0, 0, numMapTasks, reduce_tasks, 1, 1, 
                              JobTracker.State.RUNNING);
   }
 
@@ -541,10 +697,11 @@ class LocalJobRunner implements JobSubmi
         "/tmp/hadoop/mapred/staging"));
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     String user;
+    randid = rand.nextInt(Integer.MAX_VALUE);
     if (ugi != null) {
-      user = ugi.getShortUserName() + rand.nextInt();
+      user = ugi.getShortUserName() + randid;
     } else {
-      user = "dummy" + rand.nextInt();
+      user = "dummy" + randid;
     }
     return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
   }
@@ -571,6 +728,26 @@ class LocalJobRunner implements JobSubmi
     return null;
   }
   
+  /**
+   * Set the max number of map tasks to run concurrently in the LocalJobRunner.
+   * @param job the job to configure
+   * @param maxMaps the maximum number of map tasks to allow.
+   */
+  public static void setLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job,
+      int maxMaps) {
+    job.getConfiguration().setInt(LOCAL_MAX_MAPS, maxMaps);
+  }
+
+  /**
+   * @return the max number of map tasks to run concurrently in the
+   * LocalJobRunner.
+   */
+  public static int getLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job) {
+    return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
+  }
+
   @Override
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                        ) throws IOException,

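The LocalJobRunner changes above replace the serial map loop with MapTaskRunnable work units drained through a fixed-size thread pool, capped by the new LOCAL_MAX_MAPS key ("mapreduce.local.map.tasks.maximum"). As a minimal usage sketch of the new static accessors (the local-mode setup and job name are illustrative, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.LocalJobRunner;
    import org.apache.hadoop.mapreduce.Job;

    public class LocalMapParallelismExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "local");  // run the job in-process
        Job job = new Job(conf, "local-example"); // illustrative job name
        // Allow up to four map tasks to run concurrently in the local runner
        // (sets mapreduce.local.map.tasks.maximum; the default is 1).
        LocalJobRunner.setLocalMaxRunningMaps(job, 4);
        System.out.println("max local maps = "
            + LocalJobRunner.getLocalMaxRunningMaps(job));
      }
    }
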
Added: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Locality.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Locality.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Locality.java (added)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Locality.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+enum Locality {
+  NODE_LOCAL,
+  GROUP_LOCAL,
+  RACK_LOCAL,
+  OFF_SWITCH
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MRConstants.java Fri Jun 21 06:37:27 2013
@@ -25,7 +25,7 @@ interface MRConstants {
   //
   // Timeouts, constants
   //
-  public static final int HEARTBEAT_INTERVAL_MIN = 3 * 1000;
+  public static final int HEARTBEAT_INTERVAL_MIN = 300;
   
   public static final long COUNTER_UPDATE_INTERVAL = 60 * 1000;
 

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapReducePolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapReducePolicyProvider.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapReducePolicyProvider.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapReducePolicyProvider.java Fri Jun 21 06:37:27 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
@@ -25,6 +26,7 @@ import org.apache.hadoop.security.author
 /**
  * {@link PolicyProvider} for Map-Reduce protocols.
  */
+@Private
 public class MapReducePolicyProvider extends PolicyProvider {
   private static final Service[] mapReduceServices = 
     new Service[] {

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Jun 21 06:37:27 2013
@@ -103,15 +103,14 @@ class MapTask extends Task {
   }
 
   @Override
-  public void localizeConfiguration(JobConf conf)
+  public void writeFilesRequiredForRerun(JobConf conf)
       throws IOException {
-    super.localizeConfiguration(conf);
     // split.info file is used only by IsolationRunner.
     // Write the split file to the local disk if it is a normal map task (not a
     // job-setup or a job-cleanup task) and if the user wishes to run
     // IsolationRunner either by setting keep.failed.tasks.files to true or by
     // using keep.tasks.files.pattern
-    if (supportIsolationRunner(conf) && isMapOrReduce()) {
+    if (isMapOrReduce()) {
       // localize the split meta-information
       Path localSplitMeta =
         new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite(
@@ -452,6 +451,7 @@ class MapTask extends Task {
       job.setLong("map.input.start", fileSplit.getStart());
       job.setLong("map.input.length", fileSplit.getLength());
     }
+    LOG.info("Processing split: " + inputSplit);
   }
 
   static class NewTrackingRecordReader<K,V> 
@@ -725,6 +725,7 @@ class MapTask extends Task {
     org.apache.hadoop.mapreduce.InputSplit split = null;
     split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
         splitIndex.getStartOffset());
+    LOG.info("Processing split: " + split);
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>

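The MapTask change folds split-metadata localization into the new writeFilesRequiredForRerun hook, which now runs for every normal map task so IsolationRunner can re-execute it. For context, a hedged sketch of the standard branch-1 knobs that preserve a task's files for such a rerun (the pattern value is illustrative):

    import org.apache.hadoop.mapred.JobConf;

    public class KeepTaskFilesExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Keep intermediate files of failed tasks so IsolationRunner can
        // re-run them from the localized split.info written above.
        conf.setKeepFailedTaskFiles(true);
        // Or keep files only for attempts whose id matches a pattern.
        conf.setKeepTaskFilesPattern(".*_m_000000_0");
      }
    }
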
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Merger.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Merger.java Fri Jun 21 06:37:27 2013
@@ -174,6 +174,7 @@ class Merger {  
     CompressionCodec codec = null;
     long segmentOffset = 0;
     long segmentLength = -1;
+    long rawDataLength = -1;
     
     public Segment(Configuration conf, FileSystem fs, Path file,
                    CompressionCodec codec, boolean preserve) throws IOException {
@@ -181,6 +182,12 @@ class Merger {  
     }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
+            CompressionCodec codec, boolean preserve, long rawDataLength) throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+      this.rawDataLength = rawDataLength;
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
         long segmentOffset, long segmentLength, CompressionCodec codec,
         boolean preserve) throws IOException {
       this.conf = conf;
@@ -200,6 +207,14 @@ class Merger {  
       this.segmentLength = reader.getLength();
     }
 
+    public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
+      this.reader = reader;
+      this.preserve = preserve;
+
+      this.segmentLength = reader.getLength();
+      this.rawDataLength = rawDataLength;
+    }
+
     private void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
         FSDataInputStream in = fs.open(file);
@@ -216,6 +231,9 @@ class Merger {  
         segmentLength : reader.getLength();
     }
     
+    long getRawDataLength() {
+      return (rawDataLength > 0) ? rawDataLength : getLength();
+    }
     boolean next() throws IOException {
       return reader.next(key, value);
     }
@@ -460,7 +478,7 @@ class Merger {  
           //calculating the merge progress
           long totalBytes = 0;
           for (int i = 0; i < segmentsToMerge.size(); i++) {
-            totalBytes += segmentsToMerge.get(i).getLength();
+            totalBytes += segmentsToMerge.get(i).getRawDataLength();
           }
           if (totalBytes != 0) //being paranoid
             progPerByte = 1.0f / (float)totalBytes;

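The Segment changes thread a rawDataLength (decompressed size) through to the merge-progress math, so progress is measured against the bytes the merge actually reads rather than the smaller compressed on-disk size. A minimal sketch of that accounting, with a stand-in segment type:

    class SegmentSize {              // stand-in for Merger.Segment
      long rawDataLength = -1;       // decompressed size, -1 if unknown
      long diskLength;               // (possibly compressed) bytes on disk

      long effectiveLength() {       // mirrors getRawDataLength() above
        return (rawDataLength > 0) ? rawDataLength : diskLength;
      }

      static float progressPerByte(SegmentSize[] segments) {
        long totalBytes = 0;
        for (SegmentSize s : segments) {
          totalBytes += s.effectiveLength();
        }
        return (totalBytes != 0) ? 1.0f / (float) totalBytes : 0.0f;
      }
    }
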
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Jun 21 06:37:27 2013
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -91,7 +92,6 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.metrics2.lib.MetricMutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MetricMutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
-import org.apache.hadoop.util.Shell;
 
 /** A Reduce task. */
 class ReduceTask extends Task {
@@ -150,8 +150,8 @@ class ReduceTask extends Task {
   };
   
   // A sorted set for keeping a set of map output files on disk
-  private final SortedSet<FileStatus> mapOutputFilesOnDisk = 
-    new TreeSet<FileStatus>(mapOutputFileComparator);
+  private final SortedSet<CompressAwareFileStatus> mapOutputFilesOnDisk = 
+    new TreeSet<CompressAwareFileStatus>(mapOutputFileComparator);
 
   public ReduceTask() {
     super();
@@ -1045,6 +1045,7 @@ class ReduceTask extends Task {
       byte[] data;
       final boolean inMemory;
       long compressedSize;
+      long decompressedSize;
       
       public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, 
                        Configuration conf, Path file, long size) {
@@ -1446,7 +1447,10 @@ class ReduceTask extends Task {
             }
 
             synchronized (mapOutputFilesOnDisk) {        
-              addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
+              FileStatus fileStatus = localFileSys.getFileStatus(filename);
+              CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
+                  fileStatus, mapOutput.decompressedSize);
+              addToMapOutputFilesOnDisk(compressedFileStatus);
             }
           }
 
@@ -1484,9 +1488,17 @@ class ReduceTask extends Task {
       throws IOException, InterruptedException {
         // Connect
         URL url = mapOutputLoc.getOutputLocation();
-        URLConnection connection = url.openConnection();
+        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
         
         InputStream input = setupSecureConnection(mapOutputLoc, connection);
+
+        // Validate response code
+        int rc = connection.getResponseCode();
+        if (rc != HttpURLConnection.HTTP_OK) {
+          throw new IOException(
+              "Got invalid response code " + rc + " from " + url +
+              ": " + connection.getResponseMessage());
+        }
  
         // Validate header from map output
         TaskAttemptID mapId = null;
@@ -1561,7 +1573,7 @@ class ReduceTask extends Task {
           mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
               compressedLength);
         }
-            
+        mapOutput.decompressedSize = decompressedLength;    
         return mapOutput;
       }
       
@@ -1677,7 +1689,7 @@ class ReduceTask extends Task {
         }
 
         IFileInputStream checksumIn = 
-          new IFileInputStream(input,compressedLength);
+          new IFileInputStream(input,compressedLength, conf);
 
         input = checksumIn;       
       
@@ -1695,15 +1707,16 @@ class ReduceTask extends Task {
         
         int bytesRead = 0;
         try {
-          int n = input.read(shuffleData, 0, shuffleData.length);
+          int n = IOUtils.wrappedReadForCompressedData(input, shuffleData, 0,
+              shuffleData.length);
           while (n > 0) {
             bytesRead += n;
             shuffleClientMetrics.inputBytes(n);
 
             // indicate we're making progress
             reporter.progress();
-            n = input.read(shuffleData, bytesRead, 
-                           (shuffleData.length-bytesRead));
+            n = IOUtils.wrappedReadForCompressedData(input, shuffleData,
+                bytesRead, shuffleData.length - bytesRead);
           }
 
           if (LOG.isDebugEnabled()) {
@@ -2457,11 +2470,17 @@ class ReduceTask extends Task {
           final RawKeyValueIterator rIter = Merger.merge(job, fs,
               keyClass, valueClass, memDiskSegments, numMemDiskSegments,
               tmpDir, comparator, reporter, spilledRecordsCounter, null);
-          final Writer writer = new Writer(job, fs, outputPath,
+          Writer writer = new Writer(job, fs, outputPath,
               keyClass, valueClass, codec, null);
           try {
             Merger.writeFile(rIter, writer, reporter, job);
-            addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
+            writer.close();
+            long decompressedBytesWritten = writer.decompressedBytesWritten;
+            writer = null;
+            FileStatus fileStatus = fs.getFileStatus(outputPath);
+            CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
+                fileStatus, decompressedBytesWritten);
+            addToMapOutputFilesOnDisk(compressedFileStatus);
           } catch (Exception e) {
             if (null != outputPath) {
               fs.delete(outputPath, true);
@@ -2487,12 +2506,17 @@ class ReduceTask extends Task {
       // segments on disk
       List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
       long onDiskBytes = inMemToDiskBytes;
-      Path[] onDisk = getMapFiles(fs, false);
-      for (Path file : onDisk) {
-        onDiskBytes += fs.getFileStatus(file).getLen();
-        diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
+      long totalDecompressedBytes = inMemToDiskBytes;
+
+      for (CompressAwareFileStatus filestatus : mapOutputFilesOnDisk) {
+        long len = filestatus.getLen();
+        onDiskBytes += len;
+        diskSegments.add(new Segment<K, V>(job, fs, filestatus.getPath(),
+            codec, keepInputs, filestatus.getDecompressedSize()));
+        totalDecompressedBytes += (filestatus.getDecompressedSize() > 0) ? filestatus
+            .getDecompressedSize() : len;
       }
-      LOG.info("Merging " + onDisk.length + " files, " +
+      LOG.info("Merging " + mapOutputFilesOnDisk.size() + " files, " +
                onDiskBytes + " bytes from disk");
       Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
         public int compare(Segment<K, V> o1, Segment<K, V> o2) {
@@ -2521,7 +2545,7 @@ class ReduceTask extends Task {
           return diskMerge;
         }
         finalSegments.add(new Segment<K,V>(
-              new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+              new RawKVIteratorReader(diskMerge, onDiskBytes), true, totalDecompressedBytes));
       }
       return Merger.merge(job, fs, keyClass, valueClass,
                    finalSegments, finalSegments.size(), tmpDir,
@@ -2596,7 +2620,7 @@ class ReduceTask extends Task {
       }    
     }
     
-    private void addToMapOutputFilesOnDisk(FileStatus status) {
+    private void addToMapOutputFilesOnDisk(CompressAwareFileStatus status) {
       synchronized (mapOutputFilesOnDisk) {
         mapOutputFilesOnDisk.add(status);
         mapOutputFilesOnDisk.notify();
@@ -2674,6 +2698,7 @@ class ReduceTask extends Task {
                          codec, null);
             RawKeyValueIterator iter  = null;
             Path tmpDir = new Path(reduceTask.getTaskID().toString());
+            long decompressedBytesWritten;
             try {
               iter = Merger.merge(conf, rfs,
                                   conf.getMapOutputKeyClass(),
@@ -2685,13 +2710,17 @@ class ReduceTask extends Task {
               
               Merger.writeFile(iter, writer, reporter, conf);
               writer.close();
+              decompressedBytesWritten = writer.decompressedBytesWritten;
             } catch (Exception e) {
               localFileSys.delete(outputPath, true);
               throw new IOException (StringUtils.stringifyException(e));
             }
             
             synchronized (mapOutputFilesOnDisk) {
-              addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
+              FileStatus fileStatus = localFileSys.getFileStatus(outputPath);
+              CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
+                  fileStatus, decompressedBytesWritten);
+              addToMapOutputFilesOnDisk(compressedFileStatus);
             }
             
             LOG.info(reduceTask.getTaskID() +
@@ -2774,7 +2803,7 @@ class ReduceTask extends Task {
                      conf.getMapOutputKeyClass(),
                      conf.getMapOutputValueClass(),
                      codec, null);
-
+        long decompressedBytesWritten;
         RawKeyValueIterator rIter = null;
         try {
           LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
@@ -2795,6 +2824,7 @@ class ReduceTask extends Task {
             combinerRunner.combine(rIter, combineCollector);
           }
           writer.close();
+          decompressedBytesWritten = writer.decompressedBytesWritten;
 
           LOG.info(reduceTask.getTaskID() + 
               " Merge of the " + noInMemorySegments +
@@ -2811,8 +2841,10 @@ class ReduceTask extends Task {
 
         // Note the output of the merge
         FileStatus status = localFileSys.getFileStatus(outputPath);
+        CompressAwareFileStatus compressedFileStatus = new CompressAwareFileStatus(
+            status, decompressedBytesWritten);
         synchronized (mapOutputFilesOnDisk) {
-          addToMapOutputFilesOnDisk(status);
+          addToMapOutputFilesOnDisk(compressedFileStatus);
         }
       }
     }
@@ -2908,6 +2940,12 @@ class ReduceTask extends Task {
             {
               URI u = URI.create(event.getTaskTrackerHttp());
               String host = u.getHost();
+              if (host == null) {
+                throw new IOException("Invalid hostname found in tracker" +
+                   " location: '" +
+                   event.getTaskTrackerHttp() +
+                   "'");
+              }
               TaskAttemptID taskId = event.getTaskAttemptId();
               URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
                                       "/mapOutput?job=" + taskId.getJobID() +
@@ -2960,6 +2998,20 @@ class ReduceTask extends Task {
     return Integer.numberOfTrailingZeros(hob) +
       (((hob >>> 1) & value) == 0 ? 0 : 1);
   }
+  static class CompressAwareFileStatus extends FileStatus {
+    private long decompressedSize;
+    CompressAwareFileStatus(FileStatus fileStatus, long decompressedSize) {
+      super(fileStatus.getLen(), fileStatus.isDir(), fileStatus.getReplication(),
+          fileStatus.getBlockSize(), fileStatus.getModificationTime(),
+          fileStatus.getAccessTime(), fileStatus.getPermission(),
+          fileStatus.getOwner(), fileStatus.getGroup(), fileStatus.getPath());
+      this.decompressedSize = decompressedSize;
+    }
+
+    public long getDecompressedSize() {
+      return decompressedSize;
+    }
+  }
   
   private <OUTKEY, OUTVALUE>
   void closeQuietly(RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {

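Among the ReduceTask changes, the shuffle copier now downcasts to HttpURLConnection and fails fast on a non-200 response rather than parsing an error page as map output. The check in isolation (the connection setup around it is assumed):

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class ShuffleResponseCheck {
      // Throws if the map-output server did not answer 200 OK.
      static void validate(URL url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        int rc = connection.getResponseCode();
        if (rc != HttpURLConnection.HTTP_OK) {
          throw new IOException("Got invalid response code " + rc + " from "
              + url + ": " + connection.getResponseMessage());
        }
      }
    }
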
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/RunningJob.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/RunningJob.java Fri Jun 21 06:37:27 2013
@@ -128,15 +128,22 @@ public interface RunningJob {
 
   /**
    * Returns the current state of the Job.
-   * {@link JobStatus}
    * 
    * @throws IOException
    */
   public int getJobState() throws IOException;
   
   /**
-   * Kill the running job.  Blocks until all job tasks have been
-   * killed as well.  If the job is no longer running, it simply returns.
+   * Returns a snapshot of the current status, {@link JobStatus}, of the Job.
+   * Need to call again for latest information.
+   * 
+   * @throws IOException
+   */
+  public JobStatus getJobStatus() throws IOException;
+
+  /**
+   * Kill the running job. Blocks until all job tasks have been killed as well.
+   * If the job is no longer running, it simply returns.
    * 
    * @throws IOException
    */

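The new RunningJob.getJobStatus() returns a point-in-time snapshot, so callers re-fetch it to observe progress. A hedged polling sketch (the submission details and sleep interval are illustrative):

    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobStatus;
    import org.apache.hadoop.mapred.RunningJob;

    public class JobStatusPoller {
      static void waitAndReport(JobClient client, JobConf jobConf)
          throws Exception {
        RunningJob running = client.submitJob(jobConf);
        while (!running.isComplete()) {
          JobStatus status = running.getJobStatus();  // fresh snapshot
          System.out.printf("map %.0f%% reduce %.0f%%%n",
              status.mapProgress() * 100, status.reduceProgress() * 100);
          Thread.sleep(5000);
        }
      }
    }
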
Added: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/SafeModeException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/SafeModeException.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/SafeModeException.java (added)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/SafeModeException.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when the JobTracker is in safe mode.
+ */
+public class SafeModeException extends IOException {
+
+  private static final long serialVersionUID = 1984839257L;
+
+  /**
+   * SafeModeException
+   * @param adminUser admin who put JobTracker in safe-mode, 
+   *                  <code>null</code> if it was automatic
+   * 
+   */
+  public SafeModeException(String adminUser) {
+    super(
+        (adminUser == null) ? 
+            "JobTracker is in safe mode" : 
+              "JobTracker is in safe-mode set by admin " + adminUser);
+  }
+
+}

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/SortedRanges.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/SortedRanges.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/SortedRanges.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/SortedRanges.java Fri Jun 21 06:37:27 2013
@@ -271,7 +271,7 @@ class SortedRanges implements Writable{
     }
     
     public boolean equals(Object o) {
-      if(o!=null && o instanceof Range) {
+      if (o instanceof Range) {
         Range range = (Range)o;
         return startIndex==range.startIndex &&
         length==range.length;
@@ -285,10 +285,11 @@ class SortedRanges implements Writable{
     }
     
     public int compareTo(Range o) {
-      if(this.equals(o)) {
-        return 0;
-      }
-      return (this.startIndex > o.startIndex) ? 1:-1;
+      // Ensure sgn(x.compareTo(y)) == -sgn(y.compareTo(x))
+      return this.startIndex < o.startIndex ? -1 :
+          (this.startIndex > o.startIndex ? 1 :
+          (this.length < o.length ? -1 :
+          (this.length > o.length ? 1 : 0)));
     }
 
     public void readFields(DataInput in) throws IOException {

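The rewritten Range.compareTo orders by (startIndex, length) lexicographically, restoring the antisymmetry the old code broke: two distinct ranges with equal startIndex previously compared as -1 in both directions, which corrupts TreeSet/TreeMap ordering. A minimal sketch of the fixed comparison, with the fields inlined as primitives:

    class RangeCompareSketch {
      // Satisfies sgn(compare(a, b)) == -sgn(compare(b, a)) and is
      // consistent with an equals() over the same two fields.
      static int compare(long aStart, long aLen, long bStart, long bLen) {
        if (aStart != bStart) {
          return (aStart < bStart) ? -1 : 1;
        }
        if (aLen != bLen) {
          return (aLen < bLen) ? -1 : 1;
        }
        return 0;
      }
      // compare(5, 2, 5, 7) == -1 and compare(5, 7, 5, 2) == 1, where the
      // old code returned -1 for both orderings.
    }
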
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/StatisticsCollector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/StatisticsCollector.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/StatisticsCollector.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/StatisticsCollector.java Fri Jun 21 06:37:27 2013
@@ -206,6 +206,7 @@ class StatisticsCollector {
       private final LinkedList<Integer> buckets = new LinkedList<Integer>();
       private int value;
       private int currentValue;
+      private int updates;
 
       public synchronized int getValue() {
         return value;
@@ -261,9 +262,6 @@ class StatisticsCollector {
     final int collectBuckets;
     final int updatesPerBucket;
     
-    private int updates;
-    private int buckets;
-
     TimeWindowStatUpdater(TimeWindow w, int updatePeriod) {
       if (updatePeriod > w.updateGranularity) {
         throw new RuntimeException(
@@ -274,18 +272,14 @@ class StatisticsCollector {
     }
 
     synchronized void update() {
-      updates++;
-      if (updates == updatesPerBucket) {
-        for(TimeStat stat : statToCollect.values()) {
+      for (TimeStat stat : statToCollect.values()) {
+        stat.updates++;
+        if (stat.updates == updatesPerBucket) {
           stat.addBucket();
+          stat.updates = 0;
         }
-        updates = 0;
-        buckets++;
-        if (buckets > collectBuckets) {
-          for (TimeStat stat : statToCollect.values()) {
-            stat.removeBucket();
-          }
-          buckets--;
+        if (stat.buckets.size() > collectBuckets) {
+          stat.removeBucket();
         }
       }
     }

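The StatisticsCollector fix moves the updates and bucket counters from the shared window updater into each TimeStat, so a stat registered mid-window seals and expires its own buckets rather than inheriting a global cadence. A hedged sketch of the per-stat sliding window (simplified; the names are stand-ins):

    import java.util.LinkedList;

    class WindowedStat {
      private final LinkedList<Integer> buckets = new LinkedList<Integer>();
      private int updates;       // ticks since the current bucket opened
      private int currentValue;  // accumulating bucket
      private int value;         // sum over sealed buckets in the window

      void inc(int delta) { currentValue += delta; }

      void tick(int updatesPerBucket, int collectBuckets) {
        updates++;
        if (updates == updatesPerBucket) {
          buckets.add(currentValue);  // seal the bucket
          value += currentValue;
          currentValue = 0;
          updates = 0;
        }
        if (buckets.size() > collectBuckets) {
          value -= buckets.removeFirst();  // expire the oldest bucket
        }
      }

      int getValue() { return value; }
    }
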
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Task.java Fri Jun 21 06:37:27 2013
@@ -467,6 +467,13 @@ abstract public class Task implements Wr
     conf.set("mapred.job.id", taskId.getJobID().toString());
   }
   
+  /**
+   * Write files that the IsolationRunner will need to rerun the task.
+   */
+  public void writeFilesRequiredForRerun(JobConf conf) throws IOException {
+    // Do nothing in the general case
+  }
+  
   /** Run this task as a part of the named job.  This method is executed in the
    * child process and is what invokes user-supplied map, reduce, etc. methods.
    * @param umbilical for progress reports
@@ -1077,14 +1084,6 @@ abstract public class Task implements Wr
                             + JobStatus.State.FAILED + " or "
                             + JobStatus.State.KILLED);
     }
-    // delete the staging area for the job
-    JobConf conf = new JobConf(jobContext.getConfiguration());
-    if (!supportIsolationRunner(conf)) {
-      String jobTempDir = conf.get("mapreduce.job.dir");
-      Path jobTempDirPath = new Path(jobTempDir);
-      FileSystem fs = jobTempDirPath.getFileSystem(conf);
-      fs.delete(jobTempDirPath, true);
-    }
     done(umbilical, reporter);
   }
 
@@ -1270,7 +1269,8 @@ abstract public class Task implements Wr
       more = in.next();
       if (more) {
         DataInputBuffer nextKeyBytes = in.getKey();
-        keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+        keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
+            nextKeyBytes.getLength() - nextKeyBytes.getPosition());
         nextKey = keyDeserializer.deserialize(nextKey);
         hasNext = key != null && (comparator.compare(key, nextKey) == 0);
       } else {
@@ -1284,7 +1284,8 @@ abstract public class Task implements Wr
      */
     private void readNextValue() throws IOException {
       DataInputBuffer nextValueBytes = in.getValue();
-      valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+      valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(),
+          nextValueBytes.getLength() - nextValueBytes.getPosition());
       value = valDeserializer.deserialize(value);
     }
   }

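Both Task.java buffer fixes stem from DataInputBuffer.getLength() returning the end offset of valid data rather than a remaining-byte count, so a buffer consumed from mid-stream must be reset with getLength() - getPosition(). The corrected pattern in isolation:

    import org.apache.hadoop.io.DataInputBuffer;

    public class BufferResetSketch {
      // Point 'dst' at exactly the bytes remaining in 'src'. getLength()
      // is an end offset, not a count, hence the subtraction.
      static void resetToRemaining(DataInputBuffer src, DataInputBuffer dst) {
        dst.reset(src.getData(), src.getPosition(),
            src.getLength() - src.getPosition());
      }
    }
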
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskGraphServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskGraphServlet.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskGraphServlet.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskGraphServlet.java Fri Jun 21 06:37:27 2013
@@ -72,13 +72,13 @@ public class TaskGraphServlet extends Ht
     final boolean isMap = "map".equalsIgnoreCase(request.getParameter("type"));
     final TaskReport[] reports = isMap? tracker.getMapTaskReports(jobId) 
                                       : tracker.getReduceTaskReports(jobId);
-    if(reports == null || reports.length == 0) {
+    if(reports == null) {
       return;
     }
 
     final int numTasks = reports.length;     
     int tasksPerBar = (int)Math.ceil(numTasks / 600d);
-    int numBars = (int) Math.ceil((double)numTasks / tasksPerBar);
+    int numBars = (numTasks==0)?600:(int) Math.ceil((double)numTasks / tasksPerBar);
     int w = Math.max(600, numBars);
     int barWidth = Math.min(10,  w / numBars); //min 1px, max 10px
     int barsPerNotch = (int)Math.ceil(10d / barWidth);

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Jun 21 06:37:27 2013
@@ -27,12 +27,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.mapred.SortedRanges.Range;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapred.TaskStatus.State;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.net.Node;
 
 
@@ -50,6 +53,7 @@ import org.apache.hadoop.net.Node;
  * ever have to handle.  Once those are up, the TIP is dead.
  * **************************************************************
  */
+@Private
 class TaskInProgress {
   static final int MAX_TASK_EXECS = 1;
   int maxTaskAttempts = 4;    
@@ -127,7 +131,11 @@ class TaskInProgress {
   private Counters counters = new Counters();
   
   private String user;
-  
+
+  private Map<TaskAttemptID, Locality> taskLocality = 
+      new ConcurrentHashMap<TaskAttemptID, Locality>();
+  private Map<TaskAttemptID, Avataar> taskAvataar = 
+      new ConcurrentHashMap<TaskAttemptID, Avataar>();
 
   /**
    * Constructor for MapTask
@@ -271,7 +279,23 @@ class TaskInProgress {
     execFinishTime = finishTime;
     JobHistory.Task.logUpdates(id, execFinishTime); // log the update
   }
+
+  public void setTaskAttemptLocality(TaskAttemptID taskAttemptID, Locality locality) {
+    taskLocality.put(taskAttemptID, locality);
+  }
+  
+  public Locality getTaskAttemptLocality(TaskAttemptID taskAttemptId) {
+    return taskLocality.get(taskAttemptId);
+  }
   
+  public void setTaskAttemptAvataar(TaskAttemptID taskAttemptId, Avataar avataar) {
+    taskAvataar.put(taskAttemptId, avataar);
+  }
+  
+  public Avataar getTaskAttemptAvataar(TaskAttemptID taskAttemptID) {
+    return taskAvataar.get(taskAttemptID);
+  }
+ 
   /**
    * Return the parent job
    */
@@ -604,10 +628,18 @@ class TaskInProgress {
           
       changed = oldState != newState;
     }
+    
     // if task is a cleanup attempt, do not replace the complete status,
     // update only specific fields.
     // For example, startTime should not be updated, 
     // but finishTime has to be updated.
+    
+    // Don't fail tasks when JobTracker is in safe-mode
+    if (status.getRunState() == State.FAILED && jobtracker.isInSafeMode()) {
+      LOG.info("JT is in safe-mode; marking " + taskid + " as KILLED");
+      status.setRunState(State.KILLED);
+    }
+
     if (!isCleanupAttempt(taskid)) {
       taskStatuses.put(taskid, status);
     } else {

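TaskInProgress now records per-attempt Locality and Avataar in ConcurrentHashMaps, so scheduler code can read attempt metadata without synchronizing on the whole TIP. A generic sketch of the pattern, using String attempt ids as a stand-in for TaskAttemptID:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class AttemptLocalityTracker {
      enum Locality { NODE_LOCAL, GROUP_LOCAL, RACK_LOCAL, OFF_SWITCH }

      private final Map<String, Locality> byAttempt =
          new ConcurrentHashMap<String, Locality>();

      void record(String attemptId, Locality locality) {
        byAttempt.put(attemptId, locality);
      }

      Locality lookup(String attemptId) {
        return byAttempt.get(attemptId);  // null if never recorded
      }
    }
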
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Jun 21 06:37:27 2013
@@ -109,7 +109,7 @@ public class TaskLog {
     String strAttemptLogDir = getTaskAttemptLogDir(taskID, 
         cleanupSuffix, localDirs);
     File attemptLogDir = new File(strAttemptLogDir);
-    if (!attemptLogDir.mkdirs()) {
+    if (!attemptLogDir.exists() && !attemptLogDir.mkdirs()) {
       throw new IOException("Creation of " + attemptLogDir + " failed.");
     }
     String strLinkAttemptLogDir = 

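The TaskLog fix accounts for File.mkdirs() returning false both on failure and when the directory already exists (as when a later attempt reuses the log directory), so existence is treated as success. The idempotent helper in isolation:

    import java.io.File;
    import java.io.IOException;

    public class EnsureDir {
      static void ensure(File dir) throws IOException {
        // mkdirs() returns false on failure AND when the directory already
        // exists; only the former is an error here.
        if (!dir.exists() && !dir.mkdirs()) {
          throw new IOException("Creation of " + dir + " failed.");
        }
      }
    }
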
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Jun 21 06:37:27 2013
@@ -490,6 +490,7 @@ abstract class TaskRunner extends Thread
 
   private void setupLog4jProperties(Vector<String> vargs, TaskAttemptID taskid,
       long logSize) {
+    vargs.add("-Dlog4j.configuration=task-log4j.properties");
     vargs.add("-Dhadoop.log.dir=" + 
         new File(System.getProperty("hadoop.log.dir")).getAbsolutePath());
     vargs.add("-Dhadoop.root.logger=INFO,TLA");

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Jun 21 06:37:27 2013
@@ -44,6 +44,7 @@ import java.util.TreeMap;
 import java.util.Vector;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
@@ -62,6 +63,7 @@ import org.apache.hadoop.filecache.TaskD
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.server.tasktracker.*;
 import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -72,7 +74,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
 import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
@@ -343,6 +348,9 @@ public class TaskTracker implements MRCo
     "mapreduce.tasktracker.outofband.heartbeat.damper";
   static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
   private volatile int oobHeartbeatDamper;
+  private boolean manageOsCacheInShuffle = false;
+  private int readaheadLength;
+  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
   
   // Track number of completed tasks to send an out-of-band heartbeat
   private AtomicInteger finishedCount = new AtomicInteger(0);
@@ -409,6 +417,16 @@ public class TaskTracker implements MRCo
    */
   private long diskHealthCheckInterval;
 
+  /**
+   * Whether the TT performs a full or relaxed version check with the JT.
+   */
+  private boolean relaxedVersionCheck;
+
+  /**
+   * Whether the TT completely skips version check with the JT.
+   */
+  private boolean skipVersionCheck;
+
   /*
    * A list of commitTaskActions for whom commit response has been received 
    */
@@ -426,36 +444,105 @@ public class TaskTracker implements MRCo
   }
   
   /**
-   * A list of tips that should be cleaned up.
+   * Clean-up actions received from the JobTracker.
    */
-  private BlockingQueue<TaskTrackerAction> tasksToCleanup = 
-    new LinkedBlockingQueue<TaskTrackerAction>();
-    
+  // The JDK has no ConcurrentHashSet, so emulate one with a ConcurrentHashMap
+  ConcurrentHashMap<String, String> allCleanupActions = 
+      new ConcurrentHashMap<String, String>();
+  BlockingQueue<TaskTrackerAction> activeCleanupActions = 
+      new LinkedBlockingQueue<TaskTrackerAction>();
+  List<TaskTrackerAction> inactiveCleanupActions = 
+      new ArrayList<TaskTrackerAction>();
+
+  /*
+   * Add an action to the clean-up queue, unless an action with the same id
+   * is already present.
+   */
+  void addActionToCleanup(TaskTrackerAction action) throws InterruptedException {
+
+    String actionId = getIdForCleanUpAction(action);
+
+    // add the action to the queue only if it is not already present
+    String previousActionId = allCleanupActions.putIfAbsent(actionId, actionId);
+    if (previousActionId != null) {
+      return;
+    } else {
+      activeCleanupActions.put(action);
+    }
+  }
+
+  /*
+   * Get the id for a given clean-up action, which is expected to be either a
+   * KillJobAction or KillTaskAction
+   */
+  private String getIdForCleanUpAction(TaskTrackerAction action) {
+    String actionId = null;
+    // get the id of the action
+    if (action instanceof KillJobAction) {
+      actionId = ((KillJobAction) action).getJobID().toString();
+    } else if (action instanceof KillTaskAction) {
+      actionId = ((KillTaskAction) action).getTaskID().toString();
+    }
+    // actionId is assumed non-null, since every clean-up action is either a
+    // KillJobAction or a KillTaskAction
+    return actionId;
+  }
+
   /**
    * A daemon thread that pulls tips off the list of things to clean up.
    */
-  private Thread taskCleanupThread = 
-    new Thread(new Runnable() {
-        public void run() {
-          while (true) {
-            try {
-              TaskTrackerAction action = tasksToCleanup.take();
-              checkJobStatusAndWait(action);
-              if (action instanceof KillJobAction) {
-                purgeJob((KillJobAction) action);
-              } else if (action instanceof KillTaskAction) {
-                processKillTaskAction((KillTaskAction) action);
-              } else {
-                LOG.error("Non-delete action given to cleanup thread: "
-                          + action);
-              }
-            } catch (Throwable except) {
-              LOG.warn(StringUtils.stringifyException(except));
-            }
-          }
+  private Thread taskCleanupThread = new Thread(new Runnable() {
+    public void run() {
+      while (true) {
+        try {
+          taskCleanUp();
+        } catch (Throwable except) {
+          LOG.warn(StringUtils.stringifyException(except));
         }
-      }, "taskCleanup");
+      }
+    }
+  }, "taskCleanup");
+
+  void taskCleanUp() throws InterruptedException, IOException {
+    // move any parked actions whose jobs have finished localizing back
+    // to the active clean-up queue.
+    Iterator<TaskTrackerAction> itr = inactiveCleanupActions.iterator();
+    while (itr.hasNext()) {
+      TaskTrackerAction action = itr.next();
+      if (!isJobLocalizing(action)) {
+        activeCleanupActions.put(action);
+        itr.remove();
+      }
+    }
+
+    // if there are no active clean-up actions, sleep for 2 seconds
+    // before re-checking.
+    if (activeCleanupActions.isEmpty()) {
+      Thread.sleep(2000);
+      return;
+    }
 
+    TaskTrackerAction action = activeCleanupActions.take();
+    String actionId = getIdForCleanUpAction(action);
+    if (isJobLocalizing(action)) {
+      // If the job is still localizing, park the action and pick
+      // another one in the next iteration
+      LOG.info("Cleanup for id " + actionId + " skipped as it is still localizing.");
+      inactiveCleanupActions.add(action);
+      return;
+    } else {
+      // remove the action from the map as it is now being processed.
+      allCleanupActions.remove(actionId);
+    }
+    if (action instanceof KillJobAction) {
+      purgeJob((KillJobAction) action);
+    } else if (action instanceof KillTaskAction) {
+      processKillTaskAction((KillTaskAction) action);
+    } else {
+      LOG.error("Non-delete action given to cleanup thread: " + action);
+    }
+  }
+  
   void processKillTaskAction(KillTaskAction killAction) throws IOException {
     TaskInProgress tip;
     synchronized (TaskTracker.this) {
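
The putIfAbsent call above is what makes the de-duplication atomic: it returns
null only for the first insertion of a given id, so repeated heartbeats cannot
enqueue the same kill action twice. Since Java 6 the same semantics are
available through a Set view; a sketch of that alternative (not what the patch
uses):

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    Set<String> cleanupIds =
        Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    // add() returns true only on first insertion, like putIfAbsent() == null
    if (cleanupIds.add(actionId)) {
      activeCleanupActions.put(action);
    }
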
@@ -464,16 +551,21 @@ public class TaskTracker implements MRCo
     LOG.info("Received KillTaskAction for task: " + killAction.getTaskID());
     purgeTask(tip, false);
   }
-  
-  private void checkJobStatusAndWait(TaskTrackerAction action) 
-  throws InterruptedException {
+
+  /**
+   * Check if the job for the given action is localizing or not.
+   * @param action The command received from the JobTracker
+   * @return true if the job is localizing, false otherwise.
+   */
+  private boolean isJobLocalizing(TaskTrackerAction action) {
     JobID jobId = null;
     if (action instanceof KillJobAction) {
       jobId = ((KillJobAction)action).getJobID();
     } else if (action instanceof KillTaskAction) {
       jobId = ((KillTaskAction)action).getTaskID().getJobID();
     } else {
-      return;
+      return false;
     }
     RunningJob rjob = null;
     synchronized (runningJobs) {
@@ -481,11 +573,10 @@ public class TaskTracker implements MRCo
     }
     if (rjob != null) {
       synchronized (rjob) {
-        while (rjob.localizing) {
-          rjob.wait();
-        }
+        return rjob.localizing;
       }
     }
+    return false;
   }
 
   public TaskController getTaskController() {
@@ -522,7 +613,15 @@ public class TaskTracker implements MRCo
         LOG.warn("Unknown job " + jobId + " being deleted.");
       } else {
         synchronized (rjob) {
-          rjob.tasks.remove(tip);
+          // Only remove the TIP if it is the same instance as the finished one.
+          // Job recovery means that it is possible to have two task attempts
+          // with the same ID, which is used for TIP equals/hashcode.
+          for (TaskInProgress t : rjob.tasks) {
+            if (tip == t) {
+              rjob.tasks.remove(tip);
+              break;
+            }
+          }
         }
       }
     }
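
TaskInProgress equality is keyed on the task attempt ID, and after job
recovery two distinct TIP instances can carry the same ID, so a plain
remove() could evict the wrong object. A generic sketch of removal by
reference identity (removeByIdentity is a hypothetical helper; the iterator
makes removal during the scan safe):

    import java.util.Iterator;
    import java.util.Set;

    static <T> boolean removeByIdentity(Set<T> set, T target) {
      for (Iterator<T> it = set.iterator(); it.hasNext(); ) {
        if (it.next() == target) { // identity, not equals()
          it.remove();
          return true;
        }
      }
      return false;
    }
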
@@ -862,6 +961,12 @@ public class TaskTracker implements MRCo
     oobHeartbeatDamper = 
       fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, 
           DEFAULT_OOB_HEARTBEAT_DAMPER);
+    manageOsCacheInShuffle = fConf.getBoolean(
+      "mapreduce.shuffle.manage.os.cache",
+      true);
+    readaheadLength = fConf.getInt(
+      "mapreduce.shuffle.readahead.bytes",
+      4 * 1024 * 1024);
   }
 
   private void startJettyBugMonitor() {
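
These two TaskTracker-side knobs control page-cache management during the
shuffle and are read once per (re)initialization. A sketch of overriding the
defaults the patch reads (cache management on, 4 MB of readahead):

    import org.apache.hadoop.mapred.JobConf;

    JobConf conf = new JobConf();
    conf.setBoolean("mapreduce.shuffle.manage.os.cache", true);        // default: true
    conf.setInt("mapreduce.shuffle.readahead.bytes", 4 * 1024 * 1024); // default: 4 MB
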
@@ -1361,7 +1466,7 @@ public class TaskTracker implements MRCo
       new TreeMap<TaskAttemptID, TaskInProgress>();
     tasksToClose.putAll(tasks);
     for (TaskInProgress tip : tasksToClose.values()) {
-      tip.jobHasFinished(false);
+      tip.jobHasFinished(true, false);
     }
     
     this.running = false;
@@ -1425,6 +1530,12 @@ public class TaskTracker implements MRCo
    */
   public TaskTracker(JobConf conf) throws IOException, InterruptedException {
     originalConf = conf;
+    relaxedVersionCheck = conf.getBoolean(
+        CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY,
+        CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_DEFAULT);
+    skipVersionCheck = conf.getBoolean(
+        CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY,
+        CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_DEFAULT);
     FILE_CACHE_SIZE = conf.getInt("mapred.tasktracker.file.cache.size", 2000);
     maxMapSlots = conf.getInt(
                   "mapred.tasktracker.map.tasks.maximum", 2);
@@ -1589,7 +1700,43 @@ public class TaskTracker implements MRCo
   private long getHeartbeatInterval(int numFinishedTasks) {
     return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
   }
-  
+
+  /**
+   * @return true if this tasktracker is permitted to connect to
+   *    the given jobtracker version
+   */
+  boolean isPermittedVersion(String jtBuildVersion, String jtVersion) {
+    boolean buildVersionMatch =
+      jtBuildVersion.equals(VersionInfo.getBuildVersion());
+    boolean versionMatch = jtVersion.equals(VersionInfo.getVersion());
+    if (buildVersionMatch && !versionMatch) {
+      throw new AssertionError("Invalid build. The build versions match" +
+          " but the JT version is " + jtVersion +
+          " and the TT version is " + VersionInfo.getVersion());
+    }
+    if (skipVersionCheck) {
+      LOG.info("Permitting tasktracker version '" + VersionInfo.getVersion() +
+          "' and build '" + VersionInfo.getBuildVersion() +
+          "' to connect to jobtracker version '" + jtVersion +
+          "' and build '" + jtBuildVersion + "' because " +
+          CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY +
+          " is enabled");
+      return true;
+    } else {
+      if (relaxedVersionCheck) {
+        if (!buildVersionMatch && versionMatch) {
+          LOG.info("Permitting tasktracker build " + VersionInfo.getBuildVersion() +
+              " to connect to jobtracker build " + jtBuildVersion + " because " +
+              CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
+              " is enabled");
+        }
+        return versionMatch;
+      } else {
+        return buildVersionMatch;
+      }
+    }
+  }
+
   /**
    * Main service loop.  Will stay in this loop forever.
    */
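
Stripped of logging, the method reduces to a pure function of the two flags:
skipping the check always permits, the relaxed check requires only the release
versions to match, and the strict default requires the full build versions
(which embed revision and source checksum) to match. As a one-line sketch:

    boolean permitted = skipVersionCheck
        || (relaxedVersionCheck ? versionMatch : buildVersionMatch);
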
@@ -1603,6 +1750,11 @@ public class TaskTracker implements MRCo
         // accelerate to account for multiple finished tasks up-front
         long remaining = 
           (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+        
+        if (remaining <= 0) {
+          finishedCount.set(0);
+        }
+          
         while (remaining > 0) {
           // sleeps for the wait time or 
           // until there are *enough* empty slots to schedule tasks
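
The damper turns finished tasks into a near-immediate "out-of-band" heartbeat:
the base interval is divided by numFinishedTasks * damper + 1, and resetting
finishedCount once that accelerated deadline passes returns later intervals to
normal. The arithmetic, sketched with the default damper:

    // heartbeatInterval = 3000 ms, oobHeartbeatDamper = 1000000 (default):
    //   0 finished tasks -> 3000 / 1       = 3000 ms
    //   1 finished task  -> 3000 / 1000001 = 0 ms (heartbeat due now)
    long interval = heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1);
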
@@ -1623,15 +1775,22 @@ public class TaskTracker implements MRCo
         }
 
         // If the TaskTracker is just starting up:
-        // 1. Verify the buildVersion
+        // 1. Verify that the version matches the JobTracker's
         // 2. Get the system directory & filesystem
         if(justInited) {
-          String jobTrackerBV = jobClient.getBuildVersion();
-          if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
-            String msg = "Shutting down. Incompatible buildVersion." +
-            "\nJobTracker's: " + jobTrackerBV + 
-            "\nTaskTracker's: "+ VersionInfo.getBuildVersion();
-            LOG.error(msg);
+          String jtBuildVersion = jobClient.getBuildVersion();
+          String jtVersion = jobClient.getVIVersion();
+          if (!isPermittedVersion(jtBuildVersion, jtVersion)) {
+            String msg = "Shutting down. Incompatible version or build version. " +
+              "TaskTracker version '" + VersionInfo.getVersion() +
+              "' and build '" + VersionInfo.getBuildVersion() +
+              "' and JobTracker version '" + jtVersion +
+              "' and build '" + jtBuildVersion + "'" +
+              " and " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
+              " is " + (relaxedVersionCheck ? "enabled" : "not enabled") +
+              " and " + CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY +
+              " is " + (skipVersionCheck ? "enabled" : "not enabled");
+            LOG.fatal(msg);
             try {
               jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
             } catch(Exception e ) {
@@ -1641,8 +1800,17 @@ public class TaskTracker implements MRCo
           }
           
           String dir = jobClient.getSystemDir();
-          if (dir == null) {
-            throw new IOException("Failed to get system directory");
+          while (dir == null) {
+            LOG.info("Failed to get system directory...");
+            
+            // Re-try
+            try {
+              // Sleep interval: 1000 ms - 5000 ms
+              int sleepInterval = 1000 + r.nextInt(4000);
+              Thread.sleep(sleepInterval);
+            } catch (InterruptedException ie) {}
+            dir = jobClient.getSystemDir();
           }
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(fConf);
@@ -1725,7 +1893,7 @@ public class TaskTracker implements MRCo
                 commitResponses.add(commitAction.getTaskID());
               }
             } else {
-              tasksToCleanup.put(action);
+              addActionToCleanup(action);
             }
           }
         }
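
Startup now tolerates a JobTracker that has not yet created its system
directory, polling with one to five seconds of random jitter instead of
aborting. Note that the loop above swallows InterruptedException; an
interrupt-friendly variant would restore the flag and bail out (a sketch, not
the patch):

    String dir = jobClient.getSystemDir();
    while (dir == null) {
      LOG.info("Failed to get system directory, retrying...");
      try {
        Thread.sleep(1000 + r.nextInt(4000)); // 1-5 s jitter
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();   // preserve interrupt status
        throw new IOException("Interrupted waiting for the system directory");
      }
      dir = jobClient.getSystemDir();
    }
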
@@ -2079,7 +2247,7 @@ public class TaskTracker implements MRCo
         rjob.distCacheMgr.release();
         // Add this tips of this job to queue of tasks to be purged 
         for (TaskInProgress tip : rjob.tasks) {
-          tip.jobHasFinished(false);
+          tip.jobHasFinished(false, false);
           Task t = tip.getTask();
           if (t.isMapTask()) {
             indexCache.removeMap(tip.getTask().getTaskID().toString());
@@ -2153,7 +2321,7 @@ public class TaskTracker implements MRCo
       // Remove the task from running jobs, 
       // removing the job if it's the last task
       removeTaskFromJob(tip.getTask().getJobID(), tip);
-      tip.jobHasFinished(wasFailure);
+      tip.jobHasFinished(false, wasFailure);
       if (tip.getTask().isMapTask()) {
         indexCache.removeMap(tip.getTask().getTaskID().toString());
       }
@@ -2451,7 +2619,7 @@ public class TaskTracker implements MRCo
           tip.reportDiagnosticInfo(msg);
           try {
             tip.kill(true);
-            tip.cleanup(true);
+            tip.cleanup(false, true);
           } catch (IOException ie2) {
             LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
           } catch (InterruptedException ie2) {
@@ -2645,7 +2813,11 @@ public class TaskTracker implements MRCo
       this.localJobConf = lconf;
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
       taskTimeout = localJobConf.getLong("mapred.task.timeout", 
-                                         10 * 60 * 1000);
+                                         Integer.MIN_VALUE);
+      if (taskTimeout == Integer.MIN_VALUE) {
+        taskTimeout = localJobConf.getLong("mapreduce.task.timeout", 
+            10 * 60 * 1000);
+      }
       if (task.isMapTask()) {
         debugCommand = localJobConf.getMapDebugScript();
       } else {
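
The timeout read above shows the usual old-key-first fallback: probe the
deprecated mapred.task.timeout with a sentinel default, and consult
mapreduce.task.timeout only if the old key is unset. The same pattern,
generically (a sketch; the sentinel must be a value no job would legitimately
configure):

    import org.apache.hadoop.conf.Configuration;

    static long getLongWithFallback(Configuration conf, String oldKey,
        String newKey, long defaultValue) {
      long v = conf.getLong(oldKey, Long.MIN_VALUE);  // sentinel: "not set"
      return (v == Long.MIN_VALUE) ? conf.getLong(newKey, defaultValue) : v;
    }
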
@@ -3018,7 +3190,7 @@ public class TaskTracker implements MRCo
         removeTaskFromJob(task.getJobID(), this);
       }
       try {
-        cleanup(needCleanup);
+        cleanup(false, needCleanup);
       } catch (IOException ie) {
       }
 
@@ -3106,11 +3278,13 @@ public class TaskTracker implements MRCo
     /**
      * We no longer need anything from this task, as the job has
      * finished.  If the task is still running, kill it and clean up.
-     * 
+     *
+     * @param ttReInit whether the TaskTracker is executing its re-initialization sequence
      * @param wasFailure did the task fail, as opposed to was it killed by
      *                   the framework
      */
-    public void jobHasFinished(boolean wasFailure) throws IOException {
+    public void jobHasFinished(boolean ttReInit, boolean wasFailure) 
+        throws IOException {
       // Kill the task if it is still running
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
@@ -3127,7 +3301,7 @@ public class TaskTracker implements MRCo
       }
       
       // Cleanup on the finished task
-      cleanup(true);
+      cleanup(ttReInit, true);
     }
 
     /**
@@ -3209,7 +3383,7 @@ public class TaskTracker implements MRCo
      * otherwise the current working directory of the task 
      * i.e. &lt;taskid&gt;/work is cleaned up.
      */
-    void cleanup(boolean needCleanup) throws IOException {
+    void cleanup(boolean ttReInit, boolean needCleanup) throws IOException {
       TaskAttemptID taskId = task.getTaskID();
       LOG.debug("Cleaning up " + taskId);
 
@@ -3238,7 +3412,10 @@ public class TaskTracker implements MRCo
           return;
         }
         try {
-          removeTaskFiles(needCleanup);
+          // during TT re-initialization, skip cleanup here; the TT cleans up itself
+          if (!ttReInit) {
+            removeTaskFiles(needCleanup);
+          }
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: "
               + StringUtils.stringifyException(ie));
@@ -3733,15 +3910,23 @@ public class TaskTracker implements MRCo
   JobConf getJobConf() {
     return fConf;
   }
-    
+
   /**
    * Is this task tracker idle?
-   * @return has this task tracker finished and cleaned up all of its tasks?
+   * @return has this task tracker finished all its assigned tasks?
    */
   public synchronized boolean isIdle() {
-    return tasks.isEmpty() && tasksToCleanup.isEmpty();
+    return tasks.isEmpty();
   }
-    
+
+  /**
+   * Is this task tracker idle and clean?
+   * @return has this task tracker finished and cleaned up all of its tasks?
+   */
+  public synchronized boolean isIdleAndClean() {
+    return tasks.isEmpty() && allCleanupActions.isEmpty();
+  }
+
   /**
    * Start the TaskTracker, point toward the indicated JobTracker
    */
@@ -3919,16 +4104,30 @@ public class TaskTracker implements MRCo
          * send it to the reducer.
          */
         //open the map-output file
+        String filePath = mapOutputFileName.toUri().getPath();
         mapOutputIn = SecureIOUtils.openForRead(
-            new File(mapOutputFileName.toUri().getPath()), runAsUserName);
+            new File(filePath), runAsUserName);
 
+        ReadaheadRequest curReadahead = null;
+        
         //seek to the correct offset for the reduce
         mapOutputIn.skip(info.startOffset);
         long rem = info.partLength;
-        int len =
-          mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
-        while (rem > 0 && len >= 0) {
+        long offset = info.startOffset;
+        while (rem > 0) {
+          if (tracker.manageOsCacheInShuffle && tracker.readaheadPool != null) {
+            curReadahead = tracker.readaheadPool.readaheadStream(filePath,
+                mapOutputIn.getFD(), offset, tracker.readaheadLength,
+                info.startOffset + info.partLength, curReadahead);
+          }
+          int len = mapOutputIn.read(buffer, 0,
+              (int) Math.min(rem, MAX_BYTES_TO_READ));
+          if (len < 0) {
+            break;
+          }
           rem -= len;
+          offset += len;
           try {
             shuffleMetrics.outputBytes(len);
             outStream.write(buffer, 0, len);
@@ -3938,10 +4137,18 @@ public class TaskTracker implements MRCo
             throw ie;
           }
           totalRead += len;
-          len =
-            mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
         
+        if (curReadahead != null) {
+          curReadahead.cancel();
+        }
+
+        // drop cache if possible
+        if (tracker.manageOsCacheInShuffle && info.partLength > 0) {
+          NativeIO.posixFadviseIfPossible(mapOutputIn.getFD(),
+              info.startOffset, info.partLength, NativeIO.POSIX_FADV_DONTNEED);
+        }
+
         if (LOG.isDebugEnabled()) {
           LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
                  " from map: " + mapId + " given " + info.partLength + "/" + 

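Taken together, the shuffle hunks implement readahead plus drop-behind:
ReadaheadPool keeps readaheadLength bytes prefetched ahead of the read cursor
while the servlet streams a map-output partition, and once the range has been
sent, posix_fadvise(POSIX_FADV_DONTNEED) evicts it from the page cache so bulk
shuffle traffic does not push out hotter data. The tail of the pattern reduced
to its essentials (a sketch; NativeIO degrades to a no-op where native support
is unavailable):

    // after streaming bytes [startOffset, startOffset + partLength):
    if (curReadahead != null) {
      curReadahead.cancel();              // stop any outstanding readahead
    }
    NativeIO.posixFadviseIfPossible(fd,   // drop the served pages from cache
        startOffset, partLength, NativeIO.POSIX_FADV_DONTNEED);
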
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Fri Jun 21 06:37:27 2013
@@ -112,4 +112,10 @@ interface TaskTrackerManager {
    * @param job JobInProgress object
    */
   public void failJob(JobInProgress job);
+  
+  /**
+   * Check whether the JobTracker is in safe mode.
+   * @return true if the JobTracker is in safe mode, false otherwise
+   */
+  public boolean isInSafeMode();
 }

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Jun 21 06:37:27 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
@@ -29,6 +30,7 @@ import org.apache.hadoop.security.token.
 * parent is a daemon which polls the central master for a new map or
  * reduce task and runs it as a child process.  All communication between child
  * and parent is via this protocol. */
+@Private
 @TokenInfo(JobTokenSelector.class)
 public interface TaskUmbilicalProtocol extends VersionedProtocol {
 

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Fri Jun 21 06:37:27 2013
@@ -1,3 +1,16 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+
 # ResourceBundle properties file for Map-Reduce counters
 
 CounterGroupName=              Map-Reduce Framework

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TextInputFormat.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/TextInputFormat.java Fri Jun 21 06:37:27 2013
@@ -39,7 +39,11 @@ public class TextInputFormat extends Fil
   }
   
   protected boolean isSplitable(FileSystem fs, Path file) {
-    return compressionCodecs.getCodec(file) == null;
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
 
   public RecordReader<LongWritable, Text> getRecordReader(

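Previously any recognized compression codec made the input non-splittable; now
a codec that implements SplittableCompressionCodec (BZip2Codec is the stock
example in branch-1) still permits multiple splits. The decision, as a sketch:

    // plain text: splitable; gzip: single split; bzip2: splitable again,
    // because BZip2Codec implements SplittableCompressionCodec
    CompressionCodec codec = compressionCodecs.getCodec(file);
    boolean splitable =
        (codec == null) || (codec instanceof SplittableCompressionCodec);
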

