hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r565628 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/
Date Tue, 14 Aug 2007 05:27:24 GMT
Author: omalley
Date: Mon Aug 13 22:27:23 2007
New Revision: 565628

URL: http://svn.apache.org/viewvc?view=rev&rev=565628
Log:
HADOOP-1698.  Fixes performance problems in the map output sorting with 
many reducers. Contributed by Devaraj Das.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=565628&r1=565627&r2=565628
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Aug 13 22:27:23 2007
@@ -520,6 +520,9 @@
 152. HADOOP-1629.  Added a upgrade test for HADOOP-1134.
      (Raghu Angadi via nigel)
 
+153. HADOOP-1698.  Fix performance problems on map output sorting for jobs
+     with large numbers of reduces. (Devaraj Das via omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=565628&r1=565627&r2=565628
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Mon Aug 13 22:27:23 2007
@@ -648,9 +648,10 @@
     long lastSyncPos;                     // position of last sync
     byte[] sync;                          // 16 random bytes
     {
-      try {                                       // use hash of uid + host
+      try {                                       
         MessageDigest digester = MessageDigest.getInstance("MD5");
-        digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
+        long time = System.currentTimeMillis();
+        digester.update((new UID()+"@"+time).getBytes());
         sync = digester.digest();
       } catch (Exception e) {
         throw new RuntimeException(e);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java?view=diff&rev=565628&r1=565627&r2=565628
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java Mon Aug 13 22:27:23 2007
@@ -50,16 +50,21 @@
   //4 for indices into startOffsets array in the
   //pointers array (ignored the partpointers list itself)
   static private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
+  static private final int INITIAL_ARRAY_SIZE = 5;
+  //we maintain the max lengths of the key/val that we encounter.  During 
+  //iteration of the sorted results, we will create a DataOutputBuffer to
+  //return the keys. The max size of the DataOutputBuffer will be the max
+  //keylength that we encounter. Expose this value to model memory more
+  //accurately.
+  private int maxKeyLength = 0;
+  private int maxValLength = 0;
+
   //Reference to the Progressable object for sending KeepAlive
   private Progressable reporter;
 
   //Implementation of methods of the SorterBase interface
   //
   public void configure(JobConf conf) {
-    startOffsets = new int[1024];
-    keyLengths = new int[1024];
-    valueLengths = new int[1024];
-    pointers = new int[1024];
     comparator = conf.getOutputKeyComparator();
   }
   
@@ -70,10 +75,16 @@
   public void addKeyValue(int recordOffset, int keyLength, int valLength) {
     //Add the start offset of the key in the startOffsets array and the
     //length in the keyLengths array.
-    if (count == startOffsets.length)
+    if (startOffsets == null || count == startOffsets.length)
       grow();
     startOffsets[count] = recordOffset;
     keyLengths[count] = keyLength;
+    if (keyLength > maxKeyLength) {
+      maxKeyLength = keyLength;
+    }
+    if (valLength > maxValLength) {
+      maxValLength = valLength;
+    }
     valueLengths[count] = valLength;
     pointers[count] = count;
     count++;
@@ -85,14 +96,30 @@
   }
 
   public long getMemoryUtilized() {
-    return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD;
+    //the total length of the arrays + the max{Key,Val}Length (this will be the 
+    //max size of the DataOutputBuffers during the iteration of the sorted
+    //keys).
+    if (startOffsets != null) {
+      return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD + 
+              maxKeyLength + maxValLength;
+    }
+    else { //nothing from this yet
+      return 0;
+    }
   }
 
   public abstract RawKeyValueIterator sort();
   
   public void close() {
-    //just set count to 0; we reuse the arrays
+    //set count to 0; also, we don't reuse the arrays since we want to maintain
+    //consistency in the memory model
     count = 0;
+    startOffsets = null;
+    keyLengths = null;
+    valueLengths = null;
+    pointers = null;
+    maxKeyLength = 0;
+    maxValLength = 0;
   }
   //A compare method that references the keyValBuffer through the indirect
   //pointers
@@ -106,7 +133,11 @@
   }
   
   private void grow() {
-    int newLength = startOffsets.length * 3/2;
+    int currLength = 0;
+    if (startOffsets != null) {
+      currLength = startOffsets.length;
+    }
+    int newLength = (int)(currLength * 1.1) + 1;
     startOffsets = grow(startOffsets, newLength);
     keyLengths = grow(keyLengths, newLength);
     valueLengths = grow(valueLengths, newLength);
@@ -115,7 +146,9 @@
   
   private int[] grow(int[] old, int newLength) {
     int[] result = new int[newLength];
-    System.arraycopy(old, 0, result, 0, old.length);
+    if(old != null) { 
+      System.arraycopy(old, 0, result, 0, old.length);
+    }
     return result;
   }
 } //BasicTypeSorterBase

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=565628&r1=565627&r2=565628
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon Aug 13 22:27:23 2007
@@ -330,6 +330,9 @@
       }
       
       synchronized (this) {
+        if (keyValBuffer == null) {
+          keyValBuffer = new DataOutputBuffer();
+        }
         //dump the key/value to buffer
         int keyOffset = keyValBuffer.getLength(); 
         key.write(keyValBuffer);
@@ -350,7 +353,9 @@
           totalMem += sortImpl[i].getMemoryUtilized();
         if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
           sortAndSpillToDisk();
-          keyValBuffer.reset();
+          //we don't reuse the keyValBuffer. We want to maintain consistency
+          //in the memory model (for negligible performance loss).
+          keyValBuffer = null;
           for (int i = 0; i < partitions; i++) {
             sortImpl[i].close();
           }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java?view=diff&rev=565628&r1=565627&r2=565628
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java Mon Aug 13 22:27:23 2007
@@ -58,4 +58,12 @@
   public int compare (IntWritable i, IntWritable j) {
     return super.compare(i.get(), j.get());
   }
+  
+  /** Add the extra memory that will be utilized by the sort method */
+  public long getMemoryUtilized() {
+    //this is memory that will be actually utilized (considering the temp
+    //array that will be allocated by the sort() method (mergesort))
+    return super.getMemoryUtilized() + super.count * 4; 
+  }
+
 }



Mime
View raw message