tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject git commit: TEZ-1492. IFile RLE not kicking in due to bug in BufferUtils.compare()
Date Wed, 27 Aug 2014 04:39:49 GMT
Repository: tez
Updated Branches:
  refs/heads/master 926faf182 -> db732b2fb


TEZ-1492. IFile RLE not kicking in due to bug in BufferUtils.compare()


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/db732b2f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/db732b2f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/db732b2f

Branch: refs/heads/master
Commit: db732b2fb708fe82cec25fbc31d2b73020ae58d1
Parents: 926faf1
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Wed Aug 27 10:01:23 2014 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Wed Aug 27 10:02:25 2014 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/io/BufferUtils.java  |  79 ------
 .../runtime/library/common/sort/impl/IFile.java |  10 +-
 .../tez/runtime/library/utils/BufferUtils.java  |  79 ++++++
 .../library/utils/FastByteComparisons.java      | 240 +++++++++++++++++++
 .../library/common/sort/impl/TestIFile.java     |  74 +++++-
 5 files changed, 388 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/db732b2f/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java
deleted file mode 100644
index 3a62751..0000000
--- a/tez-runtime-library/src/main/java/org/apache/hadoop/io/BufferUtils.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-
-@Private
-public class BufferUtils {
-  public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {
-    byte[] b1 = buf1.getData();
-    byte[] b2 = buf2.getData();
-    int s1 = buf1.getPosition();
-    int s2 = buf2.getPosition();
-    int l1 = buf1.getLength();
-    int l2 = buf2.getLength();
-    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
-  }
-
-  public static int compare(DataOutputBuffer buf1, DataOutputBuffer buf2) {
-    byte[] b1 = buf1.getData();
-    byte[] b2 = buf2.getData();
-    int s1 = 0;
-    int s2 = 0;
-    int l1 = buf1.getLength();
-    int l2 = buf2.getLength();
-    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
-  }
-
-  public static int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
-    byte[] b1 = buf1.getData();
-    byte[] b2 = buf2.getData();
-    int s1 = buf1.getPosition();    
-    int s2 = 0;
-    int l1 = buf1.getLength();
-    int l2 = buf2.getLength();
-    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
-  }
-
-  public static int compare(DataOutputBuffer buf1, DataInputBuffer buf2) {
-    return compare(buf2, buf1);
-  }
-
-  public static void copy(DataInputBuffer src, DataOutputBuffer dst) 
-                              throws IOException {
-    byte[] b1 = src.getData();
-    int s1 = src.getPosition();    
-    int l1 = src.getLength();
-    dst.reset();
-    dst.write(b1, s1, l1 - s1);
-  }
-
-  public static void copy(DataOutputBuffer src, DataOutputBuffer dst) 
-                              throws IOException {
-    byte[] b1 = src.getData();
-    int s1 = 0;
-    int l1 = src.getLength();
-    dst.reset();
-    dst.write(b1, s1, l1);
-  }
-
-}

NOTE(review): in the removed class above, the compare() overloads that take a
DataInputBuffer pass its getLength() as the byte count. getLength() is an end offset,
not a size: copy(DataInputBuffer, DataOutputBuffer) writes l1 - s1 bytes. So when the
input buffer's position is non-zero, the compared range is longer than the key. An
equal key then compares unequal to the previous key held in a DataOutputBuffer, which
is the RLE failure fixed here (TEZ-1492). Separately, compare(DataOutputBuffer,
DataInputBuffer) delegates with swapped arguments, which inverts the sign of its result.

