parquet-commits mailing list archives

From w...@apache.org
Subject parquet-cpp git commit: PARQUET-816: Workaround for incorrect column chunk metadata in parquet-mr <= 1.2.8
Date Sun, 25 Dec 2016 15:03:03 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 8d3453fed -> c2d8df9fb


PARQUET-816: Workaround for incorrect column chunk metadata in parquet-mr <= 1.2.8

This turned up when reading old data files generated by parquet-mr in 2013. parquet-mr 1.2.8 and
lower have a bug in which the column chunk metadata written to the Parquet file is incorrect.
Impala carries an explicit workaround for this (see https://github.com/apache/incubator-impala/blob/88448d1d4ab31eaaf82f764b36dc7d11d4c63c32/be/src/exec/hdfs-parquet-scanner.cc#L1227).

In this particular file, the dictionary page header is 15 bytes, and the entire column chunk
is 15 (dict page header) + 277 (dictionary) + 17 (data page header) + 28 (data page) = 337 bytes.

But the metadata says the column chunk is only 322 bytes: the 15-byte dictionary page header
was dropped from the accounting.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #209 from wesm/PARQUET-816 and squashes the following commits:

21fdcbe [Wes McKinney] Move FileVersion to an inner class in FileMetaData
64e7f95 [Wes McKinney] Remove unnecessary std::move causing clang warning
bacb815 [Wes McKinney] Fix compilation error in benchmarks
f4c259e [Wes McKinney] cpplint
1e8c160 [Wes McKinney] clang-format
d2aa9a8 [Wes McKinney] Do not continue reading data pages in SerializedPageReader after reading the indicated number of rows in a row group (sketched after this list)
2638490 [Wes McKinney] Bring in IMPALA-694 workaround for PARQUET-816
bd3e949 [Wes McKinney] Optimistically decode truncated data pages. Add example data file
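
The d2aa9a8 change replaces the page reader's unconditional while (true) loop with a bound on
the row count declared in the row group metadata, so a reader handed a padded (and therefore
over-long) stream stops cleanly instead of trying to parse trailing bytes as another page
header. A minimal standalone model of the stopping rule (illustrative names, not the real
reader class):

    #include <cstdint>
    #include <vector>

    // page_num_values holds each data page's num_values field, in stream order.
    // Returns how many pages a row-bounded reader would consume before stopping.
    int64_t PagesRead(const std::vector<int64_t>& page_num_values,
                      int64_t total_num_rows) {
      int64_t seen_num_rows = 0;
      int64_t pages = 0;
      for (int64_t num_values : page_num_values) {
        if (seen_num_rows >= total_num_rows) break;  // row group exhausted
        seen_num_rows += num_values;
        ++pages;
      }
      return pages;
    }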


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c2d8df9f
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c2d8df9f
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c2d8df9f

Branch: refs/heads/master
Commit: c2d8df9fb9ea5b8a15c5280e44b2d6255a17bd21
Parents: 8d3453f
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Sun Dec 25 10:02:50 2016 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sun Dec 25 10:02:50 2016 -0500

----------------------------------------------------------------------
 data/nation.dict-malformed.parquet        | Bin 0 -> 2850 bytes
 src/parquet/column/column-io-benchmark.cc |   6 +--
 src/parquet/column/column-writer-test.cc  |  17 ++++---
 src/parquet/column/properties.h           |   2 +-
 src/parquet/column/reader.h               |   5 +-
 src/parquet/file/file-deserialize-test.cc |  27 ++++++-----
 src/parquet/file/file-metadata-test.cc    |  10 ++++
 src/parquet/file/metadata.cc              |  63 +++++++++++++++++++++++++
 src/parquet/file/metadata.h               |  29 ++++++++++++
 src/parquet/file/reader-internal.cc       |  49 ++++++++++++++-----
 src/parquet/file/reader-internal.h        |  17 +++++--
 src/parquet/reader-test.cc                |  48 +++++++++++++++----
 12 files changed, 223 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/data/nation.dict-malformed.parquet
----------------------------------------------------------------------
diff --git a/data/nation.dict-malformed.parquet b/data/nation.dict-malformed.parquet
new file mode 100644
index 0000000..5008ac0
Binary files /dev/null and b/data/nation.dict-malformed.parquet differ

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/column/column-io-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-io-benchmark.cc b/src/parquet/column/column-io-benchmark.cc
index 3b62004..3ff9c32 100644
--- a/src/parquet/column/column-io-benchmark.cc
+++ b/src/parquet/column/column-io-benchmark.cc
@@ -84,10 +84,10 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL)->Range(1024, 65536
 BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Range(1024, 65536);
 
 std::unique_ptr<Int64Reader> BuildReader(
-    std::shared_ptr<Buffer>& buffer, ColumnDescriptor* schema) {
+    std::shared_ptr<Buffer>& buffer, int64_t num_values, ColumnDescriptor* schema) {
   std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
   std::unique_ptr<SerializedPageReader> page_reader(
-      new SerializedPageReader(std::move(source), Compression::UNCOMPRESSED));
+      new SerializedPageReader(std::move(source), num_values, Compression::UNCOMPRESSED));
   return std::unique_ptr<Int64Reader>(new Int64Reader(schema, std::move(page_reader)));
 }
 
@@ -114,7 +114,7 @@ static void BM_ReadInt64Column(::benchmark::State& state) {
   std::vector<int16_t> definition_levels_out(state.range_y());
   std::vector<int16_t> repetition_levels_out(state.range_y());
   while (state.KeepRunning()) {
-    std::unique_ptr<Int64Reader> reader = BuildReader(src, schema.get());
+    std::unique_ptr<Int64Reader> reader = BuildReader(src, state.range_y(), schema.get());
     int64_t values_read = 0;
     for (size_t i = 0; i < values.size(); i += values_read) {
       reader->ReadBatch(values_out.size(), definition_levels_out.data(),

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 68d79d1..5d4daeb 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -61,11 +61,12 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
 
   Type::type type_num() { return TestType::type_num; }
 
-  void BuildReader(Compression::type compression = Compression::UNCOMPRESSED) {
+  void BuildReader(
+      int64_t num_rows, Compression::type compression = Compression::UNCOMPRESSED) {
     auto buffer = sink_->GetBuffer();
     std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
     std::unique_ptr<SerializedPageReader> page_reader(
-        new SerializedPageReader(std::move(source), compression));
+        new SerializedPageReader(std::move(source), num_rows, compression));
     reader_.reset(new TypedColumnReader<TestType>(this->descr_, std::move(page_reader)));
   }
 
@@ -92,7 +93,7 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
   }
 
   void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) {
-    BuildReader(compression);
+    BuildReader(static_cast<int64_t>(this->values_out_.size()), compression);
     reader_->ReadBatch(this->values_out_.size(), definition_levels_out_.data(),
         repetition_levels_out_.data(), this->values_out_ptr_, &values_read_);
     this->SyncValuesOut();
@@ -171,9 +172,10 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
 
 template <typename TestType>
 void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) {
-  BuildReader(compression);
+  int64_t total_values = static_cast<int64_t>(this->values_out_.size());
+  BuildReader(total_values, compression);
   values_read_ = 0;
-  while (values_read_ < static_cast<int64_t>(this->values_out_.size())) {
+  while (values_read_ < total_values) {
     int64_t values_read_recently = 0;
     reader_->ReadBatch(this->values_out_.size() - values_read_,
         definition_levels_out_.data() + values_read_,
@@ -186,11 +188,12 @@ void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compressio
 
 template <>
 void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
-  BuildReader(compression);
+  int64_t total_values = static_cast<int64_t>(this->values_out_.size());
+  BuildReader(total_values, compression);
   this->data_buffer_.clear();
 
   values_read_ = 0;
-  while (values_read_ < static_cast<int64_t>(this->values_out_.size())) {
+  while (values_read_ < total_values) {
     int64_t values_read_recently = 0;
     reader_->ReadBatch(this->values_out_.size() - values_read_,
         definition_levels_out_.data() + values_read_,

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index ee7e7d6..f5f2fd5 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -86,7 +86,7 @@ static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
 static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
     ParquetVersion::PARQUET_1_0;
-static std::string DEFAULT_CREATED_BY = "Apache parquet-cpp";
+static std::string DEFAULT_CREATED_BY = "parquet-cpp version 0.1.0";
 static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
 
 class PARQUET_EXPORT ColumnProperties {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 5608192..d759b96 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -42,8 +42,9 @@ class PARQUET_EXPORT ColumnReader {
       MemoryAllocator* allocator = default_allocator());
   virtual ~ColumnReader();
 
-  static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor*,
-      std::unique_ptr<PageReader>, MemoryAllocator* allocator = default_allocator());
+  static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor* descr,
+      std::unique_ptr<PageReader> pager,
+      MemoryAllocator* allocator = default_allocator());
 
   // Returns true if there are still values in this column.
   bool HasNext() {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index 5d97cd9..5287885 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -60,11 +60,12 @@ class TestPageSerde : public ::testing::Test {
     ResetStream();
   }
 
-  void InitSerializedPageReader(Compression::type codec = Compression::UNCOMPRESSED) {
+  void InitSerializedPageReader(
+      int64_t num_rows, Compression::type codec = Compression::UNCOMPRESSED) {
     EndStream();
     std::unique_ptr<InputStream> stream;
     stream.reset(new InMemoryInputStream(out_buffer_));
-    page_reader_.reset(new SerializedPageReader(std::move(stream), codec));
+    page_reader_.reset(new SerializedPageReader(std::move(stream), num_rows, codec));
   }
 
   void WriteDataPageHeader(int max_serialized_len = 1024, int32_t uncompressed_size = 0,
@@ -116,11 +117,12 @@ TEST_F(TestPageSerde, DataPage) {
   format::PageHeader out_page_header;
 
   int stats_size = 512;
+  const int32_t num_rows = 4444;
   AddDummyStats(stats_size, data_page_header_);
-  data_page_header_.num_values = 4444;
+  data_page_header_.num_values = num_rows;
 
   WriteDataPageHeader();
-  InitSerializedPageReader();
+  InitSerializedPageReader(num_rows);
   std::shared_ptr<Page> current_page = page_reader_->NextPage();
   CheckDataPageHeader(data_page_header_, current_page.get());
 }
@@ -130,7 +132,8 @@ TEST_F(TestPageSerde, TestLargePageHeaders) {
   AddDummyStats(stats_size, data_page_header_);
 
   // Any number to verify metadata roundtrip
-  data_page_header_.num_values = 4141;
+  const int32_t num_rows = 4141;
+  data_page_header_.num_values = num_rows;
 
   int max_header_size = 512 * 1024;  // 512 KB
   WriteDataPageHeader(max_header_size);
@@ -140,12 +143,14 @@ TEST_F(TestPageSerde, TestLargePageHeaders) {
   ASSERT_LE(stats_size, out_stream_->Tell());
   ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, out_stream_->Tell());
 
-  InitSerializedPageReader();
+  InitSerializedPageReader(num_rows);
   std::shared_ptr<Page> current_page = page_reader_->NextPage();
   CheckDataPageHeader(data_page_header_, current_page.get());
 }
 
 TEST_F(TestPageSerde, TestFailLargePageHeaders) {
+  const int32_t num_rows = 1337;  // dummy value
+
   int stats_size = 256 * 1024;  // 256 KB
   AddDummyStats(stats_size, data_page_header_);
 
@@ -156,7 +161,7 @@ TEST_F(TestPageSerde, TestFailLargePageHeaders) {
 
   int smaller_max_size = 128 * 1024;
   ASSERT_LE(smaller_max_size, out_stream_->Tell());
-  InitSerializedPageReader();
+  InitSerializedPageReader(num_rows);
 
   // Set the max page header size to 128 KB, which is less than the current
   // header size
@@ -168,8 +173,8 @@ TEST_F(TestPageSerde, Compression) {
   Compression::type codec_types[3] = {
       Compression::GZIP, Compression::SNAPPY, Compression::BROTLI};
 
-  // This is a dummy number
-  data_page_header_.num_values = 32;
+  const int32_t num_rows = 32;  // dummy value
+  data_page_header_.num_values = num_rows;
 
   int num_pages = 10;
 
@@ -198,7 +203,7 @@ TEST_F(TestPageSerde, Compression) {
       out_stream_->Write(buffer.data(), actual_size);
     }
 
-    InitSerializedPageReader(codec_type);
+    InitSerializedPageReader(num_rows * num_pages, codec_type);
 
     std::shared_ptr<Page> page;
     const DataPage* data_page;
@@ -220,7 +225,7 @@ TEST_F(TestPageSerde, LZONotSupported) {
   std::vector<uint8_t> faux_data(data_size);
   WriteDataPageHeader(1024, data_size, data_size);
   out_stream_->Write(faux_data.data(), data_size);
-  ASSERT_THROW(InitSerializedPageReader(Compression::LZO), ParquetException);
+  ASSERT_THROW(InitSerializedPageReader(data_size, Compression::LZO), ParquetException);
 }
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index d3d0edc..0c9d376 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -153,5 +153,15 @@ TEST(Metadata, TestBuildAccess) {
   ASSERT_EQ(10, rg2_column1->data_page_offset());
   ASSERT_EQ(26, rg2_column2->data_page_offset());
 }
+
+TEST(FileVersion, Basics) {
+  FileMetaData::Version version("parquet-mr version 1.2.8");
+
+  ASSERT_EQ("parquet-mr", version.application);
+  ASSERT_EQ(1, version.version.major);
+  ASSERT_EQ(2, version.version.minor);
+  ASSERT_EQ(8, version.version.patch);
+}
+
 }  // namespace metadata
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 2580706..adfcb69 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
+#include <string>
 #include <vector>
 
 #include "parquet/exception.h"
@@ -22,6 +24,8 @@
 #include "parquet/schema/converter.h"
 #include "parquet/thrift/util.h"
 
+#include <boost/algorithm/string.hpp>
+
 namespace parquet {
 
 template <typename DType>
@@ -269,6 +273,13 @@ class FileMetaData::FileMetaDataImpl {
     metadata_.reset(new format::FileMetaData);
     DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
     metadata_len_ = *metadata_len;
+
+    if (metadata_->__isset.created_by) {
+      writer_version_ = FileMetaData::Version(metadata_->created_by);
+    } else {
+      writer_version_ = FileMetaData::Version("unknown 0.0.0");
+    }
+
     InitSchema();
   }
   ~FileMetaDataImpl() {}
@@ -281,6 +292,8 @@ class FileMetaData::FileMetaDataImpl {
   inline const std::string& created_by() const { return metadata_->created_by; }
   inline int num_schema_elements() const { return metadata_->schema.size(); }
 
+  const FileMetaData::Version& writer_version() const { return writer_version_; }
+
   void WriteTo(OutputStream* dst) { SerializeThriftMsg(metadata_.get(), 1024, dst); }
 
   std::unique_ptr<RowGroupMetaData> RowGroup(int i) {
@@ -306,6 +319,7 @@ class FileMetaData::FileMetaDataImpl {
     schema_.Init(converter.Convert());
   }
   SchemaDescriptor schema_;
+  FileMetaData::Version writer_version_;
 };
 
 std::unique_ptr<FileMetaData> FileMetaData::Make(
@@ -346,6 +360,10 @@ int32_t FileMetaData::version() const {
   return impl_->version();
 }
 
+const FileMetaData::Version& FileMetaData::writer_version() const {
+  return impl_->writer_version();
+}
+
 const std::string& FileMetaData::created_by() const {
   return impl_->created_by();
 }
@@ -362,6 +380,51 @@ void FileMetaData::WriteTo(OutputStream* dst) {
   return impl_->WriteTo(dst);
 }
 
+FileMetaData::Version::Version(const std::string& created_by) {
+  namespace ba = boost::algorithm;
+
+  std::string created_by_lower = created_by;
+  std::transform(created_by_lower.begin(), created_by_lower.end(),
+      created_by_lower.begin(), ::tolower);
+
+  std::vector<std::string> tokens;
+  ba::split(tokens, created_by_lower, ba::is_any_of(" "), ba::token_compress_on);
+  // Boost always creates at least one token
+  DCHECK_GT(tokens.size(), 0);
+  application = tokens[0];
+
+  if (tokens.size() >= 3 && tokens[1] == "version") {
+    std::string version_string = tokens[2];
+    // Ignore any trailing non-numeric characters (e.g. a "-cdh5" suffix)
+    int n = version_string.find_first_not_of("0123456789.");
+    std::string version_string_trimmed = version_string.substr(0, n);
+
+    std::vector<std::string> version_tokens;
+    ba::split(version_tokens, version_string_trimmed, ba::is_any_of("."));
+    version.major = version_tokens.size() >= 1 ? atoi(version_tokens[0].c_str()) : 0;
+    version.minor = version_tokens.size() >= 2 ? atoi(version_tokens[1].c_str()) : 0;
+    version.patch = version_tokens.size() >= 3 ? atoi(version_tokens[2].c_str()) : 0;
+  } else {
+    version.major = 0;
+    version.minor = 0;
+    version.patch = 0;
+  }
+}
+
+bool FileMetaData::Version::VersionLt(int major, int minor, int patch) const {
+  if (version.major < major) return true;
+  if (version.major > major) return false;
+  DCHECK_EQ(version.major, major);
+  if (version.minor < minor) return true;
+  if (version.minor > minor) return false;
+  DCHECK_EQ(version.minor, minor);
+  return version.patch < patch;
+}
+
+bool FileMetaData::Version::VersionEq(int major, int minor, int patch) const {
+  return version.major == major && version.minor == minor && version.patch == patch;
+}
+
 // MetaData Builders
 // row-group metadata
 class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
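
As a quick illustration of the new FileMetaData::Version semantics, consistent with the test in
file-metadata-test.cc above and the doc comment in metadata.h below, a hypothetical standalone
check (assumes the headers and namespace introduced by this patch and linking against
parquet-cpp):

    #include <cassert>
    #include "parquet/file/metadata.h"

    int main() {
      parquet::FileMetaData::Version v("parquet-mr version 1.2.8");
      assert(v.application == "parquet-mr");
      assert(v.VersionEq(1, 2, 8));
      assert(v.VersionLt(1, 2, 9));   // affected by PARQUET-816
      assert(!v.VersionLt(1, 2, 8));  // VersionLt is strict less-than

      // A trailing non-numeric suffix is ignored: "1.2-cdh5" parses as 1.2.0
      parquet::FileMetaData::Version cdh("parquet-mr version 1.2-cdh5");
      assert(cdh.VersionEq(1, 2, 0));
      return 0;
    }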

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 94e6d66..c5dd03a 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -93,6 +93,32 @@ class FileMetaDataBuilder;
 
 class PARQUET_EXPORT FileMetaData {
  public:
+  struct Version {
+    /// Application that wrote the file. e.g. "IMPALA"
+    std::string application;
+
+    /// Version of the application that wrote the file, expressed in three parts
+    /// (<major>.<minor>.<patch>). Unspecified parts default to 0, and extra parts are
+    /// ignored. e.g.:
+    /// "1.2.3"    => {1, 2, 3}
+    /// "1.2"      => {1, 2, 0}
+    /// "1.2-cdh5" => {1, 2, 0}
+    struct {
+      int major;
+      int minor;
+      int patch;
+    } version;
+
+    Version() {}
+    explicit Version(const std::string& created_by);
+
+    /// Returns true if version is strictly less than <major>.<minor>.<patch>
+    bool VersionLt(int major, int minor = 0, int patch = 0) const;
+
+    /// Returns true if version is equal to <major>.<minor>.<patch>
+    bool VersionEq(int major, int minor, int patch) const;
+  };
+
   // API convenience to get a MetaData accessor
   static std::unique_ptr<FileMetaData> Make(
       const uint8_t* serialized_metadata, uint32_t* metadata_len);
@@ -109,6 +135,8 @@ class PARQUET_EXPORT FileMetaData {
   int num_schema_elements() const;
   std::unique_ptr<RowGroupMetaData> RowGroup(int i) const;
 
+  const Version& writer_version() const;
+
   void WriteTo(OutputStream* dst);
 
   // Return const-pointer to make it clear that this object is not to be copied
@@ -117,6 +145,7 @@ class PARQUET_EXPORT FileMetaData {
  private:
   friend FileMetaDataBuilder;
   explicit FileMetaData(const uint8_t* serialized_metadata, uint32_t* metadata_len);
+
   // PIMPL Idiom
   FileMetaData();
   class FileMetaDataImpl;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 5eda79b..37c790c 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -42,8 +42,11 @@ namespace parquet {
 // assembled in a serialized stream for storing in a Parquet files
 
 SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
-    Compression::type codec_type, MemoryAllocator* allocator)
-    : stream_(std::move(stream)), decompression_buffer_(0, allocator) {
+    int64_t total_num_rows, Compression::type codec_type, MemoryAllocator* allocator)
+    : stream_(std::move(stream)),
+      decompression_buffer_(0, allocator),
+      seen_num_rows_(0),
+      total_num_rows_(total_num_rows) {
   max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
   decompressor_ = Codec::Create(codec_type);
 }
@@ -51,7 +54,7 @@ SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
 std::shared_ptr<Page> SerializedPageReader::NextPage() {
   // Loop here because there may be unhandled page types that we skip until
   // finding a page that we do know what to do with
-  while (true) {
+  while (seen_num_rows_ < total_num_rows_) {
     int64_t bytes_read = 0;
     int64_t bytes_available = 0;
     uint32_t header_size = 0;
@@ -89,7 +92,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
 
     // Read the compressed data page.
     buffer = stream_->Read(compressed_len, &bytes_read);
-    if (bytes_read != compressed_len) ParquetException::EofException();
+    if (bytes_read != compressed_len) { ParquetException::EofException(); }
 
     // Uncompress it if we need to
     if (decompressor_ != NULL) {
@@ -128,14 +131,17 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
         }
       }
 
-      auto page = std::make_shared<DataPage>(page_buffer, header.num_values,
+      seen_num_rows_ += header.num_values;
+
+      return std::make_shared<DataPage>(page_buffer, header.num_values,
           FromThrift(header.encoding), FromThrift(header.definition_level_encoding),
           FromThrift(header.repetition_level_encoding), page_statistics);
-
-      return page;
     } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
       const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
       bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;
+
+      seen_num_rows_ += header.num_values;
+
       return std::make_shared<DataPageV2>(page_buffer, header.num_values,
           header.num_nulls, header.num_rows, FromThrift(header.encoding),
           header.definition_levels_byte_length, header.repetition_levels_byte_length,
@@ -149,6 +155,11 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
   return std::shared_ptr<Page>(nullptr);
 }
 
+SerializedRowGroup::SerializedRowGroup(RandomAccessSource* source,
+    FileMetaData* file_metadata, int row_group_number, const ReaderProperties& props)
+    : source_(source), file_metadata_(file_metadata), properties_(props) {
+  row_group_metadata_ = file_metadata->RowGroup(row_group_number);
+}
 const RowGroupMetaData* SerializedRowGroup::metadata() const {
   return row_group_metadata_.get();
 }
@@ -157,6 +168,9 @@ const ReaderProperties* SerializedRowGroup::properties() const {
   return &properties_;
 }
 
+// For PARQUET-816
+static constexpr int64_t kMaxDictHeaderSize = 100;
+
 std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
   // Read column chunk from the file
   auto col = row_group_metadata_->ColumnChunk(i);
@@ -166,13 +180,24 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
     col_start = col->dictionary_page_offset();
   }
 
-  int64_t bytes_to_read = col->total_compressed_size();
+  int64_t col_length = col->total_compressed_size();
   std::unique_ptr<InputStream> stream;
 
-  stream = properties_.GetStream(source_, col_start, bytes_to_read);
+  // PARQUET-816 workaround for old files created by older parquet-mr
+  const FileMetaData::Version& version = file_metadata_->writer_version();
+  if (version.application == "parquet-mr" && version.VersionLt(1, 2, 9)) {
+    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+    // dictionary page header size in total_compressed_size and total_uncompressed_size
+    // (see IMPALA-694). We add padding to compensate.
+    int64_t bytes_remaining = source_->Size() - (col_start + col_length);
+    int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
+    col_length += padding;
+  }
+
+  stream = properties_.GetStream(source_, col_start, col_length);
 
-  return std::unique_ptr<PageReader>(new SerializedPageReader(
-      std::move(stream), col->compression(), properties_.allocator()));
+  return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(stream),
+      row_group_metadata_->num_rows(), col->compression(), properties_.allocator()));
 }
 
 // ----------------------------------------------------------------------
@@ -205,7 +230,7 @@ SerializedFile::~SerializedFile() {
 
 std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
   std::unique_ptr<SerializedRowGroup> contents(
-      new SerializedRowGroup(source_.get(), file_metadata_->RowGroup(i), properties_));
+      new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_));
   return std::make_shared<RowGroupReader>(std::move(contents));
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index faedda0..582ab35 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -44,8 +44,8 @@ static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
 // and the page metadata.
 class SerializedPageReader : public PageReader {
  public:
-  SerializedPageReader(std::unique_ptr<InputStream> stream, Compression::type codec,
-      MemoryAllocator* allocator = default_allocator());
+  SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t num_rows,
+      Compression::type codec, MemoryAllocator* allocator = default_allocator());
 
   virtual ~SerializedPageReader() {}
 
@@ -63,16 +63,22 @@ class SerializedPageReader : public PageReader {
   // Compression codec to use.
   std::unique_ptr<Codec> decompressor_;
   OwnedMutableBuffer decompression_buffer_;
+
   // Maximum allowed page size
   uint32_t max_page_header_size_;
+
+  // Number of rows read in data pages so far
+  int64_t seen_num_rows_;
+
+  // Number of rows in all the data pages
+  int64_t total_num_rows_;
 };
 
 // RowGroupReader::Contents implementation for the Parquet file specification
 class SerializedRowGroup : public RowGroupReader::Contents {
  public:
-  SerializedRowGroup(RandomAccessSource* source,
-      std::unique_ptr<RowGroupMetaData> metadata, const ReaderProperties props)
-      : source_(source), row_group_metadata_(std::move(metadata)), properties_(props) {}
+  SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
+      int row_group_number, const ReaderProperties& props);
 
   virtual const RowGroupMetaData* metadata() const;
 
@@ -82,6 +88,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
 
  private:
   RandomAccessSource* source_;
+  FileMetaData* file_metadata_;
   std::unique_ptr<RowGroupMetaData> row_group_metadata_;
   ReaderProperties properties_;
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c2d8df9f/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index 04437b2..d21a809 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -36,17 +36,25 @@ namespace parquet {
 
 const char* data_dir = std::getenv("PARQUET_TEST_DATA");
 
-class TestAllTypesPlain : public ::testing::Test {
- public:
-  void SetUp() {
-    std::string dir_string(data_dir);
+std::string alltypes_plain() {
+  std::string dir_string(data_dir);
+  std::stringstream ss;
+  ss << dir_string << "/"
+     << "alltypes_plain.parquet";
+  return ss.str();
+}
 
-    std::stringstream ss;
-    ss << dir_string << "/"
-       << "alltypes_plain.parquet";
+std::string nation_dict_truncated_data_page() {
+  std::string dir_string(data_dir);
+  std::stringstream ss;
+  ss << dir_string << "/"
+     << "nation.dict-malformed.parquet";
+  return ss.str();
+}
 
-    reader_ = ParquetFileReader::OpenFile(ss.str());
-  }
+class TestAllTypesPlain : public ::testing::Test {
+ public:
+  void SetUp() { reader_ = ParquetFileReader::OpenFile(alltypes_plain()); }
 
   void TearDown() {}
 
@@ -181,4 +189,26 @@ TEST_F(TestLocalFileSource, FileClosedOnDestruction) {
   ASSERT_EQ(EBADF, errno);
 }
 
+TEST(TestFileReaderAdHoc, NationDictTruncatedDataPage) {
+  // PARQUET-816. Some files generated by older Parquet implementations may
+  // contain malformed data page metadata, and we can successfully decode them
+  // if we optimistically proceed to decoding, even if there is not enough data
+  // available in the stream. Before, we had quite aggressive checks on
+  // stream reads; such checks are not found in e.g. Impala's Parquet implementation
+  auto reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), false);
+  std::stringstream ss;
+
+  // empty list means print all
+  std::list<int> columns;
+  reader->DebugPrint(ss, columns, true);
+
+  reader = ParquetFileReader::OpenFile(nation_dict_truncated_data_page(), true);
+  std::stringstream ss2;
+  reader->DebugPrint(ss2, columns, true);
+
+  // The memory-mapped read runs over the end of the column chunk and succeeds
+  // by accident
+  ASSERT_EQ(ss2.str(), ss.str());
+}
+
 }  // namespace parquet

