spark-commits mailing list archives

From dav...@apache.org
Subject spark git commit: [SPARK-16334] Reusing same dictionary column for decoding consecutive row groups shouldn't throw an error
Date Fri, 02 Sep 2016 22:20:47 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c9c36fa0c -> a3930c3b9


[SPARK-16334] Reusing same dictionary column for decoding consecutive row groups shouldn't throw an error

This patch fixes a bug in the vectorized Parquet reader caused by re-using the same
dictionary column vector while reading consecutive row groups. Specifically, the issue
manifests for certain distributions of dictionary- and plain-encoded data as we read and
populate the underlying bit-packed dictionary data into a column-vector-based data structure.
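
At a high level, the patch (see the diff below) does two things in each decode loop: it
skips slots that are already marked null, and it reads ids via getDictId(i) instead of
getInt(i) (a new accessor that, judging by the patch, returns the raw dictionary id
without any further dictionary resolution on the reused vector). A minimal, self-contained
sketch of the null-guard pattern in plain Java follows; the names are hypothetical
stand-ins, not the actual Spark ColumnVector/Dictionary classes:

  // DictDecodeSketch.java (illustrative only; not Spark code)
  final class DictDecodeSketch {
    public static void main(String[] args) {
      int[] dictionary = {10, 20, 30};                // this row group's dictionary
      int[] dictIds    = {0, 7, 2, 1};                // id 7 is garbage left in a null slot
      boolean[] isNull = {false, true, false, false}; // null markers in the reused vector
      int[] values     = new int[dictIds.length];     // stand-in for the reused column vector

      for (int i = 0; i < values.length; ++i) {
        // Without this guard, the stale id in the null slot would index past the
        // dictionary and throw ArrayIndexOutOfBoundsException, which is the class
        // of failure this patch prevents.
        if (!isNull[i]) {
          values[i] = dictionary[dictIds[i]];
        }
      }

      for (int i = 0; i < values.length; ++i) {
        System.out.println(isNull[i] ? "null" : Integer.toString(values[i]));
      }
    }
  }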

Manually tested on datasets provided by the community. Thanks to Chris Perluss and Keith Kraus
for their invaluable help in tracking down this issue!

Author: Sameer Agarwal <sameerag@cs.berkeley.edu>

Closes #14941 from sameeragarwal/parquet-exception-2.

(cherry picked from commit a2c9acb0e54b2e38cb8ee6431f1ea0e0b4cd959a)
Signed-off-by: Davies Liu <davies.liu@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3930c3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3930c3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3930c3b

Branch: refs/heads/branch-2.0
Commit: a3930c3b9afa9f7eba2a5c8b8f279ca38e348e9b
Parents: c9c36fa
Author: Sameer Agarwal <sameerag@cs.berkeley.edu>
Authored: Fri Sep 2 15:16:16 2016 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Fri Sep 2 15:20:07 2016 -0700

----------------------------------------------------------------------
 .../parquet/VectorizedColumnReader.java         | 54 ++++++++++++++------
 1 file changed, 38 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3930c3b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 6c47dc0..cb51cb4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -221,15 +221,21 @@ public class VectorizedColumnReader {
         if (column.dataType() == DataTypes.IntegerType ||
             DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
+            if (!column.isNullAt(i)) {
+              column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+            }
           }
         } else if (column.dataType() == DataTypes.ByteType) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+            if (!column.isNullAt(i)) {
+              column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+            }
           }
         } else if (column.dataType() == DataTypes.ShortType) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+            if (!column.isNullAt(i)) {
+              column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+            }
           }
         } else {
           throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -240,7 +246,9 @@ public class VectorizedColumnReader {
         if (column.dataType() == DataTypes.LongType ||
             DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
+            if (!column.isNullAt(i)) {
+              column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
+            }
           }
         } else {
           throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -249,21 +257,27 @@ public class VectorizedColumnReader {
 
       case FLOAT:
         for (int i = rowId; i < rowId + num; ++i) {
-          column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i)));
+          if (!column.isNullAt(i)) {
+            column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
+          }
         }
         break;
 
       case DOUBLE:
         for (int i = rowId; i < rowId + num; ++i) {
-          column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i)));
+          if (!column.isNullAt(i)) {
+            column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
+          }
         }
         break;
       case INT96:
         if (column.dataType() == DataTypes.TimestampType) {
           for (int i = rowId; i < rowId + num; ++i) {
             // TODO: Convert dictionary of Binaries to dictionary of Longs
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+            if (!column.isNullAt(i)) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+              column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+            }
           }
         } else {
           throw new UnsupportedOperationException();
@@ -275,26 +289,34 @@ public class VectorizedColumnReader {
         // and reuse it across batches. This should mean adding a ByteArray would just update
         // the length and offset.
         for (int i = rowId; i < rowId + num; ++i) {
-          Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-          column.putByteArray(i, v.getBytes());
+          if (!column.isNullAt(i)) {
+            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+            column.putByteArray(i, v.getBytes());
+          }
         }
         break;
       case FIXED_LEN_BYTE_ARRAY:
         // DecimalType written in the legacy mode
         if (DecimalType.is32BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
+            if (!column.isNullAt(i)) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+              column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
+            }
           }
         } else if (DecimalType.is64BitDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
+            if (!column.isNullAt(i)) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+              column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
+            }
           }
         } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
           for (int i = rowId; i < rowId + num; ++i) {
-            Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
-            column.putByteArray(i, v.getBytes());
+            if (!column.isNullAt(i)) {
+              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+              column.putByteArray(i, v.getBytes());
+            }
           }
         } else {
           throw new UnsupportedOperationException();



