Return-Path:
Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org
Received: (qmail 88666 invoked from network); 18 Nov 2008 09:36:34 -0000
Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2)
    by minotaur.apache.org with SMTP; 18 Nov 2008 09:36:34 -0000
Received: (qmail 33460 invoked by uid 500); 18 Nov 2008 09:36:42 -0000
Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org
Received: (qmail 33421 invoked by uid 500); 18 Nov 2008 09:36:42 -0000
Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: core-dev@hadoop.apache.org
Delivered-To: mailing list core-commits@hadoop.apache.org
Received: (qmail 33405 invoked by uid 99); 18 Nov 2008 09:36:41 -0000
Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136)
    by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Nov 2008 01:36:41 -0800
X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED
X-Spam-Check-By: apache.org
Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4)
    by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Nov 2008 09:35:22 +0000
Received: by eris.apache.org (Postfix, from userid 65534)
    id 2ED572388961; Tue, 18 Nov 2008 01:35:37 -0800 (PST)
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: svn commit: r718533 - in /hadoop/core/trunk: ./
    src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date: Tue, 18 Nov 2008 09:35:35 -0000
To: core-commits@hadoop.apache.org
From: ddas@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20081118093537.2ED572388961@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: ddas
Date: Tue Nov 18 01:35:32 2008
New Revision: 718533

URL: http://svn.apache.org/viewvc?rev=718533&view=rev
Log:
HADOOP-2774. Provide a counter for the number of first level spill files in
the map task and a counter for counting the number of spilled records in both
map and reduce tasks. Contributed by Ravi Gummadi.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Nov 18 01:35:32 2008
@@ -111,6 +111,10 @@
     HADOOP-4612. Removes RunJar's dependency on JobClient.
     (Sharad Agarwal via ddas)

+    HADOOP-2774. Provide a counter for the number of first level spill files in
+    the map task and a counter for counting the number of spilled records in both
+    map and reduce tasks.
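The change described above hinges on one accounting trick: every record appended to a spill file bumps a task-wide counter, and per-partition counts fall out as deltas between snapshots of that counter. The following toy Java sketch illustrates that scheme; names loosely mirror the patch (numRecordsWritten, SPILLED_RECORDS), but this is an illustration, not the Hadoop code itself.

```java
import java.util.ArrayList;
import java.util.List;

public class SpillCounterSketch {
    // Loosely mirrors IFile.Writer.numRecordsWritten: static, task-JVM-wide.
    static long numRecordsWritten = 0;

    static void append(String key, String value) {
        // ... key/value bytes would be written to the spill stream here ...
        numRecordsWritten++;
    }

    public static void main(String[] args) {
        List<Long> perPartition = new ArrayList<>();
        int[] recordsPerPartition = {3, 0, 5}; // made-up partition sizes
        for (int records : recordsPerPartition) {
            long prevCnt = numRecordsWritten;   // snapshot before this partition
            for (int r = 0; r < records; r++) {
                append("k" + r, "v" + r);
            }
            // Delta = records spilled for this partition (goes into the index).
            perPartition.add(numRecordsWritten - prevCnt);
        }
        // The SPILLED_RECORDS counter for the task is the running total.
        System.out.println(perPartition + " total=" + numRecordsWritten);
    }
}
```

Running the sketch prints `[3, 0, 5] total=8`: three per-partition deltas plus the task-wide total that the patch feeds into spilledRecordsCounter.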
+
   OPTIMIZATIONS

   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IFile.java Tue Nov 18 01:35:32 2008
@@ -65,7 +65,10 @@
     long decompressedBytesWritten = 0;
     long compressedBytesWritten = 0;
-
+
+    // Count records when written to disk.
+    private static long numRecordsWritten = 0;
+
     IFileOutputStream checksumOut;

     Class keyClass;
@@ -178,6 +181,7 @@
       decompressedBytesWritten += keyLength + valueLength +
                                   WritableUtils.getVIntSize(keyLength) +
                                   WritableUtils.getVIntSize(valueLength);
+      numRecordsWritten++;
     }

     public void append(DataInputBuffer key, DataInputBuffer value)
@@ -203,8 +207,13 @@
       decompressedBytesWritten += keyLength + valueLength +
                                   WritableUtils.getVIntSize(keyLength) +
                                   WritableUtils.getVIntSize(valueLength);
-    }
+      numRecordsWritten++;
+    }

+    public static long getNumRecordsWritten() {
+      return numRecordsWritten;
+    }
+
     public long getRawLength() {
       return decompressedBytesWritten;
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IndexRecord.java Tue Nov 18 01:35:32 2008
@@ -30,11 +30,14 @@
   final long startOffset;
   final long rawLength;
   final long partLength;
+  final long numRecords;

-  public IndexRecord(long startOffset, long rawLength, long partLength) {
+  public IndexRecord(long startOffset, long rawLength, long partLength,
+                     long numRecords) {
     this.startOffset = startOffset;
     this.rawLength = rawLength;
     this.partLength = partLength;
+    this.numRecords = numRecords;
   }

   public static IndexRecord[] readIndexFile(Path indexFileName,
@@ -60,8 +63,9 @@
         long startOffset = wrapper.readLong();
         long rawLength = wrapper.readLong();
         long partLength = wrapper.readLong();
+        long numRecords = wrapper.readLong();
         indexRecordArray[i] =
-          new IndexRecord(startOffset, rawLength, partLength);
+          new IndexRecord(startOffset, rawLength, partLength, numRecords);
       }
     } finally {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MRConstants.java Tue Nov 18 01:35:32 2008
@@ -47,5 +47,10 @@
    */
   public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";

+  /**
+   * The custom http header used for the number of map output records.
+   */
+  public static final String MAP_OUTPUT_NUM_RECORDS = "Map-Output-Num-Records";
+
   public static final String WORKDIR = "work";
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapTask.java Tue Nov 18 01:35:32 2008
@@ -24,6 +24,7 @@
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
+import static org.apache.hadoop.mapred.Task.Counter.MAP_FIRST_LEVEL_SPILLS;

 import java.io.DataInput;
 import java.io.DataOutput;
@@ -67,7 +68,7 @@
   /**
    * The size of each record in the index file for the map-outputs.
    */
-  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+  public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 32;

   private BytesWritable split = new BytesWritable();
@@ -447,6 +448,7 @@
     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter combineInputCounter;
     private final Counters.Counter combineOutputCounter;
+    private final Counters.Counter firstLevelSpillsCounter;

     private ArrayList indexCacheList;
     private int totalIndexCacheMemory;
@@ -509,6 +511,7 @@
       mapOutputRecordCounter = counters.findCounter(MAP_OUTPUT_RECORDS);
       combineInputCounter = counters.findCounter(COMBINE_INPUT_RECORDS);
       combineOutputCounter = counters.findCounter(COMBINE_OUTPUT_RECORDS);
+      firstLevelSpillsCounter = counters.findCounter(MAP_FIRST_LEVEL_SPILLS);

       // compression
       if (job.getCompressMapOutput()) {
         Class codecClass =
@@ -948,7 +951,10 @@
         IFile.Writer writer = null;
         try {
           long segmentStart = out.getPos();
+          long numRecordsInThisPartition = 0;
           writer = new Writer(job, out, keyClass, valClass, codec);
+          long prevCnt = writer.getNumRecordsWritten();
+
           if (null == combinerClass) {
             // spill directly
             DataInputBuffer key = new DataInputBuffer();
@@ -979,17 +985,19 @@
               combineAndSpill(kvIter, combineInputCounter);
             }
           }
-
+          numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
           // close the writer
           writer.close();

           if (indexChecksumOut != null) {
-            // write the index as
-            writeIndexRecord(indexChecksumOut, segmentStart, writer);
+            // write the index as
+            //
+            writeIndexRecord(indexChecksumOut, segmentStart, writer,
+                             numRecordsInThisPartition);
           }
           else {
             irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                                         writer.getCompressedLength());
+                writer.getCompressedLength(), numRecordsInThisPartition);
           }
           writer = null;
         } finally {
@@ -1058,12 +1066,13 @@
           }
           writer.close();
+          long numRecords = (i == partition) ? 1:0;
           if (indexChecksumOut != null) {
-            writeIndexRecord(indexChecksumOut,segmentStart,writer);
+            writeIndexRecord(indexChecksumOut,segmentStart,writer,numRecords);
           }
           else {
             irArray[i] = new IndexRecord(segmentStart, writer.getRawLength(),
-                                         writer.getCompressedLength());
+                writer.getCompressedLength(), numRecords);
           }
           writer = null;
         } catch (IOException e) {
@@ -1172,6 +1181,8 @@
       long finalIndexFileSize = 0;
       Path [] filename = new Path[numSpills];

+      firstLevelSpillsCounter.increment(numSpills);
+
       for(int i = 0; i < numSpills; i++) {
         filename[i] = mapOutputFile.getSpillFile(getTaskID(), i);
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
@@ -1188,7 +1199,8 @@
           writeSingleSpillIndexToFile(getTaskID(),
               new Path(filename[0].getParent(),"file.out.index"));
         }
-        return;
+        spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
+        return;
       }

       //make correction in the length to include the sequence file header
       //lengths for each partition
@@ -1220,7 +1232,7 @@
           Writer writer = new Writer(job, finalOut, keyClass, valClass, codec);
           writer.close();
-          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer);
+          writeIndexRecord(finalIndexChecksumOut, segmentStart, writer, 0);
         }
         finalOut.close();
         finalIndexChecksumOut.close();
@@ -1256,7 +1268,6 @@
           }
           indexRecord = null;
         }
-
         //merge
         @SuppressWarnings("unchecked")
         RawKeyValueIterator kvIter =
@@ -1270,19 +1281,37 @@
           long segmentStart = finalOut.getPos();
           Writer writer =
               new Writer(job, finalOut, keyClass, valClass, codec);
+          long numRecordsInThisPartition;
+          long prevCnt = Writer.getNumRecordsWritten();
           if (null == combinerClass || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter);
           } else {
             combineCollector.setWriter(writer);
             combineAndSpill(kvIter, combineInputCounter);
           }
+          numRecordsInThisPartition = Writer.getNumRecordsWritten() - prevCnt;
           //close
           writer.close();

           //write index record
-          writeIndexRecord(finalIndexChecksumOut,segmentStart, writer);
+          writeIndexRecord(finalIndexChecksumOut,segmentStart, writer,
+                           numRecordsInThisPartition);
         }
+
+        // In Map Phase, Spills to disk are done at 3 places:
+        // (1) First Level Spills in sortAndSpill() - either
+        //     (a) without combiner
+        //     or (b) with combiner
+        // (2) Outputs of Intermediate(multi-level) merges in Merger.merge
+        // (3) Output of final level merge - See above if-else
+        //     (a) Merger.writeFile
+        //     or (b) combineAndSpill
+        // In all the cases, IFile.Writer.append() takes care of counting
+        // the records written to disk
+
+        spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
+
         finalOut.close();
         finalIndexChecksumOut.close();
         finalIndexOut.close();
@@ -1295,7 +1324,7 @@

     private void writeIndexRecord(IFileOutputStream indexOut,
                                   long start,
-                                  Writer writer)
+                                  Writer writer, long numRecords)
     throws IOException {
       //when we write the offset/decompressed-length/compressed-length to
       //the final index file, we write longs for both compressed and
@@ -1311,8 +1340,9 @@
       wrapper.writeLong(writer.getRawLength());
       long segmentLength = writer.getCompressedLength();
       wrapper.writeLong(segmentLength);
+      wrapper.writeLong(numRecords);
       LOG.info("Index: (" + start + ", " + writer.getRawLength() + ", " +
-               segmentLength + ")");
+               segmentLength + "," + numRecords + ")");
     }

     /**
@@ -1366,6 +1396,7 @@
         wrapper.writeLong(irArray[i].startOffset);
         wrapper.writeLong(irArray[i].rawLength);
         wrapper.writeLong(irArray[i].partLength);
+        wrapper.writeLong(irArray[i].numRecords);
       }

       wrapper.close();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Tue Nov 18 01:35:32 2008
@@ -436,6 +436,17 @@
       values.informReduceProgress();
     }

+    // In Reduce Phase, Spills to disk are
+    // (1) mapoutput directly written to reduceNode's disk in shuffleToDisk()
+    //     spilledRecordsCounter is updated after shuffleToDisk()
+    // (2) All other spills to disk are either through
+    //     (a) Merger.writeFile()
+    //     or (b) combineAndSpill()
+    // In cases 2(a) & 2(b), IFile.Writer.append() takes care of
+    // counting the records written to disk
+
+    spilledRecordsCounter.increment(Writer.getNumRecordsWritten());
+
     //Clean up: repeated in catch block below
     reducer.close();
     out.close(reporter);
@@ -1234,6 +1245,8 @@
         Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
       long compressedLength =
         Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
+      long numRecords =
+        Long.parseLong(connection.getHeaderField(MAP_OUTPUT_NUM_RECORDS));

       // Check if this map-output can be saved in-memory
       boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
@@ -1255,6 +1268,7 @@
         mapOutput = shuffleToDisk(mapOutputLoc, input, filename,
             compressedLength);
+        spilledRecordsCounter.increment(numRecords);
       }

       return mapOutput;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Tue Nov 18 01:35:32 2008
@@ -62,13 +62,15 @@
     MAP_SKIPPED_RECORDS,
     MAP_INPUT_BYTES,
     MAP_OUTPUT_BYTES,
+    MAP_FIRST_LEVEL_SPILLS,
     COMBINE_INPUT_RECORDS,
     COMBINE_OUTPUT_RECORDS,
     REDUCE_INPUT_GROUPS,
     REDUCE_INPUT_RECORDS,
     REDUCE_OUTPUT_RECORDS,
     REDUCE_SKIPPED_GROUPS,
-    REDUCE_SKIPPED_RECORDS
+    REDUCE_SKIPPED_RECORDS,
+    SPILLED_RECORDS
   }

   /**
@@ -131,6 +133,7 @@
   protected JobContext jobContext;
   protected TaskAttemptContext taskContext;
   private volatile boolean commitPending = false;
+  protected final Counters.Counter spilledRecordsCounter;

   ////////////////////////////////////////////
   // Constructors
@@ -139,6 +142,7 @@
   public Task() {
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
     taskId = new TaskAttemptID();
+    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }

   public Task(String jobFile, TaskAttemptID taskId, int partition) {
@@ -155,6 +159,7 @@
                                                     TaskStatus.Phase.SHUFFLE,
                                                     counters);
     this.mapOutputFile.setJobId(taskId.getJobID());
+    spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }

   ////////////////////////////////////////////

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Nov 18 01:35:32 2008
@@ -2760,6 +2760,7 @@
         final long startOffset = info.startOffset;
         final long rawPartLength = info.rawLength;
         final long partLength = info.partLength;
+        final long numRecords = info.numRecords;

         //set the custom "Raw-Map-Output-Length" http header to
         //the raw (decompressed) length
@@ -2770,6 +2771,11 @@
         response.setHeader(MAP_OUTPUT_LENGTH,
                            Long.toString(partLength));

+        //set the custom "Map-Output-Num-Records" http header to
+        //the actual number of records being transferred
+        response.setHeader(MAP_OUTPUT_NUM_RECORDS,
+                           Long.toString(numRecords));
+
         //use the same buffersize as used for reading the data from disk
         response.setBufferSize(MAX_BYTES_TO_READ);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Tue Nov 18 01:35:32 2008
@@ -14,5 +14,6 @@
 REDUCE_OUTPUT_RECORDS.name=    Reduce output records
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
-
+SPILLED_RECORDS.name=          Spilled Records
+MAP_FIRST_LEVEL_SPILLS.name=   Map First Level Spills

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java?rev=718533&r1=718532&r2=718533&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestIndexCache.java Tue Nov 18 01:35:32 2008
@@ -43,7 +43,7 @@
     fs.delete(p, true);
     conf.setInt("mapred.tasktracker.indexcache.mb", 1);
     final int partsPerMap = 1000;
-    final int bytesPerFile = partsPerMap * 24;
+    final int bytesPerFile = partsPerMap * 32;
     IndexCache cache = new IndexCache(conf);

     // fill cache
@@ -105,6 +105,7 @@
     assertEquals(fill, rec.startOffset);
     assertEquals(fill, rec.rawLength);
     assertEquals(fill, rec.partLength);
+    assertEquals(fill, rec.numRecords);
   }

   private static void writeFile(FileSystem fs, Path f, long fill, int parts)
@@ -116,6 +117,7 @@
       dout.writeLong(fill);
       dout.writeLong(fill);
       dout.writeLong(fill);
+      dout.writeLong(fill);
     }
     dout.close();
   }
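The TestIndexCache change above reflects the on-disk format change: each index record grows from three longs to four (startOffset, rawLength, partLength, numRecords), which is why MAP_OUTPUT_INDEX_RECORD_LENGTH moves from 24 to 32 bytes. A minimal, self-contained Java sketch of that record layout, with made-up field values, using the same DataOutputStream/DataInputStream writeLong/readLong pattern as the patch:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IndexRecordSketch {
    public static void main(String[] args) throws IOException {
        // Write one index record: four big-endian longs, 4 * 8 = 32 bytes.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeLong(0L);      // startOffset
        out.writeLong(4096L);   // rawLength (decompressed)
        out.writeLong(1024L);   // partLength (compressed)
        out.writeLong(57L);     // numRecords (the field added by this patch)
        out.close();
        System.out.println("record length = " + buf.size());

        // Read it back in the same field order, as readIndexFile does.
        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        long startOffset = in.readLong();
        long rawLength = in.readLong();
        long partLength = in.readLong();
        long numRecords = in.readLong();
        System.out.println(startOffset + "," + rawLength + ","
            + partLength + "," + numRecords);
    }
}
```

Because writeLong always emits exactly eight bytes, readers and writers can address record i at byte offset i * 32 without any framing, which is what the fixed MAP_OUTPUT_INDEX_RECORD_LENGTH constant relies on.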