arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [3/3] arrow git commit: ARROW-661: [C++] Add LargeRecordBatch metadata type, IPC support, associated refactoring
Date Mon, 20 Mar 2017 09:48:42 GMT
ARROW-661: [C++] Add LargeRecordBatch metadata type, IPC support, associated refactoring

This patch enables the following code for writing record batches exceeding 2^31 - 1 in length:

```c++
RETURN_NOT_OK(WriteLargeRecordBatch(
    batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
return ReadLargeRecordBatch(batch.schema(), 0, mmap_.get(), result);
```

This also does a fair amount of refactoring and code consolidation related to ongoing code cleaning in arrow_ipc.

These APIs are marked experimental. This does add a `LargeRecordBatch` flatbuffer type to the Message union, but I've indicated that Arrow implementations (e.g. Java) are not required to implement this type. It's strictly to enable C++ users to write very large datasets that have been embedded for convenience in Arrow's structured data model.

cc @pcmoritz @robertnishihara

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #404 from wesm/ARROW-661 and squashes the following commits:

9c18a95 [Wes McKinney] Fix import ordering
d7811f2 [Wes McKinney] cpplint
179a1e3 [Wes McKinney] Add unit test for large record batches. Use bytewise comparisons with aligned bitmaps
36c3862 [Wes McKinney] Get LargeRecordBatch round trip working. Add to Message union for now
4c1d08c [Wes McKinney] Refactoring, failing test fixture for large record batch
f4c8830 [Wes McKinney] Consolidate ipc-metadata-test and ipc-read-write-test and draft large record batch read/write path
85d1a1c [Wes McKinney] Add (untested) metadata writer for LargeRecordBatch
0f2722c [Wes McKinney] Consolidate metadata-internal.h into metadata.h. Use own Arrow structs for IPC metadata and convert to flatbuffers later
e8f8973 [Wes McKinney] Split adapter.h/cc into reader.h/writer.h. Draft LargeRecordBatch type


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/df2220f3
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/df2220f3
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/df2220f3

Branch: refs/heads/master
Commit: df2220f350282925a454ed911eed6618e4d53969
Parents: 4c5f79c
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Mon Mar 20 10:48:34 2017 +0100
Committer: Uwe L. Korn <uwelk@xhochy.com>
Committed: Mon Mar 20 10:48:34 2017 +0100

----------------------------------------------------------------------
 cpp/src/arrow/allocator-test.cc          |   1 +
 cpp/src/arrow/allocator.h                |   1 +
 cpp/src/arrow/io/test-common.h           |  18 +
 cpp/src/arrow/ipc/CMakeLists.txt         |  15 +-
 cpp/src/arrow/ipc/adapter.cc             | 630 -----------------------
 cpp/src/arrow/ipc/adapter.h              | 104 ----
 cpp/src/arrow/ipc/api.h                  |   1 -
 cpp/src/arrow/ipc/ipc-adapter-test.cc    | 320 ------------
 cpp/src/arrow/ipc/ipc-file-test.cc       | 228 ---------
 cpp/src/arrow/ipc/ipc-metadata-test.cc   | 100 ----
 cpp/src/arrow/ipc/ipc-read-write-test.cc | 608 ++++++++++++++++++++++
 cpp/src/arrow/ipc/metadata-internal.cc   | 597 ----------------------
 cpp/src/arrow/ipc/metadata-internal.h    |  83 ---
 cpp/src/arrow/ipc/metadata.cc            | 692 +++++++++++++++++++++++++-
 cpp/src/arrow/ipc/metadata.h             |  40 +-
 cpp/src/arrow/ipc/reader.cc              | 171 ++++++-
 cpp/src/arrow/ipc/reader.h               |  22 +
 cpp/src/arrow/ipc/test-common.h          |   2 +-
 cpp/src/arrow/ipc/writer.cc              | 544 ++++++++++++++++++--
 cpp/src/arrow/ipc/writer.h               |  46 +-
 cpp/src/arrow/loader.h                   |  25 +
 cpp/src/arrow/type.h                     |   1 +
 cpp/src/arrow/util/bit-util.cc           |  16 +-
 format/Message.fbs                       |  22 +-
 24 files changed, 2131 insertions(+), 2156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/allocator-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/allocator-test.cc b/cpp/src/arrow/allocator-test.cc
index 0b24267..811ef5a 100644
--- a/cpp/src/arrow/allocator-test.cc
+++ b/cpp/src/arrow/allocator-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "gtest/gtest.h"
+
 #include "arrow/allocator.h"
 #include "arrow/test-util.h"
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/allocator.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/allocator.h b/cpp/src/arrow/allocator.h
index c976ba9..e00023d 100644
--- a/cpp/src/arrow/allocator.h
+++ b/cpp/src/arrow/allocator.h
@@ -21,6 +21,7 @@
 #include <cstddef>
 #include <memory>
 #include <utility>
