impala-commits mailing list archives

From taras...@apache.org
Subject [2/5] incubator-impala git commit: IMPALA-3845: Split up hdfs-parquet-scanner.cc into more files/components.
Date Fri, 15 Jul 2016 18:27:20 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-metadata-utils.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.cc b/be/src/exec/parquet-metadata-utils.cc
new file mode 100644
index 0000000..5f10f62
--- /dev/null
+++ b/be/src/exec/parquet-metadata-utils.cc
@@ -0,0 +1,647 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "exec/parquet-metadata-utils.h"
+
+#include <string>
+#include <sstream>
+#include <vector>
+
+#include <boost/algorithm/string.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "exec/parquet-common.h"
+#include "runtime/runtime-state.h"
+#include "util/debug-util.h"
+
+using std::endl;
+using std::string;
+using std::stringstream;
+using std::vector;
+using strings::Substitute;
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+using boost::algorithm::token_compress_on;
+
+namespace impala {
+
+Status ParquetMetadataUtils::ValidateFileVersion(
+    const parquet::FileMetaData& file_metadata, const char* filename) {
+  if (file_metadata.version > PARQUET_CURRENT_VERSION) {
+    stringstream ss;
+    ss << "File: " << filename << " is of an unsupported version. "
+       << "file version: " << file_metadata.version;
+    return Status(ss.str());
+  }
+  return Status::OK();
+}
+
+Status ParquetMetadataUtils::ValidateColumnOffsets(const string& filename,
+    int64_t file_length, const parquet::RowGroup& row_group) {
+  for (int i = 0; i < row_group.columns.size(); ++i) {
+    const parquet::ColumnChunk& col_chunk = row_group.columns[i];
+    int64_t col_start = col_chunk.meta_data.data_page_offset;
+    // The file format requires that if a dictionary page exists, it be before data pages.
+    if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+      if (col_chunk.meta_data.dictionary_page_offset >= col_start) {
+        stringstream ss;
+        ss << "File " << filename << ": metadata is corrupt. "
+            << "Dictionary page (offset=" << col_chunk.meta_data.dictionary_page_offset
+            << ") must come before any data pages (offset=" << col_start << ").";
+        return Status(ss.str());
+      }
+      col_start = col_chunk.meta_data.dictionary_page_offset;
+    }
+    int64_t col_len = col_chunk.meta_data.total_compressed_size;
+    int64_t col_end = col_start + col_len;
+    if (col_end <= 0 || col_end > file_length) {
+      stringstream ss;
+      ss << "File " << filename << ": metadata is corrupt. "
+          << "Column " << i << " has invalid column offsets "
+          << "(offset=" << col_start << ", size=" << col_len << ", "
+          << "file_size=" << file_length << ").";
+      return Status(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+static bool IsEncodingSupported(parquet::Encoding::type e) {
+  switch (e) {
+    case parquet::Encoding::PLAIN:
+    case parquet::Encoding::PLAIN_DICTIONARY:
+    case parquet::Encoding::BIT_PACKED:
+    case parquet::Encoding::RLE:
+      return true;
+    default:
+      return false;
+  }
+}
+
+Status ParquetMetadataUtils::ValidateColumn(const parquet::FileMetaData& file_metadata,
+    const char* filename, int row_group_idx, int col_idx,
+    const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+    RuntimeState* state) {
+  const parquet::ColumnChunk& file_data =
+      file_metadata.row_groups[row_group_idx].columns[col_idx];
+
+  // Check the encodings are supported.
+  const vector<parquet::Encoding::type>& encodings = file_data.meta_data.encodings;
+  for (int i = 0; i < encodings.size(); ++i) {
+    if (!IsEncodingSupported(encodings[i])) {
+      stringstream ss;
+      ss << "File '" << filename << "' uses an unsupported encoding: "
+         << PrintEncoding(encodings[i]) << " for column '" << schema_element.name
+         << "'.";
+      return Status(ss.str());
+    }
+  }
+
+  // Check the compression is supported.
+  if (file_data.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED &&
+      file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY &&
+      file_data.meta_data.codec != parquet::CompressionCodec::GZIP) {
+    stringstream ss;
+    ss << "File '" << filename << "' uses an unsupported compression: "
+        << file_data.meta_data.codec << " for column '" << schema_element.name
+        << "'.";
+    return Status(ss.str());
+  }
+
+  // Validation after this point is only needed if the column is actually being read.
+  if (slot_desc == NULL) return Status::OK();
+
+  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[slot_desc->type().type];
+  DCHECK_EQ(type, file_data.meta_data.type)
+      << "Should have been validated in ResolvePath()";
+
+  // Check the decimal scale in the file matches the metastore scale and precision.
+  // We fail the query if the metadata makes it impossible for us to safely read
+  // the file. If we don't require the metadata, we will fail the query if
+  // abort_on_error is true, otherwise we will just log a warning.
+  bool is_converted_type_decimal = schema_element.__isset.converted_type &&
+      schema_element.converted_type == parquet::ConvertedType::DECIMAL;
+  if (slot_desc->type().type == TYPE_DECIMAL) {
+    // We require that the scale and byte length be set.
+    if (schema_element.type != parquet::Type::FIXED_LEN_BYTE_ARRAY) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' should be a decimal column encoded using FIXED_LEN_BYTE_ARRAY.";
+      return Status(ss.str());
+    }
+
+    if (!schema_element.__isset.type_length) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' does not have type_length set.";
+      return Status(ss.str());
+    }
+
+    int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
+    if (schema_element.type_length != expected_len) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' has an invalid type length. Expecting: " << expected_len
+         << " len in file: " << schema_element.type_length;
+      return Status(ss.str());
+    }
+
+    if (!schema_element.__isset.scale) {
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' does not have the scale set.";
+      return Status(ss.str());
+    }
+
+    if (schema_element.scale != slot_desc->type().scale) {
+      // TODO: we could allow a mismatch and do a conversion at this step.
+      stringstream ss;
+      ss << "File '" << filename << "' column '" << schema_element.name
+         << "' has a scale that does not match the table metadata scale."
+         << " File metadata scale: " << schema_element.scale
+         << " Table metadata scale: " << slot_desc->type().scale;
+      return Status(ss.str());
+    }
+
+    // The other decimal metadata should be there but we don't need it.
+    if (!schema_element.__isset.precision) {
+      ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename,
+          schema_element.name);
+      RETURN_IF_ERROR(state->LogOrReturnError(msg));
+    } else {
+      if (schema_element.precision != slot_desc->type().precision) {
+        // TODO: we could allow a mismatch and do a conversion at this step.
+        ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename, schema_element.name,
+            schema_element.precision, slot_desc->type().precision);
+        RETURN_IF_ERROR(state->LogOrReturnError(msg));
+      }
+    }
+
+    if (!is_converted_type_decimal) {
+      // TODO: is this validation useful? It is not required at all to read the data and
+      // might only serve to reject otherwise perfectly readable files.
+      ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE, filename,
+          schema_element.name);
+      RETURN_IF_ERROR(state->LogOrReturnError(msg));
+    }
+  } else if (schema_element.__isset.scale || schema_element.__isset.precision ||
+      is_converted_type_decimal) {
+    ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename,
+        schema_element.name, slot_desc->type().DebugString());
+    RETURN_IF_ERROR(state->LogOrReturnError(msg));
+  }
+  return Status::OK();
+}
+
+ParquetFileVersion::ParquetFileVersion(const string& created_by) {
+  string created_by_lower = created_by;
+  std::transform(created_by_lower.begin(), created_by_lower.end(),
+      created_by_lower.begin(), ::tolower);
+  is_impala_internal = false;
+
+  vector<string> tokens;
+  split(tokens, created_by_lower, is_any_of(" "), token_compress_on);
+  // Boost always creates at least one token
+  DCHECK_GT(tokens.size(), 0);
+  application = tokens[0];
+
+  if (tokens.size() >= 3 && tokens[1] == "version") {
+    string version_string = tokens[2];
+    // Ignore any trailing non-numeric characters
+    int n = version_string.find_first_not_of("0123456789.");
+    string version_string_trimmed = version_string.substr(0, n);
+
+    vector<string> version_tokens;
+    split(version_tokens, version_string_trimmed, is_any_of("."));
+    version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0;
+    version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0;
+    version.patch = version_tokens.size() >= 3 ? atoi(version_tokens[2].c_str()) : 0;
+
+    if (application == "impala") {
+      if (version_string.find("-internal") != string::npos) is_impala_internal = true;
+    }
+  } else {
+    version.major = 0;
+    version.minor = 0;
+    version.patch = 0;
+  }
+}
+
+bool ParquetFileVersion::VersionLt(int major, int minor, int patch) const {
+  if (version.major < major) return true;
+  if (version.major > major) return false;
+  DCHECK_EQ(version.major, major);
+  if (version.minor < minor) return true;
+  if (version.minor > minor) return false;
+  DCHECK_EQ(version.minor, minor);
+  return version.patch < patch;
+}
+
+bool ParquetFileVersion::VersionEq(int major, int minor, int patch) const {
+  return version.major == major && version.minor == minor && version.patch == patch;
+}
+
+static string PrintRepetitionType(const parquet::FieldRepetitionType::type& t) {
+  switch (t) {
+    case parquet::FieldRepetitionType::REQUIRED: return "required";
+    case parquet::FieldRepetitionType::OPTIONAL: return "optional";
+    case parquet::FieldRepetitionType::REPEATED: return "repeated";
+    default: return "<unknown>";
+  }
+}
+
+static string PrintParquetType(const parquet::Type::type& t) {
+  switch (t) {
+    case parquet::Type::BOOLEAN: return "boolean";
+    case parquet::Type::INT32: return "int32";
+    case parquet::Type::INT64: return "int64";
+    case parquet::Type::INT96: return "int96";
+    case parquet::Type::FLOAT: return "float";
+    case parquet::Type::DOUBLE: return "double";
+    case parquet::Type::BYTE_ARRAY: return "byte_array";
+    case parquet::Type::FIXED_LEN_BYTE_ARRAY: return "fixed_len_byte_array";
+    default: return "<unknown>";
+  }
+}
+
+string SchemaNode::DebugString(int indent) const {
+  stringstream ss;
+  for (int i = 0; i < indent; ++i) ss << " ";
+  ss << PrintRepetitionType(element->repetition_type) << " ";
+  if (element->num_children > 0) {
+    ss << "struct";
+  } else {
+    ss << PrintParquetType(element->type);
+  }
+  ss << " " << element->name << " [i:" << col_idx << " d:" << max_def_level
+     << " r:" << max_rep_level << "]";
+  if (element->num_children > 0) {
+    ss << " {" << endl;
+    for (int i = 0; i < element->num_children; ++i) {
+      ss << children[i].DebugString(indent + 2) << endl;
+    }
+    for (int i = 0; i < indent; ++i) ss << " ";
+    ss << "}";
+  }
+  return ss.str();
+}
+
+Status ParquetSchemaResolver::CreateSchemaTree(const vector<parquet::SchemaElement>& schema,
+    SchemaNode* node) const {
+  int idx = 0;
+  int col_idx = 0;
+  RETURN_IF_ERROR(CreateSchemaTree(schema, 0, 0, 0, &idx, &col_idx, node));
+  if (node->children.empty()) {
+    return Status(Substitute("Invalid file: '$0' has no columns.", filename_));
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::CreateSchemaTree(
+    const vector<parquet::SchemaElement>& schema, int max_def_level, int max_rep_level,
+    int ira_def_level, int* idx, int* col_idx, SchemaNode* node)
+    const {
+  if (*idx >= schema.size()) {
+    return Status(Substitute("File $0 corrupt: could not reconstruct schema tree from "
+            "flattened schema in file metadata", filename_));
+  }
+  node->element = &schema[*idx];
+  ++(*idx);
+
+  if (node->element->num_children == 0) {
+    // node is a leaf node, meaning it's materialized in the file and appears in
+    // file_metadata_.row_groups.columns
+    node->col_idx = *col_idx;
+    ++(*col_idx);
+  }
+
+  // def_level_of_immediate_repeated_ancestor does not include this node, so set before
+  // updating ira_def_level
+  node->def_level_of_immediate_repeated_ancestor = ira_def_level;
+
+  if (node->element->repetition_type == parquet::FieldRepetitionType::OPTIONAL) {
+    ++max_def_level;
+  } else if (node->element->repetition_type == parquet::FieldRepetitionType::REPEATED) {
+    ++max_rep_level;
+    // Repeated fields add a definition level. This is used to distinguish between an
+    // empty list and a list with an item in it.
+    ++max_def_level;
+    // node is the new most immediate repeated ancestor
+    ira_def_level = max_def_level;
+  }
+  node->max_def_level = max_def_level;
+  node->max_rep_level = max_rep_level;
+
+  node->children.resize(node->element->num_children);
+  for (int i = 0; i < node->element->num_children; ++i) {
+    RETURN_IF_ERROR(CreateSchemaTree(schema, max_def_level, max_rep_level, ira_def_level,
+        idx, col_idx, &node->children[i]));
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::ResolvePath(const SchemaPath& path, SchemaNode** node,
+    bool* pos_field, bool* missing_field) const {
+  *missing_field = false;
+  // First try two-level array encoding.
+  bool missing_field_two_level;
+  Status status_two_level =
+      ResolvePathHelper(TWO_LEVEL, path, node, pos_field, &missing_field_two_level);
+  if (missing_field_two_level) DCHECK(status_two_level.ok());
+  if (status_two_level.ok() && !missing_field_two_level) return Status::OK();
+  // The two-level resolution failed or reported a missing field, try three-level array
+  // encoding.
+  bool missing_field_three_level;
+  Status status_three_level =
+      ResolvePathHelper(THREE_LEVEL, path, node, pos_field, &missing_field_three_level);
+  if (missing_field_three_level) DCHECK(status_three_level.ok());
+  if (status_three_level.ok() && !missing_field_three_level) return Status::OK();
+  // The three-level resolution failed or reported a missing field, try one-level array
+  // encoding.
+  bool missing_field_one_level;
+  Status status_one_level =
+      ResolvePathHelper(ONE_LEVEL, path, node, pos_field, &missing_field_one_level);
+  if (missing_field_one_level) DCHECK(status_one_level.ok());
+  if (status_one_level.ok() && !missing_field_one_level) return Status::OK();
+  // None of the resolutions yielded a node. Set *missing_field to true if any of the
+  // resolutions reported a missing field.
+  if (missing_field_one_level || missing_field_two_level || missing_field_three_level) {
+    *node = NULL;
+    *missing_field = true;
+    return Status::OK();
+  }
+  // All resolutions failed. Log and return the status from the three-level resolution
+  // (which is technically the standard).
+  DCHECK(!status_one_level.ok() && !status_two_level.ok() && !status_three_level.ok());
+  *node = NULL;
+  VLOG_QUERY << status_three_level.msg().msg() << "\n" << GetStackTrace();
+  return status_three_level;
+}
+
+Status ParquetSchemaResolver::ResolvePathHelper(ArrayEncoding array_encoding,
+    const SchemaPath& path, SchemaNode** node, bool* pos_field, bool* missing_field) const {
+  DCHECK(schema_.element != NULL)
+      << "schema_ must be initialized before calling ResolvePath()";
+
+  *pos_field = false;
+  *missing_field = false;
+  *node = const_cast<SchemaNode*>(&schema_);
+  const ColumnType* col_type = NULL;
+
+  // Traverse 'path' and resolve 'node' to the corresponding SchemaNode in 'schema_' (by
+  // ordinal), or set 'node' to NULL if 'path' doesn't exist in this file's schema.
+  for (int i = 0; i < path.size(); ++i) {
+    // Advance '*node' if necessary
+    if (i == 0 || col_type->type != TYPE_ARRAY || array_encoding == THREE_LEVEL) {
+      *node = NextSchemaNode(col_type, path, i, *node, missing_field);
+      if (*missing_field) return Status::OK();
+    } else {
+      // We just resolved an array, meaning *node is set to the repeated field of the
+      // array. Since we are trying to resolve using one- or two-level array encoding, the
+      // repeated field represents both the array and the array's item (i.e. there is no
+      // explicit item field), so we don't advance *node in this case.
+      DCHECK(col_type != NULL);
+      DCHECK_EQ(col_type->type, TYPE_ARRAY);
+      DCHECK(array_encoding == ONE_LEVEL || array_encoding == TWO_LEVEL);
+      DCHECK((*node)->is_repeated());
+    }
+
+    // Advance 'col_type'
+    int table_idx = path[i];
+    col_type = i == 0 ? &tbl_desc_.col_descs()[table_idx].type()
+               : &col_type->children[table_idx];
+
+    // Resolve path[i]
+    if (col_type->type == TYPE_ARRAY) {
+      DCHECK_EQ(col_type->children.size(), 1);
+      RETURN_IF_ERROR(
+          ResolveArray(array_encoding, path, i, node, pos_field, missing_field));
+      if (*missing_field || *pos_field) return Status::OK();
+    } else if (col_type->type == TYPE_MAP) {
+      DCHECK_EQ(col_type->children.size(), 2);
+      RETURN_IF_ERROR(ResolveMap(path, i, node, missing_field));
+      if (*missing_field) return Status::OK();
+    } else if (col_type->type == TYPE_STRUCT) {
+      DCHECK_GT(col_type->children.size(), 0);
+      // Nothing to do for structs
+    } else {
+      DCHECK(!col_type->IsComplexType());
+      DCHECK_EQ(i, path.size() - 1);
+      RETURN_IF_ERROR(ValidateScalarNode(**node, *col_type, path, i));
+    }
+  }
+  DCHECK(*node != NULL);
+  return Status::OK();
+}
+
+SchemaNode* ParquetSchemaResolver::NextSchemaNode(
+    const ColumnType* col_type, const SchemaPath& path, int next_idx, SchemaNode* node,
+    bool* missing_field) const {
+  DCHECK_LT(next_idx, path.size());
+  if (next_idx != 0) DCHECK(col_type != NULL);
+
+  int file_idx;
+  int table_idx = path[next_idx];
+  if (fallback_schema_resolution_ == TParquetFallbackSchemaResolution::type::NAME) {
+    if (next_idx == 0) {
+      // Resolve top-level table column by name.
+      DCHECK_LT(table_idx, tbl_desc_.col_descs().size());
+      const string& name = tbl_desc_.col_descs()[table_idx].name();
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_STRUCT) {
+      // Resolve struct field by name.
+      DCHECK_LT(table_idx, col_type->field_names.size());
+      const string& name = col_type->field_names[table_idx];
+      file_idx = FindChildWithName(node, name);
+    } else if (col_type->type == TYPE_ARRAY) {
+      // Arrays have only one child in the file.
+      DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
+      file_idx = table_idx;
+    } else {
+      DCHECK_EQ(col_type->type, TYPE_MAP);
+      // Maps have two fields, "key" and "value". These are supposed to be ordered and may
+      // not have the right field names, but try to resolve by name in case they're
+      // switched and otherwise use the order. See
+      // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+      // more details.
+      DCHECK(table_idx == SchemaPathConstants::MAP_KEY ||
+             table_idx == SchemaPathConstants::MAP_VALUE);
+      const string& name = table_idx == SchemaPathConstants::MAP_KEY ? "key" : "value";
+      file_idx = FindChildWithName(node, name);
+      if (file_idx >= node->children.size()) {
+        // Couldn't resolve by name, fall back to resolution by position.
+        file_idx = table_idx;
+      }
+    }
+  } else {
+    // Resolution by position.
+    DCHECK_EQ(fallback_schema_resolution_,
+        TParquetFallbackSchemaResolution::type::POSITION);
+    if (next_idx == 0) {
+      // For top-level columns, the first index in a path includes the table's partition
+      // keys.
+      file_idx = table_idx - tbl_desc_.num_clustering_cols();
+    } else {
+      file_idx = table_idx;
+    }
+  }
+
+  if (file_idx >= node->children.size()) {
+    string schema_resolution_mode = "unknown";
+    auto entry = _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.find(
+        fallback_schema_resolution_);
+    if (entry != _TParquetFallbackSchemaResolution_VALUES_TO_NAMES.end()) {
+      schema_resolution_mode = entry->second;
+    }
+    VLOG_FILE << Substitute(
+        "File '$0' does not contain path '$1' (resolving by $2)", filename_,
+        PrintPath(tbl_desc_, path), schema_resolution_mode);
+    *missing_field = true;
+    return NULL;
+  }
+  return &node->children[file_idx];
+}
+
+int ParquetSchemaResolver::FindChildWithName(SchemaNode* node,
+    const string& name) const {
+  int idx;
+  for (idx = 0; idx < node->children.size(); ++idx) {
+    if (node->children[idx].element->name == name) break;
+  }
+  return idx;
+}
+
+// There are three types of array encodings:
+//
+// 1. One-level encoding
+//      A bare repeated field. This is interpreted as a required array of required
+//      items.
+//    Example:
+//      repeated <item-type> item;
+//
+// 2. Two-level encoding
+//      A group containing a single repeated field. This is interpreted as a
+//      <list-repetition> array of required items (<list-repetition> is either
+//      optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated <item-type> item;
+//      }
+//
+// 3. Three-level encoding
+//      The "official" encoding according to the parquet spec. A group containing a
+//      single repeated group containing the item field. This is interpreted as a
+//      <list-repetition> array of <item-repetition> items (<list-repetition> and
+//      <item-repetition> are each either optional or required).
+//    Example:
+//      <list-repetition> group <name> {
+//        repeated group list {
+//          <item-repetition> <item-type> item;
+//        }
+//      }
+//
+// We ignore any field annotations or names, making us more permissive than the
+// Parquet spec dictates. Note that in any of the encodings, <item-type> may be a
+// group containing more fields, which corresponds to a complex item type. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists for
+// more details and examples.
+//
+// This function resolves the array at '*node' assuming one-, two-, or three-level
+// encoding, determined by 'array_encoding'. '*node' is set to the repeated field for all
+// three encodings (unless '*pos_field' or '*missing_field' are set to true).
+Status ParquetSchemaResolver::ResolveArray(ArrayEncoding array_encoding,
+    const SchemaPath& path, int idx, SchemaNode** node, bool* pos_field,
+    bool* missing_field) const {
+  if (array_encoding == ONE_LEVEL) {
+    if (!(*node)->is_repeated()) {
+      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+          PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString());
+      return Status::Expected(msg);
+    }
+  } else {
+    // In the multi-level case, we always expect the outer group to contain a single
+    // repeated field
+    if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) {
+      ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+          PrintSubPath(tbl_desc_, path, idx), "array", (*node)->DebugString());
+      return Status::Expected(msg);
+    }
+    // Set *node to the repeated field
+    *node = &(*node)->children[0];
+  }
+  DCHECK((*node)->is_repeated());
+
+  if (idx + 1 < path.size()) {
+    if (path[idx + 1] == SchemaPathConstants::ARRAY_POS) {
+      // The next index in 'path' is the artificial position field.
+      DCHECK_EQ(path.size(), idx + 2) << "position field cannot have children!";
+      *pos_field = true;
+      *node = NULL;
+      return Status::OK();
+    } else {
+      // The next value in 'path' should be the item index
+      DCHECK_EQ(path[idx + 1], SchemaPathConstants::ARRAY_ITEM);
+    }
+  }
+  return Status::OK();
+}
+
+// According to the parquet spec, map columns are represented like:
+// <map-repetition> group <name> (MAP) {
+//   repeated group key_value {
+//     required <key-type> key;
+//     <value-repetition> <value-type> value;
+//   }
+// }
+// We ignore any field annotations or names, making us more permissive than the
+// Parquet spec dictates. See
+// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps for
+// more details.
+Status ParquetSchemaResolver::ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
+    bool* missing_field) const {
+  if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated() ||
+      (*node)->children[0].children.size() != 2) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), "map", (*node)->DebugString());
+    return Status::Expected(msg);
+  }
+  *node = &(*node)->children[0];
+
+  // The next index in 'path' should be the key or the value.
+  if (idx + 1 < path.size()) {
+    DCHECK(path[idx + 1] == SchemaPathConstants::MAP_KEY ||
+           path[idx + 1] == SchemaPathConstants::MAP_VALUE);
+  }
+  return Status::OK();
+}
+
+Status ParquetSchemaResolver::ValidateScalarNode(const SchemaNode& node,
+    const ColumnType& col_type, const SchemaPath& path, int idx) const {
+  if (!node.children.empty()) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
+    return Status::Expected(msg);
+  }
+  parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[col_type.type];
+  if (type != node.element->type) {
+    ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename_,
+        PrintSubPath(tbl_desc_, path, idx), col_type.DebugString(), node.DebugString());
+    return Status::Expected(msg);
+  }
+  return Status::OK();
+}
+
+}

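The created_by parsing implemented by ParquetFileVersion above follows the pattern "<application> version <major>.<minor>.<patch>", defaulting unspecified parts to 0 and trimming trailing suffixes such as "-cdh5". A minimal standalone sketch of those rules, for readers reviewing the refactor (an illustrative reimplementation with hypothetical names, not the Impala code itself):

#include <cstdio>
#include <sstream>
#include <string>

// Hypothetical miniature of ParquetFileVersion's parsing, not the Impala code.
struct Version { int major = 0, minor = 0, patch = 0; };

Version ParseCreatedBy(const std::string& created_by) {
  // (The real code lower-cases created_by first; assume lowercase input here.)
  std::istringstream in(created_by);
  std::string app, keyword, ver;
  in >> app >> keyword >> ver;
  Version v;
  if (keyword == "version" && !ver.empty()) {
    // Trim trailing non-numeric characters, e.g. "1.2-cdh5" -> "1.2".
    ver = ver.substr(0, ver.find_first_not_of("0123456789."));
    std::istringstream vs(ver);
    char dot;
    vs >> v.major;  // unparsed parts stay 0
    if (vs >> dot >> v.minor) vs >> dot >> v.patch;
  }
  return v;
}

int main() {
  Version v = ParseCreatedBy("impala version 1.2-cdh5 (build deadbeef)");
  std::printf("%d.%d.%d\n", v.major, v.minor, v.patch);  // prints 1.2.0
}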
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-metadata-utils.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-metadata-utils.h b/be/src/exec/parquet-metadata-utils.h
new file mode 100644
index 0000000..a07627e
--- /dev/null
+++ b/be/src/exec/parquet-metadata-utils.h
@@ -0,0 +1,202 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+#define IMPALA_EXEC_PARQUET_METADATA_UTILS_H
+
+#include <string>
+
+#include "runtime/descriptors.h"
+#include "gen-cpp/parquet_types.h"
+
+namespace impala {
+
+class RuntimeState;
+
+class ParquetMetadataUtils {
+ public:
+  /// Checks the version of the given file and returns a non-OK status if
+  /// Impala does not support that version.
+  static Status ValidateFileVersion(const parquet::FileMetaData& file_metadata,
+      const char* filename);
+
+  /// Validate column offsets by checking if the dictionary page comes before the data
+  /// pages and checking if the column offsets lie within the file.
+  static Status ValidateColumnOffsets(const string& filename, int64_t file_length,
+      const parquet::RowGroup& row_group);
+
+  /// Validates the column metadata to make sure this column is supported (e.g. encoding,
+  /// type, etc) and matches the type of given slot_desc.
+  static Status ValidateColumn(const parquet::FileMetaData& file_metadata,
+      const char* filename, int row_group_idx, int col_idx,
+      const parquet::SchemaElement& schema_element, const SlotDescriptor* slot_desc,
+      RuntimeState* state);
+};
+
+struct ParquetFileVersion {
+  /// Application that wrote the file. e.g. "IMPALA"
+  std::string application;
+
+  /// Version of the application that wrote the file, expressed in three parts
+  /// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
+  /// ignored. e.g.:
+  /// "1.2.3"    => {1, 2, 3}
+  /// "1.2"      => {1, 2, 0}
+  /// "1.2-cdh5" => {1, 2, 0}
+  struct {
+    int major;
+    int minor;
+    int patch;
+  } version;
+
+  /// If true, this file was generated by an Impala internal release
+  bool is_impala_internal;
+
+  ParquetFileVersion() : is_impala_internal(false) { }
+
+  /// Parses the version from the created_by string
+  ParquetFileVersion(const std::string& created_by);
+
+  /// Returns true if version is strictly less than <major>.<minor>.<patch>
+  bool VersionLt(int major, int minor = 0, int patch = 0) const;
+
+  /// Returns true if version is equal to <major>.<minor>.<patch>
+  bool VersionEq(int major, int minor, int patch) const;
+};
+
+/// Internal representation of a Parquet schema (including nested-type columns).
+struct SchemaNode {
+  /// The corresponding schema element defined in the file metadata
+  const parquet::SchemaElement* element;
+
+  /// The index into the RowGroup::columns list if this column is materialized in the
+  /// file (i.e. it's a scalar type). -1 for nested types.
+  int col_idx;
+
+  /// The maximum definition level of this column, i.e., the definition level that
+  /// corresponds to a non-NULL value. Valid values are >= 0.
+  int max_def_level;
+
+  /// The maximum repetition level of this column. Valid values are >= 0.
+  int max_rep_level;
+
+  /// The definition level of the most immediate ancestor of this node with repeated
+  /// field repetition type. 0 if there are no repeated ancestors.
+  int def_level_of_immediate_repeated_ancestor;
+
+  /// Any nested schema nodes. Empty for non-nested types.
+  std::vector<SchemaNode> children;
+
+  SchemaNode() : element(NULL), col_idx(-1), max_def_level(-1), max_rep_level(-1),
+                 def_level_of_immediate_repeated_ancestor(-1) { }
+
+  std::string DebugString(int indent = 0) const;
+
+  bool is_repeated() const {
+    return element->repetition_type == parquet::FieldRepetitionType::REPEATED;
+  }
+};
+
+/// Utility class to resolve SchemaPaths (e.g., from a table descriptor) against a
+/// Parquet file schema. Supports resolution by field index or by field name.
+class ParquetSchemaResolver {
+ public:
+  ParquetSchemaResolver(const HdfsTableDescriptor& tbl_desc,
+      TParquetFallbackSchemaResolution::type fallback_schema_resolution)
+    : tbl_desc_(tbl_desc),
+      fallback_schema_resolution_(fallback_schema_resolution),
+      filename_(NULL) {
+  }
+
+  /// Parses the schema of the given file metadata into an internal schema
+  /// representation used in path resolution. Remembers the filename for error
+  /// reporting. Returns a non-OK status if the Parquet schema could not be parsed.
+  Status Init(const parquet::FileMetaData* file_metadata, const char* filename) {
+    DCHECK(filename != NULL);
+    filename_ = filename;
+    return CreateSchemaTree(file_metadata->schema, &schema_);
+  }
+
+  /// Traverses 'schema_' according to 'path', returning the result in 'node'. If 'path'
+  /// does not exist in this file's schema, 'missing_field' is set to true and
+  /// Status::OK() is returned, otherwise 'missing_field' is set to false. If 'path'
+  /// resolves to a collection position field, *pos_field is set to true. Otherwise
+  /// 'pos_field' is set to false. Returns a non-OK status if 'path' cannot be resolved
+  /// against the file's schema (e.g., unrecognized collection schema).
+  ///
+  /// Tries to resolve assuming either two- or three-level array encoding in
+  /// 'schema_'. Returns a bad status if resolution fails in both cases.
+  Status ResolvePath(const SchemaPath& path, SchemaNode** node, bool* pos_field,
+      bool* missing_field) const;
+
+ private:
+  /// Unflattens the schema metadata from the Parquet file metadata and converts it to our
+  /// SchemaNode representation. Returns the result in 'node' unless an error status is
+  /// returned. Does not set the slot_desc field of any SchemaNode.
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      SchemaNode* node) const;
+
+  /// Recursive implementation used internally by the above CreateSchemaTree() function.
+  Status CreateSchemaTree(const std::vector<parquet::SchemaElement>& schema,
+      int max_def_level, int max_rep_level, int ira_def_level, int* idx, int* col_idx,
+      SchemaNode* node) const;
+
+  /// The 'array_encoding' parameter determines whether to assume one-, two-, or
+  /// three-level array encoding. The returned status is not logged (i.e. it's an expected
+  /// error).
+  enum ArrayEncoding {
+    ONE_LEVEL,
+    TWO_LEVEL,
+    THREE_LEVEL
+  };
+
+  Status ResolvePathHelper(ArrayEncoding array_encoding, const SchemaPath& path,
+      SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// Helper functions for ResolvePathHelper().
+
+  /// Advances 'node' to one of its children based on path[next_idx] and
+  /// 'col_type'. 'col_type' is NULL if 'node' is the root node, otherwise it's the type
+  /// associated with 'node'. Returns the child node or sets 'missing_field' to true.
+  SchemaNode* NextSchemaNode(const ColumnType* col_type, const SchemaPath& path,
+      int next_idx, SchemaNode* node, bool* missing_field) const;
+
+  /// Returns the index of 'node's child with 'name', or the number of children if not
+  /// found.
+  int FindChildWithName(SchemaNode* node, const string& name) const;
+
+  /// The ResolvePathHelper() logic for arrays.
+  Status ResolveArray(ArrayEncoding array_encoding, const SchemaPath& path, int idx,
+    SchemaNode** node, bool* pos_field, bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for maps.
+  Status ResolveMap(const SchemaPath& path, int idx, SchemaNode** node,
+      bool* missing_field) const;
+
+  /// The ResolvePathHelper() logic for scalars (just does validation since there's no
+  /// more actual work to be done).
+  Status ValidateScalarNode(const SchemaNode& node, const ColumnType& col_type,
+      const SchemaPath& path, int idx) const;
+
+  const HdfsTableDescriptor& tbl_desc_;
+  const TParquetFallbackSchemaResolution::type fallback_schema_resolution_;
+  const char* filename_;
+
+  /// Root node of our internal schema representation populated in Init().
+  SchemaNode schema_;
+};
+
+} // impala namespace
+
+#endif

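The level bookkeeping documented above (max_def_level, max_rep_level, def_level_of_immediate_repeated_ancestor) follows two rules in CreateSchemaTree(): an OPTIONAL field adds one definition level, and a REPEATED field adds one repetition level plus one definition level (to distinguish an empty list from a list with items). A self-contained sketch of just those rules; SimpleField and the toy schema are hypothetical stand-ins for the parquet types:

#include <cstdio>
#include <string>
#include <vector>

enum Repetition { REQUIRED, OPTIONAL, REPEATED };

// Hypothetical miniature schema node, standing in for parquet::SchemaElement.
struct SimpleField {
  std::string name;
  Repetition rep;
  std::vector<SimpleField> children;
};

// Mirrors CreateSchemaTree()'s level bookkeeping: OPTIONAL adds a def level,
// REPEATED adds both a rep level and a def level.
void PrintLevels(const SimpleField& f, int def, int rep) {
  if (f.rep == OPTIONAL) ++def;
  if (f.rep == REPEATED) { ++rep; ++def; }
  std::printf("%-10s max_def=%d max_rep=%d\n", f.name.c_str(), def, rep);
  for (const SimpleField& c : f.children) PrintLevels(c, def, rep);
}

int main() {
  // optional group int_array { repeated group list { optional int32 item; } }
  SimpleField root{"int_array", OPTIONAL,
      {SimpleField{"list", REPEATED, {SimpleField{"item", OPTIONAL, {}}}}}};
  PrintLevels(root, 0, 0);  // int_array: 1/0, list: 2/1, item: 3/1
}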
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-scratch-tuple-batch.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-scratch-tuple-batch.h b/be/src/exec/parquet-scratch-tuple-batch.h
new file mode 100644
index 0000000..f2f9794
--- /dev/null
+++ b/be/src/exec/parquet-scratch-tuple-batch.h
@@ -0,0 +1,72 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+#define IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
+
+#include "runtime/descriptors.h"
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+/// Helper struct that holds a batch of tuples allocated from a mem pool, as well
+/// as state associated with iterating over its tuples and transferring
+/// them to an output batch in TransferScratchTuples().
+struct ScratchTupleBatch {
+  // Memory backing the batch of tuples. Allocated from batch's tuple data pool.
+  uint8_t* tuple_mem;
+  // Keeps track of the current tuple index.
+  int tuple_idx;
+  // Number of valid tuples in tuple_mem.
+  int num_tuples;
+  // Cached for convenient access.
+  const int tuple_byte_size;
+
+  // Helper batch for safely allocating tuple_mem from its tuple data pool using
+  // ResizeAndAllocateTupleBuffer().
+  RowBatch batch;
+
+  ScratchTupleBatch(
+      const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker)
+    : tuple_mem(NULL),
+      tuple_idx(0),
+      num_tuples(0),
+      tuple_byte_size(row_desc.GetRowSize()),
+      batch(row_desc, batch_size, mem_tracker) {
+    DCHECK_EQ(row_desc.tuple_descriptors().size(), 1);
+  }
+
+  Status Reset(RuntimeState* state) {
+    tuple_idx = 0;
+    num_tuples = 0;
+    // Buffer size is not needed.
+    int64_t buffer_size;
+    RETURN_IF_ERROR(batch.ResizeAndAllocateTupleBuffer(state, &buffer_size, &tuple_mem));
+    return Status::OK();
+  }
+
+  inline Tuple* GetTuple(int tuple_idx) const {
+    return reinterpret_cast<Tuple*>(tuple_mem + tuple_idx * tuple_byte_size);
+  }
+
+  inline MemPool* mem_pool() { return batch.tuple_data_pool(); }
+  inline int capacity() const { return batch.capacity(); }
+  inline uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; }
+  inline uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; }
+  inline bool AtEnd() const { return tuple_idx == num_tuples; }
+};
+
+}
+
+#endif

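The intended ScratchTupleBatch life cycle is: the scanner materializes up to capacity() tuples into tuple_mem, sets num_tuples, and the consumer advances tuple_idx until AtEnd(). A simplified, self-contained analog of that producer/consumer pattern (MiniScratchBatch is a hypothetical stand-in; the real struct allocates tuple_mem from a RowBatch's tuple data pool):

#include <cstdint>
#include <cstdio>
#include <vector>

// Hypothetical simplified analog of ScratchTupleBatch: fixed-size tuples in
// one flat buffer, a count of valid tuples, and a cursor for draining them.
struct MiniScratchBatch {
  std::vector<uint8_t> tuple_mem;
  int tuple_idx = 0;
  int num_tuples = 0;
  int tuple_byte_size;

  MiniScratchBatch(int tuple_size, int capacity)
    : tuple_mem(tuple_size * capacity), tuple_byte_size(tuple_size) {}

  uint8_t* GetTuple(int idx) { return tuple_mem.data() + idx * tuple_byte_size; }
  bool AtEnd() const { return tuple_idx == num_tuples; }
};

int main() {
  MiniScratchBatch scratch(/*tuple_size=*/4, /*capacity=*/8);
  // Producer (the scanner) materializes three 4-byte tuples.
  for (int i = 0; i < 3; ++i) {
    *reinterpret_cast<int32_t*>(scratch.GetTuple(i)) = i * 10;
  }
  scratch.num_tuples = 3;
  // Consumer (TransferScratchTuples() in the scanner) drains them in order.
  while (!scratch.AtEnd()) {
    std::printf("%d\n", *reinterpret_cast<int32_t*>(scratch.GetTuple(scratch.tuple_idx)));
    ++scratch.tuple_idx;
  }
}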
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exec/parquet-version-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-version-test.cc b/be/src/exec/parquet-version-test.cc
index c159205..9ead7b6 100644
--- a/be/src/exec/parquet-version-test.cc
+++ b/be/src/exec/parquet-version-test.cc
@@ -17,7 +17,8 @@
 #include <iostream>
 #include <limits.h>
 #include <gtest/gtest.h>
-#include "exec/hdfs-parquet-scanner.h"
+#include "exec/parquet-metadata-utils.h"
+#include "util/cpu-info.h"
 
 #include "common/names.h"
 
@@ -26,7 +27,7 @@ namespace impala {
 void CheckVersionParse(const string& s, const string& expected_application,
     int expected_major, int expected_minor, int expected_patch,
     bool expected_is_internal) {
-  HdfsParquetScanner::FileVersion v(s);
+  ParquetFileVersion v(s);
   EXPECT_EQ(v.application, expected_application) << "String: " << s;
   EXPECT_EQ(v.version.major, expected_major) << "String: " << s;
   EXPECT_EQ(v.version.minor, expected_minor) << "String: " << s;
@@ -62,7 +63,7 @@ TEST(ParquetVersionTest, Parsing) {
 }
 
 TEST(ParquetVersionTest, Comparisons) {
-  HdfsParquetScanner::FileVersion v("foo version 1.2.3");
+  ParquetFileVersion v("foo version 1.2.3");
   EXPECT_TRUE(v.VersionEq(1, 2, 3));
   EXPECT_FALSE(v.VersionEq(1, 2, 4));
   EXPECT_TRUE(v.VersionLt(3, 2, 1));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/exprs/expr-value.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-value.h b/be/src/exprs/expr-value.h
index 93b5f83..2cd2d58 100644
--- a/be/src/exprs/expr-value.h
+++ b/be/src/exprs/expr-value.h
@@ -17,7 +17,7 @@
 
 #include "runtime/collection-value.h"
 #include "runtime/decimal-value.h"
-#include "runtime/string-value.h"
+#include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "util/decimal-util.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index afbe3b6..7a467be 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -225,6 +225,20 @@ void RuntimeState::GetUnreportedErrors(ErrorLogMap* new_errors) {
   ClearErrorMap(error_log_);
 }
 
+Status RuntimeState::LogOrReturnError(const ErrorMsg& message) {
+  DCHECK_NE(message.error(), TErrorCode::OK);
+  // If either abort_on_error=true or the error requires execution to stop
+  // immediately, return an error status.
+  if (abort_on_error() ||
+      message.error() == TErrorCode::MEM_LIMIT_EXCEEDED ||
+      message.error() == TErrorCode::CANCELLED) {
+    return Status(message);
+  }
+  // Otherwise, add the error to the error log and continue.
+  LogError(message);
+  return Status::OK();
+}
+
 void RuntimeState::LogMemLimitExceeded(const MemTracker* tracker,
     int64_t failed_allocation_size) {
   DCHECK_GE(failed_allocation_size, 0);

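In isolation, LogOrReturnError() implements a simple policy: propagate a bad Status when abort_on_error is set or the error is inherently fatal (memory limit exceeded, cancelled); otherwise record the error and continue. A hedged standalone sketch of the same policy, with simplified stand-ins for Impala's Status, ErrorMsg, and error codes:

#include <cstdio>
#include <string>

enum class ErrorCode { GENERIC_WARNING, MEM_LIMIT_EXCEEDED, CANCELLED };

// Simplified stand-in for impala::Status: just an ok flag plus a message.
struct Status {
  bool ok;
  std::string msg;
  static Status OK() { return Status{true, ""}; }
};

// Mirrors the policy of RuntimeState::LogOrReturnError(): abort on fatal
// errors or when abort_on_error is set, otherwise log and keep going.
Status LogOrReturnError(bool abort_on_error, ErrorCode code, const std::string& msg) {
  if (abort_on_error || code == ErrorCode::MEM_LIMIT_EXCEEDED ||
      code == ErrorCode::CANCELLED) {
    return Status{false, msg};
  }
  std::fprintf(stderr, "warning: %s\n", msg.c_str());  // stands in for LogError()
  return Status::OK();
}

int main() {
  Status s1 = LogOrReturnError(false, ErrorCode::GENERIC_WARNING, "bad precision");
  Status s2 = LogOrReturnError(true, ErrorCode::GENERIC_WARNING, "bad precision");
  std::printf("s1.ok=%d s2.ok=%d\n", s1.ok, s2.ok);  // s1.ok=1 s2.ok=0
}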
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index a5e560e..d6c766c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -209,6 +209,12 @@ class RuntimeState {
   /// be sent back to the coordinator
   void GetUnreportedErrors(ErrorLogMap* new_errors);
 
+  /// Given an error message, determine whether execution should be aborted and, if so,
+  /// return the corresponding error status. Otherwise, log the error and return
+  /// Status::OK(). Execution is aborted if the ABORT_ON_ERROR query option is set to
+  /// true or the error is not recoverable and should be handled upstream.
+  Status LogOrReturnError(const ErrorMsg& message);
+
   bool is_cancelled() const { return is_cancelled_; }
   void set_is_cancelled(bool v) { is_cancelled_ = v; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index c3f8c94..5ac96e6 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -269,6 +269,14 @@ string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path) {
   return ss.str();
 }
 
+string PrintSubPath(const TableDescriptor& tbl_desc, const SchemaPath& path,
+    int end_path_idx) {
+  DCHECK_GE(end_path_idx, 0);
+  SchemaPath::const_iterator subpath_end = path.begin() + end_path_idx + 1;
+  SchemaPath subpath(path.begin(), subpath_end);
+  return PrintPath(tbl_desc, subpath);
+}
+
 string PrintNumericPath(const SchemaPath& path) {
   stringstream ss;
   ss << "[";

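PrintSubPath() only truncates the path after 'end_path_idx' (inclusive) before delegating to PrintPath(). A trivial standalone sketch of that truncation, using a hypothetical string-joining stand-in for PrintPath():

#include <cstdio>
#include <string>
#include <vector>

// Hypothetical stand-in for PrintPath(): joins path components with '.'.
std::string PrintPath(const std::vector<std::string>& path) {
  std::string out;
  for (const std::string& p : path) out += (out.empty() ? "" : ".") + p;
  return out;
}

// Same truncation as PrintSubPath(): keep elements [0, end_path_idx].
std::string PrintSubPath(const std::vector<std::string>& path, int end_path_idx) {
  std::vector<std::string> sub(path.begin(), path.begin() + end_path_idx + 1);
  return PrintPath(sub);
}

int main() {
  std::vector<std::string> path = {"array_col", "item", "field"};
  std::printf("%s\n", PrintSubPath(path, 1).c_str());  // prints array_col.item
}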
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6ee15fad/be/src/util/debug-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h
index c9550dc..48e8c27 100644
--- a/be/src/util/debug-util.h
+++ b/be/src/util/debug-util.h
@@ -70,8 +70,12 @@ std::string PrintAsHex(const char* bytes, int64_t len);
 std::string PrintTMetricKind(const TMetricKind::type& type);
 std::string PrintTUnit(const TUnit::type& type);
 std::string PrintTImpalaQueryOptions(const TImpalaQueryOptions::type& type);
+
 /// Returns the fully qualified path, e.g. "database.table.array_col.item.field"
 std::string PrintPath(const TableDescriptor& tbl_desc, const SchemaPath& path);
+/// Same as PrintPath(), but truncates the path after the given 'end_path_idx'.
+std::string PrintSubPath(const TableDescriptor& tbl_desc, const SchemaPath& path,
+    int end_path_idx);
 /// Returns the numeric path without column/field names, e.g. "[0,1,2]"
 std::string PrintNumericPath(const SchemaPath& path);
 
@@ -98,6 +102,20 @@ std::string GetVersionString(bool compact = false);
 /// for recursive calls.
 std::string GetStackTrace();
 
+// FILE_CHECKs are conditions that we expect to be true but could fail due to a malformed
+// input file. They differentiate these cases from DCHECKs, which indicate conditions that
+// are true unless there's a bug in Impala. We would ideally always return a bad Status
+// instead of failing a FILE_CHECK, but in many cases we use FILE_CHECK instead because
+// there's a performance cost to doing the check in a release build, or just due to legacy
+// code.
+#define FILE_CHECK(a) DCHECK(a)
+#define FILE_CHECK_EQ(a, b) DCHECK_EQ(a, b)
+#define FILE_CHECK_NE(a, b) DCHECK_NE(a, b)
+#define FILE_CHECK_GT(a, b) DCHECK_GT(a, b)
+#define FILE_CHECK_LT(a, b) DCHECK_LT(a, b)
+#define FILE_CHECK_GE(a, b) DCHECK_GE(a, b)
+#define FILE_CHECK_LE(a, b) DCHECK_LE(a, b)
+
 }
 
 #endif

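A sketch of how the new FILE_CHECK macros are meant to read at a parsing boundary. The stand-ins below use assert, which, like DCHECK, compiles away in release builds; ParsePageSize is a hypothetical example, not an Impala function:

#include <cassert>
#include <cstdio>

// Standalone stand-ins for the macros above; like DCHECK, they are no-ops
// when NDEBUG is defined, so release builds pay no cost for the check.
#define FILE_CHECK(a) assert(a)
#define FILE_CHECK_GE(a, b) assert((a) >= (b))

// Hypothetical parse step: the checked value comes from the input file, so a
// failure here would indicate a malformed file rather than a bug in Impala.
int ParsePageSize(int size_from_file) {
  FILE_CHECK_GE(size_from_file, 0);
  return size_from_file;
}

int main() {
  std::printf("%d\n", ParsePageSize(4096));  // prints 4096
}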

