Author: ddas
Date: Tue May 13 12:40:06 2008
New Revision: 655984

URL: http://svn.apache.org/viewvc?rev=655984&view=rev
Log:
HADOOP-3365. Removes an unnecessary copy of the key from SegmentDescriptor to MergeQueue. Contributed by Devaraj Das.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=655984&r1=655983&r2=655984&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 13 12:40:06 2008
@@ -225,6 +225,9 @@
     HADOOP-3349. A file rename was incorrectly changing the name inside a
     lease record. (Tsz Wo (Nicholas), SZE via dhruba)
 
+    HADOOP-3365. Removes an unnecessary copy of the key from SegmentDescriptor
+    to MergeQueue. (Devaraj Das)
+
 Release 0.17.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=655984&r1=655983&r2=655984&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue May 13 12:40:06 2008
@@ -2690,6 +2690,7 @@
       private Progress mergeProgress = new Progress();
       private Path tmpDir;
       private Progressable progress = null; //handle to the progress reporting object
+      private SegmentDescriptor minSegment;
 
       //a TreeMap used to store the segments sorted by size (segment offset and
       //segment path name is used to break ties between segments of same sizes)
@@ -2738,6 +2739,7 @@
         while ((ms = (SegmentDescriptor)pop()) != null) {
           ms.cleanup();
         }
+        minSegment = null;
       }
       public DataOutputBuffer getKey() throws IOException {
         return rawKey;
@@ -2748,21 +2750,25 @@
       public boolean next() throws IOException {
         if (size() == 0)
           return false;
-        SegmentDescriptor ms = (SegmentDescriptor)top();
-        //save the raw key
-        rawKey.reset();
-        rawKey.write(ms.getKey().getData(), 0, ms.getKey().getLength());
+        int valLength;
+        if (minSegment != null) {
+          //minSegment is non-null for all invocations of next except the first
+          //one. For the first invocation, the priority queue is ready for use
+          //but for the subsequent invocations, first adjust the queue
+          adjustPriorityQueue(minSegment);
+          if (size() == 0) {
+            minSegment = null;
+            return false;
+          }
+        }
+        minSegment = (SegmentDescriptor)top();
+        //save the raw key reference
+        rawKey = minSegment.getKey();
         //load the raw value. Re-use the existing rawValue buffer
-        if (rawValue == null)
-          rawValue = ms.in.createValueBytes();
-        int valLength = ms.nextRawValue(rawValue);
-
-        if (ms.nextRawKey()) {
-          adjustTop();
-        } else {
-          pop();
-          ms.cleanup();
+        if (rawValue == null) {
+          rawValue = minSegment.in.createValueBytes();
         }
+        valLength = minSegment.nextRawValue(rawValue);
         if (progPerByte > 0) {
           totalBytesProcessed += rawKey.getLength() + valLength;
           mergeProgress.set(totalBytesProcessed * progPerByte);
@@ -2774,6 +2780,14 @@
         return mergeProgress;
       }
 
+      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
+        if (ms.nextRawKey()) {
+          adjustTop();
+        } else {
+          pop();
+          ms.cleanup();
+        }
+      }
       /** This is the single level merge that is called multiple times
        * depending on the factor size and the number of segments
        * @return RawKeyValueIterator