arrow-commits mailing list archives

From w...@apache.org
Subject arrow git commit: ARROW-203: Python: Basic filename based Parquet read/write
Date Fri, 10 Jun 2016 22:08:31 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 8197f246d -> ec66ddd1f


ARROW-203: Python: Basic filename based Parquet read/write

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #83 from xhochy/arrow-203 and squashes the following commits:

405f85d [Uwe L. Korn] Remove FindParquet duplication
38d786c [Uwe L. Korn] Make code more readable by using using
ec07768 [Uwe L. Korn] Set LD_LIBRARY_PATH in python build
8d90d3f [Uwe L. Korn] Do not set LD_LIBRARY_PATH in python build
000e1e3 [Uwe L. Korn] Use unique_ptr and shared_ptr from Cython
8f6010a [Uwe L. Korn] Linter fixes
0514d01 [Uwe L. Korn] Handle exceptions on RowGroupWriter::Close better
77bd21a [Uwe L. Korn] Add pandas roundtrip to tests
f583b61 [Uwe L. Korn] Fix rpath for libarrow_parquet
00c1461 [Uwe L. Korn] Also ensure correct OSX compiler flags in PyArrow
4a80116 [Uwe L. Korn] Handle Python3 strings correctly
066c08a [Uwe L. Korn] Add missing functions to smart pointers
5706db2 [Uwe L. Korn] Use length and offset instead of slicing
443de8b [Uwe L. Korn] Add miniconda to the LD_LIBRARY_PATH
2dffc14 [Uwe L. Korn] Fix min mistake, use equals instead of ==
2006e70 [Uwe L. Korn] Rewrite test py.test style
9520c39 [Uwe L. Korn] Use PARQUET from miniconda path
cd3b9a9 [Uwe L. Korn] Also search for Parquet in PyArrow
6a41d23 [Uwe L. Korn] Re-use conda installation from C++
81f501e [Uwe L. Korn] No need to install conda in travis_script_python anymore
b505feb [Uwe L. Korn] Install parquet-cpp via conda
5d4929a [Uwe L. Korn] Add test-util.h
9b06e41 [Uwe L. Korn] Make tests templated
be6415c [Uwe L. Korn] Incorportate review comments
0fbed3f [Uwe L. Korn] Remove obsolete parquet files
081db5f [Uwe L. Korn] Limit and document chunk_size
7192cfb [Uwe L. Korn] Add const to slicing parameters
0463995 [Uwe L. Korn] ARROW-203: Python: Basic filename based Parquet read/write


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ec66ddd1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ec66ddd1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ec66ddd1

Branch: refs/heads/master
Commit: ec66ddd1fd4954b78967bfa1893480473e4d380c
Parents: 8197f24
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Fri Jun 10 15:08:23 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Fri Jun 10 15:08:23 2016 -0700

----------------------------------------------------------------------
 ci/travis_before_script_cpp.sh           |   6 +-
 ci/travis_conda_build.sh                 |  22 +--
 ci/travis_install_conda.sh               |  26 +++
 ci/travis_script_python.sh               |  21 +--
 cpp/src/arrow/column.h                   |   2 +
 cpp/src/arrow/parquet/CMakeLists.txt     |   7 +
 cpp/src/arrow/parquet/parquet-io-test.cc | 256 +++++++++++++++++++-------
 cpp/src/arrow/parquet/reader.cc          |  25 +++
 cpp/src/arrow/parquet/reader.h           |   3 +
 cpp/src/arrow/parquet/test-util.h        |  77 ++++++++
 cpp/src/arrow/parquet/utils.h            |   5 +
 cpp/src/arrow/parquet/writer.cc          |  99 +++++++---
 cpp/src/arrow/parquet/writer.h           |  12 +-
 cpp/src/arrow/util/status.h              |   9 +
 python/CMakeLists.txt                    |   8 +
 python/cmake_modules/FindArrow.cmake     |  14 +-
 python/conda.recipe/build.sh             |  13 ++
 python/pyarrow/array.pyx                 |   3 +
 python/pyarrow/error.pxd                 |   2 +
 python/pyarrow/error.pyx                 |   8 +
 python/pyarrow/includes/common.pxd       |   9 +-
 python/pyarrow/includes/libarrow.pxd     |   3 +
 python/pyarrow/includes/parquet.pxd      |  46 +++++
 python/pyarrow/parquet.pyx               |  50 ++++-
 python/pyarrow/schema.pyx                |   9 +-
 python/pyarrow/tests/test_parquet.py     |  59 ++++++
 python/setup.py                          |   4 +-
 27 files changed, 654 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_before_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 193c76f..6159f67 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -2,6 +2,10 @@
 
 set -e
 
+source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh
+conda install -y --channel apache/channel/dev parquet-cpp
+export PARQUET_HOME=$MINICONDA
+
 : ${CPP_BUILD_DIR=$TRAVIS_BUILD_DIR/cpp-build}
 
 mkdir $CPP_BUILD_DIR
@@ -19,7 +23,7 @@ echo $GTEST_HOME
 
 : ${ARROW_CPP_INSTALL=$TRAVIS_BUILD_DIR/cpp-install}
 
-CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
+CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DARROW_PARQUET=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
 
 if [ $TRAVIS_OS_NAME == "linux" ]; then
   cmake -DARROW_TEST_MEMCHECK=on $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_conda_build.sh
----------------------------------------------------------------------
diff --git a/ci/travis_conda_build.sh b/ci/travis_conda_build.sh
index afa531d..c43a851 100755
--- a/ci/travis_conda_build.sh
+++ b/ci/travis_conda_build.sh
@@ -2,27 +2,7 @@
 
 set -e
 
-if [ $TRAVIS_OS_NAME == "linux" ]; then
-  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh"
-else
-  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh"
-fi
-
-wget -O miniconda.sh $MINICONDA_URL
-MINICONDA=$TRAVIS_BUILD_DIR/miniconda
-bash miniconda.sh -b -p $MINICONDA
-export PATH="$MINICONDA/bin:$PATH"
-conda update -y -q conda
-conda info -a
-
-conda config --set show_channel_urls yes
-conda config --add channels conda-forge
-conda config --add channels apache
-
-conda install --yes conda-build jinja2 anaconda-client
-
-# faster builds, please
-conda install -y nomkl
+source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh
 
 # Build libarrow
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_install_conda.sh
----------------------------------------------------------------------
diff --git a/ci/travis_install_conda.sh b/ci/travis_install_conda.sh
new file mode 100644
index 0000000..bef667d
--- /dev/null
+++ b/ci/travis_install_conda.sh
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+set -e
+
+if [ $TRAVIS_OS_NAME == "linux" ]; then
+  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh"
+else
+  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh"
+fi
+
+wget -O miniconda.sh $MINICONDA_URL
+export MINICONDA=$TRAVIS_BUILD_DIR/miniconda
+bash miniconda.sh -b -p $MINICONDA
+export PATH="$MINICONDA/bin:$PATH"
+conda update -y -q conda
+conda info -a
+
+conda config --set show_channel_urls yes
+conda config --add channels conda-forge
+conda config --add channels apache
+
+conda install --yes conda-build jinja2 anaconda-client
+
+# faster builds, please
+conda install -y nomkl
+

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_script_python.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index d45b895..6d35785 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -4,6 +4,12 @@ set -e
 
 PYTHON_DIR=$TRAVIS_BUILD_DIR/python
 
+# Re-use conda installation from C++
+export MINICONDA=$TRAVIS_BUILD_DIR/miniconda
+export PATH="$MINICONDA/bin:$PATH"
+export LD_LIBRARY_PATH="$MINICONDA/lib:$LD_LIBRARY_PATH"
+export PARQUET_HOME=$MINICONDA
+
 # Share environment with C++
 pushd $CPP_BUILD_DIR
 source setup_build_env.sh
@@ -11,21 +17,6 @@ popd
 
 pushd $PYTHON_DIR
 
-# Bootstrap a Conda Python environment
-
-if [ $TRAVIS_OS_NAME == "linux" ]; then
-  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh"
-else
-  MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh"
-fi
-
-curl $MINICONDA_URL > miniconda.sh
-MINICONDA=$TRAVIS_BUILD_DIR/miniconda
-bash miniconda.sh -b -p $MINICONDA
-export PATH="$MINICONDA/bin:$PATH"
-conda update -y -q conda
-conda info -a
-
 python_version_tests() {
   PYTHON_VERSION=$1
   CONDA_ENV_NAME="pyarrow-test-${PYTHON_VERSION}"

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/column.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h
index 22becc3..e409566 100644
--- a/cpp/src/arrow/column.h
+++ b/cpp/src/arrow/column.h
@@ -67,6 +67,8 @@ class Column {
 
   int64_t null_count() const { return data_->null_count(); }
 
+  const std::shared_ptr<Field>& field() const { return field_; }
+
   // @returns: the column's name in the passed metadata
   const std::string& name() const { return field_->name; }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt
index c00cc9f..f00bb53 100644
--- a/cpp/src/arrow/parquet/CMakeLists.txt
+++ b/cpp/src/arrow/parquet/CMakeLists.txt
@@ -35,6 +35,13 @@ add_library(arrow_parquet SHARED
 target_link_libraries(arrow_parquet ${PARQUET_LIBS})
 SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX)
 
+if (APPLE)
+  set_target_properties(arrow_parquet
+    PROPERTIES
+    BUILD_WITH_INSTALL_RPATH ON
+    INSTALL_NAME_DIR "@rpath")
+endif()
+
 ADD_ARROW_TEST(parquet-schema-test)
 ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index 845574d..db779d8 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -18,6 +18,7 @@
 #include "gtest/gtest.h"
 
 #include "arrow/test-util.h"
+#include "arrow/parquet/test-util.h"
 #include "arrow/parquet/reader.h"
 #include "arrow/parquet/writer.h"
 #include "arrow/types/primitive.h"
@@ -44,36 +45,45 @@ namespace arrow {
 
 namespace parquet {
 
-template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NonNullArray(
-    size_t size, typename ArrowType::c_type value) {
-  std::vector<typename ArrowType::c_type> values(size, value);
-  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
-  builder.Append(values.data(), values.size());
-  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
-}
+const int SMALL_SIZE = 100;
+const int LARGE_SIZE = 10000;
 
-// This helper function only supports (size/2) nulls yet.
-template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NullableArray(
-    size_t size, typename ArrowType::c_type value, size_t num_nulls) {
-  std::vector<typename ArrowType::c_type> values(size, value);
-  std::vector<uint8_t> valid_bytes(size, 1);
+template <typename TestType>
+struct test_traits {};
 
-  for (size_t i = 0; i < num_nulls; i++) {
-    valid_bytes[i * 2] = 0;
-  }
+template <>
+struct test_traits<Int32Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+};
 
-  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
-  builder.Append(values.data(), values.size(), valid_bytes.data());
-  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
-}
+template <>
+struct test_traits<Int64Type> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+};
+
+template <>
+struct test_traits<FloatType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+};
+
+template <>
+struct test_traits<DoubleType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+};
+
+template <typename T>
+using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>;
 
+template <typename T>
+using ParquetWriter = ::parquet::TypedColumnWriter<ParquetDataType<T>>;
+
+template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
+  typedef typename TestType::c_type T;
   virtual void SetUp() {}
 
-  std::shared_ptr<GroupNode> Schema(
+  std::shared_ptr<GroupNode> MakeSchema(
       ParquetType::type parquet_type, Repetition::type repetition) {
     auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type);
     NodePtr node_ =
@@ -98,20 +108,27 @@ class TestParquetIO : public ::testing::Test {
     std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
     ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
     ASSERT_NE(nullptr, column_reader.get());
-    ASSERT_OK(column_reader->NextBatch(100, out));
+    ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  void ReadTableFromFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
+    arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
+    ASSERT_NO_THROW(ASSERT_OK(reader.ReadFlatTable(out)));
     ASSERT_NE(nullptr, out->get());
   }
 
-  std::unique_ptr<ParquetFileReader> Int64File(
-      std::vector<int64_t>& values, int num_chunks) {
-    std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+  std::unique_ptr<ParquetFileReader> TestFile(std::vector<T>& values, int num_chunks) {
+    std::shared_ptr<GroupNode> schema =
+        MakeSchema(test_traits<TestType>::parquet_enum, Repetition::REQUIRED);
     std::unique_ptr<ParquetFileWriter> file_writer = MakeWriter(schema);
     size_t chunk_size = values.size() / num_chunks;
     for (int i = 0; i < num_chunks; i++) {
       auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
-      auto column_writer =
-          static_cast<::parquet::Int64Writer*>(row_group_writer->NextColumn());
-      int64_t* data = values.data() + i * chunk_size;
+      auto column_writer = static_cast<ParquetWriter<TestType>*>(
+          row_group_writer->NextColumn());
+      T* data = values.data() + i * chunk_size;
       column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
       column_writer->Close();
       row_group_writer->Close();
@@ -120,71 +137,135 @@ class TestParquetIO : public ::testing::Test {
     return ReaderFromSink();
   }
 
- private:
   std::shared_ptr<InMemoryOutputStream> sink_;
 };
 
-TEST_F(TestParquetIO, SingleColumnInt64Read) {
-  std::vector<int64_t> values(100, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1);
+typedef ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType> TestTypes;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredRead) {
+  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+  std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1);
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(std::move(file_reader), &out);
+  this->ReadSingleColumnFile(std::move(file_reader), &out);
 
-  Int64Array* out_array = static_cast<Int64Array*>(out.get());
-  for (size_t i = 0; i < values.size(); i++) {
-    EXPECT_EQ(values[i], out_array->raw_data()[i]);
-  }
+  ExpectArray<typename TypeParam::c_type>(values.data(), out.get());
 }
 
-TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) {
-  std::vector<int64_t> values(100, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
+TYPED_TEST(TestParquetIO, SingleColumnRequiredTableRead) {
+  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+  std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1);
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(std::move(file_reader), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedRead) {
+  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+  std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4);
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(std::move(file_reader), &out);
+  this->ReadSingleColumnFile(std::move(file_reader), &out);
 
-  Int64Array* out_array = static_cast<Int64Array*>(out.get());
-  for (size_t i = 0; i < values.size(); i++) {
-    EXPECT_EQ(values[i], out_array->raw_data()[i]);
-  }
+  ExpectArray<typename TypeParam::c_type>(values.data(), out.get());
 }
 
-TEST_F(TestParquetIO, SingleColumnInt64Write) {
-  std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedTableRead) {
+  std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+  std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4);
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(std::move(file_reader), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(SMALL_SIZE, out->num_rows());
 
-  std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
-  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+
+  std::shared_ptr<GroupNode> schema =
+      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
   ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
   ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(ReaderFromSink(), &out);
+  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
   ASSERT_TRUE(values->Equals(out));
 }
 
-TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) {
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_NO_THROW(ASSERT_OK(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(100, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
   // This also tests max_definition_level = 1
-  std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
 
-  std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
-  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  std::shared_ptr<GroupNode> schema =
+      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
   ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
   ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(ReaderFromSink(), &out);
+  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
   ASSERT_TRUE(values->Equals(out));
 }
 
-TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) {
-  std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
-  std::shared_ptr<PrimitiveArray> values_chunk = NonNullArray<Int64Type>(25, 128);
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_NO_THROW(ASSERT_OK(
+      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
 
-  std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
-  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+TYPED_TEST(TestParquetIO, SingleColumnIntRequiredChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+  std::shared_ptr<PrimitiveArray> values_chunk =
+      NonNullArray<TypeParam>(SMALL_SIZE / 4, 128);
+
+  std::shared_ptr<GroupNode> schema =
+      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
     ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
@@ -192,18 +273,37 @@ TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) {
   ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(ReaderFromSink(), &out);
+  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
   ASSERT_TRUE(values->Equals(out));
 }
 
-TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) {
-  std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(LARGE_SIZE, 128);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_NO_THROW(
+      ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(LARGE_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
   std::shared_ptr<PrimitiveArray> values_chunk_nulls =
-      NullableArray<DoubleType>(25, 128, 10);
-  std::shared_ptr<PrimitiveArray> values_chunk = NullableArray<DoubleType>(25, 128, 0);
+      NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 10);
+  std::shared_ptr<PrimitiveArray> values_chunk =
+      NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 0);
 
-  std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
-  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  std::shared_ptr<GroupNode> schema =
+      this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
+  FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length())));
   ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get())));
   for (int i = 0; i < 3; i++) {
@@ -213,10 +313,28 @@ TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) {
   ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
 
   std::shared_ptr<Array> out;
-  ReadSingleColumnFile(ReaderFromSink(), &out);
+  this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
   ASSERT_TRUE(values->Equals(out));
 }
 
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+  // This also tests max_definition_level = 1
+  std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(LARGE_SIZE, 128, 100);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_NO_THROW(
+      ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
+
+  std::shared_ptr<Table> out;
+  this->ReadTableFromFile(this->ReaderFromSink(), &out);
+  ASSERT_EQ(1, out->num_columns());
+  ASSERT_EQ(LARGE_SIZE, out->num_rows());
+
+  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+  ASSERT_EQ(1, chunked_array->num_chunks());
+  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
 }  // namespace parquet
 
 }  // namespace arrow
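
Note on the `TestFile` helper in the diff above: it splits its input across row groups with `chunk_size = values.size() / num_chunks`. The Python sketch below models only that arithmetic. `split_into_row_groups` is a hypothetical name, not part of Arrow or parquet-cpp. Because the division is integral, trailing values would be dropped if the size were not a multiple of `num_chunks`; the tests here use 100 values with 1 or 4 chunks, so nothing is lost.

```python
def split_into_row_groups(values, num_chunks):
    """Split values into num_chunks equal slices, as TestFile appears to do.

    Hypothetical stand-in for the C++ helper: chunk_size uses integer
    division, so a remainder at the end of values is never written.
    """
    chunk_size = len(values) // num_chunks
    return [values[i * chunk_size:(i + 1) * chunk_size]
            for i in range(num_chunks)]


values = [128] * 100                      # SMALL_SIZE values, all 128
groups = split_into_row_groups(values, 4)
print([len(g) for g in groups])           # four row groups of 25 values each
```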

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc
index 346de25..3b4882d 100644
--- a/cpp/src/arrow/parquet/reader.cc
+++ b/cpp/src/arrow/parquet/reader.cc
@@ -18,10 +18,14 @@
 #include "arrow/parquet/reader.h"
 
 #include <queue>
+#include <string>
+#include <vector>
 
+#include "arrow/column.h"
 #include "arrow/parquet/schema.h"
 #include "arrow/parquet/utils.h"
 #include "arrow/schema.h"
+#include "arrow/table.h"
 #include "arrow/types/primitive.h"
 #include "arrow/util/status.h"
 
@@ -40,6 +44,7 @@ class FileReader::Impl {
   bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr);
   Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
   Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+  Status ReadFlatTable(std::shared_ptr<Table>* out);
 
  private:
   MemoryPool* pool_;
@@ -103,6 +108,22 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
   return flat_column_reader->NextBatch(reader_->num_rows(), out);
 }
 
+Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+  const std::string& name = reader_->descr()->schema()->name();
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(FromParquetSchema(reader_->descr(), &schema));
+
+  std::vector<std::shared_ptr<Column>> columns(reader_->num_columns());
+  for (int i = 0; i < reader_->num_columns(); i++) {
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(ReadFlatColumn(i, &array));
+    columns[i] = std::make_shared<Column>(schema->field(i), array);
+  }
+
+  *table = std::make_shared<Table>(name, schema, columns);
+  return Status::OK();
+}
+
 FileReader::FileReader(
     MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
     : impl_(new FileReader::Impl(pool, std::move(reader))) {}
@@ -117,6 +138,10 @@ Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
   return impl_->ReadFlatColumn(i, out);
 }
 
+Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
+  return impl_->ReadFlatTable(out);
+}
+
 FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
     ::parquet::ParquetFileReader* reader, int column_index)
     : pool_(pool),

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h
index 41ca7eb..db7a157 100644
--- a/cpp/src/arrow/parquet/reader.h
+++ b/cpp/src/arrow/parquet/reader.h
@@ -29,6 +29,7 @@ class Array;
 class MemoryPool;
 class RowBatch;
 class Status;
+class Table;
 
 namespace parquet {
 
@@ -90,6 +91,8 @@ class FileReader {
   Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
   // Read column as a whole into an Array.
   Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+  // Read a table of flat columns into a Table.
+  Status ReadFlatTable(std::shared_ptr<Table>* out);
 
   virtual ~FileReader();
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/test-util.h b/cpp/src/arrow/parquet/test-util.h
new file mode 100644
index 0000000..1496082
--- /dev/null
+++ b/cpp/src/arrow/parquet/test-util.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "arrow/types/primitive.h"
+
+namespace arrow {
+
+namespace parquet {
+
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NonNullArray(
+    size_t size, typename ArrowType::c_type value) {
+  std::vector<typename ArrowType::c_type> values(size, value);
+  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls yet.
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NullableArray(
+    size_t size, typename ArrowType::c_type value, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values(size, value);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+std::shared_ptr<Column> MakeColumn(const std::string& name,
+    const std::shared_ptr<PrimitiveArray>& array, bool nullable) {
+  auto field = std::make_shared<Field>(name, array->type(), nullable);
+  return std::make_shared<Column>(field, array);
+}
+
+std::shared_ptr<Table> MakeSimpleTable(
+    const std::shared_ptr<PrimitiveArray>& values, bool nullable) {
+  std::shared_ptr<Column> column = MakeColumn("col", values, nullable);
+  std::vector<std::shared_ptr<Column>> columns({column});
+  std::vector<std::shared_ptr<Field>> fields({column->field()});
+  auto schema = std::make_shared<Schema>(fields);
+  return std::make_shared<Table>("table", schema, columns);
+}
+
+template <typename T>
+void ExpectArray(T* expected, Array* result) {
+  PrimitiveArray* p_array = static_cast<PrimitiveArray*>(result);
+  for (size_t i = 0; i < result->length(); i++) {
+    EXPECT_EQ(expected[i], reinterpret_cast<const T*>(p_array->data()->data())[i]);
+  }
+}
+
+}  // namespace parquet
+
+}  // namespace arrow
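
The NullableArray helper above nulls out every second slot (indices 0, 2, 4, ...), which is where its limit of roughly size/2 nulls comes from. As a minimal sketch with no Arrow dependency, the validity pattern on its own could look like the following. `MakeValidBytes` is a hypothetical name, and the bounds check is an addition for illustration that the commit itself does not perform.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical helper (not part of the commit): builds the same validity
// pattern as NullableArray, where slot i * 2 is null for each i < num_nulls.
std::vector<uint8_t> MakeValidBytes(size_t size, size_t num_nulls) {
  // The last index written is (num_nulls - 1) * 2; it must stay below size.
  // This guard is an assumption added here, the original has no such check.
  if (num_nulls > 0 && (num_nulls - 1) * 2 >= size) {
    throw std::invalid_argument("num_nulls too large for every-other-slot pattern");
  }
  std::vector<uint8_t> valid_bytes(size, 1);  // 1 = value present
  for (size_t i = 0; i < num_nulls; i++) {
    valid_bytes[i * 2] = 0;  // 0 = null
  }
  return valid_bytes;
}
```

Without a guard of this kind, a caller passing too many nulls would write past the end of the vector, so the test code has to keep num_nulls small relative to size.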

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/utils.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/utils.h b/cpp/src/arrow/parquet/utils.h
index b32792f..409bcd9 100644
--- a/cpp/src/arrow/parquet/utils.h
+++ b/cpp/src/arrow/parquet/utils.h
@@ -31,6 +31,11 @@ namespace parquet {
     (s);                        \
   } catch (const ::parquet::ParquetException& e) { return Status::Invalid(e.what()); }
 
+#define PARQUET_IGNORE_NOT_OK(s) \
+  try {                          \
+    (s);                         \
+  } catch (const ::parquet::ParquetException& e) {}
+
 }  // namespace parquet
 
 }  // namespace arrow
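
PARQUET_IGNORE_NOT_OK is meant for cleanup paths where a second failure should not mask the error already being returned. The sketch below shows the same pattern in isolation. It assumes `std::exception` as a stand-in for `::parquet::ParquetException`, and `IGNORE_EXCEPTION`, `FailingClose` and `CleanupSwallowsError` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <stdexcept>

// Stand-in for PARQUET_IGNORE_NOT_OK. std::exception replaces
// ::parquet::ParquetException so the sketch has no Parquet dependency.
#define IGNORE_EXCEPTION(s) \
  try {                     \
    (s);                    \
  } catch (const std::exception&) {}

// Hypothetical cleanup routine that always fails.
inline void FailingClose() { throw std::runtime_error("close failed"); }

// Returns true if control reaches the end, i.e. the exception was swallowed.
inline bool CleanupSwallowsError() {
  IGNORE_EXCEPTION(FailingClose());
  return true;
}
```

The trade-off is that the cleanup error is dropped silently, which is acceptable only because the caller is already reporting an earlier failure.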

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc
index 3ad2c5b..1223901 100644
--- a/cpp/src/arrow/parquet/writer.cc
+++ b/cpp/src/arrow/parquet/writer.cc
@@ -17,11 +17,21 @@
 
 #include "arrow/parquet/writer.h"
 
+#include <algorithm>
+#include <vector>
+
 #include "arrow/array.h"
+#include "arrow/column.h"
+#include "arrow/table.h"
+#include "arrow/types/construct.h"
 #include "arrow/types/primitive.h"
+#include "arrow/parquet/schema.h"
 #include "arrow/parquet/utils.h"
 #include "arrow/util/status.h"
 
+using parquet::ParquetFileWriter;
+using parquet::schema::GroupNode;
+
 namespace arrow {
 
 namespace parquet {
@@ -32,8 +42,9 @@ class FileWriter::Impl {
 
   Status NewRowGroup(int64_t chunk_size);
   template <typename ParquetType>
-  Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data);
-  Status WriteFlatColumnChunk(const PrimitiveArray* data);
+  Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data,
+      int64_t offset, int64_t length);
+  Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
   Status Close();
 
   virtual ~Impl() {}
@@ -60,31 +71,31 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
 }
 
 template <typename ParquetType>
-Status FileWriter::Impl::TypedWriteBatch(
-    ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data) {
+Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  // TODO: DCHECK((offset + length) <= data->length());
   auto data_ptr =
-      reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data());
+      reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data()) +
+      offset;
   auto writer =
       reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
   if (writer->descr()->max_definition_level() == 0) {
     // no nulls, just dump the data
-    PARQUET_CATCH_NOT_OK(writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr));
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_ptr));
   } else if (writer->descr()->max_definition_level() == 1) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(data->length() * sizeof(int16_t)));
+    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
     int16_t* def_levels_ptr =
         reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
     if (data->null_count() == 0) {
-      std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1);
-      PARQUET_CATCH_NOT_OK(
-          writer->WriteBatch(data->length(), def_levels_ptr, nullptr, data_ptr));
+      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+      PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, data_ptr));
     } else {
-      RETURN_NOT_OK(data_buffer_.Resize(
-          (data->length() - data->null_count()) * sizeof(typename ParquetType::c_type)));
+      RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(typename ParquetType::c_type)));
       auto buffer_ptr =
           reinterpret_cast<typename ParquetType::c_type*>(data_buffer_.mutable_data());
       int buffer_idx = 0;
-      for (size_t i = 0; i < data->length(); i++) {
-        if (data->IsNull(i)) {
+      for (size_t i = 0; i < length; i++) {
+        if (data->IsNull(offset + i)) {
           def_levels_ptr[i] = 0;
         } else {
           def_levels_ptr[i] = 1;
@@ -92,7 +103,7 @@ Status FileWriter::Impl::TypedWriteBatch(
         }
       }
       PARQUET_CATCH_NOT_OK(
-          writer->WriteBatch(data->length(), def_levels_ptr, nullptr, buffer_ptr));
+          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
     }
   } else {
     return Status::NotImplemented("no support for max definition level > 1 yet");
@@ -107,12 +118,13 @@ Status FileWriter::Impl::Close() {
   return Status::OK();
 }
 
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
-  case Type::ENUM:                                     \
-    return TypedWriteBatch<ParquetType>(writer, data); \
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                 \
+  case Type::ENUM:                                                     \
+    return TypedWriteBatch<ParquetType>(writer, data, offset, length); \
     break;
 
-Status FileWriter::Impl::WriteFlatColumnChunk(const PrimitiveArray* data) {
+Status FileWriter::Impl::WriteFlatColumnChunk(
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
   ::parquet::ColumnWriter* writer;
   PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
   switch (data->type_enum()) {
@@ -133,8 +145,11 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) {
   return impl_->NewRowGroup(chunk_size);
 }
 
-Status FileWriter::WriteFlatColumnChunk(const PrimitiveArray* data) {
-  return impl_->WriteFlatColumnChunk(data);
+Status FileWriter::WriteFlatColumnChunk(
+    const PrimitiveArray* data, int64_t offset, int64_t length) {
+  int64_t real_length = length;
+  if (length == -1) { real_length = data->length(); }
+  return impl_->WriteFlatColumnChunk(data, offset, real_length);
 }
 
 Status FileWriter::Close() {
@@ -143,6 +158,48 @@ Status FileWriter::Close() {
 
 FileWriter::~FileWriter() {}
 
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+    std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size) {
+  std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
+  RETURN_NOT_OK(ToParquetSchema(table->schema().get(), &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema());
+  std::unique_ptr<ParquetFileWriter> parquet_writer =
+      ParquetFileWriter::Open(sink, schema_node);
+  FileWriter writer(pool, std::move(parquet_writer));
+
+  // TODO: Support writing chunked arrays.
+  for (int i = 0; i < table->num_columns(); i++) {
+    if (table->column(i)->data()->num_chunks() != 1) {
+      return Status::NotImplemented("No support for writing chunked arrays yet.");
+    }
+  }
+
+  // Cast to PrimitiveArray instances as we work with them.
+  std::vector<std::shared_ptr<PrimitiveArray>> arrays(table->num_columns());
+  for (int i = 0; i < table->num_columns(); i++) {
+    // num_chunks == 1 as per above loop
+    std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
+    auto primitive_array = std::dynamic_pointer_cast<PrimitiveArray>(array);
+    if (!primitive_array) {
+      PARQUET_IGNORE_NOT_OK(writer.Close());
+      return Status::NotImplemented("Table must consist of PrimitiveArray instances");
+    }
+    arrays[i] = primitive_array;
+  }
+
+  for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {
+    int64_t offset = chunk * chunk_size;
+    int64_t size = std::min(chunk_size, table->num_rows() - offset);
+    RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
+    for (int i = 0; i < table->num_columns(); i++) {
+      RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(arrays[i].get(), offset, size),
+          PARQUET_IGNORE_NOT_OK(writer.Close()));
+    }
+  }
+
+  return writer.Close();
+}
+
 }  // namespace parquet
 
 }  // namespace arrow
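
For nullable columns, TypedWriteBatch in writer.cc emits one definition level per slot in the [offset, offset + length) window and passes only the non-null values to the column writer. The copy into the dense buffer sits between the hunks shown here, so the sketch below assumes that non-null values are packed in their original order. `LeveledValues` and `EncodeSlice` are hypothetical names, and `valid[i]` stands in for `!data->IsNull(i)`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct LeveledValues {
  std::vector<int16_t> def_levels;  // one entry per slot: 1 = present, 0 = null
  std::vector<int32_t> values;      // only the non-null values, densely packed
};

// Hypothetical standalone version of the null-handling branch for a slice
// of a flat, optional column (max definition level 1).
LeveledValues EncodeSlice(const std::vector<int32_t>& data,
    const std::vector<bool>& valid, size_t offset, size_t length) {
  // Mirrors the TODO in the commit: the slice must lie inside the array.
  assert(offset + length <= data.size());
  assert(valid.size() == data.size());
  LeveledValues out;
  out.def_levels.reserve(length);
  for (size_t i = 0; i < length; i++) {
    if (valid[offset + i]) {
      out.def_levels.push_back(1);
      out.values.push_back(data[offset + i]);
    } else {
      out.def_levels.push_back(0);
    }
  }
  return out;
}
```

Note that null checks are indexed with offset + i while the definition levels are indexed from 0, matching the `data->IsNull(offset + i)` change in the diff.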

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
index 38f7d0b..83e799f 100644
--- a/cpp/src/arrow/parquet/writer.h
+++ b/cpp/src/arrow/parquet/writer.h
@@ -29,6 +29,7 @@ class MemoryPool;
 class PrimitiveArray;
 class RowBatch;
 class Status;
+class Table;
 
 namespace parquet {
 
@@ -42,7 +43,8 @@ class FileWriter {
   FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
 
   Status NewRowGroup(int64_t chunk_size);
-  Status WriteFlatColumnChunk(const PrimitiveArray* data);
+  Status WriteFlatColumnChunk(
+      const PrimitiveArray* data, int64_t offset = 0, int64_t length = -1);
   Status Close();
 
   virtual ~FileWriter();
@@ -52,6 +54,14 @@ class FileWriter {
   std::unique_ptr<Impl> impl_;
 };
 
+/**
+ * Write a flat Table to Parquet.
+ *
+ * The table shall only consist of nullable, non-repeated columns of primitive type.
+ */
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+    std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size);
+
 }  // namespace parquet
 
 }  // namespace arrow
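
WriteFlatTable splits the table into row groups of at most chunk_size rows and hands each (offset, size) window to WriteFlatColumnChunk. The arithmetic can be sketched on its own as follows. `RowGroupRanges` is a hypothetical name, and the rejection of non-positive chunk sizes is an assumption added here; the loop in writer.cc has no such check.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical helper: returns the (offset, size) pair of every row group
// that a table with num_rows rows would be split into.
std::vector<std::pair<int64_t, int64_t>> RowGroupRanges(
    int64_t num_rows, int64_t chunk_size) {
  // Added guard: a chunk_size of 0 would never advance the offset.
  if (chunk_size <= 0) { throw std::invalid_argument("chunk_size must be positive"); }
  std::vector<std::pair<int64_t, int64_t>> ranges;
  for (int64_t offset = 0; offset < num_rows; offset += chunk_size) {
    // The last group may be shorter than chunk_size.
    ranges.emplace_back(offset, std::min(chunk_size, num_rows - offset));
  }
  return ranges;
}
```

For example, 10 rows with a chunk size of 4 yield three row groups of 4, 4 and 2 rows, and an empty table yields no row groups at all.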

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/util/status.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/status.h b/cpp/src/arrow/util/status.h
index 6ddc177..d1a7425 100644
--- a/cpp/src/arrow/util/status.h
+++ b/cpp/src/arrow/util/status.h
@@ -63,6 +63,15 @@ namespace arrow {
     if (!_s.ok()) { return _s; } \
   } while (0);
 
+#define RETURN_NOT_OK_ELSE(s, else_) \
+  do {                               \
+    Status _s = (s);                 \
+    if (!_s.ok()) {                  \
+      else_;                         \
+      return _s;                     \
+    }                                \
+  } while (0);
+
 enum class StatusCode : char {
   OK = 0,
   OutOfMemory = 1,
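
RETURN_NOT_OK_ELSE runs a cleanup expression before propagating a failed Status, which is how WriteFlatTable closes the writer on error. A minimal sketch of the control flow, assuming a stripped-down `Status` class in place of `arrow::Status`; `WriteWithCleanup` and `CleanupCallsAfter` are hypothetical names. Unlike the macro in the commit, this version omits the semicolon after `while (0)` so that the call site supplies it.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Minimal stand-in for arrow::Status, just enough for the macro.
class Status {
 public:
  static Status OK() { return Status(true, ""); }
  static Status Invalid(std::string msg) { return Status(false, std::move(msg)); }
  bool ok() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  Status(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
  bool ok_;
  std::string msg_;
};

#define RETURN_NOT_OK_ELSE(s, else_) \
  do {                               \
    Status _s = (s);                 \
    if (!_s.ok()) {                  \
      else_;                         \
      return _s;                     \
    }                                \
  } while (0)

// Hypothetical operation: fails on request and counts cleanup invocations.
inline Status WriteWithCleanup(bool fail, int* cleanup_calls) {
  RETURN_NOT_OK_ELSE(
      fail ? Status::Invalid("write failed") : Status::OK(), ++(*cleanup_calls));
  return Status::OK();
}

// Reports how often the cleanup expression ran for one call.
inline int CleanupCallsAfter(bool fail) {
  int calls = 0;
  WriteWithCleanup(fail, &calls);
  return calls;
}
```

The cleanup expression is evaluated only on the failure path, and the original error (not any error from the cleanup) is what the caller sees.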

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 2173232..f1becfc 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -339,11 +339,17 @@ if (PYARROW_BUILD_TESTS)
     STATIC_LIB ${GTEST_STATIC_LIB})
 endif()
 
+## Parquet
+find_package(Parquet REQUIRED)
+include_directories(SYSTEM ${PARQUET_INCLUDE_DIR})
+
 ## Arrow
 find_package(Arrow REQUIRED)
 include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
 ADD_THIRDPARTY_LIB(arrow
   SHARED_LIB ${ARROW_SHARED_LIB})
+ADD_THIRDPARTY_LIB(arrow_parquet
+    SHARED_LIB ${ARROW_PARQUET_SHARED_LIB})
 
 ############################################################
 # Linker setup
@@ -422,6 +428,7 @@ set(PYARROW_SRCS
 
 set(LINK_LIBS
   arrow
+  arrow_parquet
 )
 
 SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
@@ -442,6 +449,7 @@ set(CYTHON_EXTENSIONS
   array
   config
   error
+  parquet
   scalar
   schema
   table

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/cmake_modules/FindArrow.cmake
----------------------------------------------------------------------
diff --git a/python/cmake_modules/FindArrow.cmake b/python/cmake_modules/FindArrow.cmake
index 3d99838..f0b258e 100644
--- a/python/cmake_modules/FindArrow.cmake
+++ b/python/cmake_modules/FindArrow.cmake
@@ -42,19 +42,27 @@ find_library(ARROW_LIB_PATH NAMES arrow
   ${ARROW_SEARCH_LIB_PATH}
   NO_DEFAULT_PATH)
 
-if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
+find_library(ARROW_PARQUET_LIB_PATH NAMES arrow_parquet
+  PATHS
+  ${ARROW_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+
+if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH AND ARROW_PARQUET_LIB_PATH)
   set(ARROW_FOUND TRUE)
   set(ARROW_LIB_NAME libarrow)
+  set(ARROW_PARQUET_LIB_NAME libarrow_parquet)
   set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
   set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
   set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+  set(ARROW_PARQUET_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_PARQUET_LIB_NAME}.a)
+  set(ARROW_PARQUET_SHARED_LIB ${ARROW_LIBS}/${ARROW_PARQUET_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
 else ()
   set(ARROW_FOUND FALSE)
 endif ()
 
 if (ARROW_FOUND)
   if (NOT Arrow_FIND_QUIETLY)
-    message(STATUS "Found the Arrow library: ${ARROW_LIB_PATH}")
+    message(STATUS "Found the Arrow library: ${ARROW_LIB_PATH}, ${ARROW_PARQUET_LIB_PATH}")
   endif ()
 else ()
   if (NOT Arrow_FIND_QUIETLY)
@@ -74,4 +82,6 @@ mark_as_advanced(
   ARROW_LIBS
   ARROW_STATIC_LIB
   ARROW_SHARED_LIB
+  ARROW_PARQUET_STATIC_LIB
+  ARROW_PARQUET_SHARED_LIB
 )

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/conda.recipe/build.sh
----------------------------------------------------------------------
diff --git a/python/conda.recipe/build.sh b/python/conda.recipe/build.sh
index a9d9aed..a164c1a 100644
--- a/python/conda.recipe/build.sh
+++ b/python/conda.recipe/build.sh
@@ -6,6 +6,19 @@ export ARROW_HOME=$PREFIX
 
 cd $RECIPE_DIR
 
+if [ "$(uname)" == "Darwin" ]; then
+  # C++11 finagling for Mac OSX
+  export CC=clang
+  export CXX=clang++
+  export MACOSX_VERSION_MIN="10.7"
+  CXXFLAGS="${CXXFLAGS} -mmacosx-version-min=${MACOSX_VERSION_MIN}"
+  CXXFLAGS="${CXXFLAGS} -stdlib=libc++ -std=c++11"
+  export LDFLAGS="${LDFLAGS} -mmacosx-version-min=${MACOSX_VERSION_MIN}"
+  export LDFLAGS="${LDFLAGS} -stdlib=libc++ -std=c++11"
+  export LINKFLAGS="${LDFLAGS}"
+  export MACOSX_DEPLOYMENT_TARGET=10.7
+fi
+
 echo Setting the compiler...
 if [ `uname` == Linux ]; then
   EXTRA_CMAKE_ARGS=-DCMAKE_SHARED_LINKER_FLAGS=-static-libstdc++

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index a80b3ce..619e5ef 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -68,6 +68,9 @@ cdef class Array:
         values = array_format(self, window=10)
         return '{0}\n{1}'.format(type_format, values)
 
+    def equals(Array self, Array other):
+        return self.ap.Equals(other.sp_array)
+
     def __len__(self):
         if self.sp_array.get():
             return self.sp_array.get().length()

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/error.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd
index d226abe..97ba0ef 100644
--- a/python/pyarrow/error.pxd
+++ b/python/pyarrow/error.pxd
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from pyarrow.includes.libarrow cimport CStatus
 from pyarrow.includes.pyarrow cimport *
 
+cdef check_cstatus(const CStatus& status)
 cdef check_status(const Status& status)

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/error.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx
index 3f8d7dd..5a6a038 100644
--- a/python/pyarrow/error.pyx
+++ b/python/pyarrow/error.pyx
@@ -15,12 +15,20 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from pyarrow.includes.libarrow cimport CStatus
 from pyarrow.includes.common cimport c_string
 from pyarrow.compat import frombytes
 
 class ArrowException(Exception):
     pass
 
+cdef check_cstatus(const CStatus& status):
+    if status.ok():
+        return
+
+    cdef c_string c_message = status.ToString()
+    raise ArrowException(frombytes(c_message))
+
 cdef check_status(const Status& status):
     if status.ok():
         return

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/includes/common.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index e86d5d7..1f6ecee 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -19,6 +19,7 @@
 
 from libc.stdint cimport *
 from libcpp cimport bool as c_bool
+from libcpp.memory cimport shared_ptr, unique_ptr
 from libcpp.string cimport string as c_string
 from libcpp.vector cimport vector
 
@@ -32,11 +33,3 @@ cdef extern from "<iostream>":
 cdef extern from "<Python.h>":
     void Py_XDECREF(PyObject* o)
 
-cdef extern from "<memory>" namespace "std" nogil:
-
-    cdef cppclass shared_ptr[T]:
-        shared_ptr()
-        shared_ptr(T*)
-        T* get()
-        void reset()
-        void reset(T* p)

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index b2ef45a..90414e3 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -72,6 +72,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
     cdef cppclass MemoryPool" arrow::MemoryPool":
         int64_t bytes_allocated()
 
+    cdef MemoryPool* default_memory_pool()
+
     cdef cppclass CListType" arrow::ListType"(CDataType):
         CListType(const shared_ptr[CDataType]& value_type)
 
@@ -103,6 +105,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         int32_t null_count()
         Type type_enum()
 
+        c_bool Equals(const shared_ptr[CArray]& arr)
         c_bool IsNull(int i)
 
     cdef cppclass CBooleanArray" arrow::BooleanArray"(CArray):

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index ffdc5d4..0918344 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -18,6 +18,26 @@
 # distutils: language = c++
 
 from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
+
+
+cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
+  cdef cppclass Node:
+    pass
+
+  cdef cppclass GroupNode(Node):
+    pass
+
+  cdef cppclass PrimitiveNode(Node):
+    pass
+
+cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
+  cdef cppclass SchemaDescriptor:
+    shared_ptr[Node] schema()
+    GroupNode* group()
+
+  cdef cppclass ColumnDescriptor:
+    pass
 
 cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
     cdef cppclass ColumnReader:
@@ -48,4 +68,30 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
         pass
 
     cdef cppclass ParquetFileReader:
+        # TODO: Some default arguments are missing
+        @staticmethod
+        unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
+
+cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
+    cdef cppclass OutputStream:
         pass
+
+    cdef cppclass LocalFileOutputStream(OutputStream):
+        LocalFileOutputStream(const c_string& path)
+        void Close()
+
+
+cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil:
+    cdef cppclass FileReader:
+        FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
+        CStatus ReadFlatTable(shared_ptr[CTable]* out);
+
+
+cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil:
+    CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, shared_ptr[CSchema]* out)
+    CStatus ToParquetSchema(const CSchema* arrow_schema, shared_ptr[SchemaDescriptor]* out)
+
+
+cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil:
+    cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, shared_ptr[OutputStream] sink, int64_t chunk_size)
+

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 622e7d0..3d5355e 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -19,5 +19,53 @@
 # distutils: language = c++
 # cython: embedsignature = True
 
-from pyarrow.compat import frombytes, tobytes
+from pyarrow.includes.libarrow cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
 from pyarrow.includes.parquet cimport *
+
+from pyarrow.compat import tobytes
+from pyarrow.error cimport check_cstatus
+from pyarrow.table cimport Table
+
+def read_table(filename, columns=None):
+    """
+    Read a Table from Parquet format
+    Returns
+    -------
+    table: pyarrow.Table
+    """
+    cdef unique_ptr[FileReader] reader
+    cdef Table table = Table()
+    cdef shared_ptr[CTable] ctable
+
+    # Must be in one expression to avoid calling std::move which is not possible
+    # in Cython (due to missing rvalue support)
+    reader = unique_ptr[FileReader](new FileReader(default_memory_pool(),
+        ParquetFileReader.OpenFile(tobytes(filename))))
+    check_cstatus(reader.get().ReadFlatTable(&ctable))
+    table.init(ctable)
+    return table
+
+def write_table(table, filename, chunk_size=None):
+    """
+    Write a Table to Parquet format
+
+    Parameters
+    ----------
+    table : pyarrow.Table
+    filename : string
+    chunk_size : int
+        The maximum number of rows in each Parquet RowGroup
+    """
+    cdef Table table_ = table
+    cdef CTable* ctable_ = table_.table
+    cdef shared_ptr[OutputStream] sink
+    cdef int64_t chunk_size_ = 0
+    if chunk_size is None:
+        chunk_size_ = min(ctable_.num_rows(), int(2**16))
+    else:
+        chunk_size_ = chunk_size
+
+    sink.reset(new LocalFileOutputStream(tobytes(filename)))
+    check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, chunk_size_))
+

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/schema.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx
index 22ddf0c..084c304 100644
--- a/python/pyarrow/schema.pyx
+++ b/python/pyarrow/schema.pyx
@@ -201,7 +201,9 @@ def string():
 
 def list_(DataType value_type):
     cdef DataType out = DataType()
-    out.init(shared_ptr[CDataType](new CListType(value_type.sp_type)))
+    cdef shared_ptr[CDataType] list_type
+    list_type.reset(new CListType(value_type.sp_type))
+    out.init(list_type)
     return out
 
 def struct(fields):
@@ -212,12 +214,13 @@ def struct(fields):
         DataType out = DataType()
         Field field
         vector[shared_ptr[CField]] c_fields
+        cdef shared_ptr[CDataType] struct_type
 
     for field in fields:
         c_fields.push_back(field.sp_field)
 
-    out.init(shared_ptr[CDataType](
-        new CStructType(c_fields)))
+    struct_type.reset(new CStructType(c_fields))
+    out.init(struct_type)
     return out
 
 def schema(fields):

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
new file mode 100644
index 0000000..d92cf4c
--- /dev/null
+++ b/python/pyarrow/tests/test_parquet.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.compat import unittest
+import pyarrow as arrow
+import pyarrow.parquet
+
+A = arrow
+
+import numpy as np
+import os.path
+import pandas as pd
+
+import pandas.util.testing as pdt
+
+
+def test_single_pylist_column_roundtrip(tmpdir):
+    for dtype in [int, float]:
+        filename = tmpdir.join('single_{}_column.parquet'.format(dtype.__name__))
+        data = [A.from_pylist(list(map(dtype, range(5))))]
+        table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
+        A.parquet.write_table(table, filename.strpath)
+        table_read = pyarrow.parquet.read_table(filename.strpath)
+        for col_written, col_read in zip(table.itercolumns(), table_read.itercolumns()):
+            assert col_written.name == col_read.name
+            assert col_read.data.num_chunks == 1
+            data_written = col_written.data.chunk(0)
+            data_read = col_read.data.chunk(0)
+            assert data_written.equals(data_read)
+
+def test_pandas_roundtrip(tmpdir):
+    size = 10000
+    df = pd.DataFrame({
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64)
+    })
+    filename = tmpdir.join('pandas_roundtrip.parquet')
+    arrow_table = A.from_pandas_dataframe(df)
+    A.parquet.write_table(arrow_table, filename.strpath)
+    table_read = pyarrow.parquet.read_table(filename.strpath)
+    df_read = table_read.to_pandas()
+    pdt.assert_frame_equal(df, df_read)
+

http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 5f228ed..7edeb91 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -214,7 +214,7 @@ class build_ext(_build_ext):
             return name + suffix
 
     def get_cmake_cython_names(self):
-        return ['array', 'config', 'error', 'scalar', 'schema', 'table']
+        return ['array', 'config', 'error', 'parquet', 'scalar', 'schema', 'table']
 
     def get_names(self):
         return self._found_names
@@ -242,7 +242,7 @@ setup(
         'clean': clean,
         'build_ext': build_ext
     },
-    install_requires=['cython >= 0.21', 'numpy >= 1.9'],
+    install_requires=['cython >= 0.23', 'numpy >= 1.9'],
     description=DESC,
     license='Apache License, Version 2.0',
     maintainer="Apache Arrow Developers",

