impala-commits mailing list archives

From tarmstr...@apache.org
Subject [1/2] incubator-impala git commit: IMPALA-3764, 3914: fuzz test HDFS scanners and fix parquet bugs found
Date Thu, 11 Aug 2016 22:04:22 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 286da5921 -> 9162d5d05


IMPALA-3764,3914: fuzz test HDFS scanners and fix parquet bugs found

This adds a test that performs some simple fuzz testing of HDFS
scanners. It creates a copy of a given HDFS table, with each
file in the table corrupted in a random way: either a single
byte is set to a random value, or the file is truncated to a
random length. It then runs a query that scans the whole table
with several different batch_size settings. To make failures
reproducible, the random number generator is explicitly seeded
and a mechanism is provided to override the seed.
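
For illustration, a minimal sketch of the corruption step described
above (a condensed version of the corrupt_file helper added in
tests/query_test/test_scanners_fuzz.py below; the int() seed handling
and the 12345 fallback value are simplifications, not part of the
patch):

    import os
    import random

    def corrupt_file(path, rng):
        """Corrupt 'path' in place: either flip one byte to a random
        value or truncate the file to a random length, chosen by the
        seeded generator 'rng'."""
        with open(path, "rb") as f:
            data = bytearray(f.read())
        if rng.random() < 0.5 and len(data) > 0:
            offset = rng.randint(0, len(data) - 1)
            data[offset] = rng.randint(0, 255)   # flip a single byte
        else:
            data = data[:rng.randint(0, len(data))]  # truncate
        with open(path, "wb") as f:
            f.write(data)

    # Seeding explicitly makes a failing corruption reproducible; the
    # seed can be overridden through the environment.
    rng = random.Random()
    rng.seed(int(os.environ.get("SCANNER_FUZZ_SEED", "12345")))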

The fuzzer has found crashes resulting from corrupted or truncated
input files for RCFile, SequenceFile, Parquet, and Text LZO so far.
Avro only had a small buffer read overrun detected by ASAN.

Includes fixes for Parquet crashes found by the fuzzer, a small
buffer overrun in Avro, and a DCHECK in MemPool.

Initially it is only enabled for Avro, Parquet, and uncompressed
text. As follow-up work we should fix the bugs in the other scanners
and enable the test for them.

We also don't implement abort_on_error=0 correctly in Parquet:
corrupt file headers can still result in the query being aborted,
so the test xfails when such an exception is raised.

Testing:
Ran the test with exploration_strategy=exhaustive in a loop locally
with both DEBUG and ASAN builds for a couple of days over a weekend.
Also ran an exhaustive private build.

Change-Id: I50cf43195a7c582caa02c85ae400ea2256fa3a3b
Reviewed-on: http://gerrit.cloudera.org:8080/3833
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5afd9f7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5afd9f7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5afd9f7d

Branch: refs/heads/master
Commit: 5afd9f7df765006c067ef5f57d7f7431fe9e1247
Parents: 286da59
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Tue Aug 2 11:02:02 2016 -0700
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Thu Aug 11 08:42:41 2016 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc            |   5 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  28 ++-
 be/src/exec/parquet-column-readers.cc           |  36 +++-
 be/src/exec/parquet-column-readers.h            |   4 +-
 be/src/exec/parquet-metadata-utils.cc           |  47 +++--
 be/src/exec/parquet-metadata-utils.h            |   9 +
 be/src/runtime/disk-io-mgr.cc                   |  11 +-
 be/src/runtime/scoped-buffer.h                  |  68 +++++++
 be/src/util/bit-stream-utils.h                  |   9 +-
 be/src/util/bit-stream-utils.inline.h           |   7 +-
 be/src/util/compress.cc                         |   5 +
 be/src/util/dict-encoding.h                     |  15 +-
 be/src/util/dict-test.cc                        |   3 +-
 be/src/util/rle-encoding.h                      |  10 +-
 be/src/util/rle-test.cc                         |   2 +-
 .../queries/QueryTest/parquet.test              |   2 +-
 tests/common/impala_test_suite.py               |  10 +
 tests/query_test/test_scanners.py               |   9 -
 tests/query_test/test_scanners_fuzz.py          | 203 +++++++++++++++++++
 19 files changed, 427 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 5ac7954..5429e04 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -200,7 +200,10 @@ Status BaseSequenceScanner::ReadSync() {
   uint8_t* hash;
   int64_t out_len;
   RETURN_IF_FALSE(stream_->GetBytes(SYNC_HASH_SIZE, &hash, &out_len, &parse_status_));
-  if (out_len != SYNC_HASH_SIZE || memcmp(hash, header_->sync, SYNC_HASH_SIZE)) {
+  if (out_len != SYNC_HASH_SIZE) {
+    return Status(Substitute("Hit end of stream after reading $0 bytes of $1-byte "
+        "synchronization marker", out_len, SYNC_HASH_SIZE));
+  } else if (memcmp(hash, header_->sync, SYNC_HASH_SIZE) != 0) {
     stringstream ss;
     ss  << "Bad synchronization marker" << endl
         << "  Expected: '"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 5582267..b0fd008 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -35,6 +35,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
+#include "runtime/scoped-buffer.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "runtime/string-value.h"
@@ -78,6 +79,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node,
     // Compute the offset of the file footer.
     int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length);
     int64_t footer_start = files[i]->file_length - footer_size;
+    DCHECK_GE(footer_start, 0);
 
     // Try to find the split with the footer.
     DiskIoMgr::ScanRange* footer_split = FindFooterSplit(files[i]);
@@ -311,6 +313,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) {
       return Status::OK();
     }
     assemble_rows_timer_.Start();
+    DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
     int rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
     int max_tuples = min(row_batch->capacity(), rows_remaining);
     TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
@@ -496,7 +499,14 @@ Status HdfsParquetScanner::AssembleRows(
         return Status::OK();
       }
       // Check that all column readers populated the same number of values.
-      if (c != 0) DCHECK_EQ(last_num_tuples, scratch_batch_->num_tuples);
+      if (c != 0 && UNLIKELY(last_num_tuples != scratch_batch_->num_tuples)) {
+        parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' "
+            "had $2 remaining values but expected $3", filename(),
+            col_reader->schema_element().name, last_num_tuples,
+            scratch_batch_->num_tuples));
+        *skip_row_group = true;
+        return Status::OK();
+      }
       last_num_tuples = scratch_batch_->num_tuples;
     }
     row_group_rows_read_ += scratch_batch_->num_tuples;
@@ -788,7 +798,7 @@ Status HdfsParquetScanner::ProcessFooter() {
   uint8_t* metadata_ptr = metadata_size_ptr - metadata_size;
   // If the metadata was too big, we need to stitch it before deserializing it.
   // In that case, we stitch the data in this buffer.
-  vector<uint8_t> metadata_buffer;
+  ScopedBuffer metadata_buffer(scan_node_->mem_tracker());
 
   DCHECK(metadata_range_ != NULL);
   if (UNLIKELY(metadata_size > remaining_bytes_buffered)) {
@@ -803,7 +813,7 @@ Status HdfsParquetScanner::ProcessFooter() {
       sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER) - metadata_size;
     int64_t metadata_bytes_to_read = metadata_size;
     if (metadata_start < 0) {
-      return Status(Substitute("File $0 is invalid. Invalid metadata size in file "
+      return Status(Substitute("File '$0' is invalid. Invalid metadata size in file "
           "footer: $1 bytes. File size: $2 bytes.", filename(), metadata_size,
           file_desc->file_length));
     }
@@ -812,8 +822,12 @@ Status HdfsParquetScanner::ProcessFooter() {
     // TODO: consider moving this stitching into the scanner context. The scanner
     // context usually handles the stitching but no other scanner need this logic
     // now.
-    metadata_buffer.resize(metadata_size);
-    metadata_ptr = &metadata_buffer[0];
+
+    if (!metadata_buffer.TryAllocate(metadata_size)) {
+      return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
+          "metadata for file '$1'.", metadata_size, filename()));
+    }
+    metadata_ptr = metadata_buffer.buffer();
     int64_t copy_offset = 0;
     DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
 
@@ -856,6 +870,10 @@ Status HdfsParquetScanner::ProcessFooter() {
     return Status(
         Substitute("Invalid file. This file: $0 has no row groups", filename()));
   }
+  if (file_metadata_.num_rows < 0) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative row count $1 in "
+        "file metadata", filename(), file_metadata_.num_rows));
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 3e8f33c..c7e3e17 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -62,6 +62,7 @@ const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to al
 Status ParquetLevelDecoder::Init(const string& filename,
     parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
     int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
+  DCHECK_GE(num_buffered_values, 0);
   encoding_ = encoding;
   max_level_ = max_level;
   num_buffered_values_ = num_buffered_values;
@@ -95,7 +96,10 @@ Status ParquetLevelDecoder::Init(const string& filename,
       return Status(ss.str());
     }
   }
-  DCHECK_GT(num_bytes, 0);
+  if (UNLIKELY(num_bytes < 0 || num_bytes > *data_size)) {
+    return Status(Substitute("Corrupt Parquet file '$0': $1 bytes of encoded levels but "
+        "only $2 bytes left in page", filename, num_bytes, data_size));
+  }
   *data += num_bytes;
   *data_size -= num_bytes;
   return Status::OK();
@@ -404,6 +408,8 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   }
 
   virtual Status InitDataPage(uint8_t* data, int size) {
+    // Data can be empty if the column contains all NULLs
+    DCHECK_GE(size, 0);
     page_encoding_ = current_page_header_.data_page_header.encoding;
     if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
         page_encoding_ != parquet::Encoding::PLAIN) {
@@ -419,7 +425,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       if (!dict_decoder_init_) {
         return Status("File corrupt. Missing dictionary page.");
       }
-      dict_decoder_.SetData(data, size);
+      RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
     }
 
     // TODO: Perform filter selectivity checks here.
@@ -757,6 +763,15 @@ Status BaseScalarColumnReader::ReadDataPage() {
 
     int data_size = current_page_header_.compressed_page_size;
     int uncompressed_size = current_page_header_.uncompressed_page_size;
+    if (UNLIKELY(data_size < 0)) {
+      return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
+          "column '$2'", filename(), data_size, schema_element().name));
+    }
+    if (UNLIKELY(uncompressed_size < 0)) {
+      return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
+          "size $1 for column '$2'", filename(), uncompressed_size,
+          schema_element().name));
+    }
 
     if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
       if (slot_desc_ == NULL) {
@@ -853,7 +868,12 @@ Status BaseScalarColumnReader::ReadDataPage() {
     // statistics. See IMPALA-2208 and PARQUET-251.
     if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
     data_end_ = data_ + data_size;
-    num_buffered_values_ = current_page_header_.data_page_header.num_values;
+    int num_values = current_page_header_.data_page_header.num_values;
+    if (num_values < 0) {
+      return Status(Substitute("Error reading data page in Parquet file '$0'. "
+          "Invalid number of values in metadata: $1", filename(), num_values));
+    }
+    num_buffered_values_ = num_values;
     num_values_read_ += num_buffered_values_;
 
     if (decompressor_.get() != NULL) {
@@ -902,7 +922,7 @@ Status BaseScalarColumnReader::ReadDataPage() {
         max_def_level(), num_buffered_values_, &data_, &data_size));
 
     // Data can be empty if the column contains all NULLs
-    if (data_size != 0) RETURN_IF_ERROR(InitDataPage(data_, data_size));
+    RETURN_IF_ERROR(InitDataPage(data_, data_size));
     break;
   }
 
@@ -920,6 +940,14 @@ bool BaseScalarColumnReader::NextLevels() {
 
   // Definition level is not present if column and any containing structs are required.
   def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
+  // The compiler can optimize these two conditions into a single branch by treating
+  // def_level_ as unsigned.
+  if (UNLIKELY(def_level_ < 0 || def_level_ > max_def_level())) {
+    parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': "
+        "invalid def level $1 > max def level $2 for column '$3'", filename(),
+        def_level_, max_def_level(), schema_element().name)));
+    return false;
+  }
 
   if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
     // Repetition level is only present if this column is nested in any collection type.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 930e6bb..8435e71 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -428,7 +428,9 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   virtual void ClearDictionaryDecoder() = 0;
 
   /// Initializes the reader with the data contents. This is the content for the entire
-  /// decompressed data page. Decoders can initialize state from here.
+  /// decompressed data page. Decoders can initialize state from here. The caller must
+  /// validate the input such that 'size' is non-negative and that 'data' has at least
+  /// 'size' bytes remaining.
   virtual Status InitDataPage(uint8_t* data, int size) = 0;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc
index 52ae933..1b694ed 100644
--- a/be/src/exec/parquet-metadata-utils.cc
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -56,32 +56,41 @@ Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename,
     int64_t file_length, const parquet::RowGroup& row_group) {
   for (int i = 0; i < row_group.columns.size(); ++i) {
     const parquet::ColumnChunk& col_chunk = row_group.columns[i];
+    RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length,
+        col_chunk.meta_data.data_page_offset, "data page offset"));
     int64_t col_start = col_chunk.meta_data.data_page_offset;
     // The file format requires that if a dictionary page exists, it be before data pages.
     if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+      RETURN_IF_ERROR(ValidateOffsetInFile(filename, i, file_length,
+            col_chunk.meta_data.dictionary_page_offset, "dictionary page offset"));
       if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
-        stringstream ss;
-        ss << "File " << filename << ": metadata is corrupt. "
-            << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset
-            << ") must come before any data pages (offset=" << col_start << ").";
-        return Status(ss.str());
+        return Status(Substitute("Parquet file '$0': metadata is corrupt. Dictionary "
+            "page (offset=$1) must come before any data pages (offset=$2).",
+            filename, col_chunk.meta_data.dictionary_page_offset, col_start));
       }
       col_start = col_chunk.meta_data.dictionary_page_offset;
     }
     int64_t col_len = col_chunk.meta_data.total_compressed_size;
     int64_t col_end = col_start + col_len;
     if (col_end <= 0 || col_end > file_length) {
-      stringstream ss;
-      ss << "File " << filename << ": metadata is corrupt. "
-          << "Column " << i << " has invalid column offsets "
-          << "(offset=" << col_start << ", size=" << col_len << ", "
-          << "file_size=" << file_length << ").";
-      return Status(ss.str());
+      return Status(Substitute("Parquet file '$0': metadata is corrupt. Column $1 has "
+          "invalid column offsets (offset=$2, size=$3, file_size=$4).", filename, i,
+          col_start, col_len, file_length));
     }
   }
   return Status::OK();
 }
 
+Status ParquetMetadataUtils::ValidateOffsetInFile(const string& filename, int col_idx,
+    int64_t file_length, int64_t offset, const string& offset_name) {
+  if (offset < 0 || offset >= file_length) {
+    return Status(Substitute("File '$0': metadata is corrupt. Column $1 has invalid "
+        "$2 (offset=$3 file_size=$4).", filename, col_idx, offset_name, offset,
+        file_length));
+  }
+  return Status::OK();
+}
+
 static bool IsEncodingSupported(parquet::Encoding::type e) {
   switch (e) {
     case parquet::Encoding::PLAIN:
@@ -128,8 +137,10 @@ Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_me
   if (slot_desc == NULL) return Status::OK();
 
   parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type];
-  DCHECK_EQ(type, file_data.meta_data.type)
-      << "Should have been validated in ResolvePath()";
+  if (UNLIKELY(type != file_data.meta_data.type)) {
+    return Status(Substitute("Unexpected Parquet type in file '$0' metadata expected $1 "
+        "actual $2: file may be corrupt", filename, type, file_data.meta_data.type));
+  }
 
   // Check the decimal scale in the file matches the metastore scale and precision.
   // We fail the query if the metadata makes it impossible for us to safely read
@@ -318,7 +329,7 @@ Status ParquetSchemaResolver::CreateSchemaTree(
     int ira_def_level, int* idx, int* col_idx, SchemaNode* node)
     const {
   if (*idx >= schema.size()) {
-    return Status(Substitute("File $0 corrupt: could not reconstruct schema tree from "
+    return Status(Substitute("File '$0' corrupt: could not reconstruct schema tree from "
             "flattened schema in file metadata", filename_));
   }
   node->element = &schema[*idx];
@@ -329,6 +340,14 @@ Status ParquetSchemaResolver::CreateSchemaTree(
     // file_metadata_.row_groups.columns
     node->col_idx = *col_idx;
     ++(*col_idx);
+  } else if (node->element->num_children > SCHEMA_NODE_CHILDREN_SANITY_LIMIT) {
+    // Sanity-check the schema to avoid allocating absurdly large buffers below.
+    return Status(Substitute("Schema in Parquet file '$0' has $1 children, more than limit
of "
+        "$2. File is likely corrupt", filename_, node->element->num_children,
+        SCHEMA_NODE_CHILDREN_SANITY_LIMIT));
+  } else if (node->element->num_children < 0) {
+    return Status(Substitute("Corrupt Parquet file '$0': schema element has $1 children.",
+        filename_, node->element->num_children));
   }
 
   // def_level_of_immediate_repeated_ancestor does not include this node, so set before

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/exec/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.h b/be/src/exec/parquet-metadata-utils.h
index 7e1db31..7a1e897 100644
--- a/be/src/exec/parquet-metadata-utils.h
+++ b/be/src/exec/parquet-metadata-utils.h
@@ -39,6 +39,11 @@ class ParquetMetadataUtils {
   static Status ValidateColumnOffsets(const string& filename, int64_t file_length,
       const parquet::RowGroup& row_group);
 
+  /// Check that a file offset is in the file. Return an error status with a detailed
+  /// error message if it is not.
+  static Status ValidateOffsetInFile(const std::string& filename, int col_idx,
+      int64_t file_length, int64_t offset, const std::string& offset_name);
+
   /// Validates the column metadata to make sure this column is supported (e.g. encoding,
   /// type, etc) and matches the type of given slot_desc.
   static Status ValidateColumn(const parquet::FileMetaData& file_metadata,
@@ -144,6 +149,10 @@ class ParquetSchemaResolver {
       bool* missing_field) const;
 
  private:
+  /// An arbitrary limit on the number of children per schema node we support.
+  /// Used to sanity-check Parquet schemas.
+  static const int SCHEMA_NODE_CHILDREN_SANITY_LIMIT = 64 * 1024;
+
   /// Unflattens the schema metadata from a Parquet file metadata and converts it to our
   /// SchemaNode representation. Returns the result in 'node' unless an error status is
   /// returned. Does not set the slot_desc field of any SchemaNode.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 88ea035..5df69ed 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -519,10 +519,13 @@ int64_t DiskIoMgr::GetReadThroughput() {
 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   int disk_id = range->disk_id_;
   if (disk_id < 0 || disk_id >= disk_queues_.size()) {
-    stringstream ss;
-    ss << "Invalid scan range.  Bad disk id: " << disk_id;
-    DCHECK(false) << ss.str();
-    return Status(ss.str());
+    return Status(Substitute("Invalid scan range.  Bad disk id: $0", disk_id));
+  }
+  if (range->offset_ < 0) {
+    return Status(Substitute("Invalid scan range. Negative offset $0", range->offset_));
+  }
+  if (range->len_ < 0) {
+    return Status(Substitute("Invalid scan range. Negative length $0", range->len_));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/runtime/scoped-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/scoped-buffer.h b/be/src/runtime/scoped-buffer.h
new file mode 100644
index 0000000..4841f7f
--- /dev/null
+++ b/be/src/runtime/scoped-buffer.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_SCOPED_BUFFER_H
+#define IMPALA_RUNTIME_SCOPED_BUFFER_H
+
+#include "runtime/mem-tracker.h"
+
+namespace impala {
+
+/// A scoped memory allocation that is tracked against a MemTracker.
+/// The allocation is automatically freed when the ScopedBuffer object goes out of scope.
+class ScopedBuffer {
+ public:
+  ScopedBuffer(MemTracker* mem_tracker) : mem_tracker_(mem_tracker),
+      buffer_(NULL), bytes_(0) {}
+  ~ScopedBuffer() { Release(); }
+
+  /// Try to allocate a buffer of size 'bytes'. Returns false if MemTracker::TryConsume()
+  /// or malloc() fails.
+  /// Should not be called if a buffer is already allocated.
+  bool TryAllocate(int64_t bytes) {
+    DCHECK(buffer_ == NULL);
+    DCHECK_GT(bytes, 0);
+    if (!mem_tracker_->TryConsume(bytes)) return false;
+    buffer_ = reinterpret_cast<uint8_t*>(malloc(bytes));
+    if (UNLIKELY(buffer_ == NULL)) {
+      mem_tracker_->Release(bytes);
+      return false;
+    }
+    bytes_ = bytes;
+    return true;
+  }
+
+  void Release() {
+    if (buffer_ == NULL) return;
+    free(buffer_);
+    buffer_ = NULL;
+    mem_tracker_->Release(bytes_);
+    bytes_ = 0;
+  }
+
+  inline uint8_t* buffer() const { return buffer_; }
+
+ private:
+  MemTracker* mem_tracker_;
+  uint8_t* buffer_;
+  /// The current size of the allocated buffer, if not NULL.
+  int64_t bytes_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index 48ced18..ce159cb 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -77,6 +77,9 @@ class BitWriter {
   /// to the next byte boundary.
   void Flush(bool align=false);
 
+  /// Maximum supported bitwidth for writer.
+  static const int MAX_BITWIDTH = 32;
+
  private:
   uint8_t* buffer_;
   int max_bytes_;
@@ -123,7 +126,8 @@ class BitReader {
   bool GetAligned(int num_bytes, T* v);
 
   /// Reads a vlq encoded int from the stream.  The encoded int must start at the
-  /// beginning of a byte. Return false if there were not enough bytes in the buffer.
+  /// beginning of a byte. Return false if there were not enough bytes in the buffer or
+  /// the int is invalid.
   bool GetVlqInt(int32_t* v);
 
   /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
@@ -133,6 +137,9 @@ class BitReader {
   /// Maximum byte length of a vlq encoded int
   static const int MAX_VLQ_BYTE_LEN = 5;
 
+  /// Maximum supported bitwidth for reader.
+  static const int MAX_BITWIDTH = 32;
+
  private:
   uint8_t* buffer_;
   int max_bytes_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index 4249bc5..fd77974 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -25,7 +25,7 @@ namespace impala {
 
 inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
   // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases)
-  DCHECK_LE(num_bits, 32);
+  DCHECK_LE(num_bits, MAX_BITWIDTH);
   DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
 
   if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false;
@@ -88,7 +88,7 @@ template<typename T>
 inline bool BitReader::GetValue(int num_bits, T* v) {
   DCHECK(buffer_ != NULL);
   // TODO: revisit this limit if necessary
-  DCHECK_LE(num_bits, 32);
+  DCHECK_LE(num_bits, MAX_BITWIDTH);
   DCHECK_LE(num_bits, sizeof(T) * 8);
 
   if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) return false;
@@ -140,13 +140,12 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
 inline bool BitReader::GetVlqInt(int32_t* v) {
   *v = 0;
   int shift = 0;
-  int num_bytes = 0;
   uint8_t byte = 0;
   do {
+    if (UNLIKELY(shift >= MAX_VLQ_BYTE_LEN * 7)) return false;
     if (!GetAligned<uint8_t>(1, &byte)) return false;
     *v |= (byte & 0x7F) << shift;
     shift += 7;
-    DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN);
   } while ((byte & 0x80) != 0);
   return true;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/compress.cc
----------------------------------------------------------------------
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index 1676a50..7c97737 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -114,6 +114,7 @@ Status GzipCompressor::Compress(int64_t input_length, const uint8_t* input,
 Status GzipCompressor::ProcessBlock(bool output_preallocated,
     int64_t input_length, const uint8_t* input, int64_t* output_length,
     uint8_t** output) {
+  DCHECK_GE(input_length, 0);
   DCHECK(!output_preallocated || (output_preallocated && *output_length > 0));
   int64_t max_compressed_len = MaxOutputLen(input_length);
   if (!output_preallocated) {
@@ -146,6 +147,7 @@ Status BzipCompressor::ProcessBlock(bool output_preallocated, int64_t input_leng
   // The bz2 library does not allow input to be NULL, even when input_length is 0. This
   // should be OK because we do not write any file formats that support bzip compression.
   DCHECK(input != NULL);
+  DCHECK_GE(input_length, 0);
 
   if (output_preallocated) {
     buffer_length_ = *output_length;
@@ -201,6 +203,7 @@ int64_t SnappyBlockCompressor::MaxOutputLen(int64_t input_len, const uint8_t* in
 Status SnappyBlockCompressor::ProcessBlock(bool output_preallocated,
     int64_t input_length, const uint8_t* input, int64_t *output_length,
     uint8_t** output) {
+  DCHECK_GE(input_length, 0);
   // Hadoop uses a block compression scheme on top of snappy.  First there is
   // an integer which is the size of the decompressed data followed by a
   // sequence of compressed blocks each preceded with an integer size.
@@ -252,6 +255,7 @@ int64_t SnappyCompressor::MaxOutputLen(int64_t input_len, const uint8_t* input)
 
 Status SnappyCompressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
+  DCHECK_GE(input_length, 0);
   int64_t max_compressed_len = MaxOutputLen(input_length);
   if (output_preallocated && *output_length < max_compressed_len) {
     return Status("SnappyCompressor::ProcessBlock: output length too small");
@@ -292,6 +296,7 @@ int64_t Lz4Compressor::MaxOutputLen(int64_t input_len, const uint8_t* input) {
 
 Status Lz4Compressor::ProcessBlock(bool output_preallocated, int64_t input_length,
     const uint8_t* input, int64_t* output_length, uint8_t** output) {
+  DCHECK_GE(input_length, 0);
   CHECK(output_preallocated) << "Output was not allocated for Lz4 Codec";
   if (input_length == 0) return Status::OK();
   *output_length = LZ4_compress(reinterpret_cast<const char*>(input),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 09a3d2d..d9fbe08 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -23,6 +23,7 @@
 #include <boost/unordered_map.hpp>
 
 #include "gutil/bits.h"
+#include "gutil/strings/substitute.h"
 #include "exec/parquet-common.h"
 #include "runtime/mem-pool.h"
 #include "runtime/string-value.h"
@@ -166,14 +167,20 @@ class DictEncoder : public DictEncoderBase {
 /// by the caller and valid as long as this object is.
 class DictDecoderBase {
  public:
-  /// The rle encoded indices into the dictionary.
-  void SetData(uint8_t* buffer, int buffer_len) {
-    DCHECK_GT(buffer_len, 0);
+  /// The rle encoded indices into the dictionary. Returns an error status if the buffer
+  /// is too short or the bit_width metadata in the buffer is invalid.
+  Status SetData(uint8_t* buffer, int buffer_len) {
+    DCHECK_GE(buffer_len, 0);
+    if (UNLIKELY(buffer_len == 0)) return Status("Dictionary cannot be 0 bytes");
     uint8_t bit_width = *buffer;
-    DCHECK_GE(bit_width, 0);
+    if (UNLIKELY(bit_width < 0 || bit_width > BitReader::MAX_BITWIDTH)) {
+      return Status(strings::Substitute("Dictionary has invalid or unsupported bit "
+          "width: $0", bit_width));
+    }
     ++buffer;
     --buffer_len;
     data_decoder_.Reset(buffer, buffer_len, bit_width);
+    return Status::OK();
   }
 
   virtual ~DictDecoderBase() {}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index ea6536c..10d5e3d 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -25,6 +25,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
+#include "testutil/test-macros.h"
 #include "util/dict-encoding.h"
 
 #include "common/names.h"
@@ -53,7 +54,7 @@ void ValidateDict(const vector<T>& values, int fixed_buffer_byte_size) {
   DictDecoder<T> decoder;
   ASSERT_TRUE(
       decoder.Reset(dict_buffer, encoder.dict_encoded_size(), fixed_buffer_byte_size));
-  decoder.SetData(data_buffer, data_len);
+  ASSERT_OK(decoder.SetData(data_buffer, data_len));
   for (T i: values) {
     T j;
     decoder.GetValue(&j);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 6f993d5..9f39697 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -88,14 +88,14 @@ class RleDecoder {
       repeat_count_(0),
       literal_count_(0) {
     DCHECK_GE(bit_width_, 0);
-    DCHECK_LE(bit_width_, 64);
+    DCHECK_LE(bit_width_, BitReader::MAX_BITWIDTH);
   }
 
   RleDecoder() : bit_width_(-1) {}
 
   void Reset(uint8_t* buffer, int buffer_len, int bit_width) {
     DCHECK_GE(bit_width, 0);
-    DCHECK_LE(bit_width, 64);
+    DCHECK_LE(bit_width, BitReader::MAX_BITWIDTH);
     bit_reader_.Reset(buffer, buffer_len);
     bit_width_ = bit_width;
     current_value_ = 0;
@@ -262,8 +262,7 @@ inline bool RleDecoder::Get(T* val) {
     --repeat_count_;
   } else {
     DCHECK_GT(literal_count_, 0);
-    bool result = bit_reader_.GetValue(bit_width_, val);
-    DCHECK(result);
+    if (UNLIKELY(!bit_reader_.GetValue(bit_width_, val))) return false;
     --literal_count_;
   }
 
@@ -275,8 +274,7 @@ bool RleDecoder::NextCounts() {
   // Read the next run's indicator int, it could be a literal or repeated run.
   // The int is encoded as a vlq-encoded value.
   int32_t indicator_value = 0;
-  bool result = bit_reader_.GetVlqInt(&indicator_value);
-  if (!result) return false;
+  if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return false;
 
   // lsb indicates if it is a literal run or repeated run
   bool is_literal = indicator_value & 1;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/be/src/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc
index b3dc5b7..fd429eb 100644
--- a/be/src/util/rle-test.cc
+++ b/be/src/util/rle-test.cc
@@ -31,7 +31,7 @@
 
 namespace impala {
 
-const int MAX_WIDTH = 32;
+const int MAX_WIDTH = BitReader::MAX_BITWIDTH;
 
 TEST(BitArray, TestBool) {
   const int len = 8;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/testdata/workloads/functional-query/queries/QueryTest/parquet.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet.test b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
index e6b4061..a449162 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
@@ -28,7 +28,7 @@ Invalid metadata size in file footer
 # Parquet file with invalid column dict_page_offset.
 SELECT * from bad_dict_page_offset
 ---- CATCH
-Column 0 has invalid column offsets (offset=10000, size=47, file_size=249)
+Column 0 has invalid data page offset (offset=100001 file_size=249)
 ====
 ---- QUERY
 # Parquet file with invalid column total_compressed_size.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 8863995..4fcce64 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -512,6 +512,16 @@ class ImpalaTestSuite(BaseTestSuite):
     self.hive_client.drop_table(db_name, table_name, True)
     self.hive_client.create_table(table)
 
+  def _get_table_location(self, table_name, vector):
+    """ Returns the HDFS location of the table """
+    result = self.execute_query_using_client(self.client,
+        "describe formatted %s" % table_name, vector)
+    for row in result.data:
+      if 'Location:' in row:
+        return row.split('\t')[1]
+    # This should never happen.
+    assert 0, 'Unable to get location for table: ' + table_name
+
   def run_stmt_in_hive(self, stmt):
     """
     Run a statement in Hive, returning stdout if successful and throwing

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 8e78670..4cfbcb0 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -117,15 +117,6 @@ class TestUnmatchedSchema(ImpalaTestSuite):
     cls.TestMatrix.add_constraint(\
         lambda v: v.get_value('table_format').file_format != 'avro')
 
-  def _get_table_location(self, table_name, vector):
-    result = self.execute_query_using_client(self.client,
-        "describe formatted %s" % table_name, vector)
-    for row in result.data:
-      if 'Location:' in row:
-        return row.split('\t')[1]
-    # This should never happen.
-    assert 0, 'Unable to get location for table: ' + table_name
-
   def _create_test_table(self, vector):
     """
     Creates the test table

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5afd9f7d/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
new file mode 100644
index 0000000..ae17572
--- /dev/null
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+import random
+import shutil
+import tempfile
+import time
+from subprocess import check_call
+from tests.common.impala_test_suite import ImpalaTestSuite, LOG
+from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
+
+# Random fuzz testing of HDFS scanners. Existing tables for any HDFS file format
+# are corrupted in random ways to flush out bugs with handling of corrupted data.
+class TestScannersFuzzing(ImpalaTestSuite):
+  # Test a range of batch sizes to exercise different corner cases.
+  BATCH_SIZES = [0, 1, 16, 10000]
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestScannersFuzzing, cls).add_test_dimensions()
+    # TODO: enable for more table formats once they consistently pass the fuzz test.
+    cls.TestMatrix.add_constraint(lambda v:\
+        v.get_value('table_format').file_format in ('avro', 'parquet') or
+        (v.get_value('table_format').file_format == 'text'
+            and v.get_value('table_format').compression_type == 'none'))
+
+  def test_fuzz_alltypes(self, vector, unique_database):
+    self.run_fuzz_test(vector, unique_database, "alltypes")
+
+  def test_fuzz_decimal_tbl(self, vector, unique_database):
+    table_format = vector.get_value('table_format')
+    table_name = "decimal_tbl"
+    if table_format.file_format in ('avro'):
+      table_name = "avro_decimal_tbl"
+      if table_format.compression_codec != 'block' or \
+          table_format.compression_type != 'snap':
+        pytest.skip()
+
+    self.run_fuzz_test(vector, unique_database, table_name, 10)
+
+  def test_fuzz_nested_types(self, vector, unique_database):
+    table_format = vector.get_value('table_format')
+    if table_format.file_format != 'parquet': pytest.skip()
+    self.run_fuzz_test(vector, unique_database, "complextypestbl", 10)
+
+  # TODO: add test coverage for additional data types like char and varchar
+
+  def run_fuzz_test(self, vector, unique_database, table, num_copies=1):
+    """ Do some basic fuzz testing: create a copy of an existing table with randomly
+    corrupted files and make sure that we don't crash or behave in an unexpected way.
+    'unique_database' is used for the table, so it will be cleaned up automatically.
+    If 'num_copies' is set, create that many corrupted copies of each input file.
+    SCANNER_FUZZ_SEED can be set in the environment to reproduce the result (assuming that
+    input files are the same).
+    SCANNER_FUZZ_KEEP_FILES can be set in the environment to keep the generated files.
+    """
+    # Create and seed a new random number generator for reproducibility.
+    rng = random.Random()
+    random_seed = os.environ.get("SCANNER_FUZZ_SEED") or time.time()
+    LOG.info("Using random seed %d", random_seed)
+    rng.seed(long(random_seed))
+
+    table_format = vector.get_value('table_format')
+    self.change_database(self.client, table_format)
+
+    tmp_table_dir = tempfile.mkdtemp(prefix="tmp-scanner-fuzz-%s" % table,
+        dir=os.path.join(os.environ['IMPALA_HOME'], "testdata"))
+
+    self.execute_query("create table %s.%s like %s" % (unique_database, table, table))
+    fuzz_table_location = get_fs_path("/test-warehouse/{0}.db/{1}".format(
+        unique_database, table))
+
+    LOG.info("Generating corrupted version of %s in %s. Local working directory is %s",
+        table, unique_database, tmp_table_dir)
+
+    # Find the location of the existing table and get the full table directory structure.
+    table_loc = self._get_table_location(table, vector)
+    check_call(['hdfs', 'dfs', '-copyToLocal', table_loc + "/*", tmp_table_dir])
+
+    partitions = self.walk_and_corrupt_table_data(tmp_table_dir, num_copies, rng)
+    for partition in partitions:
+      self.execute_query('alter table {0}.{1} add partition ({2})'.format(
+          unique_database, table, ','.join(partition)))
+
+    # Copy all of the local files and directories to hdfs.
+    to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir)
+               for file_or_dir in os.listdir(tmp_table_dir)]
+    check_call(['hdfs', 'dfs', '-copyFromLocal'] + to_copy + [fuzz_table_location])
+
+    if "SCANNER_FUZZ_KEEP_FILES" not in os.environ:
+      shutil.rmtree(tmp_table_dir)
+
+    # Querying the corrupted files should not DCHECK or crash.
+    self.execute_query("refresh %s.%s" % (unique_database, table))
+    # Execute a query that tries to read all the columns and rows in the file.
+    # Also execute a count(*) that materializes no columns, since different code
+    # paths are exercised.
+    # Use abort_on_error=0 to ensure we scan all the files.
+    queries = [
+        'select count(*) from (select distinct * from {0}.{1}) q'.format(
+            unique_database, table),
+        'select count(*) from {0}.{1} q'.format(unique_database, table)]
+
+    xfail_msgs = []
+    for query in queries:
+      for batch_size in self.BATCH_SIZES:
+        query_options = {'abort_on_error': '0', 'batch_size': batch_size}
+        try:
+          result = self.execute_query(query, query_options = query_options)
+          LOG.info('\n'.join(result.log))
+        except Exception as e:
+          if 'memory limit exceeded' in str(e).lower():
+            # Memory limit error should fail query.
+            continue
+          msg = "Should not throw error when abort_on_error=0: '{0}'".format(e)
+          LOG.error(msg)
+          # Parquet fails the query for some parse errors.
+          if table_format.file_format == 'parquet':
+            xfail_msgs.append(msg)
+          else:
+            raise
+    if len(xfail_msgs) != 0:
+      pytest.xfail('\n'.join(xfail_msgs))
+
+  def walk_and_corrupt_table_data(self, tmp_table_dir, num_copies, rng):
+    """ Walks a local copy of a HDFS table directory. Returns a list of partitions, each
+    as a list of "key=val" pairs. Ensures there is 'num_copies' copies of each file,
+    and corrupts each of the copies.
+    """
+    partitions = []
+    # Iterate over the partitions and files we downloaded.
+    for subdir, dirs, files in os.walk(tmp_table_dir):
+      if '_impala_insert_staging' in subdir: continue
+      if len(dirs) != 0: continue # Skip non-leaf directories
+
+      rel_subdir = os.path.relpath(subdir, tmp_table_dir)
+      if rel_subdir != ".":
+        # Create metadata for any directory partitions.
+        partitions.append(self.partitions_from_path(rel_subdir))
+
+      # Corrupt all of the files that we find.
+      for filename in files:
+        filepath = os.path.join(subdir, filename)
+        copies = [filepath]
+        for copy_num in range(1, num_copies):
+          copypath = os.path.join(subdir, "copy{0}_{1}".format(copy_num, filename))
+          shutil.copyfile(filepath, copypath)
+          copies.append(copypath)
+        for filepath in copies:
+          self.corrupt_file(filepath, rng)
+    return partitions
+
+  def partitions_from_path(self, relpath):
+    """ Return a list of "key=val" parts from partitions inferred from the directory path.
+    """
+    reversed_partitions = []
+    while relpath != '':
+      relpath, suffix  = os.path.split(relpath)
+      reversed_partitions.append(suffix)
+    return reversed(reversed_partitions)
+
+  def corrupt_file(self, path, rng):
+    """ Corrupt the file at 'path' in the local file system in a randomised way using the
+    random number generator 'rng'. Rewrites the file in-place.
+    Logs a message to describe how the file was corrupted, so the error is reproducible.
+    """
+    with open(path, "rb") as f:
+      data = bytearray(f.read())
+
+    if rng.random() < 0.5:
+      flip_offset = rng.randint(0, len(data) - 1)
+      flip_val = rng.randint(0, 255)
+      LOG.info("corrupt_file: Flip byte in %s at %d from %d to %d", path, flip_offset,
+          data[flip_offset], flip_val)
+      data[flip_offset] = flip_val
+    else:
+      truncation = rng.randint(0, len(data))
+      LOG.info("corrupt_file: Truncate %s to %d", path, truncation)
+      data = data[:truncation]
+
+    with open(path, "wb") as f:
+      f.write(data)
+