http://git-wip-us.apache.org/repos/asf/tez/blob/db732b2f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index f96f9ed..6ad7915 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -36,7 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.BufferUtils;
+import org.apache.tez.runtime.library.utils.BufferUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CodecPool;
@@ -110,7 +111,8 @@ public class IFile {
     final DataOutputBuffer previous = new DataOutputBuffer();
     Object prevKey = null;
     boolean headerWritten = false;
-    boolean firstKey = true;
+    @VisibleForTesting
+    boolean sameKey = false;
 
     final int RLE_MARKER_SIZE = WritableUtils.getVIntSize(RLE_MARKER);
     final int V_END_MARKER_SIZE = WritableUtils.getVIntSize(V_END_MARKER);
@@ -266,7 +268,7 @@ public class IFile {
           valueClass);
 
       int keyLength = 0;
-      boolean sameKey = (key == REPEAT_KEY);
+      sameKey = (key == REPEAT_KEY);
       if (!sameKey) {
         keySerializer.serialize(key);
         keyLength = buffer.getLength();
@@ -379,7 +381,7 @@ public class IFile {
       int valueLength = value.getLength() - value.getPosition();
       checkState(valueLength >= 0, NEGATIVE_VAL_LEN, valueLength, value);
 
-      boolean sameKey = (key == REPEAT_KEY);
+      sameKey = (key == REPEAT_KEY);
       if (!sameKey && rle) {
         sameKey = (keyLength != 0) && (BufferUtils.compare(previous, key) == 0);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/db732b2f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/BufferUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/BufferUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/BufferUtils.java
new file mode 100644
index 0000000..a1685ed
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/BufferUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.utils;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+@Private
+public class BufferUtils {
+  /*
+   * DataInputBuffer.getLength() is the end offset of the valid data, not its size, so the
+   * byte count of a DataInputBuffer is getLength() - getPosition(). DataOutputBuffer contents
+   * are always read from offset 0 with getLength() bytes.
+   */
+
+  /** Lexicographically compares the unread bytes of two input buffers. */
+  public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {
+    int s1 = buf1.getPosition();
+    int s2 = buf2.getPosition();
+    return FastByteComparisons.compareTo(buf1.getData(), s1, buf1.getLength() - s1,
+        buf2.getData(), s2, buf2.getLength() - s2);
+  }
+
+  /** Lexicographically compares the contents of two output buffers. */
+  public static int compare(DataOutputBuffer buf1, DataOutputBuffer buf2) {
+    return FastByteComparisons.compareTo(buf1.getData(), 0, buf1.getLength(),
+        buf2.getData(), 0, buf2.getLength());
+  }
+
+  /** Compares the unread bytes of an input buffer with the contents of an output buffer. */
+  public static int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
+    int s1 = buf1.getPosition();
+    return FastByteComparisons.compareTo(buf1.getData(), s1, buf1.getLength() - s1,
+        buf2.getData(), 0, buf2.getLength());
+  }
+
+  /** Compares the contents of an output buffer with the unread bytes of an input buffer. */
+  public static int compare(DataOutputBuffer buf1, DataInputBuffer buf2) {
+    // Computed directly (not as compare(buf2, buf1)) so the sign follows argument order.
+    int s2 = buf2.getPosition();
+    return FastByteComparisons.compareTo(buf1.getData(), 0, buf1.getLength(),
+        buf2.getData(), s2, buf2.getLength() - s2);
+  }
+
+  /** Replaces the contents of dst with the unread bytes of src. */
+  public static void copy(DataInputBuffer src, DataOutputBuffer dst) throws IOException {
+    int s1 = src.getPosition();
+    dst.reset();
+    dst.write(src.getData(), s1, src.getLength() - s1);
+  }
+
+  /** Replaces the contents of dst with the contents of src (read before dst is reset). */
+  public static void copy(DataOutputBuffer src, DataOutputBuffer dst) throws IOException {
+    byte[] b1 = src.getData();
+    int l1 = src.getLength();
+    dst.reset();
+    dst.write(b1, 0, l1);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/db732b2f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/FastByteComparisons.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/FastByteComparisons.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/FastByteComparisons.java
new file mode 100644
index 0000000..4bd2552
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/FastByteComparisons.java
@@ -0,0 +1,240 @@
+package org.apache.tez.runtime.library.utils;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.lang.reflect.Field;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+import sun.misc.Unsafe;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
+
+/**
+ * Copy of {@code org.apache.hadoop.io.FastByteComparisons}, kept in Tez's own package.
+ *
+ * Optimized lexicographic comparison of byte-array ranges, treating each byte as unsigned.
+ * Borrowed and slightly modified from Guava's {@link com.google.common.primitives.UnsignedBytes}
+ * so that the compared ranges may start at non-zero offsets.
+ */
+final class FastByteComparisons {
+
+  /**
+   * Lexicographically compares b1[s1, s1+l1) with b2[s2, s2+l2) as unsigned bytes (l1 and l2
+   * are lengths, not end offsets); only the sign of the result is meaningful. */
+  public static int compareTo(byte[] b1, int s1, int l1, byte[] b2, int s2,
+      int l2) {
+    return LexicographicalComparerHolder.BEST_COMPARER.compareTo(
+        b1, s1, l1, b2, s2, l2);
+  }
+
+  /** Compares two (buffer, offset, length) ranges; implemented by the enums below. */
+  private interface Comparer<T> {
+    abstract public int compareTo(T buffer1, int offset1, int length1,
+        T buffer2, int offset2, int length2);
+  }
+
+  private static Comparer<byte[]> lexicographicalComparerJavaImpl() {
+    return LexicographicalComparerHolder.PureJavaComparer.INSTANCE;
+  }
+
+
+  /**
+   * Provides a lexicographical comparer implementation; either a Java
+   * implementation or a faster implementation based on {@link sun.misc.Unsafe}.
+   *
+   * <p>Uses reflection to gracefully fall back to the Java implementation if
+   * {@code Unsafe} isn't available.
+   */
+  private static class LexicographicalComparerHolder {
+    static final String UNSAFE_COMPARER_NAME =
+        LexicographicalComparerHolder.class.getName() + "$UnsafeComparer";
+
+    static final Comparer<byte[]> BEST_COMPARER = getBestComparer();
+    /**
+     * Returns the Unsafe-using Comparer, or falls back to the pure-Java
+     * implementation if unable to do so.
+     */
+    static Comparer<byte[]> getBestComparer() {
+      try {
+        Class<?> theClass = Class.forName(UNSAFE_COMPARER_NAME);
+
+        // yes, UnsafeComparer does implement Comparer<byte[]>
+        @SuppressWarnings("unchecked")
+        Comparer<byte[]> comparer =
+            (Comparer<byte[]>) theClass.getEnumConstants()[0];
+        return comparer;
+      } catch (Throwable t) { // ensure we really catch *everything*
+        return lexicographicalComparerJavaImpl();
+      }
+    }
+
+    private enum PureJavaComparer implements Comparer<byte[]> {
+      INSTANCE;
+
+      @Override
+      public int compareTo(byte[] buffer1, int offset1, int length1,
+          byte[] buffer2, int offset2, int length2) {
+        // Short circuit equal case
+        if (buffer1 == buffer2 &&
+            offset1 == offset2 &&
+            length1 == length2) {
+          return 0;
+        }
+        // Unsigned byte-by-byte comparison, brought local from Hadoop's WritableComparator
+        int end1 = offset1 + length1;
+        int end2 = offset2 + length2;
+        for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+          int a = (buffer1[i] & 0xff);
+          int b = (buffer2[j] & 0xff);
+          if (a != b) {
+            return a - b;
+          }
+        }
+        return length1 - length2;
+      }
+    }
+
+    @SuppressWarnings("unused") // used via reflection
+    private enum UnsafeComparer implements Comparer<byte[]> {
+      INSTANCE;
+
+      static final Unsafe theUnsafe;
+
+      /** The offset to the first element in a byte array. */
+      static final int BYTE_ARRAY_BASE_OFFSET;
+
+      static {
+        theUnsafe = (Unsafe) AccessController.doPrivileged(
+            new PrivilegedAction<Object>() {
+              @Override
+              public Object run() {
+                try {
+                  Field f = Unsafe.class.getDeclaredField("theUnsafe");
+                  f.setAccessible(true);
+                  return f.get(null);
+                } catch (NoSuchFieldException e) {
+                  // It doesn't matter what we throw;
+                  // it's swallowed in getBestComparer().
+                  throw new Error();
+                } catch (IllegalAccessException e) {
+                  throw new Error();
+                }
+              }
+            });
+
+        BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
+
+        // sanity check (should never fail): compareTo's 8-bytes-per-long stepping assumes scale 1
+        if (theUnsafe.arrayIndexScale(byte[].class) != 1) {
+          throw new AssertionError();
+        }
+      }
+
+      static final boolean littleEndian =
+          ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+      /**
+       * Returns true if x1 is less than x2, when both values are treated as
+       * unsigned.
+       */
+      static boolean lessThanUnsigned(long x1, long x2) {
+        return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE);
+      }
+
+      /**
+       * Lexicographically compare two arrays.
+       *
+       * @param buffer1 left operand
+       * @param buffer2 right operand
+       * @param offset1 Where to start comparing in the left buffer
+       * @param offset2 Where to start comparing in the right buffer
+       * @param length1 How much to compare from the left buffer
+       * @param length2 How much to compare from the right buffer
+       * @return 0 if equal, < 0 if left is less than right, etc.
+       */
+      @Override
+      public int compareTo(byte[] buffer1, int offset1, int length1,
+          byte[] buffer2, int offset2, int length2) {
+        // Short circuit equal case
+        if (buffer1 == buffer2 &&
+            offset1 == offset2 &&
+            length1 == length2) {
+          return 0;
+        }
+        int minLength = Math.min(length1, length2);
+        int minWords = minLength / Longs.BYTES;
+        int offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET;
+        int offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET;
+
+        /*
+         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+         * time is no slower than comparing 4 bytes at a time even on 32-bit.
+         * On the other hand, it is substantially faster on 64-bit.
+         */
+        for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+          long lw = theUnsafe.getLong(buffer1, offset1Adj + (long) i);
+          long rw = theUnsafe.getLong(buffer2, offset2Adj + (long) i);
+          long diff = lw ^ rw;
+
+          if (diff != 0) {
+            if (!littleEndian) {
+              return lessThanUnsigned(lw, rw) ? -1 : 1;
+            }
+            // Little-endian: the first differing byte in array order is the lowest-order
+            // non-zero byte of diff; binary-search for it so n ends up as its bit shift.
+            int n = 0;
+            int y;
+            int x = (int) diff;
+            if (x == 0) {
+              x = (int) (diff >>> 32);
+              n = 32;
+            }
+
+            y = x << 16;
+            if (y == 0) {
+              n += 16;
+            } else {
+              x = y;
+            }
+
+            y = x << 8;
+            if (y == 0) {
+              n += 8;
+            }
+            return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+          }
+        }
+
+        // The epilogue to cover the last (minLength % 8) elements.
+        for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+          int result = UnsignedBytes.compare(
+              buffer1[offset1 + i],
+              buffer2[offset2 + i]);
+          if (result != 0) {
+            return result;
+          }
+        }
+        return length1 - length2;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/db732b2f/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index df50bba..99db52d 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.hadoop.io.BufferUtils;
+import org.apache.tez.runtime.library.utils.BufferUtils;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IntWritable;
@@ -51,6 +51,7 @@ import org.apache.tez.runtime.library.testutils.KVDataGen;
 import org.apache.tez.runtime.library.testutils.KVDataGen.KVPair;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -168,16 +169,67 @@ public class TestIFile {
   @Test
   //test with sorted data and repeat keys
   public void testWithRLEMarker() throws IOException {
-    //keys would be repeated exactly 2 times
-    //e.g (k1,v1), (k1,v2), (k3, v3), (k4, v4), (k4, v5)...
-    //This should trigger RLE marker in IFile.
-    List<KVPair> sortedData = KVDataGen.generateTestData(true, 1);
-    testWriterAndReader(sortedData);
-    testWithDataBuffer(sortedData);
+    //Test with append(Object, Object)
+    FSDataOutputStream out = localFs.create(outputPath);
+    IFile.Writer writer = new IFile.Writer(defaultConf, out,
+        Text.class, IntWritable.class, codec, null, null, true);
 
-    List<KVPair> unsortedData = KVDataGen.generateTestData(false, 1);
-    testWriterAndReader(unsortedData);
-    testWithDataBuffer(unsortedData);
+    Text key = new Text("key0");
+    IntWritable value = new IntWritable(0);
+    writer.append(key, value);
+
+    //same key (RLE should kick in)
+    key = new Text("key0");
+    writer.append(key, value);
+    assertTrue(writer.sameKey);
+
+    //Different key
+    key = new Text("key1");
+    writer.append(key, value);
+    assertFalse(writer.sameKey);
+    writer.close();
+    out.close();
+
+
+    //Test with append(DataInputBuffer key, DataInputBuffer value)
+    byte[] kvbuffer = "key1Value1key1Value2key3Value3".getBytes("UTF-8");
+    int keyLength = 4;
+    int valueLength = 6;
+    int pos = 0;
+    out = localFs.create(outputPath);
+    writer = new IFile.Writer(defaultConf, out,
+        Text.class, IntWritable.class, codec, null, null, true);
+
+    DataInputBuffer kin = new DataInputBuffer();
+    kin.reset(kvbuffer, pos, keyLength);
+
+    DataInputBuffer vin = new DataInputBuffer();
+    DataOutputBuffer vout = new DataOutputBuffer();
+    (new IntWritable(0)).write(vout);
+    vin.reset(vout.getData(), vout.getLength());
+
+    //Write initial KV pair
+    writer.append(kin, vin);
+    assertFalse(writer.sameKey);
+    pos += (keyLength + valueLength);
+
+    //Second key is similar to key1 (RLE should kick in)
+    kin.reset(kvbuffer, pos, keyLength);
+    //Rewind the same serialized value (writing into vout again would append to it)
+    vin.reset(vout.getData(), vout.getLength());
+    writer.append(kin, vin);
+    assertTrue(writer.sameKey);
+    pos += (keyLength + valueLength);
+
+    //Next key (key3) is different (RLE should not kick in)
+    kin.reset(kvbuffer, pos, keyLength);
+    //Rewind the same single serialized IntWritable value
+    vin.reset(vout.getData(), vout.getLength());
+    writer.append(kin, vin);
+    assertFalse(writer.sameKey);
+
+    writer.close();
+    out.close();
   }
 
   @Test
@@ -570,4 +622,4 @@ public class TestIFile {
     key.reset(k.getData(), 0, k.getLength());
     value.reset(v.getData(), 0, v.getLength());
   }
-}
\ No newline at end of file
+}


Mime
View raw message