hive-commits mailing list archives

From ser...@apache.org
Subject [1/3] hive git commit: HIVE-16671 : LLAP IO: BufferUnderflowException may happen in very rare(?) cases due to ORC end-of-CB estimation (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Date Sat, 20 May 2017 01:16:36 GMT
Repository: hive
Updated Branches:
  refs/heads/master 85415f7b8 -> bb2f25c1a


HIVE-16671 : LLAP IO: BufferUnderflowException may happen in very rare(?) cases due to ORC end-of-CB estimation (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5d62dc8a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5d62dc8a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5d62dc8a

Branch: refs/heads/master
Commit: 5d62dc8ae853517e4c477516283af67439a26f0d
Parents: 85415f7
Author: sergey <sershe@apache.org>
Authored: Fri May 19 17:22:35 2017 -0700
Committer: sergey <sershe@apache.org>
Committed: Fri May 19 17:22:35 2017 -0700

----------------------------------------------------------------------
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 81 +++++++++++++++++---
 .../io/orc/encoded/TestEncodedReaderImpl.java   | 77 +++++++++++++++++++
 2 files changed, 149 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
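
For context on the fix below: ORC prefixes every compression block (CB) with a three-byte
little-endian header whose low bit flags an uncompressed ("original") chunk and whose
remaining 23 bits carry the chunk length, which is why the reader decodes it as
(b2 << 15) | (b1 << 7) | (b0 >> 1). LLAP only estimates where a CB ends, so in rare cases
those three header bytes straddle a buffer boundary, and the old unconditional get() calls
could throw java.nio.BufferUnderflowException. A minimal sketch of the format and the
failure mode (hypothetical demo class, not Hive code):

import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

// Hypothetical demo, not part of the patch: the 3-byte ORC CB header and the
// failure mode this commit fixes.
public class CbHeaderDemo {
  /** Decodes the header the same way EncodedReaderImpl does; the low bit of b0
   *  (the "uncompressed chunk" flag) is shifted out of the length. */
  static int readChunkLength(ByteBuffer compressed) {
    int b0 = compressed.get() & 0xff; // throws BufferUnderflowException if empty
    int b1 = compressed.get() & 0xff;
    int b2 = compressed.get() & 0xff;
    return (b2 << 15) | (b1 << 7) | (b0 >> 1);
  }

  public static void main(String[] args) {
    // Complete header: length 100, compressed chunk -> header value 100 << 1 = 200.
    ByteBuffer whole = ByteBuffer.wrap(new byte[] { (byte) 200, 0, 0 });
    System.out.println(readChunkLength(whole)); // prints 100

    // Header split across an estimated CB boundary: only 2 of the 3 bytes here.
    ByteBuffer truncated = ByteBuffer.wrap(new byte[] { (byte) 200, 0 });
    try {
      readChunkLength(truncated);
    } catch (BufferUnderflowException e) {
      System.out.println("the old code path would fail here: " + e);
    }
  }
}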


http://git-wip-us.apache.org/repos/asf/hive/blob/5d62dc8a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 5b2e9b5..6cd85d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.orc.OrcProto;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import sun.misc.Cleaner;
 
 
@@ -1229,10 +1231,26 @@ class EncodedReaderImpl implements EncodedReader {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.getChunk();
     long cbStartOffset = current.getOffset();
-    int b0 = compressed.get() & 0xff;
-    int b1 = compressed.get() & 0xff;
-    int b2 = compressed.get() & 0xff;
+    int b0 = -1, b1 = -1, b2 = -1;
+    // First, read the CB header. Due to ORC estimates, ZCR, etc. this can be complex.
+    if (compressed.remaining() >= 3) {
+      // The overwhelming majority of cases will go here. Read 3 bytes. Tada!
+      b0 = compressed.get() & 0xff;
+      b1 = compressed.get() & 0xff;
+      b2 = compressed.get() & 0xff;
+    } else {
+      // Bad luck! Handle the corner cases where 3 bytes are in multiple blocks.
+      int[] bytes = new int[3];
+      current = readLengthBytesFromSmallBuffers(
+          current, cbStartOffset, bytes, badEstimates, isTracingEnabled);
+      if (current == null) return null;
+      compressed = current.getChunk();
+      b0 = bytes[0];
+      b1 = bytes[1];
+      b2 = bytes[2];
+    }
     int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
     if (chunkLength > bufferSize) {
       throw new IllegalArgumentException("Buffer size too small. size = " +
           bufferSize + " needed = " + chunkLength);
@@ -1252,7 +1270,8 @@ class EncodedReaderImpl implements EncodedReader {
           cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
     }
     if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
-      badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
+      badEstimates.add(addIncompleteCompressionBuffer(
+          cbStartOffset, current, 0, isTracingEnabled));
       return null; // This is impossible to read from this chunk.
     }
 
@@ -1304,12 +1323,56 @@ class EncodedReaderImpl implements EncodedReader {
         }
         tmp.removeSelf();
       } else {
-        badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount));
+        badEstimates.add(addIncompleteCompressionBuffer(
+            cbStartOffset, tmp, extraChunkCount, isTracingEnabled));
+        return null; // This is impossible to read from this chunk.
+      }
+    }
+  }
+
+
+  @VisibleForTesting
+  static BufferChunk readLengthBytesFromSmallBuffers(BufferChunk first, long cbStartOffset,
+      int[] result, List<IncompleteCb> badEstimates, boolean isTracingEnabled) throws IOException {
+    if (!first.hasContiguousNext()) {
+      badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, first, 0, isTracingEnabled));
+      return null; // This is impossible to read from this chunk.
+    }
+    int ix = readLengthBytes(first.getChunk(), result, 0);
+    assert ix < 3; // Otherwise we wouldn't be here.
+    DiskRangeList current = first.next;
+    first.removeSelf();
+    while (true) {
+      if (!(current instanceof BufferChunk)) {
+        throw new IOException(
+            "Trying to extend compressed block into uncompressed block " + current);
+      }
+      BufferChunk currentBc = (BufferChunk) current;
+      ix = readLengthBytes(currentBc.getChunk(), result, ix);
+      if (ix == 3) return currentBc; // Done, we have 3 bytes. Continue reading this buffer.
+      DiskRangeList tmp = current;
+      current = current.hasContiguousNext() ? current.next : null;
+      if (current != null) {
+        if (isTracingEnabled) {
+          LOG.trace("Removing partial CB " + tmp + " from ranges after copying its contents");
+        }
+        tmp.removeSelf();
+      } else {
+        badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, -1, isTracingEnabled));
         return null; // This is impossible to read from this chunk.
       }
     }
   }
 
+  private static int readLengthBytes(ByteBuffer compressed, int[] bytes, int ix) {
+    int byteCount = compressed.remaining();
+    while (byteCount > 0 && ix < 3) {
+      bytes[ix++] = compressed.get() & 0xff;
+      --byteCount;
+    }
+    return ix;
+  }
+
   private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
     if (toRelease == null) return;
     for (ByteBuffer buf : toRelease) {
@@ -1342,12 +1405,12 @@ class EncodedReaderImpl implements EncodedReader {
   }
 
 
-  private IncompleteCb addIncompleteCompressionBuffer(
-      long cbStartOffset, DiskRangeList target, int extraChunkCount) {
+  private static IncompleteCb addIncompleteCompressionBuffer(long cbStartOffset,
+      DiskRangeList target, int extraChunkCountToLog, boolean isTracingEnabled) {
     IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
     if (isTracingEnabled) {
-      LOG.trace("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with
"
-          + icb + " in the buffers");
+      LOG.trace("Replacing " + target + " (and " + extraChunkCountToLog
+          + " previous chunks) with " + icb + " in the buffers");
     }
     target.replaceSelfWith(icb);
     return icb;

http://git-wip-us.apache.org/repos/asf/hive/blob/5d62dc8a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
new file mode 100644
index 0000000..28ca441
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.orc.impl.BufferChunk;
+import org.junit.Test;
+
+public class TestEncodedReaderImpl {
+  @Test
+  public void testReadLength() throws IOException {
+    ByteBuffer one = ByteBuffer.wrap(new byte[] { 1 }), two = ByteBuffer.wrap(new byte[] { 2 }),
+        three = ByteBuffer.wrap(new byte[] { 3 }), twoThree = ByteBuffer.wrap(new byte[] { 2, 3 }),
+        oneTwo = ByteBuffer.wrap(new byte[] { 1, 2 });
+    BufferChunk bc = new BufferChunk(one, 0);
+    int[] result = new int[3];
+    List<IncompleteCb> l = new ArrayList<>();
+    BufferChunk rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNull(rv);
+    one.position(0);
+    bc.insertAfter(new BufferChunk(two, 1));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNull(rv);
+    one.position(0);
+    two.position(0);
+    bc.insertAfter(new BufferChunk(two, 1)).insertAfter(new BufferChunk(three, 2));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNotNull(rv);
+    for (int i = 0; i < result.length; ++i) {
+      assertEquals(i + 1, result[i]);
+    }
+    one.position(0);
+    bc.insertAfter(new BufferChunk(twoThree, 1));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNotNull(rv);
+    for (int i = 0; i < result.length; ++i) {
+      assertEquals(i + 1, result[i]);
+    }
+    bc = new BufferChunk(oneTwo, 0);
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNull(rv);
+    three.position(0);
+    bc.insertAfter(new BufferChunk(three, 2));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNotNull(rv);
+    for (int i = 0; i < result.length; ++i) {
+      assertEquals(i + 1, result[i]);
+    }
+  }
+}
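
Assuming Hive's standard Maven layout (the new test lives in the ql module), it should be
runnable on its own with the usual Surefire filter, e.g. mvn test -Dtest=TestEncodedReaderImpl
from the ql directory.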

