parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-782: Support writing to Arrow sinks
Date Sun, 27 Nov 2016 22:24:35 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 47a94590d -> 8bff44273


PARQUET-782: Support writing to Arrow sinks

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #196 from xhochy/PARQUET-782 and squashes the following commits:

b89738a [Uwe L. Korn] Update arrow hash
041f66d [Uwe L. Korn] PARQUET-782: Support writing to Arrow sinks


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/8bff4427
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/8bff4427
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/8bff4427

Branch: refs/heads/master
Commit: 8bff44273d2ef3663c9e52cbc958d7b72442ba8f
Parents: 47a9459
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Sun Nov 27 17:24:27 2016 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sun Nov 27 17:24:27 2016 -0500

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc | 40 +++++++++++-
 src/parquet/arrow/io.cc                       | 20 ++++++
 src/parquet/arrow/io.h                        | 19 ++++++
 src/parquet/arrow/reader.cc                   | 73 ++++++++++++++++++++++
 src/parquet/arrow/test-util.h                 | 20 +++---
 src/parquet/arrow/writer.cc                   |  8 +++
 src/parquet/arrow/writer.h                    |  7 +++
 thirdparty/versions.sh                        |  2 +-
 8 files changed, 177 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 5ec70f3..7bcb590 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -24,6 +24,7 @@
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
 
+#include "arrow/io/memory.h"
 #include "arrow/test-util.h"
 #include "arrow/types/construct.h"
 #include "arrow/types/primitive.h"
@@ -342,6 +343,29 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
   this->ReadAndCheckSingleColumnTable(values);
 }
 
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  auto buffer = std::make_shared<::arrow::PoolBuffer>();
+  auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer);
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), default_memory_pool(), arrow_sink_, 512, default_writer_properties()));
+
+  std::shared_ptr<ParquetBuffer> pbuffer =
+      std::make_shared<ParquetBuffer>(buffer->data(), buffer->size());
+  std::unique_ptr<RandomAccessSource> source(new BufferReader(pbuffer));
+  std::shared_ptr<::arrow::Table> out;
+  this->ReadTableFromFile(ParquetFileReader::Open(std::move(source)), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(values->length(), out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
 TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
   int64_t chunk_size = SMALL_SIZE / 4;
   std::shared_ptr<Array> values;
@@ -456,10 +480,20 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
 template <typename T>
 using ParquetCDataType = typename ParquetDataType<T>::c_type;
 
+template <typename T>
+struct c_type_trait {
+  using ArrowCType = typename T::c_type;
+};
+
+template <>
+struct c_type_trait<::arrow::BooleanType> {
+  using ArrowCType = uint8_t;
+};
+
 template <typename TestType>
 class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
  public:
-  typedef typename TestType::c_type T;
+  typedef typename c_type_trait<TestType>::ArrowCType T;
 
   void MakeTestFile(std::vector<T>& values, int num_chunks,
       std::unique_ptr<ParquetFileReader>* file_reader) {
@@ -497,7 +531,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
 
     std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
     ASSERT_EQ(1, chunked_array->num_chunks());
-    ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
+    ExpectArrayT<TestType>(values.data(), chunked_array->chunk(0).get());
   }
 
   void CheckSingleColumnRequiredRead(int num_chunks) {
@@ -508,7 +542,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
     std::shared_ptr<Array> out;
     this->ReadSingleColumnFile(std::move(file_reader), &out);
 
-    ExpectArray<TestType>(values.data(), out.get());
+    ExpectArrayT<TestType>(values.data(), out.get());
   }
 };
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/io.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.cc b/src/parquet/arrow/io.cc
index 8f3aa3e..74464f2 100644
--- a/src/parquet/arrow/io.cc
+++ b/src/parquet/arrow/io.cc
@@ -103,5 +103,25 @@ std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
   return result;
 }
 
