hadoop-mapreduce-commits mailing list archives

From d...@apache.org
Subject svn commit: r810944 [1/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/task/ src/java/org/apache/hadoop/mapreduce/task/reduce/...
Date Thu, 03 Sep 2009 13:56:22 GMT
Author: ddas
Date: Thu Sep  3 13:56:21 2009
New Revision: 810944

URL: http://svn.apache.org/viewvc?rev=810944&view=rev
Log:
MAPREDUCE-318. Modularizes the shuffle code. Contributed by Jothi Padmanabhan and Arun Murthy.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ExceptionReporter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryWriter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleClientMetrics.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleHeader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileInputStream.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileOutputStream.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/mapreduce/trunk/src/test/findbugsExcludeFile.xml
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
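
For orientation, the sketch below shows roughly how the new reduce-side classes added above divide the work. It is a simplified, hypothetical stand-in (invented types and method bodies, not the committed code): EventFetcher polls for map-completion events, ShuffleScheduler hands hosts with pending output to Fetcher threads, and MergeManager absorbs fetched map outputs and merges them via MergeThreads.

    // Hypothetical sketch of the new shuffle pipeline; simplified types only.
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ShufflePipelineSketch {
      public static void main(String[] args) throws InterruptedException {
        // Stands in for ShuffleScheduler's queue of hosts with pending output.
        final BlockingQueue<String> pendingHosts =
            new LinkedBlockingQueue<String>();

        // EventFetcher: learns which map attempts finished and on which host.
        Thread eventFetcher = new Thread(new Runnable() {
          public void run() {
            pendingHosts.add("tracker-1:8080");  // placeholder event
          }
        });

        // Fetcher: pulls map output from a host, hands it to the merge stage.
        Thread fetcher = new Thread(new Runnable() {
          public void run() {
            try {
              String host = pendingHosts.take();
              // MergeManager would reserve memory (or spill to disk) here and
              // let MergeThreads combine segments as they accumulate.
              System.out.println("fetched map output from " + host);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        });

        eventFetcher.start();
        fetcher.start();
        eventFetcher.join();
        fetcher.join();
      }
    }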

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Sep  3 13:56:21 2009
@@ -283,6 +283,9 @@
     MAPREDUCE-875. Make DBRecordReader execute queries lazily. (Aaron Kimball 
     via enis)
 
+    MAPREDUCE-318. Modularizes the shuffle code. (Jothi Padmanabhan and 
+    Arun Murthy via ddas)
+
   BUG FIXES
 
     MAPREDUCE-878. Rename fair scheduler design doc to 

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Thu Sep  3 13:56:21 2009
@@ -654,6 +654,11 @@
         public int getNumSlots() {
           return t.getNumSlotsRequired();
         }
+
+        @Override
+        public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+          
+        }
       };
       taskStatuses.put(t.getTaskID().toString(), status);
       status.setRunState(TaskStatus.State.RUNNING);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/BackupStore.java Thu Sep  3 13:56:21 2009
@@ -35,7 +35,6 @@
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.IFile.InMemoryReader;
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
@@ -355,7 +354,11 @@
   
   private void clearSegmentList() throws IOException {
     for (Segment<K,V> segment: segmentList) {
+      long len = segment.getLength();
       segment.close();
+      if (segment.inMemory()) {
+       memCache.unreserve(len);
+      }
     }
     segmentList.clear();
   }
@@ -376,6 +379,10 @@
       }
     }
 
+    public void unreserve(long len) {
+      ramManager.unreserve((int)len);
+    }
+
     /**
      * Re-initialize the memory cache.
      * 
@@ -485,7 +492,7 @@
       ramManager.unreserve(blockSize - usedSize);
 
       Reader<K, V> reader = 
-        new InMemoryReader<K, V>(ramManager, 
+        new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null, 
             (org.apache.hadoop.mapred.TaskAttemptID) tid, 
             dataOut.getData(), 0, usedSize);
       Segment<K, V> segment = new Segment<K, V>(reader, false);
@@ -568,7 +575,6 @@
       availableSize = maxSize = size;
     }
 
-    @Override
     public boolean reserve(int requestedSize, InputStream in) {
       // Not used
       LOG.warn("Reserve(int, InputStream) not supported by BackupRamManager");
@@ -595,7 +601,6 @@
       }
     }
 
-    @Override
     public void unreserve(int requestedSize) {
       availableSize += requestedSize;
       LOG.debug("Unreserving: " + requestedSize +

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFile.java Thu Sep  3 13:56:21 2009
@@ -21,8 +21,6 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -48,10 +46,15 @@
  * 
 * There is a <code>Writer</code> to write out map-outputs in this format and
 * a <code>Reader</code> to read files of this format.
+ *
+ * <FRAMEWORK-USE-ONLY>
+ * This class is intended only for use by the Map/Reduce framework and not
+ * for external users.
+ *
  */
-class IFile {
+public class IFile {
 
-  static final int EOF_MARKER = -1;
+  public static final int EOF_MARKER = -1; // End of File Marker
   
   /**
    * <code>IFile.Writer</code> to write out intermediate map-outputs. 
@@ -91,6 +94,10 @@
       ownOutputStream = true;
     }
     
+    protected Writer(Counters.Counter writesCounter) {
+      writtenRecordsCounter = writesCounter;
+    }
+
     public Writer(Configuration conf, FSDataOutputStream out, 
         Class<K> keyClass, Class<V> valueClass,
         CompressionCodec codec, Counters.Counter writesCounter)
@@ -273,18 +280,18 @@
 
     final InputStream in;        // Possibly decompressed stream that we read
     Decompressor decompressor;
-    long bytesRead = 0;
-    final long fileLength;
-    boolean eof = false;
+    public long bytesRead = 0;
+    protected final long fileLength;
+    protected boolean eof = false;
     final IFileInputStream checksumIn;
-    DataInputStream dataIn;
     
-    byte[] buffer = null;
-    int bufferSize = DEFAULT_BUFFER_SIZE;
-
-    int recNo = 1;
-    int currentKeyLength;
-    int currentValueLength;
+    protected byte[] buffer = null;
+    protected int bufferSize = DEFAULT_BUFFER_SIZE;
+    protected DataInputStream dataIn;
+
+    protected int recNo = 1;
+    protected int currentKeyLength;
+    protected int currentValueLength;
     byte keyBytes[] = new byte[0];
     
     
@@ -458,119 +465,4 @@
     }
 
   }    
-  
-  /**
-   * <code>IFile.InMemoryReader</code> to read map-outputs present in-memory.
-   */
-  public static class InMemoryReader<K, V> extends Reader<K, V> {
-    RamManager ramManager;
-    TaskAttemptID taskAttemptId;
-    DataInputBuffer memDataIn = new DataInputBuffer();
-    private int start;
-    private int length;
-    public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
-                          byte[] data, int start, int length)
-                          throws IOException {
-      super(null, null, length - start, null, null);
-      this.ramManager = ramManager;
-      this.taskAttemptId = taskAttemptId;
-      
-      buffer = data;
-      bufferSize = (int)fileLength;
-      memDataIn.reset(buffer, start, length);
-      this.start = start;
-      this.length = length;
-    }
-
-    @Override
-    public void reset(int offset) {
-      memDataIn.reset(buffer, start + offset, length);
-      bytesRead = offset;
-      eof = false;
-    }
-    
-    @Override
-    public long getPosition() throws IOException {
-      // InMemoryReader does not initialize streams like Reader, so in.getPos()
-      // would not work. Instead, return the number of uncompressed bytes read,
-      // which will be correct since in-memory data is not compressed.
-      return bytesRead;
-    }
-    
-    @Override
-    public long getLength() { 
-      return fileLength;
-    }
-    
-    private void dumpOnError() {
-      File dumpFile = new File("../output/" + taskAttemptId + ".dump");
-      System.err.println("Dumping corrupt map-output of " + taskAttemptId + 
-                         " to " + dumpFile.getAbsolutePath());
-      try {
-        FileOutputStream fos = new FileOutputStream(dumpFile);
-        fos.write(buffer, 0, bufferSize);
-        fos.close();
-      } catch (IOException ioe) {
-        System.err.println("Failed to dump map-output of " + taskAttemptId);
-      }
-    }
-    
-    public boolean nextRawKey(DataInputBuffer key) throws IOException {
-      try {
-        if (!positionToNextRecord(memDataIn)) {
-          return false;
-        }
-        // Setup the key
-        int pos = memDataIn.getPosition();
-        byte[] data = memDataIn.getData();
-        key.reset(data, pos, currentKeyLength);
-        // Position for the next value
-        long skipped = memDataIn.skip(currentKeyLength);
-        if (skipped != currentKeyLength) {
-          throw new IOException("Rec# " + recNo + 
-              ": Failed to skip past key of length: " + 
-              currentKeyLength);
-        }
-
-        // Record the byte
-        bytesRead += currentKeyLength;
-        return true;
-      } catch (IOException ioe) {
-        dumpOnError();
-        throw ioe;
-      }
-    }
-    
-    public void nextRawValue(DataInputBuffer value) throws IOException {
-      try {
-        int pos = memDataIn.getPosition();
-        byte[] data = memDataIn.getData();
-        value.reset(data, pos, currentValueLength);
-
-        // Position for the next record
-        long skipped = memDataIn.skip(currentValueLength);
-        if (skipped != currentValueLength) {
-          throw new IOException("Rec# " + recNo + 
-              ": Failed to skip past value of length: " + 
-              currentValueLength);
-        }
-        // Record the byte
-        bytesRead += currentValueLength;
-
-        ++recNo;
-      } catch (IOException ioe) {
-        dumpOnError();
-        throw ioe;
-      }
-    }
-      
-    public void close() {
-      // Release
-      memDataIn = null;
-      buffer = null;
-      
-      // Inform the RamManager
-      ramManager.unreserve(bufferSize);
-    }
-  }
 }
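
The visibility changes above (public bytesRead, protected buffer, bufferSize, dataIn, recNo, and friends) are what let the relocated InMemoryReader extend Reader from the new org.apache.hadoop.mapreduce.task.reduce package. A minimal illustration of the pattern, assuming the five-argument Reader constructor tolerates nulls the way the removed InMemoryReader's super(null, null, ..., null, null) call relied on:

    // Hypothetical subclass sketch; the committed equivalent is
    // org.apache.hadoop.mapreduce.task.reduce.InMemoryReader.
    class ByteArrayIFileReader<K, V> extends IFile.Reader<K, V> {
      ByteArrayIFileReader(byte[] data) throws java.io.IOException {
        super(null, null, data.length, null, null);  // no stream, no codec
        buffer = data;              // now-protected fields carry the state
        bufferSize = data.length;
      }

      @Override
      public long getPosition() {
        // No backing stream to ask, so report uncompressed bytes read,
        // exactly as the old InMemoryReader.getPosition() did.
        return bytesRead;
      }
    }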

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileInputStream.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileInputStream.java Thu Sep  3 13:56:21 2009
@@ -28,9 +28,14 @@
 /**
  * A checksum input stream, used for IFiles.
  * Used to validate the checksum of files created by {@link IFileOutputStream}. 
- */
+ * 
+ * <FRAMEWORK-USE-ONLY>
+ * This class is intended only for use by the Map/Reduce framework and not
+ * for external users.
+ *
+*/
 
-class IFileInputStream extends InputStream {
+public class IFileInputStream extends InputStream {
   
   private final InputStream in; //The input stream to be verified for checksum. 
   private final long length; //The total length of the input file

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileOutputStream.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileOutputStream.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/IFileOutputStream.java Thu Sep  3 13:56:21 2009
@@ -28,8 +28,13 @@
  * Checksum for the contents of the file is calculated and
  * appended to the end of the file on close of the stream.
  * Used for IFiles
+ *
+ * <FRAMEWORK-USE-ONLY>
+ * This class is intended only for use by the Map/Reduce framework and not
+ * for external users.
+ *
  */
-class IFileOutputStream extends FilterOutputStream {
+public class IFileOutputStream extends FilterOutputStream {
   /**
    * The output stream to be checksummed. 
    */
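
Both IFile streams implement a checksum-on-close discipline: the writer accumulates a checksum of everything written and appends it when the stream closes; the reader recomputes and verifies it. A generic miniature of the writer side, using java.util.zip.CRC32 rather than Hadoop's internal checksum machinery (illustrative only, not the IFileOutputStream implementation):

    import java.io.DataOutputStream;
    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.CRC32;

    class ChecksumOnCloseOutputStream extends FilterOutputStream {
      private final CRC32 crc = new CRC32();

      ChecksumOnCloseOutputStream(OutputStream out) {
        super(out);
      }

      @Override
      public void write(int b) throws IOException {
        crc.update(b);   // checksum covers every payload byte
        out.write(b);
      }

      @Override
      public void close() throws IOException {
        // Append the checksum at the very end, as the class comment above
        // describes for IFileOutputStream.
        new DataOutputStream(out).writeInt((int) crc.getValue());
        super.close();
      }
    }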

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmTask.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JvmTask.java Thu Sep  3 13:56:21 2009
@@ -23,7 +23,15 @@
 import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 
-class JvmTask implements Writable {
+/**
+ * Task abstraction that can be serialized; implements the Writable interface.
+ * 
+ * <FRAMEWORK-USE-ONLY>
+ * This class is intended only for use by the Map/Reduce framework and not
+ * for external users.
+ *
+ */
+public class JvmTask implements Writable {
   Task t;
   boolean shouldDie;
   public JvmTask(Task t, boolean shouldDie) {
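
JvmTask's Writable contract is the standard Hadoop serialization pattern: write the fields to a DataOutput in a fixed order, read them back in the same order. In miniature (a hypothetical standalone example, not the JvmTask source):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    class FlagMessage implements Writable {
      boolean shouldDie;       // mirrors JvmTask's flag
      String payload = "";     // stand-in for the nested Task

      public void write(DataOutput out) throws IOException {
        out.writeBoolean(shouldDie);  // fields go out in a fixed order...
        out.writeUTF(payload);
      }

      public void readFields(DataInput in) throws IOException {
        shouldDie = in.readBoolean(); // ...and come back in the same order
        payload = in.readUTF();
      }
    }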

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Thu Sep  3 13:56:21 2009
@@ -26,14 +26,22 @@
 
 /**
  * Manipulate the working area for the transient store for maps and reduces.
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of 
+ * these methods are from child space and see mapred.local.dir as 
+ * taskTracker/jobCache/jobId/attemptId
+ * 
+ * <FRAMEWORK-USE-ONLY>
+ * This class is intended only for use by the Map/Reduce framework and not
+ * for external users.
  */ 
-class MapOutputFile {
+public class MapOutputFile {
 
   private JobConf conf;
 
   static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
 
-  MapOutputFile() {
+  public MapOutputFile() {
   }
 
   private LocalDirAllocator lDirAlloc = 
@@ -165,7 +173,8 @@
    * @return path
    * @throws IOException
    */
-  public Path getInputFileForWrite(TaskID mapId, long size)
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId, 
+                                   long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(String.format(
         REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
@@ -185,4 +194,5 @@
       this.conf = new JobConf(conf);
     }
   }
+  
 }
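
The REDUCE_INPUT_FILE_FORMAT_STRING above makes the layout concrete. Assuming TaskTracker.OUTPUT is the literal directory name "output", a map id of 3 resolves like this (values illustrative):

    // "%s/map_%d.out" with the output dir and a map id:
    String relative = String.format("%s/map_%d.out", "output", 3);
    // relative == "output/map_3.out", which LocalDirAllocator then places
    // under a mapred.local.dir such as taskTracker/jobCache/<jobId>/<attemptId>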

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskCompletionEventsUpdate.java Thu Sep  3 13:56:21 2009
@@ -27,8 +27,12 @@
  * A class that represents the communication between the tasktracker and child
  * tasks w.r.t the map task completion events. It also indicates whether the 
  * child task should reset its events index.
+ *
+ * <FRAMEWORK-USE-ONLY>
+ * This class is intended only for use by the Map/Reduce framework and not
+ * for external users.
  */
-class MapTaskCompletionEventsUpdate implements Writable {
+public class MapTaskCompletionEventsUpdate implements Writable {
   TaskCompletionEvent[] events;
   boolean reset;
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java Thu Sep  3 13:56:21 2009
@@ -105,4 +105,11 @@
     super.write(out);
     out.writeLong(mapFinishTime);
   }
+
+  @Override
+  public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+    throw new UnsupportedOperationException
+                ("addFetchFailedMap() not supported for MapTask");
+  }
+
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java?rev=810944&r1=810943&r2=810944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Merger.java Thu Sep  3 13:56:21 2009
@@ -40,7 +40,11 @@
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 
-class Merger {  
+/**
+ * Merger is a utility class used by the Map and Reduce tasks for merging
+ * both their memory and disk segments.
+ */
+public class Merger {  
   private static final Log LOG = LogFactory.getLog(Merger.class);
 
   // Local directories
@@ -60,9 +64,32 @@
   throws IOException {
     return 
       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
-                           reporter).merge(keyClass, valueClass,
+                           reporter, null).merge(keyClass, valueClass,
                                            mergeFactor, tmpDir,
-                                           readsCounter, writesCounter, mergePhase);
+                                           readsCounter, writesCounter, 
+                                           mergePhase);
+  }
+
+  public static <K extends Object, V extends Object>
+  RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+                            Class<K> keyClass, Class<V> valueClass, 
+                            CompressionCodec codec,
+                            Path[] inputs, boolean deleteInputs, 
+                            int mergeFactor, Path tmpDir,
+                            RawComparator<K> comparator,
+                            Progressable reporter,
+                            Counters.Counter readsCounter,
+                            Counters.Counter writesCounter,
+                            Counters.Counter mergedMapOutputsCounter,
+                            Progress mergePhase)
+  throws IOException {
+    return 
+      new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
+                           reporter, mergedMapOutputsCounter).merge(
+                                           keyClass, valueClass,
+                                           mergeFactor, tmpDir,
+                                           readsCounter, writesCounter,
+                                           mergePhase);
   }
   
   public static <K extends Object, V extends Object>
@@ -76,7 +103,8 @@
                             Progress mergePhase)
       throws IOException {
     return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
-                 comparator, reporter, false, readsCounter, writesCounter, mergePhase);
+                 comparator, reporter, false, readsCounter, writesCounter,
+                 mergePhase);
   }
 
   public static <K extends Object, V extends Object>
@@ -116,7 +144,7 @@
                                                mergePhase);
   }
 
-  static <K extends Object, V extends Object>
+  public static <K extends Object, V extends Object>
     RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                             Class<K> keyClass, Class<V> valueClass,
                             List<Segment<K, V>> segments,
@@ -184,14 +212,33 @@
     long segmentOffset = 0;
     long segmentLength = -1;
     
+    Counters.Counter mapOutputsCounter = null;
+
     public Segment(Configuration conf, FileSystem fs, Path file,
-                   CompressionCodec codec, boolean preserve) throws IOException {
-      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
+                   CompressionCodec codec, boolean preserve)
+    throws IOException {
+      this(conf, fs, file, codec, preserve, null);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   CompressionCodec codec, boolean preserve,
+                   Counters.Counter mergedMapOutputsCounter)
+  throws IOException {
+      this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve, 
+           mergedMapOutputsCounter);
+    }
+
+    public Segment(Configuration conf, FileSystem fs, Path file,
+                   long segmentOffset, long segmentLength,
+                   CompressionCodec codec,
+                   boolean preserve) throws IOException {
+      this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
     }
 
     public Segment(Configuration conf, FileSystem fs, Path file,
         long segmentOffset, long segmentLength, CompressionCodec codec,
-        boolean preserve) throws IOException {
+        boolean preserve, Counters.Counter mergedMapOutputsCounter)
+    throws IOException {
       this.conf = conf;
       this.fs = fs;
       this.file = file;
@@ -200,13 +247,22 @@
 
       this.segmentOffset = segmentOffset;
       this.segmentLength = segmentLength;
+      
+      this.mapOutputsCounter = mergedMapOutputsCounter;
     }
     
     public Segment(Reader<K, V> reader, boolean preserve) {
+      this(reader, preserve, null);
+    }
+    
+    public Segment(Reader<K, V> reader, boolean preserve, 
+                   Counters.Counter mapOutputsCounter) {
       this.reader = reader;
       this.preserve = preserve;
       
       this.segmentLength = reader.getLength();
+      
+      this.mapOutputsCounter = mapOutputsCounter;
     }
 
     void init(Counters.Counter readsCounter) throws IOException {
@@ -215,6 +271,10 @@
         in.seek(segmentOffset);
         reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
       }
+      
+      if (mapOutputsCounter != null) {
+        mapOutputsCounter.increment(1);
+      }
     }
     
     boolean inMemory() {
@@ -228,7 +288,7 @@
       return value;
     }
 
-    long getLength() { 
+    public long getLength() { 
       return (reader == null) ?
         segmentLength : reader.getLength();
     }
@@ -333,6 +393,15 @@
                       CompressionCodec codec, RawComparator<K> comparator,
                       Progressable reporter) 
     throws IOException {
+      this(conf, fs, inputs, deleteInputs, codec, comparator, reporter, null);
+    }
+    
+    public MergeQueue(Configuration conf, FileSystem fs, 
+                      Path[] inputs, boolean deleteInputs, 
+                      CompressionCodec codec, RawComparator<K> comparator,
+                      Progressable reporter, 
+                      Counters.Counter mergedMapOutputsCounter) 
+    throws IOException {
       this.conf = conf;
       this.fs = fs;
       this.codec = codec;
@@ -340,7 +409,11 @@
       this.reporter = reporter;
       
       for (Path file : inputs) {
-        segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs));
+        LOG.debug("MergeQ: adding: " + file);
+        segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs, 
+                                       (file.toString().endsWith(
+                                           Task.MERGED_OUTPUT_PREFIX) ? 
+                                        null : mergedMapOutputsCounter)));
       }
       
       // Sort segments on file-lengths
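
A hedged usage sketch of the new public overload introduced above; the parameter order is copied from the diff, while the argument values are placeholders the caller would supply:

    // All variables here (conf, fs, codec, inputs, ...) are assumed to be
    // in scope; only the signature is taken from the commit.
    RawKeyValueIterator merged =
        Merger.merge(conf, fs,
                     keyClass, valueClass,
                     codec,
                     inputs, true,            // deleteInputs
                     mergeFactor, tmpDir,
                     comparator, reporter,
                     readsCounter, writesCounter,
                     mergedMapOutputsCounter, // the newly threaded-through counter
                     mergePhase);

Note the guard in the MergeQueue constructor above: paths ending in Task.MERGED_OUTPUT_PREFIX pass a null counter, so the outputs of earlier intermediate merges are not counted as map outputs a second time.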


