tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject git commit: TEZ-1288. Create FastTezSerialization as an optional feature (rajesh)
Date Mon, 28 Jul 2014 22:08:56 GMT
Repository: tez
Updated Branches:
  refs/heads/master c0d59139c -> ff7081e06


TEZ-1288. Create FastTezSerialization as an optional feature (rajesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ff7081e0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ff7081e0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ff7081e0

Branch: refs/heads/master
Commit: ff7081e06db40bfb7e6dfc72a1c9ad6ca93be79f
Parents: c0d5913
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Tue Jul 29 03:37:12 2014 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Tue Jul 29 03:37:12 2014 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/io/HashComparator.java    |  24 -
 .../runtime/library/common/ValuesIterator.java  |  10 +-
 .../common/comparator/HashComparator.java       |  24 +
 .../common/comparator/TezBytesComparator.java   |  38 ++
 .../TezBytesWritableSerialization.java          | 129 ++++++
 .../common/sort/impl/ExternalSorter.java        |   4 +
 .../runtime/library/common/sort/impl/IFile.java |   6 +-
 .../common/sort/impl/PipelinedSorter.java       |   2 +-
 .../conf/OnFileSortedOutputConfiguration.java   |  36 ++
 .../OnFileUnorderedKVOutputConfiguration.java   |  29 ++
 ...orderedPartitionedKVOutputConfiguration.java |  29 ++
 .../OrderedPartitionedKVEdgeConfigurer.java     |  30 ++
 .../conf/ShuffledMergedInputConfiguration.java  |  36 ++
 .../ShuffledUnorderedKVInputConfiguration.java  |  30 ++
 .../UnorderedPartitionedKVEdgeConfigurer.java   |  24 +
 .../UnorderedUnpartitionedKVEdgeConfigurer.java |  26 ++
 .../library/common/TestValuesIterator.java      | 460 +++++++++++++++++++
 .../TestOnFileUnorderedPartitionedKVOutput.java |   9 +-
 .../TestOrderedPartitionedKVEdgeConfigurer.java |  53 +++
 ...estUnorderedPartitionedKVEdgeConfigurer.java |   7 +
 ...tUnorderedUnpartitionedKVEdgeConfigurer.java |   6 +
 21 files changed, 982 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java
deleted file mode 100644
index a372e01..0000000
--- a/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.io;
-
-public interface HashComparator<KEY> {
-
-  int getHashCode(KEY key);
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
index ab1cf7f..c0068eb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
@@ -41,6 +43,7 @@ import com.google.common.base.Preconditions;
  */
 
 public class ValuesIterator<KEY,VALUE> {
+  private static final Log LOG = LogFactory.getLog(ValuesIterator.class.getName());
   protected TezRawKeyValueIterator in; //input iterator
   private KEY key;               // current key
   private KEY nextKey;
@@ -75,6 +78,7 @@ public class ValuesIterator<KEY,VALUE> {
     this.keyDeserializer.open(keyIn);
     this.valDeserializer = serializationFactory.getDeserializer(valClass);
     this.valDeserializer.open(this.valueIn);
+    LOG.info("keyDeserializer=" + keyDeserializer + "; valueDeserializer=" + valDeserializer);
   }
 
   TezRawKeyValueIterator getRawIterator() { return in; }
@@ -175,7 +179,8 @@ public class ValuesIterator<KEY,VALUE> {
     more = in.next();
     if (more) {      
       DataInputBuffer nextKeyBytes = in.getKey();
-      keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+     keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
+         nextKeyBytes.getLength() - nextKeyBytes.getPosition()); // length, not end offset
       nextKey = keyDeserializer.deserialize(nextKey);
       // TODO Is a counter increment required here ?
       hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
@@ -190,7 +195,8 @@ public class ValuesIterator<KEY,VALUE> {
    */
   private void readNextValue() throws IOException {
     DataInputBuffer nextValueBytes = in.getValue();
-    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(),
+        nextValueBytes.getLength() - nextValueBytes.getPosition()); // length, not end offset
     value = valDeserializer.deserialize(value);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/HashComparator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/HashComparator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/HashComparator.java
new file mode 100644
index 0000000..4af03ba
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/HashComparator.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.comparator;
+
+public interface HashComparator<KEY> {
+  /** Returns an {@code int} hash code for {@code key}. */
+  int getHashCode(KEY key);
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/TezBytesComparator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/TezBytesComparator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/TezBytesComparator.java
new file mode 100644
index 0000000..149188e
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/TezBytesComparator.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.comparator;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparator;
+
+public class TezBytesComparator extends WritableComparator {
+  /** Creates a comparator whose key class is {@link BytesWritable}. */
+  public TezBytesComparator() {
+    super(BytesWritable.class);
+  }
+
+  /**
+   * Compares the full serialized byte ranges via compareBytes, without deserializing the keys.
+   */
+  @Override
+  public int compare(byte[] b1, int s1, int l1,
+      byte[] b2, int s2, int l2) {
+    return compareBytes(b1, s1, l1, b2, s2, l2);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/TezBytesWritableSerialization.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/TezBytesWritableSerialization.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/TezBytesWritableSerialization.java
new file mode 100644
index 0000000..e195178
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/TezBytesWritableSerialization.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.serializer;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * <pre>
+ * When using BytesWritable, data is serialized in memory (4 bytes per key and 4 bytes per value)
+ * and written to IFile where it gets serialized again (4 bytes per key and 4 bytes per value).
+ * This adds an overhead of 8 bytes per key value pair written. This class removes that overhead
+ * by writing only the raw bytes (no length prefix), which also speeds up the inner loops of
+ * sort, spill and merge. Keys must then be sorted with a raw-bytes comparator.
+ *
+ * Usage e.g:
+ *  OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
+ *         .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
+ *         .setFromConfiguration(conf)
+ *         .setKeySerializationClass(TezBytesWritableSerialization.class.getName(),
+ *            TezBytesComparator.class.getName()).build();
+ * </pre>
+ */
+public class TezBytesWritableSerialization extends Configured implements Serialization<Writable> {
+
+  private static final Log LOG = LogFactory.getLog(TezBytesWritableSerialization.class.getName());
+
+  @Override
+  public boolean accept(Class<?> c) {
+    return BytesWritable.class.isAssignableFrom(c);
+  }
+
+  @Override
+  public Serializer<Writable> getSerializer(Class<Writable> c) {
+    return new TezBytesWritableSerializer();
+  }
+
+  @Override
+  public Deserializer<Writable> getDeserializer(Class<Writable> c) {
+    return new TezBytesWritableDeserializer(getConf(), c);
+  }
+
+  public static class TezBytesWritableDeserializer extends Configured
+      implements Deserializer<Writable> {
+    private Class<?> writableClass;
+    private DataInputBuffer dataIn;
+
+    public TezBytesWritableDeserializer(Configuration conf, Class<?> c) {
+      setConf(conf);
+      this.writableClass = c;
+    }
+
+    @Override
+    public void open(InputStream in) {
+      Preconditions.checkArgument(in instanceof DataInputBuffer, "Not a DataInputBuffer: %s", in);
+      dataIn = (DataInputBuffer) in;
+    }
+
+    @Override
+    public Writable deserialize(Writable w) throws IOException {
+      BytesWritable writable = (BytesWritable) w;
+      if (w == null) {
+        writable = (BytesWritable) ReflectionUtils.newInstance(writableClass, getConf());
+      }
+      // Consume everything from the current position up to the buffer's end offset.
+      writable.set(dataIn.getData(), dataIn.getPosition(),
+          dataIn.getLength() - dataIn.getPosition());
+      return writable;
+    }
+
+    @Override
+    public void close() throws IOException {
+      dataIn.close();
+    }
+  }
+
+  public static class TezBytesWritableSerializer extends Configured implements
+      Serializer<Writable> {
+    // Writes only the payload bytes with no length prefix; IFile records the lengths itself.
+    private OutputStream dataOut;
+
+    @Override
+    public void open(OutputStream out) {
+      this.dataOut = out;
+    }
+
+    @Override
+    public void serialize(Writable w) throws IOException {
+      BytesWritable writable = (BytesWritable) w;
+      dataOut.write(writable.getBytes(), 0, writable.getLength());
+    }
+
+    @Override
+    public void close() throws IOException {
+      dataOut.close();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index cd0bc26..0f4dbe2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -162,6 +163,9 @@ public abstract class ExternalSorter {
     serializationFactory = new SerializationFactory(this.conf);
     keySerializer = serializationFactory.getSerializer(keyClass);
     valSerializer = serializationFactory.getSerializer(valClass);
+    LOG.info("keySerializer=" + keySerializer + "; valueSerializer=" + valSerializer
+        + "; comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf)
+        + "; conf=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
 
     //    counters    
     mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 3b6fea5..cccadd9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -238,8 +238,10 @@ public class IFile {
       if (writtenRecordsCounter != null) {
         writtenRecordsCounter.increment(numRecordsWritten);
       }
-      LOG.info("Total keys written=" + numRecordsWritten + "; Savings(optimized due to " +
-          "multi-kv/rle)=" + totalKeySaving + "; number of RLEs written=" + rleWritten);
+      LOG.info("Total keys written=" + numRecordsWritten + "; rleEnabled=" + rle + "; Savings" +
+          "(due to multi-kv/rle)=" + totalKeySaving + "; number of RLEs written=" +
+          rleWritten + "; compressedLen=" + compressedBytesWritten + "; rawLen="
+          + decompressedBytesWritten);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index de43acf..6960736 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.HashComparator;
+import org.apache.tez.runtime.library.common.comparator.HashComparator;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
index ec90e6f..97712a3 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -308,6 +309,41 @@ public class OnFileSortedOutputConfiguration {
     }
 
     /**
+     * Set the key serialization class and the comparator used to sort keys written with it.
+     * A custom serialization can change how keys must be compared during sorting, so an
+     * incompatible comparator here may produce invalid results.
+     *
+     * @param serializationClassName non-null class name, prepended to IO_SERIALIZATIONS_KEY
+     * @param comparatorClassName non-null key comparator class name, see setKeyComparatorClass
+     * @return this builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+        String comparatorClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      Preconditions.checkArgument(comparatorClassName != null,
+          "comparator cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      setKeyComparatorClass(comparatorClassName);
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName non-null class name, prepended to IO_SERIALIZATIONS_KEY
+     * @return this builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      return this;
+    }
+
+    /**
      * Create the actual configuration instance.
      *
      * @return an instance of the Configuration

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
index 9f10138..17bb3b8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -200,6 +201,34 @@ public class OnFileUnorderedKVOutputConfiguration {
       return this;
     }
 
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for keys.
+     *
+     * @param serializationClassName non-null class name, prepended to IO_SERIALIZATIONS_KEY
+     * @return this builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName non-null class name, prepended to IO_SERIALIZATIONS_KEY
+     * @return this builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      return this;
+    }
+
     public Builder enableCompression(String compressionCodec) {
       this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true);
       if (compressionCodec != null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
index 01b7405..2967501 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -249,6 +250,34 @@ public class OnFileUnorderedPartitionedKVOutputConfiguration {
     }
 
     /**
+     * Set serialization class responsible for providing serializer/deserializer for keys.
+     *
+     * @param serializationClassName non-null class name, prepended to IO_SERIALIZATIONS_KEY
+     * @return this builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      return this;
+    }
+
+    /**
+     * Set serialization class responsible for providing serializer/deserializer for values.
+     *
+     * @param serializationClassName non-null class name, prepended to IO_SERIALIZATIONS_KEY
+     * @return this builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      return this;
+    }
+
+    /**
      * Create the actual configuration instance.
      *
      * @return an instance of the Configuration

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
index 78ef7bf..4da5060 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -155,6 +156,35 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
       return this;
     }
 
+    /**
+     * Set the key serialization class and the comparator used to sort keys written with it,
+     * on both the output and the input side of the edge. A custom serialization can change
+     * how keys must be compared, so an incompatible comparator may produce invalid results.
+     *
+     * @param serializationClassName key serialization class name, applied to both builders
+     * @param comparatorClassName key comparator class name, applied to both builders
+     * @return this builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+        String comparatorClassName) {
+      outputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName);
+      inputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName);
+      return this;
+    }
+
+    /**
+     * Set the value serialization class on both the output and the input side of the edge.
+     *
+     * @param serializationClassName value serialization class name, applied to both builders
+     * @return this builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      outputBuilder.setValueSerializationClass(serializationClassName);
+      inputBuilder.setValueSerializationClass(serializationClassName);
+      return this;
+    }
+
+
     @Override
     public Builder enableCompression(String compressionCodec) {
       outputBuilder.enableCompression(compressionCodec);

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
index 1a5a6ed..d6abf03 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -409,6 +410,53 @@ public class ShuffledMergedInputConfiguration {
     }
 
     /**
+     * Set serialization class and the relevant comparator to be used for sorting.
+     * A custom serialization class can change the byte layout of serialized keys, so the
+     * comparator must understand that layout; a mismatched comparator gives invalid results.
+     * The class name is prepended to the {@code IO_SERIALIZATIONS_KEY} list.
+     *
+     * @param serializationClassName name of the serialization class for keys; must not be null
+     * @param comparatorClassName    name of the key comparator class; must not be null
+     * @return this builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName,
+        String comparatorClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      Preconditions.checkArgument(comparatorClassName != null,
+          "comparator cannot be null");
+      prependSerialization(serializationClassName);
+      setKeyComparatorClass(comparatorClassName);
+      return this;
+    }
+
+    /**
+     * Set the serialization class to be used for serializing values. The class name is
+     * prepended to the {@code IO_SERIALIZATIONS_KEY} list.
+     *
+     * @param serializationClassName name of the serialization class for values; must not be null
+     * @return this builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      prependSerialization(serializationClassName);
+      return this;
+    }
+
+    /**
+     * Prepend the given serialization class to the {@code IO_SERIALIZATIONS_KEY} list. If the
+     * key is unset, fall back to WritableSerialization instead of concatenating the literal
+     * string "null", which does not name any class.
+     */
+    private void prependSerialization(String serializationClassName) {
+      String existing = conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+          org.apache.hadoop.io.serializer.WritableSerialization.class.getName());
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+          serializationClassName + "," + existing);
+    }
+
+    /**
      * Create the actual configuration instance.
      *
      * @return an instance of the Configuration

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
index 8452b36..7ff8a58 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -302,6 +303,47 @@ public class ShuffledUnorderedKVInputConfiguration {
     }
 
     /**
+     * Set the serialization class responsible for providing serializer/deserializer for keys.
+     * The class name is prepended to the {@code IO_SERIALIZATIONS_KEY} list. This method does
+     * not configure a key comparator.
+     *
+     * @param serializationClassName name of the serialization class for keys; must not be null
+     * @return this builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      prependSerialization(serializationClassName);
+      return this;
+    }
+
+    /**
+     * Set the serialization class responsible for providing serializer/deserializer for values.
+     * The class name is prepended to the {@code IO_SERIALIZATIONS_KEY} list.
+     *
+     * @param serializationClassName name of the serialization class for values; must not be null
+     * @return this builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      Preconditions.checkArgument(serializationClassName != null,
+          "serializationClassName cannot be null");
+      prependSerialization(serializationClassName);
+      return this;
+    }
+
+    /**
+     * Prepend the given serialization class to the {@code IO_SERIALIZATIONS_KEY} list. If the
+     * key is unset, fall back to WritableSerialization instead of concatenating the literal
+     * string "null", which does not name any class.
+     */
+    private void prependSerialization(String serializationClassName) {
+      String existing = conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+          org.apache.hadoop.io.serializer.WritableSerialization.class.getName());
+      this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY,
+          serializationClassName + "," + existing);
+    }
+
+    /**
      * Create the actual configuration instance.
      *
      * @return an instance of the Configuration

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
index 5cd7f49..3f985f6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
@@ -178,6 +178,30 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
     }
 
     /**
+     * Set the key serialization class on both the output and input builders of this edge.
+     *
+     * @param serializationClassName name of the serialization class for keys
+     * @return this builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName) {
+      outputBuilder.setKeySerializationClass(serializationClassName);
+      inputBuilder.setKeySerializationClass(serializationClassName);
+      return this;
+    }
+
+    /**
+     * Set the value serialization class on both the output and input builders of this edge.
+     *
+     * @param serializationClassName name of the serialization class for values
+     * @return this builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      outputBuilder.setValueSerializationClass(serializationClassName);
+      inputBuilder.setValueSerializationClass(serializationClassName);
+      return this;
+    }
+
+    /**
      * Configure the specific output
      *
      * @return a builder to configure the output

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
index 5c93df3..8adfad9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
@@ -192,6 +193,31 @@ public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBased
     }
 
     /**
+     * Set the key serialization class on both the output and input builders of this edge.
+     * No key comparator is configured by this method.
+     *
+     * @param serializationClassName name of the serialization class for keys
+     * @return this builder
+     */
+    public Builder setKeySerializationClass(String serializationClassName) {
+      outputBuilder.setKeySerializationClass(serializationClassName);
+      inputBuilder.setKeySerializationClass(serializationClassName);
+      return this;
+    }
+
+    /**
+     * Set the value serialization class on both the output and input builders of this edge.
+     *
+     * @param serializationClassName name of the serialization class for values
+     * @return this builder
+     */
+    public Builder setValueSerializationClass(String serializationClassName) {
+      outputBuilder.setValueSerializationClass(serializationClassName);
+      inputBuilder.setValueSerializationClass(serializationClassName);
+      return this;
+    }
+
+    /**
      * Configure the specific output
      * @return a builder to configure the output
      */

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
new file mode 100644
index 0000000..b74789a
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -0,0 +1,460 @@
+package org.apache.tez.runtime.library.common;
+
+import com.google.common.collect.Ordering;
+import com.google.common.collect.TreeMultimap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.GenericCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
+import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryWriter;
+import org.apache.tez.runtime.library.common.shuffle.impl.MergeManager;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+@RunWith(Parameterized.class)
+public class TestValuesIterator {
+
+  private static final Log LOG = LogFactory.getLog(TestValuesIterator.class);
+
+  static final String TEZ_BYTES_SERIALIZATION = TezBytesWritableSerialization.class.getName();
+
+  enum TestWithComparator {
+    LONG, INT, BYTES, TEZ_BYTES, TEXT, CUSTOM
+  }
+
+  Configuration conf;
+  FileSystem fs;
+  static final Random rnd = new Random();
+
+  final Class keyClass;
+  final Class valClass;
+  final RawComparator comparator;
+  final RawComparator correctComparator;
+  final boolean expectedTestResult;
+
+  int mergeFactor;
+  //For storing original data
+  final TreeMultimap<Writable, Writable> sortedDataMap;
+
+  TezRawKeyValueIterator rawKeyValueIterator;
+
+  Path baseDir;
+  Path tmpDir;
+  Path[] streamPaths; //merge stream paths
+
+  /**
+   * Constructor
+   *
+   * @param serializationClassName serialization class to be used
+   * @param key                    key class name
+   * @param val                    value class name
+   * @param comparator             to be used
+   * @param correctComparator      (real comparator to be used for correct results)
+   * @param testResult             expected result
+   * @throws IOException
+   */
+  public TestValuesIterator(String serializationClassName, Class key, Class val,
+      TestWithComparator comparator, TestWithComparator correctComparator, boolean testResult)
+      throws IOException {
+    this.keyClass = key;
+    this.valClass = val;
+    this.comparator = getComparator(comparator);
+    this.correctComparator =
+        (correctComparator == null) ? this.comparator : getComparator(correctComparator);
+    this.expectedTestResult = testResult;
+    sortedDataMap = TreeMultimap.create(this.correctComparator, Ordering.natural());
+    setupConf(serializationClassName);
+  }
+
+  private void setupConf(String serializationClassName) throws IOException {
+    mergeFactor = Math.max(2, rnd.nextInt(100));
+    conf = new Configuration();
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, mergeFactor);
+    if (serializationClassName != null) {
+      conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
+          + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+    }
+    baseDir = new Path(".", this.getClass().getName());
+    String localDirs = baseDir.toString();
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+    fs = FileSystem.getLocal(conf);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    fs.mkdirs(baseDir);
+    tmpDir = new Path(baseDir, "tmp");
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    fs.delete(baseDir, true);
+    sortedDataMap.clear();
+  }
+
+  @Test(timeout = 20000)
+  public void testIteratorWithInMemoryReader() throws IOException {
+    ValuesIterator iterator = createIterator(true);
+    testIterator(iterator);
+  }
+
+  @Test(timeout = 20000)
+  public void testIteratorWithIFileReader() throws IOException {
+    ValuesIterator iterator = createIterator(false);
+    testIterator(iterator);
+  }
+
+  /**
+   * Tests whether data in valuesIterator matches with sorted input data set.
+   *
+   * @param valuesIterator
+   * @throws IOException
+   */
+  private void testIterator(ValuesIterator valuesIterator) throws IOException {
+    Iterator<Writable> oriKeySet = sortedDataMap.keySet().iterator();
+    boolean result = true;
+    while (valuesIterator.moveToNext()) {
+      Writable key = (Writable) valuesIterator.getKey();
+      assertTrue(oriKeySet.hasNext());
+      Writable ori = oriKeySet.next();
+      if (!key.equals(ori)) {
+        result = false;
+        break;
+      }
+      for (Object val : valuesIterator.getValues()) {
+        if (!sortedDataMap.get(key).contains(val)) {
+          result = false;
+          break;
+        }
+      }
+    }
+    if (expectedTestResult) {
+      assertTrue(result);
+    } else {
+      assertFalse(result);
+    }
+  }
+
+  /**
+   * Create sample data (in memory / disk based), merge them and return ValuesIterator
+   *
+   * @param inMemory
+   * @return ValuesIterator
+   * @throws IOException
+   */
+  private ValuesIterator createIterator(boolean inMemory) throws IOException {
+    if (!inMemory) {
+      streamPaths = createFiles();
+      //Merge all files to get KeyValueIterator
+      rawKeyValueIterator =
+          TezMerger.merge(conf, fs, keyClass, valClass, null,
+              false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
+              new ProgressReporter(), null, null, null, null);
+    } else {
+      List<TezMerger.Segment> segments = createInMemStreams();
+      rawKeyValueIterator =
+          TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+              comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
+              new GenericCounter("writesCounter", "y1"),
+              new GenericCounter("bytesReadCounter", "y2"), new Progress());
+    }
+    return new ValuesIterator(rawKeyValueIterator, comparator,
+        keyClass, valClass, conf, (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
+        (TezCounter) new GenericCounter("inputValueCounter", "y4"));
+  }
+
+  @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3} {4} {5} {6}]")
+  public static Collection<Object[]> getParameters() {
+    Collection<Object[]> parameters = new ArrayList<Object[]>();
+
+    //parameters for constructor
+    parameters.add(new Object[]
+        { null, Text.class, Text.class, TestWithComparator.TEXT, null, true });
+    parameters.add(new Object[]
+        { null, LongWritable.class, Text.class, TestWithComparator.LONG, null, true });
+    parameters.add(new Object[]
+        { null, IntWritable.class, Text.class, TestWithComparator.INT, null, true });
+    parameters.add(new Object[]
+        { null, BytesWritable.class, BytesWritable.class, TestWithComparator.BYTES, null, true });
+    parameters.add(new Object[]
+        {
+            TEZ_BYTES_SERIALIZATION, BytesWritable.class, BytesWritable.class,
+            TestWithComparator.TEZ_BYTES, null, true
+        });
+    parameters.add(new Object[]
+        {
+            TEZ_BYTES_SERIALIZATION, BytesWritable.class, LongWritable.class,
+            TestWithComparator.TEZ_BYTES,
+            null, true
+        });
+    parameters.add(new Object[]
+        {
+            TEZ_BYTES_SERIALIZATION, CustomKey.class, LongWritable.class,
+            TestWithComparator.TEZ_BYTES,
+            null, true
+        });
+
+    //negative tests
+    parameters.add(new Object[]
+        {
+            TEZ_BYTES_SERIALIZATION, BytesWritable.class, BytesWritable.class,
+            TestWithComparator.BYTES,
+            TestWithComparator.TEZ_BYTES, false
+        });
+    parameters.add(new Object[]
+        {
+            TEZ_BYTES_SERIALIZATION, CustomKey.class, LongWritable.class, TestWithComparator.CUSTOM,
+            TestWithComparator.TEZ_BYTES, false
+        });
+    return parameters;
+  }
+
+  private RawComparator getComparator(TestWithComparator comparator) {
+    switch (comparator) {
+    case LONG:
+      return new LongWritable.Comparator();
+    case INT:
+      return new IntWritable.Comparator();
+    case BYTES:
+      return new BytesWritable.Comparator();
+    case TEZ_BYTES:
+      return new TezBytesComparator();
+    case TEXT:
+      return new Text.Comparator();
+    case CUSTOM:
+      return new CustomKey.Comparator();
+    default:
+      return null;
+    }
+  }
+
+  private Path[] createFiles() throws IOException {
+    int numberOfStreams = Math.max(2, rnd.nextInt(10));
+    LOG.info("No of streams : " + numberOfStreams);
+
+    Path[] paths = new Path[numberOfStreams];
+    for (int i = 0; i < numberOfStreams; i++) {
+      paths[i] = new Path(baseDir, "ifile_" + i + ".out");
+      IFile.Writer writer =
+          new IFile.Writer(conf, fs, paths[i], keyClass, valClass, null,
+              null, null);
+      Map<Writable, Writable> data = createData();
+      //write data
+      for (Map.Entry<Writable, Writable> entry : data.entrySet()) {
+        writer.append(entry.getKey(), entry.getValue());
+      }
+      LOG.info("Wrote " + data.size() + " in " + paths[i]);
+      data.clear();
+      writer.close();
+    }
+    return paths;
+  }
+
+  /**
+   * create inmemory segments
+   *
+   * @return
+   * @throws IOException
+   */
+  public List<TezMerger.Segment> createInMemStreams() throws IOException {
+    int numberOfStreams = Math.max(2, rnd.nextInt(5));
+    LOG.info("No of streams : " + numberOfStreams);
+
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    Serializer keySerializer = serializationFactory.getSerializer(keyClass);
+    Serializer valueSerializer = serializationFactory.getSerializer(valClass);
+
+    LocalDirAllocator localDirAllocator =
+        new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    TezInputContext context = createTezInputContext();
+    MergeManager mergeManager = new MergeManager(conf, fs, localDirAllocator,
+        context, null, null, null, null, null, 1024 * 1024 * 10, null, false, -1);
+
+    DataOutputBuffer keyBuf = new DataOutputBuffer();
+    DataOutputBuffer valBuf = new DataOutputBuffer();
+    DataInputBuffer keyIn = new DataInputBuffer();
+    DataInputBuffer valIn = new DataInputBuffer();
+    keySerializer.open(keyBuf);
+    valueSerializer.open(valBuf);
+
+    List<TezMerger.Segment> segments = new LinkedList<TezMerger.Segment>();
+    for (int i = 0; i < numberOfStreams; i++) {
+      BoundedByteArrayOutputStream bout = new BoundedByteArrayOutputStream(1024 * 1024);
+      InMemoryWriter writer =
+          new InMemoryWriter(bout);
+      Map<Writable, Writable> data = createData();
+      //write data
+      for (Map.Entry<Writable, Writable> entry : data.entrySet()) {
+        keySerializer.serialize(entry.getKey());
+        valueSerializer.serialize(entry.getValue());
+        keyIn.reset(keyBuf.getData(), 0, keyBuf.getLength());
+        valIn.reset(valBuf.getData(), 0, valBuf.getLength());
+        writer.append(keyIn, valIn);
+        keyBuf.reset();
+        valBuf.reset();
+        keyIn.reset();
+        valIn.reset();
+      }
+      IFile.Reader reader = new InMemoryReader(mergeManager, null, bout.getBuffer(), 0,
+          bout.getBuffer().length);
+      segments.add(new TezMerger.Segment(reader, true));
+
+      data.clear();
+      writer.close();
+    }
+    return segments;
+  }
+
+  private TezInputContext createTezInputContext() {
+    TezCounters counters = new TezCounters();
+    TezInputContext inputContext = mock(TezInputContext.class);
+    doReturn(1024 * 1024 * 100l).when(inputContext).getTotalMemoryAvailableToTask();
+    doReturn(counters).when(inputContext).getCounters();
+    doReturn(1).when(inputContext).getInputIndex();
+    doReturn("srcVertex").when(inputContext).getSourceVertexName();
+    doReturn(1).when(inputContext).getTaskVertexIndex();
+    doReturn(new byte[1024]).when(inputContext).getUserPayload();
+    return inputContext;
+  }
+
+  private Map<Writable, Writable> createData() {
+    Map<Writable, Writable> map = new TreeMap<Writable, Writable>(comparator);
+    for (int j = 0; j < Math.max(10, rnd.nextInt(50)); j++) {
+      Writable key = createData(keyClass);
+      Writable value = createData(valClass);
+      map.put(key, value);
+      sortedDataMap.put(key, value);
+    }
+    return map;
+  }
+
+  private Writable createData(Class c) {
+    if (c.getName().equalsIgnoreCase(BytesWritable.class.getName())) {
+      return new BytesWritable(new BigInteger(256, rnd).toString().getBytes());
+    } else if (c.getName().equalsIgnoreCase(IntWritable.class.getName())) {
+      return new IntWritable(rnd.nextInt());
+    } else if (c.getName().equalsIgnoreCase(LongWritable.class.getName())) {
+      return new LongWritable(rnd.nextLong());
+    } else if (c.getName().equalsIgnoreCase(CustomKey.class.getName())) {
+      String rndStr = new BigInteger(256, rnd).toString() + "_" + new BigInteger(256,
+          rnd).toString();
+      return new CustomKey(rndStr.getBytes(), rndStr.hashCode());
+    } else if (c.getName().equalsIgnoreCase(Text.class.getName())) {
+      String rndStr = new BigInteger(256, rnd).toString() + "_"
+          + new BigInteger(256, rnd).toString();
+      return new Text(rndStr);
+    } else {
+      throw new IllegalArgumentException("Illegal argument : " + c.getName());
+    }
+  }
+
+  private static class ProgressReporter implements Progressable {
+    @Override public void progress() {
+      //no impl
+    }
+  }
+
+  //Custom key and comparator
+  public static class CustomKey extends BytesWritable {
+    private static final int LENGTH_BYTES = 4;
+    private int hashCode;
+
+    public CustomKey() {
+    }
+
+    public CustomKey(byte[] data, int hashCode) {
+      super(data);
+      this.hashCode = hashCode;
+    }
+
+    @Override
+    public int hashCode() {
+      return hashCode;
+    }
+
+    public static class Comparator extends WritableComparator {
+      public Comparator() {
+        super(CustomKey.class);
+      }
+
+      /**
+       * Compare the buffers in serialized form.
+       */
+      @Override
+      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+        return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2
+            + LENGTH_BYTES, l2 - LENGTH_BYTES);
+      }
+    }
+
+    static {
+      WritableComparator.define(CustomKey.class, new Comparator());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java
index 262d778..90b725d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.Test;
 
@@ -122,7 +123,9 @@ public class TestOnFileUnorderedPartitionedKVOutput {
   public void testDefaultConfigsUsed() {
     OnFileUnorderedPartitionedKVOutputConfiguration.Builder builder =
         OnFileUnorderedPartitionedKVOutputConfiguration
-            .newBuilder("KEY", "VALUE", "PARTITIONER", null);
+            .newBuilder("KEY", "VALUE", "PARTITIONER", null)
+            .setKeySerializationClass("SerClass1")
+            .setValueSerializationClass("SerClass2");
     OnFileUnorderedPartitionedKVOutputConfiguration configuration = builder.build();
 
     byte[] confBytes = configuration.toByteArray();
@@ -143,6 +146,10 @@ public class TestOnFileUnorderedPartitionedKVOutput {
     assertEquals("KEY", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, ""));
     assertEquals("VALUE", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, ""));
     assertEquals("PARTITIONER", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+    assertTrue(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith("SerClass2," +
+        "SerClass1"));
+    //for unordered partitioned kv output, the key comparator is not populated
+    assertNull(conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
index eaef16f..09a006a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.Test;
 
@@ -246,4 +247,56 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
 
   }
 
+  /**
+   * The serialization classes and key comparator given to the edge builder must show up in
+   * both the output and the input payload, with the value serialization listed first.
+   */
+  @Test
+  public void testSerialization() {
+    // Ordered edge with compression and custom key/value serializations.
+    OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder("KEY", "VALUE", "PARTITIONER", null)
+        .enableCompression("CustomCodec")
+        .setKeySerializationClass("serClass1", "SomeComparator1")
+        .setValueSerializationClass("serClass2")
+        .build();
+
+    // Deserialize both payloads back into configuration objects.
+    OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration();
+    rebuiltOutput.fromByteArray(edgeConf.getOutputPayload());
+    ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration();
+    rebuiltInput.fromByteArray(edgeConf.getInputPayload());
+
+    Configuration outputConf = rebuiltOutput.conf;
+    Configuration inputConf = rebuiltInput.conf;
+
+    // Output side: key/value/partitioner classes and compression.
+    assertEquals("KEY", outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, ""));
+    assertEquals("VALUE", outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, ""));
+    assertEquals("PARTITIONER",
+        outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, ""));
+    assertEquals("CustomCodec",
+        outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+    assertTrue(outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false));
+    assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT));
+
+    // Output side: comparator and serialization ordering.
+    assertEquals("SomeComparator1",
+        outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS));
+    String outputSerializations = outputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY);
+    assertTrue(outputSerializations.trim().startsWith("serClass2,serClass1"));
+
+    // Input side: key/value classes and compression.
+    assertEquals("KEY", inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, ""));
+    assertEquals("VALUE", inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, ""));
+    assertEquals("CustomCodec",
+        inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+    assertTrue(inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false));
+
+    // Input side: same comparator and serialization ordering as the output.
+    assertEquals("SomeComparator1",
+        inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS));
+    String inputSerializations = inputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY);
+    assertTrue(inputSerializations.trim().startsWith("serClass2,serClass1"));
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
index 758434c..b006c4d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.Test;
 
