hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1436936 [1/2] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-c...
Date Tue, 22 Jan 2013 14:10:43 GMT
Author: tucu
Date: Tue Jan 22 14:10:42 2013
New Revision: 1436936

URL: http://svn.apache.org/viewvc?rev=1436936&view=rev
Log:
MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse by Shuffle implementations. (masokan via tucu)

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeThread.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Jan 22 14:10:42 2013
@@ -19,6 +19,8 @@ Trunk (Unreleased)
     MAPREDUCE-4887. Add RehashPartitioner, to smooth distributions
     with poor implementations of Object#hashCode().  (Radim Kolar via cutting)
 
+    MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse by Shuffle implementations. (masokan via tucu)
+
   IMPROVEMENTS
 
     MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for

Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Tue Jan 22 14:10:42 2013
@@ -268,7 +268,7 @@
      This class is unlikely to get subclassed, so ignore
     -->
      <Match>
-       <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManager" />
+       <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl" />
        <Bug pattern="SC_START_IN_CTOR" />
      </Match>
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Tue Jan 22 14:10:42 2013
@@ -19,8 +19,6 @@ package org.apache.hadoop.mapreduce.task
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
@@ -38,12 +36,7 @@ import javax.net.ssl.HttpsURLConnection;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.IFileInputStream;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -51,9 +44,6 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.mapreduce.task.reduce.MapOutput.Type;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -70,7 +60,7 @@ class Fetcher<K,V> extends Thread {
   /* Default read timeout (in milliseconds) */
   private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
 
-  private final Progressable reporter;
+  private final Reporter reporter;
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
@@ -92,15 +82,10 @@ class Fetcher<K,V> extends Thread {
   private final int connectionTimeout;
   private final int readTimeout;
   
-  // Decompression of map-outputs
-  private final CompressionCodec codec;
-  private final Decompressor decompressor;
   private final SecretKey jobTokenSecret;
 
   private volatile boolean stopped = false;
 
-  private JobConf job;
-
   private static boolean sslShuffle;
   private static SSLFactory sslFactory;
 
@@ -108,7 +93,6 @@ class Fetcher<K,V> extends Thread {
                  ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
                  ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
-    this.job = job;
     this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -130,16 +114,6 @@ class Fetcher<K,V> extends Thread {
     wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_REDUCE.toString());
     
-    if (job.getCompressMapOutput()) {
-      Class<? extends CompressionCodec> codecClass =
-        job.getMapOutputCompressorClass(DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, job);
-      decompressor = CodecPool.getDecompressor(codec);
-    } else {
-      codec = null;
-      decompressor = null;
-    }
-
     this.connectionTimeout = 
       job.getInt(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT,
                  DEFAULT_STALLED_COPY_TIMEOUT);
@@ -170,7 +144,7 @@ class Fetcher<K,V> extends Thread {
         MapHost host = null;
         try {
           // If merge is on, block
-          merger.waitForInMemoryMerge();
+          merger.waitForResource();
 
           // Get a host to shuffle from
           host = scheduler.getHost();
@@ -386,8 +360,8 @@ class Fetcher<K,V> extends Thread {
       mapOutput = merger.reserve(mapId, decompressedLength, id);
       
       // Check if we can shuffle *now* ...
-      if (mapOutput.getType() == Type.WAIT) {
-        LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
+      if (mapOutput == null) {
+        LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");
         //Not an error but wait to process data.
         return EMPTY_ATTEMPT_ID_ARRAY;
       } 
@@ -396,13 +370,9 @@ class Fetcher<K,V> extends Thread {
       LOG.info("fetcher#" + id + " about to shuffle output of map " + 
                mapOutput.getMapId() + " decomp: " +
                decompressedLength + " len: " + compressedLength + " to " +
-               mapOutput.getType());
-      if (mapOutput.getType() == Type.MEMORY) {
-        shuffleToMemory(host, mapOutput, input, 
-                        (int) decompressedLength, (int) compressedLength);
-      } else {
-        shuffleToDisk(host, mapOutput, input, compressedLength);
-      }
+               mapOutput.getDescription());
+      mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+                        metrics, reporter);
       
       // Inform the shuffle scheduler
       long endTime = System.currentTimeMillis();
@@ -538,84 +508,4 @@ class Fetcher<K,V> extends Thread {
       }
     }
   }
-
-  private void shuffleToMemory(MapHost host, MapOutput<K,V> mapOutput, 
-                               InputStream input, 
-                               int decompressedLength, 
-                               int compressedLength) throws IOException {    
-    IFileInputStream checksumIn = 
-      new IFileInputStream(input, compressedLength, job);
-
-    input = checksumIn;       
-  
-    // Are map-outputs compressed?
-    if (codec != null) {
-      decompressor.reset();
-      input = codec.createInputStream(input, decompressor);
-    }
-  
-    // Copy map-output into an in-memory buffer
-    byte[] shuffleData = mapOutput.getMemory();
-    
-    try {
-      IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
-      metrics.inputBytes(shuffleData.length);
-      reporter.progress();
-      LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
-               mapOutput.getMapId());
-    } catch (IOException ioe) {      
-      // Close the streams
-      IOUtils.cleanup(LOG, input);
-
-      // Re-throw
-      throw ioe;
-    }
-
-  }
-  
-  private void shuffleToDisk(MapHost host, MapOutput<K,V> mapOutput, 
-                             InputStream input, 
-                             long compressedLength) 
-  throws IOException {
-    // Copy data to local-disk
-    OutputStream output = mapOutput.getDisk();
-    long bytesLeft = compressedLength;
-    try {
-      final int BYTES_TO_READ = 64 * 1024;
-      byte[] buf = new byte[BYTES_TO_READ];
-      while (bytesLeft > 0) {
-        int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
-        if (n < 0) {
-          throw new IOException("read past end of stream reading " + 
-                                mapOutput.getMapId());
-        }
-        output.write(buf, 0, n);
-        bytesLeft -= n;
-        metrics.inputBytes(n);
-        reporter.progress();
-      }
-
-      LOG.info("Read " + (compressedLength - bytesLeft) + 
-               " bytes from map-output for " +
-               mapOutput.getMapId());
-
-      output.close();
-    } catch (IOException ioe) {
-      // Close the streams
-      IOUtils.cleanup(LOG, input, output);
-
-      // Re-throw
-      throw ioe;
-    }
-
-    // Sanity check
-    if (bytesLeft != 0) {
-      throw new IOException("Incomplete map output received for " +
-                            mapOutput.getMapId() + " from " +
-                            host.getHostName() + " (" + 
-                            bytesLeft + " bytes missing of " + 
-                            compressedLength + ")"
-      );
-    }
-  }
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java?rev=1436936&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java Tue Jan 22 14:10:42 2013
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+
+import org.apache.hadoop.mapred.IFileInputStream;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
+  private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class);
+  private Configuration conf;
+  private final MergeManagerImpl<K, V> merger;
+  private final byte[] memory;
+  private BoundedByteArrayOutputStream byteStream;
+  // Decompression of map-outputs
+  private final CompressionCodec codec;
+  private final Decompressor decompressor;
+
+  public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
+                           MergeManagerImpl<K, V> merger,
+                           int size, CompressionCodec codec,
+                           boolean primaryMapOutput) {
+    super(mapId, (long)size, primaryMapOutput);
+    this.conf = conf;
+    this.merger = merger;
+    this.codec = codec;
+    byteStream = new BoundedByteArrayOutputStream(size);
+    memory = byteStream.getBuffer();
+    if (codec != null) {
+      decompressor = CodecPool.getDecompressor(codec);
+    } else {
+      decompressor = null;
+    }
+  }
+
+  public byte[] getMemory() {
+    return memory;
+  }
+
+  public BoundedByteArrayOutputStream getArrayStream() {
+    return byteStream;
+  }
+
+  @Override
+  public void shuffle(MapHost host, InputStream input,
+                      long compressedLength, long decompressedLength,
+                      ShuffleClientMetrics metrics,
+                      Reporter reporter) throws IOException {
+    IFileInputStream checksumIn = 
+      new IFileInputStream(input, compressedLength, conf);
+
+    input = checksumIn;       
+  
+    // Are map-outputs compressed?
+    if (codec != null) {
+      decompressor.reset();
+      input = codec.createInputStream(input, decompressor);
+    }
+  
+    try {
+      IOUtils.readFully(input, memory, 0, memory.length);
+      metrics.inputBytes(memory.length);
+      reporter.progress();
+      LOG.info("Read " + memory.length + " bytes from map-output for " +
+                getMapId());
+    } catch (IOException ioe) {      
+      // Close the streams
+      IOUtils.cleanup(LOG, input);
+
+      // Re-throw
+      throw ioe;
+    } finally {
+      CodecPool.returnDecompressor(decompressor);
+    }
+  }
+
+  @Override
+  public void commit() throws IOException {
+    merger.closeInMemoryFile(this);
+  }
+  
+  @Override
+  public void abort() {
+    merger.unreserve(memory.length);
+  }
+
+  @Override
+  public String getDescription() {
+    return "MEMORY";
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryReader.java Tue Jan 22 14:10:42 2013
@@ -35,12 +35,12 @@ import org.apache.hadoop.mapreduce.TaskA
 @InterfaceStability.Unstable
 public class InMemoryReader<K, V> extends Reader<K, V> {
   private final TaskAttemptID taskAttemptId;
-  private final MergeManager<K,V> merger;
+  private final MergeManagerImpl<K,V> merger;
   DataInputBuffer memDataIn = new DataInputBuffer();
   private int start;
   private int length;
 
-  public InMemoryReader(MergeManager<K,V> merger, TaskAttemptID taskAttemptId,
+  public InMemoryReader(MergeManagerImpl<K,V> merger, TaskAttemptID taskAttemptId,
                         byte[] data, int start, int length)
   throws IOException {
     super(null, null, length - start, null, null);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapOutput.java Tue Jan 22 14:10:42 2013
@@ -17,119 +17,36 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import java.io.InputStream;
 import java.io.IOException;
-import java.io.OutputStream;
+
 import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapOutputFile;
+
+import org.apache.hadoop.mapred.Reporter;
+
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
 @InterfaceStability.Unstable
-public class MapOutput<K,V> {
-  private static final Log LOG = LogFactory.getLog(MapOutput.class);
+public abstract class MapOutput<K, V> {
   private static AtomicInteger ID = new AtomicInteger(0);
   
-  public static enum Type {
-    WAIT,
-    MEMORY,
-    DISK
-  }
-  
   private final int id;
-  
-  private final MergeManager<K,V> merger;
   private final TaskAttemptID mapId;
-  
   private final long size;
-  
-  private final byte[] memory;
-  private BoundedByteArrayOutputStream byteStream;
-  
-  private final FileSystem localFS;
-  private final Path tmpOutputPath;
-  private final Path outputPath;
-  private final OutputStream disk; 
-  
-  private final Type type;
-  
   private final boolean primaryMapOutput;
   
-  public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, long size, 
-            JobConf conf, LocalDirAllocator localDirAllocator,
-            int fetcher, boolean primaryMapOutput, MapOutputFile mapOutputFile)
-         throws IOException {
+  public MapOutput(TaskAttemptID mapId, long size, boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
     this.mapId = mapId;
-    this.merger = merger;
-
-    type = Type.DISK;
-
-    memory = null;
-    byteStream = null;
-
     this.size = size;
-    
-    this.localFS = FileSystem.getLocal(conf);
-    outputPath =
-      mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
-    tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
-
-    disk = localFS.create(tmpOutputPath);
-    
     this.primaryMapOutput = primaryMapOutput;
   }
   
-  public MapOutput(TaskAttemptID mapId, MergeManager<K,V> merger, int size, 
-            boolean primaryMapOutput) {
-    this.id = ID.incrementAndGet();
-    this.mapId = mapId;
-    this.merger = merger;
-
-    type = Type.MEMORY;
-    byteStream = new BoundedByteArrayOutputStream(size);
-    memory = byteStream.getBuffer();
-
-    this.size = size;
-    
-    localFS = null;
-    disk = null;
-    outputPath = null;
-    tmpOutputPath = null;
-    
-    this.primaryMapOutput = primaryMapOutput;
-  }
-
-  public MapOutput(TaskAttemptID mapId) {
-    this.id = ID.incrementAndGet();
-    this.mapId = mapId;
-    
-    type = Type.WAIT;
-    merger = null;
-    memory = null;
-    byteStream = null;
-    
-    size = -1;
-    
-    localFS = null;
-    disk = null;
-    outputPath = null;
-    tmpOutputPath = null;
-
-    this.primaryMapOutput = false;
-}
-  
   public boolean isPrimaryMapOutput() {
     return primaryMapOutput;
   }
@@ -147,62 +64,28 @@ public class MapOutput<K,V> {
     return id;
   }
 
-  public Path getOutputPath() {
-    return outputPath;
-  }
-
-  public byte[] getMemory() {
-    return memory;
-  }
-
-  public BoundedByteArrayOutputStream getArrayStream() {
-    return byteStream;
-  }
-  
-  public OutputStream getDisk() {
-    return disk;
-  }
-
   public TaskAttemptID getMapId() {
     return mapId;
   }
 
-  public Type getType() {
-    return type;
-  }
-
   public long getSize() {
     return size;
   }
 
-  public void commit() throws IOException {
-    if (type == Type.MEMORY) {
-      merger.closeInMemoryFile(this);
-    } else if (type == Type.DISK) {
-      localFS.rename(tmpOutputPath, outputPath);
-      merger.closeOnDiskFile(outputPath);
-    } else {
-      throw new IOException("Cannot commit MapOutput of type WAIT!");
-    }
-  }
-  
-  public void abort() {
-    if (type == Type.MEMORY) {
-      merger.unreserve(memory.length);
-    } else if (type == Type.DISK) {
-      try {
-        localFS.delete(tmpOutputPath, false);
-      } catch (IOException ie) {
-        LOG.info("failure to clean up " + tmpOutputPath, ie);
-      }
-    } else {
-      throw new IllegalArgumentException
-                   ("Cannot commit MapOutput with of type WAIT!");
-    }
-  }
+  public abstract void shuffle(MapHost host, InputStream input,
+                               long compressedLength,
+                               long decompressedLength,
+                               ShuffleClientMetrics metrics,
+                               Reporter reporter) throws IOException;
+
+  public abstract void commit() throws IOException;
   
+  public abstract void abort();
+
+  public abstract String getDescription();
+
   public String toString() {
-    return "MapOutput(" + mapId + ", " + type + ")";
+    return "MapOutput(" + mapId + ", " + getDescription() + ")";
   }
   
   public static class MapOutputComparator<K, V> 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java?rev=1436936&r1=1436935&r2=1436936&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManager.java Tue Jan 22 14:10:42 2013
@@ -15,783 +15,56 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.mapreduce.task.reduce;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
+package org.apache.hadoop.mapreduce.task.reduce;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.IFile;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapOutputFile;
-import org.apache.hadoop.mapred.Merger;
 import org.apache.hadoop.mapred.RawKeyValueIterator;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Task;
-import org.apache.hadoop.mapred.IFile.Reader;
-import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.Task.CombineOutputCollector;
-import org.apache.hadoop.mapred.Task.CombineValuesIterator;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
 import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.ReflectionUtils;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
 
-@SuppressWarnings(value={"unchecked"})
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
+/**
+ * An interface for a reduce side merge that works with the default Shuffle
+ * implementation.
+ */
+@InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class MergeManager<K, V> {
-  
-  private static final Log LOG = LogFactory.getLog(MergeManager.class);
-  
-  /* Maximum percentage of the in-memory limit that a single shuffle can 
-   * consume*/ 
-  private static final float DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT
-    = 0.25f;
-
-  private final TaskAttemptID reduceId;
-  
-  private final JobConf jobConf;
-  private final FileSystem localFS;
-  private final FileSystem rfs;
-  private final LocalDirAllocator localDirAllocator;
-  
-  protected MapOutputFile mapOutputFile;
-  
-  Set<MapOutput<K, V>> inMemoryMergedMapOutputs = 
-    new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
-  private final IntermediateMemoryToMemoryMerger memToMemMerger;
-
-  Set<MapOutput<K, V>> inMemoryMapOutputs = 
-    new TreeSet<MapOutput<K,V>>(new MapOutputComparator<K, V>());
-  private final MergeThread<MapOutput<K,V>, K,V> inMemoryMerger;
-  
-  Set<Path> onDiskMapOutputs = new TreeSet<Path>();
-  private final OnDiskMerger onDiskMerger;
-  
-  private final long memoryLimit;
-  private long usedMemory;
-  private long commitMemory;
-  private final long maxSingleShuffleLimit;
-  
-  private final int memToMemMergeOutputsThreshold; 
-  private final long mergeThreshold;
-  
-  private final int ioSortFactor;
-
-  private final Reporter reporter;
-  private final ExceptionReporter exceptionReporter;
-  
+public interface MergeManager<K, V> {
   /**
-   * Combiner class to run during in-memory merge, if defined.
+   * To wait until merge has some freed resources available so that it can
+   * accept shuffled data.  This will be called before a network connection is
+   * established to get the map output.
    */
-  private final Class<? extends Reducer> combinerClass;
+  public void waitForResource() throws InterruptedException;
 
   /**
-   * Resettable collector used for combine.
+   * To reserve resources for data to be shuffled.  This will be called after
+   * a network connection is made to shuffle the data.
+   * @param mapId mapper from which data will be shuffled.
+   * @param requestedSize size in bytes of data that will be shuffled.
+   * @param fetcher id of the map output fetcher that will shuffle the data.
+   * @return a MapOutput object that can be used by shuffle to shuffle data.  If
+   * required resources cannot be reserved immediately, a null can be returned.
    */
-  private final CombineOutputCollector<K,V> combineCollector;
-
-  private final Counters.Counter spilledRecordsCounter;
-
-  private final Counters.Counter reduceCombineInputCounter;
-
-  private final Counters.Counter mergedMapOutputsCounter;
-  
-  private final CompressionCodec codec;
-  
-  private final Progress mergePhase;
-
-  public MergeManager(TaskAttemptID reduceId, JobConf jobConf, 
-                      FileSystem localFS,
-                      LocalDirAllocator localDirAllocator,  
-                      Reporter reporter,
-                      CompressionCodec codec,
-                      Class<? extends Reducer> combinerClass,
-                      CombineOutputCollector<K,V> combineCollector,
-                      Counters.Counter spilledRecordsCounter,
-                      Counters.Counter reduceCombineInputCounter,
-                      Counters.Counter mergedMapOutputsCounter,
-                      ExceptionReporter exceptionReporter,
-                      Progress mergePhase, MapOutputFile mapOutputFile) {
-    this.reduceId = reduceId;
-    this.jobConf = jobConf;
-    this.localDirAllocator = localDirAllocator;
-    this.exceptionReporter = exceptionReporter;
-    
-    this.reporter = reporter;
-    this.codec = codec;
-    this.combinerClass = combinerClass;
-    this.combineCollector = combineCollector;
-    this.reduceCombineInputCounter = reduceCombineInputCounter;
-    this.spilledRecordsCounter = spilledRecordsCounter;
-    this.mergedMapOutputsCounter = mergedMapOutputsCounter;
-    this.mapOutputFile = mapOutputFile;
-    this.mapOutputFile.setConf(jobConf);
-    
-    this.localFS = localFS;
-    this.rfs = ((LocalFileSystem)localFS).getRaw();
-    
-    final float maxInMemCopyUse =
-      jobConf.getFloat(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, 0.90f);
-    if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
-      throw new IllegalArgumentException("Invalid value for " +
-          MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT + ": " +
-          maxInMemCopyUse);
-    }
-
-    // Allow unit tests to fix Runtime memory
-    this.memoryLimit = 
-      (long)(jobConf.getLong(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
-          Math.min(Runtime.getRuntime().maxMemory(), Integer.MAX_VALUE))
-        * maxInMemCopyUse);
- 
-    this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100);
+  public MapOutput<K, V> reserve(TaskAttemptID mapId, long requestedSize,
+                                 int fetcher) throws IOException;
 
-    final float singleShuffleMemoryLimitPercent =
-        jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,
-            DEFAULT_SHUFFLE_MEMORY_LIMIT_PERCENT);
-    if (singleShuffleMemoryLimitPercent <= 0.0f
-        || singleShuffleMemoryLimitPercent > 1.0f) {
-      throw new IllegalArgumentException("Invalid value for "
-          + MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT + ": "
-          + singleShuffleMemoryLimitPercent);
-    }
-
-    usedMemory = 0L;
-    commitMemory = 0L;
-    this.maxSingleShuffleLimit = 
-      (long)(memoryLimit * singleShuffleMemoryLimitPercent);
-    this.memToMemMergeOutputsThreshold = 
-            jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
-    this.mergeThreshold = (long)(this.memoryLimit * 
-                          jobConf.getFloat(MRJobConfig.SHUFFLE_MERGE_PERCENT, 
-                                           0.90f));
-    LOG.info("MergerManager: memoryLimit=" + memoryLimit + ", " +
-             "maxSingleShuffleLimit=" + maxSingleShuffleLimit + ", " +
-             "mergeThreshold=" + mergeThreshold + ", " + 
-             "ioSortFactor=" + ioSortFactor + ", " +
-             "memToMemMergeOutputsThreshold=" + memToMemMergeOutputsThreshold);
-
-    if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
-      throw new RuntimeException("Invlaid configuration: "
-          + "maxSingleShuffleLimit should be less than mergeThreshold"
-          + "maxSingleShuffleLimit: " + this.maxSingleShuffleLimit
-          + "mergeThreshold: " + this.mergeThreshold);
-    }
-
-    boolean allowMemToMemMerge = 
-      jobConf.getBoolean(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, false);
-    if (allowMemToMemMerge) {
-      this.memToMemMerger = 
-        new IntermediateMemoryToMemoryMerger(this,
-                                             memToMemMergeOutputsThreshold);
-      this.memToMemMerger.start();
-    } else {
-      this.memToMemMerger = null;
-    }
-    
-    this.inMemoryMerger = createInMemoryMerger();
-    this.inMemoryMerger.start();
-    
-    this.onDiskMerger = new OnDiskMerger(this);
-    this.onDiskMerger.start();
-    
-    this.mergePhase = mergePhase;
-  }
-  
-  protected MergeThread<MapOutput<K,V>, K,V> createInMemoryMerger() {
-    return new InMemoryMerger(this);
-  }
-
-  TaskAttemptID getReduceId() {
-    return reduceId;
-  }
-
-  @VisibleForTesting
-  ExceptionReporter getExceptionReporter() {
-    return exceptionReporter;
-  }
-
-  public void waitForInMemoryMerge() throws InterruptedException {
-    inMemoryMerger.waitForMerge();
-  }
-  
-  private boolean canShuffleToMemory(long requestedSize) {
-    return (requestedSize < maxSingleShuffleLimit); 
-  }
-  
-  final private MapOutput<K,V> stallShuffle = new MapOutput<K,V>(null);
-
-  public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId, 
-                                             long requestedSize,
-                                             int fetcher
-                                             ) throws IOException {
-    if (!canShuffleToMemory(requestedSize)) {
-      LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
-               " is greater than maxSingleShuffleLimit (" + 
-               maxSingleShuffleLimit + ")");
-      return new MapOutput<K,V>(mapId, this, requestedSize, jobConf, 
-                                localDirAllocator, fetcher, true,
-                                mapOutputFile);
-    }
-    
-    // Stall shuffle if we are above the memory limit
-
-    // It is possible that all threads could just be stalling and not make
-    // progress at all. This could happen when:
-    //
-    // requested size is causing the used memory to go above limit &&
-    // requested size < singleShuffleLimit &&
-    // current used size < mergeThreshold (merge will not get triggered)
-    //
-    // To avoid this from happening, we allow exactly one thread to go past
-    // the memory limit. We check (usedMemory > memoryLimit) and not
-    // (usedMemory + requestedSize > memoryLimit). When this thread is done
-    // fetching, this will automatically trigger a merge thereby unlocking
-    // all the stalled threads
-    
-    if (usedMemory > memoryLimit) {
-      LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
-          + ") is greater than memoryLimit (" + memoryLimit + ")." + 
-          " CommitMemory is (" + commitMemory + ")"); 
-      return stallShuffle;
-    }
-    
-    // Allow the in-memory shuffle to progress
-    LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
-        + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
-        + "CommitMemory is (" + commitMemory + ")"); 
-    return unconditionalReserve(mapId, requestedSize, true);
-  }
-  
   /**
-   * Unconditional Reserve is used by the Memory-to-Memory thread
-   * @return
+   * Called at the end of shuffle.
+   * @return a key value iterator object.
    */
-  private synchronized MapOutput<K, V> unconditionalReserve(
-      TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
-    usedMemory += requestedSize;
-    return new MapOutput<K,V>(mapId, this, (int)requestedSize, 
-        primaryMapOutput);
-  }
-  
-  synchronized void unreserve(long size) {
-    usedMemory -= size;
-  }
-
-  public synchronized void closeInMemoryFile(MapOutput<K,V> mapOutput) { 
-    inMemoryMapOutputs.add(mapOutput);
-    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
-        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()
-        + ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);
-
-    commitMemory+= mapOutput.getSize();
-
-    // Can hang if mergeThreshold is really low.
-    if (commitMemory >= mergeThreshold) {
-      LOG.info("Starting inMemoryMerger's merge since commitMemory=" +
-          commitMemory + " > mergeThreshold=" + mergeThreshold + 
-          ". Current usedMemory=" + usedMemory);
-      inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
-      inMemoryMergedMapOutputs.clear();
-      inMemoryMerger.startMerge(inMemoryMapOutputs);
-      commitMemory = 0L;  // Reset commitMemory.
-    }
-    
-    if (memToMemMerger != null) {
-      if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) { 
-        memToMemMerger.startMerge(inMemoryMapOutputs);
-      }
-    }
-  }
-  
-  
-  public synchronized void closeInMemoryMergedFile(MapOutput<K,V> mapOutput) {
-    inMemoryMergedMapOutputs.add(mapOutput);
-    LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + 
-             ", inMemoryMergedMapOutputs.size() -> " + 
-             inMemoryMergedMapOutputs.size());
-  }
-  
-  public synchronized void closeOnDiskFile(Path file) {
-    onDiskMapOutputs.add(file);
-    
-    if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
-      onDiskMerger.startMerge(onDiskMapOutputs);
-    }
-  }
-  
-  public RawKeyValueIterator close() throws Throwable {
-    // Wait for on-going merges to complete
-    if (memToMemMerger != null) { 
-      memToMemMerger.close();
-    }
-    inMemoryMerger.close();
-    onDiskMerger.close();
-    
-    List<MapOutput<K, V>> memory = 
-      new ArrayList<MapOutput<K, V>>(inMemoryMergedMapOutputs);
-    memory.addAll(inMemoryMapOutputs);
-    List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
-    return finalMerge(jobConf, rfs, memory, disk);
-  }
-   
-  private class IntermediateMemoryToMemoryMerger 
-  extends MergeThread<MapOutput<K, V>, K, V> {
-    
-    public IntermediateMemoryToMemoryMerger(MergeManager<K, V> manager, 
-                                            int mergeFactor) {
-      super(manager, mergeFactor, exceptionReporter);
-      setName("InMemoryMerger - Thread to do in-memory merge of in-memory " +
-      		    "shuffled map-outputs");
-      setDaemon(true);
-    }
-
-    @Override
-    public void merge(List<MapOutput<K, V>> inputs) throws IOException {
-      if (inputs == null || inputs.size() == 0) {
-        return;
-      }
-
-      TaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
-      List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
-      long mergeOutputSize = 
-        createInMemorySegments(inputs, inMemorySegments, 0);
-      int noInMemorySegments = inMemorySegments.size();
-      
-      MapOutput<K, V> mergedMapOutputs = 
-        unconditionalReserve(dummyMapId, mergeOutputSize, false);
-      
-      Writer<K, V> writer = 
-        new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());
-      
-      LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments +
-               " segments of total-size: " + mergeOutputSize);
-
-      RawKeyValueIterator rIter = 
-        Merger.merge(jobConf, rfs,
-                     (Class<K>)jobConf.getMapOutputKeyClass(),
-                     (Class<V>)jobConf.getMapOutputValueClass(),
-                     inMemorySegments, inMemorySegments.size(),
-                     new Path(reduceId.toString()),
-                     (RawComparator<K>)jobConf.getOutputKeyComparator(),
-                     reporter, null, null, null);
-      Merger.writeFile(rIter, writer, reporter, jobConf);
-      writer.close();
-
-      LOG.info(reduceId +  
-               " Memory-to-Memory merge of the " + noInMemorySegments +
-               " files in-memory complete.");
-
-      // Note the output of the merge
-      closeInMemoryMergedFile(mergedMapOutputs);
-    }
-  }
-  
-  private class InMemoryMerger extends MergeThread<MapOutput<K,V>, K,V> {
-    
-    public InMemoryMerger(MergeManager<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName
-      ("InMemoryMerger - Thread to merge in-memory shuffled map-outputs");
-      setDaemon(true);
-    }
-    
-    @Override
-    public void merge(List<MapOutput<K,V>> inputs) throws IOException {
-      if (inputs == null || inputs.size() == 0) {
-        return;
-      }
-      
-      //name this output file same as the name of the first file that is 
-      //there in the current list of inmem files (this is guaranteed to
-      //be absent on the disk currently. So we don't overwrite a prev. 
-      //created spill). Also we need to create the output file now since
-      //it is not guaranteed that this file will be present after merge
-      //is called (we delete empty files as soon as we see them
-      //in the merge method)
-
-      //figure out the mapId 
-      TaskAttemptID mapId = inputs.get(0).getMapId();
-      TaskID mapTaskId = mapId.getTaskID();
-
-      List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
-      long mergeOutputSize = 
-        createInMemorySegments(inputs, inMemorySegments,0);
-      int noInMemorySegments = inMemorySegments.size();
-
-      Path outputPath = 
-        mapOutputFile.getInputFileForWrite(mapTaskId,
-                                           mergeOutputSize).suffix(
-                                               Task.MERGED_OUTPUT_PREFIX);
-
-      Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath,
-                        (Class<K>) jobConf.getMapOutputKeyClass(),
-                        (Class<V>) jobConf.getMapOutputValueClass(),
-                        codec, null);
-
-      RawKeyValueIterator rIter = null;
-      try {
-        LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
-                 " segments...");
-        
-        rIter = Merger.merge(jobConf, rfs,
-                             (Class<K>)jobConf.getMapOutputKeyClass(),
-                             (Class<V>)jobConf.getMapOutputValueClass(),
-                             inMemorySegments, inMemorySegments.size(),
-                             new Path(reduceId.toString()),
-                             (RawComparator<K>)jobConf.getOutputKeyComparator(),
-                             reporter, spilledRecordsCounter, null, null);
-        
-        if (null == combinerClass) {
-          Merger.writeFile(rIter, writer, reporter, jobConf);
-        } else {
-          combineCollector.setWriter(writer);
-          combineAndSpill(rIter, reduceCombineInputCounter);
-        }
-        writer.close();
-
-        LOG.info(reduceId +  
-            " Merge of the " + noInMemorySegments +
-            " files in-memory complete." +
-            " Local file is " + outputPath + " of size " + 
-            localFS.getFileStatus(outputPath).getLen());
-      } catch (IOException e) { 
-        //make sure that we delete the ondisk file that we created 
-        //earlier when we invoked cloneFileAttributes
-        localFS.delete(outputPath, true);
-        throw e;
-      }
-
-      // Note the output of the merge
-      closeOnDiskFile(outputPath);
-    }
-
-  }
-  
-  private class OnDiskMerger extends MergeThread<Path,K,V> {
-    
-    public OnDiskMerger(MergeManager<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
-      setName("OnDiskMerger - Thread to merge on-disk map-outputs");
-      setDaemon(true);
-    }
-    
-    @Override
-    public void merge(List<Path> inputs) throws IOException {
-      // sanity check
-      if (inputs == null || inputs.isEmpty()) {
-        LOG.info("No ondisk files to merge...");
-        return;
-      }
-      
-      long approxOutputSize = 0;
-      int bytesPerSum = 
-        jobConf.getInt("io.bytes.per.checksum", 512);
-      
-      LOG.info("OnDiskMerger: We have  " + inputs.size() + 
-               " map outputs on disk. Triggering merge...");
-      
-      // 1. Prepare the list of files to be merged. 
-      for (Path file : inputs) {
-        approxOutputSize += localFS.getFileStatus(file).getLen();
-      }
-
-      // add the checksum length
-      approxOutputSize += 
-        ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);
-
-      // 2. Start the on-disk merge process
-      Path outputPath = 
-        localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), 
-            approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
-      Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath, 
-                        (Class<K>) jobConf.getMapOutputKeyClass(), 
-                        (Class<V>) jobConf.getMapOutputValueClass(),
-                        codec, null);
-      RawKeyValueIterator iter  = null;
-      Path tmpDir = new Path(reduceId.toString());
-      try {
-        iter = Merger.merge(jobConf, rfs,
-                            (Class<K>) jobConf.getMapOutputKeyClass(),
-                            (Class<V>) jobConf.getMapOutputValueClass(),
-                            codec, inputs.toArray(new Path[inputs.size()]), 
-                            true, ioSortFactor, tmpDir, 
-                            (RawComparator<K>) jobConf.getOutputKeyComparator(), 
-                            reporter, spilledRecordsCounter, null, 
-                            mergedMapOutputsCounter, null);
-
-        Merger.writeFile(iter, writer, reporter, jobConf);
-        writer.close();
-      } catch (IOException e) {
-        localFS.delete(outputPath, true);
-        throw e;
-      }
-
-      closeOnDiskFile(outputPath);
-
-      LOG.info(reduceId +
-          " Finished merging " + inputs.size() + 
-          " map output files on disk of total-size " + 
-          approxOutputSize + "." + 
-          " Local output file is " + outputPath + " of size " +
-          localFS.getFileStatus(outputPath).getLen());
-    }
-  }
-  
-  private void combineAndSpill(
-      RawKeyValueIterator kvIter,
-      Counters.Counter inCounter) throws IOException {
-    JobConf job = jobConf;
-    Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
-    Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
-    Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
-    RawComparator<K> comparator = 
-      (RawComparator<K>)job.getOutputKeyComparator();
-    try {
-      CombineValuesIterator values = new CombineValuesIterator(
-          kvIter, comparator, keyClass, valClass, job, Reporter.NULL,
-          inCounter);
-      while (values.more()) {
-        combiner.reduce(values.getKey(), values, combineCollector,
-                        Reporter.NULL);
-        values.nextKey();
-      }
-    } finally {
-      combiner.close();
-    }
-  }
-
-  private long createInMemorySegments(List<MapOutput<K,V>> inMemoryMapOutputs,
-                                      List<Segment<K, V>> inMemorySegments, 
-                                      long leaveBytes
-                                      ) throws IOException {
-    long totalSize = 0L;
-    // We could use fullSize could come from the RamManager, but files can be
-    // closed but not yet present in inMemoryMapOutputs
-    long fullSize = 0L;
-    for (MapOutput<K,V> mo : inMemoryMapOutputs) {
-      fullSize += mo.getMemory().length;
-    }
-    while(fullSize > leaveBytes) {
-      MapOutput<K,V> mo = inMemoryMapOutputs.remove(0);
-      byte[] data = mo.getMemory();
-      long size = data.length;
-      totalSize += size;
-      fullSize -= size;
-      Reader<K,V> reader = new InMemoryReader<K,V>(MergeManager.this, 
-                                                   mo.getMapId(),
-                                                   data, 0, (int)size);
-      inMemorySegments.add(new Segment<K,V>(reader, true, 
-                                            (mo.isPrimaryMapOutput() ? 
-                                            mergedMapOutputsCounter : null)));
-    }
-    return totalSize;
-  }
-
-  class RawKVIteratorReader extends IFile.Reader<K,V> {
-
-    private final RawKeyValueIterator kvIter;
-
-    public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
-        throws IOException {
-      super(null, null, size, null, spilledRecordsCounter);
-      this.kvIter = kvIter;
-    }
-    public boolean nextRawKey(DataInputBuffer key) throws IOException {
-      if (kvIter.next()) {
-        final DataInputBuffer kb = kvIter.getKey();
-        final int kp = kb.getPosition();
-        final int klen = kb.getLength() - kp;
-        key.reset(kb.getData(), kp, klen);
-        bytesRead += klen;
-        return true;
-      }
-      return false;
-    }
-    public void nextRawValue(DataInputBuffer value) throws IOException {
-      final DataInputBuffer vb = kvIter.getValue();
-      final int vp = vb.getPosition();
-      final int vlen = vb.getLength() - vp;
-      value.reset(vb.getData(), vp, vlen);
-      bytesRead += vlen;
-    }
-    public long getPosition() throws IOException {
-      return bytesRead;
-    }
-
-    public void close() throws IOException {
-      kvIter.close();
-    }
-  }
-
-  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
-                                       List<MapOutput<K,V>> inMemoryMapOutputs,
-                                       List<Path> onDiskMapOutputs
-                                       ) throws IOException {
-    LOG.info("finalMerge called with " + 
-             inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
-             onDiskMapOutputs.size() + " on-disk map-outputs");
-    
-    final float maxRedPer =
-      job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);
-    if (maxRedPer > 1.0 || maxRedPer < 0.0) {
-      throw new IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +
-                            maxRedPer);
-    }
-    int maxInMemReduce = (int)Math.min(
-        Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
-    
-
-    // merge config params
-    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
-    Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
-    boolean keepInputs = job.getKeepFailedTaskFiles();
-    final Path tmpDir = new Path(reduceId.toString());
-    final RawComparator<K> comparator =
-      (RawComparator<K>)job.getOutputKeyComparator();
-
-    // segments required to vacate memory
-    List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
-    long inMemToDiskBytes = 0;
-    boolean mergePhaseFinished = false;
-    if (inMemoryMapOutputs.size() > 0) {
-      TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
-      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
-                                                memDiskSegments,
-                                                maxInMemReduce);
-      final int numMemDiskSegments = memDiskSegments.size();
-      if (numMemDiskSegments > 0 &&
-            ioSortFactor > onDiskMapOutputs.size()) {
-        
-        // If we reach here, it implies that we have less than io.sort.factor
-        // disk segments and this will be incremented by 1 (result of the 
-        // memory segments merge). Since this total would still be 
-        // <= io.sort.factor, we will not do any more intermediate merges,
-        // the merge of all these disk segments would be directly fed to the
-        // reduce method
-        
-        mergePhaseFinished = true;
-        // must spill to disk, but can't retain in-mem for intermediate merge
-        final Path outputPath = 
-          mapOutputFile.getInputFileForWrite(mapId,
-                                             inMemToDiskBytes).suffix(
-                                                 Task.MERGED_OUTPUT_PREFIX);
-        final RawKeyValueIterator rIter = Merger.merge(job, fs,
-            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-            tmpDir, comparator, reporter, spilledRecordsCounter, null, 
-            mergePhase);
-        final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
-            keyClass, valueClass, codec, null);
-        try {
-          Merger.writeFile(rIter, writer, reporter, job);
-          // add to list of final disk outputs.
-          onDiskMapOutputs.add(outputPath);
-        } catch (IOException e) {
-          if (null != outputPath) {
-            try {
-              fs.delete(outputPath, true);
-            } catch (IOException ie) {
-              // NOTHING
-            }
-          }
-          throw e;
-        } finally {
-          if (null != writer) {
-            writer.close();
-          }
-        }
-        LOG.info("Merged " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes to disk to satisfy " +
-                 "reduce memory limit");
-        inMemToDiskBytes = 0;
-        memDiskSegments.clear();
-      } else if (inMemToDiskBytes != 0) {
-        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
-                 inMemToDiskBytes + " bytes in memory for " +
-                 "intermediate, on-disk merge");
-      }
-    }
-
-    // segments on disk
-    List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
-    long onDiskBytes = inMemToDiskBytes;
-    Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
-    for (Path file : onDisk) {
-      onDiskBytes += fs.getFileStatus(file).getLen();
-      LOG.debug("Disk file: " + file + " Length is " + 
-          fs.getFileStatus(file).getLen());
-      diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
-                                         (file.toString().endsWith(
-                                             Task.MERGED_OUTPUT_PREFIX) ?
-                                          null : mergedMapOutputsCounter)
-                                        ));
-    }
-    LOG.info("Merging " + onDisk.length + " files, " +
-             onDiskBytes + " bytes from disk");
-    Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
-      public int compare(Segment<K, V> o1, Segment<K, V> o2) {
-        if (o1.getLength() == o2.getLength()) {
-          return 0;
-        }
-        return o1.getLength() < o2.getLength() ? -1 : 1;
-      }
-    });
-
-    // build final list of segments from merged backed by disk + in-mem
-    List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
-    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
-                                             finalSegments, 0);
-    LOG.info("Merging " + finalSegments.size() + " segments, " +
-             inMemBytes + " bytes from memory into reduce");
-    if (0 != onDiskBytes) {
-      final int numInMemSegments = memDiskSegments.size();
-      diskSegments.addAll(0, memDiskSegments);
-      memDiskSegments.clear();
-      // Pass mergePhase only if there is a going to be intermediate
-      // merges. See comment where mergePhaseFinished is being set
-      Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
-      RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
-          ioSortFactor, numInMemSegments, tmpDir, comparator,
-          reporter, false, spilledRecordsCounter, null, thisPhase);
-      diskSegments.clear();
-      if (0 == finalSegments.size()) {
-        return diskMerge;
-      }
-      finalSegments.add(new Segment<K,V>(
-            new RawKVIteratorReader(diskMerge, onDiskBytes), true));
-    }
-    return Merger.merge(job, fs, keyClass, valueClass,
-                 finalSegments, finalSegments.size(), tmpDir,
-                 comparator, reporter, spilledRecordsCounter, null,
-                 null);
-  
-  }
+  public RawKeyValueIterator close() throws Throwable;
 }



Mime
View raw message