hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r390231 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop: io/WritableUtils.java mapred/CombiningCollector.java
Date Thu, 30 Mar 2006 20:10:38 GMT
Author: cutting
Date: Thu Mar 30 12:10:36 2006
New Revision: 390231

URL: http://svn.apache.org/viewcvs?rev=390231&view=rev
Log:
Fix for HADOOP-2.  The combiner now clones keys and values, so mappers may now safely reuse
emitted keys and values.  Contributed by Owen O'Malley.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java?rev=390231&r1=390230&r2=390231&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/WritableUtils.java Thu Mar 30 12:10:36 2006
@@ -17,7 +17,7 @@
 package org.apache.hadoop.io;
 
 import java.io.*;
-
+import org.apache.hadoop.mapred.JobConf;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -189,5 +189,46 @@
     System.out.println();
   }
 
-
+  /**
+   * A pair of input/output buffers that we use to clone writables.
+   */
+  private static class CopyInCopyOutBuffer {
+    DataOutputBuffer outBuffer = new DataOutputBuffer();
+    DataInputBuffer inBuffer = new DataInputBuffer();
+    /**
+     * Move the data from the output buffer to the input buffer.
+     */
+    void moveData() {
+      inBuffer.reset(outBuffer.getData(), outBuffer.getLength());
+    }
+  }
+  
+  /**
+   * Allocate a buffer for each thread that tries to clone objects.
+   */
+  private static ThreadLocal cloneBuffers = new ThreadLocal() {
+    protected synchronized Object initialValue() {
+      return new CopyInCopyOutBuffer();
+    }
+  };
+  
+  /**
+   * Make a copy of a writable object using serialization to a buffer.
+   * @param orig The object to copy
+   * @return The copied object
+   */
+  public static Writable clone(Writable orig, JobConf conf) {
+    try {
+      Writable newInst = (Writable)conf.newInstance(orig.getClass());
+      CopyInCopyOutBuffer buffer = (CopyInCopyOutBuffer)cloneBuffers.get();
+      buffer.outBuffer.reset();
+      orig.write(buffer.outBuffer);
+      buffer.moveData();
+      newInst.readFields(buffer.inBuffer);
+      return newInst;
+    } catch (IOException e) {
+      throw new RuntimeException("Error writing/reading clone buffer", e);
+    }
+  }
+  
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java?rev=390231&r1=390230&r2=390231&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java Thu Mar 30 12:10:36 2006
@@ -51,12 +51,16 @@
 
     // buffer new value in map
     ArrayList values = (ArrayList)keyToValues.get(key);
-    if (values == null) {                         // no values yet for this key
-      values = new ArrayList(1);                  // make a new list
-      values.add(value);                          // add this value
-      keyToValues.put(key, values);               // add to map
+    Writable valueClone = WritableUtils.clone(value, job);
+    if (values == null) {
+      // this is a new key, so create a new list
+      values = new ArrayList(1);
+      values.add(valueClone);
+      Writable keyClone = WritableUtils.clone(key, job);
+      keyToValues.put(keyClone, values);
     } else {
-      values.add(value);                          // other values: just add new
+      // other values for this key, so just add.
+      values.add(valueClone);
     }
 
     count++;



Mime
View raw message