@@ -62,6 +63,8 @@ public class TestUnorderedPartitionedKVEdgeConfigurer {
   public void testDefaultConfigsUsed() {
     UnorderedPartitionedKVEdgeConfigurer.Builder builder =
         UnorderedPartitionedKVEdgeConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER", null);
+    builder.setKeySerializationClass("SerClass1");
+    builder.setValueSerializationClass("SerClass2");
 
     UnorderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
@@ -80,12 +83,16 @@ public class TestUnorderedPartitionedKVEdgeConfigurer {
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
     assertEquals("TestCodec",
         outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+    assertTrue(outputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
+        ("SerClass2,SerClass1"));
 
     Configuration inputConf = rebuiltInput.conf;
     assertEquals(true, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
     assertEquals("TestCodec",
         inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+    assertTrue(inputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
+        ("SerClass2,SerClass1"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
index c86542f..01008f4 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.junit.Test;
 
@@ -55,6 +56,7 @@ public class TestUnorderedUnpartitionedKVEdgeConfigurer {
   public void testDefaultConfigsUsed() {
     UnorderedUnpartitionedKVEdgeConfigurer.Builder builder =
         UnorderedUnpartitionedKVEdgeConfigurer.newBuilder("KEY", "VALUE");
+    builder.setKeySerializationClass("SerClass1").setValueSerializationClass("SerClass2");
 
     UnorderedUnpartitionedKVEdgeConfigurer configuration = builder.build();
 
@@ -73,12 +75,16 @@ public class TestUnorderedUnpartitionedKVEdgeConfigurer {
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
     assertEquals("TestCodec",
         outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+    assertTrue(outputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
+        ("SerClass2,SerClass1"));
 
     Configuration inputConf = rebuiltInput.conf;
     assertEquals(true, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,
         TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT));
     assertEquals("TestCodec",
         inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, ""));
+    assertTrue(inputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith
+        ("SerClass2,SerClass1"));
   }
 
   @Test


Mime
View raw message