parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject [parquet-mr] branch column-indexes updated: PARQUET-1389: Improve value skipping at page synchronization (#514)
Date Tue, 11 Sep 2018 11:52:31 GMT
This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/column-indexes by this push:
     new 55d791c  PARQUET-1389: Improve value skipping at page synchronization (#514)
55d791c is described below

commit 55d791c592ba9ea97123408040aa3fa01c632a81
Author: Gabor Szadovszky <gabor@apache.org>
AuthorDate: Tue Sep 11 13:52:28 2018 +0200

    PARQUET-1389: Improve value skipping at page synchronization (#514)
---
 .../parquet/column/impl/ColumnReaderBase.java      | 47 +++++++++++++++++++++-
 .../apache/parquet/column/values/ValuesReader.java | 12 ++++++
 .../delta/DeltaBinaryPackingValuesReader.java      |  8 ++++
 .../DeltaLengthByteArrayValuesReader.java          | 12 ++++--
 .../plain/FixedLenByteArrayPlainValuesReader.java  |  8 +++-
 .../column/values/plain/PlainValuesReader.java     | 36 +++++++++++------
 .../column/values/rle/ZeroIntegerValuesReader.java |  6 ++-
 7 files changed, 109 insertions(+), 20 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
index 0af85c7..7682236 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java
@@ -73,6 +73,14 @@ abstract class ColumnReaderBase implements ColumnReader {
     abstract void skip();
 
     /**
+     * Skips n values from the underlying page
+     *
+     * @param n
+     *          the number of values to be skipped
+     */
+    abstract void skip(int n);
+
+    /**
      * write current value to converter
      */
     abstract void writeValue();
@@ -163,6 +171,10 @@ abstract class ColumnReaderBase implements ColumnReader {
           public void skip() {
             dataColumn.skip();
           }
+          @Override
+          void skip(int n) {
+            dataColumn.skip(n);
+          }
           public int getDictionaryId() {
             return dictionaryId;
           }
@@ -203,6 +215,11 @@ abstract class ColumnReaderBase implements ColumnReader {
             current = 0;
             dataColumn.skip();
           }
+          @Override
+          void skip(int n) {
+            current = 0;
+            dataColumn.skip(n);
+          }
           public float getFloat() {
             return current;
           }
@@ -222,6 +239,11 @@ abstract class ColumnReaderBase implements ColumnReader {
             current = 0;
             dataColumn.skip();
           }
+          @Override
+          void skip(int n) {
+            current = 0;
+            dataColumn.skip(n);
+          }
           public double getDouble() {
             return current;
           }
@@ -242,6 +264,11 @@ abstract class ColumnReaderBase implements ColumnReader {
             dataColumn.skip();
           }
           @Override
+          void skip(int n) {
+            current = 0;
+            dataColumn.skip(n);
+          }
+          @Override
           public int getInteger() {
             return current;
           }
@@ -262,6 +289,11 @@ abstract class ColumnReaderBase implements ColumnReader {
             dataColumn.skip();
           }
           @Override
+          void skip(int n) {
+            current = 0;
+            dataColumn.skip(n);
+          }
+          @Override
           public long getLong() {
             return current;
           }
@@ -291,6 +323,11 @@ abstract class ColumnReaderBase implements ColumnReader {
             dataColumn.skip();
           }
           @Override
+          void skip(int n) {
+            current = false;
+            dataColumn.skip(n);
+          }
+          @Override
           public boolean getBoolean() {
             return current;
           }
@@ -311,6 +348,11 @@ abstract class ColumnReaderBase implements ColumnReader {
             dataColumn.skip();
           }
           @Override
+          void skip(int n) {
+            current = null;
+            dataColumn.skip(n);
+          }
+          @Override
           public Binary getBinary() {
             return current;
           }
@@ -511,6 +553,7 @@ abstract class ColumnReaderBase implements ColumnReader {
 
   private void checkRead() {
     int rl, dl;
+    int skipValues = 0;
     for (;;) {
       if (isPageFullyConsumed()) {
         if (isFullyConsumed()) {
@@ -519,6 +562,7 @@ abstract class ColumnReaderBase implements ColumnReader {
           return;
         }
         readPage();
+        skipValues = 0;
       }
       rl = repetitionLevelColumn.nextInt();
       dl = definitionLevelColumn.nextInt();
@@ -527,9 +571,10 @@ abstract class ColumnReaderBase implements ColumnReader {
         break;
       }
       if (dl == maxDefinitionLevel) {
-        binding.skip();
+        ++skipValues;
       }
     }
+    binding.skip(skipValues);
     repetitionLevel = rl;
     definitionLevel = dl;
   }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
index 5732660..3167d82 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -109,5 +109,17 @@ public abstract class ValuesReader {
    * Skips the next value in the page
    */
   abstract public void skip();
+
+  /**
+   * Skips the next n value in the page
+   *
+   * @param n
+   *          the number of values to be skipped
+   */
+  public void skip(int n) {
+    for (int i = 0; i < n; ++i) {
+      skip();
+    }
+  }
 }
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index dceaa52..58e02f2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -89,6 +89,14 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
   }
 
   @Override
+  public void skip(int n) {
+    // checkRead() is invoked before incrementing valuesRead so increase valuesRead size in 2 steps
+    valuesRead += n - 1;
+    checkRead();
+    ++valuesRead;
+  }
+
+  @Override
   public int readInteger() {
     // TODO: probably implement it separately
     return (int) readLong();
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
index 1a2ccb9..4dbbcb5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -20,8 +20,6 @@ package org.apache.parquet.column.values.deltalengthbytearray;
 
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -64,7 +62,15 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
 
   @Override
   public void skip() {
-    int length = lengthReader.readInteger();
+    skip(1);
+  }
+
+  @Override
+  public void skip(int n) {
+    int length = 0;
+    for (int i = 0; i < n; ++i) {
+      length += lengthReader.readInteger();
+    }
     try {
       in.skipFully(length);
     } catch (IOException e) {
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 15ed434..631c908 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -19,7 +19,6 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -51,8 +50,13 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
 
   @Override
   public void skip() {
+    skip(1);
+  }
+
+  @Override
+  public void skip(int n) {
     try {
-      in.skipFully(length);
+      in.skipFully(n * length);
     } catch (IOException | RuntimeException e) {
       throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
     }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index f576528..127817e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -41,14 +41,26 @@ abstract public class PlainValuesReader extends ValuesReader {
     this.in = new LittleEndianDataInputStream(stream.remainingStream());
   }
 
+  @Override
+  public void skip() {
+    skip(1);
+  }
+
+  void skipBytesFully(int n) throws IOException {
+    int skipped = 0;
+    while (skipped < n) {
+      skipped += in.skipBytes(n - skipped);
+    }
+  }
+
   public static class DoublePlainValuesReader extends PlainValuesReader {
 
     @Override
-    public void skip() {
+    public void skip(int n) {
       try {
-        in.skipBytes(8);
+        skipBytesFully(n * 8);
       } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip double", e);
+        throw new ParquetDecodingException("could not skip " + n + " double values", e);
       }
     }
 
@@ -65,11 +77,11 @@ abstract public class PlainValuesReader extends ValuesReader {
   public static class FloatPlainValuesReader extends PlainValuesReader {
 
     @Override
-    public void skip() {
+    public void skip(int n) {
       try {
-        in.skipBytes(4);
+        skipBytesFully(n * 4);
       } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip float", e);
+        throw new ParquetDecodingException("could not skip " + n + " floats", e);
       }
     }
 
@@ -86,11 +98,11 @@ abstract public class PlainValuesReader extends ValuesReader {
   public static class IntegerPlainValuesReader extends PlainValuesReader {
 
     @Override
-    public void skip() {
+    public void skip(int n) {
       try {
-        in.skipBytes(4);
+        in.skipBytes(n * 4);
       } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip int", e);
+        throw new ParquetDecodingException("could not skip " + n + " ints", e);
       }
     }
 
@@ -107,11 +119,11 @@ abstract public class PlainValuesReader extends ValuesReader {
   public static class LongPlainValuesReader extends PlainValuesReader {
 
     @Override
-    public void skip() {
+    public void skip(int n) {
       try {
-        in.skipBytes(8);
+        in.skipBytes(n * 8);
       } catch (IOException e) {
-        throw new ParquetDecodingException("could not skip long", e);
+        throw new ParquetDecodingException("could not skip " + n + " longs", e);
       }
     }
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
index fe00de9..8039cf9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
@@ -19,8 +19,6 @@
 package org.apache.parquet.column.values.rle;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 
@@ -43,4 +41,8 @@ public class ZeroIntegerValuesReader extends ValuesReader {
   public void skip() {
   }
 
+  @Override
+  public void skip(int n) {
+  }
+
 }


Mime
View raw message