hadoop-common-dev mailing list archives

From elton sky <eltonsky9...@gmail.com>
Subject How does MergeQueue.merge actually sort <K,V> from different segments ??
Date Fri, 18 Jun 2010 01:33:25 GMT
Hello everyone,

I am going through the MapReduce source code. In MergeQueue.merge, all I can
see is the SEGMENTS being sorted by length and combined into a list for
merging. However, I could not find the code that sorts the (key, value) pairs
inside those segments by key...

Here is the function:

    RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
    .
    .
    .

        //if we have lesser number of segments remaining, then just return the
        //iterator, else do another single level merge
        if (numSegments <= factor) {
          // Reset totalBytesProcessed to track the progress of the final merge.
          // This is considered the progress of the reducePhase, the 3rd phase
          // of reduce task. Currently totalBytesProcessed is not used in sort
          // phase of reduce task(i.e. when intermediate merges happen).
          totalBytesProcessed = startBytes;

          //calculate the length of the remaining segments. Required for
          //calculating the merge progress
          long totalBytes = 0;
          for (int i = 0; i < segmentsToMerge.size(); i++) {
            totalBytes += segmentsToMerge.get(i).getLength();
          }
          if (totalBytes != 0) //being paranoid
            progPerByte = 1.0f / (float)totalBytes;

          if (totalBytes != 0)
            mergeProgress.set(totalBytesProcessed * progPerByte);
          else
            mergeProgress.set(1.0f); // Last pass and no segments left - we're done

          LOG.info("Down to the last merge-pass, with " + numSegments +
                   " segments left of total size: " + totalBytes + " bytes");
          return this;
        } else {
          LOG.info("Merging " + segmentsToMerge.size() +
                   " intermediate segments out of a total of " +
                   (segments.size()+segmentsToMerge.size()));

          //we want to spread the creation of temp files on multiple disks if
          //available under the space constraints
          long approxOutputSize = 0;
          for (Segment<K, V> s : segmentsToMerge) {
            approxOutputSize += s.getLength() +
                                ChecksumFileSystem.getApproxChkSumLength(
                                s.getLength());
          }
          Path tmpFilename =
            new Path(tmpDir, "intermediate").suffix("." + passNo);

          Path outputFile = lDirAlloc.getLocalPathForWrite(
                                              tmpFilename.toString(),
                                              approxOutputSize, conf);

          Writer<K, V> writer =
            new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
                             writesCounter);
          writeFile(this, writer, reporter, conf);
          writer.close();

          //we finished one single level merge; now clean up the priority
          //queue
          this.close();

          // Add the newly create segment to the list of segments to be merged
          Segment<K, V> tempSegment =
            new Segment<K, V>(conf, fs, outputFile, codec, false);
          segments.add(tempSegment);
          numSegments = segments.size();
          Collections.sort(segments, segmentComparator);

          passNo++;
        }
        //we are worried about only the first pass merge factor. So reset the
        //factor to what it originally was
        factor = origFactor;
      } while(true);
    }
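To make the pass-by-pass control flow of this loop concrete, here is a simplified model that tracks only segment lengths and does no I/O. It is a sketch, not Hadoop code: the class `MergePlanSketch` and method `planPasses` are hypothetical, it assumes that `segmentComparator` orders segments by length (as described at the top of this message) and that each intermediate pass consumes the shortest `factor` segments, and it ignores whatever first-pass adjustment the elided code makes to `factor` before the `factor = origFactor` reset.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the pass-by-pass loop; tracks segment lengths only. */
public class MergePlanSketch {

    /**
     * Merges the `factor` shortest segments into one intermediate segment,
     * repeating until at most `factor` remain, and returns the lengths left
     * for the final merge pass.
     */
    static List<Long> planPasses(List<Long> lengths, int factor) {
        if (factor < 2) {
            // a factor below 2 would never reduce the number of segments
            throw new IllegalArgumentException("factor must be at least 2");
        }
        List<Long> segments = new ArrayList<>(lengths);
        Collections.sort(segments);            // shortest first (assumed segmentComparator order)
        int passNo = 0;
        while (segments.size() > factor) {     // mirrors the "else" branch of the loop
            long merged = 0;
            for (int i = 0; i < factor; i++) {
                merged += segments.remove(0);  // consume the current shortest segment
            }
            segments.add(merged);              // the new intermediate segment
            Collections.sort(segments);        // re-sort, like Collections.sort(segments, segmentComparator)
            passNo++;
            System.out.println("after pass " + passNo + ": " + segments);
        }
        return segments;                       // at most `factor` left: the final merge
    }

    public static void main(String[] args) {
        List<Long> remaining = planPasses(List.of(10L, 40L, 20L, 30L, 50L), 3);
        System.out.println("final merge over " + remaining);
    }
}
```

With five segments of lengths 10 to 50 and a factor of 3, one intermediate pass merges the three shortest into a segment of length 60, and three segments remain for the final pass, so `main` prints `after pass 1: [40, 50, 60]` followed by `final merge over [40, 50, 60]`. Taking the shortest segments first presumably keeps the amount of data rewritten to intermediate files small, since records in an intermediate segment can be copied again in later passes.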

I can see that if the number of segments is less than or equal to factor,
the segments are returned (is this right?). Otherwise, segments are merged
factor at a time, pass by pass. But how do the <K,V> pairs from different
segments end up sorted?
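The standard technique for turning several segments that are each already sorted by key into one key-ordered stream is a k-way merge driven by a min-heap on each segment's current key. The excerpt hints in this direction: its own comment calls `this` the priority queue, and `this` is both what merge() returns as a RawKeyValueIterator and what it passes to writeFile. The sketch below shows only the general technique, under stated assumptions: each input list is individually sorted (as map-side spill output is), String keys stand in for serialized keys and raw comparators, and `KWayMergeSketch`, `Cursor`, and `kWayMerge` are hypothetical names, not Hadoop's MergeQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of a k-way merge over key-sorted segments (not Hadoop's MergeQueue). */
public class KWayMergeSketch {

    /** A read position inside one already-sorted segment. */
    static final class Cursor {
        final List<String> keys;
        int pos = 0;

        Cursor(List<String> keys) { this.keys = keys; }

        String head() { return keys.get(pos); }      // current key of this segment
        boolean hasNext() { return pos < keys.size(); }
    }

    static List<String> kWayMerge(List<List<String>> segments) {
        // The heap orders cursors by their current (head) key, so the root
        // always holds the smallest key not yet emitted across all segments.
        PriorityQueue<Cursor> heap =
            new PriorityQueue<>((a, b) -> a.head().compareTo(b.head()));
        for (List<String> seg : segments) {
            Cursor c = new Cursor(seg);
            if (c.hasNext()) {
                heap.add(c);                          // skip empty segments
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor min = heap.poll();                 // segment with the smallest head key
            out.add(min.head());
            min.pos++;                                // advance within that segment
            if (min.hasNext()) {
                heap.add(min);                        // re-insert so its new head is compared
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> segments = List.of(
            List.of("apple", "melon", "zebra"),
            List.of("banana", "kiwi"),
            List.of("cherry", "mango", "pear"));
        System.out.println(kWayMerge(segments));
    }
}
```

Running `main` prints `[apple, banana, cherry, kiwi, mango, melon, pear, zebra]`. In this sketch no segment is ever re-sorted; the global order comes only from repeatedly taking the smallest head key across segments, at O(log k) heap work per record for k segments.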

Elton
