From tomwh...@apache.org
Subject svn commit: r632073 [1/2] - in /hadoop/core/trunk: ./ conf/ src/contrib/data_join/src/examples/org/apache/hadoop/contrib/utils/join/ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/contrib/streaming/src/java/org/apache/hadoop/s...
Date Thu, 28 Feb 2008 17:47:02 GMT
Author: tomwhite
Date: Thu Feb 28 09:46:49 2008
New Revision: 632073

URL: http://svn.apache.org/viewvc?rev=632073&view=rev
Log:
HADOOP-1986.  Add support for a general serialization mechanism for Map Reduce.

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/io/InputBuffer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/OutputBuffer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/RawComparator.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/package.html
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/contrib/data_join/src/examples/org/apache/hadoop/contrib/utils/join/SampleDataJoinMapper.java
    hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java
    hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
    hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/ValueCountReduce.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
    hadoop/core/trunk/src/java/org/apache/hadoop/io/WritableComparator.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Mapper.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Partitioner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RecordReader.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RecordWriter.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Reducer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/HashPartitioner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityMapper.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/IdentityReducer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/InverseMapper.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/KeyFieldBasedPartitioner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/LongSumReducer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NullOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/RegexMapper.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/TokenCountMapper.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/package.html
    hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
    hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestFileSystem.java
    hadoop/core/trunk/src/test/org/apache/hadoop/io/FileBench.java
    hadoop/core/trunk/src/test/org/apache/hadoop/io/TestSequenceFile.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Feb 28 09:46:49 2008
@@ -23,6 +23,9 @@
     HADOOP-1985.  This addresses rack-awareness for Map tasks and for 
     HDFS in a uniform way. (ddas)
 
+    HADOOP-1986.  Add support for a general serialization mechanism for
+    Map Reduce. (tomwhite)
+
   NEW FEATURES
 
     HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Thu Feb 28 09:46:49 2008
@@ -118,6 +118,13 @@
                for compression/decompression.</description>
 </property>
 
+<property>
+  <name>io.serializations</name>
+  <value>org.apache.hadoop.io.serializer.WritableSerialization</value>
+  <description>A list of serialization classes that can be used for
+  obtaining serializers and deserializers.</description>
+</property>
+
 <!-- file system properties -->
 
 <property>
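
The new "io.serializations" property is a comma-delimited list, so additional
frameworks can be registered alongside the default. A hedged sketch of a
client doing so programmatically (the JavaSerialization class is added later
in this commit; the setup code itself is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;

    public class RegisterSerializations {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Keep the default WritableSerialization and append the experimental
        // Java serialization added by this commit.
        conf.set("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization,"
            + "org.apache.hadoop.io.serializer.JavaSerialization");
      }
    }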

Modified: hadoop/core/trunk/src/contrib/data_join/src/examples/org/apache/hadoop/contrib/utils/join/SampleDataJoinMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/data_join/src/examples/org/apache/hadoop/contrib/utils/join/SampleDataJoinMapper.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/data_join/src/examples/org/apache/hadoop/contrib/utils/join/SampleDataJoinMapper.java (original)
+++ hadoop/core/trunk/src/contrib/data_join/src/examples/org/apache/hadoop/contrib/utils/join/SampleDataJoinMapper.java Thu Feb 28 09:46:49 2008
@@ -19,7 +19,6 @@
 package org.apache.hadoop.contrib.utils.join;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
 import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
@@ -47,7 +46,7 @@
     return new Text(groupKey);
   }
 
-  protected TaggedMapOutput generateTaggedMapOutput(Writable value) {
+  protected TaggedMapOutput generateTaggedMapOutput(Object value) {
     TaggedMapOutput retv = new SampleTaggedMapOutput((Text) value);
     retv.setTag(new Text(this.inputTag));
     return retv;

Modified: hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java (original)
+++ hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java Thu Feb 28 09:46:49 2008
@@ -22,8 +22,6 @@
 import java.util.Iterator;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -77,7 +75,7 @@
    * @param value
    * @return an object of TaggedMapOutput computed from the given value.
    */
-  protected abstract TaggedMapOutput generateTaggedMapOutput(Writable value);
+  protected abstract TaggedMapOutput generateTaggedMapOutput(Object value);
 
   /**
    * Generate a map output key. The user code can compute the key
@@ -89,7 +87,7 @@
    */
   protected abstract Text generateGroupKey(TaggedMapOutput aRecord);
 
-  public void map(WritableComparable key, Writable value,
+  public void map(Object key, Object value,
                   OutputCollector output, Reporter reporter) throws IOException {
     if (this.reporter == null) {
       this.reporter = reporter;
@@ -115,7 +113,7 @@
     }
   }
 
-  public void reduce(WritableComparable arg0, Iterator arg1,
+  public void reduce(Object arg0, Iterator arg1,
                      OutputCollector arg2, Reporter arg3) throws IOException {
     // TODO Auto-generated method stub
 

Modified: hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java (original)
+++ hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinReducerBase.java Thu Feb 28 09:46:49 2008
@@ -24,8 +24,6 @@
 import java.util.TreeMap;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
@@ -90,7 +88,7 @@
    * @param arg1
    * @return
    */
-  private SortedMap<Object, ResetableIterator> regroup(Writable key,
+  private SortedMap<Object, ResetableIterator> regroup(Object key,
                                                        Iterator arg1, Reporter reporter) throws IOException {
     this.numOfValues = 0;
     SortedMap<Object, ResetableIterator> retv = new TreeMap<Object, ResetableIterator>();
@@ -121,7 +119,7 @@
     return retv;
   }
 
-  public void reduce(WritableComparable key, Iterator values,
+  public void reduce(Object key, Iterator values,
                      OutputCollector output, Reporter reporter) throws IOException {
     if (this.reporter == null) {
       this.reporter = reporter;
@@ -150,7 +148,7 @@
    * @param reporter
    * @throws IOException
    */
-  protected void collect(WritableComparable key, TaggedMapOutput aRecord,
+  protected void collect(Object key, TaggedMapOutput aRecord,
                          OutputCollector output, Reporter reporter) throws IOException {
     this.collected += 1;
     addLongValue("collectedCount", 1);
@@ -173,7 +171,7 @@
    * @throws IOException
    */
   private void joinAndCollect(Object[] tags, ResetableIterator[] values,
-                              WritableComparable key, OutputCollector output, Reporter reporter)
+                              Object key, OutputCollector output, Reporter reporter)
     throws IOException {
     if (values.length < 1) {
       return;
@@ -198,7 +196,7 @@
    * @throws IOException
    */
   private void joinAndCollect(Object[] tags, ResetableIterator[] values,
-                              int pos, Object[] partialList, WritableComparable key,
+                              int pos, Object[] partialList, Object key,
                               OutputCollector output, Reporter reporter) throws IOException {
 
     if (values.length == pos) {
@@ -230,7 +228,7 @@
    */
   protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
 
-  public void map(WritableComparable arg0, Writable arg1, OutputCollector arg2,
+  public void map(Object arg0, Object arg1, OutputCollector arg2,
                   Reporter arg3) throws IOException {
     // TODO Auto-generated method stub
 

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Thu Feb 28 09:46:49 2008
@@ -497,11 +497,11 @@
   }
 
   /**
-   * Write a writable value to the output stream using UTF-8 encoding
+   * Write a value to the output stream using UTF-8 encoding
    * @param value output value
    * @throws IOException
    */
-  void write(Writable value) throws IOException {
+  void write(Object value) throws IOException {
     byte[] bval;
     int valSize;
     if (value instanceof BytesWritable) {

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Thu Feb 28 09:46:49 2008
@@ -28,10 +28,6 @@
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.Writable;
-
 /** A generic Mapper bridge.
  *  It delegates operations to an external program via stdin and stdout.
  */
@@ -66,7 +62,7 @@
   // Do NOT declare default constructor
   // (MapRed creates it reflectively)
 
-  public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+  public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
     // init
     if (outThread_ == null) {
       startOutputThreads(output, reporter);

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Thu Feb 28 09:46:49 2008
@@ -29,7 +29,6 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.Writable;
 
 /** A generic Reducer bridge.
@@ -56,7 +55,7 @@
     return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
   }
 
-  public void reduce(WritableComparable key, Iterator values, OutputCollector output,
+  public void reduce(Object key, Iterator values, OutputCollector output,
                      Reporter reporter) throws IOException {
 
     // init

Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/ValueCountReduce.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/ValueCountReduce.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/ValueCountReduce.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/ValueCountReduce.java Thu Feb 28 09:46:49 2008
@@ -41,7 +41,7 @@
 
   }
 
-  public void reduce(WritableComparable arg0, Iterator arg1, OutputCollector arg2, Reporter arg3) throws IOException {
+  public void reduce(Object arg0, Iterator arg1, OutputCollector arg2, Reporter arg3) throws IOException {
     int count = 0;
     while (arg1.hasNext()) {
       count += 1;

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/InputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/InputBuffer.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/InputBuffer.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/InputBuffer.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+
+
+/** A reusable {@link InputStream} implementation that reads from an in-memory
+ * buffer.
+ *
+ * <p>This saves memory over creating a new InputStream and
+ * ByteArrayInputStream each time data is read.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * InputBuffer buffer = new InputBuffer();
+ * while (... loop condition ...) {
+ *   byte[] data = ... get data ...;
+ *   int dataLength = ... get data length ...;
+ *   buffer.reset(data, dataLength);
+ *   ... read buffer using InputStream methods ...
+ * }
+ * </pre>
+ * @see DataInputBuffer
+ * @see DataOutput
+ */
+public class InputBuffer extends FilterInputStream {
+
+  private static class Buffer extends ByteArrayInputStream {
+    public Buffer() {
+      super(new byte[] {});
+    }
+
+    public void reset(byte[] input, int start, int length) {
+      this.buf = input;
+      this.count = start+length;
+      this.mark = start;
+      this.pos = start;
+    }
+
+    public int getPosition() { return pos; }
+    public int getLength() { return count; }
+  }
+
+  private Buffer buffer;
+  
+  /** Constructs a new empty buffer. */
+  public InputBuffer() {
+    this(new Buffer());
+  }
+
+  private InputBuffer(Buffer buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /** Resets the data that the buffer reads. */
+  public void reset(byte[] input, int length) {
+    buffer.reset(input, 0, length);
+  }
+
+  /** Resets the data that the buffer reads. */
+  public void reset(byte[] input, int start, int length) {
+    buffer.reset(input, start, length);
+  }
+
+  /** Returns the current position in the input. */
+  public int getPosition() { return buffer.getPosition(); }
+
+  /** Returns the length of the input. */
+  public int getLength() { return buffer.getLength(); }
+
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/OutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/OutputBuffer.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/OutputBuffer.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/OutputBuffer.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.*;
+
+/** A reusable {@link OutputStream} implementation that writes to an in-memory
+ * buffer.
+ *
+ * <p>This saves memory over creating a new OutputStream and
+ * ByteArrayOutputStream each time data is written.
+ *
+ * <p>Typical usage is something like the following:<pre>
+ *
+ * OutputBuffer buffer = new OutputBuffer();
+ * while (... loop condition ...) {
+ *   buffer.reset();
+ *   ... write buffer using OutputStream methods ...
+ *   byte[] data = buffer.getData();
+ *   int dataLength = buffer.getLength();
+ *   ... write data to its ultimate destination ...
+ * }
+ * </pre>
+ * @see DataOutputBuffer
+ * @see InputBuffer
+ */
+public class OutputBuffer extends FilterOutputStream {
+
+  private static class Buffer extends ByteArrayOutputStream {
+    public byte[] getData() { return buf; }
+    public int getLength() { return count; }
+    public void reset() { count = 0; }
+
+    public void write(InputStream in, int len) throws IOException {
+      int newcount = count + len;
+      if (newcount > buf.length) {
+        byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
+        System.arraycopy(buf, 0, newbuf, 0, count);
+        buf = newbuf;
+      }
+      IOUtils.readFully(in, buf, count, len);
+      count = newcount;
+    }
+  }
+
+  private Buffer buffer;
+  
+  /** Constructs a new empty buffer. */
+  public OutputBuffer() {
+    this(new Buffer());
+  }
+  
+  private OutputBuffer(Buffer buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /** Returns the current contents of the buffer.
+   *  Data is only valid to {@link #getLength()}.
+   */
+  public byte[] getData() { return buffer.getData(); }
+
+  /** Returns the length of the valid data currently in the buffer. */
+  public int getLength() { return buffer.getLength(); }
+
+  /** Resets the buffer to empty. */
+  public OutputBuffer reset() {
+    buffer.reset();
+    return this;
+  }
+
+  /** Writes bytes from an InputStream directly into the buffer. */
+  public void write(InputStream in, int length) throws IOException {
+    buffer.write(in, length);
+  }
+}
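
InputBuffer and OutputBuffer are designed to be paired, with OutputBuffer's
backing array handed to InputBuffer.reset() for reading. A hedged round-trip
sketch (the payload is illustrative):

    import org.apache.hadoop.io.InputBuffer;
    import org.apache.hadoop.io.OutputBuffer;

    public class BufferRoundTrip {
      public static void main(String[] args) throws Exception {
        OutputBuffer out = new OutputBuffer();
        InputBuffer in = new InputBuffer();
        byte[] payload = "hello".getBytes("UTF-8");
        out.reset();                              // reuse, don't reallocate
        out.write(payload);
        // Read back over the same backing array, without copying.
        in.reset(out.getData(), out.getLength());
        byte[] copy = new byte[in.getLength()];
        int n = in.read(copy);
        System.out.println(new String(copy, 0, n, "UTF-8"));
      }
    }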

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/RawComparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/RawComparator.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/RawComparator.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/RawComparator.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.io.serializer.DeserializerComparator;
+
+/**
+ * <p>
+ * A {@link Comparator} that operates directly on byte representations of
+ * objects.
+ * </p>
+ * @param <T>
+ * @see DeserializerComparator
+ */
+public interface RawComparator<T> extends Comparator<T> {
+
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
+
+}
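
A RawComparator avoids deserialization entirely when the byte encoding is
known. A hedged sketch for the 4-byte big-endian encoding that IntWritable
uses (WritableComparator.readInt is one of the static helpers its javadoc
mentions; the class itself is illustrative):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;

    public class IntRawComparator implements RawComparator<IntWritable> {
      // Compare the serialized forms directly: no objects are created.
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int i1 = WritableComparator.readInt(b1, s1);
        int i2 = WritableComparator.readInt(b2, s2);
        return (i1 < i2) ? -1 : ((i1 == i2) ? 0 : 1);
      }
      // Object comparison falls back to the deserialized values.
      public int compare(IntWritable o1, IntWritable o2) {
        return o1.compareTo(o2);
      }
    }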

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Feb 28 09:46:49 2008
@@ -32,6 +32,8 @@
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Progress;
@@ -780,6 +782,10 @@
     Metadata metadata = null;
     Compressor compressor = null;
     
+    private Serializer keySerializer;
+    private Serializer uncompressedValSerializer;
+    private Serializer compressedValSerializer;
+    
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
     // starts and ends by scanning for this value.
@@ -876,6 +882,7 @@
     }
     
     /** Initialize. */
+    @SuppressWarnings("unchecked")
     void init(Path name, Configuration conf, FSDataOutputStream out,
               Class keyClass, Class valClass,
               boolean compress, CompressionCodec codec, Metadata metadata) 
@@ -887,6 +894,11 @@
       this.compress = compress;
       this.codec = codec;
       this.metadata = metadata;
+      SerializationFactory serializationFactory = new SerializationFactory(conf);
+      this.keySerializer = serializationFactory.getSerializer(keyClass);
+      this.keySerializer.open(buffer);
+      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
+      this.uncompressedValSerializer.open(buffer);
       if (this.codec != null) {
         ReflectionUtils.setConf(this.codec, this.conf);
         compressor = compressorPool.getCodec(this.codec.getCompressorType());
@@ -896,6 +908,8 @@
         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
         this.deflateOut = 
           new DataOutputStream(new BufferedOutputStream(deflateFilter));
+        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
+        this.compressedValSerializer.open(deflateOut);
       }
     }
     
@@ -923,6 +937,12 @@
     /** Close the file. */
     public synchronized void close() throws IOException {
       compressorPool.returnCodec(compressor);
+      
+      keySerializer.close();
+      uncompressedValSerializer.close();
+      if (compressedValSerializer != null) {
+        compressedValSerializer.close();
+      }
 
       if (out != null) {
         out.flush();
@@ -945,6 +965,13 @@
     /** Append a key/value pair. */
     public synchronized void append(Writable key, Writable val)
       throws IOException {
+      append((Object) key, (Object) val);
+    }
+
+    /** Append a key/value pair. */
+    @SuppressWarnings("unchecked")
+    public synchronized void append(Object key, Object val)
+      throws IOException {
       if (key.getClass() != keyClass)
         throw new IOException("wrong key class: "+key.getClass().getName()
                               +" is not "+keyClass);
@@ -955,7 +982,7 @@
       buffer.reset();
 
       // Append the 'key'
-      key.write(buffer);
+      keySerializer.serialize(key);
       int keyLength = buffer.getLength();
       if (keyLength == 0)
         throw new IOException("zero length keys not allowed: " + key);
@@ -963,11 +990,11 @@
       // Append the 'value'
       if (compress) {
         deflateFilter.resetState();
-        val.write(deflateOut);
+        compressedValSerializer.serialize(val);
         deflateOut.flush();
         deflateFilter.finish();
       } else {
-        val.write(buffer);
+        uncompressedValSerializer.serialize(val);
       }
 
       // Write the record out
@@ -2107,7 +2134,7 @@
    */
   public static class Sorter {
 
-    private WritableComparator comparator;
+    private RawComparator comparator;
 
     private MergeSort mergeSort; //the implementation of merge sort
     
@@ -2129,15 +2156,15 @@
 
     /** Sort and merge files containing the named classes. */
     public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf)  {
-      this(fs, new WritableComparator(keyClass), valClass, conf);
+      this(fs, new WritableComparator(keyClass), keyClass, valClass, conf);
     }
 
-    /** Sort and merge using an arbitrary {@link WritableComparator}. */
-    public Sorter(FileSystem fs, WritableComparator comparator, Class valClass, 
-                  Configuration conf) {
+    /** Sort and merge using an arbitrary {@link RawComparator}. */
+    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
+                  Class valClass, Configuration conf) {
       this.fs = fs;
       this.comparator = comparator;
-      this.keyClass = comparator.getKeyClass();
+      this.keyClass = keyClass;
       this.valClass = valClass;
       this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
       this.factor = conf.getInt("io.sort.factor", 100);
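
With the SerializationFactory wired into Writer.init() and the new
append(Object, Object) overload above, a SequenceFile is no longer limited to
Writable keys and values. A hedged sketch using the experimental
JavaSerialization added by this commit (the path and key/value types are
illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;

    public class SerializableSequenceFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization,"
            + "org.apache.hadoop.io.serializer.JavaSerialization");
        FileSystem fs = FileSystem.getLocal(conf);
        SequenceFile.Writer writer = SequenceFile.createWriter(
            fs, conf, new Path("/tmp/demo.seq"), String.class, Long.class);
        writer.append("key", 1L);   // resolved via the SerializationFactory
        writer.close();
      }
    }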

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/WritableComparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/WritableComparator.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/WritableComparator.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/WritableComparator.java Thu Feb 28 09:46:49 2008
@@ -30,7 +30,7 @@
  * {@link #compare(byte[],int,int,byte[],int,int)}.  Static utility methods are
  * provided to assist in optimized implementations of this method.
  */
-public class WritableComparator implements Comparator {
+public class WritableComparator implements RawComparator {
 
   private static HashMap<Class, WritableComparator> comparators =
     new HashMap<Class, WritableComparator>(); // registry

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Deserializer.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * <p>
+ * Provides a facility for deserializing objects of type <T> from an
+ * {@link InputStream}.
+ * </p>
+ * 
+ * <p>
+ * Deserializers are stateful, but must not buffer the input since
+ * other consumers may read from the input between calls to
+ * {@link #deserialize(Object)}.
+ * </p>
+ * @param <T>
+ */
+public interface Deserializer<T> {
+  /**
+   * <p>Prepare the deserializer for reading.</p>
+   */
+  void open(InputStream in) throws IOException;
+  
+  /**
+   * <p>
+   * Deserialize the next object from the underlying input stream.
+   * If the object <code>t</code> is non-null then this deserializer
+   * <i>may</i> set its internal state to the next object read from the input
+   * stream. Otherwise, if the object <code>t</code> is null a new
+   * deserialized object will be created.
+   * </p>
+   * @return the deserialized object
+   */
+  T deserialize(T t) throws IOException;
+  
+  /**
+   * <p>Close the underlying input stream and clear up any resources.</p>
+   */
+  void close() throws IOException;
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/DeserializerComparator.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.hadoop.io.InputBuffer;
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * <p>
+ * A {@link RawComparator} that uses a {@link Deserializer} to deserialize
+ * the objects to be compared so that the standard {@link Comparator} can
+ * be used to compare them.
+ * </p>
+ * <p>
+ * One may optimize compare-intensive operations by using a custom
+ * implementation of {@link RawComparator} that operates directly
+ * on byte representations.
+ * </p>
+ * @param <T>
+ */
+public abstract class DeserializerComparator<T> implements RawComparator<T> {
+  
+  private InputBuffer buffer = new InputBuffer();
+  private Deserializer<T> deserializer;
+  
+  private T key1;
+  private T key2;
+
+  protected DeserializerComparator(Deserializer<T> deserializer)
+    throws IOException {
+    
+    this.deserializer = deserializer;
+    this.deserializer.open(buffer);
+  }
+
+  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+    try {
+      
+      buffer.reset(b1, s1, l1);
+      key1 = deserializer.deserialize(key1);
+      
+      buffer.reset(b2, s2, l2);
+      key2 = deserializer.deserialize(key2);
+      
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return compare(key1, key2);
+  }
+
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerialization.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * <p>
+ * An experimental {@link Serialization} for Java {@link Serializable} classes.
+ * </p>
+ * @see JavaSerializationComparator
+ */
+public class JavaSerialization implements Serialization<Serializable> {
+  
+  static class JavaSerializationDeserializer<T extends Serializable>
+    implements Deserializer<T> {
+
+    private ObjectInputStream ois;
+
+    public void open(InputStream in) throws IOException {
+      ois = new ObjectInputStream(in) {
+        @Override protected void readStreamHeader() {
+          // no header
+        }
+      };
+    }
+    
+    @SuppressWarnings("unchecked")
+    public T deserialize(T object) throws IOException {
+      try {
+        // ignore passed-in object
+        return (T) ois.readObject();
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e.toString());
+      }
+    }
+
+    public void close() throws IOException {
+      ois.close();
+    }
+
+  }
+  
+  static class JavaSerializationSerializer
+    implements Serializer<Serializable> {
+
+    private ObjectOutputStream oos;
+
+    public void open(OutputStream out) throws IOException {
+      oos = new ObjectOutputStream(out) {
+        @Override protected void writeStreamHeader() {
+          // no header
+        }
+      };
+    }
+
+    public void serialize(Serializable object) throws IOException {
+      oos.writeObject(object);
+    }
+
+    public void close() throws IOException {
+      oos.close();
+    }
+
+  }
+
+  public boolean accept(Class<?> c) {
+    return Serializable.class.isAssignableFrom(c);
+  }
+
+  public Deserializer<Serializable> getDeserializer(Class<Serializable> c) {
+    return new JavaSerializationDeserializer<Serializable>();
+  }
+
+  public Serializer<Serializable> getSerializer(Class<Serializable> c) {
+    return new JavaSerializationSerializer();
+  }
+
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/JavaSerializationComparator.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * <p>
+ * A {@link RawComparator} that uses a {@link JavaSerialization}
+ * {@link Deserializer} to deserialize objects that are then compared via
+ * their {@link Comparable} interfaces.
+ * </p>
+ * @param <T>
+ * @see JavaSerialization
+ */
+public class JavaSerializationComparator<T extends Serializable & Comparable<T>>
+  extends DeserializerComparator<T> {
+
+  public JavaSerializationComparator() throws IOException {
+    super(new JavaSerialization.JavaSerializationDeserializer<T>());
+  }
+
+  public int compare(T o1, T o2) {
+    return o1.compareTo(o2);
+  }
+
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serialization.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+/**
+ * <p>
+ * Encapsulates a {@link Serializer}/{@link Deserializer} pair.
+ * </p>
+ * @param <T>
+ */
+public interface Serialization<T> {
+  
+  /**
+   * Allows clients to test whether this {@link Serialization}
+   * supports the given class.
+   */
+  boolean accept(Class<?> c);
+  
+  /**
+   * @return a {@link Serializer} for the given class.
+   */
+  Serializer<T> getSerializer(Class<T> c);
+
+  /**
+   * @return a {@link Deserializer} for the given class.
+   */
+  Deserializer<T> getDeserializer(Class<T> c);
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/SerializationFactory.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * <p>
+ * A factory for {@link Serialization}s.
+ * </p>
+ */
+public class SerializationFactory extends Configured {
+  
+  private static final Log LOG =
+    LogFactory.getLog(SerializationFactory.class.getName());
+
+  private List<Serialization<?>> serializations = new ArrayList<Serialization<?>>();
+  
+  /**
+   * <p>
+   * Serializations are found by reading the <code>io.serializations</code>
+   * property from <code>conf</code>, which is a comma-delimited list of
+   * classnames. 
+   * </p>
+   */
+  public SerializationFactory(Configuration conf) {
+    super(conf);
+    for (String serializerName : conf.getStrings("io.serializations")) {
+      add(serializerName);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private void add(String serializationName) {
+    try {
+      
+      Class<? extends Serialization> serializationClass =
+        (Class<? extends Serialization>) Class.forName(serializationName);
+      serializations.add((Serialization)
+          ReflectionUtils.newInstance(serializationClass, getConf()));
+    } catch (ClassNotFoundException e) {
+      LOG.warn("Serialization class not found: " +
+          StringUtils.stringifyException(e));
+    }
+  }
+
+  public <T> Serializer<T> getSerializer(Class<T> c) {
+    return getSerialization(c).getSerializer(c);
+  }
+
+  public <T> Deserializer<T> getDeserializer(Class<T> c) {
+    return getSerialization(c).getDeserializer(c);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> Serialization<T> getSerialization(Class<T> c) {
+    for (Serialization serialization : serializations) {
+      if (serialization.accept(c)) {
+        return (Serialization<T>) serialization;
+      }
+    }
+    return null;
+  }
+}
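
A minimal illustration of the factory's lookup, assuming only the default
"io.serializations" value from hadoop-default.xml (Text is accepted by
WritableSerialization because it implements Writable; the snippet is
illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.OutputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.Serializer;

    public class FactoryExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        SerializationFactory factory = new SerializationFactory(conf);
        // Walks the configured serializations and returns the first one
        // that accepts Text.
        Serializer<Text> serializer = factory.getSerializer(Text.class);
        OutputBuffer buffer = new OutputBuffer();
        serializer.open(buffer);
        serializer.serialize(new Text("hello"));
        System.out.println("wrote " + buffer.getLength() + " bytes");
        serializer.close();
      }
    }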

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/Serializer.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * <p>
+ * Provides a facility for serializing objects of type <T> to an
+ * {@link OutputStream}.
+ * </p>
+ * 
+ * <p>
+ * Serializers are stateful, but must not buffer the output since
+ * other producers may write to the output between calls to
+ * {@link #serialize(Object)}.
+ * </p>
+ * @param <T>
+ */
+public interface Serializer<T> {
+  /**
+   * <p>Prepare the serializer for writing.</p>
+   */
+  void open(OutputStream out) throws IOException;
+  
+  /**
+   * <p>Serialize <code>t</code> to the underlying output stream.</p>
+   */
+  void serialize(T t) throws IOException;
+  
+  /**
+   * <p>Close the underlying output stream and clear up any resources.</p>
+   */  
+  void close() throws IOException;
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/WritableSerialization.java Thu Feb 28 09:46:49 2008
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.serializer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link Serialization} for {@link Writable}s that delegates to
+ * {@link Writable#write(java.io.DataOutput)} and
+ * {@link Writable#readFields(java.io.DataInput)}.
+ */
+public class WritableSerialization implements Serialization<Writable> {
+  
+  static class WritableDeserializer implements Deserializer<Writable> {
+
+    private Class<?> writableClass;
+    private DataInputStream dataIn;
+    
+    public WritableDeserializer(Class<?> c) {
+      this.writableClass = c;
+    }
+    
+    public void open(InputStream in) {
+      if (in instanceof DataInputStream) {
+        dataIn = (DataInputStream) in;
+      } else {
+        dataIn = new DataInputStream(in);
+      }
+    }
+    
+    public Writable deserialize(Writable w) throws IOException {
+      Writable writable;
+      if (w == null) {
+        writable = (Writable) ReflectionUtils.newInstance(writableClass, null);
+      } else {
+        writable = w;
+      }
+      writable.readFields(dataIn);
+      return writable;
+    }
+
+    public void close() throws IOException {
+      dataIn.close();
+    }
+    
+  }
+  
+  static class WritableSerializer implements Serializer<Writable> {
+
+    private DataOutputStream dataOut;
+    
+    public void open(OutputStream out) {
+      if (out instanceof DataOutputStream) {
+        dataOut = (DataOutputStream) out;
+      } else {
+        dataOut = new DataOutputStream(out);
+      }
+    }
+
+    public void serialize(Writable w) throws IOException {
+      w.write(dataOut);
+    }
+
+    public void close() throws IOException {
+      dataOut.close();
+    }
+
+  }
+
+  public boolean accept(Class<?> c) {
+    return Writable.class.isAssignableFrom(c);
+  }
+
+  public Deserializer<Writable> getDeserializer(Class<Writable> c) {
+    return new WritableDeserializer(c);
+  }
+
+  public Serializer<Writable> getSerializer(Class<Writable> c) {
+    return new WritableSerializer();
+  }
+
+}

Added: hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/package.html
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/package.html?rev=632073&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/package.html (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/serializer/package.html Thu Feb 28 09:46:49 2008
@@ -0,0 +1,37 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+
+<p>
+This package provides a mechanism for using different serialization frameworks
+in Hadoop. The property "io.serializations" defines a list of
+{@link org.apache.hadoop.io.serializer.Serialization}s that know how to create
+{@link org.apache.hadoop.io.serializer.Serializer}s and
+{@link org.apache.hadoop.io.serializer.Deserializer}s.
+</p>
+
+<p>
+To add a new serialization framework write an implementation of
+{@link org.apache.hadoop.io.serializer.Serialization} and add its name to the
+"io.serializations" property.
+</p>
+
+</body>
+</html>
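
As a concrete illustration of the instructions above, a hedged sketch of a
minimal third-party serialization for java.lang.String (the com.example
package is hypothetical, and writeUTF/readUTF is just one possible
self-delimiting wire format):

    package com.example;

    import java.io.*;

    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.Serialization;
    import org.apache.hadoop.io.serializer.Serializer;

    public class StringSerialization implements Serialization<String> {

      public boolean accept(Class<?> c) {
        return String.class.equals(c);
      }

      public Serializer<String> getSerializer(Class<String> c) {
        return new Serializer<String>() {
          private DataOutputStream out;
          public void open(OutputStream o) {
            out = new DataOutputStream(o);
          }
          public void serialize(String s) throws IOException {
            out.writeUTF(s);          // self-delimiting, so no buffering
          }
          public void close() throws IOException {
            out.close();
          }
        };
      }

      public Deserializer<String> getDeserializer(Class<String> c) {
        return new Deserializer<String>() {
          private DataInputStream in;
          public void open(InputStream i) {
            in = new DataInputStream(i);
          }
          public String deserialize(String s) throws IOException {
            return in.readUTF();      // always creates a new String
          }
          public void close() throws IOException {
            in.close();
          }
        };
      }
    }

It would then be enabled by appending its classname to the property, e.g.
io.serializations = org.apache.hadoop.io.serializer.WritableSerialization,com.example.StringSerialization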

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java Thu Feb 28 09:46:49 2008
@@ -21,8 +21,9 @@
 import java.io.IOException;
 
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.OutputBuffer;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile.ValueBytes;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
@@ -33,7 +34,7 @@
  */
 abstract class BasicTypeSorterBase implements BufferSorter {
   
-  protected DataOutputBuffer keyValBuffer; //the buffer used for storing
+  protected OutputBuffer keyValBuffer; //the buffer used for storing
                                            //key/values
   protected int[] startOffsets; //the array used to store the start offsets of
                                 //keys in keyValBuffer
@@ -43,7 +44,7 @@
   protected int[] pointers; //the array of startOffsets's indices. This will
                             //be sorted at the end to contain a sorted array of
                             //indices to offsets
-  protected WritableComparator comparator; //the comparator for the map output
+  protected RawComparator comparator; //the comparator for the map output
   protected int count; //the number of key/values
   //the overhead of the arrays in memory 
   //12 => 4 for keyoffsets, 4 for keylengths, 4 for valueLengths, and
@@ -90,7 +91,7 @@
     count++;
   }
 
-  public void setInputBuffer(DataOutputBuffer buffer) {
+  public void setInputBuffer(OutputBuffer buffer) {
     //store a reference to the keyValBuffer that we need to read during sort
     this.keyValBuffer = buffer;
   }
@@ -159,11 +160,11 @@
   private int[] valLengths;
   private int currStartOffsetIndex;
   private int currIndexInPointers;
-  private DataOutputBuffer keyValBuffer;
+  private OutputBuffer keyValBuffer;
   private DataOutputBuffer key = new DataOutputBuffer();
   private InMemUncompressedBytes value = new InMemUncompressedBytes();
   
-  public MRSortResultIterator(DataOutputBuffer keyValBuffer, 
+  public MRSortResultIterator(OutputBuffer keyValBuffer, 
                               int []pointers, int []startOffsets,
                               int []keyLengths, int []valLengths) {
     this.count = pointers.length;
@@ -214,7 +215,7 @@
     private byte[] data;
     int start;
     int dataSize;
-    private void reset(DataOutputBuffer d, int start, int length) 
+    private void reset(OutputBuffer d, int start, int length) 
       throws IOException {
       data = d.getData();
       this.start = start;

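This file's changes are purely a re-typing, but they carry the design point of the commit: by accepting an OutputBuffer (a plain OutputStream exposing the getData()/getLength() used in this diff) and a RawComparator, the sorter works on opaque byte extents and never needs to deserialize keys. A sketch of that contract (the use of Text and the WritableComparator.get lookup are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.OutputBuffer;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.Serializer;

    public class RawCompareSketch {
      public static void main(String[] args) throws Exception {
        OutputBuffer buf = new OutputBuffer();
        Serializer<Text> ser =
          new SerializationFactory(new Configuration()).getSerializer(Text.class);
        ser.open(buf);

        // Serialize two keys back to back, remembering their extents,
        // just as MapOutputBuffer.collect() records offsets and lengths.
        int off1 = buf.getLength();
        ser.serialize(new Text("apple"));
        int len1 = buf.getLength() - off1;
        int off2 = buf.getLength();
        ser.serialize(new Text("banana"));
        int len2 = buf.getLength() - off2;

        // The sorter compares the raw extents without deserializing.
        RawComparator comparator = WritableComparator.get(Text.class);
        int cmp = comparator.compare(buf.getData(), off1, len1,
                                     buf.getData(), off2, len2);
        System.out.println(cmp < 0);   // true: "apple" precedes "banana"
      }
    }
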
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java Thu Feb 28 09:46:49 2008
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.OutputBuffer;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
 import org.apache.hadoop.util.Progressable;
 
@@ -56,7 +56,7 @@
    * buffer).
    * @param buffer the map output buffer
    */
-  public void setInputBuffer(DataOutputBuffer buffer);
+  public void setInputBuffer(OutputBuffer buffer);
   
   /** The framework invokes this method to get the memory consumed so far
    * by an implementation of this interface.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Thu Feb 28 09:46:49 2008
@@ -28,8 +28,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /** 
  * A base class for file-based {@link InputFormat}.
@@ -41,9 +39,7 @@
  * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
  * not split-up and are processed as a whole by {@link Mapper}s.
  */
-public abstract class FileInputFormat<K extends WritableComparable,
-                                      V extends Writable>
-  implements InputFormat<K, V> {
+public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
 
   public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.FileInputFormat");

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java Thu Feb 28 09:46:49 2008
@@ -21,8 +21,6 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /** 
  * <code>InputFormat</code> describes the input-specification for a 
@@ -64,8 +62,7 @@
  * @see JobClient
  * @see FileInputFormat
  */
-public interface InputFormat<K extends WritableComparable,
-                             V extends Writable> {
+public interface InputFormat<K, V> {
 
   /**
    * Check for validity of the input-specification for the job. 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Feb 28 09:46:49 2008
@@ -562,9 +562,8 @@
    *  
    * @return the map output key class.
    */
-  public Class<? extends WritableComparable> getMapOutputKeyClass() {
-    Class<? extends WritableComparable> retv = getClass("mapred.mapoutput.key.class", null,
-			  WritableComparable.class);
+  public Class<?> getMapOutputKeyClass() {
+    Class<?> retv = getClass("mapred.mapoutput.key.class", null, Object.class);
     if (retv == null) {
       retv = getOutputKeyClass();
     }
@@ -578,9 +577,8 @@
    * 
    * @param theClass the map output key class.
    */
-  public void setMapOutputKeyClass(Class<? extends WritableComparable> theClass) {
-    setClass("mapred.mapoutput.key.class", theClass,
-             WritableComparable.class);
+  public void setMapOutputKeyClass(Class<?> theClass) {
+    setClass("mapred.mapoutput.key.class", theClass, Object.class);
   }
   
   /**
@@ -590,9 +588,9 @@
    *  
    * @return the map output value class.
    */
-  public Class<? extends Writable> getMapOutputValueClass() {
-    Class<? extends Writable> retv = getClass("mapred.mapoutput.value.class", null,
-			  Writable.class);
+  public Class<?> getMapOutputValueClass() {
+    Class<?> retv = getClass("mapred.mapoutput.value.class", null,
+        Object.class);
     if (retv == null) {
       retv = getOutputValueClass();
     }
@@ -606,8 +604,8 @@
    * 
    * @param theClass the map output value class.
    */
-  public void setMapOutputValueClass(Class<? extends Writable> theClass) {
-    setClass("mapred.mapoutput.value.class", theClass, Writable.class);
+  public void setMapOutputValueClass(Class<?> theClass) {
+    setClass("mapred.mapoutput.value.class", theClass, Object.class);
   }
   
   /**
@@ -615,9 +613,9 @@
    * 
    * @return the key class for the job output data.
    */
-  public Class<? extends WritableComparable> getOutputKeyClass() {
+  public Class<?> getOutputKeyClass() {
     return getClass("mapred.output.key.class",
-                    LongWritable.class, WritableComparable.class);
+                    LongWritable.class, Object.class);
   }
   
   /**
@@ -625,33 +623,33 @@
    * 
    * @param theClass the key class for the job output data.
    */
-  public void setOutputKeyClass(Class<? extends WritableComparable> theClass) {
-    setClass("mapred.output.key.class", theClass, WritableComparable.class);
+  public void setOutputKeyClass(Class<?> theClass) {
+    setClass("mapred.output.key.class", theClass, Object.class);
   }
 
   /**
-   * Get the {@link WritableComparable} comparator used to compare keys.
+   * Get the {@link RawComparator} used to compare keys.
    * 
-   * @return the {@link WritableComparable} comparator used to compare keys.
+   * @return the {@link RawComparator} used to compare keys.
    */
-  public WritableComparator getOutputKeyComparator() {
+  public RawComparator getOutputKeyComparator() {
     Class theClass = getClass("mapred.output.key.comparator.class", null,
-                              WritableComparator.class);
+                              RawComparator.class);
     if (theClass != null)
-      return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
+      return (RawComparator)ReflectionUtils.newInstance(theClass, this);
     return WritableComparator.get(getMapOutputKeyClass());
   }
 
   /**
-   * Set the {@link WritableComparable} comparator used to compare keys.
+   * Set the {@link RawComparator} used to compare keys.
    * 
-   * @param theClass the {@link WritableComparable} comparator used to 
+   * @param theClass the {@link RawComparator} used to 
    *                 compare keys.
    * @see #setOutputValueGroupingComparator(Class)                 
    */
-  public void setOutputKeyComparatorClass(Class<? extends WritableComparator> theClass) {
+  public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
     setClass("mapred.output.key.comparator.class",
-             theClass, WritableComparator.class);
+             theClass, RawComparator.class);
   }
 
   /** 
@@ -661,24 +659,24 @@
    * @return comparator set by the user for grouping values.
    * @see #setOutputValueGroupingComparator(Class) for details.  
    */
-  public WritableComparator getOutputValueGroupingComparator() {
+  public RawComparator getOutputValueGroupingComparator() {
     Class theClass = getClass("mapred.output.value.groupfn.class", null,
-                              WritableComparator.class);
+        RawComparator.class);
     if (theClass == null) {
       return getOutputKeyComparator();
     }
     
-    return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
+    return (RawComparator)ReflectionUtils.newInstance(theClass, this);
   }
 
   /** 
-   * Set the user defined {@link WritableComparable} comparator for 
+   * Set the user-defined {@link RawComparator} for 
    * grouping keys in the input to the reduce.
    * 
    * <p>This comparator should be provided if the equivalence rules for keys
    * for sorting the intermediates are different from those for grouping keys
    * before each call to 
-   * {@link Reducer#reduce(WritableComparable, java.util.Iterator, OutputCollector, Reporter)}.</p>
+   * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
    *  
    * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
    * in a single call to the reduce function if K1 and K2 compare as equal.</p>
@@ -693,12 +691,13 @@
    * that much sense.)</p>
    * 
    * @param theClass the comparator class to be used for grouping keys. 
-   *                 It should extend <code>WritableComparator</code>.
+   *                 It should implement <code>RawComparator</code>.
    * @see #setOutputKeyComparatorClass(Class)                 
    */
-  public void setOutputValueGroupingComparator(Class theClass) {
+  public void setOutputValueGroupingComparator(
+      Class<? extends RawComparator> theClass) {
     setClass("mapred.output.value.groupfn.class",
-             theClass, WritableComparator.class);
+             theClass, RawComparator.class);
   }
 
   /**
@@ -706,8 +705,8 @@
    * 
    * @return the value class for job outputs.
    */
-  public Class<? extends Writable> getOutputValueClass() {
-    return getClass("mapred.output.value.class", Text.class, Writable.class);
+  public Class<?> getOutputValueClass() {
+    return getClass("mapred.output.value.class", Text.class, Object.class);
   }
   
   /**
@@ -715,8 +714,8 @@
    * 
    * @param theClass the value class for job outputs.
    */
-  public void setOutputValueClass(Class<? extends Writable> theClass) {
-    setClass("mapred.output.value.class", theClass, Writable.class);
+  public void setOutputValueClass(Class<?> theClass) {
+    setClass("mapred.output.value.class", theClass, Object.class);
   }
 
   /**

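Relaxing the key/value bounds to Class<?> moves type enforcement from compile time to the setClass/getClass calls, and note the fallback kept in getOutputKeyComparator(): with no explicit comparator it still calls WritableComparator.get() on the map output key class, which only works for WritableComparables. Jobs emitting plain Java keys must therefore set a RawComparator explicitly. A sketch using the Java-serialization support added alongside this change (the String/Integer choice is illustrative):

    import org.apache.hadoop.io.serializer.JavaSerializationComparator;
    import org.apache.hadoop.mapred.JobConf;

    public class JavaTypesJobSetup {
      public static JobConf configure() {
        JobConf job = new JobConf();
        // Make JavaSerialization available alongside the Writable default.
        job.set("io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,"
                + "org.apache.hadoop.io.serializer.WritableSerialization");
        // No longer constrained to WritableComparable/Writable.
        job.setMapOutputKeyClass(String.class);
        job.setMapOutputValueClass(Integer.class);
        // String is Serializable and Comparable, so a deserializing
        // comparator can order the raw key bytes.
        job.setOutputKeyComparatorClass(JavaSerializationComparator.class);
        return job;
      }
    }

The grouping comparator for the reduce (setOutputValueGroupingComparator) follows the same rule: any RawComparator now qualifies, not just WritableComparator subclasses.
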
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Thu Feb 28 09:46:49 2008
@@ -69,7 +69,7 @@
                          compressionType, codec,
                          progress);
 
-    return new RecordWriter() {
+    return new RecordWriter<WritableComparable, Writable>() {
 
         public void write(WritableComparable key, Writable value)
           throws IOException {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunnable.java Thu Feb 28 09:46:49 2008
@@ -20,9 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
 /**
  * Expert: Generic interface for {@link Mapper}s.
  * 
@@ -31,8 +28,7 @@
  * 
  * @see Mapper
  */
-public interface MapRunnable<K1 extends WritableComparable, V1 extends Writable,
-                             K2 extends WritableComparable, V2 extends Writable>
+public interface MapRunnable<K1, V1, K2, V2>
     extends JobConfigurable {
   
   /** 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapRunner.java Thu Feb 28 09:46:49 2008
@@ -20,13 +20,10 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /** Default {@link MapRunnable} implementation.*/
-public class MapRunner<K1 extends WritableComparable, V1 extends Writable,
-                       K2 extends WritableComparable, V2 extends Writable>
+public class MapRunner<K1, V1, K2, V2>
     implements MapRunnable<K1, V1, K2, V2> {
   
   private Mapper<K1, V1, K2, V2> mapper;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Feb 28 09:46:49 2008
@@ -35,11 +35,11 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.InputBuffer;
+import org.apache.hadoop.io.OutputBuffer;
+import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Sorter;
 import org.apache.hadoop.io.SequenceFile.Writer;
@@ -47,6 +47,9 @@
 import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -119,7 +122,7 @@
    * @param <K>
    * @param <V>
    */
-  class TrackedRecordReader<K extends WritableComparable, V extends Writable> 
+  class TrackedRecordReader<K, V> 
       implements RecordReader<K,V> {
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
@@ -215,8 +218,7 @@
     done(umbilical);
   }
 
-  interface MapOutputCollector<K extends WritableComparable,
-                               V extends Writable>
+  interface MapOutputCollector<K, V>
     extends OutputCollector<K, V> {
 
     public void close() throws IOException;
@@ -225,8 +227,7 @@
         
   }
 
-  class DirectMapOutputCollector<K extends WritableComparable,
-                                 V extends Writable>
+  class DirectMapOutputCollector<K, V>
     implements MapOutputCollector<K, V> {
  
     private RecordWriter<K, V> out = null;
@@ -268,11 +269,11 @@
     private JobConf job;
     private Reporter reporter;
 
-    private DataOutputBuffer keyValBuffer; //the buffer where key/val will
-                                           //be stored before they are 
-                                           //passed on to the pending buffer
-    private DataOutputBuffer pendingKeyvalBuffer; // the key value buffer used
-                                                  // while spilling
+    private OutputBuffer keyValBuffer; //the buffer where key/val will
+                                       //be stored before they are 
+                                       //passed on to the pending buffer
+    private OutputBuffer pendingKeyvalBuffer; // the key value buffer used
+                                              // while spilling
     // a lock used for sync sort-spill with collect
     private final Object pendingKeyvalBufferLock = new Object();
     // since sort-spill and collect are done concurrently, exceptions are 
@@ -287,7 +288,14 @@
     private CompressionType compressionType;
     private Class keyClass;
     private Class valClass;
-    private WritableComparator comparator;
+    private RawComparator comparator;
+    private SerializationFactory serializationFactory;
+    private Serializer keySerializer;
+    private Serializer valSerializer;
+    private InputBuffer keyIn = new InputBuffer();
+    private InputBuffer valIn = new InputBuffer();
+    private Deserializer keyDeserializer;
+    private Deserializer valDeserializer;    
     private BufferSorter []sortImpl;
     private BufferSorter []pendingSortImpl; // sort impl for the pending buffer
     private SequenceFile.Writer writer;
@@ -299,6 +307,7 @@
     private Counters.Counter combineInputCounter;
     private Counters.Counter combineOutputCounter;
     
+    @SuppressWarnings("unchecked")
     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, 
                            Reporter reporter) throws IOException {
       this.partitions = job.getNumReduceTasks();
@@ -306,13 +315,22 @@
                                                                   job.getPartitionerClass(), job);
       maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024 / 2;
       this.sortSpillException = null;
-      keyValBuffer = new DataOutputBuffer();
+      keyValBuffer = new OutputBuffer();
 
       this.job = job;
       this.reporter = reporter;
       this.comparator = job.getOutputKeyComparator();
       this.keyClass = job.getMapOutputKeyClass();
       this.valClass = job.getMapOutputValueClass();
+      this.serializationFactory = new SerializationFactory(conf);
+      this.keySerializer = serializationFactory.getSerializer(keyClass);
+      this.keySerializer.open(keyValBuffer);
+      this.valSerializer = serializationFactory.getSerializer(valClass);
+      this.valSerializer.open(keyValBuffer);
+      this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+      this.keyDeserializer.open(keyIn);
+      this.valDeserializer = serializationFactory.getDeserializer(valClass);
+      this.valDeserializer.open(valIn);
       this.localFs = FileSystem.getLocal(job);
       this.codec = null;
       this.compressionType = CompressionType.NONE;
@@ -357,8 +375,8 @@
     }
     
     @SuppressWarnings("unchecked")
-    public synchronized void collect(WritableComparable key,
-                                     Writable value) throws IOException {
+    public synchronized void collect(Object key,
+                                     Object value) throws IOException {
       
       if (key.getClass() != keyClass) {
         throw new IOException("Type mismatch in key from map: expected "
@@ -377,7 +395,9 @@
       }
       
       if (keyValBuffer == null) {
-        keyValBuffer = new DataOutputBuffer();
+        keyValBuffer = new OutputBuffer();
+        keySerializer.open(keyValBuffer);
+        valSerializer.open(keyValBuffer);
         sortImpl = new BufferSorter[partitions];
         for (int i = 0; i < partitions; i++)
           sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
@@ -387,9 +407,9 @@
       
       //dump the key/value to buffer
       int keyOffset = keyValBuffer.getLength(); 
-      key.write(keyValBuffer);
+      keySerializer.serialize(key);
       int keyLength = keyValBuffer.getLength() - keyOffset;
-      value.write(keyValBuffer);
+      valSerializer.serialize(value);
       int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
       int partNumber = partitioner.getPartition(key, value, partitions);
       sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
@@ -420,6 +440,8 @@
           // prepare for spilling
           pendingKeyvalBuffer = keyValBuffer;
           pendingSortImpl = sortImpl;
+          keySerializer.close();
+          valSerializer.close();
           keyValBuffer = null;
           sortImpl = null;
         }
@@ -483,7 +505,7 @@
                                                                       job.getCombinerClass(), job);
               // make collector
               OutputCollector combineCollector = new OutputCollector() {
-                  public void collect(WritableComparable key, Writable value)
+                  public void collect(Object key, Object value)
                     throws IOException {
                     synchronized (this) {
                       writer.append(key, value);
@@ -527,31 +549,27 @@
       }
     }
     
+    @SuppressWarnings("unchecked")
     private void spill(RawKeyValueIterator resultIter) throws IOException {
-      Writable key = null;
-      Writable value = null;
-
       try {
         // indicate progress, since constructor may take a while (because of 
         // user code) 
         reporter.progress();
-        key = (WritableComparable)ReflectionUtils.newInstance(keyClass, job);
-        value = (Writable)ReflectionUtils.newInstance(valClass, job);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
 
-      DataInputBuffer keyIn = new DataInputBuffer();
-      DataInputBuffer valIn = new DataInputBuffer();
+      Object key = null;
+      Object value = null;
       DataOutputBuffer valOut = new DataOutputBuffer();
       while (resultIter.next()) {
         keyIn.reset(resultIter.getKey().getData(), 
                     resultIter.getKey().getLength());
-        key.readFields(keyIn);
+        key = keyDeserializer.deserialize(key);
         valOut.reset();
         (resultIter.getValue()).writeUncompressedBytes(valOut);
         valIn.reset(valOut.getData(), valOut.getLength());
-        value.readFields(valIn);
+        value = valDeserializer.deserialize(value);
         writer.append(key, value);
         reporter.progress();
       }
@@ -613,7 +631,8 @@
       {
         //create a sorter object as we need access to the SegmentDescriptor
         //class and merge methods
-        Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(), valClass, job);
+        Sorter sorter = new Sorter(localFs, job.getOutputKeyComparator(),
+                                   keyClass, valClass, job);
         sorter.setProgressable(reporter);
         
         for (int parts = 0; parts < partitions; parts++){
@@ -665,7 +684,7 @@
     private class CombineValuesIterator extends ValuesIterator {
         
       public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
-                                   WritableComparator comparator, Class keyClass,
+                                   RawComparator comparator, Class keyClass,
                                    Class valClass, Configuration conf, Reporter reporter) 
         throws IOException {
         super(in, comparator, keyClass, valClass, conf, reporter);

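One subtlety in the rewritten spill() above: Deserializer.deserialize(t) returns the object to use, and its argument is only a reuse hint. The Writable-backed deserializer refills and returns the very instance it is handed, while a framework such as Java serialization always allocates a fresh object. That is why key and value start out null and are reassigned on every iteration rather than mutated in place, and why keyIn/valIn are now long-lived InputBuffer fields that are simply reset() against each record's raw bytes. The essential pattern (identifiers as in the diff; the value side, staged through valOut, is elided):

    while (resultIter.next()) {
      keyIn.reset(resultIter.getKey().getData(),
                  resultIter.getKey().getLength());
      // Reassign: the returned object need not be the one passed in.
      key = keyDeserializer.deserialize(key);
      // ... value bytes staged through valOut/valIn the same way ...
      value = valDeserializer.deserialize(value);
      writer.append(key, value);
    }
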
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Mapper.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Mapper.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Mapper.java Thu Feb 28 09:46:49 2008
@@ -23,8 +23,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Closeable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
 /** 
@@ -43,7 +41,7 @@
  * de-initialization.</p>
  * 
  * <p>The framework then calls 
- * {@link #map(WritableComparable, Writable, OutputCollector, Reporter)} 
+ * {@link #map(Object, Object, OutputCollector, Reporter)} 
  * for each key/value pair in the <code>InputSplit</code> for that task.</p>
  * 
  * <p>All intermediate values associated with a given output key are 
@@ -130,9 +128,7 @@
  * @see MapRunnable
  * @see SequenceFile
  */
-public interface Mapper<K1 extends WritableComparable, V1 extends Writable,
-                        K2 extends WritableComparable, V2 extends Writable>
-  extends JobConfigurable, Closeable {
+public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
   
   /** 
    * Maps a single input key/value pair into an intermediate key/value pair.
@@ -140,7 +136,7 @@
    * <p>Output pairs need not be of the same types as input pairs.  A given 
    * input pair may map to zero or many output pairs.  Output pairs are 
    * collected with calls to 
-   * {@link OutputCollector#collect(WritableComparable,Writable)}.</p>
+   * {@link OutputCollector#collect(Object,Object)}.</p>
    *
    * <p>Applications can use the {@link Reporter} provided to report progress 
    * or just indicate that they are alive. In scenarios where the application 

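With the bounds gone, a Mapper can be parameterized over arbitrary types, provided a Serialization is registered for the intermediate classes and, for keys, a RawComparator is configured as in the JobConf sketch earlier. A hypothetical word-length mapper emitting plain Java types (the class is invented for illustration):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class WordLengthMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, String, Integer> {

      public void map(LongWritable key, Text value,
                      OutputCollector<String, Integer> output,
                      Reporter reporter) throws IOException {
        for (String word : value.toString().split("\\s+")) {
          // String/Integer outputs are legal because OutputCollector is
          // now <K, V>; JavaSerialization must be registered for them to
          // survive the shuffle.
          output.collect(word, word.length());
        }
      }
    }
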
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java Thu Feb 28 09:46:49 2008
@@ -24,8 +24,6 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 
 /**
  * An abstract {@link InputFormat} that returns {@link MultiFileSplit}'s
@@ -36,8 +34,7 @@
  * to construct <code>RecordReader</code>'s for <code>MultiFileSplit</code>'s.
  * @see MultiFileSplit
  */
-public abstract class MultiFileInputFormat<K extends WritableComparable,
-                                           V extends Writable>
+public abstract class MultiFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
 
   @Override

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputCollector.java Thu Feb 28 09:46:49 2008
@@ -20,10 +20,6 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-
 /**
  * Collects the <code>&lt;key, value&gt;</code> pairs output by {@link Mapper}s
  * and {@link Reducer}s.
@@ -33,8 +29,7 @@
  * <code>Mapper</code> or the <code>Reducer</code> i.e. intermediate outputs 
  * or the output of the job.</p>  
  */
-public interface OutputCollector<K extends WritableComparable,
-                                 V extends Writable> {
+public interface OutputCollector<K, V> {
   
   /** Adds a key/value pair to the output.
    *

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java Thu Feb 28 09:46:49 2008
@@ -21,8 +21,6 @@
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.Progressable;
 
 /** 
@@ -45,8 +43,7 @@
  * @see RecordWriter
  * @see JobConf
  */
-public interface OutputFormat<K extends WritableComparable,
-                              V extends Writable> {
+public interface OutputFormat<K, V> {
 
   /** 
    * Get the {@link RecordWriter} for the given job.

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?rev=632073&r1=632072&r2=632073&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Thu Feb 28 09:46:49 2008
@@ -22,15 +22,11 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.util.Progressable;
 
 /** A base class for {@link OutputFormat}. */
-public abstract class OutputFormatBase<K extends WritableComparable,
-                                       V extends Writable>
-  implements OutputFormat<K, V> {
+public abstract class OutputFormatBase<K, V> implements OutputFormat<K, V> {
 
   /**
    * Set whether the output of the job is compressed.


