hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [18/64] [abbrv] Import initial code for MAPREDUCE-2841 (native output collector)
Date Sat, 13 Sep 2014 01:41:23 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
new file mode 100644
index 0000000..eb15164
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A read-side view over a {@link ByteBuffer} through which data is handed
+ * between Java and the native side.
+ *
+ * <p>A newly constructed buffer is empty (position and limit both 0), so
+ * {@link #length()} and {@link #remaining()} return 0 until
+ * {@link #rewind(int, int)} exposes a data range. If it is constructed with a
+ * non-positive size or an empty array, no backing buffer is created and the
+ * accessors return neutral values (0 or {@code null}) instead.
+ */
+public class InputBuffer {
+
+  private ByteBuffer byteBuffer;
+  private final BufferType type;
+
+  /**
+   * Creates an empty big-endian buffer.
+   *
+   * @param type {@code DIRECT_BUFFER} borrows from {@link DirectBufferPool};
+   *     {@code HEAP_BUFFER} allocates on the heap
+   * @param inputSize capacity in bytes; if not positive, no buffer is created
+   * @throws IOException as declared for the direct-buffer path (the pool's
+   *     failure modes are not visible here)
+   * @throws IllegalArgumentException if {@code type} is neither of the above
+   */
+  public InputBuffer(BufferType type, int inputSize) throws IOException {
+    this.type = type;
+
+    if (inputSize > 0) {
+      switch (type) {
+      case DIRECT_BUFFER:
+        this.byteBuffer = DirectBufferPool.getInstance().borrowBuffer(inputSize);
+        break;
+      case HEAP_BUFFER:
+        this.byteBuffer = ByteBuffer.allocate(inputSize);
+        break;
+      default:
+        // Fail fast rather than leaving byteBuffer null.
+        throw new IllegalArgumentException("unsupported buffer type: " + type);
+      }
+      byteBuffer.order(ByteOrder.BIG_ENDIAN);
+      // Start out empty; callers expose data through rewind().
+      byteBuffer.position(0);
+      byteBuffer.limit(0);
+    }
+  }
+
+  /** @return the buffer type this instance was created with */
+  public BufferType getType() {
+    return this.type;
+  }
+
+  /**
+   * Wraps {@code bytes} (without copying) as a big-endian heap buffer. Like the
+   * other constructor, the buffer starts out empty (limit 0).
+   *
+   * @param bytes backing array; if empty, no buffer is created
+   */
+  public InputBuffer(byte[] bytes) {
+    this.type = BufferType.HEAP_BUFFER;
+    if (bytes.length > 0) {
+      this.byteBuffer = ByteBuffer.wrap(bytes);
+      this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+      byteBuffer.position(0);
+      byteBuffer.limit(0);
+    }
+  }
+
+  /** @return the underlying buffer, or {@code null} if none was created */
+  public ByteBuffer getByteBuffer() {
+    return this.byteBuffer;
+  }
+
+  /** @return the current limit (end of the readable range), or 0 without a buffer */
+  public int length() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.limit();
+  }
+
+  /**
+   * Exposes the absolute range {@code [startOffset, length)} for reading.
+   *
+   * <p>The limit is set before the position because
+   * {@link ByteBuffer#position(int)} rejects a position beyond the current
+   * limit; setting the position first would fail for any offset past the
+   * previous limit (e.g. on a new buffer, whose limit is 0).
+   *
+   * @param startOffset new position
+   * @param length new limit, as an absolute offset rather than a byte count
+   * @throws IllegalArgumentException if a value is negative, {@code length}
+   *     exceeds the capacity, or {@code startOffset} exceeds {@code length}
+   */
+  public void rewind(int startOffset, int length) {
+    if (null == byteBuffer) {
+      return;
+    }
+    byteBuffer.limit(length);
+    byteBuffer.position(startOffset);
+  }
+
+  /** @return the number of bytes between position and limit, or 0 without a buffer */
+  public int remaining() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.remaining();
+  }
+
+  /** @return the current read position, or 0 without a buffer */
+  public int position() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.position();
+  }
+
+  /**
+   * Moves the read position.
+   *
+   * @param pos new position; must not exceed the current limit
+   * @return {@code pos}, or 0 if there is no buffer
+   */
+  public int position(int pos) {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    byteBuffer.position(pos);
+    return pos;
+  }
+
+  /** @return the buffer capacity, or 0 without a buffer */
+  public int capacity() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.capacity();
+  }
+
+  /**
+   * Returns the array backing a heap buffer.
+   *
+   * @return the backing array, or {@code null} if there is no buffer
+   * @throws UnsupportedOperationException if the buffer is not array-backed
+   *     (typically a direct buffer), per {@link ByteBuffer#array()}
+   */
+  public byte[] array() {
+    if (null == byteBuffer) {
+      return null;
+    }
+    return byteBuffer.array();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java
new file mode 100644
index 0000000..3c54948
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.buffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A write-side view over a {@link ByteBuffer}. {@link #length()} reports the
+ * current position, i.e. the bytes written since the last {@link #rewind()}.
+ *
+ * <p>If constructed with a non-positive capacity or an empty array, no backing
+ * buffer is created; the accessors then return 0 and {@link #rewind()} does
+ * nothing, mirroring the null handling in {@code InputBuffer}.
+ */
+public class OutputBuffer {
+  protected ByteBuffer byteBuffer;
+  private final BufferType type;
+
+  /**
+   * Allocates a big-endian buffer of the given type.
+   *
+   * @param type {@code DIRECT_BUFFER} or {@code HEAP_BUFFER}
+   * @param outputBufferCapacity capacity in bytes; if not positive, no buffer
+   *     is created
+   * @throws IllegalArgumentException if {@code type} is neither of the above
+   */
+  public OutputBuffer(BufferType type, int outputBufferCapacity) {
+    this.type = type;
+    if (outputBufferCapacity > 0) {
+      switch (type) {
+      case DIRECT_BUFFER:
+        this.byteBuffer = ByteBuffer.allocateDirect(outputBufferCapacity);
+        break;
+      case HEAP_BUFFER:
+        this.byteBuffer = ByteBuffer.allocate(outputBufferCapacity);
+        break;
+      default:
+        // Fail fast rather than silently leaving byteBuffer null.
+        throw new IllegalArgumentException("unsupported buffer type: " + type);
+      }
+      this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+    }
+  }
+
+  /**
+   * Wraps {@code bytes} (without copying) as a big-endian heap buffer.
+   *
+   * @param bytes backing array; if empty, no buffer is created
+   */
+  public OutputBuffer(byte[] bytes) {
+    this.type = BufferType.HEAP_BUFFER;
+    if (bytes.length > 0) {
+      this.byteBuffer = ByteBuffer.wrap(bytes);
+      this.byteBuffer.order(ByteOrder.BIG_ENDIAN);
+      this.byteBuffer.position(0);
+    }
+  }
+
+  /** @return the buffer type this instance was created with */
+  public BufferType getType() {
+    return this.type;
+  }
+
+  /** @return the underlying buffer, or {@code null} if none was created */
+  public ByteBuffer getByteBuffer() {
+    return this.byteBuffer;
+  }
+
+  /**
+   * @return the current position (the bytes written since the last
+   *     {@link #rewind()}, assuming writers advance it); 0 if there is no buffer
+   */
+  public int length() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.position();
+  }
+
+  /** Resets the write position to 0; does nothing if there is no buffer. */
+  public void rewind() {
+    if (null == byteBuffer) {
+      return;
+    }
+    byteBuffer.position(0);
+  }
+
+  /** @return the current limit, or 0 if there is no buffer */
+  public int limit() {
+    if (null == byteBuffer) {
+      return 0;
+    }
+    return byteBuffer.limit();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java
new file mode 100644
index 0000000..50d7816
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter;
+import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer;
+import org.apache.hadoop.mapred.nativetask.serde.KVSerializer;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * Loads serialized key/value pairs from a {@link RawKeyValueIterator} into the
+ * output buffer of a {@link NativeDataTarget}, one batch per {@link #load()}
+ * call, when signaled by a {@link BufferPuller}.
+ */
+public class BufferPullee<IK, IV> implements IDataLoader {
+
+  /** Per-record header bytes counted on top of the key and value lengths. */
+  public static final int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+  private final SizedWritable<IK> tmpInputKey;
+  private final SizedWritable<IV> tmpInputValue;
+  // True when the last pair read from rIter did not fit; it is written first
+  // on the next load().
+  private boolean inputKVBufferd = false;
+  private RawKeyValueIterator rIter;
+  private ByteBufferDataWriter nativeWriter;
+  protected KVSerializer<IK, IV> serializer;
+  private final OutputBuffer outputBuffer;
+  private final NativeDataTarget target;
+  private boolean closed = false;
+
+  /**
+   * @param iKClass key class; if it or {@code iVClass} is null no serializer is
+   *     created, so {@link #load()} fails on the first record
+   * @param iVClass value class
+   * @param rIter source of raw key/value pairs; closed by {@link #close()}
+   * @param target native target whose output buffer receives the data
+   */
+  public BufferPullee(Class<IK> iKClass, Class<IV> iVClass, RawKeyValueIterator rIter, NativeDataTarget target)
+      throws IOException {
+    this.rIter = rIter;
+    tmpInputKey = new SizedWritable<IK>(iKClass);
+    tmpInputValue = new SizedWritable<IV>(iVClass);
+
+    if (null != iKClass && null != iVClass) {
+      this.serializer = new KVSerializer<IK, IV>(iKClass, iVClass);
+    }
+    this.outputBuffer = target.getOutputBuffer();
+    this.target = target;
+  }
+
+  /**
+   * Serializes as many pairs as fit into the output buffer, then flushes.
+   *
+   * <p>The first pair of each call is written without a space check; any later
+   * pair that does not fit is held back and written first on the next call.
+   *
+   * @return the sum of the serializer's return values for the pairs written
+   *     (presumably bytes), or 0 once closed
+   * @throws IOException if the output buffer is missing or reading/writing fails
+   */
+  @Override
+  public int load() throws IOException {
+    if (closed) {
+      return 0;
+    }
+
+    if (null == outputBuffer) {
+      throw new IOException("output buffer not set");
+    }
+
+    // NOTE(review): the previous writer is replaced without close(); confirm
+    // ByteBufferDataWriter holds nothing that needs releasing per call.
+    this.nativeWriter = new ByteBufferDataWriter(target);
+    outputBuffer.rewind();
+
+    int written = 0;
+    boolean firstKV = true;
+
+    if (inputKVBufferd) {
+      // Emit the pair held back by the previous call before reading new ones.
+      written += serializer.serializeKV(nativeWriter, tmpInputKey, tmpInputValue);
+      inputKVBufferd = false;
+      firstKV = false;
+    }
+
+    while (rIter.next()) {
+      tmpInputKey.readFields(rIter.getKey());
+      tmpInputValue.readFields(rIter.getValue());
+      serializer.updateLength(tmpInputKey, tmpInputValue);
+
+      final int kvSize = tmpInputKey.length + tmpInputValue.length + KV_HEADER_LENGTH;
+
+      if (!firstKV && nativeWriter.shortOfSpace(kvSize)) {
+        // Leave the pair in tmpInputKey/tmpInputValue for the next load().
+        inputKVBufferd = true;
+        break;
+      }
+      written += serializer.serializeKV(nativeWriter, tmpInputKey, tmpInputValue);
+      firstKV = false;
+    }
+
+    if (nativeWriter.hasUnFlushedData()) {
+      nativeWriter.flush();
+    }
+    return written;
+  }
+
+  /**
+   * Closes the iterator and the most recent writer; later calls do nothing.
+   * The writer is closed even if closing the iterator throws.
+   *
+   * @throws IOException if closing either resource fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      if (null != rIter) {
+        rIter.close();
+      }
+    } finally {
+      if (null != nativeWriter) {
+        nativeWriter.close();
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java
new file mode 100644
index 0000000..704b664
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.DataReceiver;
+import org.apache.hadoop.mapred.nativetask.NativeDataSource;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * Iterates over key/value records received from the native side, actively
+ * signaling a {@link BufferPullee} to load more data into the input buffer
+ * when the buffered records are used up.
+ *
+ * <p>Each record is an int key length and an int value length followed by the
+ * key and value bytes. A record cut off at the end of the input buffer is
+ * reassembled in a separate heap "aside" buffer by {@link #receiveData()}.
+ */
+public class BufferPuller implements RawKeyValueIterator, DataReceiver {
+
+  private static final Log LOG = LogFactory.getLog(BufferPuller.class);
+
+  public final static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+  byte[] keyBytes = new byte[0];
+  byte[] valueBytes = new byte[0];
+
+  private InputBuffer inputBuffer;
+  // Holds a single record that was cut off at the end of the input buffer.
+  private InputBuffer asideBuffer;
+
+  // NOTE(review): not read or written anywhere in this class.
+  int remain = 0;
+
+  private ByteBufferDataReader nativeReader;
+
+  DataInputBuffer keyBuffer = new DataInputBuffer();
+  DataInputBuffer valueBuffer = new DataInputBuffer();
+
+  private boolean noMoreData = false;
+
+  private NativeDataSource input;
+  private boolean closed = false;
+
+  /**
+   * @param handler native data source whose input buffer is read and whose
+   *     {@code loadData()} is called when more records are needed
+   */
+  public BufferPuller(NativeDataSource handler) throws IOException {
+    this.input = handler;
+    this.inputBuffer = handler.getInputBuffer();
+    nativeReader = new ByteBufferDataReader(null);
+    this.asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, inputBuffer.capacity());
+  }
+
+  /** @return the current key; the same object is reused for every record */
+  @Override
+  public DataInputBuffer getKey() throws IOException {
+    return keyBuffer;
+  }
+
+  /** @return the current value; the same object is reused for every record */
+  @Override
+  public DataInputBuffer getValue() throws IOException {
+    return valueBuffer;
+  }
+
+  /** Clears the end-of-data flag so that {@link #next()} asks for data again. */
+  public void reset() {
+    noMoreData = false;
+  }
+
+  /**
+   * Advances to the next record. The aside buffer is read before the input
+   * buffer; when both are empty the source is asked to load more data.
+   *
+   * @return false once no more data arrives (until {@link #reset()}) or after
+   *     {@link #close()}
+   */
+  @Override
+  public boolean next() throws IOException {
+    if (closed) {
+      return false;
+    }
+
+    if (noMoreData) {
+      return false;
+    }
+    final int asideRemain = asideBuffer.remaining();
+    final int inputRemain = inputBuffer.remaining();
+
+    if (asideRemain == 0 && inputRemain == 0) {
+      input.loadData();
+    }
+
+    if (asideBuffer.remaining() > 0) {
+      return nextKeyValue(asideBuffer);
+    } else if (inputBuffer.remaining() > 0) {
+      return nextKeyValue(inputBuffer);
+    } else {
+      noMoreData = true;
+      return false;
+    }
+  }
+
+  /**
+   * Reads one record from {@code buffer} into {@link #keyBuffer} and
+   * {@link #valueBuffer}, growing the backing arrays when needed.
+   */
+  private boolean nextKeyValue(InputBuffer buffer) throws IOException {
+    if (closed) {
+      return false;
+    }
+
+    nativeReader.reset(buffer);
+
+    final int keyLength = nativeReader.readInt();
+    if (keyBytes.length < keyLength) {
+      keyBytes = new byte[keyLength];
+    }
+
+    final int valueLength = nativeReader.readInt();
+    if (valueBytes.length < valueLength) {
+      valueBytes = new byte[valueLength];
+    }
+
+    nativeReader.read(keyBytes, 0, keyLength);
+    nativeReader.read(valueBytes, 0, valueLength);
+
+    keyBuffer.reset(keyBytes, keyLength);
+    valueBuffer.reset(valueBytes, valueLength);
+
+    return true;
+  }
+
+  /**
+   * Handles data newly placed in the input buffer. Finishes a record that was
+   * partially copied into the aside buffer, then, if the next record in the
+   * input buffer is cut off, copies its available prefix into the aside buffer.
+   *
+   * @return true, or false once closed
+   * @throws IOException if some bytes remain but fewer than a record header
+   */
+  @Override
+  public boolean receiveData() throws IOException {
+    if (closed) {
+      return false;
+    }
+
+    final ByteBuffer input = inputBuffer.getByteBuffer();
+
+    if (null != asideBuffer && asideBuffer.length() > 0 && asideBuffer.remaining() > 0) {
+      final byte[] output = asideBuffer.getByteBuffer().array();
+      final int write = Math.min(asideBuffer.remaining(), input.remaining());
+      input.get(output, asideBuffer.position(), write);
+      asideBuffer.position(asideBuffer.position() + write);
+
+      if (asideBuffer.remaining() == 0) {
+        // Record complete: rewind it so next() reads it from the start. This
+        // must only happen right after completing it; rewinding a record that
+        // next() has already consumed would return it a second time.
+        asideBuffer.position(0);
+      }
+    }
+
+    if (input.remaining() == 0) {
+      return true;
+    }
+
+    if (input.remaining() < KV_HEADER_LENGTH) {
+      throw new IOException("incomplete data, input length is: " + input.remaining());
+    }
+    // Peek at the next record's header without consuming it.
+    final int position = input.position();
+    final int keyLength = input.getInt();
+    final int valueLength = input.getInt();
+    input.position(position);
+    final int kvLength = keyLength + valueLength + KV_HEADER_LENGTH;
+    final int remaining = input.remaining();
+
+    if (kvLength > remaining) {
+      // NOTE(review): until a later receiveData() completes this record, next()
+      // would read the aside buffer from the partial position; confirm the
+      // native side always delivers the rest before next() runs.
+      if (null == asideBuffer || asideBuffer.capacity() < kvLength) {
+        asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, kvLength);
+      }
+      asideBuffer.rewind(0, kvLength);
+
+      input.get(asideBuffer.array(), 0, remaining);
+      asideBuffer.position(remaining);
+    }
+    return true;
+  }
+
+  /** Progress is not tracked; always returns null. */
+  @Override
+  public Progress getProgress() {
+    return null;
+  }
+
+  /**
+   * Closes the native reader. Afterwards {@link #next()} and
+   * {@link #receiveData()} return false, and repeated calls do nothing.
+   *
+   * @throws IOException if closing the reader fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    if (null != nativeReader) {
+      nativeReader.close();
+    }
+    closed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java
new file mode 100644
index 0000000..8decad8
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.buffer.BufferType;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader;
+import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer;
+import org.apache.hadoop.mapred.nativetask.serde.KVSerializer;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * Collects key/value records pushed from the native side when signaled and
+ * passes them to a {@link RecordWriter}. A record cut off at the end of one
+ * pushed buffer is reassembled in a heap "aside" buffer and written once the
+ * rest arrives in a later {@link #collect(InputBuffer)} call.
+ */
+public class BufferPushee<OK, OV> implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(BufferPushee.class);
+
+  public final static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+  // Holds a single record that was cut off at the end of a pushed buffer.
+  private InputBuffer asideBuffer;
+  private final SizedWritable<OK> tmpOutputKey;
+  private final SizedWritable<OV> tmpOutputValue;
+  private RecordWriter<OK, OV> writer;
+  private ByteBufferDataReader nativeReader;
+
+  private KVSerializer<OK, OV> deserializer;
+  private boolean closed = false;
+
+  /**
+   * @param oKClass output key class; if it or {@code oVClass} is null no
+   *     deserializer is created
+   * @param oVClass output value class
+   * @param writer destination of the collected records; closed by {@link #close()}
+   */
+  public BufferPushee(Class<OK> oKClass, Class<OV> oVClass, RecordWriter<OK, OV> writer) throws IOException {
+    tmpOutputKey = new SizedWritable<OK>(oKClass);
+    tmpOutputValue = new SizedWritable<OV>(oVClass);
+
+    this.writer = writer;
+
+    if (null != oKClass && null != oVClass) {
+      this.deserializer = new KVSerializer<OK, OV>(oKClass, oVClass);
+    }
+    this.nativeReader = new ByteBufferDataReader(null);
+  }
+
+  /**
+   * Consumes one buffer pushed by the native side: completes and writes a
+   * pending aside record, then writes the records in {@code buffer}, or stashes
+   * the available prefix if the first one is cut off.
+   *
+   * @param buffer the pushed data, read from its current position
+   * @return true, or false once closed
+   * @throws IOException if a record header is truncated or writing fails
+   */
+  public boolean collect(InputBuffer buffer) throws IOException {
+    if (closed) {
+      return false;
+    }
+
+    final ByteBuffer input = buffer.getByteBuffer();
+    if (null != asideBuffer && asideBuffer.length() > 0) {
+      if (asideBuffer.remaining() > 0) {
+        final byte[] output = asideBuffer.getByteBuffer().array();
+        final int write = Math.min(asideBuffer.remaining(), input.remaining());
+        input.get(output, asideBuffer.position(), write);
+        asideBuffer.position(asideBuffer.position() + write);
+      }
+
+      if (asideBuffer.remaining() == 0 && asideBuffer.position() > 0) {
+        // The aside record is complete: write it, then mark the aside buffer empty.
+        asideBuffer.position(0);
+        write(asideBuffer);
+        asideBuffer.rewind(0, 0);
+      }
+    }
+
+    if (input.remaining() == 0) {
+      return true;
+    }
+
+    if (input.remaining() < KV_HEADER_LENGTH) {
+      throw new IOException("incomplete data, input length is: " + input.remaining());
+    }
+    // Peek at the next record's header without consuming it.
+    final int position = input.position();
+    final int keyLength = input.getInt();
+    final int valueLength = input.getInt();
+    input.position(position);
+    final int kvLength = keyLength + valueLength + KV_HEADER_LENGTH;
+    final int remaining = input.remaining();
+
+    if (kvLength > remaining) {
+      // The record is cut off: stash the available prefix until the next call.
+      if (null == asideBuffer || asideBuffer.capacity() < kvLength) {
+        asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, kvLength);
+      }
+      asideBuffer.rewind(0, kvLength);
+
+      input.get(asideBuffer.array(), 0, remaining);
+      asideBuffer.position(remaining);
+    } else {
+      write(buffer);
+    }
+    return true;
+  }
+
+  /**
+   * Deserializes every record between the position and limit of {@code input}
+   * and passes each one to the writer.
+   *
+   * @return true, or false once closed
+   * @throws IOException if the bytes consumed do not match the bytes available,
+   *     including when the deserializer reports no progress (a count <= 0)
+   */
+  @SuppressWarnings("unchecked")
+  private boolean write(InputBuffer input) throws IOException {
+    if (closed) {
+      return false;
+    }
+    int totalRead = 0;
+    final int remain = input.remaining();
+    this.nativeReader.reset(input);
+    while (remain > totalRead) {
+      final int read = deserializer.deserializeKV(nativeReader, tmpOutputKey, tmpOutputValue);
+      if (read <= 0) {
+        // No progress: stop instead of spinning forever; the check below reports it.
+        break;
+      }
+      totalRead += read;
+      writer.write((OK) (tmpOutputKey.v), (OV) (tmpOutputValue.v));
+    }
+    if (remain != totalRead) {
+      throw new IOException("We expect to read " + remain + ", but we actually read: " + totalRead);
+    }
+    return true;
+  }
+
+  /**
+   * Closes the record writer and the native reader; later calls do nothing.
+   * The reader is closed even if closing the writer throws.
+   *
+   * @throws IOException if closing either resource fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      if (null != writer) {
+        // No Reporter is available here, so pass null.
+        writer.close(null);
+      }
+    } finally {
+      if (null != nativeReader) {
+        nativeReader.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java
new file mode 100644
index 0000000..3713078
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.nativetask.NativeDataTarget;
+import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter;
+import org.apache.hadoop.mapred.nativetask.serde.IKVSerializer;
+import org.apache.hadoop.mapred.nativetask.serde.KVSerializer;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * Actively serializes key/value pairs into the buffer of a
+ * {@link NativeDataTarget} and signals a {@link BufferPushee} to collect them.
+ * After {@link #close()}, further collect calls are ignored.
+ */
+public class BufferPusher<K, V> implements OutputCollector<K, V> {
+
+  private static final Log LOG = LogFactory.getLog(BufferPusher.class);
+
+  private final SizedWritable<K> tmpInputKey;
+  private final SizedWritable<V> tmpInputValue;
+  private ByteBufferDataWriter out;
+  IKVSerializer serializer;
+  private boolean closed = false;
+
+  /**
+   * @param iKClass key class; if it or {@code iVClass} is null no serializer is
+   *     created, so the collect methods cannot be used
+   * @param iVClass value class
+   * @param target native target whose buffer receives the serialized pairs
+   */
+  public BufferPusher(Class<K> iKClass, Class<V> iVClass, NativeDataTarget target) throws IOException {
+    tmpInputKey = new SizedWritable<K>(iKClass);
+    tmpInputValue = new SizedWritable<V>(iVClass);
+
+    if (null != iKClass && null != iVClass) {
+      this.serializer = new KVSerializer<K, V>(iKClass, iVClass);
+    }
+    this.out = new ByteBufferDataWriter(target);
+  }
+
+  /**
+   * Serializes a pair tagged with its partition. Ignored after {@link #close()},
+   * matching {@link #collect(Object, Object)}.
+   *
+   * @param key the key
+   * @param value the value
+   * @param partition partition number written alongside the pair
+   */
+  public void collect(K key, V value, int partition) throws IOException {
+    if (closed) {
+      return;
+    }
+    tmpInputKey.reset(key);
+    tmpInputValue.reset(value);
+    serializer.serializePartitionKV(out, partition, tmpInputKey, tmpInputValue);
+  }
+
+  /** Serializes a pair; ignored after {@link #close()}. */
+  @Override
+  public void collect(K key, V value) throws IOException {
+    if (closed) {
+      return;
+    }
+    tmpInputKey.reset(key);
+    tmpInputValue.reset(value);
+    serializer.serializeKV(out, tmpInputKey, tmpInputValue);
+  }
+
+  /** Flushes pairs still buffered in the writer to the target, if any. */
+  public void flush() throws IOException {
+    if (null != out && out.hasUnFlushedData()) {
+      out.flush();
+    }
+  }
+
+  /** Closes the underlying writer; once closed, later calls do nothing. */
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    if (null != out) {
+      out.close();
+    }
+    closed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java
new file mode 100644
index 0000000..6a57683
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Task.CombinerRunner;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.CommandDispatcher;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.DataChannel;
+import org.apache.hadoop.mapred.nativetask.ICombineHandler;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor;
+import org.apache.hadoop.mapred.nativetask.TaskContext;
+import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+
+public class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher {
+
+  public static String NAME = "NativeTask.CombineHandler";
+  private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class);
+  public static Command LOAD = new Command(1, "Load");
+  public static Command COMBINE = new Command(4, "Combine");
+  public final CombinerRunner<K, V> combinerRunner;
+
+  private final INativeHandler nativeHandler;
+  private final BufferPuller puller;
+  private final BufferPusher<K, V> kvPusher;
+  private boolean closed = false;
+
+  public static <K, V> ICombineHandler create(TaskContext context) throws IOException, ClassNotFoundException {
+    final JobConf conf = new JobConf(context.getConf());
+    conf.set(Constants.SERIALIZATION_FRAMEWORK,
+        String.valueOf(SerializationFramework.WRITABLE_SERIALIZATION.getType()));
+    String combinerClazz = conf.get(Constants.MAPRED_COMBINER_CLASS);
+    if (null == combinerClazz) {
+      combinerClazz = conf.get(MRJobConfig.COMBINE_CLASS_ATTR);
+    }
+
+    if (null == combinerClazz) {
+      return null;
+    } else {
+      LOG.info("NativeTask Combiner is enabled, class = " + combinerClazz);
+    }
+
+    final Counter combineInputCounter = context.getTaskReporter().getCounter(COMBINE_INPUT_RECORDS);
+    
+    final CombinerRunner<K, V> combinerRunner = CombinerRunner.create(conf, context.getTaskAttemptId(),
+        combineInputCounter, context.getTaskReporter(), null);
+
+    final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, conf, DataChannel.INOUT);
+    final BufferPusher<K, V> pusher = new BufferPusher<K, V>(context.getInputKeyClass(), context.getInputValueClass(),
+        nativeHandler);
+    final BufferPuller puller = new BufferPuller(nativeHandler);
+    return new CombinerHandler<K, V>(nativeHandler, combinerRunner, puller, pusher);
+  }
+
+  public CombinerHandler(INativeHandler nativeHandler, CombinerRunner<K, V> combiner, BufferPuller puller,
+      BufferPusher<K, V> kvPusher) throws IOException {
+    this.nativeHandler = nativeHandler;
+    this.combinerRunner = combiner;
+    this.puller = puller;
+    this.kvPusher = kvPusher;
+    nativeHandler.setCommandDispatcher(this);
+    nativeHandler.setDataReceiver(puller);
+  }
+
+  @Override
+  public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException {
+ if (null == command) {
+      return null;
+    }
+    if (command.equals(COMBINE)) {
+      combine();
+    }
+    return null;
+
+  }
+
+  @Override
+  public void combine() throws IOException{
+    try {
+      puller.reset();
+      combinerRunner.combine(puller, kvPusher);
+      kvPusher.flush();
+      return;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public long getId() {
+    return nativeHandler.getNativeHandler();
+  }
+
+  @Override
+  public void close() throws IOException {
+
+    if (closed) {
+      return;
+    }
+
+    if (null != puller) {
+      puller.close();
+    }
+
+    if (null != kvPusher) {
+      kvPusher.close();
+    }
+
+    if (null != nativeHandler) {
+      nativeHandler.close();
+    }
+    closed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java
new file mode 100644
index 0000000..ff472a6
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.IOException;
+
+/**
+ * A source of data that is pulled in lazily, one {@link #load()} call at a time.
+ */
+public interface IDataLoader {
+
+  /**
+   * Loads the next portion of data.
+   *
+   * @return the size of the data loaded by this call
+   * @throws IOException if loading fails
+   */
+  int load() throws IOException;
+
+  /**
+   * Releases whatever this loader holds.
+   *
+   * @throws IOException if releasing fails
+   */
+  void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java
new file mode 100644
index 0000000..678e13d
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/NativeCollectorOnlyHandler.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.handlers;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.nativetask.Command;
+import org.apache.hadoop.mapred.nativetask.CommandDispatcher;
+import org.apache.hadoop.mapred.nativetask.DataChannel;
+import org.apache.hadoop.mapred.nativetask.ICombineHandler;
+import org.apache.hadoop.mapred.nativetask.INativeHandler;
+import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor;
+import org.apache.hadoop.mapred.nativetask.TaskContext;
+import org.apache.hadoop.mapred.nativetask.util.NativeTaskOutput;
+import org.apache.hadoop.mapred.nativetask.util.OutputUtil;
+import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer;
+
+/**
+ * Java Record Reader + Java Mapper + Native Collector.
+ *
+ * <p>Map output is pushed to the native side through a {@link BufferPusher}. The native side
+ * calls back through {@link #onCall} to obtain output, index and spill file paths, and the id of
+ * the optional combine handler.
+ */
+@SuppressWarnings("unchecked")
+public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Closeable {
+
+  public static final String NAME = "NativeTask.MCollectorOutputHandler";
+  private static final Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class);
+  public static final Command GET_OUTPUT_PATH = new Command(100, "GET_OUTPUT_PATH");
+  public static final Command GET_OUTPUT_INDEX_PATH = new Command(101, "GET_OUTPUT_INDEX_PATH");
+  public static final Command GET_SPILL_PATH = new Command(102, "GET_SPILL_PATH");
+  public static final Command GET_COMBINE_HANDLER = new Command(103, "GET_COMBINE_HANDLER");
+
+  private final NativeTaskOutput output;
+  // Incremented for every GET_SPILL_PATH request so each spill gets a distinct file.
+  private int spillNumber = 0;
+  private final ICombineHandler combinerHandler;
+  private final BufferPusher<K, V> kvPusher;
+  private final INativeHandler nativeHandler;
+  private boolean closed = false;
+
+  /**
+   * Creates a collector handler for the task, plus a combine handler when the job configures
+   * a combiner. The combiner's input classes are set to the map output key/value classes.
+   *
+   * @param context task context
+   * @return the new handler
+   * @throws IOException if a handler cannot be created; a ClassNotFoundException raised while
+   *     creating the combine handler is wrapped in an IOException
+   */
+  public static <K, V> NativeCollectorOnlyHandler<K, V> create(TaskContext context) throws IOException {
+    ICombineHandler combinerHandler = null;
+    try {
+      final TaskContext combineContext = context.copyOf();
+      combineContext.setInputKeyClass(context.getOuputKeyClass());
+      combineContext.setInputValueClass(context.getOutputValueClass());
+
+      combinerHandler = CombinerHandler.create(combineContext);
+    } catch (final ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+
+    if (null != combinerHandler) {
+      LOG.info("[NativeCollectorOnlyHandler] combiner is not null");
+    }
+
+    final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, context.getConf(), DataChannel.OUT);
+    final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>(context.getOuputKeyClass(), context.getOutputValueClass(),
+        nativeHandler);
+
+    return new NativeCollectorOnlyHandler<K, V>(context, nativeHandler, kvPusher, combinerHandler);
+  }
+
+  /**
+   * Creates the task output helper and registers this object as the native handler's command
+   * dispatcher.
+   *
+   * @param combiner optional combine handler; may be {@code null}
+   */
+  protected NativeCollectorOnlyHandler(TaskContext context, INativeHandler nativeHandler,
+      BufferPusher<K, V> kvPusher, ICombineHandler combiner) throws IOException {
+    final Configuration conf = context.getConf();
+    final TaskAttemptID id = context.getTaskAttemptId();
+    // A missing attempt id is passed to the output helper as an empty string.
+    this.output = OutputUtil.createNativeTaskOutput(conf, null == id ? "" : id.toString());
+    this.combinerHandler = combiner;
+    this.kvPusher = kvPusher;
+    this.nativeHandler = nativeHandler;
+    nativeHandler.setCommandDispatcher(this);
+  }
+
+  /** Forwards one record and its partition id to the pusher. */
+  public void collect(K key, V value, int partition) throws IOException {
+    kvPusher.collect(key, value, partition);
+  }
+
+  /**
+   * Currently a no-op: the pusher is not flushed here.
+   * NOTE(review): presumably pending records reach the native side when the pusher is closed
+   * in {@link #close()} — confirm.
+   */
+  public void flush() throws IOException {
+  }
+
+  /**
+   * Closes the pusher, the combine handler and the native handler, in that order. Every close
+   * is attempted even if an earlier one throws; if several throw, the exception from the last
+   * failing close propagates. Subsequent calls are no-ops, even after a failure.
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    // Mark closed first so a partially failed close is not repeated on already-closed parts.
+    closed = true;
+    try {
+      if (null != kvPusher) {
+        kvPusher.close();
+      }
+    } finally {
+      try {
+        if (null != combinerHandler) {
+          combinerHandler.close();
+        }
+      } finally {
+        if (null != nativeHandler) {
+          nativeHandler.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Answers a request from the native side.
+   *
+   * <p>Path requests return the path of a newly allocated output, index or spill file.
+   * GET_COMBINE_HANDLER returns the combine handler's id as a long, or {@code null} when there is
+   * no combiner. A {@code null} command yields {@code null}.
+   *
+   * @throws IOException for an unknown command, or when no file path could be allocated
+   */
+  @Override
+  public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException {
+    if (null == command) {
+      return null;
+    }
+
+    final Path p;
+    if (command.equals(GET_OUTPUT_PATH)) {
+      p = output.getOutputFileForWrite(-1);
+    } else if (command.equals(GET_OUTPUT_INDEX_PATH)) {
+      p = output.getOutputIndexFileForWrite(-1);
+    } else if (command.equals(GET_SPILL_PATH)) {
+      p = output.getSpillFileForWrite(spillNumber++, -1);
+    } else if (command.equals(GET_COMBINE_HANDLER)) {
+      if (null == combinerHandler) {
+        return null;
+      }
+      final ReadWriteBuffer result = new ReadWriteBuffer(8);
+      result.writeLong(combinerHandler.getId());
+      return result;
+    } else {
+      throw new IOException("Illegal command: " + command.toString());
+    }
+
+    if (p == null) {
+      throw new IOException("MapOutputFile can't allocate spill/output file");
+    }
+    final ReadWriteBuffer result = new ReadWriteBuffer();
+    result.writeString(p.toUri().getPath());
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java
new file mode 100644
index 0000000..9a026be
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer that reports a fixed length of one byte, instead of serializing the value to
+ * measure it as {@link DefaultSerializer#getLength} does. Serialization itself is inherited.
+ */
+public class BoolWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Length reported for every value, in bytes. */
+  private static final int SERIALIZED_LENGTH = 1;
+
+  @Override
+  public int getLength(Writable w) throws IOException {
+    return SERIALIZED_LENGTH;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java
new file mode 100644
index 0000000..1ec2fdb
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer that reports a fixed length of one byte, instead of serializing the value to
+ * measure it as {@link DefaultSerializer#getLength} does. Serialization itself is inherited.
+ */
+public class ByteWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Length reported for every value, in bytes. */
+  private static final int SERIALIZED_LENGTH = 1;
+
+  @Override
+  public int getLength(Writable w) throws IOException {
+    return SERIALIZED_LENGTH;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java
new file mode 100644
index 0000000..2bd18d7
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer for {@link BytesWritable} that writes only the valid payload bytes, with no length
+ * prefix; the length is handled separately (it is passed in to {@link #deserialize}).
+ */
+public class BytesWritableSerializer implements INativeComparable, INativeSerializer<BytesWritable> {
+
+  /** Returns the number of valid bytes in {@code w}. */
+  @Override
+  public int getLength(BytesWritable w) throws IOException {
+    final int size = w.getLength();
+    return size;
+  }
+
+  /** Writes the first {@code w.getLength()} bytes of the backing array. */
+  @Override
+  public void serialize(BytesWritable w, DataOutput out) throws IOException {
+    final byte[] payload = w.getBytes();
+    final int size = w.getLength();
+    out.write(payload, 0, size);
+  }
+
+  /** Resizes {@code w} to {@code length} and fills it with exactly that many bytes from {@code in}. */
+  @Override
+  public void deserialize(DataInput in, int length, BytesWritable w) throws IOException {
+    // setSize can grow (reallocate) the backing array, so fetch the array only after resizing.
+    w.setSize(length);
+    final byte[] target = w.getBytes();
+    in.readFully(target, 0, length);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java
new file mode 100644
index 0000000..d4fc7e0
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Fallback {@link INativeSerializer} built on {@link Writable#write} and {@link Writable#readFields}.
+ * Because the encoded size of an arbitrary writable is not known up front, {@link #getLength}
+ * serializes the value into a reusable scratch buffer to measure it.
+ */
+public class DefaultSerializer implements INativeSerializer<Writable> {
+
+  /** A {@link ByteArrayOutputStream} that exposes its internal array without copying it. */
+  static class ModifiedByteArrayOutputStream extends ByteArrayOutputStream {
+
+    public byte[] getBuffer() {
+      return this.buf;
+    }
+  }
+
+  // Scratch buffer reused by getLength(); it is only used to measure sizes.
+  private final ModifiedByteArrayOutputStream outBuffer = new ModifiedByteArrayOutputStream();
+  private final DataOutputStream outData = new DataOutputStream(outBuffer);
+
+  /**
+   * Returns the number of bytes {@code w.write} produces. Every call re-serializes {@code w} into
+   * the scratch buffer; nothing is cached between calls.
+   */
+  @Override
+  public int getLength(Writable w) throws IOException {
+    outBuffer.reset();
+    w.write(outData);
+    return outBuffer.size();
+  }
+
+  /** Writes {@code w} with its own {@code write} method. */
+  @Override
+  public void serialize(Writable w, DataOutput out) throws IOException {
+    w.write(out);
+  }
+
+  /**
+   * Reads {@code w} with its own {@code readFields} method; {@code length} is not used, since
+   * {@code readFields} determines how many bytes it consumes.
+   */
+  @Override
+  public void deserialize(DataInput in, int length, Writable w) throws IOException {
+    w.readFields(in);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java
new file mode 100644
index 0000000..8de0fba
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer that reports a fixed length of eight bytes, instead of serializing the value to
+ * measure it as {@link DefaultSerializer#getLength} does. Serialization itself is inherited.
+ */
+public class DoubleWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Length reported for every value, in bytes. */
+  private static final int SERIALIZED_LENGTH = 8;
+
+  @Override
+  public int getLength(Writable w) throws IOException {
+    return SERIALIZED_LENGTH;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java
new file mode 100644
index 0000000..4a2366c
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer that reports a fixed length of four bytes, instead of serializing the value to
+ * measure it as {@link DefaultSerializer#getLength} does. Serialization itself is inherited.
+ */
+public class FloatWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Length reported for every value, in bytes. */
+  private static final int SERIALIZED_LENGTH = 4;
+
+  @Override
+  public int getLength(Writable w) throws IOException {
+    return SERIALIZED_LENGTH;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java
new file mode 100644
index 0000000..64c5810
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream;
+import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * Serializes and deserializes key/value pairs held in {@link SizedWritable} wrappers.
+ */
+public interface IKVSerializer {
+
+  /**
+   * Refreshes the length field of both wrappers.
+   *
+   * @param key   key wrapper to update
+   * @param value value wrapper to update
+   * @throws IOException if a length cannot be computed
+   */
+  void updateLength(SizedWritable key, SizedWritable value) throws IOException;
+
+  /**
+   * Writes one key/value pair.
+   *
+   * @param out   destination stream
+   * @param key   key to write
+   * @param value value to write
+   * @return the number of bytes written
+   * @throws IOException if writing fails
+   */
+  int serializeKV(DataOutputStream out, SizedWritable key, SizedWritable value) throws IOException;
+
+  /**
+   * Writes one key/value pair together with its partition id.
+   *
+   * @param out         destination stream
+   * @param partitionId partition the pair belongs to
+   * @param key         key to write
+   * @param value       value to write
+   * @return presumably the number of bytes written, as for {@link #serializeKV} — confirm
+   * @throws IOException if writing fails
+   */
+  int serializePartitionKV(DataOutputStream out, int partitionId, SizedWritable key, SizedWritable value)
+      throws IOException;
+
+  /**
+   * Reads one key/value pair into the given wrappers.
+   *
+   * @param in    source stream
+   * @param key   receives the key
+   * @param value receives the value
+   * @return the number of bytes read
+   * @throws IOException if reading fails
+   */
+  int deserializeKV(DataInputStream in, SizedWritable key, SizedWritable value) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java
new file mode 100644
index 0000000..f61d12d
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * An INativeSerializer serializes and deserializes data transferred between
+ * Java and native code. {@link DefaultSerializer} provides the default implementation.
+ *
+ * Note: if you implement your own INativeSerializer instead of using DefaultSerializer,
+ * you must make sure the native side handles the resulting byte format correctly.
+ *
+ * @param <T> type of the objects this serializer handles
+ */
+public interface INativeSerializer<T> {
+
+  /**
+   * Returns the number of bytes {@link #serialize} writes for {@code w}. If the length
+   * is known without inspecting the data (for example a fixed-width type such as
+   * IntWritable), a custom serializer that returns it directly is more efficient.
+   *
+   * @param w object to measure
+   * @return serialized length of {@code w} in bytes
+   * @throws IOException if the length cannot be determined
+   */
+  int getLength(T w) throws IOException;
+
+  /**
+   * Writes the serialized form of {@code w} to {@code out}.
+   *
+   * @param w   object to serialize
+   * @param out destination
+   * @throws IOException if writing fails
+   */
+  void serialize(T w, DataOutput out) throws IOException;
+
+  /**
+   * Reads {@code length} bytes of serialized data from {@code in} into {@code w}.
+   *
+   * @param in     source
+   * @param length number of serialized bytes to consume
+   * @param w      object that receives the data
+   * @throws IOException if reading fails
+   */
+  void deserialize(DataInput in, int length, T w) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IntWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IntWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IntWritableSerializer.java
new file mode 100644
index 0000000..e7e19a9
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IntWritableSerializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer intended for IntWritable. It reports a fixed serialized length of
+ * 4 bytes without inspecting the value. Serialization itself is inherited from
+ * {@link DefaultSerializer}.
+ */
+public class IntWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Fixed serialized width reported by {@link #getLength}, in bytes. */
+  private static final int INT_LENGTH = 4;
+
+  @Override
+  public int getLength(Writable ignored) throws IOException {
+    return INT_LENGTH;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java
new file mode 100644
index 0000000..4b76df4
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.Constants;
+import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream;
+import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream;
+import org.apache.hadoop.mapred.nativetask.util.SizedWritable;
+
+/**
+ * Default {@link IKVSerializer}. Each record is written as an optional partition id,
+ * then the key length and value length (ints), then the key and value payloads.
+ * The payloads are produced by the {@link INativeSerializer}s that
+ * {@link NativeSerialization} returns for the key and value classes.
+ *
+ * @param <K> key class
+ * @param <V> value class
+ */
+public class KVSerializer<K, V> implements IKVSerializer {
+
+  /** Bytes counted for the key-length/value-length header written before each key and value. */
+  public static final int KV_HEAD_LENGTH = Constants.SIZEOF_KV_LENGTH;
+
+  /** Partition id meaning "write no partition id"; used by {@link #serializeKV}. */
+  private static final int NO_PARTITION = -1;
+
+  private final INativeSerializer<Writable> keySerializer;
+  private final INativeSerializer<Writable> valueSerializer;
+
+  /**
+   * Looks up the serializers for the key and value classes.
+   * NOTE(review): NativeSerialization.getSerializer returns null for a null class, so a
+   * null argument here only surfaces later as a NullPointerException.
+   *
+   * @param kclass key class
+   * @param vclass value class
+   * @throws IOException if a class is not a Writable or its registered serializer
+   *     cannot be instantiated
+   */
+  public KVSerializer(Class<K> kclass, Class<V> vclass) throws IOException {
+    this.keySerializer = NativeSerialization.getInstance().getSerializer(kclass);
+    this.valueSerializer = NativeSerialization.getInstance().getSerializer(vclass);
+  }
+
+  @Override
+  public void updateLength(SizedWritable key, SizedWritable value) throws IOException {
+    key.length = keySerializer.getLength(key.v);
+    value.length = valueSerializer.getLength(value.v);
+  }
+
+  @Override
+  public int serializeKV(DataOutputStream out, SizedWritable key, SizedWritable value) throws IOException {
+    return serializePartitionKV(out, NO_PARTITION, key, value);
+  }
+
+  /**
+   * Writes {@code [partitionId] keyLength valueLength keyBytes valueBytes} to {@code out}.
+   * The partition id is omitted when {@code partitionId} is -1. Both lengths are
+   * recomputed first if either is {@link SizedWritable#INVALID_LENGTH}.
+   *
+   * @return number of bytes in the record: header, key, value and (if written) partition id
+   */
+  @Override
+  public int serializePartitionKV(DataOutputStream out, int partitionId, SizedWritable key, SizedWritable value)
+      throws IOException {
+
+    if (key.length == SizedWritable.INVALID_LENGTH || value.length == SizedWritable.INVALID_LENGTH) {
+      updateLength(key, value);
+    }
+
+    final int keyLength = key.length;
+    final int valueLength = value.length;
+    final boolean withPartition = partitionId != NO_PARTITION;
+
+    int bytesWritten = KV_HEAD_LENGTH + keyLength + valueLength;
+    if (withPartition) {
+      bytesWritten += Constants.SIZEOF_PARTITION_LENGTH;
+    }
+
+    // Flush pending data first if the whole record does not fit in the remaining space.
+    if (out.hasUnFlushedData() && out.shortOfSpace(bytesWritten)) {
+      out.flush();
+    }
+
+    if (withPartition) {
+      out.writeInt(partitionId);
+    }
+
+    out.writeInt(keyLength);
+    out.writeInt(valueLength);
+
+    keySerializer.serialize(key.v, out);
+    valueSerializer.serialize(value.v, out);
+
+    return bytesWritten;
+  }
+
+  /**
+   * Reads a record laid out as {@code keyLength valueLength keyBytes valueBytes}
+   * (no partition id) into {@code key} and {@code value}.
+   *
+   * @return number of bytes consumed, or 0 if {@code in} has no unread data
+   */
+  @Override
+  public int deserializeKV(DataInputStream in, SizedWritable key, SizedWritable value) throws IOException {
+
+    if (!in.hasUnReadData()) {
+      return 0;
+    }
+
+    key.length = in.readInt();
+    value.length = in.readInt();
+
+    keySerializer.deserialize(in, key.length, key.v);
+    valueSerializer.deserialize(in, value.length, value.v);
+
+    return key.length + value.length + KV_HEAD_LENGTH;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/LongWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/LongWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/LongWritableSerializer.java
new file mode 100644
index 0000000..ec326ca
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/LongWritableSerializer.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer intended for LongWritable. It reports a fixed serialized length of
+ * 8 bytes without inspecting the value. Serialization itself is inherited from
+ * {@link DefaultSerializer}.
+ */
+public class LongWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Fixed serialized width reported by {@link #getLength}, in bytes. */
+  private static final int LONG_LENGTH = 8;
+
+  @Override
+  public int getLength(Writable ignored) throws IOException {
+    return LONG_LENGTH;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NativeSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NativeSerialization.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NativeSerialization.java
new file mode 100644
index 0000000..f5a033d
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NativeSerialization.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Registry of custom {@link INativeSerializer} classes, keyed by Writable class name.
+ * Classes with no registered serializer are handled by a {@link DefaultSerializer}.
+ * A shared instance is available through {@link #getInstance()}.
+ */
+public class NativeSerialization {
+
+  /** Writable class name -> registered INativeSerializer implementation class. */
+  private final ConcurrentHashMap<String, Class<?>> map = new ConcurrentHashMap<String, Class<?>>();
+
+  /**
+   * @param c class to test
+   * @return true if {@code c} is a {@link Writable}
+   */
+  public boolean accept(Class<?> c) {
+    return Writable.class.isAssignableFrom(c);
+  }
+
+  /**
+   * Returns a new serializer instance for {@code c}. This is an instance of the serializer
+   * class registered under {@code c.getName()} if there is one, otherwise a new
+   * {@link DefaultSerializer}.
+   *
+   * @param c Writable class to serialize; may be null
+   * @return a fresh serializer, or null if {@code c} is null
+   * @throws IOException if {@code c} is not a Writable, or the registered serializer
+   *     cannot be instantiated
+   */
+  @SuppressWarnings("unchecked")
+  public INativeSerializer<Writable> getSerializer(Class<?> c) throws IOException {
+
+    if (null == c) {
+      return null;
+    }
+    if (!Writable.class.isAssignableFrom(c)) {
+      throw new IOException("Cannot serialize type " + c.getName() + ", we only accept subclass of Writable");
+    }
+    final String name = c.getName();
+    final Class<?> serializer = map.get(name);
+
+    if (null != serializer) {
+      try {
+        // register() only accepts INativeSerializer implementations, so this cast is safe.
+        return (INativeSerializer<Writable>) serializer.newInstance();
+      } catch (final Exception e) {
+        throw new IOException(e);
+      }
+    }
+    return new DefaultSerializer();
+  }
+
+  /**
+   * Registers {@code serializer} for the class named {@code klass}. Registering the
+   * same serializer class again for the same name is a no-op.
+   *
+   * @param klass      fully qualified name of the Writable class
+   * @param serializer INativeSerializer implementation class
+   * @throws IOException if an argument is null, {@code serializer} does not implement
+   *     INativeSerializer, or a different serializer is already registered for {@code klass}
+   */
+  public void register(String klass, Class<?> serializer) throws IOException {
+    if (null == klass || null == serializer) {
+      throw new IOException("invalid arguments, klass or serializer is null");
+    }
+
+    if (!INativeSerializer.class.isAssignableFrom(serializer)) {
+      throw new IOException("Serializer is not assigable from INativeSerializer");
+    }
+
+    // putIfAbsent makes check-and-insert atomic, so two concurrent registrations of
+    // different serializers for the same class cannot both succeed.
+    final Class<?> storedSerializer = map.putIfAbsent(klass, serializer);
+    if (null != storedSerializer && !storedSerializer.getName().equals(serializer.getName())) {
+      throw new IOException("Error! Serializer already registered, exist: " + storedSerializer.getName() + ", new: "
+          + serializer.getName());
+    }
+  }
+
+  /** Removes all registered serializers. */
+  public void reset() {
+    map.clear();
+  }
+
+  private static final NativeSerialization instance = new NativeSerialization();
+
+  /** @return the shared registry instance */
+  public static NativeSerialization getInstance() {
+    return instance;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NullWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NullWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NullWritableSerializer.java
new file mode 100644
index 0000000..afa4e8e
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NullWritableSerializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer intended for NullWritable. It always reports a serialized length of 0 bytes.
+ * The remaining behavior is inherited from {@link DefaultSerializer}.
+ */
+public class NullWritableSerializer extends DefaultSerializer implements INativeComparable {
+
+  /** Serialized length reported for every value, in bytes. */
+  private static final int EMPTY_LENGTH = 0;
+
+  @Override
+  public int getLength(Writable ignored) throws IOException {
+    return EMPTY_LENGTH;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/SerializationFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/SerializationFramework.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/SerializationFramework.java
new file mode 100644
index 0000000..e95a0c4
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/SerializationFramework.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.nativetask.serde;
+
+/**
+ * Serialization frameworks, each paired with the integer code returned by {@link #getType()}.
+ */
+public enum SerializationFramework {
+  WRITABLE_SERIALIZATION(0),
+  NATIVE_SERIALIZATION(1);
+
+  /** Integer code of this framework; immutable once the constant is created. */
+  private final int type;
+
+  SerializationFramework(int type) {
+    this.type = type;
+  }
+
+  /** @return the integer code associated with this framework */
+  public int getType() {
+    return type;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/TextSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/TextSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/TextSerializer.java
new file mode 100644
index 0000000..63a64de
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/TextSerializer.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * {@link INativeSerializer} for {@link Text}. The serialized form is the Text's valid
+ * bytes only; this class writes no length prefix of its own.
+ */
+public class TextSerializer implements INativeSerializer<Text>, INativeComparable {
+
+  public TextSerializer() throws SecurityException, NoSuchMethodException {
+    // No state to initialize; the declared exceptions are never actually thrown here.
+  }
+
+  /** @return the number of valid bytes in {@code w} */
+  @Override
+  public int getLength(Text w) throws IOException {
+    return w.getLength();
+  }
+
+  /** Writes the first {@code w.getLength()} bytes of {@code w}'s backing array. */
+  @Override
+  public void serialize(Text w, DataOutput out) throws IOException {
+    final int validBytes = w.getLength();
+    out.write(w.getBytes(), 0, validBytes);
+  }
+
+  /**
+   * Resizes {@code w} to {@code length} bytes and fills it from {@code in}.
+   * Any exception raised while resizing is wrapped in an IOException.
+   */
+  @Override
+  public void deserialize(DataInput in, int length, Text w) throws IOException {
+    try {
+      w.setCapacity(length, true);
+      w.setLength(length);
+    } catch (final Exception cause) {
+      throw new IOException(cause);
+    }
+    // Fetch the backing array only after resizing, in case resizing reallocated it.
+    in.readFully(w.getBytes(), 0, length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VIntWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VIntWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VIntWritableSerializer.java
new file mode 100644
index 0000000..0e142f3
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VIntWritableSerializer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer intended for VIntWritable. All serialization behavior, including
+ * {@code getLength}, is inherited unchanged from {@link DefaultSerializer}; the class
+ * adds only the {@link INativeComparable} interface.
+ */
+public class VIntWritableSerializer extends DefaultSerializer implements INativeComparable {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VLongWritableSerializer.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VLongWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VLongWritableSerializer.java
new file mode 100644
index 0000000..d66e179
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VLongWritableSerializer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.nativetask.serde;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.nativetask.INativeComparable;
+
+/**
+ * Serializer intended for VLongWritable. All serialization behavior, including
+ * {@code getLength}, is inherited unchanged from {@link DefaultSerializer}; the class
+ * adds only the {@link INativeComparable} interface.
+ */
+public class VLongWritableSerializer extends DefaultSerializer implements INativeComparable {
+}
\ No newline at end of file


Mime
View raw message