parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject parquet-cpp git commit: PARQUET-836: Bugfix + testcase for column subsetting in arrow::FileReader::ReadFlatTable
Date Tue, 24 Jan 2017 07:42:26 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 56fbdb63b -> f9ff60797


PARQUET-836: Bugfix + testcase for column subsetting in arrow::FileReader::ReadFlatTable

Also adds an option to set the random seed when generating data for the Arrow test cases

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #223 from wesm/PARQUET-836 and squashes the following commits:

673cc0d [Wes McKinney] Add randomness option to Arrow data generation. Test case for FileReader::ReadFlatTable with column subsetting


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/f9ff6079
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/f9ff6079
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/f9ff6079

Branch: refs/heads/master
Commit: f9ff607972629346c39ca85862d958edc87b2e3a
Parents: 56fbdb6
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Tue Jan 24 08:42:03 2017 +0100
Committer: Uwe L. Korn <uwelk@xhochy.com>
Committed: Tue Jan 24 08:42:03 2017 +0100

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc | 106 +++++++++++++++------
 src/parquet/arrow/reader.cc                   |   2 +-
 src/parquet/arrow/test-util.h                 |  29 ++++--
 3 files changed, 99 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/f9ff6079/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 6748a8d..d681e57 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -32,6 +32,7 @@
 
 using arrow::Array;
 using arrow::Buffer;
+using arrow::Column;
 using arrow::ChunkedArray;
 using arrow::default_memory_pool;
 using arrow::io::BufferReader;
@@ -51,6 +52,8 @@ namespace arrow {
 const int SMALL_SIZE = 100;
 const int LARGE_SIZE = 10000;
 
+constexpr uint32_t kDefaultSeed = 0;
+
 template <typename TestType>
 struct test_traits {};
 
@@ -185,23 +188,6 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
 template <typename T>
 using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
 
-void DoTableRoundtrip(
-    const std::shared_ptr<Table>& table, int num_threads, std::shared_ptr<Table>* out) {
-  auto sink = std::make_shared<InMemoryOutputStream>();
-
-  ASSERT_OK_NO_THROW(WriteFlatTable(
-      table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
-
-  std::shared_ptr<Buffer> buffer = sink->GetBuffer();
-  std::unique_ptr<FileReader> reader;
-  ASSERT_OK_NO_THROW(
-      OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
-          ::parquet::default_reader_properties(), nullptr, &reader));
-
-  reader->set_num_threads(num_threads);
-  ASSERT_OK_NO_THROW(reader->ReadFlatTable(out));
-}
-
 template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
@@ -324,7 +310,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
 TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
   // This also tests max_definition_level = 1
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, &values));
+
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
 
   std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
   this->WriteFlatColumn(schema, values);
@@ -335,7 +322,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
 TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
   // This also tests max_definition_level = 1
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, &values));
+
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, true);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
   ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
@@ -407,7 +395,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
 TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
   int64_t chunk_size = SMALL_SIZE / 4;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, &values));
+
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
 
   std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
   FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
@@ -424,7 +413,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
 TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
   // This also tests max_definition_level = 1
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, &values));
+
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, true);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
   ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
@@ -490,7 +480,8 @@ using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
 TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
   // This also tests max_definition_level = 1
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, &values));
+
+  ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, true);
 
   // Parquet 2.0 roundtrip should yield an uint32_t column again
@@ -507,7 +498,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
 TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
   // This also tests max_definition_level = 1
   std::shared_ptr<Array> arr;
-  ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, &arr));
+  ASSERT_OK(NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100, kDefaultSeed, &arr));
 
   std::shared_ptr<::arrow::UInt32Array> values =
       std::dynamic_pointer_cast<::arrow::UInt32Array>(arr);
@@ -660,18 +651,15 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
   this->CheckSingleColumnRequiredTableRead(4);
 }
 
-TEST(TestArrowReadWrite, MultithreadedRead) {
-  const int num_columns = 20;
-  const int num_rows = 1000;
-  const int num_threads = 4;
-
+void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out) {
   std::shared_ptr<::arrow::Column> column;
   std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns);
   std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns);
 
   std::shared_ptr<Array> values;
   for (int i = 0; i < num_columns; ++i) {
-    ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, &values));
+    ASSERT_OK(NullableArray<::arrow::DoubleType>(
+        num_rows, num_rows / 10, static_cast<uint32_t>(i), &values));
     std::stringstream ss;
     ss << "col" << i;
     column = MakeColumn(ss.str(), values, true);
@@ -680,13 +668,69 @@ TEST(TestArrowReadWrite, MultithreadedRead) {
     fields[i] = column->field();
   }
   auto schema = std::make_shared<::arrow::Schema>(fields);
