orc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject [orc] 04/04: ORC-557 Fix problem reading large header with uncompressed streams.
Date Tue, 22 Oct 2019 23:15:57 GMT
This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/orc.git

commit 1b2471666f6a41341bf6a9961a6a251c0f0246e0
Author: Owen O'Malley <omalley@apache.org>
AuthorDate: Fri Oct 18 16:31:29 2019 -0700

    ORC-557 Fix problem reading large header with uncompressed streams.
    
    Fixes #439 #435
    
    Signed-off-by: Owen O'Malley <omalley@apache.org>
---
 .../src/java/org/apache/orc/impl/InStream.java     | 69 ++++++++++------------
 .../src/test/org/apache/orc/impl/TestInStream.java | 52 ++++++++++++++++
 2 files changed, 83 insertions(+), 38 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java
index 5508e76..170d101 100644
--- a/java/core/src/java/org/apache/orc/impl/InStream.java
+++ b/java/core/src/java/org/apache/orc/impl/InStream.java
@@ -45,6 +45,9 @@ public abstract class InStream extends InputStream {
   protected final Object name;
   protected final long offset;
   protected final long length;
+  protected DiskRangeList bytes;
+  // position in the stream (0..length)
+  protected long position;
 
   public InStream(Object name, long offset, long length) {
     this.name = name;
@@ -59,6 +62,32 @@ public abstract class InStream extends InputStream {
   @Override
   public abstract void close();
 
+  /**
+   * Set the current range
+   * @param newRange the block that is current
+   * @param isJump if this was a seek instead of a natural read
+   */
+  abstract protected void setCurrent(DiskRangeList newRange,
+                                     boolean isJump);
+  /**
+   * Reset the input to a new set of data.
+   * @param input the input data
+   */
+  protected void reset(DiskRangeList input) {
+    bytes = input;
+    while (input != null &&
+               (input.getEnd() <= offset ||
+                    input.getOffset() > offset + length)) {
+      input = input.next;
+    }
+    if (input == null || input.getOffset() <= offset) {
+      position = 0;
+    } else {
+      position = input.getOffset() - offset;
+    }
+    setCurrent(input, true);
+  }
+
   public abstract void changeIv(Consumer<byte[]> modifier);
 
   static int getRangeNumber(DiskRangeList list, DiskRangeList current) {
@@ -75,9 +104,6 @@ public abstract class InStream extends InputStream {
    * Implements a stream over an uncompressed stream.
    */
   public static class UncompressedStream extends InStream {
-    private DiskRangeList bytes;
-    // position in the stream (0..length)
-    protected long position;
     protected ByteBuffer decrypted;
     protected DiskRangeList currentRange;
     protected long currentOffset;
@@ -100,16 +126,6 @@ public abstract class InStream extends InputStream {
       reset(input);
     }
 
-    protected void reset(DiskRangeList input) {
-      this.bytes = input;
-      if (input == null || input.getOffset() <= offset) {
-        position = 0;
-      } else {
-        position = input.getOffset() - offset;
-      }
-      setCurrent(input, true);
-    }
-
     @Override
     public int read() {
       if (decrypted == null || decrypted.remaining() == 0) {
@@ -230,7 +246,6 @@ public abstract class InStream extends InputStream {
    */
   static class EncryptionState {
     private final Object name;
-    private final EncryptionAlgorithm algorithm;
     private final Key key;
     private final byte[] iv;
     private final Cipher cipher;
@@ -240,7 +255,7 @@ public abstract class InStream extends InputStream {
     EncryptionState(Object name, long offset, StreamOptions options) {
       this.name = name;
       this.offset = offset;
-      algorithm = options.getAlgorithm();
+      EncryptionAlgorithm algorithm = options.getAlgorithm();
       key = options.getKey();
       iv = options.getIv();
       cipher = algorithm.createCipher();
@@ -347,9 +362,8 @@ public abstract class InStream extends InputStream {
         // what is the position of the start of the newRange?
         currentOffset = newRange.getOffset();
         ByteBuffer encrypted = newRange.getData().slice();
-        int ignoreBytes = 0;
         if (currentOffset < offset) {
-          ignoreBytes = (int) (offset - currentOffset);
+          int ignoreBytes = (int) (offset - currentOffset);
           encrypted.position(ignoreBytes);
           currentOffset = offset;
         }
@@ -382,12 +396,10 @@ public abstract class InStream extends InputStream {
   }
 
   private static class CompressedStream extends InStream {
-    private DiskRangeList bytes;
     private final int bufferSize;
     private ByteBuffer uncompressed;
     private final CompressionCodec codec;
     protected ByteBuffer compressed;
-    protected long position;
     protected DiskRangeList currentRange;
     private boolean isUncompressedOriginal;
 
@@ -426,25 +438,6 @@ public abstract class InStream extends InputStream {
       reset(input);
     }
 
-    /**
-     * Reset the input to a new set of data.
-     * @param input the input data
-     */
-    void reset(DiskRangeList input) {
-      bytes = input;
-      while (input != null &&
-                 (input.getEnd() <= offset ||
-                      input.getOffset() > offset + length)) {
-        input = input.next;
-      }
-      if (input == null || input.getOffset() <= offset) {
-        position = 0;
-      } else {
-        position = input.getOffset() - offset;
-      }
-      setCurrent(input, true);
-    }
-
     private void allocateForUncompressed(int size, boolean isDirect) {
       uncompressed = allocateBuffer(size, isDirect);
     }
diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java
index 9c6490c..7b7cbf1 100644
--- a/java/core/src/test/org/apache/orc/impl/TestInStream.java
+++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java
@@ -738,6 +738,58 @@ public class TestInStream {
   }
 
   @Test
+  public void testExtraFrontUncompressed() throws IOException {
+    // Set up a stream that starts at START, which is divided into regions
+    // of CHUNK_LENGTH. There are two EXTRA_FRONT byte buffers in front of the
+    // stream.
+    final long START = 1_000_000_000;
+    final int EXTRA_FRONT = 3_000;
+    final int CHUNK_LENGTH = 100;
+    final int STREAM_LENGTH = 4096;
+
+    BufferChunkList list = new BufferChunkList();
+    list.add(new BufferChunk(ByteBuffer.allocate(EXTRA_FRONT),
+        START - 2 * EXTRA_FRONT));
+    byte[] extraFront = new byte[EXTRA_FRONT + CHUNK_LENGTH];
+    Arrays.fill(extraFront, (byte) -1);
+    for(int i=0; i < CHUNK_LENGTH; ++i) {
+      extraFront[EXTRA_FRONT + i] = (byte) i;
+    }
+    list.add(new BufferChunk(ByteBuffer.wrap(extraFront), START - EXTRA_FRONT));
+    byte[] expected = new byte[STREAM_LENGTH];
+    for(int i=CHUNK_LENGTH; i < expected.length; ++i) {
+      expected[i] = (byte) i;
+    }
+    int posn = CHUNK_LENGTH;
+    while (posn <= expected.length) {
+      list.add(new BufferChunk(
+          ByteBuffer.wrap(expected, posn,
+              Math.min(CHUNK_LENGTH, expected.length - posn)),
+          START + posn));
+      posn += CHUNK_LENGTH;
+    }
+
+    // now set up the stream to read it
+    InStream.StreamOptions options = InStream.options();
+    InStream inStream = InStream.create("test", list.get(), START, STREAM_LENGTH,
+        options);
+
+    // ensure the data is correct
+    byte[] inBuffer = new byte[STREAM_LENGTH];
+    posn = 0;
+    int read = inStream.read(inBuffer);
+    while (read != -1) {
+      assertEquals("Read length at " + posn,
+          Math.min(STREAM_LENGTH - posn, CHUNK_LENGTH), read);
+      for(int i=0; i < read; ++i) {
+        assertEquals("posn " + posn + " + " + i, (byte)(posn + i), inBuffer[i]);
+      }
+      posn += read;
+      read = inStream.read(inBuffer);
+    }
+  }
+
+  @Test
   public void testExtraFrontCompressed() throws IOException {
     // Set up a stream that starts at START, which is divided in to regions
     // of CHUNK_LENGTH. There are two EXTRA_FRONT byte buffers in front of the


Mime
View raw message