impala-commits mailing list archives

From jruss...@apache.org
Subject [4/5] impala git commit: IMPALA-4993: extend dictionary filtering to collections
Date Fri, 19 Jan 2018 21:41:47 GMT
IMPALA-4993: extend dictionary filtering to collections

Currently, top-level scalar columns in Parquet files can
be used at runtime to prune row groups by evaluating certain
conjuncts over the column's dictionary (if available).
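
To make the pruning concrete, here is a minimal, self-contained sketch of the
idea (not the scanner code itself): a row group survives only if, for every
dictionary-filtered column, at least one dictionary entry satisfies all of the
column's conjuncts. The DictFilteredColumn type and the use of decoded int64
values are simplifications of the encoded dictionaries and ScalarExprEvaluators
used by EvalDictionaryFilters() in the diff below.

  #include <cstdint>
  #include <functional>
  #include <vector>

  // Simplified stand-in for a dictionary-filterable column: its decoded
  // dictionary entries plus the single-slot conjuncts that read it.
  struct DictFilteredColumn {
    std::vector<int64_t> dictionary_values;
    std::vector<std::function<bool(int64_t)>> conjuncts;
  };

  // Returns false when the whole row group can be skipped, i.e. some column
  // has no dictionary entry that passes all of its conjuncts.
  bool RowGroupSurvivesDictFilters(const std::vector<DictFilteredColumn>& cols) {
    for (const DictFilteredColumn& col : cols) {
      bool column_has_match = false;
      for (int64_t value : col.dictionary_values) {
        bool value_passes = true;
        for (const auto& conjunct : col.conjuncts) {
          if (!conjunct(value)) { value_passes = false; break; }
        }
        if (value_passes) { column_has_match = true; break; }
      }
      if (!column_has_match) return false;  // no value can match: prune the group
    }
    return true;
  }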

This change extends such pruning to scalar values that are
stored in collection-type columns. Currently, dictionary
pruning works by finding eligible conjuncts for top-level
slots. Since only top-level slots are supported, the slots
are implicitly part of the scan node's tuple descriptor.
With this change, we track eligible conjuncts by slot as well
as by the tuple that contains the slot (either the top-level
tuple or a nested collection's item tuple). Since collection
conjuncts are already managed by a map that associates tuple
descriptors with a list of their conjuncts, this extension
follows the existing representation.
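
A rough sketch of the resulting bookkeeping shape (identifiers simplified; the
real code keys on SlotId, TupleDescriptor* and ScalarExprEvaluator*, as shown
in the diff below):

  #include <map>
  #include <unordered_map>
  #include <vector>

  // Simplified stand-ins for Impala's SlotId / TupleId and conjunct evaluators.
  using SlotId = int;
  using TupleId = int;
  struct Conjunct {};  // predicate over a single slot

  // Dictionary-filterable conjuncts, keyed by the slot they read. The slot may
  // belong to the scan node's top-level tuple or to a collection item tuple.
  std::map<SlotId, std::vector<Conjunct*>> dict_filter_conjuncts;

  // Collection conjuncts are already grouped by the tuple that contains their
  // slots; dictionary filtering reuses this tuple-keyed layout, e.g. to keep
  // one scratch tuple per tuple descriptor during evaluation.
  std::unordered_map<TupleId, std::vector<Conjunct*>> conjuncts_by_tuple;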

The frontend builds the mapping from SlotId to the conjuncts
that are dictionary filterable. This mapping now includes
SlotIds that reference nested tuples. The backend is adjusted
to use the same representation. In addition, column readers
are partitioned into collection readers, dictionary-filterable
scalar readers, and other, non-dictionary-filterable scalar
readers. When filtering a row group using a conjunct associated
with a (possibly) nested collection type, an additional tuple
buffer is allocated per tuple descriptor.
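
For that allocation, a minimal sketch with a plain map and raw byte buffers
standing in for the MemPool/Tuple machinery of InitDictFilterStructures() in
the diff below:

  #include <cstdint>
  #include <memory>
  #include <unordered_map>

  // Stand-in for Impala's TupleDescriptor; only the byte size matters here.
  struct TupleDesc { int byte_size; };

  // One scratch buffer per distinct tuple descriptor referenced by a
  // dictionary-filterable slot, allocated lazily on first use.
  std::unordered_map<const TupleDesc*, std::unique_ptr<uint8_t[]>> dict_filter_tuples;

  void EnsureDictFilterTuple(const TupleDesc* tuple_desc) {
    if (dict_filter_tuples.count(tuple_desc) > 0) return;  // already allocated
    if (tuple_desc->byte_size <= 0) return;  // e.g. count(*): nothing to materialize
    dict_filter_tuples[tuple_desc] =
        std::make_unique<uint8_t[]>(tuple_desc->byte_size);
  }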

Testing:
- e2e test extended to illustrate row-groups that are pruned
  by nested collection dictionary filters.

Change-Id: If3a2abcfc3d0f7d18756816659fed77ce12668dd
Reviewed-on: http://gerrit.cloudera.org:8080/8775
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/db98dc65
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/db98dc65
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/db98dc65

Branch: refs/heads/master
Commit: db98dc6504368f24eb20e12959f6d779be31c9b6
Parents: 579e332
Author: Vuk Ercegovac <vercegovac@cloudera.com>
Authored: Mon Nov 27 18:27:47 2017 -0800
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Fri Jan 19 20:37:25 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             | 293 ++++++++++---------
 be/src/exec/hdfs-parquet-scanner.h              |  63 ++--
 be/src/exec/hdfs-scan-node-base.h               |   3 +-
 be/src/exec/hdfs-scanner.cc                     |  14 +-
 be/src/exec/hdfs-scanner.h                      |   5 +-
 be/src/exec/parquet-column-readers.cc           |   4 +-
 be/src/runtime/collection-value-builder.h       |   2 +-
 be/src/runtime/scoped-buffer.h                  |   1 +
 common/thrift/PlanNodes.thrift                  |   3 +-
 .../apache/impala/analysis/SlotDescriptor.java  |   4 +
 .../apache/impala/analysis/TupleDescriptor.java |   2 +
 .../org/apache/impala/planner/HdfsScanNode.java | 215 +++++++++-----
 testdata/CustomerMultiBlock/README              |  12 +
 .../customer_multiblock.parquet                 | Bin 0 -> 494519 bytes
 .../functional/functional_schema_template.sql   |  19 ++
 .../datasets/functional/schema_constraints.csv  |   1 +
 .../queries/PlannerTest/constant-folding.test   |   2 +
 .../queries/PlannerTest/mt-dop-validation.test  |   6 +
 .../queries/PlannerTest/parquet-filtering.test  |  66 +++++
 .../queries/QueryTest/parquet-filtering.test    | 237 ++++++++++++++-
 20 files changed, 706 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index f0f280d..3a17a3b 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -68,7 +68,7 @@ const string PARQUET_MEM_LIMIT_EXCEEDED =
     "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
-    const std::vector<HdfsFileDesc*>& files) {
+    const vector<HdfsFileDesc*>& files) {
   vector<ScanRange*> footer_ranges;
   for (int i = 0; i < files.size(); ++i) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
@@ -101,7 +101,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         // the actual split (in InitColumns()). The original split is stored in the
         // metadata associated with the footer range.
         ScanRange* footer_range;
-        if (footer_split != NULL) {
+        if (footer_split != nullptr) {
           footer_range = scan_node->AllocateScanRange(files[i]->fs,
               files[i]->filename.c_str(), footer_size, footer_start,
               split_metadata->partition_id, footer_split->disk_id(),
@@ -128,12 +128,12 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
 }
 
 ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
-  DCHECK(file != NULL);
+  DCHECK(file != nullptr);
   for (int i = 0; i < file->splits.size(); ++i) {
     ScanRange* split = file->splits[i];
     if (split->offset() + split->len() == file->file_length) return split;
   }
-  return NULL;
+  return nullptr;
 }
 
 namespace impala {
@@ -143,22 +143,21 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     row_group_idx_(-1),
     row_group_rows_read_(0),
     advance_row_group_(true),
-    min_max_tuple_buffer_(scan_node->mem_tracker()),
+    min_max_tuple_(nullptr),
     row_batches_produced_(0),
     scratch_batch_(new ScratchTupleBatch(
         *scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
-    metadata_range_(NULL),
+    metadata_range_(nullptr),
     dictionary_pool_(new MemPool(scan_node->mem_tracker())),
-    dict_filter_tuple_backing_(scan_node->mem_tracker()),
     assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
-    process_footer_timer_stats_(NULL),
-    num_cols_counter_(NULL),
-    num_stats_filtered_row_groups_counter_(NULL),
-    num_row_groups_counter_(NULL),
-    num_scanners_with_no_reads_counter_(NULL),
-    num_dict_filtered_row_groups_counter_(NULL),
+    process_footer_timer_stats_(nullptr),
+    num_cols_counter_(nullptr),
+    num_stats_filtered_row_groups_counter_(nullptr),
+    num_row_groups_counter_(nullptr),
+    num_scanners_with_no_reads_counter_(nullptr),
+    num_dict_filtered_row_groups_counter_(nullptr),
     coll_items_read_counter_(0),
-    codegend_process_scratch_batch_fn_(NULL) {
+    codegend_process_scratch_batch_fn_(nullptr) {
   assemble_rows_timer_.Stop();
 }
 
@@ -181,23 +180,25 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
       scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
-  if (codegend_process_scratch_batch_fn_ == NULL) {
+  if (codegend_process_scratch_batch_fn_ == nullptr) {
     scan_node_->IncNumScannersCodegenDisabled();
   } else {
     scan_node_->IncNumScannersCodegenEnabled();
   }
 
-  level_cache_pool_.reset(new MemPool(scan_node_->mem_tracker()));
+  perm_pool_.reset(new MemPool(scan_node_->mem_tracker()));
 
   // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
   const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
   if (min_max_tuple_desc != nullptr) {
     int64_t tuple_size = min_max_tuple_desc->byte_size();
-    if (!min_max_tuple_buffer_.TryAllocate(tuple_size)) {
+    uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
+    if (buffer == nullptr) {
       string details = Substitute("Could not allocate buffer of $0 bytes for Parquet "
           "statistics tuple for file '$1'.", tuple_size, filename());
       return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
     }
+    min_max_tuple_ = reinterpret_cast<Tuple*>(buffer);
   }
 
   // Clone the min/max statistics conjuncts.
@@ -207,7 +208,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
-    DCHECK(ctx->filter != NULL);
+    DCHECK(ctx->filter != nullptr);
     filter_ctxs_.push_back(ctx);
   }
   filter_stats_.resize(filter_ctxs_.size());
@@ -248,7 +249,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // The scanner-wide stream was used only to read the file footer.  Each column has added
   // its own stream.
-  stream_ = NULL;
+  stream_ = nullptr;
   return Status::OK();
 }
 
@@ -270,9 +271,9 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
     // Open() fails or if the query is cancelled.
     scratch_batch_->ReleaseResources(nullptr);
   }
-  if (level_cache_pool_ != nullptr) {
-    level_cache_pool_->FreeAll();
-    level_cache_pool_.reset();
+  if (perm_pool_ != nullptr) {
+    perm_pool_->FreeAll();
+    perm_pool_.reset();
   }
 
   // Verify all resources (if any) have been transferred.
@@ -398,7 +399,7 @@ Status HdfsParquetScanner::ProcessSplit() {
       eos_ = true;
       break;
     }
-    unique_ptr<RowBatch> batch = std::make_unique<RowBatch>(scan_node_->row_desc(),
+    unique_ptr<RowBatch> batch = make_unique<RowBatch>(scan_node_->row_desc(),
         state_->batch_size(), scan_node_->mem_tracker());
     Status status = GetNextInternal(batch.get());
     // Always add batch to the queue because it may contain data referenced by previously
@@ -526,8 +527,8 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
 
   int64_t tuple_size = min_max_tuple_desc->byte_size();
 
-  Tuple* min_max_tuple = reinterpret_cast<Tuple*>(min_max_tuple_buffer_.buffer());
-  min_max_tuple->Init(tuple_size);
+  DCHECK(min_max_tuple_ != nullptr);
+  min_max_tuple_->Init(tuple_size);
 
   DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
   for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
@@ -569,7 +570,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
     const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
     const ColumnType& col_type = slot_desc->type();
     bool stats_read = false;
-    void* slot = min_max_tuple->GetSlot(slot_desc->tuple_offset());
+    void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
     const string& fn_name = eval->root().function_name();
     if (fn_name == "lt" || fn_name == "le") {
       // We need to get min stats.
@@ -585,7 +586,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
 
     if (stats_read) {
       TupleRow row;
-      row.SetTuple(0, min_max_tuple);
+      row.SetTuple(0, min_max_tuple_);
       if (!ExecNode::EvalPredicate(eval, &row)) {
         *skip_row_group = true;
         break;
@@ -662,12 +663,14 @@ Status HdfsParquetScanner::NextRowGroup() {
       continue;
     }
 
+    InitCollectionColumns();
+
     // Prepare dictionary filtering columns for first read
     // This must be done before dictionary filtering, because this code initializes
     // the column offsets and streams needed to read the dictionaries.
     // TODO: Restructure the code so that the dictionary can be read without the rest
     // of the column.
-    RETURN_IF_ERROR(InitColumns(row_group_idx_, dict_filterable_readers_));
+    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_));
 
     // If there is a dictionary-encoded column where every value is eliminated
     // by a conjunct, the row group can be eliminated. This initializes dictionaries
@@ -687,7 +690,7 @@ Status HdfsParquetScanner::NextRowGroup() {
     // At this point, the row group has passed any filtering criteria
     // Prepare non-dictionary filtering column readers for first read and
     // initialize their dictionaries.
-    RETURN_IF_ERROR(InitColumns(row_group_idx_, non_dict_filterable_readers_));
+    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, non_dict_filterable_readers_));
     status = InitDictionaries(non_dict_filterable_readers_);
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
@@ -735,13 +738,8 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   }
 }
 
-bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {
-  // Nested types are not supported yet
-  if (col_reader->IsCollectionReader()) return false;
-
-  BaseScalarColumnReader* scalar_reader =
-    static_cast<BaseScalarColumnReader*>(col_reader);
-  const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
+bool HdfsParquetScanner::IsDictFilterable(BaseScalarColumnReader* col_reader) {
+  const SlotDescriptor* slot_desc = col_reader->slot_desc();
   // Some queries do not need the column to be materialized, so slot_desc is NULL.
   // For example, a count(*) with no predicates only needs to count records
   // rather than materializing the values.
@@ -755,44 +753,60 @@ bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {
   // dictionary values, so skip these datatypes for now.
   // TODO: The values should be converted during dictionary construction and stored
   // in converted form in the dictionary.
-  if (scalar_reader->NeedsConversion()) return false;
+  if (col_reader->NeedsConversion()) return false;
 
   // Certain datatypes (timestamps) need to validate the value, as certain bit
   // combinations are not valid. The dictionary values are not validated, so
   // skip these datatypes for now.
   // TODO: This should be pushed into dictionary construction.
-  if (scalar_reader->NeedsValidation()) return false;
+  if (col_reader->NeedsValidation()) return false;
 
   return true;
 }
 
-Status HdfsParquetScanner::InitDictFilterStructures() {
-  // Check dictionary filtering query option
-  bool dictionary_filtering_enabled =
-      state_->query_options().parquet_dictionary_filtering;
-
-  // Allocate space for dictionary filtering tuple
-  // Certain queries do not need any columns to be materialized (such as count(*))
-  // and have a tuple size of 0. Explicitly disable dictionary filtering in this case.
-  int tuple_size = scan_node_->tuple_desc()->byte_size();
-  if (tuple_size > 0) {
-    if (!dict_filter_tuple_backing_.TryAllocate(tuple_size)) {
-      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED,
-          "InitDictFilterStructures", tuple_size, "Dictionary Filtering Tuple");
-      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
+void HdfsParquetScanner::PartitionReaders(
+    const vector<ParquetColumnReader*>& readers, bool can_eval_dict_filters) {
+  for (auto* reader : readers) {
+    if (reader->IsCollectionReader()) {
+      CollectionColumnReader* col_reader = static_cast<CollectionColumnReader*>(reader);
+      collection_readers_.push_back(col_reader);
+      PartitionReaders(*col_reader->children(), can_eval_dict_filters);
+    } else {
+      BaseScalarColumnReader* scalar_reader =
+          static_cast<BaseScalarColumnReader*>(reader);
+      if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) {
+        dict_filterable_readers_.push_back(scalar_reader);
+      } else {
+        non_dict_filterable_readers_.push_back(scalar_reader);
+      }
     }
-  } else {
-    dictionary_filtering_enabled = false;
   }
+}
 
-  // Divide the column readers into a list of column readers that are eligible for
-  // dictionary filtering and a list of column readers that are not. If dictionary
-  // filtering is disabled, all column readers go into the ineligible list.
-  for (ParquetColumnReader* col_reader : column_readers_) {
-    if (dictionary_filtering_enabled && IsDictFilterable(col_reader)) {
-      dict_filterable_readers_.push_back(col_reader);
-    } else {
-      non_dict_filterable_readers_.push_back(col_reader);
+Status HdfsParquetScanner::InitDictFilterStructures() {
+  bool can_eval_dict_filters =
+      state_->query_options().parquet_dictionary_filtering && !dict_filter_map_.empty();
+
+  // Separate column readers into scalar and collection readers.
+  PartitionReaders(column_readers_, can_eval_dict_filters);
+
+  // Allocate tuple buffers for all tuple descriptors that are associated with conjuncts
+  // that can be dictionary filtered.
+  for (auto* col_reader : dict_filterable_readers_) {
+    const SlotDescriptor* slot_desc = col_reader->slot_desc();
+    const TupleDescriptor* tuple_desc = slot_desc->parent();
+    auto tuple_it = dict_filter_tuple_map_.find(tuple_desc);
+    if (tuple_it != dict_filter_tuple_map_.end()) continue;
+    int tuple_size = tuple_desc->byte_size();
+    if (tuple_size > 0) {
+      uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
+      if (buffer == nullptr) {
+        string details = Substitute(
+            PARQUET_MEM_LIMIT_EXCEEDED, "InitDictFilterStructures", tuple_size,
+            "Dictionary Filtering Tuple");
+        return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
+      }
+      dict_filter_tuple_map_[tuple_desc] = reinterpret_cast<Tuple*>(buffer);
     }
   }
   return Status::OK();
@@ -857,6 +871,8 @@ bool HdfsParquetScanner::IsDictionaryEncoded(
 Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_group,
     bool* row_group_eliminated) {
   *row_group_eliminated = false;
+  // Check if there's anything to do here.
+  if (dict_filterable_readers_.empty()) return Status::OK();
 
   // Legacy impala files (< 2.9) require special handling, because they do not encode
   // information about whether the column is 100% dictionary encoded.
@@ -865,14 +881,13 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
     is_legacy_impala = true;
   }
 
-  Tuple* dict_filter_tuple =
-      reinterpret_cast<Tuple*>(dict_filter_tuple_backing_.buffer());
-  dict_filter_tuple->Init(scan_node_->tuple_desc()->byte_size());
-  vector<ParquetColumnReader*> deferred_dict_init_list;
-  for (ParquetColumnReader* col_reader : dict_filterable_readers_) {
-    DCHECK(!col_reader->IsCollectionReader());
-    BaseScalarColumnReader* scalar_reader =
-        static_cast<BaseScalarColumnReader*>(col_reader);
+  // Keeps track of column readers that need to be initialized. For example, if a
+  // column cannot be filtered, then defer its dictionary initialization once we know
+  // the row group cannot be filtered.
+  vector<BaseScalarColumnReader*> deferred_dict_init_list;
+  // Keeps track of the initialized tuple associated with a TupleDescriptor.
+  unordered_map<const TupleDescriptor*, Tuple*> tuple_map;
+  for (BaseScalarColumnReader* scalar_reader : dict_filterable_readers_) {
     const parquet::ColumnMetaData& col_metadata =
         row_group.columns[scalar_reader->col_idx()].meta_data;
 
@@ -898,10 +913,25 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
         dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue;
 
     const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
+    DCHECK(slot_desc != nullptr);
+    const TupleDescriptor* tuple_desc = slot_desc->parent();
     auto dict_filter_it = dict_filter_map_.find(slot_desc->id());
     DCHECK(dict_filter_it != dict_filter_map_.end());
     const vector<ScalarExprEvaluator*>& dict_filter_conjunct_evals =
         dict_filter_it->second;
+    Tuple* dict_filter_tuple = nullptr;
+    auto dict_filter_tuple_it = tuple_map.find(tuple_desc);
+    if (dict_filter_tuple_it == tuple_map.end()) {
+      auto tuple_it = dict_filter_tuple_map_.find(tuple_desc);
+      DCHECK(tuple_it != dict_filter_tuple_map_.end());
+      dict_filter_tuple = tuple_it->second;
+      dict_filter_tuple->Init(tuple_desc->byte_size());
+      tuple_map[tuple_desc] = dict_filter_tuple;
+    } else {
+      dict_filter_tuple = dict_filter_tuple_it->second;
+    }
+
+    DCHECK(dict_filter_tuple != nullptr);
     void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset());
     bool column_has_match = false;
     for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
@@ -956,9 +986,9 @@ Status HdfsParquetScanner::AssembleRows(
     const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch,
     bool* skip_row_group) {
   DCHECK(!column_readers.empty());
-  DCHECK(row_batch != NULL);
+  DCHECK(row_batch != nullptr);
   DCHECK_EQ(*skip_row_group, false);
-  DCHECK(scratch_batch_ != NULL);
+  DCHECK(scratch_batch_ != nullptr);
 
   int64_t num_rows_read = 0;
   while (!column_readers[0]->RowGroupAtEnd()) {
@@ -1013,7 +1043,7 @@ Status HdfsParquetScanner::AssembleRows(
 }
 
 Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
-  DCHECK(dst_batch != NULL);
+  DCHECK(dst_batch != nullptr);
   dst_batch->CommitRows(num_rows);
 
   if (context_->cancelled()) return Status::CANCELLED;
@@ -1053,7 +1083,7 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   }
 
   int num_rows_to_commit;
-  if (codegend_process_scratch_batch_fn_ != NULL) {
+  if (codegend_process_scratch_batch_fn_ != nullptr) {
     num_rows_to_commit = codegend_process_scratch_batch_fn_(this, dst_batch);
   } else {
     num_rows_to_commit = ProcessScratchBatch(dst_batch);
@@ -1065,17 +1095,17 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
 Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ScalarExpr*>& conjuncts, llvm::Function** process_scratch_batch_fn) {
   DCHECK(node->runtime_state()->ShouldCodegen());
-  *process_scratch_batch_fn = NULL;
+  *process_scratch_batch_fn = nullptr;
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
-  DCHECK(codegen != NULL);
+  DCHECK(codegen != nullptr);
   SCOPED_TIMER(codegen->codegen_timer());
 
   llvm::Function* fn = codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true);
-  DCHECK(fn != NULL);
+  DCHECK(fn != nullptr);
 
   llvm::Function* eval_conjuncts_fn;
   RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn));
-  DCHECK(eval_conjuncts_fn != NULL);
+  DCHECK(eval_conjuncts_fn != nullptr);
 
   int replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts");
   DCHECK_EQ(replaced, 1);
@@ -1083,14 +1113,14 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
   llvm::Function* eval_runtime_filters_fn;
   RETURN_IF_ERROR(CodegenEvalRuntimeFilters(
       codegen, node->filter_exprs(), &eval_runtime_filters_fn));
-  DCHECK(eval_runtime_filters_fn != NULL);
+  DCHECK(eval_runtime_filters_fn != nullptr);
 
   replaced = codegen->ReplaceCallSites(fn, eval_runtime_filters_fn, "EvalRuntimeFilters");
   DCHECK_EQ(replaced, 1);
 
   fn->setName("ProcessScratchBatch");
   *process_scratch_batch_fn = codegen->FinalizeFunction(fn);
-  if (*process_scratch_batch_fn == NULL) {
+  if (*process_scratch_batch_fn == nullptr) {
     return Status("Failed to finalize process_scratch_batch_fn.");
   }
   return Status::OK();
@@ -1199,7 +1229,7 @@ bool HdfsParquetScanner::AssembleCollection(
     CollectionValueBuilder* coll_value_builder) {
   DCHECK(!column_readers.empty());
   DCHECK_GE(new_collection_rep_level, 0);
-  DCHECK(coll_value_builder != NULL);
+  DCHECK(coll_value_builder != nullptr);
 
   const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc();
   Tuple* template_tuple = template_tuple_map_[tuple_desc];
@@ -1215,12 +1245,12 @@ bool HdfsParquetScanner::AssembleCollection(
   // group (otherwise it would always be true because we're on the "edge" of two
   // collections), and only ProcessSplit() should call AssembleRows() at the end of the
   // row group.
-  if (coll_value_builder != NULL) DCHECK(!end_of_collection);
+  if (coll_value_builder != nullptr) DCHECK(!end_of_collection);
 
   while (!end_of_collection && continue_execution) {
     MemPool* pool;
     Tuple* tuple;
-    TupleRow* row = NULL;
+    TupleRow* row = nullptr;
 
     int64_t num_rows;
     // We're assembling item tuples into an CollectionValue
@@ -1233,7 +1263,7 @@ bool HdfsParquetScanner::AssembleCollection(
     // 'num_rows' can be very high if we're writing to a large CollectionValue. Limit
     // the number of rows we read at one time so we don't spend too long in the
     // 'num_rows' loop below before checking for cancellation or limit reached.
-    num_rows = std::min(
+    num_rows = min(
         num_rows, static_cast<int64_t>(scan_node_->runtime_state()->batch_size()));
 
     int num_to_commit = 0;
@@ -1286,7 +1316,7 @@ inline bool HdfsParquetScanner::ReadCollectionItem(
       FILE_CHECK_GE(col_reader->def_level(),
                     col_reader->def_level_of_immediate_repeated_ancestor());
       // Fill in position slot if applicable
-      if (col_reader->pos_slot_desc() != NULL) col_reader->ReadPosition(tuple);
+      if (col_reader->pos_slot_desc() != nullptr) col_reader->ReadPosition(tuple);
       continue_execution = col_reader->ReadValue(pool, tuple);
     } else {
       // A containing repeated field is empty or NULL
@@ -1351,14 +1381,14 @@ Status HdfsParquetScanner::ProcessFooter() {
   // deserializing it.
   ScopedBuffer metadata_buffer(scan_node_->mem_tracker());
 
-  DCHECK(metadata_range_ != NULL);
+  DCHECK(metadata_range_ != nullptr);
   if (UNLIKELY(metadata_size > remaining_bytes_buffered)) {
     // In this case, the metadata is bigger than our guess meaning there are
     // not enough bytes in the footer range from IssueInitialRanges().
     // We'll just issue more ranges to the IoMgr that is the actual footer.
     int64_t partition_id = context_->partition_descriptor()->id();
     const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
-    DCHECK(file_desc != NULL);
+    DCHECK(file_desc != nullptr);
     // The start of the metadata is:
     // file_length - 4-byte metadata size - footer-size - metadata size
     int64_t metadata_start = file_desc->file_length - sizeof(int32_t)
@@ -1443,7 +1473,7 @@ Status HdfsParquetScanner::ProcessFooter() {
 Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc,
     const ParquetSchemaResolver& schema_resolver,
     vector<ParquetColumnReader*>* column_readers) {
-  DCHECK(column_readers != NULL);
+  DCHECK(column_readers != nullptr);
   DCHECK(column_readers->empty());
 
   if (scan_node_->optimize_parquet_count_star()) {
@@ -1453,14 +1483,14 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
   }
 
   // Each tuple can have at most one position slot. We'll process this slot desc last.
-  SlotDescriptor* pos_slot_desc = NULL;
+  SlotDescriptor* pos_slot_desc = nullptr;
 
   for (SlotDescriptor* slot_desc: tuple_desc.slots()) {
     // Skip partition columns
     if (&tuple_desc == scan_node_->tuple_desc() &&
         slot_desc->col_pos() < scan_node_->num_partition_keys()) continue;
 
-    SchemaNode* node = NULL;
+    SchemaNode* node = nullptr;
     bool pos_field;
     bool missing_field;
     RETURN_IF_ERROR(schema_resolver.ResolvePath(
@@ -1470,7 +1500,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
       // In this case, we are selecting a column that is not in the file.
       // Update the template tuple to put a NULL in this slot.
       Tuple** template_tuple = &template_tuple_map_[&tuple_desc];
-      if (*template_tuple == NULL) {
+      if (*template_tuple == nullptr) {
         *template_tuple =
             Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
       }
@@ -1479,7 +1509,8 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
     }
 
     if (pos_field) {
-      DCHECK(pos_slot_desc == NULL) << "There should only be one position slot per tuple";
+      DCHECK(pos_slot_desc == nullptr)
+          << "There should only be one position slot per tuple";
       pos_slot_desc = slot_desc;
       continue;
     }
@@ -1493,7 +1524,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
 
     if (col_reader->IsCollectionReader()) {
       // Recursively populate col_reader's children
-      DCHECK(slot_desc->collection_item_descriptor() != NULL);
+      DCHECK(slot_desc->collection_item_descriptor() != nullptr);
       const TupleDescriptor* item_tuple_desc = slot_desc->collection_item_descriptor();
       CollectionColumnReader* collection_reader =
           static_cast<CollectionColumnReader*>(col_reader);
@@ -1513,7 +1544,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
     column_readers->push_back(reader);
   }
 
-  if (pos_slot_desc != NULL) {
+  if (pos_slot_desc != nullptr) {
     // 'tuple_desc' has a position slot. Use an existing column reader to populate it.
     DCHECK(!column_readers->empty());
     (*column_readers)[0]->set_pos_slot_desc(pos_slot_desc);
@@ -1547,14 +1578,14 @@ Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
     }
 
     *reader = ParquetColumnReader::Create(
-        *target_node, target_node->is_repeated(), NULL, this);
+        *target_node, target_node->is_repeated(), nullptr, this);
     if (target_node->is_repeated()) {
-      // Find the closest scalar descendent of 'target_node' via breadth-first search, and
+      // Find the closest scalar descendant of 'target_node' via breadth-first search, and
       // create scalar reader to drive 'reader'. We find the closest (i.e. least-nested)
-      // descendent as a heuristic for picking a descendent with fewer values, so it's
+      // descendant as a heuristic for picking a descendant with fewer values, so it's
       // faster to scan.
       // TODO: use different heuristic than least-nested? Fewest values?
-      const SchemaNode* node = NULL;
+      const SchemaNode* node = nullptr;
       queue<const SchemaNode*> nodes;
       nodes.push(target_node);
       while (!nodes.empty()) {
@@ -1563,7 +1594,7 @@ Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
         if (node->children.size() > 0) {
           for (const SchemaNode& child: node->children) nodes.push(&child);
         } else {
-          // node is the least-nested scalar descendent of 'target_node'
+          // node is the least-nested scalar descendant of 'target_node'
           break;
         }
       }
@@ -1571,52 +1602,52 @@ Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
       CollectionColumnReader* parent_reader =
           static_cast<CollectionColumnReader*>(*reader);
       parent_reader->children()->push_back(
-          ParquetColumnReader::Create(*node, false, NULL, this));
+          ParquetColumnReader::Create(*node, false, nullptr, this));
     }
   } else {
     // Special case for a repeated scalar node. The repeated node represents both the
     // parent collection and the child item.
-    *reader = ParquetColumnReader::Create(*parent_node, false, NULL, this);
+    *reader = ParquetColumnReader::Create(*parent_node, false, nullptr, this);
   }
 
   return Status::OK();
 }
 
-Status HdfsParquetScanner::InitColumns(
-    int row_group_idx, const vector<ParquetColumnReader*>& column_readers) {
+void HdfsParquetScanner::InitCollectionColumns() {
+  for (CollectionColumnReader* col_reader: collection_readers_) {
+    col_reader->Reset();
+  }
+}
+
+Status HdfsParquetScanner::InitScalarColumns(
+    int row_group_idx, const vector<BaseScalarColumnReader*>& column_readers) {
   int64_t partition_id = context_->partition_descriptor()->id();
   const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
-  DCHECK(file_desc != NULL);
+  DCHECK(file_desc != nullptr);
   parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
 
   // All the scan ranges (one for each column).
   vector<ScanRange*> col_ranges;
-  // Used to validate that the number of values in each reader in column_readers_ is the
-  // same.
-  int num_values = -1;
+  // Used to validate that the number of values in each reader in column_readers_ at the
+  // same SchemaElement is the same.
+  unordered_map<const parquet::SchemaElement*, int> num_values_map;
   // Used to validate we issued the right number of scan ranges
   int num_scalar_readers = 0;
 
-  for (ParquetColumnReader* col_reader: column_readers) {
-    if (col_reader->IsCollectionReader()) {
-      CollectionColumnReader* collection_reader =
-          static_cast<CollectionColumnReader*>(col_reader);
-      collection_reader->Reset();
-      // Recursively init child readers
-      RETURN_IF_ERROR(InitColumns(row_group_idx, *collection_reader->children()));
-      continue;
-    }
+  for (BaseScalarColumnReader* scalar_reader: column_readers) {
     ++num_scalar_readers;
-
-    BaseScalarColumnReader* scalar_reader =
-        static_cast<BaseScalarColumnReader*>(col_reader);
     const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()];
+    auto num_values_it = num_values_map.find(&scalar_reader->schema_element());
+    int num_values = -1;
+    if (num_values_it != num_values_map.end()) {
+      num_values = num_values_it->second;
+    } else {
+      num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values;
+    }
     int64_t col_start = col_chunk.meta_data.data_page_offset;
 
-    if (num_values == -1) {
-      num_values = col_chunk.meta_data.num_values;
-    } else if (col_chunk.meta_data.num_values != num_values) {
-      // TODO for 2.3: improve this error message by saying which columns are different,
+    if (num_values != -1 && col_chunk.meta_data.num_values != num_values) {
+      // TODO: improve this error message by saying which columns are different,
       // and also specify column in other error messages as appropriate
       return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
           col_chunk.meta_data.num_values, num_values, filename());
@@ -1672,7 +1703,7 @@ Status HdfsParquetScanner::InitColumns(
 
     // Get the stream that will be used for this column
     ScannerContext::Stream* stream = context_->AddStream(col_range);
-    DCHECK(stream != NULL);
+    DCHECK(stream != nullptr);
 
     RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
   }
@@ -1688,18 +1719,8 @@ Status HdfsParquetScanner::InitColumns(
 }
 
 Status HdfsParquetScanner::InitDictionaries(
-    const vector<ParquetColumnReader*>& column_readers) {
-  for (ParquetColumnReader* col_reader : column_readers) {
-    if (col_reader->IsCollectionReader()) {
-      CollectionColumnReader* collection_reader =
-          static_cast<CollectionColumnReader*>(col_reader);
-      // Recursively init child reader dictionaries
-      RETURN_IF_ERROR(InitDictionaries(*collection_reader->children()));
-      continue;
-    }
-
-    BaseScalarColumnReader* scalar_reader =
-        static_cast<BaseScalarColumnReader*>(col_reader);
+    const vector<BaseScalarColumnReader*>& column_readers) {
+  for (BaseScalarColumnReader* scalar_reader : column_readers) {
     RETURN_IF_ERROR(scalar_reader->InitDictionary());
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 99b5a60..2ddf0fc 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -384,8 +384,8 @@ class HdfsParquetScanner : public HdfsScanner {
 
   boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_;
 
-  /// Buffer to back tuples when reading parquet::Statistics.
-  ScopedBuffer min_max_tuple_buffer_;
+  /// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_.
+  Tuple* min_max_tuple_;
 
   /// Clone of Min/max statistics conjunct evaluators. Has the same life time as
   /// the scanner. Stored in 'obj_pool_'.
@@ -416,9 +416,10 @@ class HdfsParquetScanner : public HdfsScanner {
     LocalFilterStats() : considered(0), rejected(0), total_possible(0), enabled(1) { }
   };
 
-  /// Pool used for allocating caches of definition/repetition levels that are
-  /// populated by the level readers. The pool is freed in Close().
-  boost::scoped_ptr<MemPool> level_cache_pool_;
+  /// Pool used for allocating caches of definition/repetition levels and tuples for
+  /// dictionary filtering. The definition/repetition levels are populated by the
+  /// level readers. The pool is freed in Close().
+  boost::scoped_ptr<MemPool> perm_pool_;
 
   /// Track statistics of each filter (one for each filter in filter_ctxs_) per scanner so
   /// that expensive aggregation up to the scan node can be performed once, during
@@ -448,16 +449,24 @@ class HdfsParquetScanner : public HdfsScanner {
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
 
-  /// Column readers that are eligible for dictionary filtering
-  /// These are pointers to elements of column_readers_
-  std::vector<ParquetColumnReader*> dict_filterable_readers_;
+  /// Column readers that are eligible for dictionary filtering.
+  /// These are pointers to elements of column_readers_. Materialized columns that are
+  /// dictionary encoded correspond to scalar columns that are either top-level columns
+  /// or nested within a collection. CollectionColumnReaders are not eligible for
+  /// dictionary filtering so are not included.
+  std::vector<BaseScalarColumnReader*> dict_filterable_readers_;
 
-  /// Column readers that are not eligible for dictionary filtering
-  /// These are pointers to elements of column_readers_
-  std::vector<ParquetColumnReader*> non_dict_filterable_readers_;
+  /// Column readers that are not eligible for dictionary filtering.
+  /// These are pointers to elements of column_readers_. The readers are either top-level
+  /// or nested within a collection.
+  std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_;
 
-  /// Memory used to store the tuple used for dictionary filtering
-  ScopedBuffer dict_filter_tuple_backing_;
+  /// Flattened collection column readers that point to readers in column_readers_.
+  std::vector<CollectionColumnReader*> collection_readers_;
+
+  /// Memory used to store the tuples used for dictionary filtering. Tuples owned by
+  /// perm_pool_.
+  std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_;
 
   /// Timer for materializing rows.  This ignores time getting the next buffer.
   ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
@@ -619,14 +628,17 @@ class HdfsParquetScanner : public HdfsScanner {
 
   /// Walks file_metadata_ and initiates reading the materialized columns.  This
   /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
-  /// should be the readers used to materialize a single tuple (i.e., column_readers_ or
-  /// the children of a collection node).
-  Status InitColumns(
-      int row_group_idx, const std::vector<ParquetColumnReader*>& column_readers)
+  /// includes a mix of scalar readers from multiple schema nodes (i.e., readers of
+  /// top-level scalar columns and readers of scalar columns within a collection node).
+  Status InitScalarColumns(
+      int row_group_idx, const std::vector<BaseScalarColumnReader*>& column_readers)
       WARN_UNUSED_RESULT;
 
+  /// Initializes the column readers in collection_readers_.
+  void InitCollectionColumns();
+
   /// Initialize dictionaries for all column readers
-  Status InitDictionaries(const std::vector<ParquetColumnReader*>& column_readers)
+  Status InitDictionaries(const std::vector<BaseScalarColumnReader*>& column_readers)
       WARN_UNUSED_RESULT;
 
   /// Performs some validation once we've reached the end of a row group to help detect
@@ -645,8 +657,19 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Evaluates whether the column reader is eligible for dictionary predicates
   bool IsDictFilterable(ParquetColumnReader* col_reader);
 
-  /// Divides the column readers into dict_filterable_readers_ and
-  /// non_dict_filterable_readers_. Allocates memory for dict_filter_tuple_backing_.
+  /// Evaluates whether the column reader is eligible for dictionary predicates.
+  bool IsDictFilterable(BaseScalarColumnReader* col_reader);
+
+  /// Partitions the readers into scalar and collection readers. The collection readers
+  /// are flattened into collection_readers_. The scalar readers are partitioned into
+  /// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether
+  /// dictionary filtering is enabled and the reader can be dictionary filtered.
+  void PartitionReaders(const vector<ParquetColumnReader*>& readers,
+                        bool can_eval_dict_filters);
+
+  /// Divides the column readers into dict_filterable_readers_,
+  /// non_dict_filterable_readers_ and collection_readers_. Allocates memory for
+  /// dict_filter_tuple_map_.
   Status InitDictFilterStructures() WARN_UNUSED_RESULT;
 
   /// Returns true if all of the data pages in the column chunk are dictionary encoded

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 2b310af..70fbac2 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -393,7 +393,8 @@ class HdfsScanNodeBase : public ScanNode {
   ConjunctsMap conjuncts_map_;
   ConjunctEvaluatorsMap conjunct_evals_map_;
 
-  /// Dictionary filtering eligible conjuncts for each slot.
+  /// Dictionary filtering eligible conjuncts for each slot. Set to nullptr when there
+  /// are no dictionary filters.
   const TDictFilterConjunctsMap* thrift_dict_filter_conjuncts_map_;
 
   /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 4cafa5d..f934f79 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -83,12 +83,20 @@ Status HdfsScanner::Open(ScannerContext* context) {
   // Set up the scan node's dictionary filtering conjuncts map.
   if (scan_node_->thrift_dict_filter_conjuncts_map() != nullptr) {
     for (auto& entry : *(scan_node_->thrift_dict_filter_conjuncts_map())) {
+      SlotDescriptor* slot_desc = state_->desc_tbl().GetSlotDescriptor(entry.first);
+      TupleId tuple_id = (slot_desc->type().IsCollectionType() ?
+          slot_desc->collection_item_descriptor()->id() :
+          slot_desc->parent()->id());
+      auto conjunct_evals_it = conjunct_evals_map_.find(tuple_id);
+      DCHECK(conjunct_evals_it != conjunct_evals_map_.end());
+      const vector<ScalarExprEvaluator*>& conjunct_evals = conjunct_evals_it->second;
+
       // Convert this slot's list of conjunct indices into a list of pointers
       // into conjunct_evals_.
       for (int conjunct_idx : entry.second) {
-        DCHECK_LT(conjunct_idx, conjunct_evals_->size());
-        DCHECK((*conjunct_evals_)[conjunct_idx] != nullptr);
-        dict_filter_map_[entry.first].push_back((*conjunct_evals_)[conjunct_idx]);
+        DCHECK_LT(conjunct_idx, conjunct_evals.size());
+        DCHECK((conjunct_evals)[conjunct_idx] != nullptr);
+        dict_filter_map_[entry.first].push_back((conjunct_evals)[conjunct_idx]);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 6228744..e3c186f 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -217,8 +217,9 @@ class HdfsScanner {
   // scanners that do not support nested types.
   const std::vector<ScalarExprEvaluator*>* conjunct_evals_ = nullptr;
 
-  // Clones of the conjuncts' evaluators in scan_node_->dict_filter_conjuncts_map().
-  typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>> DictFilterConjunctsMap;
+  // Clones of the conjuncts' evaluators in scan_node_->thrift_dict_filter_conjuncts().
+  typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>>
+      DictFilterConjunctsMap;
   DictFilterConjunctsMap dict_filter_map_;
 
   /// Holds memory for template tuples. The memory in this pool must remain valid as long

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 317a4a5..099fdce 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -1159,14 +1159,14 @@ Status BaseScalarColumnReader::ReadDataPage() {
     // Initialize the repetition level data
     RETURN_IF_ERROR(rep_levels_.Init(filename(),
         current_page_header_.data_page_header.repetition_level_encoding,
-        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        parent_->perm_pool_.get(), parent_->state_->batch_size(),
         max_rep_level(), num_buffered_values_,
         &data_, &data_size));
 
     // Initialize the definition level data
     RETURN_IF_ERROR(def_levels_.Init(filename(),
         current_page_header_.data_page_header.definition_level_encoding,
-        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        parent_->perm_pool_.get(), parent_->state_->batch_size(),
         max_def_level(), num_buffered_values_, &data_, &data_size));
 
     // Data can be empty if the column contains all NULLs

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/runtime/collection-value-builder.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h
index 1d20ef4..d75e94b 100644
--- a/be/src/runtime/collection-value-builder.h
+++ b/be/src/runtime/collection-value-builder.h
@@ -25,7 +25,7 @@
 
 namespace impala {
 
-/// Class for constructing an CollectionValue when the total size isn't known
+/// Class for constructing a CollectionValue when the total size isn't known
 /// up-front. This class handles allocating the buffer backing the collection from a
 /// MemPool, and uses a doubling strategy for growing the collection.
 class CollectionValueBuilder {

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/runtime/scoped-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/scoped-buffer.h b/be/src/runtime/scoped-buffer.h
index cf77dd6..17fb8e2 100644
--- a/be/src/runtime/scoped-buffer.h
+++ b/be/src/runtime/scoped-buffer.h
@@ -24,6 +24,7 @@ namespace impala {
 
 /// A scoped memory allocation that is tracked against a MemTracker.
 /// The allocation is automatically freed when the ScopedBuffer object goes out of scope.
+/// NOTE: if multiple allocations share the same lifetime, prefer to use MemPool.
 class ScopedBuffer {
  public:
   ScopedBuffer(MemTracker* mem_tracker) : mem_tracker_(mem_tracker),

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 97ef1b3..fedca3c 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -229,8 +229,7 @@ struct THdfsScanNode {
   // Tuple to evaluate 'min_max_conjuncts' against.
   8: optional Types.TTupleId min_max_tuple_id
 
-  // Map from SlotIds to the indices in TPlanNode.conjuncts that are eligible
-  // for dictionary filtering.
+  // The conjuncts that are eligible for dictionary filtering.
   9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
 
   // The byte offset of the slot for Parquet metadata if Parquet count star optimization

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index e303a11..4f0a0e1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -69,12 +69,16 @@ public class SlotDescriptor {
   private ColumnStats stats_;  // only set if 'column' isn't set
 
   SlotDescriptor(SlotId id, TupleDescriptor parent) {
+    Preconditions.checkNotNull(id);
+    Preconditions.checkNotNull(parent);
     id_ = id;
     parent_ = parent;
     byteOffset_ = -1;  // invalid
   }
 
   SlotDescriptor(SlotId id, TupleDescriptor parent, SlotDescriptor src) {
+    Preconditions.checkNotNull(id);
+    Preconditions.checkNotNull(parent);
     id_ = id;
     parent_ = parent;
     type_ = src.type_;

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index a87cd3a..6c33861 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -48,6 +48,8 @@ import com.google.common.collect.Lists;
  * The list of slots tracks the named slots that are actually referenced in a query, as
  * well as all anonymous slots. Although not required, a tuple descriptor typically
  * only has named or anonymous slots and not a mix of both.
+ * Each tuple and slot descriptor has an associated unique id (within the scope of a
+ * query). A given slot descriptor is owned by exactly one tuple descriptor.
  *
  * For example, every table reference has a corresponding tuple descriptor. The columns
  * of the table are represented by the tuple descriptor's type (struct type with one

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index b13f435..596129b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -100,10 +100,17 @@ import com.google.common.collect.Sets;
  * TABLESAMPLE clause. Scan predicates and the sampling are independent, so we first
  * prune partitions and then randomly select files from those partitions.
  *
- * For scans of tables with Parquet files the class creates an additional list of
- * conjuncts that are passed to the backend and will be evaluated against the
- * parquet::Statistics of row groups. If the conjuncts don't match, then whole row groups
- * will be skipped.
+ * For scans of tables with Parquet files the class sends over additional information
+ * to the backend to enable more aggressive runtime pruning. Two types of pruning are
+ * supported:
+ *
+ * 1. Min-max pruning: the class creates an additional list of conjuncts from applicable
+ * scan-node conjuncts and collection conjuncts. The additional conjuncts are
+ * used to prune a row group if any fail the row group's min-max parquet::Statistics.
+ *
+ * 2. Dictionary pruning: the class identifies which scan-node conjuncts and collection
+ * conjuncts can be used to prune a row group by evaluating conjuncts on the
+ * column dictionaries.
  *
  * Count(*) aggregation optimization flow:
  * The caller passes in an AggregateInfo to the constructor that this scan node uses to
@@ -181,11 +188,20 @@ public class HdfsScanNode extends ScanNode {
 
   // TupleDescriptors of collection slots that have an IsNotEmptyPredicate. See
   // SelectStmt#registerIsNotEmptyPredicates.
+  // Correctness for applying min-max and dictionary filters requires that the nested
+  // collection is tested to be not empty (via the IsNotEmptyPredicate).
+  // These filters are added by analysis (see: SelectStmt#registerIsNotEmptyPredicates).
+  // While correct, they may be conservative. See the tests for parquet collection
+  // filtering for examples that could benefit from being more aggressive
+  // (yet still correct).
   private final Set<TupleDescriptor> notEmptyCollections_ = Sets.newHashSet();
 
-  // Map from SlotIds to indices in PlanNodes.conjuncts_ that are eligible for
-  // dictionary filtering
-  private final Map<Integer, List<Integer>> dictionaryFilterConjuncts_ =
+  // Map from SlotDescriptor to indices in PlanNodes.conjuncts_ and
+  // collectionConjuncts_ that are eligible for dictionary filtering. Slots in the
+  // the TupleDescriptor of this scan node map to indices into PlanNodes.conjuncts_ and
+  // slots in the TupleDescriptors of nested types map to indices into
+  // collectionConjuncts_.
+  private Map<SlotDescriptor, List<Integer>> dictionaryFilterConjuncts_ =
       Maps.newLinkedHashMap();
 
   // Number of partitions that have the row count statistic.
@@ -525,10 +541,10 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Analyzes 'conjuncts_', populates 'minMaxTuple_' with slots for statistics values, and
-   * populates 'minMaxConjuncts_' with conjuncts pointing into the 'minMaxTuple_'. Only
-   * conjuncts of the form <slot> <op> <constant> are supported, and <op> must be one of
-   * LT, LE, GE, GT, or EQ.
+   * Analyzes 'conjuncts_' and 'collectionConjuncts_', populates 'minMaxTuple_' with slots
+   * for statistics values, and populates 'minMaxConjuncts_' with conjuncts pointing into
+   * the 'minMaxTuple_'. Only conjuncts of the form <slot> <op> <constant> are supported,
+   * and <op> must be one of LT, LE, GE, GT, or EQ.
    */
   private void computeMinMaxTupleAndConjuncts(Analyzer analyzer) throws ImpalaException{
     Preconditions.checkNotNull(desc_.getPath());
@@ -542,10 +558,6 @@ public class HdfsScanNode extends ScanNode {
 
     // Adds predicates for collections.
     for (Map.Entry<TupleDescriptor, List<Expr>> entry: collectionConjuncts_.entrySet()) {
-      // Adds only predicates for collections that are filtered by an IsNotEmptyPredicate.
-      // It is assumed that analysis adds these filters such that they are correct, but
-      // potentially conservative. See the tests for examples that could benefit from
-      // being more aggressive (yet still correct).
       if (notEmptyCollections_.contains(entry.getKey())) {
         for (Expr pred: entry.getValue()) tryComputeMinMaxPredicate(analyzer, pred);
       }
@@ -606,49 +618,68 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Walks through conjuncts and populates dictionaryFilterConjuncts_.
+   * Adds an entry to dictionaryFilterConjuncts_ if dictionary filtering is applicable
+   * for conjunct. The dictionaryFilterConjuncts_ entry maps the conjunct's tupleId and
+   * slotId to conjunctIdx. The conjunctIdx is the offset into a list of conjuncts;
+   * either conjuncts_ (for scan node's tupleId) or collectionConjuncts_ (for nested
+   * collections).
    */
-  private void computeDictionaryFilterConjuncts(Analyzer analyzer) {
-    for (int conjunct_idx = 0; conjunct_idx < conjuncts_.size(); ++conjunct_idx) {
-      Expr conjunct = conjuncts_.get(conjunct_idx);
-      List<TupleId> tupleIds = Lists.newArrayList();
-      List<SlotId> slotIds = Lists.newArrayList();
-
-      conjunct.getIds(tupleIds, slotIds);
-      if (slotIds.size() == 0) continue;
-      Preconditions.checkState(tupleIds.size() == 1);
-      if (slotIds.size() != 1) continue;
-
-      // Check to see if this slot is a collection type. Nested types are
-      // currently not supported. For example, an IsNotEmptyPredicate cannot
-      // be evaluated at the dictionary level.
-      if (analyzer.getSlotDesc(slotIds.get(0)).getType().isCollectionType()) continue;
-
-      // Check to see if this conjunct contains any known randomized function
-      if (conjunct.contains(Expr.IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE)) continue;
-
-      // Check to see if the conjunct evaluates to true when the slot is NULL
-      // This is important for dictionary filtering. Dictionaries do not
-      // contain an entry for NULL and do not provide an indication about
-      // whether NULLs are present. A conjunct that evaluates to true on NULL
-      // cannot be evaluated purely on the dictionary.
-      try {
-        if (analyzer.isTrueWithNullSlots(conjunct)) continue;
-      } catch (InternalException e) {
-        // Expr evaluation failed in the backend. Skip this conjunct since we cannot
-        // determine whether it is safe to apply it against a dictionary.
-        LOG.warn("Skipping dictionary filter because backend evaluation failed: "
-            + conjunct.toSql(), e);
-        continue;
-      }
+  private void addDictionaryFilter(Analyzer analyzer, Expr conjunct, int conjunctIdx) {
+    List<TupleId> tupleIds = Lists.newArrayList();
+    List<SlotId> slotIds = Lists.newArrayList();
+    conjunct.getIds(tupleIds, slotIds);
+    // Only single-slot conjuncts are eligible for dictionary filtering: when pruning
+    // a row-group, the conjunct is evaluated against one column's dictionary at a time,
+    // so it must not reference more than one slot. Expect a single-slot conjunct to be
+    // associated with a single tuple-id.
+    if (slotIds.size() != 1) return;
+
+    // Check to see if this slot is a collection type. Dictionary pruning is applicable
+    // to scalar values nested in collection types, not enclosing collection types.
+    if (analyzer.getSlotDesc(slotIds.get(0)).getType().isCollectionType()) return;
+
+    // Check to see if this conjunct contains any known randomized function
+    if (conjunct.contains(Expr.IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE)) return;
+
+    // Check to see if the conjunct evaluates to true when the slot is NULL.
+    // This is important for dictionary filtering. Dictionaries do not
+    // contain an entry for NULL and do not provide an indication about
+    // whether NULLs are present. A conjunct that evaluates to true on NULL
+    // cannot be evaluated purely on the dictionary.
+    try {
+      if (analyzer.isTrueWithNullSlots(conjunct)) return;
+    } catch (InternalException e) {
+      // Expr evaluation failed in the backend. Skip this conjunct since we cannot
+      // determine whether it is safe to apply it against a dictionary.
+      LOG.warn("Skipping dictionary filter because backend evaluation failed: "
+          + conjunct.toSql(), e);
+      return;
+    }
 
-      // TODO: Should there be a limit on the cost/structure of the conjunct?
-      Integer slotIdInt = slotIds.get(0).asInt();
-      if (dictionaryFilterConjuncts_.containsKey(slotIdInt)) {
-        dictionaryFilterConjuncts_.get(slotIdInt).add(conjunct_idx);
-      } else {
-        List<Integer> slotList = Lists.newArrayList(conjunct_idx);
-        dictionaryFilterConjuncts_.put(slotIdInt, slotList);
+    // TODO: Should there be a limit on the cost/structure of the conjunct?
+    SlotId slotId = slotIds.get(0);
+    SlotDescriptor slotKey = analyzer.getSlotDesc(slotId);
+    List<Integer> slotList = dictionaryFilterConjuncts_.get(slotKey);
+    if (slotList == null) {
+      slotList = Lists.newArrayList();
+      dictionaryFilterConjuncts_.put(slotKey, slotList);
+    }
+    slotList.add(conjunctIdx);
+  }
+
+  /**
+   * Walks through conjuncts_ and collectionConjuncts_ and populates
+   * dictionaryFilterConjuncts_.
+   */
+  private void computeDictionaryFilterConjuncts(Analyzer analyzer) {
+    for (int conjunctIdx = 0; conjunctIdx < conjuncts_.size(); ++conjunctIdx) {
+      addDictionaryFilter(analyzer, conjuncts_.get(conjunctIdx), conjunctIdx);
+    }
+    for (Map.Entry<TupleDescriptor, List<Expr>> entry: collectionConjuncts_.entrySet()) {
+      if (notEmptyCollections_.contains(entry.getKey())) {
+        List<Expr> conjuncts = entry.getValue();
+        for (int conjunctIdx = 0; conjunctIdx < conjuncts.size(); ++conjunctIdx) {
+          addDictionaryFilter(analyzer, conjuncts.get(conjunctIdx), conjunctIdx);
+        }
       }
     }
   }
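
The isTrueWithNullSlots() check above encodes the one non-obvious eligibility rule: a
Parquet dictionary has no entry for NULL and says nothing about whether NULLs exist, so
a conjunct that can be satisfied by a NULL slot cannot be decided from the dictionary
alone. The planner test further down shows the effect: l.l_comment IS NULL is listed
under "predicates on l" but not under "parquet dictionary predicates on l". A
stand-alone sketch of the rule, with a plain Set standing in for a column dictionary
and a hypothetical canPrune() helper (not Impala code):

import java.util.Set;
import java.util.function.Predicate;

// Sketch of the NULL rule for dictionary filtering (not Impala code).
public class NullSafetySketch {
  // A row group is prunable only if no dictionary value satisfies the conjunct AND the
  // conjunct is not true when the slot is NULL (Impala asks the latter via
  // Analyzer.isTrueWithNullSlots()).
  static <T> boolean canPrune(Set<T> dictionary, Predicate<T> conjunct,
      boolean trueWithNullSlot) {
    if (trueWithNullSlot) return false;  // NULLs are never represented in a dictionary.
    return dictionary.stream().noneMatch(conjunct);
  }

  public static void main(String[] args) {
    // Hypothetical dictionaries for one row group.
    Set<Integer> lineNumbers = Set.of(1, 2, 3);
    Set<String> comments = Set.of("quick", "brown", "fox");

    // "l_linenumber + 1 < 0" is never true for a NULL slot, so the dictionary decides it.
    System.out.println(canPrune(lineNumbers, v -> v + 1 < 0, false));  // true: prune

    // "l_comment IS NULL" is true exactly when the slot is NULL, so it can never be
    // decided from the dictionary, no matter what values the dictionary holds.
    System.out.println(canPrune(comments, v -> v == null, true));      // false: keep
  }
}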
@@ -986,7 +1017,12 @@ public class HdfsScanNode extends ScanNode {
       }
       msg.hdfs_scan_node.setMin_max_tuple_id(minMaxTuple_.getId().asInt());
     }
-    msg.hdfs_scan_node.setDictionary_filter_conjuncts(dictionaryFilterConjuncts_);
+    Map<Integer, List<Integer>> dictMap = Maps.newLinkedHashMap();
+    for (Map.Entry<SlotDescriptor, List<Integer>> entry :
+      dictionaryFilterConjuncts_.entrySet()) {
+      dictMap.put(entry.getKey().getId().asInt(), entry.getValue());
+    }
+    msg.hdfs_scan_node.setDictionary_filter_conjuncts(dictMap);
   }
 
   @Override
@@ -1008,8 +1044,8 @@ public class HdfsScanNode extends ScanNode {
           PrintUtils.printBytes(totalBytes_)));
       output.append("\n");
       if (!conjuncts_.isEmpty()) {
-        output.append(
-            detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n");
+        output.append(String.format("%spredicates: %s\n", detailPrefix,
+            getExplainString(conjuncts_)));
       }
       if (!collectionConjuncts_.isEmpty()) {
         for (Map.Entry<TupleDescriptor, List<Expr>> entry:
@@ -1042,23 +1078,54 @@ public class HdfsScanNode extends ScanNode {
             totalFiles_, numScanRangesNoDiskIds_, scanRanges_.size()));
       }
       if (!minMaxOriginalConjuncts_.isEmpty()) {
-        output.append(detailPrefix + "parquet statistics predicates: " +
-            getExplainString(minMaxOriginalConjuncts_) + "\n");
+        output.append(String.format("%sparquet statistics predicates: %s\n",
+            detailPrefix, getExplainString(minMaxOriginalConjuncts_)));
       }
-      if (!dictionaryFilterConjuncts_.isEmpty()) {
-        List<Integer> totalIdxList = Lists.newArrayList();
-        for (List<Integer> idxList : dictionaryFilterConjuncts_.values()) {
-          totalIdxList.addAll(idxList);
-        }
-        // Since the conjuncts are stored by the slot id, they are not necessarily
-        // in the same order as the normal conjuncts. Sort the indices so that the
-        // order matches the normal conjuncts.
-        Collections.sort(totalIdxList);
-        List<Expr> exprList = Lists.newArrayList();
-        for (Integer idx : totalIdxList) exprList.add(conjuncts_.get(idx));
-        output.append(String.format("%sparquet dictionary predicates: %s\n",
-            detailPrefix, getExplainString(exprList)));
+      // Groups the dictionary filterable conjuncts by tuple descriptor.
+      output.append(getDictionaryConjunctsExplainString(detailPrefix));
+    }
+    return output.toString();
+  }
+
+  // Helper that builds the explain output for the dictionary filterable conjuncts,
+  // grouped by tuple descriptor.
+  private String getDictionaryConjunctsExplainString(String prefix) {
+    StringBuilder output = new StringBuilder();
+    Map<TupleDescriptor, List<Integer>> perTupleConjuncts = Maps.newLinkedHashMap();
+    for (Map.Entry<SlotDescriptor, List<Integer>> entry :
+      dictionaryFilterConjuncts_.entrySet()) {
+      SlotDescriptor slotDescriptor = entry.getKey();
+      TupleDescriptor tupleDescriptor = slotDescriptor.getParent();
+      List<Integer> indexes = perTupleConjuncts.get(tupleDescriptor);
+      if (indexes == null) {
+        indexes = Lists.newArrayList();
+        perTupleConjuncts.put(tupleDescriptor, indexes);
+      }
+      indexes.addAll(entry.getValue());
+    }
+    for (Map.Entry<TupleDescriptor, List<Integer>> entry :
+      perTupleConjuncts.entrySet()) {
+      List<Integer> totalIdxList = entry.getValue();
+      // Since the conjuncts are stored by the slot id, they are not necessarily
+      // in the same order as the normal conjuncts. Sort the indices so that the
+      // order matches the normal conjuncts.
+      Collections.sort(totalIdxList);
+      List<Expr> conjuncts;
+      TupleDescriptor tupleDescriptor = entry.getKey();
+      String tupleName = "";
+      if (tupleDescriptor == getTupleDesc()) {
+        conjuncts = conjuncts_;
+      } else {
+        conjuncts = collectionConjuncts_.get(tupleDescriptor);
+        tupleName = " on " + tupleDescriptor.getAlias();
+      }
+      Preconditions.checkNotNull(conjuncts);
+      List<Expr> exprList = Lists.newArrayList();
+      for (Integer idx : totalIdxList) {
+        Preconditions.checkState(idx.intValue() < conjuncts.size());
+        exprList.add(conjuncts.get(idx));
       }
+      output.append(String.format("%sparquet dictionary predicates%s: %s\n",
+          prefix, tupleName, getExplainString(exprList)));
     }
     return output.toString();
   }
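
Taken together, the SlotDescriptor-keyed map declared at the top of this diff and
getDictionaryConjunctsExplainString() form a two-level lookup: a slot's parent tuple
decides whether its conjunct indices point into conjuncts_ or into
collectionConjuncts_. A stand-alone sketch of that shape in plain Java (not Impala
code; TupleRef/SlotRef are simplified stand-ins for TupleDescriptor/SlotDescriptor, and
the two sample conjuncts are borrowed from the constant-folding planner test below):

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the dictionaryFilterConjuncts_ representation (not Impala code).
public class DictFilterMapSketch {
  // Stand-ins for TupleDescriptor and SlotDescriptor.
  record TupleRef(String name) {}
  record SlotRef(String name, TupleRef parent) {}

  public static void main(String[] args) {
    TupleRef scanTuple = new TupleRef("c");   // the scan node's top-level tuple
    TupleRef ordersTuple = new TupleRef("o"); // a nested collection tuple (alias 'o')

    // conjuncts_: conjuncts bound to the scan node's tuple.
    List<String> conjuncts = List.of("c_custkey > 10");
    // collectionConjuncts_: conjuncts grouped by nested-collection tuple.
    Map<TupleRef, List<String>> collectionConjuncts =
        Map.of(ordersTuple, List.of("!empty(c.c_orders)", "o_orderkey = 4"));

    // dictionaryFilterConjuncts_: slot -> indices into whichever list owns the slot's tuple.
    Map<SlotRef, List<Integer>> dictFilterConjuncts = new LinkedHashMap<>();
    dictFilterConjuncts.put(new SlotRef("c_custkey", scanTuple), List.of(0));
    dictFilterConjuncts.put(new SlotRef("o_orderkey", ordersTuple), List.of(1));

    // Resolution mirrors getDictionaryConjunctsExplainString(): the scan node's tuple
    // reads from conjuncts_, any other tuple reads from collectionConjuncts_.
    for (Map.Entry<SlotRef, List<Integer>> e : dictFilterConjuncts.entrySet()) {
      TupleRef parent = e.getKey().parent();
      List<String> source =
          parent.equals(scanTuple) ? conjuncts : collectionConjuncts.get(parent);
      for (int idx : e.getValue()) {
        System.out.println(parent.name() + ": " + source.get(idx));
      }
    }
    // Prints:
    //   c: c_custkey > 10
    //   o: o_orderkey = 4
  }
}

The Thrift message still carries a map keyed by integer slot ids; the hunk above that
builds the Thrift message flattens the SlotDescriptor keys with getId().asInt().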

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/CustomerMultiBlock/README
----------------------------------------------------------------------
diff --git a/testdata/CustomerMultiBlock/README b/testdata/CustomerMultiBlock/README
new file mode 100644
index 0000000..edc02e7
--- /dev/null
+++ b/testdata/CustomerMultiBlock/README
@@ -0,0 +1,12 @@
+This file was created to test IMPALA-4993. It contains a subset of
+tpch_nested_parquet.customer, written out using multiple row groups.
+The file was created by following the instructions in
+testdata/bin/load_nested.py to create the table tmp_customer, which
+was then written out in Parquet format using Hive:
+
+SET parquet.block.size=8192;
+
+CREATE TABLE customer
+STORED AS PARQUET
+TBLPROPERTIES('parquet.compression'='SNAPPY')
+AS SELECT * FROM tmp_customer where c_custkey < 200;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/CustomerMultiBlock/customer_multiblock.parquet
----------------------------------------------------------------------
diff --git a/testdata/CustomerMultiBlock/customer_multiblock.parquet b/testdata/CustomerMultiBlock/customer_multiblock.parquet
new file mode 100644
index 0000000..5a14cc8
Binary files /dev/null and b/testdata/CustomerMultiBlock/customer_multiblock.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/datasets/functional/functional_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 3fcb5f7..f4b6b34 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2051,6 +2051,25 @@ L_SHIPMODE STRING
 L_COMMENT STRING
 ====
 ---- DATASET
+-- IMPALA-4993: tests nested collections stored in multiple row-groups.
+---- BASE_TABLE_NAME
+customer_multiblock
+---- COLUMNS
+C_CUSTKEY BIGINT
+C_NAME STRING
+C_ADDRESS STRING
+C_NATIONKEY SMALLINT
+C_PHONE STRING
+C_ACCTBAL DECIMAL(12, 2)
+C_MKTSEGMENT STRING
+C_COMMENT STRING
+C_ORDERS ARRAY<STRUCT<O_ORDERKEY: BIGINT, O_ORDERSTATUS: STRING, O_TOTALPRICE: DECIMAL(12, 2), O_ORDERDATE: STRING, O_ORDERPRIORITY: STRING, O_CLERK: STRING, O_SHIPPRIORITY: INT, O_COMMENT: STRING, O_LINEITEMS: ARRAY<STRUCT<L_PARTKEY: BIGINT, L_SUPPKEY: BIGINT, L_LINENUMBER: INT, L_QUANTITY: DECIMAL(12, 2), L_EXTENDEDPRICE: DECIMAL(12, 2), L_DISCOUNT: DECIMAL(12, 2), L_TAX: DECIMAL(12, 2), L_RETURNFLAG: STRING, L_LINESTATUS: STRING, L_SHIPDATE: STRING, L_COMMITDATE: STRING, L_RECEIPTDATE: STRING, L_SHIPINSTRUCT: STRING, L_SHIPMODE: STRING, L_COMMENT: STRING>>>>
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/customer_multiblock_parquet && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/CustomerMultiBlock/customer_multiblock.parquet \
+/test-warehouse/customer_multiblock_parquet/
+====
+---- DATASET
 functional
 ---- BASE_TABLE_NAME
 bzip2_tbl

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/datasets/functional/schema_constraints.csv
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index a3566c4..355e9b1 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -52,6 +52,7 @@ table_name:bad_column_metadata, constraint:restrict_to, table_format:parquet/non
 table_name:lineitem_multiblock, constraint:restrict_to, table_format:parquet/none/none
 table_name:lineitem_sixblocks, constraint:restrict_to, table_format:parquet/none/none
 table_name:lineitem_multiblock_one_row_group, constraint:restrict_to, table_format:parquet/none/none
+table_name:customer_multiblock, constraint:restrict_to, table_format:parquet/none/none
 
 # TODO: Support Avro. Data loading currently fails for Avro because complex types
 # cannot be converted to the corresponding Avro types yet.

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 031ac1c..2b2d5ef 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -56,6 +56,8 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled
    parquet statistics predicates: c_custkey > 10, o_orderkey = 4
    parquet dictionary predicates: c_custkey > 10
+   parquet dictionary predicates on o: o_orderkey = 4
+   parquet dictionary predicates on o_lineitems: 20 + l_linenumber < 0
    mem-estimate=176.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=15000
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 4258772..f3a46de 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -253,6 +253,8 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled
    parquet statistics predicates: c_custkey < 10, o_orderkey < 5, l_linenumber < 3
    parquet dictionary predicates: c_custkey < 10
+   parquet dictionary predicates on o: o_orderkey < 5
+   parquet dictionary predicates on o_lineitems: l_linenumber < 3
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=15000
 ---- PARALLELPLANS
@@ -314,6 +316,8 @@ Per-Host Resources: mem-estimate=264.00MB mem-reservation=0B
    extrapolated-rows=disabled
    parquet statistics predicates: c_custkey < 10, o_orderkey < 5, l_linenumber < 3
    parquet dictionary predicates: c_custkey < 10
+   parquet dictionary predicates on o: o_orderkey < 5
+   parquet dictionary predicates on o_lineitems: l_linenumber < 3
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=15000
 ====
@@ -365,6 +369,7 @@ PLAN-ROOT SINK
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled
    parquet statistics predicates: o1.o_orderkey < 5
+   parquet dictionary predicates on o1: o1.o_orderkey < 5
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
 ---- PARALLELPLANS
@@ -417,6 +422,7 @@ Per-Host Resources: mem-estimate=269.81MB mem-reservation=5.81MB
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled
    parquet statistics predicates: o1.o_orderkey < 5
+   parquet dictionary predicates on o1: o1.o_orderkey < 5
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 66770cd..e7dee4e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -151,6 +151,7 @@ PLAN-ROOT SINK
      columns missing stats: id
    extrapolated-rows=disabled
    parquet statistics predicates: a.item.e < -10
+   parquet dictionary predicates on a: a.item.e < -10
    mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=unavailable
 ====
@@ -328,6 +329,8 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled
    parquet statistics predicates: c_custkey > 0, o.o_orderkey > 0, l.l_partkey > 0
    parquet dictionary predicates: c_custkey > 0
+   parquet dictionary predicates on o: o.o_orderkey > 0
+   parquet dictionary predicates on l: l.l_partkey > 0
    mem-estimate=176.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=15000
 ====
@@ -374,3 +377,66 @@ PLAN-ROOT SINK
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=16B cardinality=unavailable
 ====
+# Multiple nested collection values (at the same nesting level) where dictionary
+# pruning is applicable.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c,
+c.c_orders o, o.o_lineitems l
+where l.l_shipdate = '1994-08-19' and
+l.l_receiptdate = '1994-08-24' and l.l_shipmode = 'RAIL' and l.l_returnflag = 'R' and
+l.l_comment is null;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=176.00MB mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2,1,0 row-size=162B cardinality=15000000
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  mem-estimate=50B mem-reservation=0B
+|  |  tuple-ids=2,1,0 row-size=162B cardinality=100
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=0 row-size=50B cardinality=1
+|  |
+|  04:SUBPLAN
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=2,1 row-size=112B cardinality=100
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  mem-estimate=32B mem-reservation=0B
+|  |  |  tuple-ids=2,1 row-size=112B cardinality=10
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     mem-estimate=0B mem-reservation=0B
+|  |  |     tuple-ids=1 row-size=32B cardinality=1
+|  |  |
+|  |  06:UNNEST [o.o_lineitems l]
+|  |     parent-subplan=04
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  03:UNNEST [c.c_orders o]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=292.36MB
+   predicates: !empty(c.c_orders)
+   predicates on o: !empty(o.o_lineitems)
+   predicates on l: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R', l.l_comment IS NULL
+   stored statistics:
+     table: rows=150000 size=292.36MB
+     columns missing stats: c_orders
+   extrapolated-rows=disabled
+   parquet statistics predicates: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R'
+   parquet dictionary predicates on l: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R'
+   mem-estimate=176.00MB mem-reservation=0B
+   tuple-ids=0 row-size=50B cardinality=150000
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test
index f90e660..7a4876f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test
@@ -1,7 +1,4 @@
-# This tests parquet dictionary filtering. It is mirrored with mt_dop
-# in mt-dop-parquet-filtering.test. Since the two rely on counting
-# the number of row groups filtered, differing parallelism changes
-# the counts seen in the output.
+# This tests parquet dictionary filtering.
 ====
 ---- QUERY
 # id: All values pass
@@ -258,4 +255,234 @@ select count(*) from functional_parquet.alltypes where mod(id,10000) = 7301;
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 24
 aggregation(SUM, NumDictFilteredRowGroups): 0
-====
\ No newline at end of file
+====
+---- QUERY
+# Nested dictionary filtering.
+#
+# Test coverage includes the following dimensions:
+# - nested collection type: map, array, struct
+# - number of filters and their nesting depth
+# - number of projections and their nesting depth
+# - required vs. non-required collections (outer vs. inner joins)
+# - filter matches: some, none
+# - count(*) optimization (exercises special code path)
+# - multiple row-groups per file
+#
+# Map key at depth 1. All required. No matches. Only one row-group is dictionary filtered
+# since only one (of two) row-groups is dictionary encoded for that column.
+select id from functional_parquet.complextypestbl f, f.int_map m where m.key = 'k5'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 1
+====
+---- QUERY
+select count(*) from functional_parquet.complextypestbl.int_map m where m.key = 'k5'
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 1
+====
+---- QUERY
+# Map key at depth 1. All required. Matches.
+select id from functional_parquet.complextypestbl f, f.int_map m where m.key = 'k1'
+---- RESULTS
+8
+1
+2
+7
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. None required. No matches.
+select id from functional_parquet.complextypestbl f left outer join f.int_map m
+where m.key = 'k5'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. None required. Matches.
+select id from functional_parquet.complextypestbl f left outer join f.int_map m
+where m.key = 'k1'
+---- RESULTS
+8
+1
+2
+7
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. All required. No matches. count(*).
+select count(*) from functional_parquet.complextypestbl f, f.int_map m where m.key = 'k5'
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 1
+====
+---- QUERY
+# Map key at depth 1. All required. Matches. count(*)
+select count(*) from functional_parquet.complextypestbl f, f.int_map m where m.key = 'k1'
+---- RESULTS
+4
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. None required. No matches. count(*)
+select count(*) from functional_parquet.complextypestbl f left outer join f.int_map m
+where m.key = 'k5'
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. None required. Matches. count(*)
+select count(*) from functional_parquet.complextypestbl f left outer join f.int_map m
+where m.key = 'k1'
+---- RESULTS
+4
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 2. All required. Matches.
+select r_name from tpch_nested_parquet.region r, r.r_nations n where n.n_name = 'FRANCE'
+---- RESULTS
+'EUROPE'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 3. All required. No matches.
+select c_name from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 4
+====
+---- QUERY
+# Array struct value at depth 3. All required. No matches.
+# Multiple nested values projected.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c,
+c.c_orders o, o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 4
+====
+---- QUERY
+# Array struct value at depth 3. Bottom not required. No matches.
+# Multiple nested values projected.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c,
+c.c_orders o left outer join o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 3. Top not required. No matches.
+# Multiple nested values projected.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c left outer join c.c_orders o,
+o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 3. All required. Multiple filters, multiple projections.
+# Matches.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c,
+c.c_orders o, o.o_lineitems l
+where l.l_shipdate = '1994-08-19' and
+l.l_receiptdate = '1994-08-24' and l.l_shipmode = 'RAIL' and l.l_returnflag = 'R'
+---- RESULTS
+'Customer#000013873','Clerk#000000554'
+'Customer#000049757','Clerk#000000156'
+'Customer#000037490','Clerk#000000026'
+'Customer#000002836','Clerk#000000577'
+'Customer#000004897','Clerk#000000112'
+'Customer#000107891','Clerk#000000576'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 3. All required. No matches, count(*).
+select count(*) from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 4
+====
+---- QUERY
+# Array struct values at depths 2 and 3. All required.
+# Matches, multiple nested values projected.
+select c_name, o.o_comment from tpch_nested_parquet.customer c,
+c.c_orders o, o.o_lineitems l
+where l.l_returnflag = 'foo' and o.o_clerk = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 4
+====
+---- QUERY
+# Array struct value at depth 2. Not required. No match.
+# Illustrates a case that should not be pruned.
+select count(*) from tpch_nested_parquet.customer c left outer join
+(select * from c.c_orders o where o.o_orderstatus = 'foo') v
+---- RESULTS
+150000
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Tests dictionary filtering with files with multiple row-groups.
+# Expect all results to be filtered.
+select l.l_linenumber from functional_parquet.customer_multiblock c,
+c.c_orders o, o.o_lineitems l
+where l.l_linenumber + 1 < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 2
+====
+---- QUERY
+# Tests dictionary filtering with files with multiple row-groups and count(*).
+# Expect all results to be filtered.
+select count(*) from functional_parquet.customer_multiblock c,
+c.c_orders o, o.o_lineitems l
+where l.l_linenumber + 1 < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 2
+====
+---- QUERY
+# Tests dictionary filtering with files with multiple row-groups and count(*).
+# Expect same result as obtained with dictionary filtering disabled.
+select count(*) from functional_parquet.customer_multiblock c,
+c.c_orders o, o.o_lineitems l
+where l.l_linenumber > 0;
+---- RESULTS
+7786
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====

