arrow-commits mailing list archives

From u..@apache.org
Subject arrow git commit: ARROW-594: [C++/Python] Write arrow::Table to stream and file writers
Date Sun, 10 Sep 2017 06:30:53 GMT
Repository: arrow
Updated Branches:
  refs/heads/master b5d510f72 -> 947ca871c


ARROW-594: [C++/Python] Write arrow::Table to stream and file writers

* Add TableBatchReader class which creates a sequence of record batches from a table with
chunked columns
* Write Table to stream/file using the TableBatchReader
* Add Python bindings

Also resolves ARROW-989
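
As a usage illustration (not part of the patch): with this change, writing a
chunked table to the streaming format reduces to a few calls. A minimal C++
sketch, assuming an arrow::io::OutputStream* sink and the RETURN_NOT_OK error
macro from arrow/status.h:

    #include "arrow/ipc/writer.h"
    #include "arrow/status.h"
    #include "arrow/table.h"

    // Write a (possibly chunked) Table as a stream of record batches.
    arrow::Status WriteTableToStream(const arrow::Table& table,
                                     arrow::io::OutputStream* sink) {
      std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
      RETURN_NOT_OK(
          arrow::ipc::RecordBatchStreamWriter::Open(sink, table.schema(), &writer));
      RETURN_NOT_OK(writer->WriteTable(table));  // new in this patch
      return writer->Close();
    }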

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #1074 from wesm/ARROW-594 and squashes the following commits:

8126372 [Wes McKinney] Add write_table method to record batch writer classes
ad30059 [Wes McKinney] Add RecordBatchWriter::WriteTable method
00a317b [Wes McKinney] Fix go example
d643877 [Wes McKinney] Add Table::IsChunked method
84a9dcb [Wes McKinney] Add unit tests for TableBatchIterator
1748b39 [Wes McKinney] Draft TableBatchIterator implementation. Move RecordBatchReader interface
to arrow/table.h


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/947ca871
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/947ca871
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/947ca871

Branch: refs/heads/master
Commit: 947ca871c60fbd8680b8d477be2a5bee54fa8e0e
Parents: b5d510f
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Sun Sep 10 08:30:46 2017 +0200
Committer: Uwe L. Korn <uwelk@xhochy.com>
Committed: Sun Sep 10 08:30:46 2017 +0200

----------------------------------------------------------------------
 c_glib/arrow-glib/reader.cpp             |   2 +-
 cpp/src/arrow/ipc/ipc-read-write-test.cc |  42 ++++++---
 cpp/src/arrow/ipc/reader.cc              |   8 +-
 cpp/src/arrow/ipc/reader.h               |  19 +---
 cpp/src/arrow/ipc/stream-to-file.cc      |   2 +-
 cpp/src/arrow/ipc/writer.cc              |  15 +++
 cpp/src/arrow/ipc/writer.h               |  21 +++--
 cpp/src/arrow/pretty_print.cc            |   4 +-
 cpp/src/arrow/python/arrow_to_pandas.cc  |   2 +-
 cpp/src/arrow/python/arrow_to_python.cc  |   4 +-
 cpp/src/arrow/table-test.cc              |  77 ++++++++++++++++
 cpp/src/arrow/table.cc                   | 128 ++++++++++++++++++++++++--
 cpp/src/arrow/table.h                    |  41 +++++++++
 python/pyarrow/includes/libarrow.pxd     |   6 +-
 python/pyarrow/ipc.pxi                   |  32 ++++++-
 python/pyarrow/tests/test_ipc.py         |  21 ++++-
 16 files changed, 356 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/c_glib/arrow-glib/reader.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/reader.cpp b/c_glib/arrow-glib/reader.cpp
index ba835fd..1941f64 100644
--- a/c_glib/arrow-glib/reader.cpp
+++ b/c_glib/arrow-glib/reader.cpp
@@ -193,7 +193,7 @@ garrow_record_batch_reader_read_next_record_batch(GArrowRecordBatchReader *reade
 {
   auto arrow_reader = garrow_record_batch_reader_get_raw(reader);
   std::shared_ptr<arrow::RecordBatch> arrow_record_batch;
-  auto status = arrow_reader->ReadNextRecordBatch(&arrow_record_batch);
+  auto status = arrow_reader->ReadNext(&arrow_record_batch);
 
   if (garrow_error_check(error,
                          status,

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index ac4d36e..ad3af0f 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -544,8 +544,8 @@ TEST_P(TestFileFormat, RoundTrip) {
   ASSERT_OK((*GetParam())(&batch1));  // NOLINT clang-tidy gtest issue
   ASSERT_OK((*GetParam())(&batch2));  // NOLINT clang-tidy gtest issue
 
-  std::vector<std::shared_ptr<RecordBatch>> in_batches = {batch1, batch2};
-  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+  BatchVector in_batches = {batch1, batch2};
+  BatchVector out_batches;
 
   ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
 
@@ -564,14 +564,14 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
   }
   void TearDown() {}
 
-  Status RoundTripHelper(const RecordBatch& batch,
-                         std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
+  Status RoundTripHelper(const BatchVector& batches, BatchVector* out_batches) {
     // Write the file
     std::shared_ptr<RecordBatchWriter> writer;
-    RETURN_NOT_OK(RecordBatchStreamWriter::Open(sink_.get(), batch.schema(), &writer));
-    int num_batches = 5;
-    for (int i = 0; i < num_batches; ++i) {
-      RETURN_NOT_OK(writer->WriteRecordBatch(batch));
+    RETURN_NOT_OK(
+        RecordBatchStreamWriter::Open(sink_.get(), batches[0]->schema(), &writer));
+
+    for (const auto& batch : batches) {
+      RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
     }
     RETURN_NOT_OK(writer->Close());
     RETURN_NOT_OK(sink_->Close());
@@ -584,7 +584,7 @@ class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
 
     std::shared_ptr<RecordBatch> chunk;
     while (true) {
-      RETURN_NOT_OK(reader->ReadNextRecordBatch(&chunk));
+      RETURN_NOT_OK(reader->ReadNext(&chunk));
       if (chunk == nullptr) {
         break;
       }
@@ -604,9 +604,9 @@ TEST_P(TestStreamFormat, RoundTrip) {
   std::shared_ptr<RecordBatch> batch;
   ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
 
-  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+  BatchVector out_batches;
 
-  ASSERT_OK(RoundTripHelper(*batch, &out_batches));
+  ASSERT_OK(RoundTripHelper({batch, batch, batch}, &out_batches));
 
   // Compare batches. Same
   for (size_t i = 0; i < out_batches.size(); ++i) {
@@ -666,17 +666,31 @@ TEST_F(TestStreamFormat, DictionaryRoundTrip) {
   std::shared_ptr<RecordBatch> batch;
   ASSERT_OK(MakeDictionary(&batch));
 
-  std::vector<std::shared_ptr<RecordBatch>> out_batches;
-  ASSERT_OK(RoundTripHelper(*batch, &out_batches));
+  BatchVector out_batches;
+  ASSERT_OK(RoundTripHelper({batch}, &out_batches));
 
   CheckBatchDictionaries(*out_batches[0]);
 }
 
+TEST_F(TestStreamFormat, WriteTable) {
+  std::shared_ptr<RecordBatch> b1, b2, b3;
+  ASSERT_OK(MakeIntRecordBatch(&b1));
+  ASSERT_OK(MakeIntRecordBatch(&b2));
+  ASSERT_OK(MakeIntRecordBatch(&b3));
+
+  BatchVector out_batches;
+  ASSERT_OK(RoundTripHelper({b1, b2, b3}, &out_batches));
+
+  ASSERT_TRUE(b1->Equals(*out_batches[0]));
+  ASSERT_TRUE(b2->Equals(*out_batches[1]));
+  ASSERT_TRUE(b3->Equals(*out_batches[2]));
+}
+
 TEST_F(TestFileFormat, DictionaryRoundTrip) {
   std::shared_ptr<RecordBatch> batch;
   ASSERT_OK(MakeDictionary(&batch));
 
-  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+  BatchVector out_batches;
   ASSERT_OK(RoundTripHelper({batch}, &out_batches));
 
   CheckBatchDictionaries(*out_batches[0]);
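
(Note: BatchVector is not introduced by any hunk shown here; it is presumably a
pre-existing alias in the IPC test helpers, along these lines:)

    using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;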

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 919dd7a..2a0633f 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -389,8 +389,6 @@ static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
   return FileBlock{block->offset(), block->metaDataLength(), block->bodyLength()};
 }
 
-RecordBatchReader::~RecordBatchReader() {}
-
 class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
  public:
   RecordBatchStreamReaderImpl() {}
@@ -432,7 +430,7 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
     return GetSchema(message->header(), dictionary_memo_, &schema_);
   }
 
-  Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) {
     std::unique_ptr<Message> message;
     RETURN_NOT_OK(ReadMessageAndValidate(message_reader_.get(), Message::RECORD_BATCH,
                                          true, &message));
@@ -504,8 +502,8 @@ std::shared_ptr<Schema> RecordBatchStreamReader::schema() const {
   return impl_->schema();
 }
 
-Status RecordBatchStreamReader::ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
-  return impl_->ReadNextRecordBatch(batch);
+Status RecordBatchStreamReader::ReadNext(std::shared_ptr<RecordBatch>* batch) {
+  return impl_->ReadNext(batch);
 }
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 52b04ab..54174f9 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -24,6 +24,7 @@
 #include <memory>
 
 #include "arrow/ipc/message.h"
+#include "arrow/table.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -43,21 +44,7 @@ class RandomAccessFile;
 
 namespace ipc {
 
-/// \brief Abstract interface for reading stream of record batches
-class ARROW_EXPORT RecordBatchReader {
- public:
-  virtual ~RecordBatchReader();
-
-  /// \return the shared schema of the record batches in the stream
-  virtual std::shared_ptr<Schema> schema() const = 0;
-
-  /// Read the next record batch in the stream. Return nullptr for batch when
-  /// reaching end of stream
-  ///
-  /// \param(out) batch the next loaded batch, nullptr at end of stream
-  /// \return Status
-  virtual Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0;
-};
+using RecordBatchReader = ::arrow::RecordBatchReader;
 
 /// \class RecordBatchStreamReader
 /// \brief Synchronous batch stream reader that reads from io::InputStream
@@ -96,7 +83,7 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
                      std::shared_ptr<RecordBatchReader>* out);
 
   std::shared_ptr<Schema> schema() const override;
-  Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override;
 
  private:
   RecordBatchStreamReader();

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/ipc/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc
index 4f8cd80..859969b 100644
--- a/cpp/src/arrow/ipc/stream-to-file.cc
+++ b/cpp/src/arrow/ipc/stream-to-file.cc
@@ -45,7 +45,7 @@ Status ConvertToFile() {
 
   std::shared_ptr<RecordBatch> batch;
   while (true) {
-    RETURN_NOT_OK(reader->ReadNextRecordBatch(&batch));
+    RETURN_NOT_OK(reader->ReadNext(&batch));
     if (batch == nullptr) break;
     RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 3f3d168..9f557f6 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -655,6 +655,21 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size) {
 
 RecordBatchWriter::~RecordBatchWriter() {}
 
+Status RecordBatchWriter::WriteTable(const Table& table) {
+  TableBatchReader reader(table);
+
+  std::shared_ptr<RecordBatch> batch;
+  while (true) {
+    RETURN_NOT_OK(reader.ReadNext(&batch));
+    if (batch == nullptr) {
+      break;
+    }
+    RETURN_NOT_OK(WriteRecordBatch(*batch, true));
+  }
+
+  return Status::OK();
+}
+
 // ----------------------------------------------------------------------
 // Stream writer implementation
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 3f110fe..bff7bb0 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -37,6 +37,7 @@ class MemoryPool;
 class RecordBatch;
 class Schema;
 class Status;
+class Table;
 class Tensor;
 
 namespace io {
@@ -53,15 +54,20 @@ class ARROW_EXPORT RecordBatchWriter {
  public:
   virtual ~RecordBatchWriter();
 
-  /// Write a record batch to the stream
+  /// \brief Write a record batch to the stream
   ///
   /// \param allow_64bit boolean permitting field lengths exceeding INT32_MAX
-  /// \return Status indicate success or failure
+  /// \return Status
  virtual Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) = 0;
 
-  /// Perform any logic necessary to finish the stream
+  /// \brief Write a possibly-chunked table by creating a sequence of record batches
+  /// \param[in] table the table to write
+  /// \return Status
+  Status WriteTable(const Table& table);
+
+  /// \brief Perform any logic necessary to finish the stream
   ///
-  /// \return Status indicate success or failure
+  /// \return Status
   virtual Status Close() = 0;
 
   /// In some cases, writing may require memory allocation. We use the default
@@ -84,7 +90,7 @@ class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter {
   /// \param(in) sink output stream to write to
   /// \param(in) schema the schema of the record batches to be written
   /// \param(out) out the created stream writer
-  /// \return Status indicating success or failure
+  /// \return Status
   static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
                      std::shared_ptr<RecordBatchWriter>* out);
 
@@ -118,7 +124,7 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
   /// \param(in) sink output stream to write to
   /// \param(in) schema the schema of the record batches to be written
   /// \param(out) out the created stream writer
-  /// \return Status indicating success or failure
+  /// \return Status
   static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
                      std::shared_ptr<RecordBatchWriter>* out);
 
@@ -195,7 +201,7 @@ Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
 /// \param[in] schema the schema to write
 /// \param[in] pool a MemoryPool to allocate memory from
 /// \param[out] out the serialized schema
-/// \return Status indicating success or failure
+/// \return Status
 ARROW_EXPORT
 Status SerializeSchema(const Schema& schema, MemoryPool* pool,
                        std::shared_ptr<Buffer>* out);
@@ -203,6 +209,7 @@ Status SerializeSchema(const Schema& schema, MemoryPool* pool,
 /// \brief Write multiple record batches to OutputStream
 /// \param[in] batches a vector of batches. Must all have same schema
 /// \param[out] dst an OutputStream
+/// \return Status
 ARROW_EXPORT
 Status WriteRecordBatchStream(const std::vector<std::shared_ptr<RecordBatch>>& batches,
                               io::OutputStream* dst);

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/pretty_print.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index 3f64d75..cc1acf4 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -45,9 +45,7 @@ class PrettyPrinter {
   void OpenArray();
   void CloseArray();
 
-  void Flush() {
-    (*sink_) << std::flush;
-  }
+  void Flush() { (*sink_) << std::flush; }
 
  protected:
   int indent_;

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/python/arrow_to_pandas.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_pandas.cc b/cpp/src/arrow/python/arrow_to_pandas.cc
index 769f609..117bf23 100644
--- a/cpp/src/arrow/python/arrow_to_pandas.cc
+++ b/cpp/src/arrow/python/arrow_to_pandas.cc
@@ -1192,7 +1192,7 @@ class DataFrameBlockCreator {
   Status CreateBlocks() {
     for (int i = 0; i < table_->num_columns(); ++i) {
       std::shared_ptr<Column> col = table_->column(i);
-      PandasBlock::type output_type;
+      PandasBlock::type output_type = PandasBlock::OBJECT;
       RETURN_NOT_OK(GetPandasBlockType(*col, options_, &output_type));
 
       int block_placement = 0;

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/python/arrow_to_python.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc
index bc12ba7..a281fe3 100644
--- a/cpp/src/arrow/python/arrow_to_python.cc
+++ b/cpp/src/arrow/python/arrow_to_python.cc
@@ -226,9 +226,9 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out)
   RETURN_NOT_OK(
       src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
 
-  std::shared_ptr<ipc::RecordBatchReader> reader;
+  std::shared_ptr<RecordBatchReader> reader;
   RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
-  RETURN_NOT_OK(reader->ReadNextRecordBatch(&out->batch));
+  RETURN_NOT_OK(reader->ReadNext(&out->batch));
 
   RETURN_NOT_OK(src->Tell(&offset));
   offset += 4;  // Skip the end-of-stream message

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/table-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index 4b67492..a9c7e6d 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -33,6 +33,11 @@ using std::vector;
 
 namespace arrow {
 
+std::shared_ptr<Column> column(const std::shared_ptr<Field>& field,
+                               const std::vector<std::shared_ptr<Array>>& arrays) {
+  return std::make_shared<Column>(field, arrays);
+}
+
 class TestChunkedArray : public TestBase {
  protected:
   virtual void Construct() {
@@ -437,6 +442,29 @@ TEST_F(TestTable, AddColumn) {
   ASSERT_TRUE(result->Equals(Table(ex_schema, ex_columns)));
 }
 
+TEST_F(TestTable, IsChunked) {
+  ArrayVector c1, c2;
+
+  auto a1 = MakePrimitive<Int32Array>(10);
+  auto a2 = MakePrimitive<Int32Array>(20);
+
+  auto sch1 = arrow::schema({field("f1", int32()), field("f2", int32())});
+
+  std::vector<std::shared_ptr<Column>> columns;
+
+  std::shared_ptr<RecordBatch> batch;
+
+  columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a1})};
+  auto t1 = std::make_shared<Table>(sch1, columns);
+
+  ASSERT_FALSE(t1->IsChunked());
+
+  columns = {column(sch1->field(0), {a2}), column(sch1->field(1), {a1, a1})};
+  auto t2 = std::make_shared<Table>(sch1, columns);
+
+  ASSERT_TRUE(t2->IsChunked());
+}
+
 class TestRecordBatch : public TestBase {};
 
 TEST_F(TestRecordBatch, Equals) {
@@ -522,4 +550,53 @@ TEST_F(TestRecordBatch, Slice) {
   }
 }
 
+class TestTableBatchReader : public TestBase {};
+
+TEST_F(TestTableBatchReader, ReadNext) {
+  ArrayVector c1, c2;
+
+  auto a1 = MakePrimitive<Int32Array>(10);
+  auto a2 = MakePrimitive<Int32Array>(20);
+  auto a3 = MakePrimitive<Int32Array>(30);
+  auto a4 = MakePrimitive<Int32Array>(10);
+
+  auto sch1 = arrow::schema({field("f1", int32()), field("f2", int32())});
+
+  std::vector<std::shared_ptr<Column>> columns;
+
+  std::shared_ptr<RecordBatch> batch;
+
+  columns = {column(sch1->field(0), {a1, a4, a2}), column(sch1->field(1), {a2, a2})};
+  Table t1(sch1, columns);
+
+  TableBatchReader i1(t1);
+
+  ASSERT_OK(i1.ReadNext(&batch));
+  ASSERT_EQ(10, batch->num_rows());
+
+  ASSERT_OK(i1.ReadNext(&batch));
+  ASSERT_EQ(10, batch->num_rows());
+
+  ASSERT_OK(i1.ReadNext(&batch));
+  ASSERT_EQ(20, batch->num_rows());
+
+  ASSERT_OK(i1.ReadNext(&batch));
+  ASSERT_EQ(nullptr, batch);
+
+  columns = {column(sch1->field(0), {a1}), column(sch1->field(1), {a4})};
+  Table t2(sch1, columns);
+
+  TableBatchReader i2(t2);
+
+  ASSERT_OK(i2.ReadNext(&batch));
+  ASSERT_EQ(10, batch->num_rows());
+
+  // Ensure non-sliced
+  ASSERT_EQ(a1->data().get(), batch->column_data(0).get());
+  ASSERT_EQ(a4->data().get(), batch->column_data(1).get());
+
+  ASSERT_OK(i2.ReadNext(&batch));
+  ASSERT_EQ(nullptr, batch);
+}
+
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 8c7c4e2..4a08dc5 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -30,6 +30,8 @@
 
 namespace arrow {
 
+using internal::ArrayData;
+
 // ----------------------------------------------------------------------
 // ChunkedArray and Column methods
 
@@ -184,13 +186,13 @@ RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows
 }
 
 RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         std::vector<std::shared_ptr<internal::ArrayData>>&& columns)
+                         std::vector<std::shared_ptr<ArrayData>>&& columns)
     : RecordBatch(schema, num_rows) {
   columns_ = std::move(columns);
 }
 
 RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
-                         const std::vector<std::shared_ptr<internal::ArrayData>>& columns)
+                         const std::vector<std::shared_ptr<ArrayData>>& columns)
     : RecordBatch(schema, num_rows) {
   columns_ = columns;
 }
@@ -245,13 +247,13 @@ std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset) const {
 }
 
 std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset, int64_t length) const {
-  std::vector<std::shared_ptr<internal::ArrayData>> arrays;
+  std::vector<std::shared_ptr<ArrayData>> arrays;
   arrays.reserve(num_columns());
   for (const auto& field : columns_) {
     int64_t col_length = std::min(field->length - offset, length);
     int64_t col_offset = field->offset + offset;
 
-    auto new_data = std::make_shared<internal::ArrayData>(*field);
+    auto new_data = std::make_shared<ArrayData>(*field);
     new_data->length = col_length;
     new_data->offset = col_offset;
     new_data->null_count = kUnknownNullCount;
@@ -263,7 +265,7 @@ std::shared_ptr<RecordBatch> RecordBatch::Slice(int64_t offset, int64_t length)
 
 Status RecordBatch::Validate() const {
   for (int i = 0; i < num_columns(); ++i) {
-    const internal::ArrayData& arr = *columns_[i];
+    const ArrayData& arr = *columns_[i];
     if (arr.length != num_rows_) {
       std::stringstream ss;
       ss << "Number of rows in column " << i << " did not match batch:
" << arr.length
@@ -471,9 +473,18 @@ Status Table::ValidateColumns() const {
   return Status::OK();
 }
 
-Status ARROW_EXPORT MakeTable(const std::shared_ptr<Schema>& schema,
-                              const std::vector<std::shared_ptr<Array>>& arrays,
-                              std::shared_ptr<Table>* table) {
+bool Table::IsChunked() const {
+  for (size_t i = 0; i < columns_.size(); ++i) {
+    if (columns_[i]->data()->num_chunks() > 1) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Status MakeTable(const std::shared_ptr<Schema>& schema,
+                 const std::vector<std::shared_ptr<Array>>& arrays,
+                 std::shared_ptr<Table>* table) {
   // Make sure the length of the schema corresponds to the length of the vector
   if (schema->num_fields() != static_cast<int>(arrays.size())) {
     std::stringstream ss;
@@ -493,4 +504,105 @@ Status ARROW_EXPORT MakeTable(const std::shared_ptr<Schema>& schema,
   return Status::OK();
 }
 
+// ----------------------------------------------------------------------
+// Base record batch reader
+
+RecordBatchReader::~RecordBatchReader() {}
+
+#ifndef ARROW_NO_DEPRECATED_API
+Status RecordBatchReader::ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
+  return ReadNext(batch);
+}
+#endif
+
+// ----------------------------------------------------------------------
+// Convert a table to a sequence of record batches
+
+class TableBatchReader::TableBatchReaderImpl {
+ public:
+  explicit TableBatchReaderImpl(const Table& table)
+      : table_(table),
+        column_data_(table.num_columns()),
+        chunk_numbers_(table.num_columns(), 0),
+        chunk_offsets_(table.num_columns(), 0),
+        absolute_row_position_(0) {
+    for (int i = 0; i < table.num_columns(); ++i) {
+      column_data_[i] = table.column(i)->data().get();
+    }
+  }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) {
+    if (absolute_row_position_ == table_.num_rows()) {
+      *out = nullptr;
+      return Status::OK();
+    }
+
+    // Determine the minimum contiguous slice across all columns
+    int64_t chunksize = table_.num_rows();
+    std::vector<const Array*> chunks(table_.num_columns());
+    for (int i = 0; i < table_.num_columns(); ++i) {
+      auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get();
+      int64_t chunk_remaining = chunk->length() - chunk_offsets_[i];
+
+      if (chunk_remaining < chunksize) {
+        chunksize = chunk_remaining;
+      }
+
+      chunks[i] = chunk;
+    }
+
+    // Slice chunks and advance chunk index as appropriate
+    std::vector<std::shared_ptr<ArrayData>> batch_data;
+    batch_data.reserve(table_.num_columns());
+
+    for (int i = 0; i < table_.num_columns(); ++i) {
+      const Array* chunk = chunks[i];
+      const int64_t offset = chunk_offsets_[i];
+      std::shared_ptr<ArrayData> slice_data;
+      if ((chunk->length() - offset) == chunksize) {
+        // Exhausted chunk; advance to the next one
+        ++chunk_numbers_[i];
+        chunk_offsets_[i] = 0;
+        if (offset > 0) {
+          // Need to slice
+          slice_data = chunk->Slice(offset, chunksize)->data();
+        } else {
+          // No slice
+          slice_data = chunk->data();
+        }
+      } else {
+        // Chunk not exhausted; advance the offset within it
+        chunk_offsets_[i] += chunksize;
+        slice_data = chunk->Slice(offset, chunksize)->data();
+      }
+      batch_data.emplace_back(std::move(slice_data));
+    }
+
+    absolute_row_position_ += chunksize;
+    *out =
+        std::make_shared<RecordBatch>(table_.schema(), chunksize, std::move(batch_data));
+
+    return Status::OK();
+  }
+
+  std::shared_ptr<Schema> schema() const { return table_.schema(); }
+
+ private:
+  const Table& table_;
+  std::vector<ChunkedArray*> column_data_;
+  std::vector<int> chunk_numbers_;
+  std::vector<int64_t> chunk_offsets_;
+  int64_t absolute_row_position_;
+};
+
+TableBatchReader::TableBatchReader(const Table& table) {
+  impl_.reset(new TableBatchReaderImpl(table));
+}
+
+TableBatchReader::~TableBatchReader() {}
+
+std::shared_ptr<Schema> TableBatchReader::schema() const { return impl_->schema(); }
+
+Status TableBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
+  return impl_->ReadNext(out);
+}
+
 }  // namespace arrow
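
To make the slicing logic concrete, a worked example with hypothetical chunk
layouts (row counts in brackets), mirroring TestTableBatchReader above: each
ReadNext emits the largest run of rows that is contiguous in every column.

    f1 chunks: [10][10][--------20--------]
    f2 chunks: [--------20--------][--20--]

    batch 0: 10 rows (all of f1 chunk 0; slice [0, 10) of f2 chunk 0)
    batch 1: 10 rows (all of f1 chunk 1; slice [10, 20) of f2 chunk 0)
    batch 2: 20 rows (all of f1 chunk 2; all of f2 chunk 1)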

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index da2722d..90336e9 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -262,6 +262,9 @@ class ARROW_EXPORT Table {
   /// \brief Perform any checks to validate the input arguments
   Status ValidateColumns() const;
 
+  /// \brief Return true if any column has multiple chunks
+  bool IsChunked() const;
+
  private:
   std::shared_ptr<Schema> schema_;
   std::vector<std::shared_ptr<Column>> columns_;
@@ -269,6 +272,44 @@ class ARROW_EXPORT Table {
   int64_t num_rows_;
 };
 
+/// \brief Abstract interface for reading stream of record batches
+class ARROW_EXPORT RecordBatchReader {
+ public:
+  virtual ~RecordBatchReader();
+
+  /// \return the shared schema of the record batches in the stream
+  virtual std::shared_ptr<Schema> schema() const = 0;
+
+  /// Read the next record batch in the stream. Return nullptr for batch when
+  /// reaching end of stream
+  ///
+  /// \param(out) batch the next loaded batch, nullptr at end of stream
+  /// \return Status
+  virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
+
+#ifndef ARROW_NO_DEPRECATED_API
+  /// \deprecated Since 0.7.0
+  Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch);
+#endif
+};
+
+/// \brief Compute a sequence of record batches from a (possibly chunked) Table
+class ARROW_EXPORT TableBatchReader : public RecordBatchReader {
+ public:
+  ~TableBatchReader();
+
+  /// \brief Read batches with the maximum possible size
+  explicit TableBatchReader(const Table& table);
+
+  std::shared_ptr<Schema> schema() const override;
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override;
+
+ private:
+  class TableBatchReaderImpl;
+  std::unique_ptr<TableBatchReaderImpl> impl_;
+};
+
 /// \brief Construct table from multiple input tables.
 /// \return Status, fails if any schemas are different
 ARROW_EXPORT
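
Since RecordBatchReader now lives in arrow/table.h, downstream code can
implement the interface directly. A minimal hypothetical subclass for
illustration (not part of the patch):

    // Yields the same batch a fixed number of times, then signals end of stream.
    class RepeatedBatchReader : public arrow::RecordBatchReader {
     public:
      RepeatedBatchReader(std::shared_ptr<arrow::RecordBatch> batch, int count)
          : batch_(std::move(batch)), remaining_(count) {}

      std::shared_ptr<arrow::Schema> schema() const override {
        return batch_->schema();
      }

      arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override {
        if (remaining_ > 0) {
          --remaining_;
          *out = batch_;
        } else {
          *out = nullptr;  // end of stream
        }
        return arrow::Status::OK();
      }

     private:
      std::shared_ptr<arrow::RecordBatch> batch_;
      int remaining_;
    };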

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 3b7ddcf..5e67088 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -632,11 +632,13 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
 
     cdef cppclass CRecordBatchWriter" arrow::ipc::RecordBatchWriter":
         CStatus Close()
-        CStatus WriteRecordBatch(const CRecordBatch& batch)
+        CStatus WriteRecordBatch(const CRecordBatch& batch,
+                                 c_bool allow_64bit)
+        CStatus WriteTable(const CTable& table)
 
     cdef cppclass CRecordBatchReader" arrow::ipc::RecordBatchReader":
         shared_ptr[CSchema] schema()
-        CStatus ReadNextRecordBatch(shared_ptr[CRecordBatch]* batch)
+        CStatus ReadNext(shared_ptr[CRecordBatch]* batch)
 
     cdef cppclass CRecordBatchStreamReader \
             " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/python/pyarrow/ipc.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 027a00d..27e9167 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -175,6 +175,21 @@ cdef class _RecordBatchWriter:
                                               &self.writer))
         self.closed = False
 
+    def write(self, table_or_batch):
+        """
+        Write RecordBatch or Table to stream
+
+        Parameters
+        ----------
+        table_or_batch : {RecordBatch, Table}
+        """
+        if isinstance(table_or_batch, RecordBatch):
+            self.write_batch(table_or_batch)
+        elif isinstance(table_or_batch, Table):
+            self.write_table(table_or_batch)
+        else:
+            raise ValueError(type(table_or_batch))
+
     def write_batch(self, RecordBatch batch):
         """
         Write RecordBatch to stream
@@ -185,7 +200,18 @@ cdef class _RecordBatchWriter:
         """
         with nogil:
             check_status(self.writer.get()
-                         .WriteRecordBatch(deref(batch.batch)))
+                         .WriteRecordBatch(deref(batch.batch), 1))
+
+    def write_table(self, Table table):
+        """
+        Write Table to stream
+
+        Parameters
+        ----------
+        table : Table
+        """
+        with nogil:
+            check_status(self.writer.get().WriteTable(table.table[0]))
 
     def close(self):
         """
@@ -245,7 +271,7 @@ cdef class _RecordBatchReader:
         cdef shared_ptr[CRecordBatch] batch
 
         with nogil:
-            check_status(self.reader.get().ReadNextRecordBatch(&batch))
+            check_status(self.reader.get().ReadNext(&batch))
 
         if batch.get() == NULL:
             raise StopIteration
@@ -263,7 +289,7 @@ cdef class _RecordBatchReader:
 
         with nogil:
             while True:
-                check_status(self.reader.get().ReadNextRecordBatch(&batch))
+                check_status(self.reader.get().ReadNext(&batch))
                 if batch.get() == NULL:
                     break
                 batches.push_back(batch)

http://git-wip-us.apache.org/repos/asf/arrow/blob/947ca871/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index ecdbe62..6ac44fa 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -40,7 +40,7 @@ class MessagingTest(object):
     def _get_source(self):
         return self.sink.getvalue()
 
-    def write_batches(self, num_batches=5):
+    def write_batches(self, num_batches=5, as_table=False):
         nrows = 5
         df = pd.DataFrame({
             'one': np.random.randn(nrows),
@@ -54,12 +54,17 @@ class MessagingTest(object):
         for i in range(num_batches):
             unique_df = df.copy()
             unique_df['one'] = np.random.randn(len(df))
-
             batch = pa.RecordBatch.from_pandas(unique_df)
-            writer.write_batch(batch)
             frames.append(unique_df)
             batches.append(batch)
 
+        if as_table:
+            table = pa.Table.from_batches(batches)
+            writer.write_table(table)
+        else:
+            for batch in batches:
+                writer.write_batch(batch)
+
         writer.close()
         return frames, batches
 
@@ -75,8 +80,8 @@ class TestFile(MessagingTest, unittest.TestCase):
         with pytest.raises(pa.ArrowInvalid):
             pa.open_file(buf)
 
-    def test_simple_roundtrip(self):
-        _, batches = self.write_batches()
+    def _check_roundtrip(self, as_table=False):
+        _, batches = self.write_batches(as_table=as_table)
         file_contents = pa.BufferReader(self._get_source())
 
         reader = pa.open_file(file_contents)
@@ -89,6 +94,12 @@ class TestFile(MessagingTest, unittest.TestCase):
             assert batches[i].equals(batch)
             assert reader.schema.equals(batches[0].schema)
 
+    def test_simple_roundtrip(self):
+        self._check_roundtrip(as_table=False)
+
+    def test_write_table(self):
+        self._check_roundtrip(as_table=True)
+
     def test_read_all(self):
         _, batches = self.write_batches()
         file_contents = pa.BufferReader(self._get_source())