+ParquetWriteSink::ParquetWriteSink(
+    const std::shared_ptr<::arrow::io::OutputStream>& stream)
+    : stream_(stream) {}
+
+ParquetWriteSink::~ParquetWriteSink() {}
+
+void ParquetWriteSink::Close() {
+  PARQUET_THROW_NOT_OK(stream_->Close());
+}
+
+int64_t ParquetWriteSink::Tell() {
+  int64_t position;
+  PARQUET_THROW_NOT_OK(stream_->Tell(&position));
+  return position;
+}
+
+void ParquetWriteSink::Write(const uint8_t* data, int64_t length) {
+  PARQUET_THROW_NOT_OK(stream_->Write(data, length));
+}
+
 }  // namespace arrow
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/io.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h
index 119f8de..a068a4e 100644
--- a/src/parquet/arrow/io.h
+++ b/src/parquet/arrow/io.h
@@ -76,6 +76,25 @@ class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource {
   ParquetAllocator* allocator_;
 };
 
+class PARQUET_EXPORT ParquetWriteSink : public OutputStream {
+ public:
+  explicit ParquetWriteSink(const std::shared_ptr<::arrow::io::OutputStream>& stream);
+
+  virtual ~ParquetWriteSink();
+
+  // Close the output stream
+  void Close() override;
+
+  // Return the current position in the output stream relative to the start
+  int64_t Tell() override;
+
+  // Copy bytes into the output stream
+  void Write(const uint8_t* data, int64_t length) override;
+
+ private:
+  std::shared_ptr<::arrow::io::OutputStream> stream_;
+};
+
 }  // namespace arrow
 }  // namespace parquet
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index bc9ec8f..2d2b5cd 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -376,6 +376,79 @@ Status FlatColumnReader::Impl::TypedReadBatch(
 }
 
 template <>
+Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
+    int batch_size, std::shared_ptr<Array>* out) {
+  int values_to_read = batch_size;
+  RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size));
+  valid_bits_idx_ = 0;
+  if (descr_->max_definition_level() > 0) {
+    valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
+    int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8;
+    valid_bits_buffer_->Resize(valid_bits_size);
+    valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
+    memset(valid_bits_ptr_, 0, valid_bits_size);
+    null_count_ = 0;
+  }
+
+  while ((values_to_read > 0) && column_reader_) {
+    values_buffer_.Resize(values_to_read * sizeof(bool));
+    if (descr_->max_definition_level() > 0) {
+      def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
+    }
+    auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
+    int64_t values_read;
+    int64_t levels_read;
+    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+                             values_to_read, def_levels, nullptr, values, &values_read));
+    values_to_read -= levels_read;
+    if (descr_->max_definition_level() == 0) {
+      ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(values, values_read);
+    } else {
+      // As per the defintion and checks for flat columns:
+      // descr_->max_definition_level() == 1
+      ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
+          def_levels, values, values_read, levels_read);
+    }
+    if (!column_reader_->HasNext()) { NextRowGroup(); }
+  }
+
+  if (descr_->max_definition_level() > 0) {
+    // TODO: Shrink arrays in the case they are too large
+    if (valid_bits_idx_ < batch_size * 0.8) {
+      // Shrink arrays as they are larger than the output.
+      // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
+      //    without the need for a copy. Given a decent underlying allocator this
+      //    should still free some underlying pages to the OS.
+
+      auto data_buffer = std::make_shared<PoolBuffer>(pool_);
+      RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool)));
+      memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
+      data_buffer_ = data_buffer;
+
+      auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
+      RETURN_NOT_OK(
+          valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
+      memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
+          valid_bits_buffer->size());
+      valid_bits_buffer_ = valid_bits_buffer;
+    }
+    *out = std::make_shared<::arrow::BooleanArray>(
+        field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_);
+    // Relase the ownership
+    data_buffer_.reset();
+    valid_bits_buffer_.reset();
+    return Status::OK();
+  } else {
+    *out = std::make_shared<::arrow::BooleanArray>(
+        field_->type, valid_bits_idx_, data_buffer_);
+    data_buffer_.reset();
+    return Status::OK();
+  }
+}
+
+template <>
 Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
     int batch_size, std::shared_ptr<Array>* out) {
   int values_to_read = batch_size;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 92798ff..dedd398 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -37,6 +37,9 @@ using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
 template <typename ArrowType>
 using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;
 
+template <typename ArrowType>
+using is_arrow_bool = std::is_same<ArrowType, ::arrow::BooleanType>;
+
 template <class ArrowType>
 typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
@@ -70,8 +73,9 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type NonNull
   return builder.Finish(out);
 }
 
