Subject: svn commit: r1619012 [5/7] - in /hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Date: Tue, 19 Aug 2014 23:50:52 -0000
To: mapreduce-commits@hadoop.apache.org
From: cmccabe@apache.org
Message-Id: <20140819235100.E800A2388CDF@eris.apache.org>

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/WrappedRecordReader.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
@@ -38,7 +40,7 @@ import org.apache.hadoop.mapred.RecordRe
 @InterfaceStability.Stable
 public class WrappedRecordReader<K extends WritableComparable,
     U extends Writable>
-    implements ComposableRecordReader<K,U> {
+    implements ComposableRecordReader<K,U>, Configurable {
 
   private boolean empty = false;
   private RecordReader<K,U> rr;
@@ -47,6 +49,7 @@ public class WrappedRecordReader<K exten
   private WritableComparator cmp;
+  private Configuration conf;
 
   private ResetableIterator<U> vjoin;
@@ -55,13 +58,20 @@ public class WrappedRecordReader<K exten
   WrappedRecordReader(int id, RecordReader<K,U> rr,
       Class<? extends WritableComparator> cmpcl) throws IOException {
+    this(id, rr, cmpcl, null);
+  }
+
+  WrappedRecordReader(int id, RecordReader<K,U> rr,
+      Class<? extends WritableComparator> cmpcl,
+      Configuration conf) throws IOException {
     this.id = id;
     this.rr = rr;
+    this.conf = (conf == null) ? new Configuration() : conf;
     khead = rr.createKey();
     vhead = rr.createValue();
     try {
       cmp = (null == cmpcl)
-        ? WritableComparator.get(khead.getClass())
+        ? WritableComparator.get(khead.getClass(), this.conf)
         : cmpcl.newInstance();
     } catch (InstantiationException e) {
       throw (IOException)new IOException().initCause(e);
@@ -207,4 +217,13 @@ public class WrappedRecordReader<K exten
[the remainder of this hunk and an adjacent InputFormat.java javadoc hunk were
truncated in the archive; the surviving fragments reference
mapreduce.input.fileinputformat.split.minsize and note that logical splits based
on input-size is insufficient for many applications]
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -51,10 +53,25 @@ public abstract class InputSplit {
   /**
    * Get the list of nodes by name where the data for the split would be local.
    * The locations do not need to be serialized.
+   * 
    * @return a new array of the node nodes.
    * @throws IOException
    * @throws InterruptedException
    */
   public abstract String[] getLocations() throws IOException, InterruptedException;
+  
+  /**
+   * Gets info about which nodes the input split is stored on and how it is
+   * stored at each location.
+   * 
+   * @return list of <code>SplitLocationInfo</code>s describing how the split
+   *    data is stored at each location. A null value indicates that all the
+   *    locations have the data stored on disk.
+   * @throws IOException
+   */
+  @Evolving
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return null;
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Tue Aug 19 23:49:39 2014
@@ -54,7 +54,7 @@ import org.apache.hadoop.util.StringUtil
  * <p>Here is an example on how to submit a job:</p>
  * <p><blockquote><pre>
  *     // Create a new Job
- *     Job job = new Job(new Configuration());
+ *     Job job = Job.getInstance();
  *     job.setJarByClass(MyJob.class);
  *     
  *     // Specify various job-specific parameters     
@@ -113,16 +113,25 @@ public class Job extends JobContextImpl 
   private long statustime;
   private Cluster cluster;
 
+  /**
+   * @deprecated Use {@link #getInstance()}
+   */
   @Deprecated
   public Job() throws IOException {
     this(new Configuration());
   }
 
+  /**
+   * @deprecated Use {@link #getInstance(Configuration)}
+   */
   @Deprecated
   public Job(Configuration conf) throws IOException {
     this(new JobConf(conf));
   }
 
+  /**
+   * @deprecated Use {@link #getInstance(Configuration, String)}
+   */
   @Deprecated
   public Job(Configuration conf, String jobName) throws IOException {
     this(conf);

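For reference, the replacement pattern that these deprecation notes point to
looks like this in a driver (a minimal sketch; MyJobDriver and the I/O paths
are illustrative, not part of this commit):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class MyJobDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Factory method replacing the deprecated new Job(conf, "myjob").
      Job job = Job.getInstance(conf, "myjob");
      job.setJarByClass(MyJobDriver.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }
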
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java Tue Aug 19 23:49:39 2014
@@ -48,7 +48,7 @@ import org.apache.hadoop.io.Text;
 @InterfaceStability.Stable
 public class JobID extends org.apache.hadoop.mapred.ID 
                   implements Comparable<ID> {
-  protected static final String JOB = "job";
+  public static final String JOB = "job";
   
   // Jobid regex for various tools and framework components
   public static final String JOBID_REGEX = 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Aug 19 23:49:39 2014
@@ -265,6 +265,7 @@ public interface MRJobConfig {
   public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
 
   public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
+  public static final float DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT = 0.70f;
 
   public static final String SHUFFLE_MEMORY_LIMIT_PERCENT
     = "mapreduce.reduce.shuffle.memory.limit.percent";
@@ -292,11 +293,19 @@ public interface MRJobConfig {
   public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout";
 
   public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
+  public static final String MAX_ALLOWED_FETCH_FAILURES_FRACTION = "mapreduce.reduce.shuffle.max-fetch-failures-fraction";
+  public static final float DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f;
+  
+  public static final String MAX_FETCH_FAILURES_NOTIFICATIONS = "mapreduce.reduce.shuffle.max-fetch-failures-notifications";
+  public static final int DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
 
   public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
   
   public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
   public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
+  
+  public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures";
+  public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;
 
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
 
@@ -579,7 +588,17 @@ public interface MRJobConfig {
       MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
   public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
       50;
-  
+
+  /**
+   * The threshold in terms of seconds after which an unsatisfied mapper request
+   * triggers reducer preemption to free space. Default 0 implies that the reduces
+   * should be preempted immediately after allocation if there is currently no
+   * room for newly allocated mappers.
+   */
+  public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
+      "mapreduce.job.reducer.preempt.delay.sec";
+  public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0;
+
   public static final String MR_AM_ENV =
       MR_AM_PREFIX + "env";
   

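Each new key above is paired with a DEFAULT_* constant, so callers can read
key and fallback in one step. A sketch of how client code might inspect the
new settings (ShuffleConfigProbe is illustrative only):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.MRJobConfig;

  public class ShuffleConfigProbe {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // 0 means "preempt reducers immediately when mappers cannot be placed".
      int preemptDelaySec = conf.getInt(
          MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
          MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC);
      int maxHostFailures = conf.getInt(
          MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
          MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
      float inputBufferPct = conf.getFloat(
          MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
          MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
      System.out.println("preempt delay (s): " + preemptDelaySec
          + ", max host failures: " + maxHostFailures
          + ", shuffle input buffer: " + inputBufferPct);
    }
  }
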
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java Tue Aug 19 23:49:39 2014
@@ -176,17 +176,36 @@ public abstract class OutputCommitter {
   /**
    * Is task output recovery supported for restarting jobs?
    * 
-   * If task output recovery is supported, job restart can be done more 
+   * If task output recovery is supported, job restart can be done more
    * efficiently.
    * 
    * @return true if task output recovery is supported,
    *         false otherwise
-   * @see #recoverTask(TaskAttemptContext)         
+   * @see #recoverTask(TaskAttemptContext)
+   * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
    */
+  @Deprecated
   public boolean isRecoverySupported() {
     return false;
   }
-  
+
+  /**
+   * Is task output recovery supported for restarting jobs?
+   * 
+   * If task output recovery is supported, job restart can be done more
+   * efficiently.
+   * 
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return true if task output recovery is supported,
+   *         false otherwise
+   * @throws IOException
+   * @see #recoverTask(TaskAttemptContext)
+   */
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return isRecoverySupported();
+  }
+
   /**
    * Recover the task output. 
    * 

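Committers that need job-level configuration to decide recoverability can
override the new overload, while the default implementation keeps old
subclasses working by delegating to the deprecated no-arg method. A hedged
sketch (RecoverableCommitter and the example.committer.recovery.enabled key
are hypothetical):

  import java.io.IOException;
  import org.apache.hadoop.mapreduce.JobContext;
  import org.apache.hadoop.mapreduce.OutputCommitter;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;

  public class RecoverableCommitter extends OutputCommitter {
    @Override
    public boolean isRecoverySupported(JobContext context) throws IOException {
      // The new overload makes the job's Configuration available here.
      return context.getConfiguration()
          .getBoolean("example.committer.recovery.enabled", true);
    }
    @Override public void setupJob(JobContext jobContext) throws IOException { }
    @Override public void setupTask(TaskAttemptContext c) throws IOException { }
    @Override public boolean needsTaskCommit(TaskAttemptContext c)
        throws IOException { return false; }
    @Override public void commitTask(TaskAttemptContext c) throws IOException { }
    @Override public void abortTask(TaskAttemptContext c) throws IOException { }
  }
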
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java Tue Aug 19 23:49:39 2014
@@ -95,8 +95,8 @@ public class TaskCompletionEvent impleme
   }
   
   /**
-   * Returns enum Status.SUCESS or Status.FAILURE.
-   * @return task tracker status
+   * Returns {@link Status}
+   * @return task completion status
    */
   public Status getStatus() {
     return status;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Tue Aug 19 23:49:39 2014
@@ -35,6 +35,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 
 @InterfaceAudience.Private
@@ -69,9 +70,10 @@ public class EventReader implements Clos
     if (!EventWriter.VERSION.equals(version)) {
       throw new IOException("Incompatible event log version: "+version);
     }
-    
+
+    Schema myschema = new SpecificData(Event.class.getClassLoader()).getSchema(Event.class);
     this.schema = Schema.parse(in.readLine());
-    this.reader = new SpecificDatumReader(schema);
+    this.reader = new SpecificDatumReader(schema, myschema);
     this.decoder = DecoderFactory.get().jsonDecoder(schema, in);
   }
   
@@ -173,13 +175,15 @@ public class EventReader implements Clos
 
   static Counters fromAvro(JhCounters counters) {
     Counters result = new Counters();
-    for (JhCounterGroup g : counters.groups) {
-      CounterGroup group =
-          result.addGroup(StringInterner.weakIntern(g.name.toString()), 
-              StringInterner.weakIntern(g.displayName.toString()));
-      for (JhCounter c : g.counts) {
-        group.addCounter(StringInterner.weakIntern(c.name.toString()), 
-            StringInterner.weakIntern(c.displayName.toString()), c.value);
+    if(counters != null) {
+      for (JhCounterGroup g : counters.groups) {
+        CounterGroup group =
+            result.addGroup(StringInterner.weakIntern(g.name.toString()), 
+                StringInterner.weakIntern(g.displayName.toString()));
+        for (JhCounter c : g.counts) {
+          group.addCounter(StringInterner.weakIntern(c.name.toString()), 
+              StringInterner.weakIntern(c.displayName.toString()), c.value);
+        }
       }
     }
     return result;

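Passing both schemas to SpecificDatumReader(schema, myschema) above hands Avro
the writer schema recorded in the history file and the reader schema compiled
into the running jars, so schema resolution can bridge version skew between
the two. The same idea with Avro's generic API, since the Event class itself
is not shown here (both schemas below are made up for illustration):

  import java.io.ByteArrayOutputStream;
  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.GenericDatumReader;
  import org.apache.avro.generic.GenericDatumWriter;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.avro.io.DecoderFactory;
  import org.apache.avro.io.EncoderFactory;
  import org.apache.avro.io.JsonEncoder;

  public class AvroResolutionDemo {
    public static void main(String[] args) throws Exception {
      // Writer schema: the shape of the data actually in the log.
      Schema writer = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
          + "[{\"name\":\"id\",\"type\":\"string\"}]}");
      // Reader schema: what current code expects (adds a defaulted field).
      Schema reader = new Schema.Parser().parse(
          "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
          + "[{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"note\",\"type\":\"string\",\"default\":\"n/a\"}]}");

      GenericRecord rec = new GenericData.Record(writer);
      rec.put("id", "attempt_1");
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      JsonEncoder enc = EncoderFactory.get().jsonEncoder(writer, out);
      new GenericDatumWriter<GenericRecord>(writer).write(rec, enc);
      enc.flush();

      // Reading with (writer, reader) resolves old data against new code.
      GenericDatumReader<GenericRecord> datumReader =
          new GenericDatumReader<GenericRecord>(writer, reader);
      GenericRecord resolved = datumReader.read(null,
          DecoderFactory.get().jsonDecoder(writer, out.toString("UTF-8")));
      System.out.println(resolved); // {"id": "attempt_1", "note": "n/a"}
    }
  }
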
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Aug 19 23:49:39 2014
@@ -288,8 +288,18 @@ public class JobHistoryParser implements
   private void handleTaskAttemptFailedEvent(
       TaskAttemptUnsuccessfulCompletionEvent event) {
     TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
+    if(taskInfo == null) {
+      LOG.warn("TaskInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
+          + " taskId:  " + event.getTaskId().toString());
+      return;
+    }
     TaskAttemptInfo attemptInfo = 
       taskInfo.attemptsMap.get(event.getTaskAttemptId());
+    if(attemptInfo == null) {
+      LOG.warn("AttemptInfo is null for TaskAttemptUnsuccessfulCompletionEvent"
+          + " taskAttemptId:  " + event.getTaskAttemptId().toString());
+      return;
+    }
     attemptInfo.finishTime = event.getFinishTime();
     attemptInfo.error = StringInterner.weakIntern(event.getError());
     attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus());

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java Tue Aug 19 23:49:39 2014
@@ -81,15 +81,14 @@ public class OracleDBRecordReader<T exte
-      if (split.getLength() > 0 && split.getStart() > 0){
+      if (split.getLength() > 0){
         String querystring = query.toString();
 
         query = new StringBuilder();
         query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
         query.append(querystring);
-        query.append(" ) a WHERE rownum <= ").append(split.getStart());
-        query.append(" + ").append(split.getLength());
-        query.append(" ) WHERE dbif_rno >= ").append(split.getStart());
+        query.append(" ) a WHERE rownum <= ").append(split.getEnd());
+        query.append(" ) WHERE dbif_rno > ").append(split.getStart());
       }
     } catch (IOException ex) {
       // ignore, will not throw.

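The rewritten window selects rows (start, end] in one pass, where the old
"rownum <= start + length ... dbif_rno >= start" form took in an extra
boundary row and, because of the split.getStart() > 0 guard, never windowed
the first split at all. A standalone sketch of the query shape the new code
builds (paginate() is illustrative; end is assumed to be start + length, as
getEnd() suggests):

  public class OraclePaginationSketch {
    static String paginate(String baseQuery, long start, long end) {
      StringBuilder query = new StringBuilder();
      query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
      query.append(baseQuery);
      query.append(" ) a WHERE rownum <= ").append(end);
      query.append(" ) WHERE dbif_rno > ").append(start);
      return query.toString();
    }

    public static void main(String[] args) {
      // Rows 101..200 of the base query, i.e. the window (100, 200].
      System.out.println(paginate("SELECT id, name FROM employees", 100, 200));
    }
  }
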
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Tue Aug 19 23:49:39 2014
@@ -579,7 +579,7 @@ public abstract class CombineFileInputFo
         blocks = new OneBlockInfo[0];
       } else {
 
-        if(locations.length == 0) {
+        if(locations.length == 0 && !stat.isDirectory()) {
           locations = new BlockLocation[] { new BlockLocation() };
         }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Tue Aug 19 23:49:39 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -359,6 +360,15 @@ public abstract class FileInputFormat<K,
[this hunk's added lines and the next hunk's header were truncated in the
archive; the surviving body of that next hunk, which threads cached-host
information through the makeSplit calls, follows]
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
-                                     blkLocations[blkIndex].getHosts()));
+                        blkLocations[blkIndex].getHosts(),
+                        blkLocations[blkIndex].getCachedHosts()));
             bytesRemaining -= splitSize;
           }
 
           if (bytesRemaining != 0) {
             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
-                       blkLocations[blkIndex].getHosts()));
+                       blkLocations[blkIndex].getHosts(),
+                       blkLocations[blkIndex].getCachedHosts()));
           }
         } else { // not splitable
-          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
+          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
+                      blkLocations[0].getCachedHosts()));
         }
       } else { 
         //Create empty hosts array for zero length files

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Tue Aug 19 23:49:39 2014
@@ -22,11 +22,13 @@ import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
 
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -41,6 +43,7 @@ public class FileSplit extends InputSpli
   private long start;
   private long length;
   private String[] hosts;
+  private SplitLocationInfo[] hostInfos;
 
   public FileSplit() {}
 
@@ -57,6 +60,31 @@ public class FileSplit extends InputSpli
     this.length = length;
     this.hosts = hosts;
   }
+  
+  /** Constructs a split with host and cached-blocks information
+  *
+  * @param file the file name
+  * @param start the position of the first byte in the file to process
+  * @param length the number of bytes in the file to process
+  * @param hosts the list of hosts containing the block
+  * @param inMemoryHosts the list of hosts containing the block in memory
+  */
+ public FileSplit(Path file, long start, long length, String[] hosts,
+     String[] inMemoryHosts) {
+   this(file, start, length, hosts);
+   hostInfos = new SplitLocationInfo[hosts.length];
+   for (int i = 0; i < hosts.length; i++) {
+     // because N will be tiny, scanning is probably faster than a HashSet
+     boolean inMemory = false;
+     for (String inMemoryHost : inMemoryHosts) {
+       if (inMemoryHost.equals(hosts[i])) {
+         inMemory = true;
+         break;
+       }
+     }
+     hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
+   }
+ }
  
   /** The file containing this split's data. */
   public Path getPath() { return file; }
@@ -98,4 +126,10 @@ public class FileSplit extends InputSpli
       return this.hosts;
     }
   }
+  
+  @Override
+  @Evolving
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return hostInfos;
+  }
 }

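With the new constructor, consumers can tell which replicas of a block are
cached in memory. A usage sketch (host names are made up; getLocation() is
assumed alongside the isOnDisk()/isInMemory() accessors exercised by the
tests at the end of this commit):

  import java.io.IOException;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.SplitLocationInfo;
  import org.apache.hadoop.mapreduce.lib.input.FileSplit;

  public class SplitLocationDemo {
    public static void main(String[] args) throws IOException {
      FileSplit split = new FileSplit(new Path("/data/part-00000"), 0L, 1024L,
          new String[] {"localhost", "otherhost"},  // hosts holding the block
          new String[] {"localhost"});              // hosts caching it in memory
      for (SplitLocationInfo info : split.getLocationInfo()) {
        System.out.println(info.getLocation()
            + " inMemory=" + info.isInMemory()
            + " onDisk=" + info.isOnDisk());
      }
    }
  }
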
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java Tue Aug 19 23:49:39 2014
@@ -38,6 +38,9 @@ import org.apache.hadoop.mapreduce.TaskA
  * Either line feed or carriage-return are used to signal end of line. 
  * Each line is divided into key and value parts by a separator byte. If no
  * such a byte exists, the key will be the entire line and value will be empty.
+ * The separator byte can be specified in config file under the attribute name
+ * mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
+ * is the tab character ('\t').
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable

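The separator named in the new javadoc is read from the job configuration, so
overriding the default tab is a one-line change before submission; a minimal
sketch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

  public class KeyValueJobSetup {
    public static Job build() throws Exception {
      Configuration conf = new Configuration();
      // Split each line at the first ',' instead of the default '\t'.
      conf.set(
          "mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
      Job job = Job.getInstance(conf, "kv-example");
      job.setInputFormatClass(KeyValueTextInputFormat.class);
      return job;
    }
  }
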
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Tue Aug 19 23:49:39 2014
@@ -121,7 +121,7 @@ public class LineRecordReader extends Re
   private int maxBytesToConsume(long pos) {
     return isCompressedInput
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }
 
   private long getFilePosition() throws IOException {
@@ -134,6 +134,39 @@ public class LineRecordReader extends Re
     return retVal;
   }
 
+  private int skipUtfByteOrderMark() throws IOException {
+    // Strip BOM(Byte Order Mark)
+    // Text only support UTF-8, we only need to check UTF-8 BOM
+    // (0xEF,0xBB,0xBF) at the start of the text stream.
+    int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
+        Integer.MAX_VALUE);
+    int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
+    // Even we read 3 extra bytes for the first line,
+    // we won't alter existing behavior (no backwards incompat issue).
+    // Because the newSize is less than maxLineLength and
+    // the number of bytes copied to Text is always no more than newSize.
+    // If the return size from readLine is not less than maxLineLength,
+    // we will discard the current line and read the next line.
+    pos += newSize;
+    int textLength = value.getLength();
+    byte[] textBytes = value.getBytes();
+    if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
+        (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
+      // find UTF-8 BOM, strip it.
+      LOG.info("Found UTF-8 BOM and skipped it");
+      textLength -= 3;
+      newSize -= 3;
+      if (textLength > 0) {
+        // It may work to use the same buffer and not do the copyBytes
+        textBytes = value.copyBytes();
+        value.set(textBytes, 3, textLength);
+      } else {
+        value.clear();
+      }
+    }
+    return newSize;
+  }
+
   public boolean nextKeyValue() throws IOException {
     if (key == null) {
       key = new LongWritable();
@@ -146,10 +179,14 @@ public class LineRecordReader extends Re
     // We always read one extra line, which lies outside the upper
     // split limit i.e. (end - 1)
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
-      newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
-      pos += newSize;
-      if (newSize < maxLineLength) {
+      if (pos == 0) {
+        newSize = skipUtfByteOrderMark();
+      } else {
+        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
+        pos += newSize;
+      }
+
+      if ((newSize == 0) || (newSize < maxLineLength)) {
         break;
       }
 

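skipUtfByteOrderMark() only runs when pos == 0, and the marker it strips is
the three-byte UTF-8 BOM (0xEF,0xBB,0xBF). The detection step, isolated as a
standalone sketch:

  import java.nio.charset.StandardCharsets;

  public class BomCheck {
    // Returns the offset past a UTF-8 BOM, or 0 if none is present.
    static int bomOffset(byte[] b, int len) {
      if (len >= 3 && b[0] == (byte) 0xEF && b[1] == (byte) 0xBB
          && b[2] == (byte) 0xBF) {
        return 3;
      }
      return 0;
    }

    public static void main(String[] args) {
      byte[] line = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF, 'k', 'e', 'y'};
      int off = bomOffset(line, line.length);
      // Prints "key" with the BOM stripped.
      System.out.println(
          new String(line, off, line.length - off, StandardCharsets.UTF_8));
    }
  }
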
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java Tue Aug 19 23:49:39 2014
@@ -39,7 +39,6 @@ import org.apache.hadoop.mapreduce.TaskA
 /**
  * An InputFormat capable of performing joins over a set of data sources sorted
  * and partitioned the same way.
- * @see #setFormat
  *
  * A user may define new join types by setting the property
  * mapreduce.join.define.<ident> to a classname. 
@@ -47,6 +46,7 @@ import org.apache.hadoop.mapreduce.TaskA
  * assumed to be a ComposableRecordReader.
  * mapreduce.join.keycomparator can be a classname used to compare 
  * keys in the join.
+ * @see #setFormat
  * @see JoinRecordReader
  * @see MultiFilterRecordReader
  */

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java Tue Aug 19 23:49:39 2014
@@ -109,7 +109,7 @@ public abstract class CompositeRecordRea
         }
         // create priority queue
         if (null == q) {
-          cmp = WritableComparator.get(keyclass);
+          cmp = WritableComparator.get(keyclass, conf);
          q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
                new Comparator<ComposableRecordReader<K,?>>() {
                  public int compare(ComposableRecordReader<K,?> o1,

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java Tue Aug 19 23:49:39 2014
@@ -92,7 +92,7 @@ public class WrappedRecordReader<K exten
[hunk body truncated in the archive; the fragment that follows, apparently an
anonymous OutputCommitter from a later file in this commit part, survives]
       public void setupTask(TaskAttemptContext taskContext) { }
 
       @Override
+      @Deprecated
       public boolean isRecoverySupported() {
         return true;
       }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Aug 19 23:49:39 2014
@@ -319,6 +319,7 @@ class Fetcher extends Thread {
 
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
+      scheduler.hostFailed(host.getHostName());
       for(TaskAttemptID left: remaining) {
         scheduler.copyFailed(left, host, false, connectExcpt);
       }
@@ -343,6 +344,7 @@ class Fetcher extends Thread {
       
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+        scheduler.hostFailed(host.getHostName());
         for(TaskAttemptID left: failedTasks) {
           scheduler.copyFailed(left, host, true, false);
         }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Aug 19 23:49:39 2014
@@ -156,7 +156,8 @@ public class MergeManagerImpl impl
     this.rfs = ((LocalFileSystem)localFS).getRaw();
     
     final float maxInMemCopyUse =
-      jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
+      jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT,
+          MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT);
     if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
       throw new IllegalArgumentException("Invalid value for " +
           MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
@@ -197,7 +198,7 @@ public class MergeManagerImpl impl
              "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
 
     if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
-      throw new RuntimeException("Invlaid configuration: "
+      throw new RuntimeException("Invalid configuration: "
           + "maxSingleShuffleLimit should be less than mergeThreshold"
           + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
           + "mergeThreshold: " + this.mergeThreshold);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,6 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
-
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
@@ -101,6 +100,7 @@ public class ShuffleSchedulerImpl i
 
   private final boolean reportReadErrorImmediately;
   private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
+  private int maxHostFailures;
 
   public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
                           TaskAttemptID reduceId,
@@ -132,6 +132,9 @@ public class ShuffleSchedulerImpl i
 
     this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
         MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
+    this.maxHostFailures = job.getInt(
+        MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
+        MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
   }
 
   @Override
@@ -213,9 +216,18 @@ public class ShuffleSchedulerImpl i
     progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
+  
+  public synchronized void hostFailed(String hostname) {
+    if (hostFailures.containsKey(hostname)) {
+      IntWritable x = hostFailures.get(hostname);
+      x.set(x.get() + 1);
+    } else {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
+  }
 
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
-                                      boolean readError, boolean connectExcpt) {
+      boolean readError, boolean connectExcpt) {
     host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(mapId)) {
@@ -226,12 +238,9 @@ public class ShuffleSchedulerImpl i
       failureCounts.put(mapId, new IntWritable(1));
     }
     String hostname = host.getHostName();
-    if (hostFailures.containsKey(hostname)) {
-      IntWritable x = hostFailures.get(hostname);
-      x.set(x.get() + 1);
-    } else {
-      hostFailures.put(hostname, new IntWritable(1));
-    }
+    //report failure if already retried maxHostFailures times
+    boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false;
+    
     if (failures >= abortFailureLimit) {
       try {
         throw new IOException(failures + " failures downloading " + mapId);
@@ -240,7 +249,7 @@ public class ShuffleSchedulerImpl i
       }
     }
 
-    checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
+    checkAndInformJobTracker(failures, mapId, readError, connectExcpt, hostFail);
 
     checkReducerHealth();
 
@@ -270,9 +279,9 @@ public class ShuffleSchedulerImpl i
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
       int failures, TaskAttemptID mapId, boolean readError,
-      boolean connectExcpt) {
+      boolean connectExcpt, boolean hostFailed) {
     if (connectExcpt || (reportReadErrorImmediately && readError)
-        || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+        || ((failures % maxFetchFailuresBeforeReporting) == 0) || hostFailed) {
       LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
       status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
     }
@@ -507,4 +516,7 @@ public class ShuffleSchedulerImpl i
     referee.join();
   }
 
+  public int getMaxHostFailures() {
+    return maxHostFailures;
+  }
 }

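The scheduler now counts failures per host in the hostFailures map and reports
a fetch failure once a host has failed more than getMaxHostFailures() times;
note that copyFailed() reads the counter that hostFailed() is expected to have
populated beforehand. A self-contained analogue of that bookkeeping
(HostFailureTracker is illustrative, with a null-safe lookup):

  import java.util.HashMap;
  import java.util.Map;

  public class HostFailureTracker {
    private final Map<String, Integer> hostFailures =
        new HashMap<String, Integer>();
    private final int maxHostFailures;

    HostFailureTracker(int maxHostFailures) {
      this.maxHostFailures = maxHostFailures;
    }

    // Counterpart of ShuffleSchedulerImpl.hostFailed().
    synchronized void hostFailed(String hostname) {
      Integer count = hostFailures.get(hostname);
      hostFailures.put(hostname, count == null ? 1 : count + 1);
    }

    // Counterpart of the "hostFail" test in copyFailed().
    synchronized boolean shouldReport(String hostname) {
      Integer count = hostFailures.get(hostname);
      return count != null && count > maxHostFailures;
    }
  }
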
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java Tue Aug 19 23:49:39 2014
@@ -38,6 +38,21 @@ public class HostUtil {
         httpPort + "/tasklog?attemptid=" + taskAttemptID);
   }
 
+  /**
+   * Always throws {@link RuntimeException} because this method is not
+   * supposed to be called at runtime. This method is only for keeping
+   * binary compatibility with Hive 0.13. MAPREDUCE-5830 for the details.
+   * @deprecated Use {@link #getTaskLogUrl(String, String, String, String)}
+   * to construct the taskLogUrl.
+   */
+  @Deprecated
+  public static String getTaskLogUrl(String taskTrackerHostName,
+                                     String httpPort, String taskAttemptID) {
+    throw new RuntimeException(
+        "This method is not supposed to be called at runtime. " +
+        "Use HostUtil.getTaskLogUrl(String, String, String, String) instead.");
+  }
+
   public static String convertTrackerNameToHostName(String trackerName) {
     // Ugly!
     // Convert the trackerName to its host name

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Tue Aug 19 23:49:39 2014
@@ -83,6 +83,16 @@
 
 
+<property>
+  <name>mapreduce.job.reducer.preempt.delay.sec</name>
+  <value>0</value>
+  <description>The threshold in terms of seconds after which an unsatisfied mapper 
+  request triggers reducer preemption to free space. Default 0 implies that the 
+  reduces should be preempted immediately after allocation if there is currently no
+  room for newly allocated mappers.
+  </description>
+</property>
+
 <property>
     <name>mapreduce.job.max.split.locations</name>
     <value>10</value>
     <description>The max number of block locations to store for each split for 
@@ -661,7 +671,7 @@
 
   <property>
     <name>mapreduce.task.profile.params</name>
-    <value></value>
+    <value>-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s</value>
     <description>JVM profiler parameters used to profile map and reduce task
       attempts. This string may contain a single format specifier %s that will
       be replaced by the path to profile.out in the task attempt log directory.
@@ -1215,7 +1225,9 @@
 
 <property>
    <name>mapreduce.job.classloader.system.classes</name>
-   <value>java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.</value>
+   <value>java.,javax.,org.w3c.dom.,org.xml.sax.,org.apache.commons.logging.,
+          org.apache.log4j.,org.apache.hadoop.,core-default.xml,
+          hdfs-default.xml,mapred-default.xml,yarn-default.xml</value>
   <description>A comma-separated list of classes that should be loaded from the
     system classpath, not the user-supplied JARs, when mapreduce.job.classloader
     is enabled. Names ending in '.' (period) are treated as package names,

Propchange: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-2006/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1588992-1596568
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1582150-1619000

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm Tue Aug 19 23:49:39 2014
@@ -18,8 +18,6 @@
 
 Hadoop MapReduce Next Generation - Distributed Cache Deploy
 
-  \[ {{{./index.html}Go Back}} \]
-
 * Introduction
 
   The MapReduce application framework has rudimentary support for deploying a

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm Tue Aug 19 23:49:39 2014
@@ -18,8 +18,6 @@
 
 Hadoop MapReduce Next Generation - Encrypted Shuffle
 
-  \[ {{{./index.html}Go Back}} \]
-
 * {Introduction}
 
   The Encrypted Shuffle capability allows encryption of the MapReduce shuffle

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm Tue Aug 19 23:49:39 2014
@@ -18,8 +18,6 @@
 
 Apache Hadoop MapReduce - Migrating from Apache Hadoop 1.x to Apache Hadoop 2.x 
 
-  \[ {{{../../hadoop-yarn/hadoop-yarn-site/index.html}Go Back}} \]
-
 * {Introduction}
 
   This document provides information for users to migrate their Apache Hadoop 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm Tue Aug 19 23:49:39 2014
@@ -18,8 +18,6 @@
 
 Hadoop MapReduce Next Generation - Pluggable Shuffle and Pluggable Sort
 
-  \[ {{{./index.html}Go Back}} \]
-
 * Introduction
 
   The pluggable shuffle and pluggable sort capabilities allow replacing the 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClusterStatus.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClusterStatus.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClusterStatus.java Tue Aug 19 23:49:39 2014
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.junit.Test;
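
[Note: the change from junit.framework.Assert to org.junit.Assert recurs throughout this commit. junit.framework.Assert is the JUnit 3 class and is deprecated under JUnit 4; org.junit.Assert is its maintained replacement and a drop-in substitute for these tests. A minimal sketch of the migrated style; the test body is illustrative, not taken from this commit:

    import org.junit.Assert;
    import org.junit.Test;

    public class AssertMigrationSketch {
      @Test
      public void usesJUnit4Asserts() {
        // Same static methods as the JUnit 3 class, but maintained, and
        // with floating-point overloads that require an explicit delta.
        Assert.assertEquals("labels should match", "a", "a");
        Assert.assertEquals(1.0f, 1.0f, 0.0f);
      }
    }
]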
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java Tue Aug 19 23:49:39 2014
@@ -26,7 +26,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Random;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Tue Aug 19 23:49:39 2014
@@ -103,6 +103,29 @@ public class TestFileInputFormat {
   }
   
   @Test
+  public void testSplitLocationInfo() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1/a2");
+    JobConf job = new JobConf(conf);
+    TextInputFormat fileInputFormat = new TextInputFormat();
+    fileInputFormat.configure(job);
+    FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1);
+    String[] locations = splits[0].getLocations();
+    Assert.assertEquals(2, locations.length);
+    SplitLocationInfo[] locationInfo = splits[0].getLocationInfo();
+    Assert.assertEquals(2, locationInfo.length);
+    SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
+        locationInfo[0] : locationInfo[1];
+    SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
+        locationInfo[0] : locationInfo[1];
+    Assert.assertTrue(localhostInfo.isOnDisk());
+    Assert.assertTrue(localhostInfo.isInMemory());
+    Assert.assertTrue(otherhostInfo.isOnDisk());
+    Assert.assertFalse(otherhostInfo.isInMemory());
+  }
+  
+  @Test
   public void testListStatusSimple() throws IOException {
     Configuration conf = new Configuration();
     conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
@@ -223,8 +246,9 @@ public class TestFileInputFormat {
     public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
         throws IOException {
       return new BlockLocation[] {
-          new BlockLocation(new String[] { "localhost:50010" },
-              new String[] { "localhost" }, 0, len) };
+          new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
+              new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
+              new String[0], 0, len, false) };
     }
 
     @Override
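
[Note: the mock file system above now uses the seven-argument BlockLocation constructor, which takes, in order: names (host:port), hosts holding the block on disk, cached hosts holding an in-memory replica, topology paths, offset, length, and a corrupt flag. getLocationInfo() exposes that data per host as SplitLocationInfo, which is what the assertions in testSplitLocationInfo check. A minimal standalone sketch; host names and sizes are illustrative:

    import org.apache.hadoop.fs.BlockLocation;

    public class BlockLocationSketch {
      public static void main(String[] args) {
        BlockLocation loc = new BlockLocation(
            new String[] { "localhost:50010", "otherhost:50010" }, // names
            new String[] { "localhost", "otherhost" },  // on-disk hosts
            new String[] { "localhost" },               // in-memory hosts
            new String[0],                              // topology paths
            0, 1024, false);                            // offset, len, corrupt
        // "localhost" holds the block both on disk and in memory;
        // "otherhost" holds it on disk only, matching the
        // isOnDisk()/isInMemory() assertions above.
        System.out.println(loc);
      }
    }
]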

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobQueueClient.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobQueueClient.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobQueueClient.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.junit.Test;
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java Tue Aug 19 23:49:39 2014
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -97,4 +100,129 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    while (offset < testFileSize) {
+      FileSplit split =
+          new FileSplit(testFilePath, offset, splitSize, (String[]) null);
+      LineRecordReader reader = new LineRecordReader(conf, split);
+
+      while (reader.next(key, value)) {
+        records.add(value.toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+        10, false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+        200 * 1000, true);
+  }
+
+  @Test
+  public void testStripBOM() throws IOException {
+    // the test data contains a BOM at the start of the file
+    // confirm the BOM is skipped by LineRecordReader
+    String UTF8_BOM = "\uFEFF";
+    URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt");
+    assertNotNull("Cannot find testBOM.txt", testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+
+    // read the data and check whether BOM is skipped
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[])null);
+    LineRecordReader reader = new LineRecordReader(conf, split);
+    LongWritable key = new LongWritable();
+    Text value = new Text();
+    int numRecords = 0;
+    boolean firstLine = true;
+    boolean skipBOM = true;
+    while (reader.next(key, value)) {
+      if (firstLine) {
+        firstLine = false;
+        if (value.toString().startsWith(UTF8_BOM)) {
+          skipBOM = false;
+        }
+      }
+      ++numRecords;
+    }
+    reader.close();
+
+    assertTrue("BOM is not skipped", skipBOM);
+  }
 }
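
[Note: the spanning-record tests rely on the LineRecordReader split contract: every record is read exactly once, and a record that straddles a split boundary is read in full by the reader for the split in which it begins, while readers for later splits first skip ahead to a line boundary. Setting io.file.buffer.size to 1 forces byte-at-a-time reads so the boundary arithmetic is exercised on every byte, and, as the comment in the compressed test notes, the bz2 variant needs a split size larger than the 100k compression block size for CompressedSplitLineReader to work. A minimal pure-Java sketch of the exactly-once partition; Hadoop's tie-breaking at exact line boundaries differs slightly, but the invariant is the same:

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class SplitBoundarySketch {
      // Emits every line whose first byte falls inside [start, start +
      // splitSize); the last emitted line may run past the split end,
      // which is the spanning-record case the tests above construct.
      static List<String> readSplit(byte[] data, int start, int splitSize) {
        List<String> out = new ArrayList<String>();
        int pos = start;
        // A split that begins mid-record skips the tail of the record
        // owned by the reader of the previous split.
        while (pos > 0 && pos < data.length && data[pos - 1] != '\n') {
          pos++;
        }
        int end = Math.min(start + splitSize, data.length);
        while (pos < end) {
          int lineStart = pos;
          while (pos < data.length && data[pos] != '\n') {
            pos++;
          }
          out.add(new String(data, lineStart, pos - lineStart,
              StandardCharsets.UTF_8));
          pos++; // step over the newline
        }
        return out;
      }
    }
]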

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java Tue Aug 19 23:49:39 2014
@@ -35,7 +35,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static junit.framework.Assert.*;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 /**

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java Tue Aug 19 23:49:39 2014
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.mapred;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/db/TestDbClasses.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/db/TestDbClasses.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/db/TestDbClasses.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/db/TestDbClasses.java Tue Aug 19 23:49:39 2014
@@ -110,7 +110,7 @@ public class TestDbClasses {
         splitter, NullDBWritable.class, configuration, connect,
         dbConfiguration, "condition", fields, "table");
     assertEquals(
-        "SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( SELECT f1, f2 FROM table WHERE condition ORDER BY Order ) a WHERE rownum <= 1 + 9 ) WHERE dbif_rno >= 1",
+        "SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( SELECT f1, f2 FROM table WHERE condition ORDER BY Order ) a WHERE rownum <= 10 ) WHERE dbif_rno > 1",
         recorder.getSelectQuery());
   }
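
[Note: the corrected expected string encodes Oracle's two-level ROWNUM pagination. ROWNUM is assigned before ORDER BY takes effect, so the ordered result must be produced by an inner subquery, numbered as dbif_rno in the middle query, and range-filtered on the outside. With the exclusive lower bound (dbif_rno > start), a split of length 9 starting after row 1 yields rows 2 through 10, exactly 9 rows; the previous ">= 1" form returned 10 rows and would re-read the boundary row already owned by the preceding split. A sketch of how such a string is composed; the variable names are illustrative, not the generator's own:

    // Composing the paginated query asserted above for one split.
    long start = 1;   // rows belonging to earlier splits
    long length = 9;  // rows this split should return
    String inner = "SELECT f1, f2 FROM table WHERE condition ORDER BY Order";
    String paged = "SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( "
        + inner + " ) a WHERE rownum <= " + (start + length)
        + " ) WHERE dbif_rno > " + start;
    // paged matches the expected string: rows start+1 .. start+length.
]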
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java Tue Aug 19 23:49:39 2014
@@ -22,7 +22,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.File;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Tue Aug 19 23:49:39 2014
@@ -26,7 +26,7 @@ import java.util.Set;
 
 import javax.annotation.Nullable;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.SplitLocationInfo;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.After;
@@ -139,6 +140,28 @@ public class TestFileInputFormat {
         1, mockFs.numListLocatedStatusCalls);
     FileSystem.closeAll();
   }
+  
+  @Test
+  public void testSplitLocationInfo() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1/a2");
+    Job job = Job.getInstance(conf);
+    TextInputFormat fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    String[] locations = splits.get(0).getLocations();
+    Assert.assertEquals(2, locations.length);
+    SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo();
+    Assert.assertEquals(2, locationInfo.length);
+    SplitLocationInfo localhostInfo = locations[0].equals("localhost") ?
+        locationInfo[0] : locationInfo[1];
+    SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ?
+        locationInfo[0] : locationInfo[1];
+    Assert.assertTrue(localhostInfo.isOnDisk());
+    Assert.assertTrue(localhostInfo.isInMemory());
+    Assert.assertTrue(otherhostInfo.isOnDisk());
+    Assert.assertFalse(otherhostInfo.isInMemory());
+  }
 
   @Test
   public void testListStatusSimple() throws IOException {
@@ -402,9 +425,10 @@ public class TestFileInputFormat {
     public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
         throws IOException {
       return new BlockLocation[] {
-          new BlockLocation(new String[] { "localhost:50010" },
-              new String[] { "localhost" }, 0, len) };
-    }
+          new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" },
+              new String[] { "localhost", "otherhost" }, new String[] { "localhost" },
+              new String[0], 0, len, false) };
+    }
 
     @Override
     protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java Tue Aug 19 23:49:39 2014
@@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -101,4 +104,131 @@ public class TestLineRecordReader {
     // character is a linefeed
     testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498);
   }
+
+  // Use the LineRecordReader to read records from the file
+  public ArrayList<String> readRecords(URL testFileUrl, int splitSize)
+      throws IOException {
+
+    // Set up context
+    File testFile = new File(testFileUrl.getFile());
+    long testFileSize = testFile.length();
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    Configuration conf = new Configuration();
+    conf.setInt("io.file.buffer.size", 1);
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // Gather the records returned by the record reader
+    ArrayList<String> records = new ArrayList<String>();
+
+    long offset = 0;
+    while (offset < testFileSize) {
+      FileSplit split = new FileSplit(testFilePath, offset, splitSize, null);
+      LineRecordReader reader = new LineRecordReader();
+      reader.initialize(split, context);
+
+      while (reader.nextKeyValue()) {
+        records.add(reader.getCurrentValue().toString());
+      }
+      offset += splitSize;
+    }
+    return records;
+  }
+
+  // Gather the records by just splitting on new lines
+  public String[] readRecordsDirectly(URL testFileUrl, boolean bzip)
+      throws IOException {
+    int MAX_DATA_SIZE = 1024 * 1024;
+    byte[] data = new byte[MAX_DATA_SIZE];
+    FileInputStream fis = new FileInputStream(testFileUrl.getFile());
+    int count;
+    if (bzip) {
+      BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis);
+      count = bzIn.read(data);
+      bzIn.close();
+    } else {
+      count = fis.read(data);
+    }
+    fis.close();
+    assertTrue("Test file data too big for buffer", count < data.length);
+    return new String(data, 0, count, "UTF-8").split("\n");
+  }
+
+  public void checkRecordSpanningMultipleSplits(String testFile,
+                                                int splitSize,
+                                                boolean bzip)
+      throws IOException {
+    URL testFileUrl = getClass().getClassLoader().getResource(testFile);
+    ArrayList<String> records = readRecords(testFileUrl, splitSize);
+    String[] actuals = readRecordsDirectly(testFileUrl, bzip);
+
+    assertEquals("Wrong number of records", actuals.length, records.size());
+
+    boolean hasLargeRecord = false;
+    for (int i = 0; i < actuals.length; ++i) {
+      assertEquals(actuals[i], records.get(i));
+      if (actuals[i].length() > 2 * splitSize) {
+        hasLargeRecord = true;
+      }
+    }
+
+    assertTrue("Invalid test data. Doesn't have a large enough record",
+               hasLargeRecord);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplits()
+      throws IOException {
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt",
+                                      10,
+                                      false);
+  }
+
+  @Test
+  public void testRecordSpanningMultipleSplitsCompressed()
+      throws IOException {
+    // The file is generated with bz2 block size of 100k. The split size
+    // needs to be larger than that for the CompressedSplitLineReader to
+    // work.
+    checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2",
+                                      200 * 1000,
+                                      true);
+  }
+
+  @Test
+  public void testStripBOM() throws IOException {
+    // the test data contains a BOM at the start of the file
+    // confirm the BOM is skipped by LineRecordReader
+    String UTF8_BOM = "\uFEFF";
+    URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt");
+    assertNotNull("Cannot find testBOM.txt", testFileUrl);
+    File testFile = new File(testFileUrl.getFile());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+    long testFileSize = testFile.length();
+    Configuration conf = new Configuration();
+    conf.setInt(org.apache.hadoop.mapreduce.lib.input.
+        LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE);
+
+    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID());
+
+    // read the data and check whether BOM is skipped
+    FileSplit split = new FileSplit(testFilePath, 0, testFileSize,
+        (String[])null);
+    LineRecordReader reader = new LineRecordReader();
+    reader.initialize(split, context);
+    int numRecords = 0;
+    boolean firstLine = true;
+    boolean skipBOM = true;
+    while (reader.nextKeyValue()) {
+      if (firstLine) {
+        firstLine = false;
+        if (reader.getCurrentValue().toString().startsWith(UTF8_BOM)) {
+          skipBOM = false;
+        }
+      }
+      ++numRecords;
+    }
+    reader.close();
+
+    assertTrue("BOM is not skipped", skipBOM);
+  }
 }
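
[Note: testStripBOM appears in both the mapred and mapreduce variants because a UTF-8 byte order mark (bytes EF BB BF, decoding to U+FEFF) left at the front of the first record would silently corrupt comparisons on that key or value. A minimal standalone sketch of the detection, analogous to the check LineRecordReader applies to the first line of a file:

    import java.nio.charset.StandardCharsets;

    public class BomSketch {
      // Returns the offset of the first payload byte, skipping a UTF-8
      // byte order mark if one is present.
      static int skipUtf8Bom(byte[] buf) {
        if (buf.length >= 3 && (buf[0] & 0xFF) == 0xEF
            && (buf[1] & 0xFF) == 0xBB && (buf[2] & 0xFF) == 0xBF) {
          return 3;
        }
        return 0;
      }

      public static void main(String[] args) {
        byte[] line = "\uFEFFhello".getBytes(StandardCharsets.UTF_8);
        int start = skipUtf8Bom(line);
        System.out.println(new String(line, start, line.length - start,
            StandardCharsets.UTF_8)); // prints "hello"
      }
    }
]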

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java Tue Aug 19 23:49:39 2014
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -193,7 +193,7 @@ public class TestMerger {
     RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
         valueClass, segments, 2, tmpDir, comparator, getReporter(),
         readsCounter, writesCounter, mergePhase);
-    Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
+    Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), 0.0f);
   }
 
   private Progressable getReporter() {
@@ -274,4 +274,4 @@ public class TestMerger {
       }
     };
   }
-}
\ No newline at end of file
+}
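
[Note: the added 0.0f delta in the TestMerger assertion is a consequence of the Assert migration. org.junit.Assert deprecates floating-point assertEquals without a tolerance, and its two-argument double overload fails unconditionally with a message directing callers to the delta form. Under junit.framework.Assert the old call compiled against assertEquals(Object, Object) via autoboxing, so it happened to work; after the import change the float arguments widen to the deprecated double overload instead. A delta of 0.0f preserves the exact-equality check:

    import org.junit.Assert;

    public class FloatDeltaSketch {
      public static void main(String[] args) {
        float progress = 1.0f;
        // Without the third argument this would resolve to the deprecated
        // assertEquals(double, double), which always fails.
        Assert.assertEquals(1.0f, progress, 0.0f);
      }
    }
]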