+
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/io/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h
index 8355714..4c11476 100644
--- a/cpp/src/arrow/io/test-common.h
+++ b/cpp/src/arrow/io/test-common.h
@@ -41,6 +41,24 @@
 namespace arrow {
 namespace io {
 
+static inline Status ZeroMemoryMap(MemoryMappedFile* file) {
+  constexpr int64_t kBufferSize = 512;
+  static constexpr uint8_t kZeroBytes[kBufferSize] = {0};
+
+  RETURN_NOT_OK(file->Seek(0));
+  int64_t position = 0;
+  int64_t file_size;
+  RETURN_NOT_OK(file->GetSize(&file_size));
+
+  int64_t chunksize;
+  while (position < file_size) {
+    chunksize = std::min(kBufferSize, file_size - position);
+    RETURN_NOT_OK(file->Write(kZeroBytes, chunksize));
+    position += chunksize;
+  }
+  return Status::OK();
+}
+
 class MemoryMapFixture {
  public:
   void TearDown() {

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index c73af63..5d470df 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -29,12 +29,10 @@ set(ARROW_IPC_TEST_LINK_LIBS
   arrow_io_static)
 
 set(ARROW_IPC_SRCS
-  adapter.cc
   feather.cc
   json.cc
   json-internal.cc
   metadata.cc
-  metadata-internal.cc
   reader.cc
   writer.cc
 )
@@ -64,16 +62,8 @@ ADD_ARROW_TEST(feather-test)
 ARROW_TEST_LINK_LIBRARIES(feather-test
   ${ARROW_IPC_TEST_LINK_LIBS})
 
-ADD_ARROW_TEST(ipc-adapter-test)
-ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
-  ${ARROW_IPC_TEST_LINK_LIBS})
-
-ADD_ARROW_TEST(ipc-file-test)
-ARROW_TEST_LINK_LIBRARIES(ipc-file-test
-  ${ARROW_IPC_TEST_LINK_LIBS})
-
-ADD_ARROW_TEST(ipc-metadata-test)
-ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test
+ADD_ARROW_TEST(ipc-read-write-test)
+ARROW_TEST_LINK_LIBRARIES(ipc-read-write-test
   ${ARROW_IPC_TEST_LINK_LIBS})
 
 ADD_ARROW_TEST(ipc-json-test)
@@ -148,7 +138,6 @@ add_dependencies(arrow_ipc_objlib metadata_fbs)
 
 # Headers: top level
 install(FILES
-  adapter.h
   api.h
   feather.h
   json.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
deleted file mode 100644
index db9f63c..0000000
--- a/cpp/src/arrow/ipc/adapter.cc
+++ /dev/null
@@ -1,630 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/ipc/adapter.h"
-
-#include <algorithm>
-#include <cstdint>
-#include <cstring>
-#include <limits>
-#include <sstream>
-#include <vector>
-
-#include "arrow/array.h"
-#include "arrow/buffer.h"
-#include "arrow/io/interfaces.h"
-#include "arrow/io/memory.h"
-#include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/metadata-internal.h"
-#include "arrow/ipc/metadata.h"
-#include "arrow/ipc/util.h"
-#include "arrow/loader.h"
-#include "arrow/memory_pool.h"
-#include "arrow/schema.h"
-#include "arrow/status.h"
-#include "arrow/table.h"
-#include "arrow/type.h"
-#include "arrow/type_fwd.h"
-#include "arrow/util/bit-util.h"
-#include "arrow/util/logging.h"
-
-namespace arrow {
-
-namespace flatbuf = org::apache::arrow::flatbuf;
-
-namespace ipc {
-
-// ----------------------------------------------------------------------
-// Record batch write path
-
-class RecordBatchWriter : public ArrayVisitor {
- public:
-  RecordBatchWriter(
-      MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth)
-      : pool_(pool),
-        max_recursion_depth_(max_recursion_depth),
-        buffer_start_offset_(buffer_start_offset) {
-    DCHECK_GT(max_recursion_depth, 0);
-  }
-
-  virtual ~RecordBatchWriter() = default;
-
-  Status VisitArray(const Array& arr) {
-    if (max_recursion_depth_ <= 0) {
-      return Status::Invalid("Max recursion depth reached");
-    }
-
-    if (arr.length() > std::numeric_limits<int32_t>::max()) {
-      return Status::Invalid("Cannot write arrays larger than 2^31 - 1 in length");
-    }
-
-    // push back all common elements
-    field_nodes_.push_back(flatbuf::FieldNode(
-        static_cast<int32_t>(arr.length()), static_cast<int32_t>(arr.null_count())));
-    if (arr.null_count() > 0) {
-      std::shared_ptr<Buffer> bitmap = arr.null_bitmap();
-
-      if (arr.offset() != 0) {
-        // With a sliced array / non-zero offset, we must copy the bitmap
-        RETURN_NOT_OK(
-            CopyBitmap(pool_, bitmap->data(), arr.offset(), arr.length(), &bitmap));
-      }
-
-      buffers_.push_back(bitmap);
-    } else {
-      // Push a dummy zero-length buffer, not to be copied
-      buffers_.push_back(std::make_shared<Buffer>(nullptr, 0));
-    }
-    return arr.Accept(this);
-  }
-
-  Status Assemble(const RecordBatch& batch, int64_t* body_length) {
-    if (field_nodes_.size() > 0) {
-      field_nodes_.clear();
-      buffer_meta_.clear();
-      buffers_.clear();
-    }
-
-    // Perform depth-first traversal of the row-batch
-    for (int i = 0; i < batch.num_columns(); ++i) {
-      RETURN_NOT_OK(VisitArray(*batch.column(i)));
-    }
-
-    // The position for the start of a buffer relative to the passed frame of
-    // reference. May be 0 or some other position in an address space
-    int64_t offset = buffer_start_offset_;
-
-    // Construct the buffer metadata for the record batch header
-    for (size_t i = 0; i < buffers_.size(); ++i) {
-      const Buffer* buffer = buffers_[i].get();
-      int64_t size = 0;
-      int64_t padding = 0;
-
-      // The buffer might be null if we are handling zero row lengths.
-      if (buffer) {
-        size = buffer->size();
-        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
-      }
-
-      // TODO(wesm): We currently have no notion of shared memory page id's,
-      // but we've included it in the metadata IDL for when we have it in the
-      // future. Use page = -1 for now
-      //
-      // Note that page ids are a bespoke notion for Arrow and not a feature we
-      // are using from any OS-level shared memory. The thought is that systems
-      // may (in the future) associate integer page id's with physical memory
-      // pages (according to whatever is the desired shared memory mechanism)
-      buffer_meta_.push_back(flatbuf::Buffer(-1, offset, size + padding));
-      offset += size + padding;
-    }
-
-    *body_length = offset - buffer_start_offset_;
-    DCHECK(BitUtil::IsMultipleOf64(*body_length));
-
-    return Status::OK();
-  }
-
-  // Override this for writing dictionary metadata
-  virtual Status WriteMetadataMessage(
-      int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) {
-    return WriteRecordBatchMessage(
-        num_rows, body_length, field_nodes_, buffer_meta_, out);
-  }
-
-  Status WriteMetadata(int32_t num_rows, int64_t body_length, io::OutputStream* dst,
-      int32_t* metadata_length) {
-    // Now that we have computed the locations of all of the buffers in shared
-    // memory, the data header can be converted to a flatbuffer and written out
-    //
-    // Note: The memory written here is prefixed by the size of the flatbuffer
-    // itself as an int32_t.
-    std::shared_ptr<Buffer> metadata_fb;
-    RETURN_NOT_OK(WriteMetadataMessage(num_rows, body_length, &metadata_fb));
-
-    // Need to write 4 bytes (metadata size), the metadata, plus padding to
-    // end on an 8-byte offset
-    int64_t start_offset;
-    RETURN_NOT_OK(dst->Tell(&start_offset));
-
-    int32_t padded_metadata_length = static_cast<int32_t>(metadata_fb->size()) + 4;
-    const int32_t remainder =
-        (padded_metadata_length + static_cast<int32_t>(start_offset)) % 8;
-    if (remainder != 0) { padded_metadata_length += 8 - remainder; }
-
-    // The returned metadata size includes the length prefix, the flatbuffer,
-    // plus padding
-    *metadata_length = padded_metadata_length;
-
-    // Write the flatbuffer size prefix including padding
-    int32_t flatbuffer_size = padded_metadata_length - 4;
-    RETURN_NOT_OK(
-        dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
-
-    // Write the flatbuffer
-    RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size()));
-
-    // Write any padding
-    int32_t padding =
-        padded_metadata_length - static_cast<int32_t>(metadata_fb->size()) - 4;
-    if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
-
-    return Status::OK();
-  }
-
-  Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length,
-      int64_t* body_length) {
-    RETURN_NOT_OK(Assemble(batch, body_length));
-
-#ifndef NDEBUG
-    int64_t start_position, current_position;
-    RETURN_NOT_OK(dst->Tell(&start_position));
-#endif
-
-    RETURN_NOT_OK(WriteMetadata(
-        static_cast<int32_t>(batch.num_rows()), *body_length, dst, metadata_length));
-
-#ifndef NDEBUG
-    RETURN_NOT_OK(dst->Tell(&current_position));
-    DCHECK(BitUtil::IsMultipleOf8(current_position));
-#endif
-
-    // Now write the buffers
-    for (size_t i = 0; i < buffers_.size(); ++i) {
-      const Buffer* buffer = buffers_[i].get();
-      int64_t size = 0;
-      int64_t padding = 0;
-
-      // The buffer might be null if we are handling zero row lengths.
-      if (buffer) {
-        size = buffer->size();
-        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
-      }
-
-      if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); }
-
-      if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
-    }
-
-#ifndef NDEBUG
-    RETURN_NOT_OK(dst->Tell(&current_position));
-    DCHECK(BitUtil::IsMultipleOf8(current_position));
-#endif
-
-    return Status::OK();
-  }
-
-  Status GetTotalSize(const RecordBatch& batch, int64_t* size) {
-    // emulates the behavior of Write without actually writing
-    int32_t metadata_length = 0;
-    int64_t body_length = 0;
-    MockOutputStream dst;
-    RETURN_NOT_OK(Write(batch, &dst, &metadata_length, &body_length));
-    *size = dst.GetExtentBytesWritten();
-    return Status::OK();
-  }
-
- protected:
-  template <typename ArrayType>
-  Status VisitFixedWidth(const ArrayType& array) {
-    std::shared_ptr<Buffer> data_buffer = array.data();
-
-    if (array.offset() != 0) {
-      // Non-zero offset, slice the buffer
-      const auto& fw_type = static_cast<const FixedWidthType&>(*array.type());
-      const int type_width = fw_type.bit_width() / 8;
-      const int64_t byte_offset = array.offset() * type_width;
-
-      // Send padding if it's available
-      const int64_t buffer_length =
-          std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
-              data_buffer->size() - byte_offset);
-      data_buffer = SliceBuffer(data_buffer, byte_offset, buffer_length);
-    }
-    buffers_.push_back(data_buffer);
-    return Status::OK();
-  }
-
-  template <typename ArrayType>
-  Status GetZeroBasedValueOffsets(
-      const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) {
-    // Share slicing logic between ListArray and BinaryArray
-
-    auto offsets = array.value_offsets();
-
-    if (array.offset() != 0) {
-      // If we have a non-zero offset, then the value offsets do not start at
-      // zero. We must a) create a new offsets array with shifted offsets and
-      // b) slice the values array accordingly
-
-      std::shared_ptr<MutableBuffer> shifted_offsets;
-      RETURN_NOT_OK(AllocateBuffer(
-          pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets));
-
-      int32_t* dest_offsets = reinterpret_cast<int32_t*>(shifted_offsets->mutable_data());
-      const int32_t start_offset = array.value_offset(0);
-
-      for (int i = 0; i < array.length(); ++i) {
-        dest_offsets[i] = array.value_offset(i) - start_offset;
-      }
-      // Final offset
-      dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset;
-      offsets = shifted_offsets;
-    }
-
-    *value_offsets = offsets;
-    return Status::OK();
-  }
-
-  Status VisitBinary(const BinaryArray& array) {
-    std::shared_ptr<Buffer> value_offsets;
-    RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array, &value_offsets));
-    auto data = array.data();
-
-    if (array.offset() != 0) {
-      // Slice the data buffer to include only the range we need now
-      data = SliceBuffer(data, array.value_offset(0), array.value_offset(array.length()));
-    }
-
-    buffers_.push_back(value_offsets);
-    buffers_.push_back(data);
-    return Status::OK();
-  }
-
-  Status Visit(const FixedWidthBinaryArray& array) override {
-    auto data = array.data();
-    int32_t width = array.byte_width();
-
-    if (array.offset() != 0) {
-      data = SliceBuffer(data, array.offset() * width, width * array.length());
-    }
-    buffers_.push_back(data);
-    return Status::OK();
-  }
-
-  Status Visit(const BooleanArray& array) override {
-    buffers_.push_back(array.data());
-    return Status::OK();
-  }
-
-#define VISIT_FIXED_WIDTH(TYPE) \
-  Status Visit(const TYPE& array) override { return VisitFixedWidth<TYPE>(array); }
-
-  VISIT_FIXED_WIDTH(Int8Array);
-  VISIT_FIXED_WIDTH(Int16Array);
-  VISIT_FIXED_WIDTH(Int32Array);
-  VISIT_FIXED_WIDTH(Int64Array);
-  VISIT_FIXED_WIDTH(UInt8Array);
-  VISIT_FIXED_WIDTH(UInt16Array);
-  VISIT_FIXED_WIDTH(UInt32Array);
-  VISIT_FIXED_WIDTH(UInt64Array);
-  VISIT_FIXED_WIDTH(HalfFloatArray);
-  VISIT_FIXED_WIDTH(FloatArray);
-  VISIT_FIXED_WIDTH(DoubleArray);
-  VISIT_FIXED_WIDTH(DateArray);
-  VISIT_FIXED_WIDTH(Date32Array);
-  VISIT_FIXED_WIDTH(TimeArray);
-  VISIT_FIXED_WIDTH(TimestampArray);
-
-#undef VISIT_FIXED_WIDTH
-
-  Status Visit(const StringArray& array) override { return VisitBinary(array); }
-
-  Status Visit(const BinaryArray& array) override { return VisitBinary(array); }
-
-  Status Visit(const ListArray& array) override {
-    std::shared_ptr<Buffer> value_offsets;
-    RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets));
-    buffers_.push_back(value_offsets);
-
-    --max_recursion_depth_;
-    std::shared_ptr<Array> values = array.values();
-
-    if (array.offset() != 0) {
-      // For non-zero offset, we slice the values array accordingly
-      const int32_t offset = array.value_offset(0);
-      const int32_t length = array.value_offset(array.length()) - offset;
-      values = values->Slice(offset, length);
-    }
-    RETURN_NOT_OK(VisitArray(*values));
-    ++max_recursion_depth_;
-    return Status::OK();
-  }
-
-  Status Visit(const StructArray& array) override {
-    --max_recursion_depth_;
-    for (std::shared_ptr<Array> field : array.fields()) {
-      if (array.offset() != 0) {
-        // If offset is non-zero, slice the child array
-        field = field->Slice(array.offset(), array.length());
-      }
-      RETURN_NOT_OK(VisitArray(*field));
-    }
-    ++max_recursion_depth_;
-    return Status::OK();
-  }
-
-  Status Visit(const UnionArray& array) override {
-    auto type_ids = array.type_ids();
-    if (array.offset() != 0) {
-      type_ids = SliceBuffer(type_ids, array.offset() * sizeof(UnionArray::type_id_t),
-          array.length() * sizeof(UnionArray::type_id_t));
-    }
-
-    buffers_.push_back(type_ids);
-
-    --max_recursion_depth_;
-    if (array.mode() == UnionMode::DENSE) {
-      const auto& type = static_cast<const UnionType&>(*array.type());
-      auto value_offsets = array.value_offsets();
-
-      // The Union type codes are not necessary 0-indexed
-      uint8_t max_code = 0;
-      for (uint8_t code : type.type_codes) {
-        if (code > max_code) { max_code = code; }
-      }
-
-      // Allocate an array of child offsets. Set all to -1 to indicate that we
-      // haven't observed a first occurrence of a particular child yet
-      std::vector<int32_t> child_offsets(max_code + 1);
-      std::vector<int32_t> child_lengths(max_code + 1, 0);
-
-      if (array.offset() != 0) {
-        // This is an unpleasant case. Because the offsets are different for
-        // each child array, when we have a sliced array, we need to "rebase"
-        // the value_offsets for each array
-
-        const int32_t* unshifted_offsets = array.raw_value_offsets();
-        const uint8_t* type_ids = array.raw_type_ids();
-
-        // Allocate the shifted offsets
-        std::shared_ptr<MutableBuffer> shifted_offsets_buffer;
-        RETURN_NOT_OK(AllocateBuffer(
-            pool_, array.length() * sizeof(int32_t), &shifted_offsets_buffer));
-        int32_t* shifted_offsets =
-            reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data());
-
-        for (int64_t i = 0; i < array.length(); ++i) {
-          const uint8_t code = type_ids[i];
-          int32_t shift = child_offsets[code];
-          if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; }
-          shifted_offsets[i] = unshifted_offsets[i] - shift;
-
-          // Update the child length to account for observed value
-          ++child_lengths[code];
-        }
-
-        value_offsets = shifted_offsets_buffer;
-      }
-      buffers_.push_back(value_offsets);
-
-      // Visit children and slice accordingly
-      for (int i = 0; i < type.num_children(); ++i) {
-        std::shared_ptr<Array> child = array.child(i);
-        if (array.offset() != 0) {
-          const uint8_t code = type.type_codes[i];
-          child = child->Slice(child_offsets[code], child_lengths[code]);
-        }
-        RETURN_NOT_OK(VisitArray(*child));
-      }
-    } else {
-      for (std::shared_ptr<Array> child : array.children()) {
-        // Sparse union, slicing is simpler
-        if (array.offset() != 0) {
-          // If offset is non-zero, slice the child array
-          child = child->Slice(array.offset(), array.length());
-        }
-        RETURN_NOT_OK(VisitArray(*child));
-      }
-    }
-    ++max_recursion_depth_;
-    return Status::OK();
-  }
-
-  Status Visit(const DictionaryArray& array) override {
-    // Dictionary written out separately. Slice offset contained in the indices
-    return array.indices()->Accept(this);
-  }
-
-  // In some cases, intermediate buffers may need to be allocated (with sliced arrays)
-  MemoryPool* pool_;
-
-  std::vector<flatbuf::FieldNode> field_nodes_;
-  std::vector<flatbuf::Buffer> buffer_meta_;
-  std::vector<std::shared_ptr<Buffer>> buffers_;
-
-  int64_t max_recursion_depth_;
-  int64_t buffer_start_offset_;
-};
-
-class DictionaryWriter : public RecordBatchWriter {
- public:
-  using RecordBatchWriter::RecordBatchWriter;
-
-  Status WriteMetadataMessage(
-      int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
-    return WriteDictionaryMessage(
-        dictionary_id_, num_rows, body_length, field_nodes_, buffer_meta_, out);
-  }
-
-  Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
-      io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
-    dictionary_id_ = dictionary_id;
-
-    // Make a dummy record batch. A bit tedious as we have to make a schema
-    std::vector<std::shared_ptr<Field>> fields = {
-        arrow::field("dictionary", dictionary->type())};
-    auto schema = std::make_shared<Schema>(fields);
-    RecordBatch batch(schema, dictionary->length(), {dictionary});
-
-    return RecordBatchWriter::Write(batch, dst, metadata_length, body_length);
-  }
-
- private:
-  // TODO(wesm): Setting this in Write is a bit unclean, but it works
-  int64_t dictionary_id_;
-};
-
-Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
-    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
-    MemoryPool* pool, int max_recursion_depth) {
-  RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth);
-  return writer.Write(batch, dst, metadata_length, body_length);
-}
-
-Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
-    int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
-    int64_t* body_length, MemoryPool* pool) {
-  DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth);
-  return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length);
-}
-
-Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
-  RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth);
-  RETURN_NOT_OK(writer.GetTotalSize(batch, size));
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Record batch read path
-
-class IpcComponentSource : public ArrayComponentSource {
- public:
-  IpcComponentSource(const RecordBatchMetadata& metadata, io::RandomAccessFile* file)
-      : metadata_(metadata), file_(file) {}
-
-  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
-    BufferMetadata buffer_meta = metadata_.buffer(buffer_index);
-    if (buffer_meta.length == 0) {
-      *out = nullptr;
-      return Status::OK();
-    } else {
-      return file_->ReadAt(buffer_meta.offset, buffer_meta.length, out);
-    }
-  }
-
-  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
-    // pop off a field
-    if (field_index >= metadata_.num_fields()) {
-      return Status::Invalid("Ran out of field metadata, likely malformed");
-    }
-    *metadata = metadata_.field(field_index);
-    return Status::OK();
-  }
-
- private:
-  const RecordBatchMetadata& metadata_;
-  io::RandomAccessFile* file_;
-};
-
-class RecordBatchReader {
- public:
-  RecordBatchReader(const RecordBatchMetadata& metadata,
-      const std::shared_ptr<Schema>& schema, int max_recursion_depth,
-      io::RandomAccessFile* file)
-      : metadata_(metadata),
-        schema_(schema),
-        max_recursion_depth_(max_recursion_depth),
-        file_(file) {}
-
-  Status Read(std::shared_ptr<RecordBatch>* out) {
-    std::vector<std::shared_ptr<Array>> arrays(schema_->num_fields());
-
-    IpcComponentSource source(metadata_, file_);
-    ArrayLoaderContext context;
-    context.source = &source;
-    context.field_index = 0;
-    context.buffer_index = 0;
-    context.max_recursion_depth = max_recursion_depth_;
-
-    for (int i = 0; i < schema_->num_fields(); ++i) {
-      RETURN_NOT_OK(LoadArray(schema_->field(i)->type, &context, &arrays[i]));
-    }
-
-    *out = std::make_shared<RecordBatch>(schema_, metadata_.length(), arrays);
-    return Status::OK();
-  }
-
- private:
-  const RecordBatchMetadata& metadata_;
-  std::shared_ptr<Schema> schema_;
-  int max_recursion_depth_;
-  io::RandomAccessFile* file_;
-};
-
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
-    const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
-    std::shared_ptr<RecordBatch>* out) {
-  return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out);
-}
-
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
-    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
-    io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
-  RecordBatchReader reader(metadata, schema, max_recursion_depth, file);
-  return reader.Read(out);
-}
-
-Status ReadDictionary(const DictionaryBatchMetadata& metadata,
-    const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
-    std::shared_ptr<Array>* out) {
-  int64_t id = metadata.id();
-  auto it = dictionary_types.find(id);
-  if (it == dictionary_types.end()) {
-    std::stringstream ss;
-    ss << "Do not have type metadata for dictionary with id: " << id;
-    return Status::KeyError(ss.str());
-  }
-
-  std::vector<std::shared_ptr<Field>> fields = {it->second};
-
-  // We need a schema for the record batch
-  auto dummy_schema = std::make_shared<Schema>(fields);
-
-  // The dictionary is embedded in a record batch with a single column
-  std::shared_ptr<RecordBatch> batch;
-  RETURN_NOT_OK(ReadRecordBatch(metadata.record_batch(), dummy_schema, file, &batch));
-
-  if (batch->num_columns() != 1) {
-    return Status::Invalid("Dictionary record batch must only contain one field");
-  }
-
-  *out = batch->column(0);
-  return Status::OK();
-}
-
-}  // namespace ipc
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
deleted file mode 100644
index cea4686..0000000
--- a/cpp/src/arrow/ipc/adapter.h
+++ /dev/null
@@ -1,104 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Public API for writing and accessing (with zero copy, if possible) Arrow
-// IPC binary formatted data (e.g. in shared memory, or from some other IO source)
-
-#ifndef ARROW_IPC_ADAPTER_H
-#define ARROW_IPC_ADAPTER_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "arrow/ipc/metadata.h"
-#include "arrow/loader.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-class Array;
-class MemoryPool;
-class RecordBatch;
-class Schema;
-class Status;
-
-namespace io {
-
-class RandomAccessFile;
-class OutputStream;
-
-}  // namespace io
-
-namespace ipc {
-
-// ----------------------------------------------------------------------
-// Write path
-
-// Write the RecordBatch (collection of equal-length Arrow arrays) to the
-// output stream in a contiguous block. The record batch metadata is written as
-// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
-// prefixed by its size, followed by each of the memory buffers in the batch
-// written end to end (with appropriate alignment and padding):
-//
-// <int32: metadata size> <uint8*: metadata> <buffers>
-//
-// Finally, the absolute offsets (relative to the start of the output stream)
-// to the end of the body and end of the metadata / data header (suffixed by
-// the header size) is returned in out-variables
-//
-// @param(in) buffer_start_offset: the start offset to use in the buffer metadata,
-// default should be 0
-//
-// @param(out) metadata_length: the size of the length-prefixed flatbuffer
-// including padding to a 64-byte boundary
-//
-// @param(out) body_length: the size of the contiguous buffer block plus
-// padding bytes
-Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
-    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
-    MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth);
-
-// Write Array as a DictionaryBatch message
-Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
-    int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
-    int64_t* body_length, MemoryPool* pool);
-
-// Compute the precise number of bytes needed in a contiguous memory segment to
-// write the record batch. This involves generating the complete serialized
-// Flatbuffers metadata.
-Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
-
-// ----------------------------------------------------------------------
-// "Read" path; does not copy data if the input supports zero copy reads
-
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
-    const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
-    std::shared_ptr<RecordBatch>* out);
-
-Status ReadRecordBatch(const RecordBatchMetadata& metadata,
-    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
-    io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
-
-Status ReadDictionary(const DictionaryBatchMetadata& metadata,
-    const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
-    std::shared_ptr<Array>* out);
-
-}  // namespace ipc
-}  // namespace arrow
-
-#endif  // ARROW_IPC_MEMORY_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/api.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/api.h b/cpp/src/arrow/ipc/api.h
index ad7cd84..3f05e69 100644
--- a/cpp/src/arrow/ipc/api.h
+++ b/cpp/src/arrow/ipc/api.h
@@ -18,7 +18,6 @@
 #ifndef ARROW_IPC_API_H
 #define ARROW_IPC_API_H
 