-template <>
-Status NonNullArray<::arrow::BooleanType>(size_t size, std::shared_ptr<Array>* out) {
+template <class ArrowType>
+typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray(
+    size_t size, std::shared_ptr<Array>* out) {
   std::vector<uint8_t> values;
   ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
   ::arrow::BooleanBuilder builder(
@@ -135,8 +139,8 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type Nullabl
 }
 
 // This helper function only supports (size/2) nulls yet.
-template <>
-Status NullableArray<::arrow::BooleanType>(
+template <class ArrowType>
+typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(
     size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
   std::vector<uint8_t> values;
   ::arrow::test::randint<uint8_t>(size, 0, 1, &values);
@@ -176,19 +180,19 @@ void ExpectArray(T* expected, Array* result) {
 }
 
 template <typename ArrowType>
-void ExpectArray(typename ArrowType::c_type* expected, Array* result) {
+void ExpectArrayT(void* expected, Array* result) {
   ::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result);
   for (int64_t i = 0; i < result->length(); i++) {
-    EXPECT_EQ(expected[i],
+    EXPECT_EQ(reinterpret_cast<typename ArrowType::c_type*>(expected)[i],
         reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]);
   }
 }
 
 template <>
-void ExpectArray<::arrow::BooleanType>(uint8_t* expected, Array* result) {
+void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
   ::arrow::BooleanBuilder builder(
       ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
-  builder.Append(expected, result->length());
+  builder.Append(reinterpret_cast<uint8_t*>(expected), result->length());
 
   std::shared_ptr<Array> expected_array;
   EXPECT_OK(builder.Finish(&expected_array));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index e4d3745..b7c7d20 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <vector>
 
+#include "parquet/arrow/io.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/utils.h"
 
@@ -370,6 +371,13 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool,
   return writer.Close();
 }
 
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+    const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties) {
+  auto parquet_sink = std::make_shared<ParquetWriteSink>(sink);
+  return WriteFlatTable(table, pool, parquet_sink, chunk_size, properties);
+}
+
 }  // namespace arrow
 
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/src/parquet/arrow/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 92524d8..a82c2f6 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -23,6 +23,8 @@
 #include "parquet/api/schema.h"
 #include "parquet/api/writer.h"
 
+#include "arrow/io/interfaces.h"
+
 namespace arrow {
 
 class Array;
@@ -71,6 +73,11 @@ class PARQUET_EXPORT FileWriter {
     int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
 
+::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
+    ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink,
+    int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+
 }  // namespace arrow
 
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8bff4427/thirdparty/versions.sh
----------------------------------------------------------------------
diff --git a/thirdparty/versions.sh b/thirdparty/versions.sh
index ff5644e..f877b33 100755
--- a/thirdparty/versions.sh
+++ b/thirdparty/versions.sh
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-ARROW_VERSION="d946e7917d55cb220becd6469ae93430f2e60764"
+ARROW_VERSION="86f56a6073c3254487ede3aff1dc9d117d24adaf"
 ARROW_URL="https://github.com/apache/arrow/archive/${ARROW_VERSION}.tar.gz"
 ARROW_BASEDIR="arrow-${ARROW_VERSION}"
 


Mime
View raw message