tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: rbalamo...@apache.org
Subject: git commit: TEZ-1228. Define a memory & merge optimized vertex-intermediate file format for Tez
Date: Fri, 11 Jul 2014 05:50:10 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master e7e0b3fb6 -> 2316c1dc9


TEZ-1228. Define a memory & merge optimized vertex-intermediate file format for Tez


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/2316c1dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/2316c1dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/2316c1dc

Branch: refs/heads/master
Commit: 2316c1dc951996765cbd887f6c3ab80a8e784fef
Parents: e7e0b3f
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Fri Jul 11 11:18:27 2014 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Fri Jul 11 11:18:27 2014 +0530

----------------------------------------------------------------------
 .../common/shuffle/impl/InMemoryReader.java     |  62 +-
 .../common/shuffle/impl/InMemoryWriter.java     |  45 +-
 .../runtime/library/common/sort/impl/IFile.java | 647 +++++++++++++------
 .../common/sort/impl/IFileInputStream.java      |   6 +-
 .../common/sort/impl/PipelinedSorter.java       |   6 +-
 .../common/sort/impl/TezIndexRecord.java        |   3 +-
 .../library/shuffle/common/ShuffleUtils.java    |  25 +-
 .../library/common/sort/impl/TestIFile.java     | 471 ++++++++++++--
 .../runtime/library/testutils/KVDataGen.java    |  36 +-
 9 files changed, 924 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2316c1dc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
index 9ed90d6..1492da4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.runtime.library.common.shuffle.impl;
 