-#include "arrow/ipc/adapter.h"
 #include "arrow/ipc/feather.h"
 #include "arrow/ipc/json.h"
 #include "arrow/ipc/metadata.h"

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
deleted file mode 100644
index 638d98a..0000000
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ /dev/null
@@ -1,320 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "gtest/gtest.h"
-
-#include "arrow/io/memory.h"
-#include "arrow/io/test-common.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/metadata.h"
-#include "arrow/ipc/test-common.h"
-#include "arrow/ipc/util.h"
-
-#include "arrow/buffer.h"
-#include "arrow/memory_pool.h"
-#include "arrow/pretty_print.h"
-#include "arrow/status.h"
-#include "arrow/test-util.h"
-#include "arrow/util/bit-util.h"
-
-namespace arrow {
-namespace ipc {
-
-class IpcTestFixture : public io::MemoryMapFixture {
- public:
-  Status RoundTripHelper(const RecordBatch& batch, int memory_map_size,
-      std::shared_ptr<RecordBatch>* batch_result) {
-    std::string path = "test-write-row-batch";
-    io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
-
-    int32_t metadata_length;
-    int64_t body_length;
-
-    const int64_t buffer_offset = 0;
-
-    RETURN_NOT_OK(WriteRecordBatch(
-        batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
-
-    std::shared_ptr<Message> message;
-    RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
-    auto metadata = std::make_shared<RecordBatchMetadata>(message);
-
-    // The buffer offsets start at 0, so we must construct a
-    // RandomAccessFile according to that frame of reference
-    std::shared_ptr<Buffer> buffer_payload;
-    RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
-    io::BufferReader buffer_reader(buffer_payload);
-
-    return ReadRecordBatch(*metadata, batch.schema(), &buffer_reader, batch_result);
-  }
-
-  void CheckRoundtrip(const RecordBatch& batch, int64_t buffer_size) {
-    std::shared_ptr<RecordBatch> batch_result;
-
-    ASSERT_OK(RoundTripHelper(batch, 1 << 16, &batch_result));
-    EXPECT_EQ(batch.num_rows(), batch_result->num_rows());
-
-    ASSERT_TRUE(batch.schema()->Equals(batch_result->schema()));
-    ASSERT_EQ(batch.num_columns(), batch_result->num_columns())
-        << batch.schema()->ToString()
-        << " result: " << batch_result->schema()->ToString();
-
-    for (int i = 0; i < batch.num_columns(); ++i) {
-      const auto& left = *batch.column(i);
-      const auto& right = *batch_result->column(i);
-      if (!left.Equals(right)) {
-        std::stringstream pp_result;
-        std::stringstream pp_expected;
-
-        ASSERT_OK(PrettyPrint(left, 0, &pp_expected));
-        ASSERT_OK(PrettyPrint(right, 0, &pp_result));
-
-        FAIL() << "Index: " << i << " Expected: " << pp_expected.str()
-               << "\nGot: " << pp_result.str();
-      }
-    }
-  }
-
-  void CheckRoundtrip(const std::shared_ptr<Array>& array, int64_t buffer_size) {
-    auto f0 = arrow::field("f0", array->type());
-    std::vector<std::shared_ptr<Field>> fields = {f0};
-    auto schema = std::make_shared<Schema>(fields);
-
-    RecordBatch batch(schema, 0, {array});
-    CheckRoundtrip(batch, buffer_size);
-  }
-
- protected:
-  std::shared_ptr<io::MemoryMappedFile> mmap_;
-  MemoryPool* pool_;
-};
-
-class TestWriteRecordBatch : public ::testing::Test, public IpcTestFixture {
- public:
-  void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { io::MemoryMapFixture::TearDown(); }
-};
-
-class TestRecordBatchParam : public ::testing::TestWithParam<MakeRecordBatch*>,
-                             public IpcTestFixture {
- public:
-  void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { io::MemoryMapFixture::TearDown(); }
-  using IpcTestFixture::RoundTripHelper;
-  using IpcTestFixture::CheckRoundtrip;
-};
-
-TEST_P(TestRecordBatchParam, RoundTrip) {
-  std::shared_ptr<RecordBatch> batch;
-  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
-
-  CheckRoundtrip(*batch, 1 << 20);
-}
-
-TEST_P(TestRecordBatchParam, SliceRoundTrip) {
-  std::shared_ptr<RecordBatch> batch;
-  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
-
-  // Skip the zero-length case
-  if (batch->num_rows() < 2) { return; }
-
-  auto sliced_batch = batch->Slice(2, 10);
-  CheckRoundtrip(*sliced_batch, 1 << 20);
-}
-
-TEST_P(TestRecordBatchParam, ZeroLengthArrays) {
-  std::shared_ptr<RecordBatch> batch;
-  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
-
-  std::shared_ptr<RecordBatch> zero_length_batch;
-  if (batch->num_rows() > 2) {
-    zero_length_batch = batch->Slice(2, 0);
-  } else {
-    zero_length_batch = batch->Slice(0, 0);
-  }
-
-  CheckRoundtrip(*zero_length_batch, 1 << 20);
-
-  // ARROW-544: check binary array
-  std::shared_ptr<MutableBuffer> value_offsets;
-  ASSERT_OK(AllocateBuffer(pool_, sizeof(int32_t), &value_offsets));
-  *reinterpret_cast<int32_t*>(value_offsets->mutable_data()) = 0;
-
-  std::shared_ptr<Array> bin_array = std::make_shared<BinaryArray>(0, value_offsets,
-      std::make_shared<Buffer>(nullptr, 0), std::make_shared<Buffer>(nullptr, 0));
-
-  // null value_offsets
-  std::shared_ptr<Array> bin_array2 = std::make_shared<BinaryArray>(0, nullptr, nullptr);
-
-  CheckRoundtrip(bin_array, 1 << 20);
-  CheckRoundtrip(bin_array2, 1 << 20);
-}
-
-INSTANTIATE_TEST_CASE_P(
-    RoundTripTests, TestRecordBatchParam,
-    ::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch,
-        &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeListRecordBatch,
-        &MakeDeeplyNestedList, &MakeStruct, &MakeUnion, &MakeDictionary, &MakeDate,
-        &MakeTimestamps, &MakeTimes, &MakeFWBinary));
-
-void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
-  ipc::MockOutputStream mock;
-  int32_t mock_metadata_length = -1;
-  int64_t mock_body_length = -1;
-  int64_t size = -1;
-  ASSERT_OK(WriteRecordBatch(
-      *batch, 0, &mock, &mock_metadata_length, &mock_body_length, default_memory_pool()));
-  ASSERT_OK(GetRecordBatchSize(*batch, &size));
-  ASSERT_EQ(mock.GetExtentBytesWritten(), size);
-}
-
-TEST_F(TestWriteRecordBatch, IntegerGetRecordBatchSize) {
-  std::shared_ptr<RecordBatch> batch;
-
-  ASSERT_OK(MakeIntRecordBatch(&batch));
-  TestGetRecordBatchSize(batch);
-
-  ASSERT_OK(MakeListRecordBatch(&batch));
-  TestGetRecordBatchSize(batch);
-
-  ASSERT_OK(MakeZeroLengthRecordBatch(&batch));
-  TestGetRecordBatchSize(batch);
-
-  ASSERT_OK(MakeNonNullRecordBatch(&batch));
-  TestGetRecordBatchSize(batch);
-
-  ASSERT_OK(MakeDeeplyNestedList(&batch));
-  TestGetRecordBatchSize(batch);
-}
-
-class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
- public:
-  void SetUp() { pool_ = default_memory_pool(); }
-  void TearDown() { io::MemoryMapFixture::TearDown(); }
-
-  Status WriteToMmap(int recursion_level, bool override_level, int32_t* metadata_length,
-      int64_t* body_length, std::shared_ptr<RecordBatch>* batch,
-      std::shared_ptr<Schema>* schema) {
-    const int batch_length = 5;
-    TypePtr type = int32();
-    std::shared_ptr<Array> array;
-    const bool include_nulls = true;
-    RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
-    for (int i = 0; i < recursion_level; ++i) {
-      type = list(type);
-      RETURN_NOT_OK(
-          MakeRandomListArray(array, batch_length, include_nulls, pool_, &array));
-    }
-
-    auto f0 = field("f0", type);
-
-    *schema = std::shared_ptr<Schema>(new Schema({f0}));
-
-    std::vector<std::shared_ptr<Array>> arrays = {array};
-    *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
-
-    std::string path = "test-write-past-max-recursion";
-    const int memory_map_size = 1 << 20;
-    io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
-
-    if (override_level) {
-      return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
-          pool_, recursion_level + 1);
-    } else {
-      return WriteRecordBatch(
-          **batch, 0, mmap_.get(), metadata_length, body_length, pool_);
-    }
-  }
-
- protected:
-  std::shared_ptr<io::MemoryMappedFile> mmap_;
-  MemoryPool* pool_;
-};
-
-TEST_F(RecursionLimits, WriteLimit) {
-  int32_t metadata_length = -1;
-  int64_t body_length = -1;
-  std::shared_ptr<Schema> schema;
-  std::shared_ptr<RecordBatch> batch;
-  ASSERT_RAISES(Invalid,
-      WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &batch, &schema));
-}
-
-TEST_F(RecursionLimits, ReadLimit) {
-  int32_t metadata_length = -1;
-  int64_t body_length = -1;
-  std::shared_ptr<Schema> schema;
-
-  const int recursion_depth = 64;
-
-  std::shared_ptr<RecordBatch> batch;
-  ASSERT_OK(WriteToMmap(
-      recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
-
-  std::shared_ptr<Message> message;
-  ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
-  auto metadata = std::make_shared<RecordBatchMetadata>(message);
-
-  std::shared_ptr<Buffer> payload;
-  ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
-
-  io::BufferReader reader(payload);
-
-  std::shared_ptr<RecordBatch> result;
-  ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &result));
-}
-
-TEST_F(RecursionLimits, StressLimit) {
-  auto CheckDepth = [this](int recursion_depth, bool* it_works) {
-    int32_t metadata_length = -1;
-    int64_t body_length = -1;
-    std::shared_ptr<Schema> schema;
-    std::shared_ptr<RecordBatch> batch;
-    ASSERT_OK(WriteToMmap(
-        recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
-
-    std::shared_ptr<Message> message;
-    ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
-    auto metadata = std::make_shared<RecordBatchMetadata>(message);
-
-    std::shared_ptr<Buffer> payload;
-    ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
-
-    io::BufferReader reader(payload);
-
-    std::shared_ptr<RecordBatch> result;
-    ASSERT_OK(ReadRecordBatch(*metadata, schema, recursion_depth + 1, &reader, &result));
-    *it_works = result->Equals(*batch);
-  };
-
-  bool it_works = false;
-  CheckDepth(100, &it_works);
-  ASSERT_TRUE(it_works);
-
-  CheckDepth(500, &it_works);
-  ASSERT_TRUE(it_works);
-}
-
-}  // namespace ipc
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
deleted file mode 100644
index b457822..0000000
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ /dev/null
@@ -1,228 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "gtest/gtest.h"
-
-#include "arrow/array.h"
-#include "arrow/io/memory.h"
-#include "arrow/io/test-common.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/reader.h"
-#include "arrow/ipc/test-common.h"
-#include "arrow/ipc/util.h"
-#include "arrow/ipc/writer.h"
-
-#include "arrow/buffer.h"
-#include "arrow/memory_pool.h"
-#include "arrow/status.h"
-#include "arrow/test-util.h"
-#include "arrow/util/bit-util.h"
-
-namespace arrow {
-namespace ipc {
-
-void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
-  if (!left.schema()->Equals(right.schema())) {
-    FAIL() << "Left schema: " << left.schema()->ToString()
-           << "\nRight schema: " << right.schema()->ToString();
-  }
-  ASSERT_EQ(left.num_columns(), right.num_columns())
-      << left.schema()->ToString() << " result: " << right.schema()->ToString();
-  EXPECT_EQ(left.num_rows(), right.num_rows());
-  for (int i = 0; i < left.num_columns(); ++i) {
-    EXPECT_TRUE(left.column(i)->Equals(right.column(i)))
-        << "Idx: " << i << " Name: " << left.column_name(i);
-  }
-}
-
-using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
-
-class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
- public:
-  void SetUp() {
-    pool_ = default_memory_pool();
-    buffer_ = std::make_shared<PoolBuffer>(pool_);
-    sink_.reset(new io::BufferOutputStream(buffer_));
-  }
-  void TearDown() {}
-
-  Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
-    // Write the file
-    std::shared_ptr<FileWriter> writer;
-    RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
-
-    const int num_batches = static_cast<int>(in_batches.size());
-
-    for (const auto& batch : in_batches) {
-      RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
-    }
-    RETURN_NOT_OK(writer->Close());
-    RETURN_NOT_OK(sink_->Close());
-
-    // Current offset into stream is the end of the file
-    int64_t footer_offset;
-    RETURN_NOT_OK(sink_->Tell(&footer_offset));
-
-    // Open the file
-    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
-    std::shared_ptr<FileReader> reader;
-    RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader));
-
-    EXPECT_EQ(num_batches, reader->num_record_batches());
-    for (int i = 0; i < num_batches; ++i) {
-      std::shared_ptr<RecordBatch> chunk;
-      RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
-      out_batches->emplace_back(chunk);
-    }
-
-    return Status::OK();
-  }
-
- protected:
-  MemoryPool* pool_;
-
-  std::unique_ptr<io::BufferOutputStream> sink_;
-  std::shared_ptr<PoolBuffer> buffer_;
-};
-
-TEST_P(TestFileFormat, RoundTrip) {
-  std::shared_ptr<RecordBatch> batch1;
-  std::shared_ptr<RecordBatch> batch2;
-  ASSERT_OK((*GetParam())(&batch1));  // NOLINT clang-tidy gtest issue
-  ASSERT_OK((*GetParam())(&batch2));  // NOLINT clang-tidy gtest issue
-
-  std::vector<std::shared_ptr<RecordBatch>> in_batches = {batch1, batch2};
-  std::vector<std::shared_ptr<RecordBatch>> out_batches;
-
-  ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
-
-  // Compare batches
-  for (size_t i = 0; i < in_batches.size(); ++i) {
-    CompareBatch(*in_batches[i], *out_batches[i]);
-  }
-}
-
-class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
- public:
-  void SetUp() {
-    pool_ = default_memory_pool();
-    buffer_ = std::make_shared<PoolBuffer>(pool_);
-    sink_.reset(new io::BufferOutputStream(buffer_));
-  }
-  void TearDown() {}
-
-  Status RoundTripHelper(
-      const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
-    // Write the file
-    std::shared_ptr<StreamWriter> writer;
-    RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer));
-    int num_batches = 5;
-    for (int i = 0; i < num_batches; ++i) {
-      RETURN_NOT_OK(writer->WriteRecordBatch(batch));
-    }
-    RETURN_NOT_OK(writer->Close());
-    RETURN_NOT_OK(sink_->Close());
-
-    // Open the file
-    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
-
-    std::shared_ptr<StreamReader> reader;
-    RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader));
-
-    std::shared_ptr<RecordBatch> chunk;
-    while (true) {
-      RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk));
-      if (chunk == nullptr) { break; }
-      out_batches->emplace_back(chunk);
-    }
-    return Status::OK();
-  }
-
- protected:
-  MemoryPool* pool_;
-
-  std::unique_ptr<io::BufferOutputStream> sink_;
-  std::shared_ptr<PoolBuffer> buffer_;
-};
-
-TEST_P(TestStreamFormat, RoundTrip) {
-  std::shared_ptr<RecordBatch> batch;
-  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
-
-  std::vector<std::shared_ptr<RecordBatch>> out_batches;
-
-  ASSERT_OK(RoundTripHelper(*batch, &out_batches));
-
-  // Compare batches. Same
-  for (size_t i = 0; i < out_batches.size(); ++i) {
-    CompareBatch(*batch, *out_batches[i]);
-  }
-}
-
-#define BATCH_CASES()                                                                   \
-  ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
-      &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch,   \
-      &MakeStruct, &MakeUnion, &MakeDictionary, &MakeDate, &MakeTimestamps, &MakeTimes, \
-      &MakeFWBinary);
-
-INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
-INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES());
-
-void CheckBatchDictionaries(const RecordBatch& batch) {
-  // Check that dictionaries that should be the same are the same
-  auto schema = batch.schema();
-
-  const auto& t0 = static_cast<const DictionaryType&>(*schema->field(0)->type);
-  const auto& t1 = static_cast<const DictionaryType&>(*schema->field(1)->type);
-
-  ASSERT_EQ(t0.dictionary().get(), t1.dictionary().get());
-
-  // Same dictionary used for list values
-  const auto& t3 = static_cast<const ListType&>(*schema->field(3)->type);
-  const auto& t3_value = static_cast<const DictionaryType&>(*t3.value_type());
-  ASSERT_EQ(t0.dictionary().get(), t3_value.dictionary().get());
-}
-
-TEST_F(TestStreamFormat, DictionaryRoundTrip) {
-  std::shared_ptr<RecordBatch> batch;
-  ASSERT_OK(MakeDictionary(&batch));
-
-  std::vector<std::shared_ptr<RecordBatch>> out_batches;
-  ASSERT_OK(RoundTripHelper(*batch, &out_batches));
-
-  CheckBatchDictionaries(*out_batches[0]);
-}
-
-TEST_F(TestFileFormat, DictionaryRoundTrip) {
-  std::shared_ptr<RecordBatch> batch;
-  ASSERT_OK(MakeDictionary(&batch));
-
-  std::vector<std::shared_ptr<RecordBatch>> out_batches;
-  ASSERT_OK(RoundTripHelper({batch}, &out_batches));
-
-  CheckBatchDictionaries(*out_batches[0]);
-}
-
-}  // namespace ipc
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc
deleted file mode 100644
index 4fb3204..0000000
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ /dev/null
@@ -1,100 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <memory>
-#include <sstream>
-#include <string>
-
-#include "gtest/gtest.h"
-
-#include "arrow/io/memory.h"
-#include "arrow/ipc/metadata-internal.h"
-#include "arrow/ipc/metadata.h"
-#include "arrow/ipc/test-common.h"
-#include "arrow/schema.h"
-#include "arrow/status.h"
-#include "arrow/test-util.h"
-#include "arrow/type.h"
-
-namespace arrow {
-
-class Buffer;
-
-namespace ipc {
-
-class TestSchemaMetadata : public ::testing::Test {
- public:
-  void SetUp() {}
-
-  void CheckRoundtrip(const Schema& schema, DictionaryMemo* memo) {
-    std::shared_ptr<Buffer> buffer;
-    ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer));
-
-    std::shared_ptr<Message> message;
-    ASSERT_OK(Message::Open(buffer, 0, &message));
-
-    ASSERT_EQ(Message::SCHEMA, message->type());
-
-    auto schema_msg = std::make_shared<SchemaMetadata>(message);
-    ASSERT_EQ(schema.num_fields(), schema_msg->num_fields());
-
-    DictionaryMemo empty_memo;
-
-    std::shared_ptr<Schema> schema2;
-    ASSERT_OK(schema_msg->GetSchema(empty_memo, &schema2));
-
-    AssertSchemaEqual(schema, *schema2);
-  }
-};
-
-const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
-
-TEST_F(TestSchemaMetadata, PrimitiveFields) {
-  auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
-  auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>(), false);
-  auto f2 = std::make_shared<Field>("f2", std::make_shared<Int32Type>());
-  auto f3 = std::make_shared<Field>("f3", std::make_shared<Int64Type>());
-  auto f4 = std::make_shared<Field>("f4", std::make_shared<UInt8Type>());
-  auto f5 = std::make_shared<Field>("f5", std::make_shared<UInt16Type>());
-  auto f6 = std::make_shared<Field>("f6", std::make_shared<UInt32Type>());
-  auto f7 = std::make_shared<Field>("f7", std::make_shared<UInt64Type>());
-  auto f8 = std::make_shared<Field>("f8", std::make_shared<FloatType>());
-  auto f9 = std::make_shared<Field>("f9", std::make_shared<DoubleType>(), false);
-  auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>());
-
-  Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
-  DictionaryMemo memo;
-
-  CheckRoundtrip(schema, &memo);
-}
-
-TEST_F(TestSchemaMetadata, NestedFields) {
-  auto type = std::make_shared<ListType>(std::make_shared<Int32Type>());
-  auto f0 = std::make_shared<Field>("f0", type);
-
-  std::shared_ptr<StructType> type2(new StructType({std::make_shared<Field>("k1", INT32),
-      std::make_shared<Field>("k2", INT32), std::make_shared<Field>("k3", INT32)}));
-  auto f1 = std::make_shared<Field>("f1", type2);
-
-  Schema schema({f0, f1});
-  DictionaryMemo memo;
-
-  CheckRoundtrip(schema, &memo);
-}
-
-}  // namespace ipc
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
new file mode 100644
index 0000000..261ca1d
--- /dev/null
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -0,0 +1,608 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/array.h"
+#include "arrow/io/memory.h"
+#include "arrow/io/test-common.h"
+#include "arrow/ipc/api.h"
+#include "arrow/ipc/test-common.h"
+#include "arrow/ipc/util.h"
+
+#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
+#include "arrow/pretty_print.h"
+#include "arrow/status.h"
+#include "arrow/test-util.h"
+#include "arrow/util/bit-util.h"
+
+namespace arrow {
+namespace ipc {
+
+void CompareBatch(const RecordBatch& left, const RecordBatch& right) {
+  if (!left.schema()->Equals(right.schema())) {
+    FAIL() << "Left schema: " << left.schema()->ToString()
+           << "\nRight schema: " << right.schema()->ToString();
+  }
+  ASSERT_EQ(left.num_columns(), right.num_columns())
+      << left.schema()->ToString() << " result: " << right.schema()->ToString();
+  EXPECT_EQ(left.num_rows(), right.num_rows());
+  for (int i = 0; i < left.num_columns(); ++i) {
+    EXPECT_TRUE(left.column(i)->Equals(right.column(i)))
+        << "Idx: " << i << " Name: " << left.column_name(i);
+  }
+}
+
// Convenience alias for a sequence of record batches used by the round-trip
// helpers below.
using BatchVector = std::vector<std::shared_ptr<RecordBatch>>;
+
+class TestSchemaMetadata : public ::testing::Test {
+ public:
+  void SetUp() {}
+
+  void CheckRoundtrip(const Schema& schema, DictionaryMemo* memo) {
+    std::shared_ptr<Buffer> buffer;
+    ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer));
+
+    std::shared_ptr<Message> message;
+    ASSERT_OK(Message::Open(buffer, 0, &message));
+
+    ASSERT_EQ(Message::SCHEMA, message->type());
+
+    auto schema_msg = std::make_shared<SchemaMetadata>(message);
+    ASSERT_EQ(schema.num_fields(), schema_msg->num_fields());
+
+    DictionaryMemo empty_memo;
+
+    std::shared_ptr<Schema> schema2;
+    ASSERT_OK(schema_msg->GetSchema(empty_memo, &schema2));
+
+    AssertSchemaEqual(schema, *schema2);
+  }
+};
+
// Shared int32 type instance reused by the schema tests below.
const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
+
+TEST_F(TestSchemaMetadata, PrimitiveFields) {
+  auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
+  auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>(), false);
+  auto f2 = std::make_shared<Field>("f2", std::make_shared<Int32Type>());
+  auto f3 = std::make_shared<Field>("f3", std::make_shared<Int64Type>());
+  auto f4 = std::make_shared<Field>("f4", std::make_shared<UInt8Type>());
+  auto f5 = std::make_shared<Field>("f5", std::make_shared<UInt16Type>());
+  auto f6 = std::make_shared<Field>("f6", std::make_shared<UInt32Type>());
+  auto f7 = std::make_shared<Field>("f7", std::make_shared<UInt64Type>());
+  auto f8 = std::make_shared<Field>("f8", std::make_shared<FloatType>());
+  auto f9 = std::make_shared<Field>("f9", std::make_shared<DoubleType>(), false);
+  auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>());
+
+  Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
+  DictionaryMemo memo;
+
+  CheckRoundtrip(schema, &memo);
+}
+
+TEST_F(TestSchemaMetadata, NestedFields) {
+  auto type = std::make_shared<ListType>(std::make_shared<Int32Type>());
+  auto f0 = std::make_shared<Field>("f0", type);
+
+  std::shared_ptr<StructType> type2(new StructType({std::make_shared<Field>("k1", INT32),
+      std::make_shared<Field>("k2", INT32), std::make_shared<Field>("k3", INT32)}));
+  auto f1 = std::make_shared<Field>("f1", type2);
+
+  Schema schema({f0, f1});
+  DictionaryMemo memo;
+
+  CheckRoundtrip(schema, &memo);
+}
+
// Record batch factories exercised by the parameterized round-trip suites.
// Note: no trailing semicolon -- the macro expands inside
// INSTANTIATE_TEST_CASE_P(...), whose call sites supply their own punctuation;
// the previous trailing `;` injected a stray empty statement at every
// expansion.
#define BATCH_CASES()                                                                   \
  ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
      &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch,   \
      &MakeStruct, &MakeUnion, &MakeDictionary, &MakeDate, &MakeTimestamps, &MakeTimes, \
      &MakeFWBinary)
