impala-commits mailing list archives

From tarmstr...@apache.org
Subject [1/3] incubator-impala git commit: IMPALA-2494: Support for byte array encoded decimals in Parquet scanner
Date Tue, 07 Nov 2017 18:13:57 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master e6c3a01b9 -> 216642e28


IMPALA-2494: Support for byte array encoded decimals in Parquet scanner

Extends the Parquet column reader and associated classes to allow more
than one possible physical type for a given logical type. This patch
only adds support for variable-length byte-array-encoded decimals;
support for more encodings will be added in upcoming commits.
Also, column-level metadata verification, which was previously done per
row group, is now done only once per column per file.

Testing:
Added a backend test verifying that the newly added decimal types are
decoded correctly.
Added a query test that decodes both plain and dictionary-encoded
decimals using binary encoding.

Performance:
Initial perf testing using tpcds_1000 shows no regression.

Change-Id: I2c0e881045109f337fecba53fec21f9cfb9e619e
Reviewed-on: http://gerrit.cloudera.org:8080/7822
Reviewed-by: Bikramjeet Vig <bikramjeet.vig@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/94236ff2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/94236ff2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/94236ff2

Branch: refs/heads/master
Commit: 94236ff2ff6e3d6d25a80150c98d4275914dc8c2
Parents: e6c3a01
Author: Bikramjeet Vig <bikramjeet.vig@cloudera.com>
Authored: Mon Aug 21 15:30:33 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Tue Nov 7 04:34:26 2017 +0000

----------------------------------------------------------------------
 be/src/exec/data-source-scan-node.cc            |  16 +-
 be/src/exec/hdfs-parquet-scanner.cc             |   9 +-
 be/src/exec/hdfs-parquet-scanner.h              |   6 +-
 be/src/exec/hdfs-parquet-table-writer.cc        |   4 +-
 be/src/exec/parquet-column-readers.cc           | 134 ++++++++-----
 be/src/exec/parquet-column-stats.cc             |  38 ++--
 be/src/exec/parquet-column-stats.h              |   3 +-
 be/src/exec/parquet-column-stats.inline.h       |  21 +-
 be/src/exec/parquet-common.h                    | 183 ++++++++++++-----
 be/src/exec/parquet-metadata-utils.cc           | 160 ++++++++-------
 be/src/exec/parquet-metadata-utils.h            |  11 +-
 be/src/exec/parquet-plain-test.cc               | 198 ++++++++++++++-----
 be/src/util/dict-encoding.h                     |   6 +-
 be/src/util/dict-test.cc                        |  84 ++++----
 testdata/data/README                            |   7 +
 testdata/data/binary_decimal_dictionary.parquet | Bin 0 -> 1222 bytes
 .../data/binary_decimal_no_dictionary.parquet   | Bin 0 -> 1211 bytes
 .../QueryTest/parquet-decimal-formats.test      |  25 +++
 tests/query_test/test_scanners.py               |  18 ++
 19 files changed, 637 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
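
For context: the Parquet spec allows the DECIMAL logical type to be stored
either as FIXED_LEN_BYTE_ARRAY (exactly type_length big-endian
two's-complement bytes) or as a plain-encoded BYTE_ARRAY (a 4-byte
little-endian length prefix followed by a minimal big-endian
two's-complement unscaled value). The standalone sketch below is
illustrative only; Impala's actual implementation is DecodeDecimalByteArray
in be/src/exec/parquet-common.h further down in this diff.

#include <cstdint>
#include <cstring>

// Decodes one plain-encoded BYTE_ARRAY decimal into a 32-bit unscaled value.
// Layout: 4-byte little-endian length prefix, then 'len' bytes of big-endian
// two's complement. Returns bytes consumed, or -1 on a truncated buffer.
// Sketch only: assumes a little-endian host and len <= sizeof(int32_t).
static int DecodeByteArrayDecimal(const uint8_t* buffer, const uint8_t* buffer_end,
    int32_t* unscaled) {
  if (buffer_end - buffer < static_cast<int>(sizeof(int32_t))) return -1;
  int32_t len;
  memcpy(&len, buffer, sizeof(int32_t));
  if (len < 0 || buffer_end - buffer < static_cast<int>(sizeof(int32_t)) + len) {
    return -1;
  }
  const uint8_t* val = buffer + sizeof(int32_t);
  // Start from all ones if the sign bit of the first payload byte is set,
  // then shift in each payload byte.
  uint32_t acc = (len > 0 && (val[0] & 0x80)) ? ~0u : 0u;
  for (int i = 0; i < len; ++i) acc = (acc << 8) | val[i];
  *unscaled = static_cast<int32_t>(acc);
  return static_cast<int>(sizeof(int32_t)) + len;
}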


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 2639510..a836033 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -23,6 +23,7 @@
 #include "exec/parquet-common.h"
 #include "exec/read-write-util.h"
 #include "exprs/scalar-expr.h"
+#include "gen-cpp/parquet_types.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
@@ -172,24 +173,27 @@ inline Status SetDecimalVal(const ColumnType& type, char* bytes, int len,
   switch (type.GetByteSize()) {
     case 4: {
       Decimal4Value* val = reinterpret_cast<Decimal4Value*>(slot);
-      if (UNLIKELY(len > sizeof(Decimal4Value) ||
-          ParquetPlainEncoder::Decode(buffer, buffer + len, len, val) < 0)) {
+      if (UNLIKELY(len > sizeof(Decimal4Value) || (ParquetPlainEncoder::Decode
+          <Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer + len, len,
+              val)) < 0)) {
         return Status(ERROR_INVALID_DECIMAL);
       }
       break;
     }
     case 8: {
       Decimal8Value* val = reinterpret_cast<Decimal8Value*>(slot);
-      if (UNLIKELY(len > sizeof(Decimal8Value) ||
-          ParquetPlainEncoder::Decode(buffer, buffer + len, len, val) < 0)) {
+      if (UNLIKELY(len > sizeof(Decimal8Value) || (ParquetPlainEncoder::Decode
+          <Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer + len, len,
+              val)) < 0)) {
         return Status(ERROR_INVALID_DECIMAL);
       }
       break;
     }
     case 16: {
       Decimal16Value* val = reinterpret_cast<Decimal16Value*>(slot);
-      if (UNLIKELY(len > sizeof(Decimal16Value) ||
-          ParquetPlainEncoder::Decode(buffer, buffer + len, len, val) < 0)) {
+      if (UNLIKELY(len > sizeof(Decimal16Value) || (ParquetPlainEncoder::Decode
+          <Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer + len, len,
+              val)) < 0)) {
         return Status(ERROR_INVALID_DECIMAL);
       }
       break;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index b301ea0..e35f64a 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1485,6 +1485,9 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
       continue;
     }
 
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumn(filename(), *node->element,
+        slot_desc, state_));
+
     ParquetColumnReader* col_reader = ParquetColumnReader::Create(
         *node, slot_desc->type().IsCollectionType(), slot_desc, this);
     column_readers->push_back(col_reader);
@@ -1620,9 +1623,9 @@ Status HdfsParquetScanner::InitColumns(
           col_chunk.meta_data.num_values, num_values, filename());
     }
 
-    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumn(file_metadata_, filename(),
-        row_group_idx, scalar_reader->col_idx(), scalar_reader->schema_element(),
-        scalar_reader->slot_desc_, state_));
+    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata_,
+        filename(), row_group_idx, scalar_reader->col_idx(),
+        scalar_reader->schema_element(), state_));
 
     if (col_chunk.meta_data.__isset.dictionary_page_offset) {
       // Already validated in ValidateColumnOffsets()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index a5333fe..e4b6ae7 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -43,7 +43,8 @@ class ParquetLevelDecoder;
 class ParquetColumnReader;
 class CollectionColumnReader;
 class BaseScalarColumnReader;
-template<typename T, bool MATERIALIZED> class ScalarColumnReader;
+template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+class ScalarColumnReader;
 class BoolColumnReader;
 
 /// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
@@ -354,7 +355,8 @@ class HdfsParquetScanner : public HdfsScanner {
   friend class ParquetColumnReader;
   friend class CollectionColumnReader;
   friend class BaseScalarColumnReader;
-  template<typename T, bool MATERIALIZED> friend class ScalarColumnReader;
+  template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+  friend class ScalarColumnReader;
   friend class BoolColumnReader;
 
   /// Size of the file footer.  This is a guess.  If this value is too little, we will

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 4bbadb4..8c9d92e 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -859,7 +859,7 @@ Status HdfsParquetTableWriter::CreateSchema() {
     parquet::SchemaElement& node = file_metadata_.schema[i + 1];
     const ColumnType& type = output_expr_evals_[i]->root().type();
     node.name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    node.__set_type(IMPALA_TO_PARQUET_TYPES[type.type]);
+    node.__set_type(INTERNAL_TO_PARQUET_TYPES[type.type]);
     node.__set_repetition_type(FieldRepetitionType::OPTIONAL);
     if (type.type == TYPE_DECIMAL) {
       // This column is type decimal. Update the file metadata to include the
@@ -892,7 +892,7 @@ Status HdfsParquetTableWriter::AddRowGroup() {
   current_row_group_->columns.resize(columns_.size());
   for (int i = 0; i < columns_.size(); ++i) {
     ColumnMetaData metadata;
-    metadata.type = IMPALA_TO_PARQUET_TYPES[columns_[i]->type().type];
+    metadata.type = INTERNAL_TO_PARQUET_TYPES[columns_[i]->type().type];
     metadata.path_in_schema.push_back(
         table_desc_->col_descs()[i + num_clustering_cols].name());
     metadata.codec = columns_[i]->codec();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 6d211a6..15c963d 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -208,10 +208,13 @@ bool ParquetLevelDecoder::FillCache(int batch_size,
   return true;
 }
 
-/// Per column type reader. If MATERIALIZED is true, the column values are materialized
-/// into the slot described by slot_desc. If MATERIALIZED is false, the column values
-/// are not materialized, but the position can be accessed.
-template<typename T, bool MATERIALIZED>
+/// Per column type reader. InternalType is the datatype that Impala uses internally to
+/// store tuple data and PARQUET_TYPE is the corresponding primitive datatype (as defined
+/// in the parquet spec) that is used to store column values in parquet files.
+/// If MATERIALIZED is true, the column values are materialized into the slot described
+/// by slot_desc. If MATERIALIZED is false, the column values are not materialized, but
+/// the position can be accessed.
+template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 class ScalarColumnReader : public BaseScalarColumnReader {
  public:
   ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
@@ -228,8 +231,9 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 
     DCHECK(slot_desc_ != NULL);
     DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
-    if (slot_desc_->type().type == TYPE_DECIMAL) {
-      fixed_len_size_ = ParquetPlainEncoder::DecimalSize(slot_desc_->type());
+    if (slot_desc_->type().type == TYPE_DECIMAL
+        && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+      fixed_len_size_ = node.element->type_length;
     } else if (slot_desc_->type().type == TYPE_VARCHAR) {
       fixed_len_size_ = slot_desc_->type().len;
     } else {
@@ -449,7 +453,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 
   virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
       DictDecoderBase** decoder) {
-    if (!dict_decoder_.Reset(values, size, fixed_len_size_)) {
+    if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size_)) {
         return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
             slot_desc_->type().DebugString(), "could not decode dictionary");
     }
@@ -505,8 +509,9 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     void* slot = tuple->GetSlot(tuple_offset_);
     // Use an uninitialized stack allocation for temporary value to avoid running
     // constructors doing work unnecessarily, e.g. if T == StringValue.
-    alignas(T) uint8_t val_buf[sizeof(T)];
-    T* val_ptr = reinterpret_cast<T*>(NEEDS_CONVERSION ? val_buf : slot);
+    alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
+    InternalType* val_ptr = reinterpret_cast<InternalType*>(
+        NEEDS_CONVERSION ? val_buf : slot);
     if (IS_DICT_ENCODED) {
       DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
       if (UNLIKELY(!dict_decoder_.GetNextValue(val_ptr))) {
@@ -515,8 +520,8 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       }
     } else {
       DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
-      int encoded_len =
-          ParquetPlainEncoder::Decode<T>(data_, data_end_, fixed_len_size_, val_ptr);
+      int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+          data_, data_end_, fixed_len_size_, val_ptr);
       if (UNLIKELY(encoded_len < 0)) {
         SetPlainDecodeError();
         return false;
@@ -551,14 +556,14 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   }
 
   /// Converts and writes 'src' into 'slot' based on desc_->type()
-  bool ConvertSlot(const T* src, void* slot, MemPool* pool) {
+  bool ConvertSlot(const InternalType* src, void* slot, MemPool* pool) {
     DCHECK(false);
     return false;
   }
 
   /// Sets error message and returns false if the slot value is invalid, e.g., due to
   /// being out of the valid value range.
-  bool ValidateSlot(T* src, Tuple* tuple) const {
+  bool ValidateSlot(InternalType* src, Tuple* tuple) const {
     DCHECK(false);
     return false;
   }
@@ -574,7 +579,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   }
 
   /// Dictionary decoder for decoding column values.
-  DictDecoder<T> dict_decoder_;
+  DictDecoder<InternalType> dict_decoder_;
 
   /// True if dict_decoder_ has been initialized with a dictionary page.
   bool dict_decoder_init_;
@@ -588,12 +593,13 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 };
 
 template<>
-inline bool ScalarColumnReader<StringValue, true>::NeedsConversionInline() const {
+inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
+::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
 template<>
-bool ScalarColumnReader<StringValue, true>::ConvertSlot(
+bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::ConvertSlot(
     const StringValue* src, void* slot, MemPool* pool) {
   DCHECK(slot_desc() != NULL);
   DCHECK(slot_desc()->type().type == TYPE_CHAR);
@@ -606,12 +612,13 @@ bool ScalarColumnReader<StringValue, true>::ConvertSlot(
 }
 
 template<>
-inline bool ScalarColumnReader<TimestampValue, true>::NeedsConversionInline() const {
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
+::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
 template<>
-bool ScalarColumnReader<TimestampValue, true>::ConvertSlot(
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
     const TimestampValue* src, void* slot, MemPool* pool) {
   // Conversion should only happen when this flag is enabled.
   DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
@@ -622,12 +629,13 @@ bool ScalarColumnReader<TimestampValue, true>::ConvertSlot(
 }
 
 template<>
-inline bool ScalarColumnReader<TimestampValue, true>::NeedsValidationInline() const {
+inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
+::NeedsValidationInline() const {
   return true;
 }
 
 template<>
-bool ScalarColumnReader<TimestampValue, true>::ValidateSlot(
+bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateSlot(
     TimestampValue* src, Tuple* tuple) const {
   if (UNLIKELY(!TimestampValue::IsValidDate(src->date()))) {
     ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
@@ -808,8 +816,8 @@ Status BaseScalarColumnReader::ReadPageHeader(bool peek,
     DCHECK(stream_->eosr());
     DCHECK_LT(num_values_read_, metadata_->num_values);
     // TODO for 2.3: node_.element->name isn't necessarily useful
-    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
-                 metadata_->num_values, num_values_read_, node_.element->name, filename());
+    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, metadata_->num_values,
+        num_values_read_, node_.element->name, filename());
     RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
     *eos = true;
     return Status::OK();
@@ -1248,8 +1256,46 @@ void CollectionColumnReader::UpdateDerivedState() {
   }
 }
 
+/// Returns a column reader for decimal types based on its size and parquet type.
+static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
+    const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
+  switch (node.element->type) {
+    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+      switch (slot_desc->type().GetByteSize()) {
+      case 4:
+        return new ScalarColumnReader<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
+            true>(parent, node, slot_desc);
+      case 8:
+        return new ScalarColumnReader<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
+            true>(parent, node, slot_desc);
+      case 16:
+        return new ScalarColumnReader<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
+            true>(parent, node, slot_desc);
+      }
+      break;
+    case parquet::Type::BYTE_ARRAY:
+      switch (slot_desc->type().GetByteSize()) {
+      case 4:
+        return new ScalarColumnReader<Decimal4Value, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      case 8:
+        return new ScalarColumnReader<Decimal8Value, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      case 16:
+        return new ScalarColumnReader<Decimal16Value, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
+      }
+      break;
+    default:
+      DCHECK(false) << "Invalid decimal primitive type";
+  }
+  DCHECK(false) << "Invalid decimal type";
+  return nullptr;
+}
+
 ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
-    bool is_collection_field, const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
+    bool is_collection_field, const SlotDescriptor* slot_desc,
+    HdfsParquetScanner* parent) {
   ParquetColumnReader* reader = NULL;
   if (is_collection_field) {
     // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
@@ -1261,46 +1307,41 @@ ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
         reader = new BoolColumnReader(parent, node, slot_desc);
         break;
       case TYPE_TINYINT:
-        reader = new ScalarColumnReader<int8_t, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_SMALLINT:
-        reader = new ScalarColumnReader<int16_t, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_INT:
-        reader = new ScalarColumnReader<int32_t, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_BIGINT:
-        reader = new ScalarColumnReader<int64_t, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_FLOAT:
-        reader = new ScalarColumnReader<float, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<float, parquet::Type::FLOAT, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_DOUBLE:
-        reader = new ScalarColumnReader<double, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(parent, node,
+            slot_desc);
         break;
       case TYPE_TIMESTAMP:
-        reader = new ScalarColumnReader<TimestampValue, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
+            parent, node, slot_desc);
         break;
       case TYPE_STRING:
       case TYPE_VARCHAR:
       case TYPE_CHAR:
-        reader = new ScalarColumnReader<StringValue, true>(parent, node, slot_desc);
+        reader = new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
+            parent, node, slot_desc);
         break;
       case TYPE_DECIMAL:
-        switch (slot_desc->type().GetByteSize()) {
-          case 4:
-            reader = new ScalarColumnReader<Decimal4Value, true>(
-                parent, node, slot_desc);
-            break;
-          case 8:
-            reader = new ScalarColumnReader<Decimal8Value, true>(
-                parent, node, slot_desc);
-            break;
-          case 16:
-            reader = new ScalarColumnReader<Decimal16Value, true>(
-                parent, node, slot_desc);
-            break;
-        }
+        reader = GetDecimalColumnReader(node, slot_desc, parent);
         break;
       default:
         DCHECK(false) << slot_desc->type().DebugString();
@@ -1309,7 +1350,8 @@ ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
     // Special case for counting scalar values (e.g. count(*), no materialized columns in
     // the file, only materializing a position slot). We won't actually read any values,
     // only the rep and def levels, so it doesn't matter what kind of reader we make.
-    reader = new ScalarColumnReader<int8_t, false>(parent, node, slot_desc);
+    reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, node,
+        slot_desc);
   }
   return parent->obj_pool_.Add(reader);
 }
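
One C++ subtlety worth calling out in the CreateDictionaryDecoder change
above: dict_decoder_'s type now depends on the class template parameters, so
calling its member template Reset requires the 'template' disambiguator. A
minimal standalone illustration (hypothetical types, not Impala's):

#include <cstdint>

// Hypothetical stand-ins mirroring the shape of DictDecoder/ScalarColumnReader.
template <typename T>
struct Decoder {
  template <int PARQUET_TYPE>
  bool Reset(uint8_t* values, int size, int fixed_len_size) { return size >= 0; }
};

template <typename InternalType, int PARQUET_TYPE>
struct Reader {
  Decoder<InternalType> decoder_;
  bool Init(uint8_t* values, int size, int fixed_len_size) {
    // 'decoder_' has a dependent type, so without the 'template' keyword the
    // compiler would parse 'Reset <' as a less-than comparison.
    return decoder_.template Reset<PARQUET_TYPE>(values, size, fixed_len_size);
  }
};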

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
index 76b3365..4443309 100644
--- a/be/src/exec/parquet-column-stats.cc
+++ b/be/src/exec/parquet-column-stats.cc
@@ -61,11 +61,13 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
 
   switch (col_type.type) {
     case TYPE_BOOLEAN:
-      return ColumnStats<bool>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<bool>::DecodePlainValue(*stat_value, slot,
+          parquet::Type::BOOLEAN);
     case TYPE_TINYINT: {
       // parquet::Statistics encodes INT_8 values using 4 bytes.
       int32_t col_stats;
-      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats);
+      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats,
+          parquet::Type::INT32);
       if (!ret || col_stats < std::numeric_limits<int8_t>::min() ||
           col_stats > std::numeric_limits<int8_t>::max()) {
         return false;
@@ -76,7 +78,8 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
     case TYPE_SMALLINT: {
       // parquet::Statistics encodes INT_16 values using 4 bytes.
       int32_t col_stats;
-      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats);
+      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, &col_stats,
+          parquet::Type::INT32);
       if (!ret || col_stats < std::numeric_limits<int16_t>::min() ||
           col_stats > std::numeric_limits<int16_t>::max()) {
         return false;
@@ -85,18 +88,24 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
       return true;
     }
     case TYPE_INT:
-      return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_BIGINT:
-      return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_FLOAT:
-      return ColumnStats<float>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<float>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_DOUBLE:
-      return ColumnStats<double>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<double>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_TIMESTAMP:
-      return ColumnStats<TimestampValue>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<TimestampValue>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_STRING:
     case TYPE_VARCHAR:
-      return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot);
+      return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot,
+          col_chunk.meta_data.type);
     case TYPE_CHAR:
       /// We don't read statistics for CHAR columns, since CHAR support is broken in
       /// Impala (IMPALA-1652).
@@ -104,12 +113,15 @@ bool ColumnStatsBase::ReadFromThrift(const parquet::ColumnChunk& col_chunk,
     case TYPE_DECIMAL:
       switch (col_type.GetByteSize()) {
         case 4:
-          return ColumnStats<Decimal4Value>::DecodePlainValue(*stat_value, slot);
+          return ColumnStats<Decimal4Value>::DecodePlainValue(*stat_value, slot,
+              col_chunk.meta_data.type);
         case 8:
-          return ColumnStats<Decimal8Value>::DecodePlainValue(*stat_value, slot);
+          return ColumnStats<Decimal8Value>::DecodePlainValue(*stat_value, slot,
+              col_chunk.meta_data.type);
         case 16:
-          return ColumnStats<Decimal16Value>::DecodePlainValue(*stat_value, slot);
-      }
+          return ColumnStats<Decimal16Value>::DecodePlainValue(*stat_value, slot,
+              col_chunk.meta_data.type);
+        }
       DCHECK(false) << "Unknown decimal byte size: " << col_type.GetByteSize();
     default:
       DCHECK(false) << col_type.DebugString();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h b/be/src/exec/parquet-column-stats.h
index 7278cdc..0ff277c 100644
--- a/be/src/exec/parquet-column-stats.h
+++ b/be/src/exec/parquet-column-stats.h
@@ -181,7 +181,8 @@ class ColumnStats : public ColumnStatsBase {
   /// Decodes the plain encoded stats value from 'buffer' and writes the result into the
   /// buffer pointed to by 'slot'. Returns true if decoding was successful, false
   /// otherwise. For timestamps, an additional validation will be performed.
-  static bool DecodePlainValue(const std::string& buffer, void* slot);
+  static bool DecodePlainValue(const std::string& buffer, void* slot,
+      parquet::Type::type parquet_type);
 
   /// Returns the number of bytes needed to encode value 'v'.
   int64_t BytesNeeded(const T& v) const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h b/be/src/exec/parquet-column-stats.inline.h
index c0e90aa..5b67ee7 100644
--- a/be/src/exec/parquet-column-stats.inline.h
+++ b/be/src/exec/parquet-column-stats.inline.h
@@ -19,6 +19,7 @@
 #define IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
 
 #include "exec/parquet-common.h"
+#include "gen-cpp/parquet_types.h"
 #include "parquet-column-stats.h"
 #include "runtime/string-value.inline.h"
 
@@ -79,11 +80,15 @@ inline void ColumnStats<T>::EncodePlainValue(
 }
 
 template <typename T>
-inline bool ColumnStats<T>::DecodePlainValue(const std::string& buffer, void* slot) {
+inline bool ColumnStats<T>::DecodePlainValue(const std::string& buffer, void* slot,
+    parquet::Type::type parquet_type) {
   T* result = reinterpret_cast<T*>(slot);
   int size = buffer.size();
   const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data());
-  if (ParquetPlainEncoder::Decode(data, data + size, size, result) == -1) return false;
+  if (ParquetPlainEncoder::DecodeByParquetType<T>(data, data + size, size, result,
+      parquet_type) == -1) {
+    return false;
+  }
   return true;
 }
 
@@ -103,7 +108,8 @@ inline void ColumnStats<bool>::EncodePlainValue(
 }
 
 template <>
-inline bool ColumnStats<bool>::DecodePlainValue(const std::string& buffer, void* slot) {
+inline bool ColumnStats<bool>::DecodePlainValue(const std::string& buffer, void* slot,
+    parquet::Type::type parquet_type) {
   bool* result = reinterpret_cast<bool*>(slot);
   DCHECK(buffer.size() == 1);
   *result = (buffer[0] != 0);
@@ -118,11 +124,14 @@ inline int64_t ColumnStats<bool>::BytesNeeded(const bool& v) const {
 /// Timestamp values need validation.
 template <>
 inline bool ColumnStats<TimestampValue>::DecodePlainValue(
-    const std::string& buffer, void* slot) {
+    const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
   TimestampValue* result = reinterpret_cast<TimestampValue*>(slot);
   int size = buffer.size();
   const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data());
-  if (ParquetPlainEncoder::Decode(data, data + size, size, result) == -1) return false;
+  if (ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(data, data + size,
+      size, result) == -1) {
+    return false;
+  }
   // We don't need to convert the value here, since we don't support reading timestamp
   // statistics written by Hive / old versions of parquet-mr. Should Hive add support for
   // writing new statistics for the deprecated timestamp type, we will have to add support
@@ -139,7 +148,7 @@ inline void ColumnStats<StringValue>::EncodePlainValue(
 
 template <>
 inline bool ColumnStats<StringValue>::DecodePlainValue(
-    const std::string& buffer, void* slot) {
+    const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
   StringValue* result = reinterpret_cast<StringValue*>(slot);
   result->ptr = const_cast<char*>(buffer.data());
   result->len = buffer.size();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index 53de54a..91712e4 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -35,9 +35,9 @@ class TimestampValue;
 const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
 const uint32_t PARQUET_CURRENT_VERSION = 1;
 
-/// Mapping of impala types to parquet storage types.  This is indexed by
+/// Mapping of impala's internal types to parquet storage types. This is indexed by
 /// PrimitiveType enum
-const parquet::Type::type IMPALA_TO_PARQUET_TYPES[] = {
+const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = {
   parquet::Type::BOOLEAN,     // Invalid
   parquet::Type::BOOLEAN,     // NULL type
   parquet::Type::BOOLEAN,
@@ -84,9 +84,10 @@ const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
 /// calls.
 class ParquetPlainEncoder {
  public:
-  /// Returns the byte size of 'v'.
-  template <typename T>
-  static int ByteSize(const T& v) { return sizeof(T); }
+  /// Returns the byte size of 'v' where InternalType is the datatype that Impala uses
+  /// internally to store tuple data.
+  template <typename InternalType>
+  static int ByteSize(const InternalType& v) { return sizeof(InternalType); }
 
   /// Returns the encoded size of values of type t. Returns -1 if it is variable
   /// length. This can be different than the slot size of the types.
@@ -167,19 +168,52 @@ class ParquetPlainEncoder {
   /// be preallocated and big enough.  Buffer need not be aligned.
   /// 'fixed_len_size' is only applicable for data encoded using FIXED_LEN_BYTE_ARRAY and
   /// is the number of bytes the plain encoder should use.
-  template <typename T>
-  static int Encode(const T& t, int fixed_len_size, uint8_t* buffer) {
+  template <typename InternalType>
+  static int Encode(const InternalType& t, int fixed_len_size, uint8_t* buffer) {
     memcpy(buffer, &t, ByteSize(t));
     return ByteSize(t);
   }
 
+  template <typename InternalType>
+  static int DecodeByParquetType(const uint8_t* buffer, const uint8_t* buffer_end,
+      int fixed_len_size, InternalType* v, parquet::Type::type parquet_type) {
+    switch (parquet_type) {
+      case parquet::Type::BOOLEAN:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::BOOLEAN>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::INT32:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT32>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::INT64:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT64>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::INT96:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::INT96>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::FLOAT:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::FLOAT>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::DOUBLE:
+        return ParquetPlainEncoder::Decode<InternalType, parquet::Type::DOUBLE>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::BYTE_ARRAY:
+        return ParquetPlainEncoder::Decode<InternalType,
+            parquet::Type::BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v);
+      case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+        return ParquetPlainEncoder::Decode<InternalType,
+            parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v);
+      default:
+        DCHECK(false) << "Unexpected physical type";
+    }
+  }
+
   /// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 'buffer'
-  /// need not be aligned. For types that are stored as FIXED_LEN_BYTE_ARRAY,
-  /// 'fixed_len_size' is the size of the object. Otherwise, it is unused.
+  /// need not be aligned. If PARQUET_TYPE is FIXED_LEN_BYTE_ARRAY then 'fixed_len_size'
+  /// is the size of the object. Otherwise, it is unused.
   /// Returns the number of bytes read or -1 if the value was not decoded successfully.
-  template <typename T>
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
   static int Decode(const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
-      T* v) {
+      InternalType* v) {
     int byte_size = ByteSize(*v);
     if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
     memcpy(v, buffer, byte_size);
@@ -194,25 +228,27 @@ template <> int ParquetPlainEncoder::ByteSize(const ColumnType& t);
 /// Disable for bools. Plain encoding is not used for booleans.
 template <> int ParquetPlainEncoder::ByteSize(const bool& b);
 template <> int ParquetPlainEncoder::Encode(const bool&, int fixed_len_size, uint8_t*);
-template <> int ParquetPlainEncoder::Decode(const uint8_t*, const uint8_t*,
-    int fixed_len_size, bool* v);
+template <> int ParquetPlainEncoder::Decode<bool, parquet::Type::BOOLEAN>(const uint8_t*,
+    const uint8_t*, int fixed_len_size, bool* v);
 
 /// Not used for decimals since the plain encoding encodes them using
 /// FIXED_LEN_BYTE_ARRAY.
-template <>
-inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) {
+inline int DecimalByteSize() {
   DCHECK(false);
   return -1;
 }
+
+template <>
+inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) {
+  return DecimalByteSize();
+}
 template <>
 inline int ParquetPlainEncoder::ByteSize(const Decimal8Value&) {
-  DCHECK(false);
-  return -1;
+  return DecimalByteSize();
 }
 template <>
 inline int ParquetPlainEncoder::ByteSize(const Decimal16Value&) {
-  DCHECK(false);
-  return -1;
+  return DecimalByteSize();
 }
 
 /// Parquet doesn't have 8-bit or 16-bit ints. They are converted to 32-bit.
@@ -232,36 +268,39 @@ inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) {
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, int8_t* v) {
+inline int ParquetPlainEncoder::Decode<int8_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int8_t* v) {
   int byte_size = ByteSize(*v);
   if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
   *v = *buffer;
   return byte_size;
 }
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, int16_t* v) {
+inline int ParquetPlainEncoder::Decode<int16_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int16_t* v) {
   int byte_size = ByteSize(*v);
   if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
   memcpy(v, buffer, sizeof(int16_t));
   return byte_size;
 }
 
+template<typename T>
+inline int EncodeToInt32(const T& v, int fixed_len_size, uint8_t* buffer) {
+  int32_t val = v;
+  memcpy(buffer, &val, sizeof(int32_t));
+  return ParquetPlainEncoder::ByteSize(v);
+}
+
 template <>
 inline int ParquetPlainEncoder::Encode(
     const int8_t& v, int fixed_len_size, uint8_t* buffer) {
-  int32_t val = v;
-  memcpy(buffer, &val, sizeof(int32_t));
-  return ByteSize(v);
+  return EncodeToInt32(v, fixed_len_size, buffer);
 }
 
 template <>
 inline int ParquetPlainEncoder::Encode(
     const int16_t& v, int fixed_len_size, uint8_t* buffer) {
-  int32_t val = v;
-  memcpy(buffer, &val, sizeof(int32_t));
-  return ByteSize(v);
+  return EncodeToInt32(v, fixed_len_size, buffer);
 }
 
 template <>
@@ -273,8 +312,9 @@ inline int ParquetPlainEncoder::Encode(
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, StringValue* v) {
+inline int ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    StringValue* v) {
   if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
   memcpy(&v->len, buffer, sizeof(int32_t));
   int byte_size = ByteSize(*v);
@@ -290,49 +330,92 @@ inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buf
 /// that the value in the in-memory format has leading zeros or negative 1's.
 /// For example, precision 2 fits in 1 byte. All decimals stored as Decimal4Value
 /// will have 3 bytes of leading zeros, we will only store the interesting byte.
+template<typename T>
+inline int EncodeDecimal(const T& v, int fixed_len_size, uint8_t* buffer) {
+  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
+  return fixed_len_size;
+}
+
 template <>
 inline int ParquetPlainEncoder::Encode(
     const Decimal4Value& v, int fixed_len_size, uint8_t* buffer) {
-  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+  return EncodeDecimal(v, fixed_len_size, buffer);
 }
 
 template <>
 inline int ParquetPlainEncoder::Encode(
     const Decimal8Value& v, int fixed_len_size, uint8_t* buffer) {
-  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+  return EncodeDecimal(v, fixed_len_size, buffer);
 }
 
 template <>
 inline int ParquetPlainEncoder::Encode(
     const Decimal16Value& v, int fixed_len_size, uint8_t* buffer) {
-  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+  return EncodeDecimal(v, fixed_len_size, buffer);
 }
 
-template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, Decimal4Value* v) {
+template<typename T>
+inline int DecodeDecimalFixedLen(const uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, T* v) {
   if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
   DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
   return fixed_len_size;
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, Decimal8Value* v) {
-  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
-  DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+inline int ParquetPlainEncoder::
+Decode<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, Decimal4Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode(const uint8_t* buffer, const uint8_t* buffer_end,
-    int fixed_len_size, Decimal16Value* v) {
-  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
-  DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
+inline int ParquetPlainEncoder::
+Decode<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, Decimal8Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::
+Decode<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, Decimal16Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+/// Helper method to decode Decimal type stored as variable length byte array.
+template<typename T>
+inline int DecodeDecimalByteArray(const uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, T* v) {
+  if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
+  int encoded_byte_size;
+  memcpy(&encoded_byte_size, buffer, sizeof(int32_t));
+  int byte_size = sizeof(int32_t) + encoded_byte_size;
+  if (UNLIKELY(encoded_byte_size < 0 || buffer_end - buffer < byte_size)) return -1;
+  uint8_t* val_ptr = const_cast<uint8_t*>(buffer) + sizeof(int32_t);
+  DecimalUtil::DecodeFromFixedLenByteArray(val_ptr, encoded_byte_size, v);
+  return byte_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal4Value, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal4Value* v) {
+  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal8Value* v) {
+  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal16Value* v) {
+  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
 }
 
 }
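
To make the new interface concrete, here is a hedged usage sketch of the
templated decoder (assumes Impala's be/src tree on the include path and that
Decimal4Value::value() returns the unscaled integer):

#include "exec/parquet-common.h"
#include "runtime/decimal-value.h"

using namespace impala;

// Decode the unscaled value 1 from a plain-encoded BYTE_ARRAY decimal: a
// 4-byte little-endian length prefix (1) followed by one value byte.
bool DecodeExample() {
  uint8_t buffer[] = {0x01, 0x00, 0x00, 0x00, 0x01};
  Decimal4Value v;
  // 'fixed_len_size' is unused for BYTE_ARRAY, so 0 is passed here.
  int bytes_read = ParquetPlainEncoder::Decode<Decimal4Value, parquet::Type::BYTE_ARRAY>(
      buffer, buffer + sizeof(buffer), /*fixed_len_size=*/0, &v);
  return bytes_read == 5 && v.value() == 1;
}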

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc
index fc34eda..3fc6b3e 100644
--- a/be/src/exec/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -40,6 +40,39 @@ using boost::algorithm::token_compress_on;
 
 namespace impala {
 
+namespace {
+
+const map<PrimitiveType, set<parquet::Type::type>> SUPPORTED_PHYSICAL_TYPES = {
+    {PrimitiveType::INVALID_TYPE, {parquet::Type::BOOLEAN}},
+    {PrimitiveType::TYPE_NULL, {parquet::Type::BOOLEAN}},
+    {PrimitiveType::TYPE_BOOLEAN, {parquet::Type::BOOLEAN}},
+    {PrimitiveType::TYPE_TINYINT, {parquet::Type::INT32}},
+    {PrimitiveType::TYPE_SMALLINT, {parquet::Type::INT32}},
+    {PrimitiveType::TYPE_INT, {parquet::Type::INT32}},
+    {PrimitiveType::TYPE_BIGINT, {parquet::Type::INT64}},
+    {PrimitiveType::TYPE_FLOAT, {parquet::Type::FLOAT}},
+    {PrimitiveType::TYPE_DOUBLE, {parquet::Type::DOUBLE}},
+    {PrimitiveType::TYPE_TIMESTAMP, {parquet::Type::INT96}},
+    {PrimitiveType::TYPE_STRING, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_DATE, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_DATETIME, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_BINARY, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_DECIMAL, {parquet::Type::FIXED_LEN_BYTE_ARRAY,
+        parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_CHAR, {parquet::Type::BYTE_ARRAY}},
+    {PrimitiveType::TYPE_VARCHAR, {parquet::Type::BYTE_ARRAY}},
+};
+
+/// Returns true if 'parquet_type' is a supported physical encoding for the Impala
+/// primitive type, false otherwise.
+bool IsSupportedPhysicalType(PrimitiveType impala_type,
+    parquet::Type::type parquet_type) {
+  auto encodings = SUPPORTED_PHYSICAL_TYPES.find(impala_type);
+  DCHECK(encodings != SUPPORTED_PHYSICAL_TYPES.end());
+  return encodings->second.find(parquet_type) != encodings->second.end();
+}
+
+}
 // Needs to be in sync with the order of enum values declared in TParquetArrayResolution.
 const std::vector<ParquetSchemaResolver::ArrayEncoding>
     ParquetSchemaResolver::ORDERED_ARRAY_ENCODINGS[] =
@@ -110,97 +143,85 @@ static bool IsEncodingSupported(parquet::Encoding::type e) {
   }
 }
 
-Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_metadata,
-    const char* filename, int row_group_idx, int col_idx,
-    const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
-    RuntimeState* state) {
-  const parquet::ColumnChunk& file_data =
-      file_metadata.row_groups[row_group_idx].columns[col_idx];
+Status ParquetMetadataUtils::ValidateRowGroupColumn(
+    const parquet::FileMetaData& file_metadata, const char* filename, int row_group_idx,
+    int col_idx, const parquet::SchemaElement& schema_element, RuntimeState* state) {
+  const parquet::ColumnMetaData& col_chunk_metadata =
+      file_metadata.row_groups[row_group_idx].columns[col_idx].meta_data;
 
   // Check the encodings are supported.
-  const vector<parquet::Encoding::type>& encodings = file_data.meta_data.encodings;
+  const vector<parquet::Encoding::type>& encodings = col_chunk_metadata.encodings;
   for (int i = 0; i < encodings.size(); ++i) {
     if (!IsEncodingSupported(encodings[i])) {
-      stringstream ss;
-      ss << "File '" << filename << "' uses an unsupported encoding: "
-         << PrintEncoding(encodings[i]) << " for column '" << schema_element.name
-         << "'.";
-      return Status(ss.str());
+      return Status(Substitute("File '$0' uses an unsupported encoding: $1 for column "
+          "'$2'.", filename, PrintEncoding(encodings[i]), schema_element.name));
     }
   }
 
   // Check the compression is supported.
-  if (file_data.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED &&
-      file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY &&
-      file_data.meta_data.codec != parquet::CompressionCodec::GZIP) {
-    stringstream ss;
-    ss << "File '" << filename << "' uses an unsupported compression: "
-        << file_data.meta_data.codec << " for column '" << schema_element.name
-        << "'.";
-    return Status(ss.str());
+  if (col_chunk_metadata.codec != parquet::CompressionCodec::UNCOMPRESSED &&
+      col_chunk_metadata.codec != parquet::CompressionCodec::SNAPPY &&
+      col_chunk_metadata.codec != parquet::CompressionCodec::GZIP) {
+    return Status(Substitute("File '$0' uses an unsupported compression: $1 for column "
+        "'$2'.", filename, col_chunk_metadata.codec, schema_element.name));
   }
 
-  // Validation after this point is only if col_reader is reading values.
-  if (slot_desc == NULL) return Status::OK();
-
-  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type];
-  if (UNLIKELY(type != file_data.meta_data.type)) {
-    return Status(Substitute("Unexpected Parquet type in file '$0' metadata expected $1 "
-        "actual $2: file may be corrupt", filename, type, file_data.meta_data.type));
+  if (col_chunk_metadata.type != schema_element.type) {
+    return Status(Substitute("Mismatched column chunk Parquet type in file '$0' column "
+            "'$1'. Expected $2 actual $3: file may be corrupt", filename,
+            schema_element.name, col_chunk_metadata.type, schema_element.type));
   }
+  return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateColumn(const char* filename,
+    const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+    RuntimeState* state) {
+  // Following validation logic is only for non-complex types.
+  if (slot_desc->type().IsComplexType()) return Status::OK();
+
+  if (UNLIKELY(!IsSupportedPhysicalType(slot_desc->type().type, schema_element.type))) {
+    return Status(Substitute("Unsupported Parquet type in file '$0' metadata. Logical "
+        "type: $1, physical type: $2. File may be corrupt.",
+        filename, slot_desc->type().type, schema_element.type));
+    }
 
   // Check the decimal scale in the file matches the metastore scale and precision.
   // We fail the query if the metadata makes it impossible for us to safely read
   // the file. If we don't require the metadata, we will fail the query if
   // abort_on_error is true, otherwise we will just log a warning.
-  bool is_converted_type_decimal = schema_element.__isset.converted_type &&
-      schema_element.converted_type == parquet::ConvertedType::DECIMAL;
+  bool is_converted_type_decimal = schema_element.__isset.converted_type
+      && schema_element.converted_type == parquet::ConvertedType::DECIMAL;
   if (slot_desc->type().type == TYPE_DECIMAL) {
     // We require that the scale and byte length be set.
-    if (schema_element.type != parquet::Type::FIXED_LEN_BYTE_ARRAY) {
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' should be a decimal column encoded using FIXED_LEN_BYTE_ARRAY.";
-      return Status(ss.str());
-    }
-
-    if (!schema_element.__isset.type_length) {
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' does not have type_length set.";
-      return Status(ss.str());
-    }
+    if (schema_element.type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+      if (!schema_element.__isset.type_length) {
+        return Status(Substitute("File '$0' column '$1' does not have type_length set.",
+            filename, schema_element.name));
+      }
 
-    int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
-    if (schema_element.type_length != expected_len) {
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' has an invalid type length. Expecting: " << expected_len
-         << " len in file: " << schema_element.type_length;
-      return Status(ss.str());
+      int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
+      if (schema_element.type_length != expected_len) {
+        return Status(Substitute("File '$0' column '$1' has an invalid type length. "
+            "Expecting: $2 len in file: $3", filename, schema_element.name, expected_len,
+            schema_element.type_length));
+      }
     }
-
     if (!schema_element.__isset.scale) {
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' does not have the scale set.";
-      return Status(ss.str());
+      return Status(Substitute("File '$0' column '$1' does not have the scale set.",
+          filename, schema_element.name));
     }
 
     if (schema_element.scale != slot_desc->type().scale) {
       // TODO: we could allow a mismatch and do a conversion at this step.
-      stringstream ss;
-      ss << "File '" << filename << "' column '" << schema_element.name
-         << "' has a scale that does not match the table metadata scale."
-         << " File metadata scale: " << schema_element.scale
-         << " Table metadata scale: " << slot_desc->type().scale;
-      return Status(ss.str());
+      return Status(Substitute("File '$0' column '$1' has a scale that does not match "
+          "the table metadata scale. File metadata scale: $2 Table metadata scale: $3",
+          filename, schema_element.name, schema_element.scale, slot_desc->type().scale));
     }
 
     // The other decimal metadata should be there but we don't need it.
     if (!schema_element.__isset.precision) {
-      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename,
-          schema_element.name);
+      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename, schema_element.name);
       RETURN_IF_ERROR(state->LogOrReturnError(msg));
     } else {
       if (schema_element.precision != slot_desc->type().precision) {
@@ -218,10 +239,10 @@ Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_me
           schema_element.name);
       RETURN_IF_ERROR(state->LogOrReturnError(msg));
     }
-  } else if (schema_element.__isset.scale || schema_element.__isset.precision ||
-      is_converted_type_decimal) {
-    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename,
-        schema_element.name, slot_desc->type().DebugString());
+  } else if (schema_element.__isset.scale || schema_element.__isset.precision
+      || is_converted_type_decimal) {
+    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename, schema_element.name,
+        slot_desc->type().DebugString());
     RETURN_IF_ERROR(state->LogOrReturnError(msg));
   }
   return Status::OK();
@@ -350,8 +371,8 @@ Status ParquetSchemaResolver::CreateSchemaTree(
     ++(*col_idx);
   } else if (node->element->num_children > SCHEMA_NODE_CHILDREN_SANITY_LIMIT) {
     // Sanity-check the schema to avoid allocating absurdly large buffers below.
-    return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than limit of "
-        "$2. File is likely corrupt", filename_, node->element->num_children,
+    return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than "
+        "limit of $2. File is likely corrupt", filename_, node->element->num_children,
         SCHEMA_NODE_CHILDREN_SANITY_LIMIT));
   } else if (node->element->num_children < 0) {
     return Status(Substitute("Corrupt Parquet file '$0': schema element has $1 children.",
@@ -668,8 +689,7 @@ Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node,
         PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
     return Status::Expected(msg);
   }
-  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[col_type.type];
-  if (type != node.element->type) {
+  if (!IsSupportedPhysicalType(col_type.type, node.element->type)) {
     ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
         PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
     return Status::Expected(msg);
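
In terms of what the lookup-table validation above accepts, the effect on
DECIMAL is as follows (illustrative calls; the function sits in an anonymous
namespace, so these would only compile inside the same translation unit):

// After this patch, DECIMAL admits two physical encodings:
IsSupportedPhysicalType(TYPE_DECIMAL, parquet::Type::FIXED_LEN_BYTE_ARRAY);  // true
IsSupportedPhysicalType(TYPE_DECIMAL, parquet::Type::BYTE_ARRAY);            // true
// Anything else for DECIMAL is rejected, e.g.:
IsSupportedPhysicalType(TYPE_DECIMAL, parquet::Type::INT32);                 // false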

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.h b/be/src/exec/parquet-metadata-utils.h
index 767a1a3..f3a144d 100644
--- a/be/src/exec/parquet-metadata-utils.h
+++ b/be/src/exec/parquet-metadata-utils.h
@@ -44,10 +44,15 @@ class ParquetMetadataUtils {
   static Status ValidateOffsetInFile(const std::string& filename, int col_idx,
       int64_t file_length, int64_t offset, const std::string& offset_name);
 
-  /// Validates the column metadata to make sure this column is supported (e.g. encoding,
-  /// type, etc) and matches the type of given slot_desc.
-  static Status ValidateColumn(const parquet::FileMetaData& file_metadata,
+  /// Validates the column metadata inside a row group to make sure this column is
+  /// supported (encoding, type, etc.).
+  static Status ValidateRowGroupColumn(const parquet::FileMetaData& file_metadata,
       const char* filename, int row_group_idx, int col_idx,
+      const parquet::SchemaElement& schema_element, RuntimeState* state);
+
+  /// Validates the column metadata to make sure the column is supported and its type
+  /// attributes conform to the parquet spec.
+  static Status ValidateColumn(const char* filename,
       const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
       RuntimeState* state);
 };

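Per the commit message, splitting the old ValidateColumn() in two lets the per-file
checks run once per column, while only ValidateRowGroupColumn() stays in the
per-row-group loop. A hedged sketch of the resulting call pattern (the container and
variable names here are illustrative, not taken from the scanner code):

  // Once per column per file: type attributes vs. the table schema.
  for (int col_idx = 0; col_idx < num_columns; ++col_idx) {
    RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumn(
        filename, schema_elements[col_idx], slot_descs[col_idx], state));
  }
  // Still once per column per row group: encodings, offsets, etc.
  for (int rg_idx = 0; rg_idx < file_metadata.row_groups.size(); ++rg_idx) {
    for (int col_idx = 0; col_idx < num_columns; ++col_idx) {
      RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata,
          filename, rg_idx, col_idx, schema_elements[col_idx], state));
    }
  }
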
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/exec/parquet-plain-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-plain-test.cc b/be/src/exec/parquet-plain-test.cc
index c86a33b..37acd2c 100644
--- a/be/src/exec/parquet-plain-test.cc
+++ b/be/src/exec/parquet-plain-test.cc
@@ -29,39 +29,83 @@
 
 namespace impala {
 
+template <typename InternalType>
+int Encode(const InternalType& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type physical_type) {
+  return ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer);
+}
+
+// Handles the special case of encoding decimal types stored as BYTE_ARRAY, which is
+// not implemented in Impala itself.
+// When parquet_type equals BYTE_ARRAY, 'encoded_byte_size' is the sum of the minimum
+// number of bytes required to store the unscaled value and the bytes required to
+// store the size. The value 'v' must not contain leading zeros, as this method does
+// not strictly conform to the Parquet spec in removing them.
+template <typename DecimalType>
+int EncodeDecimal(const DecimalType& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  if (parquet_type == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+    return ParquetPlainEncoder::Encode(v, encoded_byte_size, buffer);
+  } else if (parquet_type == parquet::Type::BYTE_ARRAY) {
+    int decimal_size = encoded_byte_size - sizeof(int32_t);
+    memcpy(buffer, &decimal_size, sizeof(int32_t));
+    DecimalUtil::EncodeToFixedLenByteArray(buffer + sizeof(int32_t), decimal_size, v);
+    return encoded_byte_size;
+  }
+  return -1;
+}
+
+template<>
+int Encode(const Decimal4Value& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type);
+}
+
+template<>
+int Encode(const Decimal8Value& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type);
+}
+
+template<>
+int Encode(const Decimal16Value& v, int encoded_byte_size, uint8_t* buffer,
+    parquet::Type::type parquet_type) {
+  return EncodeDecimal(v, encoded_byte_size, buffer, parquet_type);
+}
+
 /// Test that the decoder fails when asked to decode a truncated value.
-template <typename T>
-void TestTruncate(const T& v, int expected_byte_size) {
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestTruncate(const InternalType& v, int expected_byte_size) {
   uint8_t buffer[expected_byte_size];
-  int encoded_size = ParquetPlainEncoder::Encode(v, expected_byte_size, buffer);
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
   EXPECT_EQ(encoded_size, expected_byte_size);
 
   // Check all possible truncations of the buffer.
   for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) {
-    T result;
+    InternalType result;
     /// Copy to heap-allocated buffer so that ASAN can detect buffer overruns.
     uint8_t* truncated_buffer = new uint8_t[truncated_size];
     memcpy(truncated_buffer, buffer, truncated_size);
-    int decoded_size = ParquetPlainEncoder::Decode(truncated_buffer,
-        truncated_buffer + truncated_size, expected_byte_size, &result);
+    int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
+        truncated_buffer, truncated_buffer + truncated_size, expected_byte_size, &result);
     EXPECT_EQ(-1, decoded_size);
     delete[] truncated_buffer;
   }
 }
 
-template <typename T>
-void TestType(const T& v, int expected_byte_size) {
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestType(const InternalType& v, int expected_byte_size) {
   uint8_t buffer[expected_byte_size];
-  int encoded_size = ParquetPlainEncoder::Encode(v, expected_byte_size, buffer);
+  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
   EXPECT_EQ(encoded_size, expected_byte_size);
 
-  T result;
-  int decoded_size = ParquetPlainEncoder::Decode(buffer, buffer + expected_byte_size,
-      expected_byte_size, &result);
+  InternalType result;
+  int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(buffer,
+      buffer + expected_byte_size, expected_byte_size, &result);
   EXPECT_EQ(decoded_size, expected_byte_size);
   EXPECT_EQ(result, v);
 
-  TestTruncate(v, expected_byte_size);
+  TestTruncate<InternalType, PARQUET_TYPE>(v, expected_byte_size);
 }
 
 TEST(PlainEncoding, Basic) {
@@ -74,42 +118,108 @@ TEST(PlainEncoding, Basic) {
   StringValue sv("Hello");
   TimestampValue tv;
 
-  TestType(i8, sizeof(int32_t));
-  TestType(i16, sizeof(int32_t));
-  TestType(i32, sizeof(int32_t));
-  TestType(i64, sizeof(int64_t));
-  TestType(f, sizeof(float));
-  TestType(d, sizeof(double));
-  TestType(sv, sizeof(int32_t) + sv.len);
-  TestType(tv, 12);
-
-  TestType(Decimal4Value(1234), sizeof(Decimal4Value));
-  TestType(Decimal4Value(-1234), sizeof(Decimal4Value));
-
-  TestType(Decimal8Value(1234), sizeof(Decimal8Value));
-  TestType(Decimal8Value(-1234), sizeof(Decimal8Value));
-  TestType(Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal8Value));
-  TestType(Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal8Value));
-
-  TestType(Decimal16Value(1234), 16);
-  TestType(Decimal16Value(-1234), 16);
-  TestType(Decimal16Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal16Value));
-  TestType(Decimal16Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal16Value));
-  TestType(Decimal16Value(std::numeric_limits<int64_t>::max()), sizeof(Decimal16Value));
-  TestType(Decimal16Value(std::numeric_limits<int64_t>::min()), sizeof(Decimal16Value));
+  TestType<int8_t, parquet::Type::INT32>(i8, sizeof(int32_t));
+  TestType<int16_t, parquet::Type::INT32>(i16, sizeof(int32_t));
+  TestType<int32_t, parquet::Type::INT32>(i32, sizeof(int32_t));
+  TestType<int64_t, parquet::Type::INT64>(i64, sizeof(int64_t));
+  TestType<float, parquet::Type::FLOAT>(f, sizeof(float));
+  TestType<double, parquet::Type::DOUBLE>(d, sizeof(double));
+  TestType<StringValue, parquet::Type::BYTE_ARRAY>(sv, sizeof(int32_t) + sv.len);
+  TestType<TimestampValue, parquet::Type::INT96>(tv, 12);
+
+  int test_val = 1234;
+  int var_len_decimal_size = sizeof(int32_t)
+      + 2 /* min bytes required for storing test_val */;
+  // Decimal4Value: General test case
+  TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val),
+      var_len_decimal_size);
+  TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(test_val * -1),
+      var_len_decimal_size);
+  TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(test_val),
+      sizeof(Decimal4Value));
+  TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal4Value(test_val * -1), sizeof(Decimal4Value));
+
+  // Decimal8Value: General test case
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val),
+      var_len_decimal_size);
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(test_val * -1),
+      var_len_decimal_size);
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(test_val),
+      sizeof(Decimal8Value));
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal8Value(test_val * -1), sizeof(Decimal8Value));
+
+  // Decimal16Value: General test case
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val),
+      var_len_decimal_size);
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(test_val * -1),
+      var_len_decimal_size);
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(test_val),
+      sizeof(Decimal16Value));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(test_val * -1), sizeof(Decimal16Value));
+
+  // Decimal8Value: int32 limits test
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::max()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::min()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal8Value));
+  TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal8Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal8Value));
+
+  // Decimal16Value: int32 limits test
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::max()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::min()),
+      sizeof(int32_t) + sizeof(int32_t));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::max()), sizeof(Decimal16Value));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int32_t>::min()), sizeof(Decimal16Value));
+
+  // Decimal16Value: int64 limits test
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::max()),
+      sizeof(int32_t) + sizeof(int64_t));
+  TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::min()),
+      sizeof(int32_t) + sizeof(int64_t));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::max()), sizeof(Decimal16Value));
+  TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+      Decimal16Value(std::numeric_limits<int64_t>::min()), sizeof(Decimal16Value));
 
  // Two-digit values can be encoded with any byte size.
  for (int i = 1; i <= 16; ++i) {
     if (i <= 4) {
-      TestType(Decimal4Value(i), i);
-      TestType(Decimal4Value(-i), i);
+      TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(i),
+          i + sizeof(int32_t));
+      TestType<Decimal4Value, parquet::Type::BYTE_ARRAY>(Decimal4Value(-i),
+          i + sizeof(int32_t));
+      TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(i), i);
+      TestType<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal4Value(-i), i);
     }
     if (i <= 8) {
-      TestType(Decimal8Value(i), i);
-      TestType(Decimal8Value(-i), i);
+      TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(i),
+          i + sizeof(int32_t));
+      TestType<Decimal8Value, parquet::Type::BYTE_ARRAY>(Decimal8Value(-i),
+          i + sizeof(int32_t));
+      TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(i), i);
+      TestType<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal8Value(-i), i);
     }
-    TestType(Decimal16Value(i), i);
-    TestType(Decimal16Value(-i), i);
+    TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(i),
+        i + sizeof(int32_t));
+    TestType<Decimal16Value, parquet::Type::BYTE_ARRAY>(Decimal16Value(-i),
+        i + sizeof(int32_t));
+    TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(i), i);
+    TestType<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(Decimal16Value(-i), i);
   }
 }
 
@@ -155,8 +265,8 @@ TEST(PlainEncoding, CorruptString) {
   memcpy(buffer, &len, sizeof(int32_t));
 
   StringValue result;
-  int decoded_size =
-      ParquetPlainEncoder::Decode(buffer, buffer + sizeof(buffer), 0, &result);
+  int decoded_size = ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+      buffer, buffer + sizeof(buffer), 0, &result);
   EXPECT_EQ(decoded_size, -1);
 }
 

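To make the sizes in the Basic test concrete: on the BYTE_ARRAY path, EncodeDecimal()
above first memcpys the 4-byte size, then writes the unscaled value big-endian as the
Parquet spec requires. A worked example (the byte values assume a little-endian host,
since the length prefix is copied in host order):

  uint8_t buffer[6];
  // 1234 == 0x04D2 fits in two bytes, so encoded_byte_size is
  // sizeof(int32_t) + 2 == 6, matching var_len_decimal_size in the test.
  int written = EncodeDecimal(Decimal4Value(1234), 6, buffer,
      parquet::Type::BYTE_ARRAY);
  // written == 6
  // buffer == { 0x02, 0x00, 0x00, 0x00,   // length prefix: 2 (host order)
  //             0x04, 0xD2 }              // unscaled value, big-endian
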
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 479f095..62b3d3a 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -210,6 +210,7 @@ class DictDecoder : public DictDecoderBase {
   /// dictionary values (values stored using FIXED_LEN_BYTE_ARRAY).
   /// Returns true if the dictionary values were all successfully decoded, or false
   /// if the dictionary was corrupt.
+  template<parquet::Type::type PARQUET_TYPE>
   bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
 
   virtual int num_entries() const { return dict_.size(); }
@@ -337,14 +338,15 @@ inline int DictEncoderBase::WriteData(uint8_t* buffer, int buffer_len) {
 }
 
 template<typename T>
+template<parquet::Type::type PARQUET_TYPE>
 inline bool DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
     int fixed_len_size) {
   dict_.clear();
   uint8_t* end = dict_buffer + dict_len;
   while (dict_buffer < end) {
     T value;
-    int decoded_len =
-        ParquetPlainEncoder::Decode(dict_buffer, end, fixed_len_size, &value);
+    int decoded_len = ParquetPlainEncoder::Decode<T, PARQUET_TYPE>(dict_buffer, end,
+        fixed_len_size, &value);
     if (UNLIKELY(decoded_len < 0)) return false;
     dict_buffer += decoded_len;
     dict_.push_back(value);

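With Reset() now templated on the physical type, the caller picks the decode path when
initializing the dictionary; the dict-test.cc changes below show this for strings. A
minimal usage sketch for binary-encoded decimals ('dict_buffer', 'dict_len' and
'fixed_len_size' are placeholders for whatever the column reader has at hand):

  DictDecoder<Decimal4Value> decoder;
  // Initialize from a dictionary page storing decimals as variable-sized
  // BYTE_ARRAY values.
  bool ok = decoder.template Reset<parquet::Type::BYTE_ARRAY>(
      dict_buffer, dict_len, fixed_len_size);
  if (!ok) { /* dictionary page was corrupt */ }
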
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index 620f431..de0fb11 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -29,15 +29,15 @@
 
 namespace impala {
 
-template<typename T>
-void ValidateDict(const vector<T>& values, const vector<T>& dict_values,
-                  int fixed_buffer_byte_size) {
-  set<T> values_set(values.begin(), values.end());
+template<typename InternalType, parquet::Type::type PARQUET_TYPE>
+void ValidateDict(const vector<InternalType>& values,
+    const vector<InternalType>& dict_values, int fixed_buffer_byte_size) {
+  set<InternalType> values_set(values.begin(), values.end());
 
   MemTracker tracker;
   MemPool pool(&tracker);
-  DictEncoder<T> encoder(&pool, fixed_buffer_byte_size);
-  for (T i: values) encoder.Put(i);
+  DictEncoder<InternalType> encoder(&pool, fixed_buffer_byte_size);
+  for (InternalType i: values) encoder.Put(i);
   EXPECT_EQ(encoder.num_entries(), values_set.size());
 
   uint8_t dict_buffer[encoder.dict_encoded_size()];
@@ -49,22 +49,22 @@ void ValidateDict(const vector<T>& values, const vector<T>& dict_values,
   EXPECT_GT(data_len, 0);
   encoder.ClearIndices();
 
-  DictDecoder<T> decoder;
-  ASSERT_TRUE(
-      decoder.Reset(dict_buffer, encoder.dict_encoded_size(), fixed_buffer_byte_size));
+  DictDecoder<InternalType> decoder;
+  ASSERT_TRUE(decoder.template Reset<PARQUET_TYPE>(dict_buffer,
+      encoder.dict_encoded_size(), fixed_buffer_byte_size));
 
   // Test direct access to the dictionary via indexes
   for (int i = 0; i < dict_values.size(); ++i) {
-    T expected_value = dict_values[i];
-    T out_value;
+    InternalType expected_value = dict_values[i];
+    InternalType out_value;
 
     decoder.GetValue(i, &out_value);
     EXPECT_EQ(expected_value, out_value);
   }
   // Test access to dictionary via internal stream
   ASSERT_OK(decoder.SetData(data_buffer, data_len));
-  for (T i: values) {
-    T j;
+  for (InternalType i: values) {
+    InternalType j;
     decoder.GetNextValue(&j);
     EXPECT_EQ(i, j);
   }
@@ -96,7 +96,7 @@ TEST(DictTest, TestStrings) {
   values.push_back(sv3);
   values.push_back(sv4);
 
-  ValidateDict(values, dict_values, -1);
+  ValidateDict<StringValue, parquet::Type::BYTE_ARRAY>(values, dict_values, -1);
 }
 
 TEST(DictTest, TestTimestamps) {
@@ -117,51 +117,57 @@ TEST(DictTest, TestTimestamps) {
   values.push_back(tv1);
   values.push_back(tv1);
 
-  ValidateDict(values, dict_values,
+  ValidateDict<TimestampValue, parquet::Type::INT96>(values, dict_values,
       ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TIMESTAMP)));
 }
 
-template<typename T>
-void IncrementValue(T* t) { ++(*t); }
+template<typename InternalType>
+void IncrementValue(InternalType* t) { ++(*t); }
 
 template <> void IncrementValue(Decimal4Value* t) { ++(t->value()); }
 template <> void IncrementValue(Decimal8Value* t) { ++(t->value()); }
 template <> void IncrementValue(Decimal16Value* t) { ++(t->value()); }
 
-template<typename T>
+template<typename InternalType, parquet::Type::type PARQUET_TYPE>
 void TestNumbers(int max_value, int repeat, int value_byte_size) {
-  vector<T> values;
-  vector<T> dict_values;
-  for (T val = 0; val < max_value; IncrementValue(&val)) {
+  vector<InternalType> values;
+  vector<InternalType> dict_values;
+  for (InternalType val = 0; val < max_value; IncrementValue(&val)) {
     for (int i = 0; i < repeat; ++i) {
       values.push_back(val);
     }
     dict_values.push_back(val);
   }
 
-  ValidateDict(values, dict_values, value_byte_size);
+  ValidateDict<InternalType, PARQUET_TYPE>(values, dict_values, value_byte_size);
 }
 
-template<typename T>
+template<typename InternalType, parquet::Type::type PARQUET_TYPE>
 void TestNumbers(int value_byte_size) {
-  TestNumbers<T>(100, 1, value_byte_size);
-  TestNumbers<T>(1, 100, value_byte_size);
-  TestNumbers<T>(1, 1, value_byte_size);
-  TestNumbers<T>(1, 2, value_byte_size);
+  TestNumbers<InternalType, PARQUET_TYPE>(100, 1, value_byte_size);
+  TestNumbers<InternalType, PARQUET_TYPE>(1, 100, value_byte_size);
+  TestNumbers<InternalType, PARQUET_TYPE>(1, 1, value_byte_size);
+  TestNumbers<InternalType, PARQUET_TYPE>(1, 2, value_byte_size);
 }
 
 TEST(DictTest, TestNumbers) {
-  TestNumbers<int8_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TINYINT)));
-  TestNumbers<int16_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_SMALLINT)));
-  TestNumbers<int32_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_INT)));
-  TestNumbers<int64_t>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_BIGINT)));
-  TestNumbers<float>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_FLOAT)));
-  TestNumbers<double>(ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_DOUBLE)));
+  TestNumbers<int8_t, parquet::Type::INT32>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TINYINT)));
+  TestNumbers<int16_t, parquet::Type::INT32>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_SMALLINT)));
+  TestNumbers<int32_t, parquet::Type::INT32>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_INT)));
+  TestNumbers<int64_t, parquet::Type::INT64>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_BIGINT)));
+  TestNumbers<float, parquet::Type::FLOAT>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_FLOAT)));
+  TestNumbers<double, parquet::Type::DOUBLE>(
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_DOUBLE)));
 
   for (int i = 1; i <= 16; ++i) {
-    if (i <= 4) TestNumbers<Decimal4Value>(i);
-    if (i <= 8) TestNumbers<Decimal8Value>(i);
-    TestNumbers<Decimal16Value>(i);
+    if (i <= 4) TestNumbers<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(i);
+    if (i <= 8) TestNumbers<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(i);
+    TestNumbers<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(i);
   }
 }
 
@@ -173,7 +179,8 @@ TEST(DictTest, TestInvalidStrings) {
   // Test a dictionary with a string encoded with negative length. Initializing
   // the decoder should fail.
   DictDecoder<StringValue> decoder;
-  ASSERT_FALSE(decoder.Reset(buffer, sizeof(buffer), 0));
+  ASSERT_FALSE(decoder.template Reset<parquet::Type::BYTE_ARRAY>(buffer, sizeof(buffer),
+      0));
 }
 
 TEST(DictTest, TestStringBufferOverrun) {
@@ -185,7 +192,8 @@ TEST(DictTest, TestStringBufferOverrun) {
   // Initializing the dictionary should fail, since the string would reference
   // invalid memory.
   DictDecoder<StringValue> decoder;
-  ASSERT_FALSE(decoder.Reset(buffer, sizeof(buffer), 0));
+  ASSERT_FALSE(decoder.template Reset<parquet::Type::BYTE_ARRAY>(buffer, sizeof(buffer),
+      0));
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index 231a901..e1dc496 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -116,3 +116,10 @@ Reproduction steps:
    file_metadata_.schema[0].__set_repetition_type(FieldRepetitionType::REQUIRED);
 2: Run test_compute_stats and grab the created Parquet file for
    alltypes_parquet table.
+
+binary_decimal_dictionary.parquet,
+binary_decimal_no_dictionary.parquet:
+Generated using parquet-mr, with contents verified using parquet-tools-1.9.1.
+Both contain decimals stored as variable-sized BYTE_ARRAY, with dictionary
+and non-dictionary encoding respectively.
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/testdata/data/binary_decimal_dictionary.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/binary_decimal_dictionary.parquet b/testdata/data/binary_decimal_dictionary.parquet
new file mode 100644
index 0000000..621ed40
Binary files /dev/null and b/testdata/data/binary_decimal_dictionary.parquet differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/testdata/data/binary_decimal_no_dictionary.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/binary_decimal_no_dictionary.parquet b/testdata/data/binary_decimal_no_dictionary.parquet
new file mode 100644
index 0000000..3b8b096
Binary files /dev/null and b/testdata/data/binary_decimal_no_dictionary.parquet differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-formats.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-formats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-formats.test
new file mode 100644
index 0000000..3c54aa1
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-decimal-formats.test
@@ -0,0 +1,25 @@
+====
+---- QUERY
+select * from decimal_encodings;
+---- RESULTS
+0.00,0.00,0.00
+255.00,255.00,255.00
+65535.00,65535.00,65535.00
+9999999.99,9999999.99,9999999.99
+0.00,9999999999999999.99,999999999999999999999999999999999999.99
+-255.00,-255.00,-255.00
+-65535.00,-65535.00,-65535.00
+-9999999.99,-9999999.99,-9999999.99
+0.00,-9999999999999999.99,-999999999999999999999999999999999999.99
+0.00,0.00,0.00
+255.00,255.00,255.00
+65535.00,65535.00,65535.00
+9999999.99,9999999.99,9999999.99
+0.00,9999999999999999.99,999999999999999999999999999999999999.99
+-255.00,-255.00,-255.00
+-65535.00,-65535.00,-65535.00
+-9999999.99,-9999999.99,-9999999.99
+0.00,-9999999999999999.99,-999999999999999999999999999999999999.99
+---- TYPES
+DECIMAL,DECIMAL,DECIMAL
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/94236ff2/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 954830a..17b9503 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -589,6 +589,24 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
                        use_db=unique_database)
 
+  def test_decimal_encodings(self, vector, unique_database):
+    # Create a table and load existing data files that store decimals as
+    # variable-length BYTE_ARRAY, with and without dictionary encoding.
+    TABLE_NAME = "decimal_encodings"
+    self.client.execute('''create table if not exists %s.%s
+    (small_dec decimal(9,2), med_dec decimal(18,2), large_dec decimal(38,2))
+    STORED AS PARQUET''' % (unique_database, TABLE_NAME))
+
+    table_loc = get_fs_path(
+      "/test-warehouse/%s.db/%s" % (unique_database, TABLE_NAME))
+    for file_name in ["binary_decimal_dictionary.parquet",
+                      "binary_decimal_no_dictionary.parquet"]:
+      data_file_path = os.path.join(os.environ['IMPALA_HOME'],
+                                    "testdata/data/", file_name)
+      check_call(['hdfs', 'dfs', '-copyFromLocal', data_file_path, table_loc])
+
+    self.run_test_case('QueryTest/parquet-decimal-formats', vector, unique_database)
+
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise:
 # 1. default scan range