+import java.io.DataInput;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -35,22 +36,24 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class InMemoryReader extends Reader {
+
   private final InputAttemptIdentifier taskAttemptId;
   private final MergeManager merger;
   DataInputBuffer memDataIn = new DataInputBuffer();
   private int start;
   private int length;
-  private int prevKeyPos;
+  private int originalKeyPos;
 
-  public InMemoryReader(MergeManager merger, InputAttemptIdentifier taskAttemptId,
-                        byte[] data, int start, int length)
-  throws IOException {
-    super(null, length - start, null,null, null, false, 0, -1);
+  public InMemoryReader(MergeManager merger,
+      InputAttemptIdentifier taskAttemptId, byte[] data, int start,
+      int length)
+      throws IOException {
+    super(null, length - start, null, null, null, false, 0, -1);
     this.merger = merger;
     this.taskAttemptId = taskAttemptId;
 
     buffer = data;
-    bufferSize = (int)fileLength;
+    bufferSize = (int) length;
     memDataIn.reset(buffer, start, length);
     this.start = start;
     this.length = length;
@@ -70,15 +73,15 @@ public class InMemoryReader extends Reader {
     // which will be correct since in-memory data is not compressed.
     return bytesRead;
   }
-  
+
   @Override
-  public long getLength() { 
-    return fileLength;
+  public long getLength() {
+    return length;
   }
-  
+
   private void dumpOnError() {
     File dumpFile = new File("../output/" + taskAttemptId + ".dump");
-    System.err.println("Dumping corrupt map-output of " + taskAttemptId + 
+    System.err.println("Dumping corrupt map-output of " + taskAttemptId +
                        " to " + dumpFile.getAbsolutePath());
     try {
       FileOutputStream fos = new FileOutputStream(dumpFile);
@@ -88,7 +91,14 @@ public class InMemoryReader extends Reader {
       System.err.println("Failed to dump map-output of " + taskAttemptId);
     }
   }
-  
+
+  protected void readKeyValueLength(DataInput dIn) throws IOException {
+    super.readKeyValueLength(dIn);
+    if (currentKeyLength != IFile.RLE_MARKER) {
+      originalKeyPos = memDataIn.getPosition();
+    }
+  }
+
   public KeyState readRawKey(DataInputBuffer key) throws IOException {
     try {
       if (!positionToNextRecord(memDataIn)) {
@@ -96,23 +106,20 @@ public class InMemoryReader extends Reader {
       }
       // Setup the key
       int pos = memDataIn.getPosition();
-      byte[] data = memDataIn.getData();      
-      if(currentKeyLength == IFile.RLE_MARKER) {
-        key.reset(data, prevKeyPos, prevKeyLength);
-        currentKeyLength = prevKeyLength;
+      byte[] data = memDataIn.getData();
+      if (currentKeyLength == IFile.RLE_MARKER) {
+        // get key length from original key
+        key.reset(data, originalKeyPos, originalKeyLength);
         return KeyState.SAME_KEY;
-      }      
+      }
       key.reset(data, pos, currentKeyLength);
-      prevKeyPos = pos;
       // Position for the next value
       long skipped = memDataIn.skip(currentKeyLength);
       if (skipped != currentKeyLength) {
-        throw new IOException("Rec# " + recNo + 
-            ": Failed to skip past key of length: " + 
+        throw new IOException("Rec# " + recNo +
+            ": Failed to skip past key of length: " +
             currentKeyLength);
       }
-
-      // Record the byte
       bytesRead += currentKeyLength;
       return KeyState.NEW_KEY;
     } catch (IOException ioe) {
@@ -120,7 +127,7 @@ public class InMemoryReader extends Reader {
       throw ioe;
     }
   }
-  
+
   public void nextRawValue(DataInputBuffer value) throws IOException {
     try {
       int pos = memDataIn.getPosition();
@@ -130,25 +137,24 @@ public class InMemoryReader extends Reader {
       // Position for the next record
       long skipped = memDataIn.skip(currentValueLength);
       if (skipped != currentValueLength) {
-        throw new IOException("Rec# " + recNo + 
-            ": Failed to skip past value of length: " + 
+        throw new IOException("Rec# " + recNo +
+            ": Failed to skip past value of length: " +
             currentValueLength);
       }
       // Record the byte
       bytesRead += currentValueLength;
-
       ++recNo;
     } catch (IOException ioe) {
       dumpOnError();
       throw ioe;
     }
   }
-    
+
   public void close() {
     // Release
     dataIn = null;
     buffer = null;
-      // Inform the MergeManager
+    // Inform the MergeManager
     if (merger != null) {
       merger.unreserve(bufferSize);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2316c1dc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
index a9a86ff..b0b6606 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/InMemoryWriter.java
@@ -25,21 +25,20 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 
+import static org.apache.tez.runtime.library.common.sort.impl.IFile.*;
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class InMemoryWriter extends Writer {
   private static final Log LOG = LogFactory.getLog(InMemoryWriter.class);
 
-  private DataOutputStream out;
-
   // TODO Verify and fix counters if required.
-  
+
   public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) {
     super(null, null);
     this.out =
@@ -51,44 +50,6 @@ public class InMemoryWriter extends Writer {
     ("InMemoryWriter.append(K key, V value");
   }
 
-  public void append(DataInputBuffer key, DataInputBuffer value)
-  throws IOException {
-    int keyLength = key.getLength() - key.getPosition();
-    if (keyLength < 0) {
-      throw new IOException("Negative key-length not allowed: " + keyLength +
-                            " for " + key);
-    }
-
-    boolean sameKey = (key == IFile.REPEAT_KEY);
-
-    int valueLength = value.getLength() - value.getPosition();
-    if (valueLength < 0) {
-      throw new IOException("Negative value-length not allowed: " +
-                            valueLength + " for " + value);
-    }
-
-    if(sameKey) {
-      WritableUtils.writeVInt(out, IFile.RLE_MARKER);
-      WritableUtils.writeVInt(out, valueLength);
-      out.write(value.getData(), value.getPosition(), valueLength);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("InMemWriter.append" +
-            " key.data=" + key.getData() +
-            " key.pos=" + key.getPosition() +
-            " key.len=" +key.getLength() +
-            " val.data=" + value.getData() +
-            " val.pos=" + value.getPosition() +
-            " val.len=" + value.getLength());
-      }
-      WritableUtils.writeVInt(out, keyLength);
-      WritableUtils.writeVInt(out, valueLength);
-      out.write(key.getData(), key.getPosition(), keyLength);
-      out.write(value.getData(), value.getPosition(), valueLength);
-    }
-
-  }
-
   public void close() throws IOException {
     // Write EOF_MARKER for key/value length
     WritableUtils.writeVInt(out, IFile.EOF_MARKER);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2316c1dc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index e8af024..3b6fea5 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -20,9 +20,10 @@ package org.apache.tez.runtime.library.common.sort.impl;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -60,30 +61,35 @@ public class IFile {
   private static final Log LOG = LogFactory.getLog(IFile.class);
   public static final int EOF_MARKER = -1; // End of File Marker
   public static final int RLE_MARKER = -2; // Repeat same key marker
+  public static final int V_END_MARKER = -3; // End of values marker
   public static final DataInputBuffer REPEAT_KEY = new DataInputBuffer();
-    
+  public static final byte[] HEADER = new byte[] { (byte) 'T', (byte) 'I',
+    (byte) 'F' , (byte) 0};
+
   /**
-   * <code>IFile.Writer</code> to write out intermediate map-outputs. 
+   * <code>IFile.Writer</code> to write out intermediate map-outputs.
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   @SuppressWarnings({"unchecked", "rawtypes"})
   public static class Writer {
-    FSDataOutputStream out;
+    protected DataOutputStream out;
     boolean ownOutputStream = false;
     long start = 0;
     FSDataOutputStream rawOut;
-    AtomicBoolean closed = new AtomicBoolean(false);
-    
+    final AtomicBoolean closed = new AtomicBoolean(false);
+
     CompressionOutputStream compressedOut;
     Compressor compressor;
     boolean compressOutput = false;
-    
+
     long decompressedBytesWritten = 0;
     long compressedBytesWritten = 0;
 
     // Count records written to disk
     private long numRecordsWritten = 0;
+    private long rleWritten = 0; //number of RLE markers written
+    private long totalKeySaving = 0; //number of keys saved due to multi KV writes + RLE
     private final TezCounter writtenRecordsCounter;
     private final TezCounter serializedUncompressedBytes;
 
@@ -93,14 +99,21 @@ public class IFile {
     Class valueClass;
     Serializer keySerializer;
     Serializer valueSerializer;
-    
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    DataOutputBuffer previous = new DataOutputBuffer();
-    
+
+    final DataOutputBuffer buffer = new DataOutputBuffer();
+    final DataOutputBuffer previous = new DataOutputBuffer();
+    Object prevKey = null;
+    boolean headerWritten = false;
+    boolean firstKey = true;
+
+    final int RLE_MARKER_SIZE = WritableUtils.getVIntSize(RLE_MARKER);
+    final int V_END_MARKER_SIZE = WritableUtils.getVIntSize(V_END_MARKER);
+
     // de-dup keys or not
-    private boolean rle = false;
+    protected final boolean rle;
+
 
-    public Writer(Configuration conf, FileSystem fs, Path file, 
+    public Writer(Configuration conf, FileSystem fs, Path file,
                   Class keyClass, Class valueClass,
                   CompressionCodec codec,
                   TezCounter writesCounter,
@@ -113,17 +126,26 @@ public class IFile {
     protected Writer(TezCounter writesCounter, TezCounter serializedBytesCounter) {
       writtenRecordsCounter = writesCounter;
       serializedUncompressedBytes = serializedBytesCounter;
+      this.rle = false;
     }
 
-    public Writer(Configuration conf, FSDataOutputStream out, 
+    public Writer(Configuration conf, FSDataOutputStream outputStream,
+        Class keyClass, Class valueClass, CompressionCodec codec, TezCounter writesCounter,
+        TezCounter serializedBytesCounter) throws IOException {
+      this(conf, outputStream, keyClass, valueClass, codec, writesCounter,
+          serializedBytesCounter, false);
+    }
+
+    public Writer(Configuration conf, FSDataOutputStream outputStream,
         Class keyClass, Class valueClass,
-        CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter)
-        throws IOException {
+        CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter,
+        boolean rle) throws IOException {
+      this.rawOut = outputStream;
       this.writtenRecordsCounter = writesCounter;
       this.serializedUncompressedBytes = serializedBytesCounter;
-      this.checksumOut = new IFileOutputStream(out);
-      this.rawOut = out;
+      this.checksumOut = new IFileOutputStream(outputStream);
       this.start = this.rawOut.getPos();
+      this.rle = rle;
       if (codec != null) {
         this.compressor = CodecPool.getCompressor(codec);
         if (this.compressor != null) {
@@ -138,12 +160,12 @@ public class IFile {
       } else {
         this.out = new FSDataOutputStream(checksumOut,null);
       }
-      
+      writeHeader(outputStream);
       this.keyClass = keyClass;
       this.valueClass = valueClass;
 
       if (keyClass != null) {
-        SerializationFactory serializationFactory = 
+        SerializationFactory serializationFactory =
           new SerializationFactory(conf);
         this.keySerializer = serializationFactory.getSerializer(keyClass);
         this.keySerializer.open(buffer);
@@ -152,15 +174,21 @@ public class IFile {
       }
     }
 
-    public Writer(Configuration conf, FileSystem fs, Path file) 
-    throws IOException {
+    public Writer(Configuration conf, FileSystem fs, Path file) throws IOException {
       this(conf, fs, file, null, null, null, null, null);
     }
 
-    public void close() throws IOException {
-      if (closed.getAndSet(true)) {
-        throw new IOException("Writer was already closed earlier");
+    protected void writeHeader(OutputStream outputStream) throws IOException {
+      if (!headerWritten) {
+        outputStream.write(HEADER, 0, HEADER.length - 1);
+        outputStream.write((compressOutput) ? (byte) 1 : (byte) 0);
+        outputStream.flush();
+        headerWritten = true;
       }
+    }
+
+    public void close() throws IOException {
+      checkState(!closed.getAndSet(true), "Writer was already closed earlier");
 
       // When IFile writer is created by BackupStore, we do not have
       // Key and Value classes set. So, check before closing the
@@ -170,20 +198,25 @@ public class IFile {
         valueSerializer.close();
       }
 
+      // write V_END_MARKER as needed
+      writeValueMarker(out);
+
       // Write EOF_MARKER for key/value length
       WritableUtils.writeVInt(out, EOF_MARKER);
       WritableUtils.writeVInt(out, EOF_MARKER);
       decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
-      
+      //account for header bytes
+      decompressedBytesWritten += HEADER.length;
+
       //Flush the stream
       out.flush();
-  
+
       if (compressOutput) {
         // Flush
         compressedOut.finish();
         compressedOut.resetState();
       }
-      
+
       // Close the underlying stream iff we own it...
       if (ownOutputStream) {
         out.close();
@@ -192,7 +225,7 @@ public class IFile {
         // Write the checksum
         checksumOut.finish();
       }
-
+      //header bytes are already included in rawOut
       compressedBytesWritten = rawOut.getPos() - start;
 
       if (compressOutput) {
@@ -202,163 +235,252 @@ public class IFile {
       }
 
       out = null;
-      if(writtenRecordsCounter != null) {
+      if (writtenRecordsCounter != null) {
         writtenRecordsCounter.increment(numRecordsWritten);
       }
+      LOG.info("Total keys written=" + numRecordsWritten + "; Savings(optimized due to " +
+          "multi-kv/rle)=" + totalKeySaving + "; number of RLEs written=" + rleWritten);
     }
 
+    /**
+     * Send key/value to be appended to IFile. To represent same key as previous
+     * one, send IFile.REPEAT_KEY as key parameter.  Should not call this method with
+     * IFile.REPEAT_KEY as the first key.
+     *
+     * @param key
+     * @param value
+     * @throws IOException
+     */
     public void append(Object key, Object value) throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: "+ key.getClass()
-                              +" is not "+ keyClass);
-      if (value.getClass() != valueClass)
-        throw new IOException("wrong value class: "+ value.getClass()
-                              +" is not "+ valueClass);
-      
-      boolean sameKey = false;
-
-      // Append the 'key'
-      keySerializer.serialize(key);
-      int keyLength = buffer.getLength();
-      if (keyLength < 0) {
-        throw new IOException("Negative key-length not allowed: " + keyLength + 
-                              " for " + key);
-      }     
-      
-      if(rle && keyLength == previous.getLength()) {
-        sameKey = (BufferUtils.compare(previous, buffer) == 0);       
-      }
-      
-      if(!sameKey) {
-        BufferUtils.copy(buffer, previous);
+      checkArgument((key == REPEAT_KEY || key.getClass() == keyClass),
+          "wrong key class: " + key.getClass() + " is not " + keyClass);
+      checkArgument((value.getClass() == valueClass),
+        "wrong value class: %s is not %s", value.getClass(), valueClass);
+
+      int keyLength = 0;
+      boolean sameKey = (key == REPEAT_KEY);
+      if (!sameKey) {
+        keySerializer.serialize(key);
+        keyLength = buffer.getLength();
+        checkState(keyLength >= 0, "Negative key-length not allowed: "
+            + keyLength + " for " + key);
+        if (rle && (keyLength == previous.getLength())) {
+          sameKey = (BufferUtils.compare(previous, buffer) == 0);
+        }
       }
 
       // Append the 'value'
       valueSerializer.serialize(value);
       int valueLength = buffer.getLength() - keyLength;
-      if (valueLength < 0) {
-        throw new IOException("Negative value-length not allowed: " + 
-                              valueLength + " for " + value);
-      }
-      
-      if(rle && sameKey) {
-        WritableUtils.writeVInt(out, RLE_MARKER);                   // Same key as previous
-        WritableUtils.writeVInt(out, valueLength);                  // value length
-        out.write(buffer.getData(), keyLength, buffer.getLength()); // only the value
-        // Update bytes written
-        decompressedBytesWritten += 0 + valueLength + 
-                                    WritableUtils.getVIntSize(RLE_MARKER) + 
-                                    WritableUtils.getVIntSize(valueLength);
-        if (serializedUncompressedBytes != null) {
-          serializedUncompressedBytes.increment(0 + valueLength);
-        }
-      } else {        
-        // Write the record out        
-        WritableUtils.writeVInt(out, keyLength);                  // key length
-        WritableUtils.writeVInt(out, valueLength);                // value length
-        out.write(buffer.getData(), 0, buffer.getLength());       // data
-        // Update bytes written
-        decompressedBytesWritten += keyLength + valueLength + 
-                                    WritableUtils.getVIntSize(keyLength) + 
-                                    WritableUtils.getVIntSize(valueLength);
-        if (serializedUncompressedBytes != null) {
-          serializedUncompressedBytes.increment(keyLength + valueLength);
+      checkState(valueLength >= 0,
+        "Negative value-length not allowed: %d for %s", valueLength, value);
+      if (!sameKey) {
+        //dump entire key value pair
+        writeKVPair(buffer.getData(), 0, keyLength, buffer.getData(),
+            keyLength, buffer.getLength() - keyLength);
+        if (rle) {
+          previous.reset();
+          previous.write(buffer.getData(), 0, keyLength); //store the key
         }
+      } else {
+        writeValue(buffer.getData(), keyLength, valueLength);
       }
 
+      prevKey = (sameKey) ? REPEAT_KEY : key;
       // Reset
       buffer.reset();
-      
-      
       ++numRecordsWritten;
     }
-    
-    public void append(DataInputBuffer key, DataInputBuffer value)
-    throws IOException {
-      int keyLength = key.getLength() - key.getPosition();
-      if (keyLength < 0) {
-        throw new IOException("Negative key-length not allowed: " + keyLength + 
-                              " for " + key);
+
+    /**
+     * Appends the value to previous key. Assumes that the caller has already done relevant checks
+     * for identical keys. Also, no validations are done in this method
+     *
+     * @param value
+     * @throws IOException
+     */
+    public void appendValue(Object value) throws IOException {
+      valueSerializer.serialize(value);
+      int valueLength = buffer.getLength();
+      checkState(valueLength >= 0,
+          "Negative value-length not allowed: %d for %s", valueLength, value);
+      writeValue(buffer.getData(), 0, valueLength);
+      buffer.reset();
+      ++numRecordsWritten;
+      prevKey = REPEAT_KEY;
+    }
+
+    /**
+     * Appends the value to previous key. Assumes that the caller has already done relevant checks
+     * for identical keys. Also, no validations are done in this method
+     *
+     * @param value
+     * @throws IOException
+     */
+    public void appendValue(DataInputBuffer value) throws IOException {
+      int valueLength = value.getLength() - value.getPosition();
+      checkState(valueLength >= 0,
+          "Negative value-length not allowed: %d for %s", valueLength, value);
+      writeValue(value.getData(), value.getPosition(), valueLength);
+      buffer.reset();
+      ++numRecordsWritten;
+      prevKey = REPEAT_KEY;
+    }
+
+    /**
+     * Appends the value to previous key. Assumes that the caller has already done relevant checks
+     * for identical keys. Also, no validations are done in this method
+     *
+     * @param valuesItr
+     * @throws IOException
+     */
+    public <V> void appendValues(Iterator<V> valuesItr) throws IOException {
+      while(valuesItr.hasNext()) {
+        appendValue(valuesItr.next());
+      }
+    }
+
+    /**
+     * Append key and its associated set of values.
+     *
+     * @param key
+     * @param valuesItr
+     * @param <K>
+     * @param <V>
+     * @throws IOException
+     */
+    public <K, V> void appendKeyValues(K key, Iterator<V> valuesItr) throws IOException {
+      if (valuesItr.hasNext()) {
+        append(key, valuesItr.next()); //append first KV pair
+      }
+      //append the remaining values
+      while(valuesItr.hasNext()) {
+        appendValue(valuesItr.next());
       }
-      
+    }
+
+    /**
+     * Send key/value to be appended to IFile. To represent same key as previous
+     * one, send IFile.REPEAT_KEY as key parameter.  Should not call this method with
+     * IFile.REPEAT_KEY as the first key.
+     *
+     * @param key
+     * @param value
+     * @throws IOException
+     */
+    public void append(DataInputBuffer key, DataInputBuffer value) throws IOException {
+      int keyLength = key.getLength() - key.getPosition();
+      checkState((key == REPEAT_KEY || keyLength >= 0),
+        "Negative key-length not allowed: %d for %s", keyLength, key);
+
       int valueLength = value.getLength() - value.getPosition();
-      if (valueLength < 0) {
-        throw new IOException("Negative value-length not allowed: " + 
-                              valueLength + " for " + value);
-      }
-      
-      boolean sameKey = false;
-      
-      if(rle && keyLength == previous.getLength()) {
-        sameKey = (keyLength != 0) && (BufferUtils.compare(previous, key) == 0);        
-      }
-      
-      if(rle && sameKey) {
-        WritableUtils.writeVInt(out, RLE_MARKER);
-        WritableUtils.writeVInt(out, valueLength);        
-        out.write(value.getData(), value.getPosition(), valueLength);
-
-        // Update bytes written
-        decompressedBytesWritten += 0 + valueLength
-            + WritableUtils.getVIntSize(RLE_MARKER)
-            + WritableUtils.getVIntSize(valueLength);
-        if (serializedUncompressedBytes != null) {
-          serializedUncompressedBytes.increment(0 + valueLength);
+      checkState(valueLength >= 0,
+        "Negative value-length not allowed: %d for %s", valueLength, value);
+
+      boolean sameKey = (key == REPEAT_KEY);
+      if (!sameKey && rle) {
+        sameKey = (keyLength != 0) && (BufferUtils.compare(previous, key) == 0);
+      }
+
+      if (!sameKey) {
+        writeKVPair(key.getData(), key.getPosition(), keyLength,
+            value.getData(), value.getPosition(), valueLength);
+        if (rle) {
+          BufferUtils.copy(key, previous);
         }
       } else {
-        WritableUtils.writeVInt(out, keyLength);
-        WritableUtils.writeVInt(out, valueLength);
-        out.write(key.getData(), key.getPosition(), keyLength);
-        out.write(value.getData(), value.getPosition(), valueLength);
-
-        // Update bytes written
-        decompressedBytesWritten += keyLength + valueLength
-            + WritableUtils.getVIntSize(keyLength)
-            + WritableUtils.getVIntSize(valueLength);
-        if (serializedUncompressedBytes != null) {
-          serializedUncompressedBytes.increment(keyLength + valueLength);
-        }
-                
-        BufferUtils.copy(key, previous);        
+        writeValue(value.getData(), value.getPosition(), valueLength);
       }
+      prevKey = (sameKey) ? REPEAT_KEY : key;
       ++numRecordsWritten;
     }
-    
+
+    protected void writeValue(byte[] data, int offset, int length) throws IOException {
+      writeRLE(out);
+      WritableUtils.writeVInt(out, length); // value length
+      out.write(data, offset, length);
+      // Update bytes written
+      decompressedBytesWritten +=
+          length + WritableUtils.getVIntSize(length);
+      if (serializedUncompressedBytes != null) {
+        serializedUncompressedBytes.increment(length);
+      }
+      totalKeySaving++;
+    }
+
+    protected void writeKVPair(byte[] keyData, int keyPos, int keyLength,
+        byte[] valueData, int valPos, int valueLength) throws IOException {
+      writeValueMarker(out);
+      WritableUtils.writeVInt(out, keyLength);
+      WritableUtils.writeVInt(out, valueLength);
+      out.write(keyData, keyPos, keyLength);
+      out.write(valueData, valPos, valueLength);
+
+      // Update bytes written
+      decompressedBytesWritten +=
+          keyLength + valueLength + WritableUtils.getVIntSize(keyLength)
+              + WritableUtils.getVIntSize(valueLength);
+      if (serializedUncompressedBytes != null) {
+        serializedUncompressedBytes.increment(keyLength + valueLength);
+      }
+    }
+
+    protected void writeRLE(DataOutputStream out) throws IOException {
+      /**
+       * To strike a balance between 2 use cases (lots of unique KV in stream
+       * vs lots of identical KV in stream), we start off by writing KV pair.
+       * If subsequent KV is identical, we write RLE marker along with V_END_MARKER
+       * {KL1, VL1, K1, V1}
+       * {RLE, VL2, V2, VL3, V3, ...V_END_MARKER}
+       */
+      if (prevKey != REPEAT_KEY) {
+        WritableUtils.writeVInt(out, RLE_MARKER);
+        decompressedBytesWritten += RLE_MARKER_SIZE;
+        rleWritten++;
+      }
+    }
+
+    protected void writeValueMarker(DataOutputStream out) throws IOException {
+      /**
+       * Write V_END_MARKER only in RLE scenario. This will
+       * save space in conditions where lots of unique KV pairs are found in the
+       * stream.
+       */
+      if (prevKey == REPEAT_KEY) {
+        WritableUtils.writeVInt(out, V_END_MARKER);
+        decompressedBytesWritten += V_END_MARKER_SIZE;
+      }
+    }
+
     // Required for mark/reset
     public DataOutputStream getOutputStream () {
       return out;
     }
-    
+
     // Required for mark/reset
     public void updateCountersForExternalAppend(long length) {
       ++numRecordsWritten;
       decompressedBytesWritten += length;
     }
-    
+
     public long getRawLength() {
       return decompressedBytesWritten;
     }
-    
+
     public long getCompressedLength() {
       return compressedBytesWritten;
     }
-    
-    public void setRLE(boolean rle) {
-      this.rle = rle;
-      previous.reset();
-    }
-
   }
 
   /**
-   * <code>IFile.Reader</code> to read intermediate map-outputs. 
+   * <code>IFile.Reader</code> to read intermediate map-outputs.
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public static class Reader {
-    
-    public enum KeyState {NO_KEY, NEW_KEY, SAME_KEY};
-    
+
+    public enum KeyState {NO_KEY, NEW_KEY, SAME_KEY}
+
     private static final int DEFAULT_BUFFER_SIZE = 128*1024;
 
     // Count records read from disk
@@ -369,26 +491,27 @@ public class IFile {
     final InputStream in;        // Possibly decompressed stream that we read
     Decompressor decompressor;
     public long bytesRead = 0;
-    protected final long fileLength;
+    final long fileLength;
     protected boolean eof = false;
-    final IFileInputStream checksumIn;
-    
+    IFileInputStream checksumIn;
+
     protected byte[] buffer = null;
     protected int bufferSize = DEFAULT_BUFFER_SIZE;
     protected DataInputStream dataIn;
 
     protected int recNo = 1;
+    protected int originalKeyLength;
     protected int prevKeyLength;
     protected int currentKeyLength;
     protected int currentValueLength;
     byte keyBytes[] = new byte[0];
-    
+
     long startPos;
-    
-    
+    protected boolean isCompressed = false;
+
     /**
      * Construct an IFile Reader.
-     * 
+     *
      * @param fs  FileSystem
      * @param file Path of the file to be opened. This file should have
      *             checksum bytes for the data at the end of the file.
@@ -400,14 +523,37 @@ public class IFile {
                   CompressionCodec codec,
                   TezCounter readsCounter, TezCounter bytesReadCounter, boolean ifileReadAhead,
                   int ifileReadAheadLength, int bufferSize) throws IOException {
-      this(fs.open(file), 
-           fs.getFileStatus(file).getLen(),
-           codec, readsCounter, bytesReadCounter, ifileReadAhead, ifileReadAheadLength, bufferSize);
+      this(fs.open(file), fs.getFileStatus(file).getLen(), codec,
+          readsCounter, bytesReadCounter, ifileReadAhead,
+          ifileReadAheadLength, bufferSize);
+    }
+
+    /**
+     * Construct an IFile Reader.
+     *
+     * @param in   The input stream
+     * @param length Length of the data in the stream, including the checksum
+     *               bytes.
+     * @param codec codec
+     * @param readsCounter Counter for records read from disk
+     * @throws IOException
+     */
+    public Reader(InputStream in, long length,
+        CompressionCodec codec,
+        TezCounter readsCounter, TezCounter bytesReadCounter,
+        boolean readAhead, int readAheadLength,
+        int bufferSize) throws IOException {
+      this(in, ((in != null) ? (length - HEADER.length) : length), codec,
+          readsCounter, bytesReadCounter, readAhead, readAheadLength,
+          bufferSize, ((in != null) ? isCompressedFlagEnabled(in) : false));
+      if (in != null && bytesReadCounter != null) {
+        bytesReadCounter.increment(IFile.HEADER.length);
+      }
     }
 
     /**
      * Construct an IFile Reader.
-     * 
+     *
      * @param in   The input stream
      * @param length Length of the data in the stream, including the checksum
      *               bytes.
@@ -415,15 +561,14 @@ public class IFile {
      * @param readsCounter Counter for records read from disk
      * @throws IOException
      */
-    public Reader(InputStream in, long length, 
+    public Reader(InputStream in, long length,
                   CompressionCodec codec,
                   TezCounter readsCounter, TezCounter bytesReadCounter,
                   boolean readAhead, int readAheadLength,
-                  int bufferSize) throws IOException {
-      readRecordsCounter = readsCounter;
-      this.bytesReadCounter = bytesReadCounter;
-      checksumIn = new IFileInputStream(in,length, readAhead, readAheadLength);
-      if (codec != null) {
+                  int bufferSize, boolean isCompressed) throws IOException {
+      this.isCompressed = isCompressed;
+      checksumIn = new IFileInputStream(in, length, readAhead, readAheadLength/*, isCompressed*/);
+      if (isCompressed && codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {
           this.in = codec.createInputStream(checksumIn, decompressor);
@@ -434,28 +579,70 @@ public class IFile {
       } else {
         this.in = checksumIn;
       }
+
       this.dataIn = new DataInputStream(this.in);
-      this.fileLength = length;
-      
       startPos = checksumIn.getPosition();
-      
-      if (bufferSize != -1) {
-        this.bufferSize = bufferSize;
+      this.readRecordsCounter = readsCounter;
+      this.bytesReadCounter = bytesReadCounter;
+      this.fileLength = length;
+      this.bufferSize = Math.max(0, bufferSize);
+    }
+
+    /**
+     * Read a whole IFile (header first, then the checksummed data) from {@code in} into {@code buffer}.
+     *
+     * @param buffer destination; receives {@code buffer.length - IFile.HEADER.length} data bytes
+     * @param in stream positioned at the start of the IFile header
+     * @param compressedLength length of the IFile on the stream, including the header bytes
+     * @param codec used only when the header's compressed flag is set (may be null); if no decompressor is available the data is read as-is and a warning is logged
+     * @param ifileReadAhead whether the {@link IFileInputStream} wrapper should read ahead
+     * @param ifileReadAheadLength read-ahead length passed to {@link IFileInputStream}
+     * @throws IOException on an invalid header or a read/decompression failure (the wrapping stream is closed before rethrowing)
+     */
+    public static void readToMemory(byte[] buffer, InputStream in, int compressedLength,
+        CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength)
+        throws IOException {
+      boolean isCompressed = IFile.Reader.isCompressedFlagEnabled(in);
+      IFileInputStream checksumIn = new IFileInputStream(in,
+          compressedLength - IFile.HEADER.length, ifileReadAhead,
+          ifileReadAheadLength);
+      in = checksumIn;
+      Decompressor decompressor = null;
+      if (isCompressed && codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+        if (decompressor != null) {
+          decompressor.reset();
+          in = codec.createInputStream(checksumIn, decompressor);
+        } else {
+          LOG.warn("Could not obtain decompressor from CodecPool");
+          in = checksumIn;
+        }
+      }
+      try {
+        IOUtils.readFully(in, buffer, 0, buffer.length - IFile.HEADER.length);
+      } catch (IOException ioe) {
+        IOUtils.cleanup(LOG, in);
+        throw ioe;
+      } finally {
+        if (decompressor != null) {
+          decompressor.reset();
+          CodecPool.returnDecompressor(decompressor);
+        }
       }
     }
-    
-    public long getLength() { 
+
+    public long getLength() {
       return fileLength - checksumIn.getSize();
     }
-    
-    public long getPosition() throws IOException {    
-      return checksumIn.getPosition(); 
+
+    public long getPosition() throws IOException {
+      return checksumIn.getPosition();
     }
-    
+
     /**
      * Read upto len bytes into buf starting at offset off.
-     * 
-     * @param buf buffer 
+     *
+     * @param buf buffer
      * @param off offset
      * @param len length of buffer
      * @return the no. of bytes read
@@ -473,89 +660,122 @@ public class IFile {
       }
       return len;
     }
-    
-    protected boolean positionToNextRecord(DataInput dIn) throws IOException {
-      // Sanity check
-      if (eof) {
-        throw new EOFException("Completed reading " + bytesRead);
+
+    protected void readValueLength(DataInput dIn) throws IOException {
+      currentValueLength = WritableUtils.readVInt(dIn);
+      bytesRead += WritableUtils.getVIntSize(currentValueLength);
+      if (currentValueLength == V_END_MARKER) {
+        readKeyValueLength(dIn);
       }
-      
-      // Read key and value lengths
-      prevKeyLength = currentKeyLength;
+    }
+
+    protected void readKeyValueLength(DataInput dIn) throws IOException {
       currentKeyLength = WritableUtils.readVInt(dIn);
       currentValueLength = WritableUtils.readVInt(dIn);
-      bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
-                   WritableUtils.getVIntSize(currentValueLength);
-      
+      if (currentKeyLength != RLE_MARKER) {
+        // original key length
+        originalKeyLength = currentKeyLength;
+      }
+      bytesRead +=
+          WritableUtils.getVIntSize(currentKeyLength)
+              + WritableUtils.getVIntSize(currentValueLength);
+    }
+
+    /**
+     * Advance to the next record by reading its key/value lengths (only the value length after an RLE record).
+     *
+     * @param dIn stream the VInt length fields are read from
+     * @return false if the EOF marker pair was read (eof is then set), true otherwise
+     * @throws IOException if eof was already reached, or a key/value length is negative
+     */
+    protected boolean positionToNextRecord(DataInput dIn) throws IOException {
+      // Sanity check
+      checkState(!eof, "Reached EOF. Completed reading %d", bytesRead);
+      prevKeyLength = currentKeyLength;
+
+      if (prevKeyLength == RLE_MARKER) {
+        // Same key as previous one. Just read value length alone
+        readValueLength(dIn);
+      } else {
+        readKeyValueLength(dIn);
+      }
+
       // Check for EOF
       if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
         eof = true;
         return false;
-      }      
-      
+      }
+
       // Sanity check
       if (currentKeyLength != RLE_MARKER && currentKeyLength < 0) {
-        throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
-                              currentKeyLength);
+        throw new IOException("Rec# " + recNo + ": Negative key-length: " +
+                              currentKeyLength + " PreviousKeyLen: " + prevKeyLength);
       }
       if (currentValueLength < 0) {
-        throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
+        throw new IOException("Rec# " + recNo + ": Negative value-length: " +
                               currentValueLength);
       }
-            
       return true;
     }
-    
+
     public final boolean nextRawKey(DataInputBuffer key) throws IOException {
       return readRawKey(key) != KeyState.NO_KEY;
     }
-    
+
     public KeyState readRawKey(DataInputBuffer key) throws IOException {
       if (!positionToNextRecord(dataIn)) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("currentKeyLength=" + currentKeyLength +
               ", currentValueLength=" + currentValueLength +
-              ", bytesRead=" + bytesRead + 
+              ", bytesRead=" + bytesRead +
               ", length=" + fileLength);
         }
         return KeyState.NO_KEY;
       }
       if(currentKeyLength == RLE_MARKER) {
-        currentKeyLength = prevKeyLength;
-        // no data to read
-        key.reset(keyBytes, currentKeyLength);
+        // RLE: same key as before; keyBytes still hold it, so reuse its length
+        key.reset(keyBytes, originalKeyLength);
         return KeyState.SAME_KEY;
       }
       if (keyBytes.length < currentKeyLength) {
         keyBytes = new byte[currentKeyLength << 1];
       }
       int i = readData(keyBytes, 0, currentKeyLength);
-      if (i != currentKeyLength) {
-        throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
-      }
+      checkState((i == currentKeyLength), "Asked for %d got %d",
+        currentKeyLength, i);
       key.reset(keyBytes, currentKeyLength);
       bytesRead += currentKeyLength;
       return KeyState.NEW_KEY;
     }
-    
+
     public void nextRawValue(DataInputBuffer value) throws IOException {
-      final byte[] valBytes = 
+      final byte[] valBytes =
         ((value.getData().length < currentValueLength) || (value.getData() == keyBytes))
         ? new byte[currentValueLength << 1]
         : value.getData();
       int i = readData(valBytes, 0, currentValueLength);
-      if (i != currentValueLength) {
-        throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
-      }
+      checkState((i == currentValueLength), "Asked for %d got %d",
+        currentValueLength, i);
       value.reset(valBytes, currentValueLength);
-      
+
       // Record the bytes read
       bytesRead += currentValueLength;
 
       ++recNo;
       ++numRecordsRead;
     }
-    
+
+    public static boolean isCompressedFlagEnabled(InputStream in) throws IOException {
+      byte[] header = new byte[HEADER.length];
+      // read() may legally return fewer bytes than asked for; loop until full
+      IOUtils.readFully(in, header, 0, header.length);
+      if (!(header[0] == 'T' && header[1] == 'I'
+          && header[2] == 'F')) {
+        throw new IOException("Not a valid ifile header");
+      }
+      return (header[3] == 1);
+    }
+
     public void close() throws IOException {
       // Close the underlying stream
       in.close();
@@ -563,14 +783,14 @@ public class IFile {
       // Release the buffer
       dataIn = null;
       buffer = null;
-      if(readRecordsCounter != null) {
+      if (readRecordsCounter != null) {
         readRecordsCounter.increment(numRecordsRead);
       }
 
       if (bytesReadCounter != null) {
         bytesReadCounter.increment(checksumIn.getPosition() - startPos + checksumIn.getSize());
       }
-      
+
       // Return the decompressor
       if (decompressor != null) {
         decompressor.reset();
@@ -578,7 +798,7 @@ public class IFile {
         decompressor = null;
       }
     }
-    
+
     public void reset(int offset) {
       return;
     }
@@ -586,6 +806,19 @@ public class IFile {
     public void disableChecksumValidation() {
       checksumIn.disableChecksumValidation();
     }
+  }
+
+  public static void checkState(boolean expression, String format,
+      Object... args) throws IOException {
+    if (!expression) {
+      throw new IOException(String.format(format, args));
+    }
+  }
 
-  }    
+  static void checkArgument(boolean expression, String format,
+      Object... args) throws IOException {
+    if (!expression) {
+      throw new IOException(String.format(format, args));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2316c1dc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
index e919d3a..abfe4ad 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFileInputStream.java
@@ -62,7 +62,7 @@ public class IFileInputStream extends InputStream {
   public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
 
   private boolean disableChecksumValidation = false;
-  
+
   /**
    * Create a checksum input stream that reads without readAhead.
    * @param in
@@ -71,7 +71,7 @@ public class IFileInputStream extends InputStream {
   public IFileInputStream(InputStream in, long len) {
     this(in, len, false, 0);
   }
-  
+
   /**
    * Create a checksum input stream that reads
    * @param in The input stream to be verified for checksum.
@@ -81,7 +81,7 @@ public class IFileInputStream extends InputStream {
    */
   public IFileInputStream(InputStream in, long len, boolean readAhead, int readAheadLength) {
     this.in = in;
-    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 
+    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
         Integer.MAX_VALUE);
     checksumSize = sum.getChecksumSize();
     buffer = new byte[4096];

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2316c1dc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index fff59b1..231fb40 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -273,8 +273,7 @@ public class PipelinedSorter extends ExternalSorter {
         long segmentStart = out.getPos();
         Writer writer =
           new Writer(conf, out, keyClass, valClass, codec,
-              spilledRecordsCounter, null);
-        writer.setRLE(merger.needsRLE());
+              spilledRecordsCounter, null, merger.needsRLE());
         if (combiner == null) {
           while(kvIter.next()) {
             writer.append(kvIter.getKey(), kvIter.getValue());
@@ -382,8 +381,7 @@ public class PipelinedSorter extends ExternalSorter {
       long segmentStart = finalOut.getPos();
       Writer writer =
           new Writer(conf, finalOut, keyClass, valClass, codec,
-                           spilledRecordsCounter, null);
-      writer.setRLE(merger.needsRLE());
+                           spilledRecordsCounter, null, merger.needsRLE());
       if (combiner == null || numSpills < minSpillsForCombine) {
         TezMerger.writeFile(kvIter, writer, nullProgressable, TezJobConfig.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2316c1dc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
index a5bacd2..2f04a4d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
@@ -50,6 +50,7 @@ public class TezIndexRecord {
 
   public boolean hasData() {
     //TEZ-941 - Avoid writing out empty partitions
-    return !(rawLength == 2);
+    //EOF_MARKER + Header bytes
+    return !(rawLength == (IFile.HEADER.length + 2));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2316c1dc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
index 24de6f9..87af557 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/ShuffleUtils.java
@@ -36,13 +36,12 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.security.token.Token;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
-import org.apache.tez.runtime.library.common.sort.impl.IFileInputStream;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParams;
 import org.apache.tez.runtime.library.shuffle.common.HttpConnection.HttpConnectionParamsBuilder;
 
@@ -86,22 +85,9 @@ public class ShuffleUtils {
       InputStream input, int decompressedLength, int compressedLength,
       CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength,
       Log LOG, String identifier) throws IOException {
-    IFileInputStream checksumIn = new IFileInputStream(input, compressedLength,
-        ifileReadAhead, ifileReadAheadLength);
-
-    input = checksumIn;
-
-    Decompressor decompressor = null;
-    
-    // Are map-outputs compressed?
-    if (codec != null) {
-      decompressor = CodecPool.getDecompressor(codec);
-      decompressor.reset();
-      input = codec.createInputStream(input, decompressor);
-    }
-
     try {
-      IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
+      IFile.Reader.readToMemory(shuffleData, input, compressedLength, codec,
+        ifileReadAhead, ifileReadAheadLength);
       // metrics.inputBytes(shuffleData.length);
       LOG.info("Read " + shuffleData.length + " bytes from input for "
           + identifier);
@@ -110,11 +96,6 @@ public class ShuffleUtils {
       IOUtils.cleanup(LOG, input);
       // Re-throw
       throw ioe;
-    } finally {
-      if(decompressor != null) {
-        decompressor.reset();
-        CodecPool.returnDecompressor(decompressor);
-      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2316c1dc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index a781c0c..2de38d8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -18,33 +18,42 @@
 
 package org.apache.tez.runtime.library.common.sort.impl;
 
-import static org.junit.Assert.assertEquals;
-
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.BufferUtils;
 import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
+import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryWriter;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.testutils.KVDataGen;
 import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestIFile {
 
   private static final Log LOG = LogFactory.getLog(TestIFile.class);
@@ -52,6 +61,12 @@ public class TestIFile {
   private static Configuration defaultConf = new Configuration();
   private static FileSystem localFs = null;
   private static Path workDir = null;
+  private static CompressionCodec codec;
+  private Random rnd = new Random();
+  private String outputFileName = "ifile.out";
+  private Path outputPath;
+  private DataOutputBuffer k = new DataOutputBuffer();
+  private DataOutputBuffer v = new DataOutputBuffer();
 
   static {
     defaultConf.set("fs.defaultFS", "file:///");
@@ -67,90 +82,366 @@ public class TestIFile {
   }
 
   @Before
+  public void setUp() throws Exception {
+    CompressionCodecFactory codecFactory = new CompressionCodecFactory(new
+        Configuration());
+    codec = codecFactory.getCodecByClassName("org.apache.hadoop.io.compress.DefaultCodec");
+    outputPath = new Path(workDir, outputFileName);
+  }
+
+  @Before
   @After
   public void cleanup() throws Exception {
     localFs.delete(workDir, true);
   }
 
   @Test
-  public void testRepeatedKeysInMemReaderNoRLE() throws IOException {
-    String outputFileName = "ifile.out";
-    Path outputPath = new Path(workDir, outputFileName);
-    List<KVPair> data = KVDataGen.generateTestData(true);
-    Writer writer = writeTestFile(outputPath, false, data);
+  //empty IFile
+  public void testWithEmptyIFile() throws IOException {
+    testWriterAndReader(new LinkedList<KVPair>());
+    testWithDataBuffer(new LinkedList<KVPair>());
+  }
 
-    FSDataInputStream inStream =  localFs.open(outputPath);
-    byte[] bytes = new byte[(int)writer.getRawLength()];
+  @Test
+  //Write four zero-length key/value pairs and read them back
+  public void testWritingEmptyKeyValues() throws IOException {
+    DataInputBuffer key = new DataInputBuffer();
+    DataInputBuffer value = new DataInputBuffer();
+    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath, null, null, null,
+        null, null);
+    writer.append(key, value);
+    writer.append(key, value);
+    writer.append(key, value);
+    writer.append(key, value);
+    writer.close();
 
-    readDataToMem(inStream, bytes);
-    inStream.close();
+    IFile.Reader reader = new Reader(localFs, outputPath, null, null, null, false, -1, 1024);
+    DataInputBuffer keyIn = new DataInputBuffer();
+    DataInputBuffer valIn = new DataInputBuffer();
+    int records = 0;
+    while (reader.nextRawKey(keyIn)) {
+      reader.nextRawValue(valIn);
+      records++;
+      assertEquals(0, keyIn.getLength());
+      assertEquals(0, valIn.getLength());
+    }
+    assertEquals("Number of records read does not match", 4, records);
+    reader.close();
+  }
 
-    InMemoryReader inMemReader = new InMemoryReader(null, new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length);
-    readAndVerify(inMemReader, data);
+  @Test
+  //test with unsorted data and repeat keys
+  public void testWithUnsortedData() throws IOException {
+    List<KVPair> unsortedData = KVDataGen.generateTestData(false, rnd.nextInt(100));
+    testWriterAndReader(unsortedData);
+    testWithDataBuffer(unsortedData);
   }
 
   @Test
-  public void testRepeatedKeysFileReaderNoRLE() throws IOException {
-    String outputFileName = "ifile.out";
-    Path outputPath = new Path(workDir, outputFileName);
-    List<KVPair> data = KVDataGen.generateTestData(true);
-    writeTestFile(outputPath, false, data);
+  //test with sorted data and repeat keys
+  public void testWithSortedData() throws IOException {
+    List<KVPair> sortedData = KVDataGen.generateTestData(true, rnd.nextInt(100));
+    testWriterAndReader(sortedData);
+    testWithDataBuffer(sortedData);
+  }
 
-    IFile.Reader reader = new IFile.Reader(localFs, outputPath, null, null, null, false, 0, -1);
 
-    readAndVerify(reader, data);
-    reader.close();
+  @Test
+  //test with sorted and unsorted data where keys repeat (RLE path)
+  public void testWithRLEMarker() throws IOException {
+    //keys would be repeated exactly 2 times
+    //e.g (k1,v1), (k1,v2), (k3, v3), (k4, v4), (k4, v5)...
+    //This should trigger RLE marker in IFile.
+    List<KVPair> sortedData = KVDataGen.generateTestData(true, 1);
+    testWriterAndReader(sortedData);
+    testWithDataBuffer(sortedData);
+
+    List<KVPair> unsortedData = KVDataGen.generateTestData(false, 1);
+    testWriterAndReader(unsortedData);
+    testWithDataBuffer(unsortedData);
   }
 
-  @Ignore // TEZ-500
   @Test
-  public void testRepeatedKeysInMemReaderRLE() throws IOException {
-    String outputFileName = "ifile.out";
-    Path outputPath = new Path(workDir, outputFileName);
-    List<KVPair> data = KVDataGen.generateTestData(true);
-    Writer writer = writeTestFile(outputPath, true, data);
+  //test with unique keys
+  public void testWithUniqueKeys() throws IOException {
+    //all keys are unique
+    List<KVPair> sortedData = KVDataGen.generateTestData(true, 0);
+    testWriterAndReader(sortedData);
+    testWithDataBuffer(sortedData);
+  }
 
-    FSDataInputStream inStream =  localFs.open(outputPath);
-    byte[] bytes = new byte[(int)writer.getRawLength()];
+  @Test
+  //Test InMemoryWriter
+  public void testInMemoryWriter() throws IOException {
+    InMemoryWriter writer = null;
+    BoundedByteArrayOutputStream bout = new BoundedByteArrayOutputStream(1024 * 1024);
+
+    List<KVPair> data = KVDataGen.generateTestData(true, 0);
+
+    //No RLE, No RepeatKeys, no compression
+    writer = new InMemoryWriter(bout);
+    writeTestFileUsingDataBuffer(writer, false, false, data, null);
+    readUsingInMemoryReader(bout.getBuffer(), data);
+
+    //No RLE, RepeatKeys, no compression
+    bout.reset();
+    writer = new InMemoryWriter(bout);
+    writeTestFileUsingDataBuffer(writer, false, true, data, null);
+    readUsingInMemoryReader(bout.getBuffer(), data);
+
+    //RLE, No RepeatKeys, no compression
+    bout.reset();
+    writer = new InMemoryWriter(bout);
+    writeTestFileUsingDataBuffer(writer, true, false, data, null);
+    readUsingInMemoryReader(bout.getBuffer(), data);
+
+    //RLE, RepeatKeys, no compression
+    bout.reset();
+    writer = new InMemoryWriter(bout);
+    writeTestFileUsingDataBuffer(writer, true, true, data, null);
+    readUsingInMemoryReader(bout.getBuffer(), data);
+  }
 
-    readDataToMem(inStream, bytes);
-    inStream.close();
+  @Test
+  //Test appendValue feature
+  public void testAppendValue() throws IOException {
+    List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
+    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
+        Text.class, IntWritable.class, codec, null, null);
 
+    Text previousKey = null;
+    for (KVPair kvp : data) {
+      if ((previousKey != null && previousKey.compareTo(kvp.getKey()) == 0)) {
+        writer.appendValue(kvp.getvalue());
+      } else {
+        writer.append(kvp.getKey(), kvp.getvalue());
+      }
+      previousKey = kvp.getKey();
+    }
 
-    InMemoryReader inMemReader = new InMemoryReader(null, new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length);
-    readAndVerify(inMemReader, data);
+    writer.close();
+
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
   }
 
-  @Ignore // TEZ-500
   @Test
-  public void testRepeatedKeysFileReaderRLE() throws IOException {
-    String outputFileName = "ifile.out";
-    Path outputPath = new Path(workDir, outputFileName);
-    List<KVPair> data = KVDataGen.generateTestData(true);
-    writeTestFile(outputPath, true, data);
+  //Test appendValues feature
+  public void testAppendValues() throws IOException {
+    List<KVPair> data = new ArrayList<KVPair>();
+    List<IntWritable> values = new ArrayList<IntWritable>();
+
+    Text key = new Text("key");
+    IntWritable val = new IntWritable(1);
+    for(int i = 0; i < 5; i++) {
+      data.add(new KVPair(key, val));
+      values.add(val);
+    }
 
-    IFile.Reader reader = new IFile.Reader(localFs, outputPath, null, null, null, false, 0, -1);
+    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
+        Text.class, IntWritable.class, codec, null, null);
+    writer.append(data.get(0).getKey(), data.get(0).getvalue()); //write first KV pair
+    writer.appendValues(values.subList(1, values.size()).iterator()); //add the rest here
 
-    readAndVerify(reader, data);
-    reader.close();
+    Text lastKey = new Text("key3");
+    IntWritable lastVal = new IntWritable(10);
+    data.add(new KVPair(lastKey, lastVal));
+
+    writer.append(lastKey, lastVal);
+    writer.close();
+
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
   }
 
-  private void readDataToMem(FSDataInputStream inStream, byte[] bytes) throws IOException {
-    int toRead = bytes.length;
-    int offset = 0;
-    while (toRead > 0) {
-      int ret = inStream.read(bytes, offset, toRead);
-      if (ret < 0) {
-        throw new IOException("Premature EOF from inputStream");
+  @Test
+  // appendKeyValues(): write one key with all five values in a single call, then a distinct key.
+  public void testAppendKeyValues() throws IOException {
+    List<KVPair> data = new ArrayList<KVPair>();
+    List<IntWritable> values = new ArrayList<IntWritable>();
+
+    Text key = new Text("key");
+    IntWritable val = new IntWritable(1);
+    for(int i = 0; i < 5; i++) {
+      data.add(new KVPair(key, val));
+      values.add(val);
+    }
+
+    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
+        Text.class, IntWritable.class, codec, null, null);
+    writer.appendKeyValues(data.get(0).getKey(), values.iterator());
+
+    Text lastKey = new Text("key3"); // a different key after the repeated run
+    IntWritable lastVal = new IntWritable(10);
+    data.add(new KVPair(lastKey, lastVal));
+
+    writer.append(lastKey, lastVal);
+    writer.close();
+
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+  }
+
+  @Test
+  // appendValue(DataInputBuffer) for consecutive duplicate keys, append(key, value) otherwise.
+  public void testAppendValueWithDataInputBuffer() throws IOException {
+    List<KVPair> data = KVDataGen.generateTestData(false, 1 + rnd.nextInt(100)); // > 0: repeats on
+    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
+        Text.class, IntWritable.class, codec, null, null);
+
+    final DataInputBuffer previousKey = new DataInputBuffer();
+    DataInputBuffer key = new DataInputBuffer();
+    DataInputBuffer value = new DataInputBuffer();
+    for (KVPair kvp : data) {
+      populateData(kvp, key, value);
+
+      if (BufferUtils.compare(key, previousKey) == 0) {
+        writer.appendValue(value);
+      } else {
+        writer.append(key, value);
       }
-      toRead -= ret;
-      offset += ret;
+      previousKey.reset(key.getData(), 0, key.getLength()); // populateData allocates anew each call
     }
-    LOG.info("Read: " + bytes.length + " bytes");
+
+    writer.close();
+
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+  }
+
+
+  /**
+   * Round-trips {@code data} through writeTestFile for every RLE / repeat-key / codec combination.
+   *
+   * @param data key/value pairs to write; each file is verified by readAndVerifyData
+   * @throws IOException if writing or reading a test file fails
+   */
+  private void testWriterAndReader(List<KVPair> data) throws IOException {
+    Writer writer = null;
+    // No RLE, no repeat keys (uncompressed, then compressed)
+    writer = writeTestFile(false, false, data, null);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
+
+    writer = writeTestFile(false, false, data, codec);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+
+    // No RLE, repeat keys (uncompressed, then compressed)
+    writer = writeTestFile(false, true, data, null);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
+
+    writer = writeTestFile(false, true, data, codec);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+
+    // RLE, no repeat keys (uncompressed, then compressed)
+    writer = writeTestFile(true, false, data, null);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
+
+    writer = writeTestFile(true, false, data, codec);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+
+    // RLE, repeat keys (uncompressed, then compressed)
+    writer = writeTestFile(true, true, data, null);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
+
+    writer = writeTestFile(true, true, data, codec);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+  }
+
+  /**
+   * Same RLE / repeat-key / codec matrix, written via writeTestFileUsingDataBuffer (raw buffers).
+   *
+   * @param data key/value pairs to serialize, write and then verify with readAndVerifyData
+   * @throws IOException if writing or reading a test file fails
+   */
+  private void testWithDataBuffer(List<KVPair> data) throws
+      IOException {
+    Writer writer = null;
+    // No RLE, no repeat keys (uncompressed, then compressed)
+    writer = writeTestFileUsingDataBuffer(false, false, data, null);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
+
+    writer = writeTestFileUsingDataBuffer(false, false, data, codec);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+
+    // No RLE, repeat keys (uncompressed, then compressed)
+    writer = writeTestFileUsingDataBuffer(false, true, data, null);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
+
+    writer = writeTestFileUsingDataBuffer(false, true, data, codec);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+
+    // RLE, no repeat keys (uncompressed, then compressed)
+    writer = writeTestFileUsingDataBuffer(true, false, data, null);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
+
+    writer = writeTestFileUsingDataBuffer(true, false, data, codec);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+
+    // RLE, repeat keys (uncompressed, then compressed)
+    writer = writeTestFileUsingDataBuffer(true, true, data, null);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, null);
+
+    writer = writeTestFileUsingDataBuffer(true, true, data, codec);
+    readAndVerifyData(writer.getRawLength(), writer.getCompressedLength(), data, codec);
+  }
+
+  private void readAndVerifyData(long rawLength, long compressedLength,
+      List<KVPair> originalData, CompressionCodec codec) throws
+      IOException {
+    readFileUsingInMemoryReader(rawLength, compressedLength, originalData); // in-memory path
+    readUsingIFileReader(originalData, codec); // on-disk IFile.Reader path
+  }
+
+  /**
+   * Loads the test file via IFile.Reader.readToMemory and verifies it with an InMemoryReader.
+   *
+   * @param rawLength uncompressed length reported by the writer; sizes the target buffer
+   * @param compressedLength on-disk length reported by the writer
+   * @param originalData expected key/value pairs, in write order
+   * @throws IOException if the file cannot be opened or read
+   */
+  private void readFileUsingInMemoryReader(long rawLength, long compressedLength,
+      List<KVPair> originalData) throws IOException {
+    LOG.info("Read using in memory reader");
+    byte[] bytes = new byte[(int) rawLength];
+    FSDataInputStream inStream = localFs.open(outputPath);
+    try {
+      IFile.Reader.readToMemory(bytes, inStream, (int) compressedLength, codec, false, -1);
+    } finally {
+      inStream.close(); // close even when readToMemory fails part-way
+    }
+    readUsingInMemoryReader(bytes, originalData);
+  }
+
+  private void readUsingInMemoryReader(byte[] bytes, List<KVPair> originalData)
+      throws IOException {
+    InMemoryReader inMemReader = new InMemoryReader(null,
+        new InputAttemptIdentifier(0, 0), bytes, 0, bytes.length); // wraps the whole buffer
+    verifyData(inMemReader, originalData);
   }
 
-  private void readAndVerify(Reader reader, List<KVPair> data)
+  /**
+   * Reads the test file with an on-disk IFile.Reader and verifies it against originalData.
+   * @param originalData expected key/value pairs, in write order
+   * @param codec codec for the reader; null when the file was written uncompressed
+   */
+  private void readUsingIFileReader(List<KVPair> originalData,
+      CompressionCodec codec) throws IOException {
+    LOG.info("Read using IFile reader");
+    IFile.Reader reader = new IFile.Reader(localFs, outputPath, codec, null, null, false, 0, -1);
+    try {
+      verifyData(reader, originalData);
+    } finally {
+      reader.close(); // release the file even when verification fails
+    }
+  }
+
+  /**
+   * Data verification
+   *
+   * @param reader
+   * @param data
+   * @throws IOException
+   */
+  private void verifyData(Reader reader, List<KVPair> data)
       throws IOException {
+    LOG.info("Data verification");
     Text readKey = new Text();
     IntWritable readValue = new IntWritable();
     DataInputBuffer keyIn = new DataInputBuffer();
@@ -184,15 +475,29 @@ public class TestIFile {
     LOG.info("Found: " + numRecordsRead + " records");
   }
 
-  private Writer writeTestFile(Path outputPath, boolean useRle, List<KVPair> data)
-      throws IOException {
+  private Writer writeTestFile(boolean rle, boolean repeatKeys,
+      List<KVPair> data, CompressionCodec codec) throws IOException {
+    FSDataOutputStream out = localFs.create(outputPath);
+    IFile.Writer writer = new IFile.Writer(defaultConf, out,
+        Text.class, IntWritable.class, codec, null, null, rle);
+    writeTestFile(writer, rle, repeatKeys, data, codec); // appends data, then closes the writer
+    out.close(); // the stream opened here is closed here
+    return  writer; // callers read getRawLength()/getCompressedLength() from it
+  }
 
-    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
-        Text.class, IntWritable.class, null, null, null);
-    writer.setRLE(useRle);
+  private Writer writeTestFile(IFile.Writer writer, boolean rle, boolean repeatKeys,
+      List<KVPair> data, CompressionCodec codec) throws IOException {
+    assertNotNull(writer);
 
+    Text previousKey = null;
     for (KVPair kvp : data) {
-      writer.append(kvp.getKey(), kvp.getvalue());
+      if (repeatKeys && (previousKey != null && previousKey.compareTo(kvp.getKey()) == 0)) {
+        //RLE is enabled in IFile when IFile.REPEAT_KEY is set
+        writer.append(IFile.REPEAT_KEY, kvp.getvalue());
+      } else {
+        writer.append(kvp.getKey(), kvp.getvalue());
+      }
+      previousKey = kvp.getKey();
     }
 
     writer.close();
@@ -202,4 +507,48 @@ public class TestIFile {
 
     return writer;
   }
-}
+
+  private Writer writeTestFileUsingDataBuffer(boolean rle, boolean repeatKeys,
+      List<KVPair> data, CompressionCodec codec) throws IOException {
+    FSDataOutputStream out = localFs.create(outputPath);
+    IFile.Writer writer = new IFile.Writer(defaultConf, out,
+        Text.class, IntWritable.class, codec, null, null, rle);
+    writeTestFileUsingDataBuffer(writer, rle, repeatKeys, data, codec); // appends, closes writer
+    out.close(); // the stream opened here is closed here
+    return writer; // callers read getRawLength()/getCompressedLength() from it
+  }
+
+  private Writer writeTestFileUsingDataBuffer(IFile.Writer writer, boolean rle, boolean repeatKeys,
+      List<KVPair> data, CompressionCodec codec) throws IOException {
+    DataInputBuffer previousKey = new DataInputBuffer();
+    DataInputBuffer key = new DataInputBuffer();
+    DataInputBuffer value = new DataInputBuffer();
+    for (KVPair kvp : data) {
+      populateData(kvp, key, value);
+      // With repeatKeys, a key byte-equal to the previous one is written as IFile.REPEAT_KEY.
+      if (repeatKeys && BufferUtils.compare(key, previousKey) == 0) {
+        writer.append(IFile.REPEAT_KEY, value);
+      } else {
+        writer.append(key, value);
+      }
+      previousKey.reset(key.getData(), 0, key.getLength()); // populateData allocates anew each call
+    }
+    // Close before reading the lengths that callers pass to readAndVerifyData.
+    writer.close();
+
+    LOG.info("Uncompressed: " + writer.getRawLength());
+    LOG.info("CompressedSize: " + writer.getCompressedLength());
+
+    return writer;
+  }
+
+  private void populateData(KVPair kvp, DataInputBuffer key, DataInputBuffer value)
+      throws IOException {
+    // New output buffers per call, so buffers populated by earlier calls keep their bytes.
+    DataOutputBuffer keyOut = new DataOutputBuffer(), valueOut = new DataOutputBuffer();
+    kvp.getKey().write(keyOut);
+    kvp.getvalue().write(valueOut);
+    key.reset(keyOut.getData(), 0, keyOut.getLength());
+    value.reset(valueOut.getData(), 0, valueOut.getLength());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/2316c1dc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
index 9786ada..8857c22 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
@@ -20,25 +20,43 @@ package org.apache.tez.runtime.library.testutils;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 
 public class KVDataGen {
 
-  public static List<KVPair> generateTestData(boolean repeat) {
+  static final Random rnd = new Random();
+  /** Sorted keys key0..key4; when {@code repeatKeys} is true, even-indexed keys are repeated. */
+  public static List<KVPair> generateTestData(boolean repeatKeys) {
+    return generateTestData(true, repeatKeys ? 1 + rnd.nextInt(100) : 0);
+  }
+
+  /**
+   * Generates five key/value pairs ("key0".."key4" when sorted, random-prefixed otherwise).
+   *
+   * @param sorted whether data should be sorted by key
+   * @param repeatCount if > 0, even-indexed keys get 1-4 adjacent repeats; also offsets values
+   * @return the generated pairs, each holding its own IntWritable value
+   */
+  public static List<KVPair> generateTestData(boolean sorted, int repeatCount) {
     List<KVPair> data = new LinkedList<KVPair>();
-    int repeatCount = 0;
+    // Uses the shared class-level rnd instead of allocating a fresh Random per call.
     for (int i = 0; i < 5; i++) {
-      Text key = new Text("key" + i);
+      String keyStr = (sorted) ? ("key" + i) : (rnd.nextLong() + "key" + i);
+      Text key = new Text(keyStr);
       IntWritable value = new IntWritable(i + repeatCount);
       KVPair kvp = new KVPair(key, value);
       data.add(kvp);
-      if (repeat && i == 2) { // Repeat this key
-        repeatCount++;
-        value.set(i + repeatCount);
-        kvp = new KVPair(key, value);
-        data.add(kvp);
+      if (repeatCount > 0 && i % 2 == 0) { // repeat this key 1-4 more times, adjacently
+        int count = 1 + rnd.nextInt(4);
+        for (int j = 0; j < count; j++) {
+          repeatCount++;
+          // Fresh IntWritable: pairs hold the object itself, so value.set() would rewrite them.
+          kvp = new KVPair(key, new IntWritable(i + rnd.nextInt()));
+          data.add(kvp);
+        }
       }
     }
     return data;
@@ -61,4 +79,4 @@ public class KVDataGen {
       return this.value;
     }
   }
-}
+}
\ No newline at end of file


Mime
View raw message