-  auto table = std::make_shared<Table>("schema", schema, columns);
+  *out = std::make_shared<Table>("schema", schema, columns);
+}
+
+void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
+    const std::vector<int>& column_subset, std::shared_ptr<Table>* out) {
+  auto sink = std::make_shared<InMemoryOutputStream>();
+
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
+
+  std::shared_ptr<Buffer> buffer = sink->GetBuffer();
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(
+      OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+          ::parquet::default_reader_properties(), nullptr, &reader));
+
+  reader->set_num_threads(num_threads);
+
+  if (column_subset.size() > 0) {
+    ASSERT_OK_NO_THROW(reader->ReadFlatTable(column_subset, out));
+  } else {
+    // Read everything
+    ASSERT_OK_NO_THROW(reader->ReadFlatTable(out));
+  }
+}
+
+TEST(TestArrowReadWrite, MultithreadedRead) {
+  const int num_columns = 20;
+  const int num_rows = 1000;
+  const int num_threads = 4;
+
+  std::shared_ptr<Table> table;
+  MakeDoubleTable(num_columns, num_rows, &table);
 
   std::shared_ptr<Table> result;
-  DoTableRoundtrip(table, num_threads, &result);
+  DoTableRoundtrip(table, num_threads, {}, &result);
 
   ASSERT_TRUE(table->Equals(result));
 }
 
+TEST(TestArrowReadWrite, ReadColumnSubset) {
+  const int num_columns = 20;
+  const int num_rows = 1000;
+  const int num_threads = 4;
+
+  std::shared_ptr<Table> table;
+  MakeDoubleTable(num_columns, num_rows, &table);
+
+  std::shared_ptr<Table> result;
+  std::vector<int> column_subset = {0, 4, 8, 10};
+  DoTableRoundtrip(table, num_threads, column_subset, &result);
+
+  std::vector<std::shared_ptr<::arrow::Column>> ex_columns;
+  std::vector<std::shared_ptr<::arrow::Field>> ex_fields;
+  for (int i : column_subset) {
+    ex_columns.push_back(table->column(i));
+    ex_fields.push_back(table->column(i)->field());
+  }
+
+  auto ex_schema = std::make_shared<::arrow::Schema>(ex_fields);
+  auto expected = std::make_shared<Table>("schema", ex_schema, ex_columns);
+  ASSERT_TRUE(result->Equals(expected));
+}
+
 }  // namespace arrow
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/f9ff6079/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 9221041..a60d0b2 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -222,7 +222,7 @@ Status FileReader::Impl::ReadFlatTable(
   auto ReadColumn = [&indices, &schema, &columns, this](int i) {
     std::shared_ptr<Array> array;
     RETURN_NOT_OK(ReadFlatColumn(indices[i], &array));
-    columns[i] = std::make_shared<Column>(schema->field(indices[i]), array);
+    columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
   };
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/f9ff6079/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index f996c2c..4760f0e 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -90,9 +90,10 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullAr
 // This helper function only supports (size/2) nulls.
 template <typename ArrowType>
 typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NullableArray(
-    size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
+    size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
-  ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values);
+  ::arrow::test::random_real<typename ArrowType::c_type>(
+      size, seed, -1e10, 1e10, &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
@@ -108,8 +109,11 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable
 // This helper function only supports (size/2) nulls.
 template <typename ArrowType>
 typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableArray(
-    size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
+    size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
+
+  // Seed is random in Arrow right now
+  (void)seed;
   ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
@@ -127,7 +131,8 @@ typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableAr
 template <typename ArrowType>
 typename std::enable_if<
     is_arrow_string<ArrowType>::value || is_arrow_binary<ArrowType>::value, Status>::type
-NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* out) {
+NullableArray(
+    size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) {
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
@@ -136,8 +141,16 @@ NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* ou
 
   using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
   BuilderType builder(::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+
+  const int kBufferSize = 10;
+  uint8_t buffer[kBufferSize];
   for (size_t i = 0; i < size; i++) {
-    builder.Append("test-string");
+    if (!valid_bytes[i]) {
+      builder.AppendNull();
+    } else {
+      ::arrow::test::random_bytes(kBufferSize, seed + i, buffer);
+      builder.Append(buffer, kBufferSize);
+    }
   }
   return builder.Finish(out);
 }
@@ -145,8 +158,12 @@ NullableArray(size_t size, size_t num_nulls, std::shared_ptr<::arrow::Array>* ou
 // This helper function only supports (size/2) nulls yet.
 template <class ArrowType>
 typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(
-    size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
+    size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
   std::vector<uint8_t> values;
+
+  // Seed is random in Arrow right now
+  (void)seed;
+
   ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 


Mime
View raw message