+
+class IpcTestFixture : public io::MemoryMapFixture {
+ public:
+  Status DoStandardRoundTrip(const RecordBatch& batch, bool zero_data,
+      std::shared_ptr<RecordBatch>* batch_result) {
+    int32_t metadata_length;
+    int64_t body_length;
+
+    const int64_t buffer_offset = 0;
+
+    if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
+    RETURN_NOT_OK(mmap_->Seek(0));
+
+    RETURN_NOT_OK(WriteRecordBatch(
+        batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
+
+    std::shared_ptr<Message> message;
+    RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+    auto metadata = std::make_shared<RecordBatchMetadata>(message);
+
+    // The buffer offsets start at 0, so we must construct a
+    // RandomAccessFile according to that frame of reference
+    std::shared_ptr<Buffer> buffer_payload;
+    RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
+    io::BufferReader buffer_reader(buffer_payload);
+
+    return ReadRecordBatch(*metadata, batch.schema(), &buffer_reader, batch_result);
+  }
+
+  Status DoLargeRoundTrip(
+      const RecordBatch& batch, bool zero_data, std::shared_ptr<RecordBatch>* result) {
+    int32_t metadata_length;
+    int64_t body_length;
+
+    const int64_t buffer_offset = 0;
+
+    if (zero_data) { RETURN_NOT_OK(ZeroMemoryMap(mmap_.get())); }
+    RETURN_NOT_OK(mmap_->Seek(0));
+
+    RETURN_NOT_OK(WriteLargeRecordBatch(
+        batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
+    return ReadLargeRecordBatch(batch.schema(), 0, mmap_.get(), result);
+  }
+
+  void CheckReadResult(const RecordBatch& result, const RecordBatch& expected) {
+    EXPECT_EQ(expected.num_rows(), result.num_rows());
+
+    ASSERT_TRUE(expected.schema()->Equals(result.schema()));
+    ASSERT_EQ(expected.num_columns(), result.num_columns())
+        << expected.schema()->ToString() << " result: " << result.schema()->ToString();
+
+    for (int i = 0; i < expected.num_columns(); ++i) {
+      const auto& left = *expected.column(i);
+      const auto& right = *result.column(i);
+      if (!left.Equals(right)) {
+        std::stringstream pp_result;
+        std::stringstream pp_expected;
+
+        ASSERT_OK(PrettyPrint(left, 0, &pp_expected));
+        ASSERT_OK(PrettyPrint(right, 0, &pp_result));
+
+        FAIL() << "Index: " << i << " Expected: " << pp_expected.str()
+               << "\nGot: " << pp_result.str();
+      }
+    }
+  }
+
+  void CheckRoundtrip(const RecordBatch& batch, int64_t buffer_size) {
+    std::string path = "test-write-row-batch";
+    ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(buffer_size, path, &mmap_));
+
+    std::shared_ptr<RecordBatch> result;
+    ASSERT_OK(DoStandardRoundTrip(batch, true, &result));
+    CheckReadResult(*result, batch);
+
+    ASSERT_OK(DoLargeRoundTrip(batch, true, &result));
+    CheckReadResult(*result, batch);
+  }
+
+  void CheckRoundtrip(const std::shared_ptr<Array>& array, int64_t buffer_size) {
+    auto f0 = arrow::field("f0", array->type());
+    std::vector<std::shared_ptr<Field>> fields = {f0};
+    auto schema = std::make_shared<Schema>(fields);
+
+    RecordBatch batch(schema, 0, {array});
+    CheckRoundtrip(batch, buffer_size);
+  }
+
+ protected:
+  std::shared_ptr<io::MemoryMappedFile> mmap_;
+  MemoryPool* pool_;
+};
+
// Fixture for non-parameterized write-path tests; wires the IPC fixture to a
// default memory pool and tears down the memory map after each test.
class TestWriteRecordBatch : public ::testing::Test, public IpcTestFixture {
 public:
  void SetUp() { pool_ = default_memory_pool(); }
  void TearDown() { io::MemoryMapFixture::TearDown(); }
};
+
// Parameterized fixture: each test instance receives a record-batch factory
// (see BATCH_CASES) and round-trips its output through the IPC fixture.
class TestIpcRoundTrip : public ::testing::TestWithParam<MakeRecordBatch*>,
                         public IpcTestFixture {
 public:
  void SetUp() { pool_ = default_memory_pool(); }
  void TearDown() { io::MemoryMapFixture::TearDown(); }
};
+
// Every batch factory must survive both the standard and the large IPC
// round trip in a 1 MiB memory map.
TEST_P(TestIpcRoundTrip, RoundTrip) {
  std::shared_ptr<RecordBatch> batch;
  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue

  CheckRoundtrip(*batch, 1 << 20);
}
+
+TEST_P(TestIpcRoundTrip, SliceRoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+
+  // Skip the zero-length case
+  if (batch->num_rows() < 2) { return; }
+
+  auto sliced_batch = batch->Slice(2, 10);
+  CheckRoundtrip(*sliced_batch, 1 << 20);
+}
+
+TEST_P(TestIpcRoundTrip, ZeroLengthArrays) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+
+  std::shared_ptr<RecordBatch> zero_length_batch;
+  if (batch->num_rows() > 2) {
+    zero_length_batch = batch->Slice(2, 0);
+  } else {
+    zero_length_batch = batch->Slice(0, 0);
+  }
+
+  CheckRoundtrip(*zero_length_batch, 1 << 20);
+
+  // ARROW-544: check binary array
+  std::shared_ptr<MutableBuffer> value_offsets;
+  ASSERT_OK(AllocateBuffer(pool_, sizeof(int32_t), &value_offsets));
+  *reinterpret_cast<int32_t*>(value_offsets->mutable_data()) = 0;
+
+  std::shared_ptr<Array> bin_array = std::make_shared<BinaryArray>(0, value_offsets,
+      std::make_shared<Buffer>(nullptr, 0), std::make_shared<Buffer>(nullptr, 0));
+
+  // null value_offsets
+  std::shared_ptr<Array> bin_array2 = std::make_shared<BinaryArray>(0, nullptr, nullptr);
+
+  CheckRoundtrip(bin_array, 1 << 20);
+  CheckRoundtrip(bin_array2, 1 << 20);
+}
+
+void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
+  ipc::MockOutputStream mock;
+  int32_t mock_metadata_length = -1;
+  int64_t mock_body_length = -1;
+  int64_t size = -1;
+  ASSERT_OK(WriteRecordBatch(
+      *batch, 0, &mock, &mock_metadata_length, &mock_body_length, default_memory_pool()));
+  ASSERT_OK(GetRecordBatchSize(*batch, &size));
+  ASSERT_EQ(mock.GetExtentBytesWritten(), size);
+}
+
+TEST_F(TestWriteRecordBatch, IntegerGetRecordBatchSize) {
+  std::shared_ptr<RecordBatch> batch;
+
+  ASSERT_OK(MakeIntRecordBatch(&batch));
+  TestGetRecordBatchSize(batch);
+
+  ASSERT_OK(MakeListRecordBatch(&batch));
+  TestGetRecordBatchSize(batch);
+
+  ASSERT_OK(MakeZeroLengthRecordBatch(&batch));
+  TestGetRecordBatchSize(batch);
+
+  ASSERT_OK(MakeNonNullRecordBatch(&batch));
+  TestGetRecordBatchSize(batch);
+
+  ASSERT_OK(MakeDeeplyNestedList(&batch));
+  TestGetRecordBatchSize(batch);
+}
+
+class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
+ public:
+  void SetUp() { pool_ = default_memory_pool(); }
+  void TearDown() { io::MemoryMapFixture::TearDown(); }
+
+  Status WriteToMmap(int recursion_level, bool override_level, int32_t* metadata_length,
+      int64_t* body_length, std::shared_ptr<RecordBatch>* batch,
+      std::shared_ptr<Schema>* schema) {
+    const int batch_length = 5;
+    TypePtr type = int32();
+    std::shared_ptr<Array> array;
+    const bool include_nulls = true;
+    RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
+    for (int i = 0; i < recursion_level; ++i) {
+      type = list(type);
+      RETURN_NOT_OK(
+          MakeRandomListArray(array, batch_length, include_nulls, pool_, &array));
+    }
+
+    auto f0 = field("f0", type);
+
+    *schema = std::shared_ptr<Schema>(new Schema({f0}));
+
+    std::vector<std::shared_ptr<Array>> arrays = {array};
+    *batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
+
+    std::string path = "test-write-past-max-recursion";
+    const int memory_map_size = 1 << 20;
+    io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
+
+    if (override_level) {
+      return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
+          pool_, recursion_level + 1);
+    } else {
+      return WriteRecordBatch(
+          **batch, 0, mmap_.get(), metadata_length, body_length, pool_);
+    }
+  }
+
+ protected:
+  std::shared_ptr<io::MemoryMappedFile> mmap_;
+  MemoryPool* pool_;
+};
+
// Writing one level deeper than the default recursion cap (2^8) must fail
// with an Invalid status rather than overflowing the stack.
TEST_F(RecursionLimits, WriteLimit) {
  int32_t metadata_length = -1;
  int64_t body_length = -1;
  std::shared_ptr<Schema> schema;
  std::shared_ptr<RecordBatch> batch;
  ASSERT_RAISES(Invalid,
      WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &batch, &schema));
}
+
+TEST_F(RecursionLimits, ReadLimit) {
+  int32_t metadata_length = -1;
+  int64_t body_length = -1;
+  std::shared_ptr<Schema> schema;
+
+  const int recursion_depth = 64;
+
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(WriteToMmap(
+      recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
+
+  std::shared_ptr<Message> message;
+  ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+  auto metadata = std::make_shared<RecordBatchMetadata>(message);
+
+  std::shared_ptr<Buffer> payload;
+  ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
+
+  io::BufferReader reader(payload);
+
+  std::shared_ptr<RecordBatch> result;
+  ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &result));
+}
+
+TEST_F(RecursionLimits, StressLimit) {
+  auto CheckDepth = [this](int recursion_depth, bool* it_works) {
+    int32_t metadata_length = -1;
+    int64_t body_length = -1;
+    std::shared_ptr<Schema> schema;
+    std::shared_ptr<RecordBatch> batch;
+    ASSERT_OK(WriteToMmap(
+        recursion_depth, true, &metadata_length, &body_length, &batch, &schema));
+
+    std::shared_ptr<Message> message;
+    ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+    auto metadata = std::make_shared<RecordBatchMetadata>(message);
+
+    std::shared_ptr<Buffer> payload;
+    ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
+
+    io::BufferReader reader(payload);
+
+    std::shared_ptr<RecordBatch> result;
+    ASSERT_OK(ReadRecordBatch(*metadata, schema, recursion_depth + 1, &reader, &result));
+    *it_works = result->Equals(*batch);
+  };
+
+  bool it_works = false;
+  CheckDepth(100, &it_works);
+  ASSERT_TRUE(it_works);
+
+  CheckDepth(500, &it_works);
+  ASSERT_TRUE(it_works);
+}
+
+class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
+ public:
+  void SetUp() {
+    pool_ = default_memory_pool();
+    buffer_ = std::make_shared<PoolBuffer>(pool_);
+    sink_.reset(new io::BufferOutputStream(buffer_));
+  }
+  void TearDown() {}
+
+  Status RoundTripHelper(const BatchVector& in_batches, BatchVector* out_batches) {
+    // Write the file
+    std::shared_ptr<FileWriter> writer;
+    RETURN_NOT_OK(FileWriter::Open(sink_.get(), in_batches[0]->schema(), &writer));
+
+    const int num_batches = static_cast<int>(in_batches.size());
+
+    for (const auto& batch : in_batches) {
+      RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+    }
+    RETURN_NOT_OK(writer->Close());
+    RETURN_NOT_OK(sink_->Close());
+
+    // Current offset into stream is the end of the file
+    int64_t footer_offset;
+    RETURN_NOT_OK(sink_->Tell(&footer_offset));
+
+    // Open the file
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    std::shared_ptr<FileReader> reader;
+    RETURN_NOT_OK(FileReader::Open(buf_reader, footer_offset, &reader));
+
+    EXPECT_EQ(num_batches, reader->num_record_batches());
+    for (int i = 0; i < num_batches; ++i) {
+      std::shared_ptr<RecordBatch> chunk;
+      RETURN_NOT_OK(reader->GetRecordBatch(i, &chunk));
+      out_batches->emplace_back(chunk);
+    }
+
+    return Status::OK();
+  }
+
+ protected:
+  MemoryPool* pool_;
+
+  std::unique_ptr<io::BufferOutputStream> sink_;
+  std::shared_ptr<PoolBuffer> buffer_;
+};
+
+TEST_P(TestFileFormat, RoundTrip) {
+  std::shared_ptr<RecordBatch> batch1;
+  std::shared_ptr<RecordBatch> batch2;
+  ASSERT_OK((*GetParam())(&batch1));  // NOLINT clang-tidy gtest issue
+  ASSERT_OK((*GetParam())(&batch2));  // NOLINT clang-tidy gtest issue
+
+  std::vector<std::shared_ptr<RecordBatch>> in_batches = {batch1, batch2};
+  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+
+  ASSERT_OK(RoundTripHelper(in_batches, &out_batches));
+
+  // Compare batches
+  for (size_t i = 0; i < in_batches.size(); ++i) {
+    CompareBatch(*in_batches[i], *out_batches[i]);
+  }
+}
+
+class TestStreamFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
+ public:
+  void SetUp() {
+    pool_ = default_memory_pool();
+    buffer_ = std::make_shared<PoolBuffer>(pool_);
+    sink_.reset(new io::BufferOutputStream(buffer_));
+  }
+  void TearDown() {}
+
+  Status RoundTripHelper(
+      const RecordBatch& batch, std::vector<std::shared_ptr<RecordBatch>>* out_batches) {
+    // Write the file
+    std::shared_ptr<StreamWriter> writer;
+    RETURN_NOT_OK(StreamWriter::Open(sink_.get(), batch.schema(), &writer));
+    int num_batches = 5;
+    for (int i = 0; i < num_batches; ++i) {
+      RETURN_NOT_OK(writer->WriteRecordBatch(batch));
+    }
+    RETURN_NOT_OK(writer->Close());
+    RETURN_NOT_OK(sink_->Close());
+
+    // Open the file
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+
+    std::shared_ptr<StreamReader> reader;
+    RETURN_NOT_OK(StreamReader::Open(buf_reader, &reader));
+
+    std::shared_ptr<RecordBatch> chunk;
+    while (true) {
+      RETURN_NOT_OK(reader->GetNextRecordBatch(&chunk));
+      if (chunk == nullptr) { break; }
+      out_batches->emplace_back(chunk);
+    }
+    return Status::OK();
+  }
+
+ protected:
+  MemoryPool* pool_;
+
+  std::unique_ptr<io::BufferOutputStream> sink_;
+  std::shared_ptr<PoolBuffer> buffer_;
+};
+
+TEST_P(TestStreamFormat, RoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK((*GetParam())(&batch));  // NOLINT clang-tidy gtest issue
+
+  std::vector<std::shared_ptr<RecordBatch>> out_batches;
+
+  ASSERT_OK(RoundTripHelper(*batch, &out_batches));
+
+  // Compare batches. Same
+  for (size_t i = 0; i < out_batches.size(); ++i) {
+    CompareBatch(*batch, *out_batches[i]);
+  }
+}
+
// Instantiate the parameterized round-trip suites over every batch factory
// listed in BATCH_CASES().
INSTANTIATE_TEST_CASE_P(GenericIpcRoundTripTests, TestIpcRoundTrip, BATCH_CASES());
INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES());
+
+TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
+  const int64_t length = static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1;
+
+  BooleanBuilder builder(default_memory_pool());
+  ASSERT_OK(builder.Reserve(length));
+  ASSERT_OK(builder.Advance(length));
+
+  std::shared_ptr<Array> array;
+  ASSERT_OK(builder.Finish(&array));
+
+  auto f0 = arrow::field("f0", array->type());
+  std::vector<std::shared_ptr<Field>> fields = {f0};
+  auto schema = std::make_shared<Schema>(fields);
+
+  RecordBatch batch(schema, 0, {array});
+
+  std::string path = "test-write-large-record_batch";
+
+  // 512 MB
+  constexpr int64_t kBufferSize = 1 << 29;
+
+  ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(kBufferSize, path, &mmap_));
+
+  std::shared_ptr<RecordBatch> result;
+  ASSERT_OK(DoLargeRoundTrip(batch, false, &result));
+  CheckReadResult(*result, batch);
+
+  // Fails if we try to write this with the normal code path
+  ASSERT_RAISES(Invalid, DoStandardRoundTrip(batch, false, &result));
+}
+
+void CheckBatchDictionaries(const RecordBatch& batch) {
+  // Check that dictionaries that should be the same are the same
+  auto schema = batch.schema();
+
+  const auto& t0 = static_cast<const DictionaryType&>(*schema->field(0)->type);
+  const auto& t1 = static_cast<const DictionaryType&>(*schema->field(1)->type);
+
+  ASSERT_EQ(t0.dictionary().get(), t1.dictionary().get());
+
+  // Same dictionary used for list values
+  const auto& t3 = static_cast<const ListType&>(*schema->field(3)->type);
+  const auto& t3_value = static_cast<const DictionaryType&>(*t3.value_type());
+  ASSERT_EQ(t0.dictionary().get(), t3_value.dictionary().get());
+}
+
// Dictionaries must remain shared after a streaming-format round trip.
TEST_F(TestStreamFormat, DictionaryRoundTrip) {
  std::shared_ptr<RecordBatch> batch;
  ASSERT_OK(MakeDictionary(&batch));

  std::vector<std::shared_ptr<RecordBatch>> out_batches;
  ASSERT_OK(RoundTripHelper(*batch, &out_batches));

  CheckBatchDictionaries(*out_batches[0]);
}
+
// Dictionaries must remain shared after a file-format round trip.
TEST_F(TestFileFormat, DictionaryRoundTrip) {
  std::shared_ptr<RecordBatch> batch;
  ASSERT_OK(MakeDictionary(&batch));

  std::vector<std::shared_ptr<RecordBatch>> out_batches;
  ASSERT_OK(RoundTripHelper({batch}, &out_batches));

  CheckBatchDictionaries(*out_batches[0]);
}
+
+}  // namespace ipc
+}  // namespace arrow


Mime
View raw message