arrow-commits mailing list archives

From w...@apache.org
Subject arrow git commit: ARROW-237: Implement parquet-cpp's abstract IO interfaces for memory allocation and file reading
Date Tue, 12 Jul 2016 05:59:05 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 77598fa59 -> ff6132f8a


ARROW-237: Implement parquet-cpp's abstract IO interfaces for memory allocation and file reading

Part of ARROW-227 and ARROW-236

Author: Wes McKinney <wesm@apache.org>

Closes #101 from wesm/ARROW-237 and squashes the following commits:

00c8211 [Wes McKinney] Draft implementations of parquet-cpp allocator and read-only file interfaces
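
For context, a brief usage sketch (not part of the commit) showing how the
two new adapter classes compose, assuming an already-opened
arrow::io::RandomAccessFile; the reader-opening call mirrors the one used in
the tests below:

    // C++ sketch: adapt an Arrow file and memory pool for parquet-cpp
    arrow::parquet::ParquetAllocator allocator;  // wraps arrow::default_memory_pool()
    std::shared_ptr<arrow::io::RandomAccessFile> file = /* e.g. an HdfsReadableFile */;
    std::unique_ptr<::parquet::RandomAccessSource> source(
        new arrow::parquet::ParquetReadSource(file, &allocator));
    // ParquetReadSource implements ::parquet::RandomAccessSource, so
    // parquet-cpp can now read Parquet files directly from Arrow IO sources
    auto reader = ::parquet::ParquetFileReader::Open(std::move(source));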


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ff6132f8
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ff6132f8
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ff6132f8

Branch: refs/heads/master
Commit: ff6132f8a1c2a98cf7c94ae327342c8b38aecb18
Parents: 77598fa
Author: Wes McKinney <wesm@apache.org>
Authored: Mon Jul 11 22:58:57 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Mon Jul 11 22:58:57 2016 -0700

----------------------------------------------------------------------
 cpp/src/arrow/io/hdfs-io-test.cc                |   2 +-
 cpp/src/arrow/io/hdfs.cc                        |  16 +-
 cpp/src/arrow/io/hdfs.h                         |   8 +-
 cpp/src/arrow/io/interfaces.h                   |  14 +-
 cpp/src/arrow/parquet/CMakeLists.txt            |   5 +
 cpp/src/arrow/parquet/io.cc                     |  94 ++++
 cpp/src/arrow/parquet/io.h                      |  80 +++
 cpp/src/arrow/parquet/parquet-io-test.cc        | 511 ++++---------------
 .../arrow/parquet/parquet-reader-writer-test.cc | 489 ++++++++++++++++++
 cpp/src/arrow/parquet/utils.h                   |  15 +-
 python/pyarrow/includes/libarrow_io.pxd         |  12 +-
 python/pyarrow/io.pyx                           |   8 +-
 12 files changed, 810 insertions(+), 444 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/io/hdfs-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs-io-test.cc b/cpp/src/arrow/io/hdfs-io-test.cc
index d1bf140..e48a281 100644
--- a/cpp/src/arrow/io/hdfs-io-test.cc
+++ b/cpp/src/arrow/io/hdfs-io-test.cc
@@ -266,7 +266,7 @@ TEST_F(TestHdfsClient, ReadableMethods) {
   ASSERT_EQ(size, file_size);
 
   uint8_t buffer[50];
-  int32_t bytes_read = 0;
+  int64_t bytes_read = 0;
 
   ASSERT_OK(file->Read(50, &bytes_read, buffer));
   ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 6da6ea4..800c3ed 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -100,7 +100,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
     return Status::OK();
   }
 
-  Status ReadAt(int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+  Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
     tSize ret = hdfsPread(fs_, file_, static_cast<tOffset>(position),
         reinterpret_cast<void*>(buffer), nbytes);
     RETURN_NOT_OK(CheckReadResult(ret));
@@ -108,7 +108,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
     return Status::OK();
   }
 
-  Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
     tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
     RETURN_NOT_OK(CheckReadResult(ret));
     *bytes_read = ret;
@@ -138,11 +138,11 @@ Status HdfsReadableFile::Close() {
 }
 
 Status HdfsReadableFile::ReadAt(
-    int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
   return impl_->ReadAt(position, nbytes, bytes_read, buffer);
 }
 
-Status HdfsReadableFile::Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
   return impl_->Read(nbytes, bytes_read, buffer);
 }
 
@@ -177,7 +177,7 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
     return Status::OK();
   }
 
-  Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written) {
+  Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
     tSize ret = hdfsWrite(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes);
     CHECK_FAILURE(ret, "Write");
     *bytes_written = ret;
@@ -198,12 +198,12 @@ Status HdfsWriteableFile::Close() {
 }
 
 Status HdfsWriteableFile::Write(
-    const uint8_t* buffer, int32_t nbytes, int32_t* bytes_read) {
+    const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
   return impl_->Write(buffer, nbytes, bytes_written);
 }
 
-Status HdfsWriteableFile::Write(const uint8_t* buffer, int32_t nbytes) {
-  int32_t bytes_written_dummy = 0;
+Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) {
+  int64_t bytes_written_dummy = 0;
   return Write(buffer, nbytes, &bytes_written_dummy);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index 532e3c5..b6449fc 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -164,14 +164,14 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
   Status GetSize(int64_t* size) override;
 
   Status ReadAt(
-      int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
 
   Status Seek(int64_t position) override;
   Status Tell(int64_t* position) override;
 
   // NOTE: If you wish to read a particular range of a file in a multithreaded
   // context, you may prefer to use ReadAt to avoid locking issues
-  Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
 
  private:
   class ARROW_NO_EXPORT HdfsReadableFileImpl;
@@ -189,9 +189,9 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
 
   Status Close() override;
 
-  Status Write(const uint8_t* buffer, int32_t nbytes) override;
+  Status Write(const uint8_t* buffer, int64_t nbytes) override;
 
-  Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written);
+  Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written);
 
   Status Tell(int64_t* position) override;
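
The NOTE above about preferring ReadAt over Read in multithreaded code is
worth a quick illustration. A hedged sketch (hypothetical helper and offsets;
error handling elided):

    // Each thread reads its own byte range via ReadAt. Unlike Read, no
    // shared file position is involved, so threads cannot race on a seek.
    void ReadRange(arrow::io::RandomAccessFile* file, int64_t offset,
        int64_t nbytes, uint8_t* out) {
      int64_t bytes_read = 0;
      file->ReadAt(offset, nbytes, &bytes_read, out);  // Status unchecked here
    }
    // e.g. std::thread t1(ReadRange, file.get(), 0, 1 << 20, buf1);
    //      std::thread t2(ReadRange, file.get(), 1 << 20, 1 << 20, buf2);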
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 4bd8a8f..25361d5 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_IO_INTERFACES
-#define ARROW_IO_INTERFACES
+#ifndef ARROW_IO_INTERFACES_H
+#define ARROW_IO_INTERFACES_H
 
 #include <cstdint>
 
@@ -40,17 +40,17 @@ class FileSystemClient {
 };
 
 class FileBase {
+ public:
   virtual Status Close() = 0;
-
   virtual Status Tell(int64_t* position) = 0;
 };
 
 class ReadableFile : public FileBase {
  public:
   virtual Status ReadAt(
-      int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
 
-  virtual Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
+  virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
 
   virtual Status GetSize(int64_t* size) = 0;
 };
@@ -62,10 +62,10 @@ class RandomAccessFile : public ReadableFile {
 
 class WriteableFile : public FileBase {
  public:
-  virtual Status Write(const uint8_t* buffer, int32_t nbytes) = 0;
+  virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0;
 };
 
 }  // namespace io
 }  // namespace arrow
 
-#endif  // ARROW_IO_INTERFACES
+#endif  // ARROW_IO_INTERFACES_H
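
One subtle fix in this hunk: FileBase previously declared its pure virtuals
before any access specifier, so they defaulted to private (the default for a
class) and could not be called through a base pointer. A minimal sketch of
what the added " public:" enables (hypothetical helper):

    // Compiles only now that FileBase's virtuals are public
    arrow::Status CloseWithPosition(arrow::io::FileBase* file, int64_t* pos) {
      arrow::Status s = file->Tell(pos);  // was inaccessible before this change
      if (!s.ok()) { return s; }
      return file->Close();
    }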

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt
index 00f19b3..f2a90b7 100644
--- a/cpp/src/arrow/parquet/CMakeLists.txt
+++ b/cpp/src/arrow/parquet/CMakeLists.txt
@@ -19,6 +19,7 @@
 # arrow_parquet : Arrow <-> Parquet adapter
 
 set(PARQUET_SRCS
+  io.cc
   reader.cc
   schema.cc
   writer.cc
@@ -48,8 +49,12 @@ ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
 ADD_ARROW_TEST(parquet-io-test)
 ARROW_TEST_LINK_LIBRARIES(parquet-io-test arrow_parquet)
 
+ADD_ARROW_TEST(parquet-reader-writer-test)
+ARROW_TEST_LINK_LIBRARIES(parquet-reader-writer-test arrow_parquet)
+
 # Headers: top level
 install(FILES
+  io.h
   reader.h
   schema.h
   utils.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/parquet/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc
new file mode 100644
index 0000000..c81aa8c
--- /dev/null
+++ b/cpp/src/arrow/parquet/io.cc
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/parquet/io.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+
+#include "arrow/parquet/utils.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace arrow {
+namespace parquet {
+
+// ----------------------------------------------------------------------
+// ParquetAllocator
+
+ParquetAllocator::ParquetAllocator() : pool_(default_memory_pool()) {}
+
+ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {}
+
+ParquetAllocator::~ParquetAllocator() {}
+
+uint8_t* ParquetAllocator::Malloc(int64_t size) {
+  uint8_t* result;
+  PARQUET_THROW_NOT_OK(pool_->Allocate(size, &result));
+  return result;
+}
+
+void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
+  // Does not report Status
+  pool_->Free(buffer, size);
+}
+
+// ----------------------------------------------------------------------
+// ParquetReadSource
+
+ParquetReadSource::ParquetReadSource(
+    const std::shared_ptr<ArrowROFile>& file, ParquetAllocator* allocator)
+    : file_(file), allocator_(allocator) {}
+
+void ParquetReadSource::Close() {
+  PARQUET_THROW_NOT_OK(file_->Close());
+}
+
+int64_t ParquetReadSource::Tell() const {
+  int64_t position;
+  PARQUET_THROW_NOT_OK(file_->Tell(&position));
+  return position;
+}
+
+void ParquetReadSource::Seek(int64_t position) {
+  PARQUET_THROW_NOT_OK(file_->Seek(position));
+}
+
+int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
+  int64_t bytes_read;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
+  return bytes_read;
+}
+
+std::shared_ptr<::parquet::Buffer> ParquetReadSource::Read(int64_t nbytes) {
+  // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
+  // that there should be more code sharing amongst file-like sources
+  auto result = std::make_shared<::parquet::OwnedMutableBuffer>(0, allocator_);
+  result->Resize(nbytes);
+
+  int64_t bytes_read = Read(nbytes, result->mutable_data());
+  if (bytes_read < nbytes) { result->Resize(bytes_read); }
+  return result;
+}
+
+}  // namespace parquet
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/parquet/io.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h
new file mode 100644
index 0000000..ef8871d
--- /dev/null
+++ b/cpp/src/arrow/parquet/io.h
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces
+
+#ifndef ARROW_PARQUET_IO_H
+#define ARROW_PARQUET_IO_H
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class MemoryPool;
+
+namespace parquet {
+
+// An implementation of the Parquet MemoryAllocator API that plugs into an
+// existing Arrow memory pool. This lets us route all allocations through a
+// single place instead of tracking them in separate locations (for example,
+// in parquet-cpp's default allocator)
+class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator {
+ public:
+  // Uses the default memory pool
+  ParquetAllocator();
+
+  explicit ParquetAllocator(MemoryPool* pool);
+  virtual ~ParquetAllocator();
+
+  uint8_t* Malloc(int64_t size) override;
+  void Free(uint8_t* buffer, int64_t size) override;
+
+  MemoryPool* pool() { return pool_; }
+
+ private:
+  MemoryPool* pool_;
+};
+
+class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource {
+ public:
+  ParquetReadSource(
+      const std::shared_ptr<io::RandomAccessFile>& file, ParquetAllocator* allocator);
+
+  void Close() override;
+  int64_t Tell() const override;
+  void Seek(int64_t pos) override;
+  int64_t Read(int64_t nbytes, uint8_t* out) override;
+  std::shared_ptr<::parquet::Buffer> Read(int64_t nbytes) override;
+
+ private:
+  // An Arrow readable file of some kind
+  std::shared_ptr<io::RandomAccessFile> file_;
+
+  // The allocator is required for creating managed buffers
+  ParquetAllocator* allocator_;
+};
+
+}  // namespace parquet
+}  // namespace arrow
+
+#endif  // ARROW_PARQUET_IO_H
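
A small sketch (not from the commit) of the allocator's contract, using only
the API declared above; note that Malloc reports failure by throwing
::parquet::ParquetException rather than returning a Status:

    arrow::parquet::ParquetAllocator allocator;  // uses the default memory pool
    uint8_t* data = allocator.Malloc(64);        // throws ParquetException on failure
    // ... hand the buffer to parquet-cpp code expecting MemoryAllocator memory ...
    allocator.Free(data, 64);                    // the pool needs the original size back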

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index bfc27d2..7e724b3 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -15,475 +15,164 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <string>
+
 #include "gtest/gtest.h"
 
-#include "arrow/test-util.h"
-#include "arrow/parquet/test-util.h"
-#include "arrow/parquet/reader.h"
-#include "arrow/parquet/writer.h"
-#include "arrow/types/construct.h"
-#include "arrow/types/primitive.h"
-#include "arrow/types/string.h"
+#include "arrow/parquet/io.h"
 #include "arrow/util/memory-pool.h"
 #include "arrow/util/status.h"
 
-#include "parquet/api/reader.h"
-#include "parquet/api/writer.h"
-
-using ParquetBuffer = parquet::Buffer;
-using parquet::BufferReader;
-using parquet::default_writer_properties;
-using parquet::InMemoryOutputStream;
-using parquet::LogicalType;
-using parquet::ParquetFileReader;
-using parquet::ParquetFileWriter;
-using parquet::RandomAccessSource;
-using parquet::Repetition;
-using parquet::SchemaDescriptor;
-using parquet::ParquetVersion;
-using ParquetType = parquet::Type;
-using parquet::schema::GroupNode;
-using parquet::schema::NodePtr;
-using parquet::schema::PrimitiveNode;
+#include "parquet/api/io.h"
 
 namespace arrow {
-
 namespace parquet {
 
-const int SMALL_SIZE = 100;
-const int LARGE_SIZE = 10000;
-
-template <typename TestType>
-struct test_traits {};
+// Allocator tests
 
-template <>
-struct test_traits<BooleanType> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
-  static uint8_t const value;
-};
-
-const uint8_t test_traits<BooleanType>::value(1);
+TEST(TestParquetAllocator, DefaultCtor) {
+  ParquetAllocator allocator;
 
-template <>
-struct test_traits<UInt8Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
-  static uint8_t const value;
-};
+  const int buffer_size = 10;
 
-const uint8_t test_traits<UInt8Type>::value(64);
-
-template <>
-struct test_traits<Int8Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
-  static int8_t const value;
-};
-
-const int8_t test_traits<Int8Type>::value(-64);
-
-template <>
-struct test_traits<UInt16Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
-  static uint16_t const value;
-};
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size));
 
-const uint16_t test_traits<UInt16Type>::value(1024);
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
 
-template <>
-struct test_traits<Int16Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
-  static int16_t const value;
-};
-
-const int16_t test_traits<Int16Type>::value(-1024);
-
-template <>
-struct test_traits<UInt32Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
-  static uint32_t const value;
-};
-
-const uint32_t test_traits<UInt32Type>::value(1024);
-
-template <>
-struct test_traits<Int32Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
-  static int32_t const value;
-};
-
-const int32_t test_traits<Int32Type>::value(-1024);
-
-template <>
-struct test_traits<UInt64Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
-  static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
-  static uint64_t const value;
-};
-
-const uint64_t test_traits<UInt64Type>::value(1024);
-
-template <>
-struct test_traits<Int64Type> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
-  static int64_t const value;
-};
-
-const int64_t test_traits<Int64Type>::value(-1024);
-
-template <>
-struct test_traits<FloatType> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
-  static float const value;
-};
-
-const float test_traits<FloatType>::value(2.1f);
-
-template <>
-struct test_traits<DoubleType> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
-  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
-  static double const value;
-};
-
-const double test_traits<DoubleType>::value(4.2);
-
-template <>
-struct test_traits<StringType> {
-  static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
-  static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
-  static std::string const value;
-};
-
-const std::string test_traits<StringType>::value("Test");
-
-template <typename T>
-using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>;
-
-template <typename T>
-using ParquetWriter = ::parquet::TypedColumnWriter<ParquetDataType<T>>;
+  allocator.Free(buffer, buffer_size);
+}
 
-template <typename TestType>
-class TestParquetIO : public ::testing::Test {
+// A pool that forwards allocations to the default memory pool while tracking
+// the number of bytes currently allocated
+class TrackingPool : public MemoryPool {
  public:
-  virtual void SetUp() {}
-
-  std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
-    auto pnode = PrimitiveNode::Make("column1", repetition,
-        test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
-    NodePtr node_ =
-        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
-    return std::static_pointer_cast<GroupNode>(node_);
-  }
-
-  std::unique_ptr<ParquetFileWriter> MakeWriter(
-      const std::shared_ptr<GroupNode>& schema) {
-    sink_ = std::make_shared<InMemoryOutputStream>();
-    return ParquetFileWriter::Open(sink_, schema);
-  }
-
-  std::unique_ptr<ParquetFileReader> ReaderFromSink() {
-    std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
-    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
-    return ParquetFileReader::Open(std::move(source));
-  }
-
-  void ReadSingleColumnFile(
-      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
-    arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
-    std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
-    ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
-    ASSERT_NE(nullptr, column_reader.get());
-
-    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
-    ASSERT_NE(nullptr, out->get());
-  }
+  TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {}
 
-  void ReadAndCheckSingleColumnFile(Array* values) {
-    std::shared_ptr<Array> out;
-    ReadSingleColumnFile(ReaderFromSink(), &out);
-    ASSERT_TRUE(values->Equals(out));
+  Status Allocate(int64_t size, uint8_t** out) override {
+    RETURN_NOT_OK(pool_->Allocate(size, out));
+    bytes_allocated_ += size;
+    return Status::OK();
   }
 
-  void ReadTableFromFile(
-      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
-    arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
-    ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
-    ASSERT_NE(nullptr, out->get());
+  void Free(uint8_t* buffer, int64_t size) override {
+    pool_->Free(buffer, size);
+    bytes_allocated_ -= size;
   }
 
-  void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
-    std::shared_ptr<Table> out;
-    ReadTableFromFile(ReaderFromSink(), &out);
-    ASSERT_EQ(1, out->num_columns());
-    ASSERT_EQ(values->length(), out->num_rows());
-
-    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
-    ASSERT_EQ(1, chunked_array->num_chunks());
-    ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
-  }
+  int64_t bytes_allocated() const override { return bytes_allocated_; }
 
-  template <typename ArrayType>
-  void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
-      const std::shared_ptr<ArrayType>& values) {
-    FileWriter writer(default_memory_pool(), MakeWriter(schema));
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
-    ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
-    ASSERT_OK_NO_THROW(writer.Close());
-  }
-
-  std::shared_ptr<InMemoryOutputStream> sink_;
+ private:
+  MemoryPool* pool_;
+  int64_t bytes_allocated_;
 };
 
-// We habe separate tests for UInt32Type as this is currently the only type
-// where a roundtrip does not yield the identical Array structure.
-// There we write an UInt32 Array but receive an Int64 Array as result for
-// Parquet version 1.0.
+TEST(TestParquetAllocator, CustomPool) {
+  TrackingPool pool;
 
-typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type,
-    Int32Type, UInt64Type, Int64Type, FloatType, DoubleType, StringType> TestTypes;
+  ParquetAllocator allocator(&pool);
 
-TYPED_TEST_CASE(TestParquetIO, TestTypes);
+  ASSERT_EQ(&pool, allocator.pool());
 
-TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
-  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  const int buffer_size = 10;
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
-  this->WriteFlatColumn(schema, values);
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size));
 
-  this->ReadAndCheckSingleColumnFile(values.get());
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
-  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
-  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
-  this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_,
-      values->length(), default_writer_properties()));
-
-  std::shared_ptr<Table> out;
-  this->ReadTableFromFile(this->ReaderFromSink(), &out);
-  ASSERT_EQ(1, out->num_columns());
-  ASSERT_EQ(100, out->num_rows());
-
-  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
-  ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
-}
+  ASSERT_EQ(buffer_size, pool.bytes_allocated());
 
-TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
-  // This also tests max_definition_level = 1
-  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
-  this->WriteFlatColumn(schema, values);
+  allocator.Free(buffer, buffer_size);
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  ASSERT_EQ(0, pool.bytes_allocated());
 }
 
-TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
-  // This also tests max_definition_level = 1
-  std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
-  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
-  this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_,
-      values->length(), default_writer_properties()));
+// ----------------------------------------------------------------------
+// Read source tests
 
-  this->ReadAndCheckSingleColumnTable(values);
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
-  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
-  int64_t chunk_size = values->length() / 4;
+class BufferReader : public io::RandomAccessFile {
+ public:
+  BufferReader(const uint8_t* buffer, int buffer_size)
+      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
 
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
-  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
-  for (int i = 0; i < 4; i++) {
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
-    ASSERT_OK_NO_THROW(
-        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  Status Close() override {
+    // no-op
+    return Status::OK();
   }
-  ASSERT_OK_NO_THROW(writer.Close());
-
-  this->ReadAndCheckSingleColumnFile(values.get());
-}
 
-TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
-  auto values = NonNullArray<TypeParam>(LARGE_SIZE);
-  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
-  this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteFlatTable(
-      table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
-
-  this->ReadAndCheckSingleColumnTable(values);
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
-  int64_t chunk_size = SMALL_SIZE / 4;
-  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
-
-  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
-  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
-  for (int i = 0; i < 4; i++) {
-    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
-    ASSERT_OK_NO_THROW(
-        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  Status Tell(int64_t* position) override {
+    *position = position_;
+    return Status::OK();
   }
-  ASSERT_OK_NO_THROW(writer.Close());
 
-  this->ReadAndCheckSingleColumnFile(values.get());
-}
-
-TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
-  // This also tests max_definition_level = 1
-  auto values = NullableArray<TypeParam>(LARGE_SIZE, 100);
-  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
-  this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteFlatTable(
-      table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
-
-  this->ReadAndCheckSingleColumnTable(values);
-}
-
-using TestUInt32ParquetIO = TestParquetIO<UInt32Type>;
-
-TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
-  // This also tests max_definition_level = 1
-  std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100);
-  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
-
-  // Parquet 2.0 roundtrip should yield an uint32_t column again
-  this->sink_ = std::make_shared<InMemoryOutputStream>();
-  std::shared_ptr<::parquet::WriterProperties> properties =
-      ::parquet::WriterProperties::Builder()
-          .version(ParquetVersion::PARQUET_2_0)
-          ->build();
-  ASSERT_OK_NO_THROW(
-      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
-  this->ReadAndCheckSingleColumnTable(values);
-}
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    RETURN_NOT_OK(Seek(position));
+    return Read(nbytes, bytes_read, buffer);
+  }
 
-TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
-  // This also tests max_definition_level = 1
-  std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100);
-  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
-
-  // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0
-  // reader that a column is unsigned.
-  this->sink_ = std::make_shared<InMemoryOutputStream>();
-  std::shared_ptr<::parquet::WriterProperties> properties =
-      ::parquet::WriterProperties::Builder()
-          .version(ParquetVersion::PARQUET_1_0)
-          ->build();
-  ASSERT_OK_NO_THROW(
-      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
-
-  std::shared_ptr<Array> expected_values;
-  std::shared_ptr<PoolBuffer> int64_data =
-      std::make_shared<PoolBuffer>(default_memory_pool());
-  {
-    ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
-    int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
-    const uint32_t* uint32_data_ptr =
-        reinterpret_cast<const uint32_t*>(values->data()->data());
-    // std::copy might be faster but this is explicit on the casts)
-    for (int64_t i = 0; i < values->length(); i++) {
-      int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
-    }
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    // Clamp to the bytes remaining so we never copy past the end of buffer_
+    *bytes_read = std::min(nbytes, buffer_size_ - position_);
+    memcpy(buffer, buffer_ + position_, *bytes_read);
+    position_ += *bytes_read;
+    return Status::OK();
   }
-  ASSERT_OK(MakePrimitiveArray(std::make_shared<Int64Type>(), values->length(),
-      int64_data, values->null_count(), values->null_bitmap(), &expected_values));
-  this->ReadAndCheckSingleColumnTable(expected_values);
-}
 
-template <typename T>
-using ParquetCDataType = typename ParquetDataType<T>::c_type;
+  Status GetSize(int64_t* size) override {
+    *size = buffer_size_;
+    return Status::OK();
+  }
 
-template <typename TestType>
-class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
- public:
-  typedef typename TestType::c_type T;
-
-  void MakeTestFile(std::vector<T>& values, int num_chunks,
-      std::unique_ptr<ParquetFileReader>* file_reader) {
-    std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
-    std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
-    size_t chunk_size = values.size() / num_chunks;
-    // Convert to Parquet's expected physical type
-    std::vector<uint8_t> values_buffer(
-        sizeof(ParquetCDataType<TestType>) * values.size());
-    auto values_parquet =
-        reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
-    std::copy(values.cbegin(), values.cend(), values_parquet);
-    for (int i = 0; i < num_chunks; i++) {
-      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
-      auto column_writer =
-          static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
-      ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
-      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
-      column_writer->Close();
-      row_group_writer->Close();
+  Status Seek(int64_t position) override {
+    if (position < 0 || position >= buffer_size_) {
+      return Status::IOError("position out of bounds");
     }
-    file_writer->Close();
-    *file_reader = this->ReaderFromSink();
+
+    position_ = position;
+    return Status::OK();
   }
 
-  void CheckSingleColumnRequiredTableRead(int num_chunks) {
-    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
-    std::unique_ptr<ParquetFileReader> file_reader;
-    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+ private:
+  const uint8_t* buffer_;
+  int buffer_size_;
+  int64_t position_;
+};
 
-    std::shared_ptr<Table> out;
-    this->ReadTableFromFile(std::move(file_reader), &out);
-    ASSERT_EQ(1, out->num_columns());
-    ASSERT_EQ(SMALL_SIZE, out->num_rows());
+TEST(TestParquetReadSource, Basics) {
+  std::string data = "this is the data";
+  auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
 
-    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
-    ASSERT_EQ(1, chunked_array->num_chunks());
-    ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
-  }
+  ParquetAllocator allocator;
+  auto file = std::make_shared<BufferReader>(data_buffer, data.size());
+  auto source = std::make_shared<ParquetReadSource>(file, &allocator);
 
-  void CheckSingleColumnRequiredRead(int num_chunks) {
-    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
-    std::unique_ptr<ParquetFileReader> file_reader;
-    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+  ASSERT_EQ(0, source->Tell());
+  ASSERT_NO_THROW(source->Seek(5));
+  ASSERT_EQ(5, source->Tell());
+  ASSERT_NO_THROW(source->Seek(0));
 
-    std::shared_ptr<Array> out;
-    this->ReadSingleColumnFile(std::move(file_reader), &out);
-
-    ExpectArray<TestType>(values.data(), out.get());
-  }
-};
+  // Seek out of bounds
+  ASSERT_THROW(source->Seek(100), ::parquet::ParquetException);
 
-typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type,
-    UInt32Type, Int32Type, UInt64Type, Int64Type, FloatType,
-    DoubleType> PrimitiveTestTypes;
+  uint8_t buffer[50];
 
-TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+  ASSERT_NO_THROW(source->Read(4, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
+  ASSERT_EQ(4, source->Tell());
 
-TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
-  this->CheckSingleColumnRequiredRead(1);
-}
+  std::shared_ptr<::parquet::Buffer> pq_buffer;
 
-TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
-  this->CheckSingleColumnRequiredTableRead(1);
-}
+  ASSERT_NO_THROW(pq_buffer = source->Read(7));
 
-TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
-  this->CheckSingleColumnRequiredRead(4);
-}
+  auto expected_buffer = std::make_shared<::parquet::Buffer>(data_buffer + 4, 7);
 
-TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
-  this->CheckSingleColumnRequiredTableRead(4);
+  ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
 }
 
 }  // namespace parquet
-
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/parquet/parquet-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-reader-writer-test.cc b/cpp/src/arrow/parquet/parquet-reader-writer-test.cc
new file mode 100644
index 0000000..bfc27d2
--- /dev/null
+++ b/cpp/src/arrow/parquet/parquet-reader-writer-test.cc
@@ -0,0 +1,489 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include "arrow/test-util.h"
+#include "arrow/parquet/test-util.h"
+#include "arrow/parquet/reader.h"
+#include "arrow/parquet/writer.h"
+#include "arrow/types/construct.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+#include "parquet/api/reader.h"
+#include "parquet/api/writer.h"
+
+using ParquetBuffer = parquet::Buffer;
+using parquet::BufferReader;
+using parquet::default_writer_properties;
+using parquet::InMemoryOutputStream;
+using parquet::LogicalType;
+using parquet::ParquetFileReader;
+using parquet::ParquetFileWriter;
+using parquet::RandomAccessSource;
+using parquet::Repetition;
+using parquet::SchemaDescriptor;
+using parquet::ParquetVersion;
+using ParquetType = parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::PrimitiveNode;
+
+namespace arrow {
+
+namespace parquet {
+
+const int SMALL_SIZE = 100;
+const int LARGE_SIZE = 10000;
+
+template <typename TestType>
+struct test_traits {};
+
+template <>
+struct test_traits<BooleanType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static uint8_t const value;
+};
+
+const uint8_t test_traits<BooleanType>::value(1);
+
+template <>
+struct test_traits<UInt8Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
+  static uint8_t const value;
+};
+
+const uint8_t test_traits<UInt8Type>::value(64);
+
+template <>
+struct test_traits<Int8Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
+  static int8_t const value;
+};
+
+const int8_t test_traits<Int8Type>::value(-64);
+
+template <>
+struct test_traits<UInt16Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
+  static uint16_t const value;
+};
+
+const uint16_t test_traits<UInt16Type>::value(1024);
+
+template <>
+struct test_traits<Int16Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
+  static int16_t const value;
+};
+
+const int16_t test_traits<Int16Type>::value(-1024);
+
+template <>
+struct test_traits<UInt32Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
+  static uint32_t const value;
+};
+
+const uint32_t test_traits<UInt32Type>::value(1024);
+
+template <>
+struct test_traits<Int32Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static int32_t const value;
+};
+
+const int32_t test_traits<Int32Type>::value(-1024);
+
+template <>
+struct test_traits<UInt64Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
+  static uint64_t const value;
+};
+
+const uint64_t test_traits<UInt64Type>::value(1024);
+
+template <>
+struct test_traits<Int64Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static int64_t const value;
+};
+
+const int64_t test_traits<Int64Type>::value(-1024);
+
+template <>
+struct test_traits<FloatType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static float const value;
+};
+
+const float test_traits<FloatType>::value(2.1f);
+
+template <>
+struct test_traits<DoubleType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+  static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+  static double const value;
+};
+
+const double test_traits<DoubleType>::value(4.2);
+
+template <>
+struct test_traits<StringType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
+  static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
+  static std::string const value;
+};
+
+const std::string test_traits<StringType>::value("Test");
+
+template <typename T>
+using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>;
+
+template <typename T>
+using ParquetWriter = ::parquet::TypedColumnWriter<ParquetDataType<T>>;
+
+template <typename TestType>
+class TestParquetIO : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
+    auto pnode = PrimitiveNode::Make("column1", repetition,
+        test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
+    NodePtr node_ =
+        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+    return std::static_pointer_cast<GroupNode>(node_);
+  }
+
+  std::unique_ptr<ParquetFileWriter> MakeWriter(
+      const std::shared_ptr<GroupNode>& schema) {
+    sink_ = std::make_shared<InMemoryOutputStream>();
+    return ParquetFileWriter::Open(sink_, schema);
+  }
+
+  std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+    std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
+    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+    return ParquetFileReader::Open(std::move(source));
+  }
+
+  void ReadSingleColumnFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
+    arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
+    std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
+    ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
+    ASSERT_NE(nullptr, column_reader.get());
+
+    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadAndCheckSingleColumnFile(Array* values) {
+    std::shared_ptr<Array> out;
+    ReadSingleColumnFile(ReaderFromSink(), &out);
+    ASSERT_TRUE(values->Equals(out));
+  }
+
+  void ReadTableFromFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
+    arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
+    ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
+    std::shared_ptr<Table> out;
+    ReadTableFromFile(ReaderFromSink(), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(values->length(), out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+  }
+
+  template <typename ArrayType>
+  void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+      const std::shared_ptr<ArrayType>& values) {
+    FileWriter writer(default_memory_pool(), MakeWriter(schema));
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
+    ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+    ASSERT_OK_NO_THROW(writer.Close());
+  }
+
+  std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+// We have separate tests for UInt32Type, as it is currently the only type
+// for which a roundtrip does not yield an identical Array structure: with
+// Parquet version 1.0 we write a UInt32 array but read back an Int64 array.
+
+typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type,
+    Int32Type, UInt64Type, Int64Type, FloatType, DoubleType, StringType> TestTypes;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  this->WriteFlatColumn(schema, values);
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_,
+      values->length(), default_writer_properties()));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(100, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  this->WriteFlatColumn(schema, values);
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_,
+      values->length(), default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  int64_t chunk_size = values->length() / 4;
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  }
+  ASSERT_OK_NO_THROW(writer.Close());
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+  auto values = NonNullArray<TypeParam>(LARGE_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+  int64_t chunk_size = SMALL_SIZE / 4;
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(
+        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+  }
+  ASSERT_OK_NO_THROW(writer.Close());
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+  // This also tests max_definition_level = 1
+  auto values = NullableArray<TypeParam>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+using TestUInt32ParquetIO = TestParquetIO<UInt32Type>;
+
+TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // Parquet 2.0 roundtrip should yield an uint32_t column again
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_2_0)
+          ->build();
+  ASSERT_OK_NO_THROW(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+
+  // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0
+  // reader that a column is unsigned.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_1_0)
+          ->build();
+  ASSERT_OK_NO_THROW(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+
+  std::shared_ptr<Array> expected_values;
+  std::shared_ptr<PoolBuffer> int64_data =
+      std::make_shared<PoolBuffer>(default_memory_pool());
+  {
+    ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
+    int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
+    const uint32_t* uint32_data_ptr =
+        reinterpret_cast<const uint32_t*>(values->data()->data());
+    // std::copy might be faster, but this is explicit about the casts
+    for (int64_t i = 0; i < values->length(); i++) {
+      int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
+    }
+  }
+  ASSERT_OK(MakePrimitiveArray(std::make_shared<Int64Type>(), values->length(),
+      int64_data, values->null_count(), values->null_bitmap(), &expected_values));
+  this->ReadAndCheckSingleColumnTable(expected_values);
+}
+
+template <typename T>
+using ParquetCDataType = typename ParquetDataType<T>::c_type;
+
+template <typename TestType>
+class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
+ public:
+  typedef typename TestType::c_type T;
+
+  void MakeTestFile(std::vector<T>& values, int num_chunks,
+      std::unique_ptr<ParquetFileReader>* file_reader) {
+    std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+    std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
+    size_t chunk_size = values.size() / num_chunks;
+    // Convert to Parquet's expected physical type
+    std::vector<uint8_t> values_buffer(
+        sizeof(ParquetCDataType<TestType>) * values.size());
+    auto values_parquet =
+        reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
+    std::copy(values.cbegin(), values.cend(), values_parquet);
+    for (int i = 0; i < num_chunks; i++) {
+      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+      auto column_writer =
+          static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
+      ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
+      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+      column_writer->Close();
+      row_group_writer->Close();
+    }
+    file_writer->Close();
+    *file_reader = this->ReaderFromSink();
+  }
+
+  void CheckSingleColumnRequiredTableRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Table> out;
+    this->ReadTableFromFile(std::move(file_reader), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
+  }
+
+  void CheckSingleColumnRequiredRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Array> out;
+    this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+    ExpectArray<TestType>(values.data(), out.get());
+  }
+};
+
+typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type,
+    UInt32Type, Int32Type, UInt64Type, Int64Type, FloatType,
+    DoubleType> PrimitiveTestTypes;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+  this->CheckSingleColumnRequiredRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+  this->CheckSingleColumnRequiredTableRead(1);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+  this->CheckSingleColumnRequiredRead(4);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+  this->CheckSingleColumnRequiredTableRead(4);
+}
+
+}  // namespace parquet
+
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/cpp/src/arrow/parquet/utils.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/utils.h b/cpp/src/arrow/parquet/utils.h
index 409bcd9..bcc46be 100644
--- a/cpp/src/arrow/parquet/utils.h
+++ b/cpp/src/arrow/parquet/utils.h
@@ -18,12 +18,12 @@
 #ifndef ARROW_PARQUET_UTILS_H
 #define ARROW_PARQUET_UTILS_H
 
-#include "arrow/util/status.h"
+#include <sstream>
 
+#include "arrow/util/status.h"
 #include "parquet/exception.h"
 
 namespace arrow {
-
 namespace parquet {
 
 #define PARQUET_CATCH_NOT_OK(s) \
@@ -36,8 +36,17 @@ namespace parquet {
     (s);                         \
   } catch (const ::parquet::ParquetException& e) {}
 
-}  // namespace parquet
+#define PARQUET_THROW_NOT_OK(s)                    \
+  do {                                             \
+    ::arrow::Status _s = (s);                      \
+    if (!_s.ok()) {                                \
+      std::stringstream ss;                        \
+      ss << "Arrow error: " << _s.ToString();      \
+      throw ::parquet::ParquetException(ss.str()); \
+    }                                              \
+  } while (0)
 
+}  // namespace parquet
 }  // namespace arrow
 
 #endif  // ARROW_PARQUET_UTILS_H
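
A sketch of the new PARQUET_THROW_NOT_OK macro in action (hypothetical
helper): it converts a failed arrow::Status into the ::parquet::ParquetException
that parquet-cpp's error model expects, which is exactly how io.cc above
uses it:

    int64_t FileSizeOrThrow(arrow::io::RandomAccessFile* file) {
      int64_t size = 0;
      PARQUET_THROW_NOT_OK(file->GetSize(&size));  // throws if the Status is not ok
      return size;
    }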

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index d874ba3..d0fb8f9 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -51,17 +51,17 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
 
     cdef cppclass HdfsReadableFile(CHdfsFile):
         CStatus GetSize(int64_t* size)
-        CStatus Read(int32_t nbytes, int32_t* bytes_read,
+        CStatus Read(int64_t nbytes, int64_t* bytes_read,
                      uint8_t* buffer)
 
-        CStatus ReadAt(int64_t position, int32_t nbytes,
-                       int32_t* bytes_read, uint8_t* buffer)
+        CStatus ReadAt(int64_t position, int64_t nbytes,
+                       int64_t* bytes_read, uint8_t* buffer)
 
     cdef cppclass HdfsWriteableFile(CHdfsFile):
-        CStatus Write(const uint8_t* buffer, int32_t nbytes)
+        CStatus Write(const uint8_t* buffer, int64_t nbytes)
 
-        CStatus Write(const uint8_t* buffer, int32_t nbytes,
-                      int32_t* bytes_written)
+        CStatus Write(const uint8_t* buffer, int64_t nbytes,
+                      int64_t* bytes_written)
 
     cdef cppclass CHdfsClient" arrow::io::HdfsClient":
         @staticmethod

http://git-wip-us.apache.org/repos/asf/arrow/blob/ff6132f8/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 8b97671..071eea5 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -383,7 +383,7 @@ cdef class HdfsFile:
         Read indicated number of bytes from the file, up to EOF
         """
         cdef:
-            int32_t bytes_read = 0
+            int64_t bytes_read = 0
             uint8_t* buf
 
         self._assert_readable()
@@ -394,7 +394,7 @@ cdef class HdfsFile:
         if buf == NULL:
             raise MemoryError("Failed to allocate {0} bytes".format(nbytes))
 
-        cdef int32_t total_bytes = 0
+        cdef int64_t total_bytes = 0
 
         cdef int rpc_chunksize = min(self.buffer_size, nbytes)
 
@@ -423,7 +423,7 @@ cdef class HdfsFile:
         memory). First seeks to the beginning of the file.
         """
         cdef:
-            int32_t bytes_read = 0
+            int64_t bytes_read = 0
             uint8_t* buf
         self._assert_readable()
 
@@ -499,6 +499,6 @@ cdef class HdfsFile:
         data = tobytes(data)
 
         cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data)
-        cdef int32_t bufsize = len(data)
+        cdef int64_t bufsize = len(data)
         with nogil:
             check_cstatus(self.wr_file.get().Write(buf, bufsize))

