impala-commits mailing list archives

From tarmstr...@apache.org
Subject incubator-impala git commit: IMPALA-3745: parquet invalid data handling
Date Thu, 16 Jun 2016 04:34:06 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 2dad444c8 -> 547be27e7


IMPALA-3745: parquet invalid data handling

Added checks/error handling:
* Negative string lengths while decoding dictionary or data page.
* Buffer overruns while decoding dictionary or data page.
* Some metadata FILE_CHECKs were converted to error statuses.

Testing:
Unit tests for:
* decoding of strings with negative lengths
* truncation of all parquet types
* dictionary creation correctly handling error returns from Decode().

End-to-end tests for handling of negative string lengths in
dictionary- and plain-encoded data in corrupt files, and for
handling of buffer overruns for string data. The corrupted
parquet files were generated by hacking Impala's parquet
writer to write invalid lengths, and by hacking it to
write plain-encoded data instead of dictionary-encoded
data by default.

Performance:
set num_nodes=1;
set num_scanner_threads=1;
select * from biglineitem where l_orderkey = -1;

I inspected MaterializeTupleTime. Before, the average was 8.24s; after, it was
8.36s (a 1.4% slowdown, within the standard deviation of 1.8%).

Change-Id: Id565a2ccb7b82f9f92cc3b07f05642a3a835bece
Reviewed-on: http://gerrit.cloudera.org:8080/3387
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/547be27e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/547be27e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/547be27e

Branch: refs/heads/master
Commit: 547be27e77441c50db068aa51f19d87b3a2fd3af
Parents: 2dad444
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Wed Jun 15 10:17:12 2016 -0700
Committer: Tim Armstrong <tarmstrong@cloudera.com>
Committed: Wed Jun 15 21:33:39 2016 -0700

----------------------------------------------------------------------
 be/src/exec/data-source-scan-node.cc            |  19 ++--
 be/src/exec/hdfs-parquet-scanner.cc             |  93 ++++++++++++++-----
 be/src/exec/parquet-common.h                    |  63 ++++++++-----
 be/src/exec/parquet-plain-test.cc               |  38 +++++++-
 be/src/util/dict-encoding.h                     |  32 +++----
 be/src/util/dict-test.cc                        |  33 ++++++-
 common/thrift/generate_error_codes.py           |  12 ++-
 testdata/bad_parquet_data/README                |  10 ++
 .../dict-encoded-negative-len.parq              | Bin 0 -> 246 bytes
 .../dict-encoded-out-of-bounds.parq             | Bin 0 -> 246 bytes
 .../plain-encoded-negative-len.parq             | Bin 0 -> 242 bytes
 .../plain-encoded-out-of-bounds.parq            | Bin 0 -> 242 bytes
 testdata/bin/create-load-data.sh                |  11 +++
 .../functional/functional_schema_template.sql   |  16 ++++
 .../datasets/functional/schema_constraints.csv  |   2 +
 .../QueryTest/parquet-continue-on-error.test    |  18 ++++
 .../parquet-corrupt-rle-counts-abort.test       |   2 +-
 .../QueryTest/parquet-corrupt-rle-counts.test   |   2 +-
 tests/query_test/test_scanners.py               |   2 +-
 19 files changed, 272 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 76e86e9..4543125 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -168,20 +168,25 @@ inline Status SetDecimalVal(const ColumnType& type, char* bytes, int len,
   switch (type.GetByteSize()) {
     case 4: {
       Decimal4Value* val = reinterpret_cast<Decimal4Value*>(slot);
-      if (len > sizeof(Decimal4Value)) return Status(ERROR_INVALID_DECIMAL);
-      // TODO: Move Decode() to a more generic utils class (here and below)
-      ParquetPlainEncoder::Decode(buffer, len, val);
+      if (UNLIKELY(len > sizeof(Decimal4Value) ||
+          ParquetPlainEncoder::Decode(buffer, buffer + len, len, val) < 0)) {
+        return Status(ERROR_INVALID_DECIMAL);
+      }
     }
     case 8: {
       Decimal8Value* val = reinterpret_cast<Decimal8Value*>(slot);
-      if (len > sizeof(Decimal8Value)) return Status(ERROR_INVALID_DECIMAL);
-      ParquetPlainEncoder::Decode(buffer, len, val);
+      if (UNLIKELY(len > sizeof(Decimal8Value) ||
+          ParquetPlainEncoder::Decode(buffer, buffer + len, len, val) < 0)) {
+        return Status(ERROR_INVALID_DECIMAL);
+      }
       break;
     }
     case 16: {
       Decimal16Value* val = reinterpret_cast<Decimal16Value*>(slot);
-      if (len > sizeof(Decimal16Value)) return Status(ERROR_INVALID_DECIMAL);
-      ParquetPlainEncoder::Decode(buffer, len, val);
+      if (UNLIKELY(len > sizeof(Decimal16Value) ||
+          ParquetPlainEncoder::Decode(buffer, buffer + len, len, val) < 0)) {
+        return Status(ERROR_INVALID_DECIMAL);
+      }
       break;
     }
     default: DCHECK(false);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 1713af6..fa64f84 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -567,6 +567,7 @@ class HdfsParquetScanner::BaseScalarColumnReader :
 
     num_buffered_values_ = 0;
     data_ = NULL;
+    data_end_ = NULL;
     stream_ = stream;
     metadata_ = metadata;
     num_values_read_ = 0;
@@ -614,6 +615,9 @@ class HdfsParquetScanner::BaseScalarColumnReader :
   /// Pointer to start of next value in data page
   uint8_t* data_;
 
+  /// End of the data page.
+  const uint8_t* data_end_;
+
   /// Decoder for definition levels.
   LevelDecoder def_levels_;
 
@@ -654,9 +658,11 @@ class HdfsParquetScanner::BaseScalarColumnReader :
   template <bool ADVANCE_REP_LEVEL>
   bool NextLevels();
 
-  /// Creates a dictionary decoder from values/size and store in class. Subclass must
-  /// implement this.
-  virtual DictDecoderBase* CreateDictionaryDecoder(uint8_t* values, int size) = 0;
+  /// Creates a dictionary decoder from values/size. 'decoder' is set to point to a
+  /// dictionary decoder stored in this object. Subclass must implement this. Returns
+  /// an error status if the dictionary values could not be decoded successfully.
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) = 0;
 
   /// Return true if the column has an initialized dictionary decoder. Subclass must
   /// implement this.
@@ -880,10 +886,15 @@ class HdfsParquetScanner::ScalarColumnReader :
     return true;
   }
 
-  virtual DictDecoderBase* CreateDictionaryDecoder(uint8_t* values, int size) {
-    dict_decoder_.Reset(values, size, fixed_len_size_);
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) {
+    if (!dict_decoder_.Reset(values, size, fixed_len_size_)) {
+        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+            slot_desc_->type().DebugString(), "could not decode dictionary");
+    }
     dict_decoder_init_ = true;
-    return &dict_decoder_;
+    *decoder = &dict_decoder_;
+    return Status::OK();
   }
 
   virtual bool HasDictionaryDecoder() {
@@ -935,7 +946,13 @@ class HdfsParquetScanner::ScalarColumnReader :
       }
     } else {
       DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN);
-      data_ += ParquetPlainEncoder::Decode<T>(data_, fixed_len_size_, val_ptr);
+      int encoded_len =
+          ParquetPlainEncoder::Decode<T>(data_, data_end_, fixed_len_size_, val_ptr);
+      if (UNLIKELY(encoded_len < 0)) {
+        SetPlainDecodeError();
+        return false;
+      }
+      data_ += encoded_len;
     }
     if (UNLIKELY(NeedsConversion() &&
             !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) {
@@ -961,7 +978,12 @@ class HdfsParquetScanner::ScalarColumnReader :
   /// Pull out slow-path Status construction code from ReadRepetitionLevel()/
   /// ReadDefinitionLevel() for performance.
   void __attribute__((noinline)) SetDictDecodeError() {
-    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename());
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
+  }
+  void __attribute__((noinline)) SetPlainDecodeError() {
+    parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE, filename(),
+        slot_desc_->type().DebugString(), stream_->file_offset());
   }
 
   /// Dictionary decoder for decoding column values.
@@ -1046,10 +1068,11 @@ class HdfsParquetScanner::BoolColumnReader :
   }
 
  protected:
-  virtual DictDecoderBase* CreateDictionaryDecoder(uint8_t* values, int size) {
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) {
    DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
                   << "have gotten this far.";
-    return NULL;
+    return Status::OK();
   }
 
   virtual bool HasDictionaryDecoder() {
@@ -1414,6 +1437,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
       }
 
       if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+      data_end_ = data_ + data_size;
 
       uint8_t* dict_values = NULL;
       if (decompressor_.get() != NULL) {
@@ -1427,9 +1451,18 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
         RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
             &uncompressed_size, &dict_values));
         VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
+        if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+          return Status(Substitute("Error decompressing dictionary page in file '$0'. "
+              "Expected $1 uncompressed bytes but got $2", filename(),
+              current_page_header_.uncompressed_page_size, uncompressed_size));
+        }
         data_size = uncompressed_size;
       } else {
-        FILE_CHECK_EQ(data_size, current_page_header_.uncompressed_page_size);
+        if (current_page_header_.uncompressed_page_size != data_size) {
+          return Status(Substitute("Error reading dictionary page in file '$0'. "
+              "Expected $1 bytes but got $2", filename(),
+              current_page_header_.uncompressed_page_size, data_size));
+        }
         // Copy dictionary from io buffer (which will be recycled as we read
         // more data) to a new buffer
         dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
@@ -1442,11 +1475,13 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
         memcpy(dict_values, data_, data_size);
       }
 
-      DictDecoderBase* dict_decoder = CreateDictionaryDecoder(dict_values, data_size);
+      DictDecoderBase* dict_decoder;
+      RETURN_IF_ERROR(CreateDictionaryDecoder(dict_values, data_size, &dict_decoder));
       if (dict_header != NULL &&
           dict_header->num_values != dict_decoder->num_entries()) {
-        return Status(Substitute(
-            "Invalid dictionary. Expected $0 entries but data contained $1 entries",
+        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+            slot_desc_->type().DebugString(),
+            Substitute("Expected $0 entries but data contained $1 entries",
             dict_header->num_values, dict_decoder->num_entries()));
       }
       // Done with dictionary page, read next page
@@ -1463,6 +1498,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
     // TODO: when we start using page statistics, we will need to ignore certain corrupt
     // statistics. See IMPALA-2208 and PARQUET-251.
     if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+    data_end_ = data_ + data_size;
     num_buffered_values_ = current_page_header_.data_page_header.num_values;
     num_values_read_ += num_buffered_values_;
 
@@ -1481,12 +1517,21 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
           &decompressed_buffer));
       VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
                 << " to " << uncompressed_size;
-      FILE_CHECK_EQ(current_page_header_.uncompressed_page_size, uncompressed_size);
+      if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error decompressing data page in file '$0'. "
+            "Expected $1 uncompressed bytes but got $2", filename(),
+            current_page_header_.uncompressed_page_size, uncompressed_size));
+      }
       data_ = decompressed_buffer;
       data_size = current_page_header_.uncompressed_page_size;
+      data_end_ = data_ + data_size;
     } else {
       DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
-      FILE_CHECK_EQ(current_page_header_.compressed_page_size, uncompressed_size);
+      if (current_page_header_.compressed_page_size != uncompressed_size) {
+        return Status(Substitute("Error reading data page in file '$0'. "
+            "Expected $1 bytes but got $2", filename(),
+            current_page_header_.compressed_page_size, uncompressed_size));
+      }
     }
 
     // Initialize the repetition level data
@@ -1530,7 +1575,7 @@ Status HdfsParquetScanner::LevelDecoder::Init(const string& filename,
         return status;
       }
       if (num_bytes < 0) {
-        return Status(TErrorCode::PARQUET_CORRUPT_VALUE, filename, num_bytes);
+        return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
       }
       int bit_width = Bits::Log2Ceiling64(max_level + 1);
       Reset(*data, num_bytes, bit_width);
@@ -2241,7 +2286,7 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
 
   // Make sure footer has enough bytes to contain the required information.
   if (remaining_bytes_buffered < 0) {
-    return Status(Substitute("File '$0' is invalid.  Missing metadata.", filename()));
+    return Status(Substitute("File '$0' is invalid. Missing metadata.", filename()));
   }
 
   // Validate magic file bytes are correct.
@@ -2839,8 +2884,9 @@ Status HdfsParquetScanner::InitColumns(
 
     // TODO: this will need to change when we have co-located files and the columns
     // are different files.
-    if (!col_chunk.file_path.empty()) {
-      FILE_CHECK_EQ(col_chunk.file_path, string(filename()));
+    if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
+      return Status(Substitute("Expected parquet column file path '$0' to match "
+          "filename '$1'", col_chunk.file_path, filename()));
     }
 
     const DiskIoMgr::ScanRange* split_range =
@@ -3150,8 +3196,11 @@ Status HdfsParquetScanner::ValidateEndOfRowGroup(
     // All readers should have exhausted the final data page. This could fail if one
     // column has more values than stated in the metadata, meaning the final data page
     // will still have unread values.
-    // TODO for 2.3: make this a bad status
-    FILE_CHECK_EQ(reader->num_buffered_values_, 0);
+    if (reader->num_buffered_values_ != 0) {
+      return Status(Substitute("Corrupt parquet metadata in file '$0': metadata reports "
+          "'$1' more values in data page than actually present", filename(),
+          reader->num_buffered_values_));
+    }
     // Sanity check that the num_values_read_ value is the same for all readers. All
     // readers should have been advanced in lockstep (the above check is more likely to
     // fail if this not the case though, since num_values_read_ is only updated at the end

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index d346794..18a67a9 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -16,6 +16,7 @@
 #ifndef IMPALA_EXEC_PARQUET_COMMON_H
 #define IMPALA_EXEC_PARQUET_COMMON_H
 
+#include "common/compiler-util.h"
 #include "gen-cpp/Descriptors_types.h"
 #include "gen-cpp/parquet_types.h"
 #include "runtime/decimal-value.h"
@@ -169,14 +170,17 @@ class ParquetPlainEncoder {
     return ByteSize(t);
   }
 
-  /// Decodes t from buffer. Returns the number of bytes read.  Buffer need
-  /// not be aligned.
-  /// For types that are stored as FIXED_LEN_BYTE_ARRAY, fixed_len_size is the size
-  /// of the object. Otherwise, it is unused.
+  /// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 'buffer'
+  /// need not be aligned. For types that are stored as FIXED_LEN_BYTE_ARRAY,
+  /// 'fixed_len_size' is the size of the object. Otherwise, it is unused.
+  /// Returns the number of bytes read or -1 if the value was not decoded successfully.
   template<typename T>
-  static int Decode(uint8_t* buffer, int fixed_len_size, T* v) {
-    memcpy(v, buffer, ByteSize(*v));
-    return ByteSize(*v);
+  static int Decode(uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+      T* v) {
+    int byte_size = ByteSize(*v);
+    if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+    memcpy(v, buffer, byte_size);
+    return byte_size;
   }
 
   /// Encode 't', which must be in the machine endian, to FIXED_LEN_BYTE_ARRAY
@@ -186,7 +190,8 @@ class ParquetPlainEncoder {
 
   /// Decodes into v assuming buffer is encoded using FIXED_LEN_BYTE_ARRAY of
   /// 'fixed_len_size'. The bytes in buffer must be big endian and the result stored in
-  /// v is the machine endian format.
+  /// v is the machine endian format. The caller is responsible for ensuring that
+  /// 'buffer' is at least 'fixed_len_size' bytes long.
   template<typename T>
   static int DecodeFromFixedLenByteArray(uint8_t* buffer, int fixed_len_size, T* v);
 };
@@ -194,7 +199,8 @@ class ParquetPlainEncoder {
 /// Disable for bools. Plain encoding is not used for booleans.
 template<> int ParquetPlainEncoder::ByteSize(const bool& b);
 template<> int ParquetPlainEncoder::Encode(uint8_t*, int fixed_len_size, const bool&);
-template<> int ParquetPlainEncoder::Decode(uint8_t*, int fixed_len_size, bool* v);
+template<> int ParquetPlainEncoder::Decode(uint8_t*, const uint8_t*, int fixed_len_size,
+    bool* v);
 
 /// Not used for decimals since the plain encoding encodes them using
 /// FIXED_LEN_BYTE_ARRAY.
@@ -228,14 +234,20 @@ inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) {
 }
 
 template<>
-inline int ParquetPlainEncoder::Decode(uint8_t* buffer, int fixed_len_size, int8_t* v) {
+inline int ParquetPlainEncoder::Decode(uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, int8_t* v) {
+  int byte_size = ByteSize(*v);
+  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
   *v = *buffer;
-  return ByteSize(*v);
+  return byte_size;
 }
 template<>
-inline int ParquetPlainEncoder::Decode(uint8_t* buffer, int fixed_len_size, int16_t* v) {
+inline int ParquetPlainEncoder::Decode(uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, int16_t* v) {
+  int byte_size = ByteSize(*v);
+  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
   memcpy(v, buffer, sizeof(int16_t));
-  return ByteSize(*v);
+  return byte_size;
 }
 
 template<>
@@ -264,13 +276,15 @@ inline int ParquetPlainEncoder::Encode(
 
 template<>
 inline int ParquetPlainEncoder::Decode(
-    uint8_t* buffer, int fixed_len_size, StringValue* v) {
+    uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, StringValue* v) {
+  if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
   memcpy(&v->len, buffer, sizeof(int32_t));
+  int byte_size = ByteSize(*v);
+  if (UNLIKELY(v->len < 0 || buffer_end - buffer < byte_size)) return -1;
   v->ptr = reinterpret_cast<char*>(buffer) + sizeof(int32_t);
-  int bytesize = ByteSize(*v);
   if (fixed_len_size > 0) v->len = std::min(v->len, fixed_len_size);
-  // we still read bytesize bytes, even if we truncate
-  return bytesize;
+  // we still read byte_size bytes, even if we truncate
+  return byte_size;
 }
 
 /// Write decimals as big endian (byte comparable) to benefit from common prefixes.
@@ -300,22 +314,25 @@ inline int ParquetPlainEncoder::Encode(
 }
 
 template<>
-inline int ParquetPlainEncoder::Decode(
-    uint8_t* buffer, int fixed_len_size, Decimal4Value* v) {
+inline int ParquetPlainEncoder::Decode(uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, Decimal4Value* v) {
+  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
   DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
   return fixed_len_size;
 }
 
 template<>
-inline int ParquetPlainEncoder::Decode(
-    uint8_t* buffer, int fixed_len_size, Decimal8Value* v) {
+inline int ParquetPlainEncoder::Decode(uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, Decimal8Value* v) {
+  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
   DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
   return fixed_len_size;
 }
 
 template<>
-inline int ParquetPlainEncoder::Decode(
-    uint8_t* buffer, int fixed_len_size, Decimal16Value* v) {
+inline int ParquetPlainEncoder::Decode(uint8_t* buffer, const uint8_t* buffer_end,
+    int fixed_len_size, Decimal16Value* v) {
+  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
   DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
   return fixed_len_size;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/be/src/exec/parquet-plain-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-plain-test.cc b/be/src/exec/parquet-plain-test.cc
index 6bfdd7a..351d6f7 100644
--- a/be/src/exec/parquet-plain-test.cc
+++ b/be/src/exec/parquet-plain-test.cc
@@ -26,6 +26,26 @@
 
 namespace impala {
 
+/// Test that the decoder fails when asked to decode a truncated value.
+template <typename T>
+void TestTruncate(const T& v, int expected_byte_size) {
+  uint8_t buffer[expected_byte_size];
+  int encoded_size = ParquetPlainEncoder::Encode(buffer, expected_byte_size, v);
+  EXPECT_EQ(encoded_size, expected_byte_size);
+
+  // Check all possible truncations of the buffer.
+  for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) {
+    T result;
+    /// Copy to heap-allocated buffer so that ASAN can detect buffer overruns.
+    uint8_t* truncated_buffer = new uint8_t[truncated_size];
+    memcpy(truncated_buffer, buffer, truncated_size);
+    int decoded_size = ParquetPlainEncoder::Decode(truncated_buffer,
+        truncated_buffer + truncated_size, expected_byte_size, &result);
+    EXPECT_EQ(-1, decoded_size);
+    delete[] truncated_buffer;
+  }
+}
+
 template <typename T>
 void TestType(const T& v, int expected_byte_size) {
   uint8_t buffer[expected_byte_size];
@@ -33,9 +53,12 @@ void TestType(const T& v, int expected_byte_size) {
   EXPECT_EQ(encoded_size, expected_byte_size);
 
   T result;
-  int decoded_size = ParquetPlainEncoder::Decode(buffer, expected_byte_size, &result);
+  int decoded_size = ParquetPlainEncoder::Decode(buffer, buffer + expected_byte_size,
+      expected_byte_size, &result);
   EXPECT_EQ(decoded_size, expected_byte_size);
   EXPECT_EQ(result, v);
+
+  TestTruncate(v, expected_byte_size);
 }
 
 TEST(PlainEncoding, Basic) {
@@ -121,6 +144,19 @@ TEST(PlainEncoding, DecimalBigEndian) {
   ASSERT_EQ(memcmp(result_buffer, buffer_swapped + 16 - sizeof(d16), sizeof(d16)), 0);
 }
 
+/// Test that corrupt strings are handled correctly.
+TEST(PlainEncoding, CorruptString) {
+  // Test string with negative length.
+  uint8_t buffer[sizeof(int32_t) + 10];
+  int32_t len = -10;
+  memcpy(buffer, &len, sizeof(int32_t));
+
+  StringValue result;
+  int decoded_size =
+      ParquetPlainEncoder::Decode(buffer, buffer + sizeof(buffer), 0, &result);
+  EXPECT_EQ(decoded_size, -1);
+}
+
 }
 
 int main(int argc, char **argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 795a6b1..a79a974 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -184,19 +184,18 @@ class DictDecoderBase {
 template<typename T>
 class DictDecoder : public DictDecoderBase {
  public:
-  /// The input buffer containing the dictionary.  'dict_len' is the byte length
-  /// of dict_buffer.
+  /// Construct empty dictionary.
+  DictDecoder() {}
+
+  /// Initialize the decoder with an input buffer containing the dictionary.
+  /// 'dict_len' is the byte length of dict_buffer.
   /// For string data, the decoder returns StringValues with data directly from
   /// dict_buffer (i.e. no copies).
   /// fixed_len_size is the size that must be passed to decode fixed-length
   /// dictionary values (values stored using FIXED_LEN_BYTE_ARRAY).
-  DictDecoder(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
-
-  /// Construct empty dictionary.
-  DictDecoder() {}
-
-  /// Reset decoder to fresh state.
-  void Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
+  /// Returns true if the dictionary values were all successfully decoded, or false
+  /// if the dictionary was corrupt.
+  bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
 
   virtual int num_entries() const { return dict_.size(); }
 
@@ -312,22 +311,19 @@ inline int DictEncoderBase::WriteData(uint8_t* buffer, int buffer_len) {
 }
 
 template<typename T>
-inline DictDecoder<T>::DictDecoder(uint8_t* dict_buffer, int dict_len,
-    int fixed_len_size) {
-  Reset(dict_buffer, dict_len, fixed_len_size);
-}
-
-template<typename T>
-inline void DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
+inline bool DictDecoder<T>::Reset(uint8_t* dict_buffer, int dict_len,
     int fixed_len_size) {
   dict_.clear();
   uint8_t* end = dict_buffer + dict_len;
   while (dict_buffer < end) {
     T value;
-    dict_buffer +=
-        ParquetPlainEncoder::Decode(dict_buffer, fixed_len_size, &value);
+    int decoded_len =
+        ParquetPlainEncoder::Decode(dict_buffer, end, fixed_len_size, &value);
+    if (UNLIKELY(decoded_len < 0)) return false;
+    dict_buffer += decoded_len;
     dict_.push_back(value);
   }
+  return true;
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index 1fb8eaa..3b04853 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -47,8 +47,9 @@ void ValidateDict(const vector<T>& values, int fixed_buffer_byte_size) {
   EXPECT_GT(data_len, 0);
   encoder.ClearIndices();
 
-  DictDecoder<T> decoder(
-      dict_buffer, encoder.dict_encoded_size(), fixed_buffer_byte_size);
+  DictDecoder<T> decoder;
+  ASSERT_TRUE(
+      decoder.Reset(dict_buffer, encoder.dict_encoded_size(), fixed_buffer_byte_size));
   decoder.SetData(data_buffer, data_len);
   for (T i: values) {
     T j;
@@ -130,17 +131,41 @@ TEST(DictTest, TestNumbers) {
   TestNumbers<float>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_FLOAT)));
   TestNumbers<double>(ParquetPlainEncoder::ByteSize(ColumnType(TYPE_DOUBLE)));
 
-  for (int i = 1; i <=16; ++i) {
+  for (int i = 1; i <= 16; ++i) {
     if (i <= 4) TestNumbers<Decimal4Value>(i);
     if (i <= 8) TestNumbers<Decimal8Value>(i);
     TestNumbers<Decimal16Value>(i);
   }
 }
 
+TEST(DictTest, TestInvalidStrings) {
+  uint8_t buffer[sizeof(int32_t) + 10];
+  int32_t len = -10;
+  memcpy(buffer, &len, sizeof(int32_t));
+
+  // Test a dictionary with a string encoded with negative length. Initializing
+  // the decoder should fail.
+  DictDecoder<StringValue> decoder;
+  ASSERT_FALSE(decoder.Reset(buffer, sizeof(buffer), 0));
+}
+
+TEST(DictTest, TestStringBufferOverrun) {
+  // Test string length past end of buffer.
+  uint8_t buffer[sizeof(int32_t) + 10];
+  int32_t len = 100;
+  memcpy(buffer, &len, sizeof(int32_t));
+
+  // Initializing the dictionary should fail, since the string would reference
+  // invalid memory.
+  DictDecoder<StringValue> decoder;
+  ASSERT_FALSE(decoder.Reset(buffer, sizeof(buffer), 0));
+}
+
+
 }
 
 int main(int argc, char **argv) {
   ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, true);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 0b6f5cf..7815233 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -119,7 +119,7 @@ error_codes = (
   ("UDF_VERIFY_FAILED", 33,
    "Failed to verify function $0 from LLVM module $1, see log for more details."),
 
-  ("PARQUET_CORRUPT_VALUE", 34, "File $0 corrupt. RLE level data bytes = $1"),
+  ("PARQUET_CORRUPT_RLE_BYTES", 34, "File $0 corrupt. RLE level data bytes = $1"),
 
  ("AVRO_DECIMAL_RESOLUTION_ERROR", 35, "Column '$0' has conflicting Avro decimal types. "
    "Table schema $1: $2, file schema $1: $3"),
@@ -154,8 +154,8 @@ error_codes = (
   ("PARQUET_NUM_COL_VALS_ERROR", 45, "Mismatched number of values in column index $0 "
    "($1 vs. $2). file=$3"),
 
-  ("PARQUET_DICT_DECODE_FAILURE", 46, "Failed to decode dictionary-encoded value. "
-   "file=$0"),
+  ("PARQUET_DICT_DECODE_FAILURE", 46, "File '$0' is corrupt: error decoding "
+   "dictionary-encoded value of type $1 at offset $2"),
 
   ("SSL_PASSWORD_CMD_FAILED", 47,
    "SSL private-key password command ('$0') failed with error: $1"),
@@ -268,6 +268,12 @@ error_codes = (
 
   ("SCANNER_STRING_LENGTH_OVERFLOW", 87, "File '$0' could not be read: string $1 was "
     "longer than supported limit of $2 bytes at offset $3"),
+
+  ("PARQUET_CORRUPT_PLAIN_VALUE", 88, "File '$0' is corrupt: error decoding value of type "
+   "$1 at offset $2"),
+
+  ("PARQUET_CORRUPT_DICTIONARY", 89, "File '$0' is corrupt: error reading dictionary for "
+   "data of type $1: $2"),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/bad_parquet_data/README
----------------------------------------------------------------------
diff --git a/testdata/bad_parquet_data/README b/testdata/bad_parquet_data/README
new file mode 100644
index 0000000..e976c36
--- /dev/null
+++ b/testdata/bad_parquet_data/README
@@ -0,0 +1,10 @@
+These Parquet files were created by modifying Impala's HdfsParquetTableWriter.
+
+String Data
+-----------
+These files have a single nullable string column 's'.
+
+dict-encoded-negative-len.parq: a single dictionary-encoded value with a negative length.
+dict-encoded-out-of-bounds.parq: a single dictionary-encoded value with a length past end of page.
+plain-encoded-negative-len.parq: a single plain-encoded value with a negative length.
+plain-encoded-out-of-bounds.parq: a single plain-encoded value with a length past end of page.
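
These corrupt files exercise the decoder's new length and bounds checks. The kind of validation the fix adds when reading a plain-encoded Parquet byte-array value (a signed 4-byte little-endian length prefix followed by the bytes) can be sketched in Python; the function name and structure are illustrative, not Impala's C++ implementation:

```python
import struct

def decode_plain_string(buf, offset):
    """Decode one plain-encoded Parquet byte-array value with bounds checks.

    Returns (value, new_offset); raises ValueError on corrupt data.
    """
    if offset + 4 > len(buf):
        raise ValueError("corrupt: length prefix overruns page buffer")
    # Parquet stores the length as a *signed* 32-bit little-endian int,
    # so a corrupt file can encode a negative length.
    (length,) = struct.unpack_from("<i", buf, offset)
    if length < 0:
        raise ValueError("corrupt: negative string length %d" % length)
    end = offset + 4 + length
    if end > len(buf):
        raise ValueError("corrupt: string length %d past end of page" % length)
    return buf[offset + 4:end], end
```

A valid value decodes normally, while the negative-length and out-of-bounds cases above raise instead of overrunning the buffer.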

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/bad_parquet_data/dict-encoded-negative-len.parq
----------------------------------------------------------------------
diff --git a/testdata/bad_parquet_data/dict-encoded-negative-len.parq b/testdata/bad_parquet_data/dict-encoded-negative-len.parq
new file mode 100644
index 0000000..637c9cd
Binary files /dev/null and b/testdata/bad_parquet_data/dict-encoded-negative-len.parq differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/bad_parquet_data/dict-encoded-out-of-bounds.parq
----------------------------------------------------------------------
diff --git a/testdata/bad_parquet_data/dict-encoded-out-of-bounds.parq b/testdata/bad_parquet_data/dict-encoded-out-of-bounds.parq
new file mode 100644
index 0000000..331535e
Binary files /dev/null and b/testdata/bad_parquet_data/dict-encoded-out-of-bounds.parq differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/bad_parquet_data/plain-encoded-negative-len.parq
----------------------------------------------------------------------
diff --git a/testdata/bad_parquet_data/plain-encoded-negative-len.parq b/testdata/bad_parquet_data/plain-encoded-negative-len.parq
new file mode 100644
index 0000000..e2f46c2
Binary files /dev/null and b/testdata/bad_parquet_data/plain-encoded-negative-len.parq differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/bad_parquet_data/plain-encoded-out-of-bounds.parq
----------------------------------------------------------------------
diff --git a/testdata/bad_parquet_data/plain-encoded-out-of-bounds.parq b/testdata/bad_parquet_data/plain-encoded-out-of-bounds.parq
new file mode 100644
index 0000000..67f925e
Binary files /dev/null and b/testdata/bad_parquet_data/plain-encoded-out-of-bounds.parq differ

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/bin/create-load-data.sh
----------------------------------------------------------------------
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index af98fc4..abeefab 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -275,6 +275,17 @@ function load-custom-data {
   hadoop fs -put -f ${IMPALA_HOME}/testdata/data/long_page_header.parquet \
                     /test-warehouse/bad_parquet_parquet
 
+  # IMPALA-3732: parquet files with corrupt strings
+  local parq_file
+  for parq_file in dict-encoded-negative-len.parq plain-encoded-negative-len.parq; do
+    hadoop fs -put -f ${IMPALA_HOME}/testdata/bad_parquet_data/$parq_file \
+                    /test-warehouse/bad_parquet_strings_negative_len_parquet
+  done
+  for parq_file in dict-encoded-out-of-bounds.parq plain-encoded-out-of-bounds.parq; do
+    hadoop fs -put -f ${IMPALA_HOME}/testdata/bad_parquet_data/$parq_file \
+                    /test-warehouse/bad_parquet_strings_out_of_bounds_parquet
+  done
+
   # Remove all index files in this partition.
   hadoop fs -rm /test-warehouse/alltypes_text_lzo/year=2009/month=1/*.lzo.index
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/datasets/functional/functional_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 171216f..e14492e 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -1363,6 +1363,22 @@ bad_parquet
 field STRING
 ====
 ---- DATASET
+-- Can't use LOAD DATA LOCAL with Impala so copied in create-load-data.sh.
+functional
+---- BASE_TABLE_NAME
+bad_parquet_strings_negative_len
+---- COLUMNS
+s STRING
+====
+---- DATASET
+-- Can't use LOAD DATA LOCAL with Impala so copied in create-load-data.sh.
+functional
+---- BASE_TABLE_NAME
+bad_parquet_strings_out_of_bounds
+---- COLUMNS
+s STRING
+====
+---- DATASET
 -- IMPALA-2130: Wrong verification of parquet file version
 functional
 ---- BASE_TABLE_NAME

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/datasets/functional/schema_constraints.csv
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 9251308..a49d445 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -38,6 +38,8 @@ table_name:bad_seq_snap, constraint:restrict_to, table_format:seq/snap/block
 table_name:bad_avro_snap_strings, constraint:restrict_to, table_format:avro/snap/block
 table_name:bad_avro_snap_floats, constraint:restrict_to, table_format:avro/snap/block
 table_name:bad_parquet, constraint:restrict_to, table_format:parquet/none/none
+table_name:bad_parquet_strings_negative_len, constraint:restrict_to, table_format:parquet/none/none
+table_name:bad_parquet_strings_out_of_bounds, constraint:restrict_to, table_format:parquet/none/none
 table_name:bad_magic_number, constraint:restrict_to, table_format:parquet/none/none
 table_name:bad_metadata_len, constraint:restrict_to, table_format:parquet/none/none
 table_name:bad_dict_page_offset, constraint:restrict_to, table_format:parquet/none/none

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/workloads/functional-query/queries/QueryTest/parquet-continue-on-error.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-continue-on-error.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-continue-on-error.test
index ebb0220..1a16d75 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-continue-on-error.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-continue-on-error.test
@@ -83,3 +83,21 @@ bigint
 ---- ERRORS
 Column metadata states there are 11 values, but read 10 values from column id. file: hdfs://regex:.$
 ====
+---- QUERY
+SELECT * from bad_parquet_strings_negative_len
+---- TYPES
+STRING
+---- RESULTS
+---- ERRORS
+row_regex: .*File '.*/plain-encoded-negative-len.parq' is corrupt: error decoding value of type STRING at offset 58.*
+row_regex: .*File '.*/dict-encoded-negative-len.parq' is corrupt: error reading dictionary for data of type STRING: could not decode dictionary.*
+====
+---- QUERY
+SELECT * from bad_parquet_strings_out_of_bounds
+---- TYPES
+STRING
+---- RESULTS
+---- ERRORS
+row_regex: .*File '.*/plain-encoded-out-of-bounds.parq' is corrupt: error decoding value of type STRING at offset 58.*
+row_regex: .*File '.*/dict-encoded-out-of-bounds.parq' is corrupt: error reading dictionary for data of type STRING: could not decode dictionary.*
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts-abort.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts-abort.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts-abort.test
index e0789ab..c59ff94 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts-abort.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts-abort.test
@@ -4,5 +4,5 @@ select * from bad_rle_counts
 ---- TYPES
 bigint
 ---- CATCH
-Failed to decode dictionary-encoded value.
+bad_rle_repeat_count.parquet' is corrupt: error decoding dictionary-encoded value of type BIGINT at offset 55
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts.test
index c086278..d58ab85 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-corrupt-rle-counts.test
@@ -5,5 +5,5 @@ select * from bad_rle_counts
 bigint
 ---- RESULTS
 ---- ERRORS
-Failed to decode dictionary-encoded value. file: hdfs://regex:.$
+row_regex: .*File '.*/bad_rle_repeat_count.parquet' is corrupt: error decoding dictionary-encoded value of type BIGINT at offset 55.*
 ====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/547be27e/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 035b64b..878c934 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -218,7 +218,7 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case('QueryTest/parquet', vector)
 
   @SkipIfOldAggsJoins.nested_types
-  def test_file_metadata_discrepancy(self, vector):
+  def test_corrupt_files(self, vector):
     vector.get_value('exec_option')['abort_on_error'] = 0
     self.run_test_case('QueryTest/parquet-continue-on-error', vector)
     vector.get_value('exec_option')['abort_on_error'] = 1


