Return-Path:
Clearly, logical splits based on input-size are insufficient for many
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,8 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -51,10 +53,25 @@ public abstract class InputSplit {
/**
* Get the list of nodes by name where the data for the split would be local.
* The locations do not need to be serialized.
+ *
* @return a new array of the node nodes.
* @throws IOException
* @throws InterruptedException
*/
public abstract
String[] getLocations() throws IOException, InterruptedException;
+
+ /**
+ * Gets info about which nodes the input split is stored on and how it is
+ * stored at each location.
+ *
+ * @return list of <code>SplitLocationInfo</code>s describing how the split
+ * data is stored at each location. A null value indicates that all the
+ * locations have the data stored on disk.
+ * @throws IOException
+ */
+ @Evolving
+ public SplitLocationInfo[] getLocationInfo() throws IOException {
+ return null;
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Tue Aug 19 23:49:39 2014
@@ -54,7 +54,7 @@ import org.apache.hadoop.util.StringUtil
*
Here is an example on how to submit a job:
** // Create a new Job - * Job job = new Job(new Configuration()); + * Job job = Job.getInstance(); * job.setJarByClass(MyJob.class); * * // Specify various job-specific parameters @@ -113,16 +113,25 @@ public class Job extends JobContextImpl private long statustime; private Cluster cluster; + /** + * @deprecated Use {@link #getInstance()} + */ @Deprecated public Job() throws IOException { this(new Configuration()); } + /** + * @deprecated Use {@link #getInstance(Configuration)} + */ @Deprecated public Job(Configuration conf) throws IOException { this(new JobConf(conf)); } + /** + * @deprecated Use {@link #getInstance(Configuration, String)} + */ @Deprecated public Job(Configuration conf, String jobName) throws IOException { this(conf); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobID.java Tue Aug 19 23:49:39 2014 @@ -48,7 +48,7 @@ import org.apache.hadoop.io.Text; @InterfaceStability.Stable public class JobID extends org.apache.hadoop.mapred.ID implements Comparable{ - protected static final String JOB = "job"; + public static final String JOB = "job"; // Jobid regex for various tools and framework components public static final String JOBID_REGEX = Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Aug 19 23:49:39 2014 @@ -265,6 +265,7 @@ public interface MRJobConfig { public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes"; public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"; + public static final float DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT = 0.70f; public static final String SHUFFLE_MEMORY_LIMIT_PERCENT = "mapreduce.reduce.shuffle.memory.limit.percent"; @@ -292,11 +293,19 @@ public interface MRJobConfig { public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout"; public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures"; + public static final String MAX_ALLOWED_FETCH_FAILURES_FRACTION = "mapreduce.reduce.shuffle.max-fetch-failures-fraction"; + public static final float DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f; + + public static final String MAX_FETCH_FAILURES_NOTIFICATIONS = "mapreduce.reduce.shuffle.max-fetch-failures-notifications"; + public static final int DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS = 3; public static final String 
SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror"; public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms"; public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000; + + public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures"; + public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5; public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr"; @@ -579,7 +588,17 @@ public interface MRJobConfig { MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold"; public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD = 50; - + + /** + * The threshold in terms of seconds after which an unsatisfied mapper request + * triggers reducer preemption to free space. Default 0 implies that the reduces + * should be preempted immediately after allocation if there is currently no + * room for newly allocated mappers. 
+ */ + public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC = + "mapreduce.job.reducer.preempt.delay.sec"; + public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0; + public static final String MR_AM_ENV = MR_AM_PREFIX + "env"; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java Tue Aug 19 23:49:39 2014 @@ -176,17 +176,36 @@ public abstract class OutputCommitter { /** * Is task output recovery supported for restarting jobs? * - * If task output recovery is supported, job restart can be done more + * If task output recovery is supported, job restart can be done more * efficiently. * * @return true
if task output recovery is supported, *false
otherwise - * @see #recoverTask(TaskAttemptContext) + * @see #recoverTask(TaskAttemptContext) + * @deprecated Use {@link #isRecoverySupported(JobContext)} instead. */ + @Deprecated public boolean isRecoverySupported() { return false; } - + + /** + * Is task output recovery supported for restarting jobs? + * + * If task output recovery is supported, job restart can be done more + * efficiently. + * + * @param jobContext + * Context of the job whose output is being written. + * @returntrue
if task output recovery is supported, + *false
otherwise + * @throws IOException + * @see #recoverTask(TaskAttemptContext) + */ + public boolean isRecoverySupported(JobContext jobContext) throws IOException { + return isRecoverySupported(); + } + /** * Recover the task output. * Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskCompletionEvent.java Tue Aug 19 23:49:39 2014 @@ -95,8 +95,8 @@ public class TaskCompletionEvent impleme } /** - * Returns enum Status.SUCESS or Status.FAILURE. 
- * @return task tracker status + * Returns {@link Status} + * @return task completion status */ public Status getStatus() { return status; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Tue Aug 19 23:49:39 2014 @@ -35,6 +35,7 @@ import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DatumReader; +import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificDatumReader; @InterfaceAudience.Private @@ -69,9 +70,10 @@ public class EventReader implements Clos if (!EventWriter.VERSION.equals(version)) { throw new IOException("Incompatible event log version: "+version); } - + + Schema myschema = new SpecificData(Event.class.getClassLoader()).getSchema(Event.class); this.schema = Schema.parse(in.readLine()); - this.reader = new SpecificDatumReader(schema); + this.reader = new SpecificDatumReader(schema, myschema); this.decoder = DecoderFactory.get().jsonDecoder(schema, in); } @@ -173,13 +175,15 @@ public class EventReader implements Clos static Counters fromAvro(JhCounters counters) { Counters result = new 
Counters(); - for (JhCounterGroup g : counters.groups) { - CounterGroup group = - result.addGroup(StringInterner.weakIntern(g.name.toString()), - StringInterner.weakIntern(g.displayName.toString())); - for (JhCounter c : g.counts) { - group.addCounter(StringInterner.weakIntern(c.name.toString()), - StringInterner.weakIntern(c.displayName.toString()), c.value); + if(counters != null) { + for (JhCounterGroup g : counters.groups) { + CounterGroup group = + result.addGroup(StringInterner.weakIntern(g.name.toString()), + StringInterner.weakIntern(g.displayName.toString())); + for (JhCounter c : g.counts) { + group.addCounter(StringInterner.weakIntern(c.name.toString()), + StringInterner.weakIntern(c.displayName.toString()), c.value); + } } } return result; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Aug 19 23:49:39 2014 @@ -288,8 +288,18 @@ public class JobHistoryParser implements private void handleTaskAttemptFailedEvent( TaskAttemptUnsuccessfulCompletionEvent event) { TaskInfo taskInfo = info.tasksMap.get(event.getTaskId()); + if(taskInfo == null) { + LOG.warn("TaskInfo is null for 
TaskAttemptUnsuccessfulCompletionEvent" + + " taskId: " + event.getTaskId().toString()); + return; + } TaskAttemptInfo attemptInfo = taskInfo.attemptsMap.get(event.getTaskAttemptId()); + if(attemptInfo == null) { + LOG.warn("AttemptInfo is null for TaskAttemptUnsuccessfulCompletionEvent" + + " taskAttemptId: " + event.getTaskAttemptId().toString()); + return; + } attemptInfo.finishTime = event.getFinishTime(); attemptInfo.error = StringInterner.weakIntern(event.getError()); attemptInfo.status = StringInterner.weakIntern(event.getTaskStatus()); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/OracleDBRecordReader.java Tue Aug 19 23:49:39 2014 @@ -81,15 +81,14 @@ public class OracleDBRecordReader0 && split.getStart() > 0){ + if (split.getLength() > 0){ String querystring = query.toString(); query = new StringBuilder(); query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( "); query.append(querystring); - query.append(" ) a WHERE rownum <= ").append(split.getStart()); - query.append(" + ").append(split.getLength()); - query.append(" ) WHERE dbif_rno >= ").append(split.getStart()); + query.append(" ) a WHERE 
rownum <= ").append(split.getEnd()); + query.append(" ) WHERE dbif_rno > ").append(split.getStart()); } } catch (IOException ex) { // ignore, will not throw. Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Tue Aug 19 23:49:39 2014 @@ -579,7 +579,7 @@ public abstract class CombineFileInputFo blocks = new OneBlockInfo[0]; } else { - if(locations.length == 0) { + if(locations.length == 0 && !stat.isDirectory()) { locations = new BlockLocation[] { new BlockLocation() }; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Tue Aug 19 23:49:39 2014 @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapred.LocatedFileStatusFetcher; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -359,6 +360,15 @@ public abstract class FileInputFormat SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable - splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); + splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), + blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java Tue Aug 19 23:49:39 2014 @@ -22,11 +22,13 @@ import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -41,6 +43,7 @@ public class FileSplit extends InputSpli private long start; private long length; private String[] hosts; + private SplitLocationInfo[] hostInfos; public FileSplit() {} @@ -57,6 +60,31 @@ public class FileSplit extends InputSpli this.length = length; this.hosts = hosts; } + + /** Constructs a split with host and cached-blocks information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block + * @param inMemoryHosts the list of hosts containing the block in memory + */ + 
public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + this(file, start, length, hosts); + hostInfos = new SplitLocationInfo[hosts.length]; + for (int i = 0; i < hosts.length; i++) { + // because N will be tiny, scanning is probably faster than a HashSet + boolean inMemory = false; + for (String inMemoryHost : inMemoryHosts) { + if (inMemoryHost.equals(hosts[i])) { + inMemory = true; + break; + } + } + hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory); + } + } /** The file containing this split's data. */ public Path getPath() { return file; } @@ -98,4 +126,10 @@ public class FileSplit extends InputSpli return this.hosts; } } + + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return hostInfos; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/KeyValueTextInputFormat.java Tue Aug 19 23:49:39 2014 @@ -38,6 +38,9 @@ import org.apache.hadoop.mapreduce.TaskA * Either line feed or carriage-return are used to signal end of line. * Each line is divided into key and value parts by a separator byte. 
If no * such a byte exists, the key will be the entire line and value will be empty. + * The separator byte can be specified in config file under the attribute name + * mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default + * is the tab character ('\t'). */ @InterfaceAudience.Public @InterfaceStability.Stable Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Tue Aug 19 23:49:39 2014 @@ -121,7 +121,7 @@ public class LineRecordReader extends Re private int maxBytesToConsume(long pos) { return isCompressedInput ? Integer.MAX_VALUE - : (int) Math.min(Integer.MAX_VALUE, end - pos); + : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength); } private long getFilePosition() throws IOException { @@ -134,6 +134,39 @@ public class LineRecordReader extends Re return retVal; } + private int skipUtfByteOrderMark() throws IOException { + // Strip BOM(Byte Order Mark) + // Text only support UTF-8, we only need to check UTF-8 BOM + // (0xEF,0xBB,0xBF) at the start of the text stream. 
+ int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength, + Integer.MAX_VALUE); + int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos)); + // Even we read 3 extra bytes for the first line, + // we won't alter existing behavior (no backwards incompat issue). + // Because the newSize is less than maxLineLength and + // the number of bytes copied to Text is always no more than newSize. + // If the return size from readLine is not less than maxLineLength, + // we will discard the current line and read the next line. + pos += newSize; + int textLength = value.getLength(); + byte[] textBytes = value.getBytes(); + if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) && + (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) { + // find UTF-8 BOM, strip it. + LOG.info("Found UTF-8 BOM and skipped it"); + textLength -= 3; + newSize -= 3; + if (textLength > 0) { + // It may work to use the same buffer and not do the copyBytes + textBytes = value.copyBytes(); + value.set(textBytes, 3, textLength); + } else { + value.clear(); + } + } + return newSize; + } + public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); @@ -146,10 +179,14 @@ public class LineRecordReader extends Re // We always read one extra line, which lies outside the upper // split limit i.e. 
(end - 1) while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { - newSize = in.readLine(value, maxLineLength, - Math.max(maxBytesToConsume(pos), maxLineLength)); - pos += newSize; - if (newSize < maxLineLength) { + if (pos == 0) { + newSize = skipUtfByteOrderMark(); + } else { + newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); + pos += newSize; + } + + if ((newSize == 0) || (newSize < maxLineLength)) { break; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputFormat.java Tue Aug 19 23:49:39 2014 @@ -39,7 +39,6 @@ import org.apache.hadoop.mapreduce.TaskA /** * An InputFormat capable of performing joins over a set of data sources sorted * and partitioned the same way. - * @see #setFormat * * A user may define new join types by setting the property * mapreduce.join.define.<ident> to a classname. @@ -47,6 +46,7 @@ import org.apache.hadoop.mapreduce.TaskA * assumed to be a ComposableRecordReader. * mapreduce.join.keycomparator can be a classname used to compare * keys in the join. 
+ * @see #setFormat * @see JoinRecordReader * @see MultiFilterRecordReader */ Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/CompositeRecordReader.java Tue Aug 19 23:49:39 2014 @@ -109,7 +109,7 @@ public abstract class CompositeRecordRea } // create priority queue if (null == q) { - cmp = WritableComparator.get(keyclass); + cmp = WritableComparator.get(keyclass, conf); q = new PriorityQueue >(3, new Comparator >() { public int compare(ComposableRecordReader o1, Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/WrappedRecordReader.java Tue Aug 19 23:49:39 2014 @@ -92,7 +92,7 @@ public class WrappedRecordReader exte public void setupTask(TaskAttemptContext taskContext) { } @Override + @Deprecated public boolean isRecoverySupported() { return true; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Aug 19 23:49:39 2014 @@ -319,6 +319,7 @@ class Fetcher extends Thread { // If connect did not succeed, just mark all the maps as failed, // indirectly penalizing the host + scheduler.hostFailed(host.getHostName()); for(TaskAttemptID left: remaining) { scheduler.copyFailed(left, host, false, connectExcpt); } @@ -343,6 +344,7 @@ class Fetcher extends Thread { if(failedTasks != null && failedTasks.length > 0) { LOG.warn("copyMapOutput failed for tasks 
"+Arrays.toString(failedTasks)); + scheduler.hostFailed(host.getHostName()); for(TaskAttemptID left: failedTasks) { scheduler.copyFailed(left, host, true, false); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Aug 19 23:49:39 2014 @@ -156,7 +156,8 @@ public class MergeManagerImpl impl this.rfs = ((LocalFileSystem)localFS).getRaw(); final float maxInMemCopyUse = - jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f); + jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, + MRJobConfig.DEFAULT_SHUFFLE_INPUT_BUFFER_PERCENT); if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) { throw new IllegalArgumentException("Invalid value for " + MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " + @@ -197,7 +198,7 @@ public class MergeManagerImpl impl "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold); if (this.maxSingleShuffleLimit >= this.mergeThreshold) { - throw new RuntimeException("Invlaid configuration: " + throw new RuntimeException("Invalid configuration: " + "maxSingleShuffleLimit should be less than mergeThreshold" + "maxSingleShuffleLimit: " + 
this.maxSingleShuffleLimit + "mergeThreshold: " + this.mergeThreshold); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java Tue Aug 19 23:49:39 2014 @@ -18,7 +18,6 @@ package org.apache.hadoop.mapreduce.task.reduce; import java.io.IOException; - import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; @@ -101,6 +100,7 @@ public class ShuffleSchedulerImpl i private final boolean reportReadErrorImmediately; private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY; + private int maxHostFailures; public ShuffleSchedulerImpl(JobConf job, TaskStatus status, TaskAttemptID reduceId, @@ -132,6 +132,9 @@ public class ShuffleSchedulerImpl i this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY, MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY); + this.maxHostFailures = job.getInt( + MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES, + MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES); } @Override @@ -213,9 +216,18 @@ public class ShuffleSchedulerImpl i progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at " + 
mbpsFormat.format(transferRate) + " MB/s)"); } + + public synchronized void hostFailed(String hostname) { + if (hostFailures.containsKey(hostname)) { + IntWritable x = hostFailures.get(hostname); + x.set(x.get() + 1); + } else { + hostFailures.put(hostname, new IntWritable(1)); + } + } public synchronized void copyFailed(TaskAttemptID mapId, MapHost host, - boolean readError, boolean connectExcpt) { + boolean readError, boolean connectExcpt) { host.penalize(); int failures = 1; if (failureCounts.containsKey(mapId)) { @@ -226,12 +238,9 @@ public class ShuffleSchedulerImpl i failureCounts.put(mapId, new IntWritable(1)); } String hostname = host.getHostName(); - if (hostFailures.containsKey(hostname)) { - IntWritable x = hostFailures.get(hostname); - x.set(x.get() + 1); - } else { - hostFailures.put(hostname, new IntWritable(1)); - } + //report failure if already retried maxHostFailures times + boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false; + if (failures >= abortFailureLimit) { try { throw new IOException(failures + " failures downloading " + mapId); @@ -240,7 +249,7 @@ public class ShuffleSchedulerImpl i } } - checkAndInformJobTracker(failures, mapId, readError, connectExcpt); + checkAndInformJobTracker(failures, mapId, readError, connectExcpt, hostFail); checkReducerHealth(); @@ -270,9 +279,9 @@ public class ShuffleSchedulerImpl i // after every 'maxFetchFailuresBeforeReporting' failures private void checkAndInformJobTracker( int failures, TaskAttemptID mapId, boolean readError, - boolean connectExcpt) { + boolean connectExcpt, boolean hostFailed) { if (connectExcpt || (reportReadErrorImmediately && readError) - || ((failures % maxFetchFailuresBeforeReporting) == 0)) { + || ((failures % maxFetchFailuresBeforeReporting) == 0) || hostFailed) { LOG.info("Reporting fetch failure for " + mapId + " to jobtracker."); status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId); } @@ -507,4 +516,7 @@ public class 
ShuffleSchedulerImpl i referee.join(); } + public int getMaxHostFailures() { + return maxHostFailures; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java Tue Aug 19 23:49:39 2014 @@ -38,6 +38,21 @@ public class HostUtil { httpPort + "/tasklog?attemptid=" + taskAttemptID); } + /** + * Always throws {@link RuntimeException} because this method is not + * supposed to be called at runtime. This method is only for keeping + * binary compatibility with Hive 0.13. MAPREDUCE-5830 for the details. + * @deprecated Use {@link #getTaskLogUrl(String, String, String, String)} + * to construct the taskLogUrl. + */ + @Deprecated + public static String getTaskLogUrl(String taskTrackerHostName, + String httpPort, String taskAttemptID) { + throw new RuntimeException( + "This method is not supposed to be called at runtime. " + + "Use HostUtil.getTaskLogUrl(String, String, String, String) instead."); + } + public static String convertTrackerNameToHostName(String trackerName) { // Ugly! 
// Convert the trackerName to its host name Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Tue Aug 19 23:49:39 2014 @@ -83,6 +83,16 @@ + + +mapreduce.job.reducer.preempt.delay.sec +0 +The threshold in terms of seconds after which an unsatisfied mapper + request triggers reducer preemption to free space. Default 0 implies that the + reduces should be preempted immediately after allocation if there is currently no + room for newly allocated mappers. + +mapreduce.job.max.split.locations 10 The max number of block locations to store for each split for @@ -661,7 +671,7 @@ mapreduce.task.profile.params -+ -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s JVM profiler parameters used to profile map and reduce task attempts. This string may contain a single format specifier %s that will be replaced by the path to profile.out in the task attempt log directory. @@ -1215,7 +1225,9 @@ mapreduce.job.classloader.system.classes -java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop. 
+java.,javax.,org.w3c.dom.,org.xml.sax.,org.apache.commons.logging., + org.apache.log4j.,org.apache.hadoop.,core-default.xml, + hdfs-default.xml,mapred-default.xml,yarn-default.xml A comma-separated list of classes that should be loaded from the system classpath, not the user-supplied JARs, when mapreduce.job.classloader is enabled. Names ending in '.' (period) are treated as package names, Propchange: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/branches/HDFS-2006/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1588992-1596568 Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1582150-1619000 Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/DistributedCacheDeploy.apt.vm Tue Aug 19 23:49:39 2014 @@ -18,8 +18,6 @@ Hadoop MapReduce Next Generation - Distributed Cache Deploy - \[ {{{./index.html}Go Back}} \] - * Introduction The MapReduce application framework has rudimentary support 
for deploying a Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/EncryptedShuffle.apt.vm Tue Aug 19 23:49:39 2014 @@ -18,8 +18,6 @@ Hadoop MapReduce Next Generation - Encrypted Shuffle - \[ {{{./index.html}Go Back}} \] - * {Introduction} The Encrypted Shuffle capability allows encryption of the MapReduce shuffle Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/MapReduce_Compatibility_Hadoop1_Hadoop2.apt.vm Tue Aug 19 23:49:39 2014 @@ -18,8 +18,6 @@ Apache Hadoop MapReduce - Migrating from 
Apache Hadoop 1.x to Apache Hadoop 2.x - \[ {{{../../hadoop-yarn/hadoop-yarn-site/index.html}Go Back}} \] - * {Introduction} This document provides information for users to migrate their Apache Hadoop Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm Tue Aug 19 23:49:39 2014 @@ -18,8 +18,6 @@ Hadoop MapReduce Next Generation - Pluggable Shuffle and Pluggable Sort - \[ {{{./index.html}Go Back}} \] - * Introduction The pluggable shuffle and pluggable sort capabilities allow replacing the Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClusterStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClusterStatus.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClusterStatus.java 
(original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestClusterStatus.java Tue Aug 19 23:49:39 2014 @@ -17,7 +17,7 @@ */ package org.apache.hadoop.mapred; -import junit.framework.Assert; +import org.junit.Assert; import org.junit.Test; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestCounters.java Tue Aug 19 23:49:39 2014 @@ -26,7 +26,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Random; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff 
============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Tue Aug 19 23:49:39 2014 @@ -103,6 +103,29 @@ public class TestFileInputFormat { } @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + JobConf job = new JobConf(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + fileInputFormat.configure(job); + FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1); + String[] locations = splits[0].getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits[0].getLocationInfo(); + Assert.assertEquals(2, locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? 
+ locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } + + @Test public void testListStatusSimple() throws IOException { Configuration conf = new Configuration(); conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads); @@ -223,8 +246,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobQueueClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobQueueClient.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobQueueClient.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobQueueClient.java Tue Aug 19 23:49:39 2014 @@ -23,7 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintWriter; -import junit.framework.Assert; +import org.junit.Assert; import org.junit.Test; 
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java Tue Aug 19 23:49:39 2014 @@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.net.URL; +import java.util.ArrayList; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; @@ -97,4 +100,129 @@ public class TestLineRecordReader { // character is a linefeed testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498); } + + // Use the LineRecordReader to read records from the file + public ArrayList readRecords(URL testFileUrl, int splitSize) + throws IOException { + + // Set up context + File testFile = new File(testFileUrl.getFile()); + long testFileSize = testFile.length(); + Path testFilePath = new Path(testFile.getAbsolutePath()); + Configuration conf = new Configuration(); + conf.setInt("io.file.buffer.size", 1); + + // Gather the records returned by the record reader + ArrayList records = new 
ArrayList (); + + long offset = 0; + LongWritable key = new LongWritable(); + Text value = new Text(); + while (offset < testFileSize) { + FileSplit split = + new FileSplit(testFilePath, offset, splitSize, (String[]) null); + LineRecordReader reader = new LineRecordReader(conf, split); + + while (reader.next(key, value)) { + records.add(value.toString()); + } + offset += splitSize; + } + return records; + } + + // Gather the records by just splitting on new lines + public String[] readRecordsDirectly(URL testFileUrl, boolean bzip) + throws IOException { + int MAX_DATA_SIZE = 1024 * 1024; + byte[] data = new byte[MAX_DATA_SIZE]; + FileInputStream fis = new FileInputStream(testFileUrl.getFile()); + int count; + if (bzip) { + BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis); + count = bzIn.read(data); + bzIn.close(); + } else { + count = fis.read(data); + } + fis.close(); + assertTrue("Test file data too big for buffer", count < data.length); + return new String(data, 0, count, "UTF-8").split("\n"); + } + + public void checkRecordSpanningMultipleSplits(String testFile, + int splitSize, + boolean bzip) + throws IOException { + URL testFileUrl = getClass().getClassLoader().getResource(testFile); + ArrayList records = readRecords(testFileUrl, splitSize); + String[] actuals = readRecordsDirectly(testFileUrl, bzip); + + assertEquals("Wrong number of records", actuals.length, records.size()); + + boolean hasLargeRecord = false; + for (int i = 0; i < actuals.length; ++i) { + assertEquals(actuals[i], records.get(i)); + if (actuals[i].length() > 2 * splitSize) { + hasLargeRecord = true; + } + } + + assertTrue("Invalid test data. 
Doesn't have a large enough record", + hasLargeRecord); + } + + @Test + public void testRecordSpanningMultipleSplits() + throws IOException { + checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt", + 10, false); + } + + @Test + public void testRecordSpanningMultipleSplitsCompressed() + throws IOException { + // The file is generated with bz2 block size of 100k. The split size + // needs to be larger than that for the CompressedSplitLineReader to + // work. + checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2", + 200 * 1000, true); + } + + @Test + public void testStripBOM() throws IOException { + // the test data contains a BOM at the start of the file + // confirm the BOM is skipped by LineRecordReader + String UTF8_BOM = "\uFEFF"; + URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt"); + assertNotNull("Cannot find testBOM.txt", testFileUrl); + File testFile = new File(testFileUrl.getFile()); + Path testFilePath = new Path(testFile.getAbsolutePath()); + long testFileSize = testFile.length(); + Configuration conf = new Configuration(); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. 
+ LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + + // read the data and check whether BOM is skipped + FileSplit split = new FileSplit(testFilePath, 0, testFileSize, + (String[])null); + LineRecordReader reader = new LineRecordReader(conf, split); + LongWritable key = new LongWritable(); + Text value = new Text(); + int numRecords = 0; + boolean firstLine = true; + boolean skipBOM = true; + while (reader.next(key, value)) { + if (firstLine) { + firstLine = false; + if (value.toString().startsWith(UTF8_BOM)) { + skipBOM = false; + } + } + ++numRecords; + } + reader.close(); + + assertTrue("BOM is not skipped", skipBOM); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java Tue Aug 19 23:49:39 2014 @@ -35,7 +35,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import static junit.framework.Assert.*; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; /** Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskLog.java Tue Aug 19 23:49:39 2014 @@ -17,8 +17,8 @@ */ package org.apache.hadoop.mapred; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/db/TestDbClasses.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/db/TestDbClasses.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/db/TestDbClasses.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/db/TestDbClasses.java Tue Aug 19 23:49:39 2014 @@ -110,7 +110,7 @@ public 
class TestDbClasses { splitter, NullDBWritable.class, configuration, connect, dbConfiguration, "condition", fields, "table"); assertEquals( - "SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( SELECT f1, f2 FROM table WHERE condition ORDER BY Order ) a WHERE rownum <= 1 + 9 ) WHERE dbif_rno >= 1", + "SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( SELECT f1, f2 FROM table WHERE condition ORDER BY Order ) a WHERE rownum <= 10 ) WHERE dbif_rno > 1", recorder.getSelectQuery()); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileRecordReader.java Tue Aug 19 23:49:39 2014 @@ -22,7 +22,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.File; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Tue Aug 19 23:49:39 2014 @@ -26,7 +26,7 @@ import java.util.Set; import javax.annotation.Nullable; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.junit.After; @@ -139,6 +140,28 @@ public class TestFileInputFormat { 1, mockFs.numListLocatedStatusCalls); FileSystem.closeAll(); } + + @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + Job job = Job.getInstance(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + List splits = fileInputFormat.getSplits(job); + String[] locations = splits.get(0).getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo(); + Assert.assertEquals(2, 
locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? + locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } @Test public void testListStatusSimple() throws IOException { @@ -402,9 +425,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; - } + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override protected RemoteIterator listLocatedStatus(Path f, Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestLineRecordReader.java Tue Aug 19 
23:49:39 2014 @@ -23,9 +23,12 @@ import static org.junit.Assert.assertNot import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.net.URL; +import java.util.ArrayList; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -101,4 +104,131 @@ public class TestLineRecordReader { // character is a linefeed testSplitRecords("blockEndingInCRThenLF.txt.bz2", 136498); } + + // Use the LineRecordReader to read records from the file + public ArrayList readRecords(URL testFileUrl, int splitSize) + throws IOException { + + // Set up context + File testFile = new File(testFileUrl.getFile()); + long testFileSize = testFile.length(); + Path testFilePath = new Path(testFile.getAbsolutePath()); + Configuration conf = new Configuration(); + conf.setInt("io.file.buffer.size", 1); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + + // Gather the records returned by the record reader + ArrayList records = new ArrayList (); + + long offset = 0; + while (offset < testFileSize) { + FileSplit split = new FileSplit(testFilePath, offset, splitSize, null); + LineRecordReader reader = new LineRecordReader(); + reader.initialize(split, context); + + while (reader.nextKeyValue()) { + records.add(reader.getCurrentValue().toString()); + } + offset += splitSize; + } + return records; + } + + // Gather the records by just splitting on new lines + public String[] readRecordsDirectly(URL testFileUrl, boolean bzip) + throws IOException { + int MAX_DATA_SIZE = 1024 * 1024; + byte[] data = new byte[MAX_DATA_SIZE]; + FileInputStream fis = new FileInputStream(testFileUrl.getFile()); + int count; + if (bzip) { + BZip2CompressorInputStream bzIn = new BZip2CompressorInputStream(fis); + count = bzIn.read(data); + bzIn.close(); + } 
else { + count = fis.read(data); + } + fis.close(); + assertTrue("Test file data too big for buffer", count < data.length); + return new String(data, 0, count, "UTF-8").split("\n"); + } + + public void checkRecordSpanningMultipleSplits(String testFile, + int splitSize, + boolean bzip) + throws IOException { + URL testFileUrl = getClass().getClassLoader().getResource(testFile); + ArrayList records = readRecords(testFileUrl, splitSize); + String[] actuals = readRecordsDirectly(testFileUrl, bzip); + + assertEquals("Wrong number of records", actuals.length, records.size()); + + boolean hasLargeRecord = false; + for (int i = 0; i < actuals.length; ++i) { + assertEquals(actuals[i], records.get(i)); + if (actuals[i].length() > 2 * splitSize) { + hasLargeRecord = true; + } + } + + assertTrue("Invalid test data. Doesn't have a large enough record", + hasLargeRecord); + } + + @Test + public void testRecordSpanningMultipleSplits() + throws IOException { + checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt", + 10, + false); + } + + @Test + public void testRecordSpanningMultipleSplitsCompressed() + throws IOException { + // The file is generated with bz2 block size of 100k. The split size + // needs to be larger than that for the CompressedSplitLineReader to + // work. + checkRecordSpanningMultipleSplits("recordSpanningMultipleSplits.txt.bz2", + 200 * 1000, + true); + } + + @Test + public void testStripBOM() throws IOException { + // the test data contains a BOM at the start of the file + // confirm the BOM is skipped by LineRecordReader + String UTF8_BOM = "\uFEFF"; + URL testFileUrl = getClass().getClassLoader().getResource("testBOM.txt"); + assertNotNull("Cannot find testBOM.txt", testFileUrl); + File testFile = new File(testFileUrl.getFile()); + Path testFilePath = new Path(testFile.getAbsolutePath()); + long testFileSize = testFile.length(); + Configuration conf = new Configuration(); + conf.setInt(org.apache.hadoop.mapreduce.lib.input. 
+ LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE); + + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + + // read the data and check whether BOM is skipped + FileSplit split = new FileSplit(testFilePath, 0, testFileSize, + (String[])null); + LineRecordReader reader = new LineRecordReader(); + reader.initialize(split, context); + int numRecords = 0; + boolean firstLine = true; + boolean skipBOM = true; + while (reader.nextKeyValue()) { + if (firstLine) { + firstLine = false; + if (reader.getCurrentValue().toString().startsWith(UTF8_BOM)) { + skipBOM = false; + } + } + ++numRecords; + } + reader.close(); + + assertTrue("BOM is not skipped", skipBOM); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java Tue Aug 19 23:49:39 2014 @@ -30,7 +30,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -193,7 +193,7 @@ public class TestMerger { RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, 
keyClass, valueClass, segments, 2, tmpDir, comparator, getReporter(), readsCounter, writesCounter, mergePhase); - Assert.assertEquals(1.0f, mergeQueue.getProgress().get()); + Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), 0.0f); } private Progressable getReporter() { @@ -274,4 +274,4 @@ public class TestMerger { } }; } -} \ No newline at end of file +}