arrow-commits mailing list archives

From w...@apache.org
Subject [3/3] arrow git commit: ARROW-67: C++ metadata flatbuffer serialization and data movement to memory maps
Date Wed, 23 Mar 2016 01:45:24 GMT
ARROW-67: C++ metadata flatbuffer serialization and data movement to memory maps

Several things here:

* Add Google flatbuffers dependency
* Flatbuffers IDL draft in collaboration with @jacques-n and @stevenmphillips
* Add Schema wrapper in Cython
* arrow::Schema conversion to/from flatbuffer representation
* Remove unneeded physical layout types from type.h
* Refactor ListType to be a nested type with a single child
* Implement shared memory round-trip for numeric row batches
* mmap-based shared memory interface and MemorySource abstract API
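The shared-memory round trip listed above comes down to two steps: place buffer bytes in a file-backed memory map, then read them back through an independent mapping. Below is a minimal sketch that assumes a POSIX system (`mkstemp`, `ftruncate`, `mmap`). `MmapRoundTrip` is a hypothetical helper, not the `MemorySource` API added in this commit:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>   // mkstemp (POSIX)
#include <cstring>   // std::memcpy
#include <vector>

#include <fcntl.h>     // open
#include <sys/mman.h>  // mmap, munmap
#include <unistd.h>    // ftruncate, close, unlink

// Hypothetical helper: writes `payload` into a temporary file through a
// writable shared mapping, then maps the file again read-only and copies the
// bytes back out. Returns an empty vector on any failure.
std::vector<uint8_t> MmapRoundTrip(const std::vector<uint8_t>& payload) {
  std::vector<uint8_t> result;
  if (payload.empty()) return result;  // mmap rejects zero-length mappings

  char path[] = "/tmp/arrow-mmap-sketch-XXXXXX";
  int fd = mkstemp(path);
  if (fd < 0) return result;

  // Give the file its final size so the mapping is backed by real storage.
  if (ftruncate(fd, static_cast<off_t>(payload.size())) != 0) {
    close(fd);
    unlink(path);
    return result;
  }

  // Writer side: MAP_SHARED makes the stores visible to other mappings of the file.
  void* w = mmap(nullptr, payload.size(), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (w == MAP_FAILED) {
    close(fd);
    unlink(path);
    return result;
  }
  std::memcpy(w, payload.data(), payload.size());
  munmap(w, payload.size());
  close(fd);

  // Reader side: an independent, read-only mapping of the same file.
  fd = open(path, O_RDONLY);
  if (fd >= 0) {
    void* r = mmap(nullptr, payload.size(), PROT_READ, MAP_SHARED, fd, 0);
    if (r != MAP_FAILED) {
      const uint8_t* bytes = static_cast<const uint8_t*>(r);
      result.assign(bytes, bytes + payload.size());
      munmap(r, payload.size());
    }
    close(fd);
  }
  unlink(path);
  return result;
}
```

Because both mappings use `MAP_SHARED`, another process that maps the same file would also see the written bytes. This property is what makes a file-backed map usable for IPC.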

Quite a bit of judicious code cleaning and consolidation as part of this. For example, List types are now internally equivalent to a nested type with 1 named child field (versus a struct, which can have any number of child fields).
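The sketch below illustrates the List/Struct distinction described above using a simplified type model. Both kinds are nested types that hold named child fields, and a list is just the case with exactly one child. All names here (`DataType::Kind`, `list_of`, `struct_of`, the child name "item") are hypothetical and do not reproduce `arrow/type.h`:

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct DataType;

// A named child slot of a nested type.
struct Field {
  std::string name;
  std::shared_ptr<DataType> type;
};

struct DataType {
  enum Kind { INT32, LIST, STRUCT };
  Kind kind;
  std::vector<Field> children;  // empty for primitive types

  explicit DataType(Kind k, std::vector<Field> c = {})
      : kind(k), children(std::move(c)) {}
  int num_children() const { return static_cast<int>(children.size()); }
};

std::shared_ptr<DataType> int32() {
  return std::make_shared<DataType>(DataType::INT32);
}

// A list is a nested type with exactly one named child: its value type.
std::shared_ptr<DataType> list_of(const std::string& item_name,
                                  std::shared_ptr<DataType> value_type) {
  std::vector<Field> children;
  children.push_back(Field{item_name, std::move(value_type)});
  return std::make_shared<DataType>(DataType::LIST, std::move(children));
}

// A struct is a nested type with any number of named children.
std::shared_ptr<DataType> struct_of(std::vector<Field> fields) {
  return std::make_shared<DataType>(DataType::STRUCT, std::move(fields));
}
```

With this representation, code that walks nested types, such as a metadata serializer, can treat lists and structs uniformly by iterating over `children`.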

Associated JIRAs: ARROW-48, ARROW-57, ARROW-58

Author: Wes McKinney <wesm@apache.org>

Closes #28 from wesm/cpp-ipc-draft and squashes the following commits:

0cef7ea [Wes McKinney] Add NullArray type now that Array is virtual, fix pyarrow build
5e841f7 [Wes McKinney] Create explicit PrimitiveArray subclasses to avoid unwanted template instantiation
6fa6319 [Wes McKinney] ARROW-28: Draft C++ shared memory IPC workflow and related refactoring / scaffolding / cleaning.


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/65db0da8
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/65db0da8
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/65db0da8

Branch: refs/heads/master
Commit: 65db0da80b6a1fb6887b7ac1df24e2423d41dfb9
Parents: 093f9bd
Author: Wes McKinney <wesm@apache.org>
Authored: Tue Mar 22 18:45:13 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Tue Mar 22 18:45:13 2016 -0700

----------------------------------------------------------------------
 ci/travis_before_script_cpp.sh          |   9 +-
 ci/travis_script_cpp.sh                 |   6 +-
 cpp/CMakeLists.txt                      |  96 +++++---
 cpp/cmake_modules/FindFlatbuffers.cmake |  95 ++++++++
 cpp/setup_build_env.sh                  |   5 +-
 cpp/src/arrow/CMakeLists.txt            |   8 +
 cpp/src/arrow/api.h                     |  11 +-
 cpp/src/arrow/array-test.cc             |  14 +-
 cpp/src/arrow/array.cc                  |  26 ++-
 cpp/src/arrow/array.h                   |  27 ++-
 cpp/src/arrow/builder.h                 |   2 +-
 cpp/src/arrow/column-benchmark.cc       |  54 +++++
 cpp/src/arrow/column-test.cc            |  75 +++++++
 cpp/src/arrow/column.cc                 |  70 ++++++
 cpp/src/arrow/column.h                  | 108 +++++++++
 cpp/src/arrow/ipc/.gitignore            |   1 +
 cpp/src/arrow/ipc/CMakeLists.txt        |  51 +++++
 cpp/src/arrow/ipc/adapter.cc            | 305 ++++++++++++++++++++++++++
 cpp/src/arrow/ipc/adapter.h             |  86 ++++++++
 cpp/src/arrow/ipc/ipc-adapter-test.cc   | 112 ++++++++++
 cpp/src/arrow/ipc/ipc-memory-test.cc    |  82 +++++++
 cpp/src/arrow/ipc/ipc-metadata-test.cc  |  99 +++++++++
 cpp/src/arrow/ipc/memory.cc             | 162 ++++++++++++++
 cpp/src/arrow/ipc/memory.h              | 131 +++++++++++
 cpp/src/arrow/ipc/metadata-internal.cc  | 317 +++++++++++++++++++++++++++
 cpp/src/arrow/ipc/metadata-internal.h   |  69 ++++++
 cpp/src/arrow/ipc/metadata.cc           | 238 ++++++++++++++++++++
 cpp/src/arrow/ipc/metadata.h            | 146 ++++++++++++
 cpp/src/arrow/ipc/test-common.h         |  53 +++++
 cpp/src/arrow/schema-test.cc            | 104 +++++++++
 cpp/src/arrow/schema.cc                 |  63 ++++++
 cpp/src/arrow/schema.h                  |  55 +++++
 cpp/src/arrow/table-test.cc             | 128 +++++++++++
 cpp/src/arrow/table.cc                  |  86 ++++++++
 cpp/src/arrow/table.h                   | 128 +++++++++++
 cpp/src/arrow/table/CMakeLists.txt      |  33 ---
 cpp/src/arrow/table/column-benchmark.cc |  55 -----
 cpp/src/arrow/table/column-test.cc      |  75 -------
 cpp/src/arrow/table/column.cc           |  68 ------
 cpp/src/arrow/table/column.h            | 105 ---------
 cpp/src/arrow/table/schema-test.cc      | 110 ----------
 cpp/src/arrow/table/schema.cc           |  58 -----
 cpp/src/arrow/table/schema.h            |  55 -----
 cpp/src/arrow/table/table-test.cc       | 128 -----------
 cpp/src/arrow/table/table.cc            |  73 ------
 cpp/src/arrow/table/table.h             |  82 -------
 cpp/src/arrow/table/test-common.h       |  54 -----
 cpp/src/arrow/test-util.h               |  68 +++++-
 cpp/src/arrow/type.cc                   |  24 +-
 cpp/src/arrow/type.h                    | 177 ++++++---------
 cpp/src/arrow/types/CMakeLists.txt      |   2 -
 cpp/src/arrow/types/boolean.h           |   2 +-
 cpp/src/arrow/types/collection.h        |   2 +-
 cpp/src/arrow/types/construct.cc        |  53 +++--
 cpp/src/arrow/types/construct.h         |  11 +-
 cpp/src/arrow/types/datetime.h          |  16 +-
 cpp/src/arrow/types/floating.cc         |  22 --
 cpp/src/arrow/types/floating.h          |  36 ---
 cpp/src/arrow/types/integer.cc          |  22 --
 cpp/src/arrow/types/integer.h           |  57 -----
 cpp/src/arrow/types/json.cc             |   1 -
 cpp/src/arrow/types/json.h              |   4 +-
 cpp/src/arrow/types/list-test.cc        |  28 ++-
 cpp/src/arrow/types/list.cc             |  29 +++
 cpp/src/arrow/types/list.h              |  28 +--
 cpp/src/arrow/types/primitive-test.cc   |  41 ++--
 cpp/src/arrow/types/primitive.cc        |  16 +-
 cpp/src/arrow/types/primitive.h         | 102 +++++----
 cpp/src/arrow/types/string-test.cc      |  54 +++--
 cpp/src/arrow/types/string.h            |  55 ++---
 cpp/src/arrow/types/struct-test.cc      |  15 +-
 cpp/src/arrow/types/test-common.h       |   5 +-
 cpp/src/arrow/types/union.h             |  18 +-
 cpp/src/arrow/util/bit-util-test.cc     |   4 +-
 cpp/src/arrow/util/bit-util.h           |   1 -
 cpp/src/arrow/util/buffer-test.cc       |   3 +-
 cpp/src/arrow/util/buffer.cc            |   2 +-
 cpp/src/arrow/util/memory-pool-test.cc  |   7 +-
 cpp/src/arrow/util/memory-pool.cc       |   6 +-
 cpp/src/arrow/util/memory-pool.h        |   2 +-
 cpp/src/arrow/util/status.cc            |   3 +
 cpp/src/arrow/util/status.h             |   6 +
 cpp/src/arrow/util/test_main.cc         |   2 +-
 cpp/thirdparty/build_thirdparty.sh      |   9 +
 cpp/thirdparty/download_thirdparty.sh   |   5 +
 cpp/thirdparty/versions.sh              |   4 +
 format/Message.fbs                      | 183 ++++++++++++++++
 python/pyarrow/__init__.py              |   4 +-
 python/pyarrow/array.pxd                |   2 +-
 python/pyarrow/array.pyx                |  47 +++-
 python/pyarrow/includes/libarrow.pxd    | 107 ++++++---
 python/pyarrow/includes/pyarrow.pxd     |   5 +-
 python/pyarrow/scalar.pyx               |  24 +-
 python/pyarrow/schema.pxd               |   6 +-
 python/pyarrow/schema.pyx               | 155 +++++++++----
 python/pyarrow/tests/test_schema.py     |  28 ++-
 python/pyarrow/tests/test_table.py      |  40 ++++
 python/src/pyarrow/adapters/builtin.cc  |  20 +-
 python/src/pyarrow/helpers.cc           |  15 +-
 python/src/pyarrow/helpers.h            |   5 +-
 100 files changed, 3888 insertions(+), 1613 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/ci/travis_before_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 49dcc39..193c76f 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -19,7 +19,14 @@ echo $GTEST_HOME
 
 : ${ARROW_CPP_INSTALL=$TRAVIS_BUILD_DIR/cpp-install}
 
-cmake -DARROW_BUILD_BENCHMARKS=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
+CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
+
+if [ $TRAVIS_OS_NAME == "linux" ]; then
+  cmake -DARROW_TEST_MEMCHECK=on $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
+else
+  cmake $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
+fi
+
 make -j4
 make install
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/ci/travis_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh
index d96b98f..997bdf3 100755
--- a/ci/travis_script_cpp.sh
+++ b/ci/travis_script_cpp.sh
@@ -8,10 +8,6 @@ pushd $CPP_BUILD_DIR
 
 make lint
 
-if [ $TRAVIS_OS_NAME == "linux" ]; then
-  valgrind --tool=memcheck --leak-check=yes --error-exitcode=1 ctest -L unittest
-else
-  ctest -L unittest
-fi
+ctest -L unittest
 
 popd

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 268c1d1..6d70107 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -51,7 +51,9 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
   option(ARROW_PARQUET
     "Build the Parquet adapter and link to libparquet"
     OFF)
-
+  option(ARROW_TEST_MEMCHECK
+	"Run the test suite using valgrind --tool=memcheck"
+	OFF)
   option(ARROW_BUILD_TESTS
     "Build the Arrow googletest unit tests"
     ON)
@@ -60,6 +62,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
     "Build the Arrow micro benchmarks"
     OFF)
 
+  option(ARROW_IPC
+    "Build the Arrow IPC extensions"
+    ON)
+
 endif()
 
 if(NOT ARROW_BUILD_TESTS)
@@ -260,17 +266,17 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}")
 include_directories(src)
 
 ############################################################
-# Benchmarking 
+# Benchmarking
 ############################################################
 # Add a new micro benchmark, with or without an executable that should be built.
 # If benchmarks are enabled then they will be run alongside unit tests with ctest.
-# 'make runbenchmark' and 'make unittest' to build/run only benchmark or unittests, 
+# 'make runbenchmark' and 'make unittest' to build/run only benchmark or unittests,
 # respectively.
 #
 # REL_BENCHMARK_NAME is the name of the benchmark app. It may be a single component
 # (e.g. monotime-benchmark) or contain additional components (e.g.
 # net/net_util-benchmark). Either way, the last component must be a globally
-# unique name.  
+# unique name.
 
 # The benchmark will be registered as a unit test with ctest with a label
 # of 'benchmark'.
@@ -281,7 +287,7 @@ function(ADD_ARROW_BENCHMARK REL_BENCHMARK_NAME)
     return()
   endif()
   get_filename_component(BENCHMARK_NAME ${REL_BENCHMARK_NAME} NAME_WE)
-   
+
   if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/${REL_BENCHMARK_NAME}.cc)
     # This benchmark has a corresponding .cc file, set it up as an executable.
     set(BENCHMARK_PATH "${EXECUTABLE_OUTPUT_PATH}/${BENCHMARK_NAME}")
@@ -294,7 +300,7 @@ function(ADD_ARROW_BENCHMARK REL_BENCHMARK_NAME)
     set(BENCHMARK_PATH ${CMAKE_CURRENT_SOURCE_DIR}/${REL_BENCHMARK_NAME})
     set(NO_COLOR "")
   endif()
-  
+
   add_test(${BENCHMARK_NAME}
     ${BUILD_SUPPORT_DIR}/run-test.sh ${CMAKE_BINARY_DIR} benchmark ${BENCHMARK_PATH} ${NO_COLOR})
   set_tests_properties(${BENCHMARK_NAME} PROPERTIES LABELS "benchmark")
@@ -345,9 +351,18 @@ function(ADD_ARROW_TEST REL_TEST_NAME)
     set(TEST_PATH ${CMAKE_CURRENT_SOURCE_DIR}/${REL_TEST_NAME})
   endif()
 
-  add_test(${TEST_NAME}
-    ${BUILD_SUPPORT_DIR}/run-test.sh ${CMAKE_BINARY_DIR} test ${TEST_PATH})
+  if (ARROW_TEST_MEMCHECK)
+	SET_PROPERTY(TARGET ${TEST_NAME}
+	  APPEND_STRING PROPERTY
+	  COMPILE_FLAGS " -DARROW_VALGRIND")
+	add_test(${TEST_NAME}
+	  valgrind --tool=memcheck --leak-check=full --error-exitcode=1 ${TEST_PATH})
+  else()
+	add_test(${TEST_NAME}
+      ${BUILD_SUPPORT_DIR}/run-test.sh ${CMAKE_BINARY_DIR} test ${TEST_PATH})
+  endif()
   set_tests_properties(${TEST_NAME} PROPERTIES LABELS "unittest")
+
   if(ARGN)
     set_tests_properties(${TEST_NAME} PROPERTIES ${ARGN})
   endif()
@@ -403,7 +418,7 @@ if ("$ENV{GTEST_HOME}" STREQUAL "")
   set(GTest_HOME ${THIRDPARTY_DIR}/googletest-release-1.7.0)
 endif()
 
-## Google Benchmark 
+## Google Benchmark
 if ("$ENV{GBENCHMARK_HOME}" STREQUAL "")
   set(GBENCHMARK_HOME ${THIRDPARTY_DIR}/installed)
 endif()
@@ -487,24 +502,10 @@ if (UNIX)
   add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py
   --verbose=2
   --linelength=90
-  --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11
-    `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h`)
+  --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11,-runtime/references
+    `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/_generated/g'`)
 endif (UNIX)
 
-#----------------------------------------------------------------------
-# Parquet adapter
-
-if(ARROW_PARQUET)
-  find_package(Parquet REQUIRED)
-  include_directories(SYSTEM ${PARQUET_INCLUDE_DIR})
-  ADD_THIRDPARTY_LIB(parquet
-    STATIC_LIB ${PARQUET_STATIC_LIB}
-    SHARED_LIB ${PARQUET_SHARED_LIB})
-
-  add_subdirectory(src/arrow/parquet)
-  list(APPEND LINK_LIBS arrow_parquet parquet)
-endif()
-
 ############################################################
 # Subdirectories
 ############################################################
@@ -515,15 +516,18 @@ set(LIBARROW_LINK_LIBS
 set(ARROW_SRCS
   src/arrow/array.cc
   src/arrow/builder.cc
+  src/arrow/column.cc
+  src/arrow/schema.cc
+  src/arrow/table.cc
   src/arrow/type.cc
 
-  src/arrow/table/column.cc
-  src/arrow/table/schema.cc
-  src/arrow/table/table.cc
+  # IPC / Shared memory library; to be turned into an optional component
+  src/arrow/ipc/adapter.cc
+  src/arrow/ipc/memory.cc
+  src/arrow/ipc/metadata.cc
+  src/arrow/ipc/metadata-internal.cc
 
   src/arrow/types/construct.cc
-  src/arrow/types/floating.cc
-  src/arrow/types/integer.cc
   src/arrow/types/json.cc
   src/arrow/types/list.cc
   src/arrow/types/primitive.cc
@@ -559,9 +563,39 @@ target_link_libraries(arrow ${LIBARROW_LINK_LIBS})
 
 add_subdirectory(src/arrow)
 add_subdirectory(src/arrow/util)
-add_subdirectory(src/arrow/table)
 add_subdirectory(src/arrow/types)
 
 install(TARGETS arrow
   LIBRARY DESTINATION lib
   ARCHIVE DESTINATION lib)
+
+#----------------------------------------------------------------------
+# Parquet adapter library
+
+if(ARROW_PARQUET)
+  find_package(Parquet REQUIRED)
+  include_directories(SYSTEM ${PARQUET_INCLUDE_DIR})
+  ADD_THIRDPARTY_LIB(parquet
+    STATIC_LIB ${PARQUET_STATIC_LIB}
+    SHARED_LIB ${PARQUET_SHARED_LIB})
+
+  add_subdirectory(src/arrow/parquet)
+  list(APPEND LINK_LIBS arrow_parquet parquet)
+endif()
+
+#----------------------------------------------------------------------
+# IPC library
+
+## Flatbuffers
+if(ARROW_IPC)
+  find_package(Flatbuffers REQUIRED)
+  message(STATUS "Flatbuffers include dir: ${FLATBUFFERS_INCLUDE_DIR}")
+  message(STATUS "Flatbuffers static library: ${FLATBUFFERS_STATIC_LIB}")
+  message(STATUS "Flatbuffers compiler: ${FLATBUFFERS_COMPILER}")
+  include_directories(SYSTEM ${FLATBUFFERS_INCLUDE_DIR})
+  add_library(flatbuffers STATIC IMPORTED)
+  set_target_properties(flatbuffers PROPERTIES
+	IMPORTED_LOCATION ${FLATBUFFERS_STATIC_LIB})
+
+  add_subdirectory(src/arrow/ipc)
+endif()

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/cmake_modules/FindFlatbuffers.cmake
----------------------------------------------------------------------
diff --git a/cpp/cmake_modules/FindFlatbuffers.cmake b/cpp/cmake_modules/FindFlatbuffers.cmake
new file mode 100644
index 0000000..ee472d1
--- /dev/null
+++ b/cpp/cmake_modules/FindFlatbuffers.cmake
@@ -0,0 +1,95 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Tries to find Flatbuffers headers and libraries.
+#
+# Usage of this module as follows:
+#
+#  find_package(Flatbuffers)
+#
+# Variables used by this module, they can change the default behaviour and need
+# to be set before calling find_package:
+#
+#  Flatbuffers_HOME -
+#   When set, this path is inspected instead of standard library locations as
+#   the root of the Flatbuffers installation.  The environment variable
+#   FLATBUFFERS_HOME overrides this variable.
+#
+# This module defines
+#  FLATBUFFERS_INCLUDE_DIR, directory containing headers
+#  FLATBUFFERS_LIBS, directory containing flatbuffers libraries
+#  FLATBUFFERS_STATIC_LIB, path to libflatbuffers.a
+#  FLATBUFFERS_FOUND, whether flatbuffers has been found
+
+if( NOT "$ENV{FLATBUFFERS_HOME}" STREQUAL "")
+    file( TO_CMAKE_PATH "$ENV{FLATBUFFERS_HOME}" _native_path )
+    list( APPEND _flatbuffers_roots ${_native_path} )
+elseif ( Flatbuffers_HOME )
+    list( APPEND _flatbuffers_roots ${Flatbuffers_HOME} )
+endif()
+
+# Try the parameterized roots, if they exist
+if ( _flatbuffers_roots )
+    find_path( FLATBUFFERS_INCLUDE_DIR NAMES flatbuffers/flatbuffers.h
+        PATHS ${_flatbuffers_roots} NO_DEFAULT_PATH
+        PATH_SUFFIXES "include" )
+    find_library( FLATBUFFERS_LIBRARIES NAMES flatbuffers
+        PATHS ${_flatbuffers_roots} NO_DEFAULT_PATH
+        PATH_SUFFIXES "lib" )
+else ()
+    find_path( FLATBUFFERS_INCLUDE_DIR NAMES flatbuffers/flatbuffers.h )
+    find_library( FLATBUFFERS_LIBRARIES NAMES flatbuffers )
+endif ()
+
+find_program(FLATBUFFERS_COMPILER flatc
+  $ENV{FLATBUFFERS_HOME}/bin
+  /usr/local/bin
+  /usr/bin
+  NO_DEFAULT_PATH
+)
+
+if (FLATBUFFERS_INCLUDE_DIR AND FLATBUFFERS_LIBRARIES)
+  set(FLATBUFFERS_FOUND TRUE)
+  get_filename_component( FLATBUFFERS_LIBS ${FLATBUFFERS_LIBRARIES} PATH )
+  set(FLATBUFFERS_LIB_NAME libflatbuffers)
+  set(FLATBUFFERS_STATIC_LIB ${FLATBUFFERS_LIBS}/${FLATBUFFERS_LIB_NAME}.a)
+else ()
+  set(FLATBUFFERS_FOUND FALSE)
+endif ()
+
+if (FLATBUFFERS_FOUND)
+  if (NOT Flatbuffers_FIND_QUIETLY)
+    message(STATUS "Found the Flatbuffers library: ${FLATBUFFERS_LIBRARIES}")
+  endif ()
+else ()
+  if (NOT Flatbuffers_FIND_QUIETLY)
+    set(FLATBUFFERS_ERR_MSG "Could not find the Flatbuffers library. Looked in")
+    if ( _flatbuffers_roots )
+      set(FLATBUFFERS_ERR_MSG "${FLATBUFFERS_ERR_MSG} ${_flatbuffers_roots}.")
+    else ()
+      set(FLATBUFFERS_ERR_MSG "${FLATBUFFERS_ERR_MSG} system search paths.")
+    endif ()
+    if (Flatbuffers_FIND_REQUIRED)
+      message(FATAL_ERROR "${FLATBUFFERS_ERR_MSG}")
+    else (Flatbuffers_FIND_REQUIRED)
+      message(STATUS "${FLATBUFFERS_ERR_MSG}")
+    endif (Flatbuffers_FIND_REQUIRED)
+  endif ()
+endif ()
+
+mark_as_advanced(
+  FLATBUFFERS_INCLUDE_DIR
+  FLATBUFFERS_LIBS
+  FLATBUFFERS_STATIC_LIB
+  FLATBUFFERS_COMPILER
+)

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/setup_build_env.sh
----------------------------------------------------------------------
diff --git a/cpp/setup_build_env.sh b/cpp/setup_build_env.sh
index 04688e7..6520dbd 100755
--- a/cpp/setup_build_env.sh
+++ b/cpp/setup_build_env.sh
@@ -2,11 +2,12 @@
 
 SOURCE_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
 
-./thirdparty/download_thirdparty.sh || { echo "download_thirdparty.sh failed" ; return; } 
-./thirdparty/build_thirdparty.sh || { echo "build_thirdparty.sh failed" ; return; } 
+./thirdparty/download_thirdparty.sh || { echo "download_thirdparty.sh failed" ; return; }
+./thirdparty/build_thirdparty.sh || { echo "build_thirdparty.sh failed" ; return; }
 source thirdparty/versions.sh
 
 export GTEST_HOME=$SOURCE_DIR/thirdparty/$GTEST_BASEDIR
 export GBENCHMARK_HOME=$SOURCE_DIR/thirdparty/installed
+export FLATBUFFERS_HOME=$SOURCE_DIR/thirdparty/installed
 
 echo "Build env initialized"

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 73e6a9b..2d42edc 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -19,7 +19,10 @@
 install(FILES
   api.h
   array.h
+  column.h
   builder.h
+  schema.h
+  table.h
   type.h
   DESTINATION include/arrow)
 
@@ -30,3 +33,8 @@ install(FILES
 set(ARROW_TEST_LINK_LIBS ${ARROW_MIN_TEST_LIBS})
 
 ADD_ARROW_TEST(array-test)
+ADD_ARROW_TEST(column-test)
+ADD_ARROW_TEST(schema-test)
+ADD_ARROW_TEST(table-test)
+
+ADD_ARROW_BENCHMARK(column-benchmark)

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/api.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/api.h b/cpp/src/arrow/api.h
index c73d4b3..7be7f88 100644
--- a/cpp/src/arrow/api.h
+++ b/cpp/src/arrow/api.h
@@ -22,20 +22,19 @@
 
 #include "arrow/array.h"
 #include "arrow/builder.h"
+#include "arrow/column.h"
+#include "arrow/schema.h"
+#include "arrow/table.h"
 #include "arrow/type.h"
 
-#include "arrow/table/column.h"
-#include "arrow/table/schema.h"
-#include "arrow/table/table.h"
-
 #include "arrow/types/boolean.h"
 #include "arrow/types/construct.h"
-#include "arrow/types/floating.h"
-#include "arrow/types/integer.h"
 #include "arrow/types/list.h"
+#include "arrow/types/primitive.h"
 #include "arrow/types/string.h"
 #include "arrow/types/struct.h"
 
+#include "arrow/util/buffer.h"
 #include "arrow/util/memory-pool.h"
 #include "arrow/util/status.h"
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/array-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index df827aa..eded594 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -15,30 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <gtest/gtest.h>
-
 #include <cstdint>
 #include <cstdlib>
 #include <memory>
 #include <vector>
 
+#include "gtest/gtest.h"
+
 #include "arrow/array.h"
 #include "arrow/test-util.h"
 #include "arrow/type.h"
-#include "arrow/types/integer.h"
 #include "arrow/types/primitive.h"
 #include "arrow/util/buffer.h"
 #include "arrow/util/memory-pool.h"
-#include "arrow/util/status.h"
 
 namespace arrow {
 
-static TypePtr int32 = TypePtr(new Int32Type());
-
 class TestArray : public ::testing::Test {
  public:
   void SetUp() {
-    pool_ = GetDefaultMemoryPool();
+    pool_ = default_memory_pool();
   }
 
  protected:
@@ -75,10 +71,10 @@ TEST_F(TestArray, TestIsNull) {
     if (x > 0) ++null_count;
   }
 
-  std::shared_ptr<Buffer> null_buf = bytes_to_null_buffer(nulls.data(),
+  std::shared_ptr<Buffer> null_buf = test::bytes_to_null_buffer(nulls.data(),
       nulls.size());
   std::unique_ptr<Array> arr;
-  arr.reset(new Array(int32, nulls.size(), null_count, null_buf));
+  arr.reset(new Int32Array(nulls.size(), nullptr, null_count, null_buf));
 
   ASSERT_EQ(null_count, arr->null_count());
   ASSERT_EQ(5, null_buf->size());

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index ee4ef66..5a5bc10 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -28,11 +28,6 @@ namespace arrow {
 
 Array::Array(const TypePtr& type, int32_t length, int32_t null_count,
     const std::shared_ptr<Buffer>& nulls) {
-  Init(type, length, null_count, nulls);
-}
-
-void Array::Init(const TypePtr& type, int32_t length, int32_t null_count,
-    const std::shared_ptr<Buffer>& nulls) {
   type_ = type;
   length_ = length;
   null_count_ = null_count;
@@ -42,4 +37,25 @@ void Array::Init(const TypePtr& type, int32_t length, int32_t null_count,
   }
 }
 
+bool Array::EqualsExact(const Array& other) const {
+  if (this == &other) return true;
+  if (length_ != other.length_ || null_count_ != other.null_count_ ||
+      type_enum() != other.type_enum()) {
+    return false;
+  }
+  if (null_count_ > 0) {
+    return nulls_->Equals(*other.nulls_, util::bytes_for_bits(length_));
+  } else {
+    return true;
+  }
+}
+
+bool NullArray::Equals(const std::shared_ptr<Array>& arr) const {
+  if (this == arr.get()) return true;
+  if (Type::NA != arr->type_enum()) {
+    return false;
+  }
+  return arr->length() == length_;
+}
+
 } // namespace arrow
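`Array::EqualsExact` compares length, null count and type first. Only after those match does it compare the null bitmaps, and then only over `util::bytes_for_bits(length_)` bytes, so bytes beyond those needed for `length_` bits (for example, allocation padding) are ignored. The sketch below re-implements that check over raw bitmaps and leaves out the type comparison. `BytesForBits`, `GetBit` and `BitmapView` are hypothetical stand-ins, and the least-significant-bit-first order in `GetBit` is an assumption:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Number of bytes needed to hold `bits` bits, rounded up.
int64_t BytesForBits(int64_t bits) { return (bits + 7) / 8; }

// Reads bit `i`, counting from the least significant bit of byte 0 (assumed order).
bool GetBit(const uint8_t* bits, int64_t i) {
  return ((bits[i / 8] >> (i % 8)) & 1) != 0;
}

// Hypothetical view over one array's null bookkeeping.
struct BitmapView {
  int64_t length;
  int64_t null_count;
  const uint8_t* bits;  // may be nullptr when null_count == 0
};

// Same control flow as the diff: metadata first; the bitmap is consulted only
// when there are nulls, and only over the bytes covering `length` bits.
bool NullBitmapsEqual(const BitmapView& a, const BitmapView& b) {
  if (a.length != b.length || a.null_count != b.null_count) return false;
  if (a.null_count == 0) return true;
  return std::memcmp(a.bits, b.bits,
                     static_cast<std::size_t>(BytesForBits(a.length))) == 0;
}
```

Two 16-element arrays whose bitmaps differ only in a third, padding byte therefore compare equal. Extending the length to 17 brings that byte into the comparison.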

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 85e853e..65fc0aa 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -40,20 +40,11 @@ class Buffer;
 // explicitly increment its reference count
 class Array {
  public:
-  Array() :
-      null_count_(0),
-      length_(0),
-      nulls_(nullptr),
-      null_bits_(nullptr) {}
-
   Array(const TypePtr& type, int32_t length, int32_t null_count = 0,
       const std::shared_ptr<Buffer>& nulls = nullptr);
 
   virtual ~Array() {}
 
-  void Init(const TypePtr& type, int32_t length, int32_t null_count,
-      const std::shared_ptr<Buffer>& nulls);
-
   // Determine if a slot is null. For inner loops. Does *not* boundscheck
   bool IsNull(int i) const {
     return null_count_ > 0 && util::get_bit(null_bits_, i);
@@ -63,12 +54,15 @@ class Array {
   int32_t null_count() const { return null_count_;}
 
   const std::shared_ptr<DataType>& type() const { return type_;}
-  LogicalType::type logical_type() const { return type_->type;}
+  Type::type type_enum() const { return type_->type;}
 
   const std::shared_ptr<Buffer>& nulls() const {
     return nulls_;
   }
 
+  bool EqualsExact(const Array& arr) const;
+  virtual bool Equals(const std::shared_ptr<Array>& arr) const = 0;
+
  protected:
   TypePtr type_;
   int32_t null_count_;
@@ -78,9 +72,22 @@ class Array {
   const uint8_t* null_bits_;
 
  private:
+  Array() {}
   DISALLOW_COPY_AND_ASSIGN(Array);
 };
 
+// Degenerate null type Array
+class NullArray : public Array {
+ public:
+  NullArray(const std::shared_ptr<DataType>& type, int32_t length) :
+      Array(type, length, length, nullptr) {}
+
+  explicit NullArray(int32_t length) :
+      NullArray(std::make_shared<NullType>(), length) {}
+
+  bool Equals(const std::shared_ptr<Array>& arr) const override;
+};
+
 typedef std::shared_ptr<Array> ArrayPtr;
 
 } // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 8cc689c..d5d1fdf 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -99,7 +99,7 @@ class ArrayBuilder {
   int32_t capacity_;
 
   // Child value array builders. These are owned by this class
-  std::vector<std::unique_ptr<ArrayBuilder> > children_;
+  std::vector<std::unique_ptr<ArrayBuilder>> children_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(ArrayBuilder);

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/column-benchmark.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column-benchmark.cc b/cpp/src/arrow/column-benchmark.cc
new file mode 100644
index 0000000..69ee52c
--- /dev/null
+++ b/cpp/src/arrow/column-benchmark.cc
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include "benchmark/benchmark.h"
+
+#include "arrow/test-util.h"
+#include "arrow/types/primitive.h"
+#include "arrow/util/memory-pool.h"
+
+namespace arrow {
+namespace {
+  template <typename ArrayType>
+  std::shared_ptr<Array> MakePrimitive(int32_t length, int32_t null_count = 0) {
+    auto pool = default_memory_pool();
+    auto data = std::make_shared<PoolBuffer>(pool);
+    auto nulls = std::make_shared<PoolBuffer>(pool);
+    data->Resize(length * sizeof(typename ArrayType::value_type));
+    nulls->Resize(util::bytes_for_bits(length));
+    return std::make_shared<ArrayType>(length, data, null_count, nulls);
+  }
+}  // anonymous namespace
+
+
+static void BM_BuildInt32ColumnByChunk(benchmark::State& state) { //NOLINT non-const reference
+  ArrayVector arrays;
+  for (int chunk_n = 0; chunk_n < state.range_x(); ++chunk_n) {
+    arrays.push_back(MakePrimitive<Int32Array>(100, 10));
+  }
+  const auto INT32 = std::make_shared<Int32Type>();
+  const auto field = std::make_shared<Field>("c0", INT32);
+  std::unique_ptr<Column> column;
+  while (state.KeepRunning()) {
+    column.reset(new Column(field, arrays));
+  }
+}
+
+BENCHMARK(BM_BuildInt32ColumnByChunk)->Range(5, 50000);
+
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/column-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column-test.cc b/cpp/src/arrow/column-test.cc
new file mode 100644
index 0000000..0630785
--- /dev/null
+++ b/cpp/src/arrow/column-test.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/column.h"
+#include "arrow/schema.h"
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+#include "arrow/types/primitive.h"
+
+using std::shared_ptr;
+using std::vector;
+
+namespace arrow {
+
+const auto INT32 = std::make_shared<Int32Type>();
+
+class TestColumn : public TestBase {
+ protected:
+  std::shared_ptr<ChunkedArray> data_;
+  std::unique_ptr<Column> column_;
+};
+
+TEST_F(TestColumn, BasicAPI) {
+  ArrayVector arrays;
+  arrays.push_back(MakePrimitive<Int32Array>(100));
+  arrays.push_back(MakePrimitive<Int32Array>(100, 10));
+  arrays.push_back(MakePrimitive<Int32Array>(100, 20));
+
+  auto field = std::make_shared<Field>("c0", INT32);
+  column_.reset(new Column(field, arrays));
+
+  ASSERT_EQ("c0", column_->name());
+  ASSERT_TRUE(column_->type()->Equals(INT32));
+  ASSERT_EQ(300, column_->length());
+  ASSERT_EQ(30, column_->null_count());
+  ASSERT_EQ(3, column_->data()->num_chunks());
+}
+
+TEST_F(TestColumn, ChunksInhomogeneous) {
+  ArrayVector arrays;
+  arrays.push_back(MakePrimitive<Int32Array>(100));
+  arrays.push_back(MakePrimitive<Int32Array>(100, 10));
+
+  auto field = std::make_shared<Field>("c0", INT32);
+  column_.reset(new Column(field, arrays));
+
+  ASSERT_OK(column_->ValidateData());
+
+  arrays.push_back(MakePrimitive<Int16Array>(100, 10));
+  column_.reset(new Column(field, arrays));
+  ASSERT_RAISES(Invalid, column_->ValidateData());
+}
+
+} // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/column.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.cc b/cpp/src/arrow/column.cc
new file mode 100644
index 0000000..46acf8d
--- /dev/null
+++ b/cpp/src/arrow/column.cc
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/column.h"
+
+#include <memory>
+#include <sstream>
+
+#include "arrow/array.h"
+#include "arrow/type.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+
+ChunkedArray::ChunkedArray(const ArrayVector& chunks) :
+    chunks_(chunks) {
+  length_ = 0;
+  null_count_ = 0;
+  for (const std::shared_ptr<Array>& chunk : chunks) {
+    length_ += chunk->length();
+    null_count_ += chunk->null_count();
+  }
+}
+
+Column::Column(const std::shared_ptr<Field>& field, const ArrayVector& chunks) :
+    field_(field) {
+  data_ = std::make_shared<ChunkedArray>(chunks);
+}
+
+Column::Column(const std::shared_ptr<Field>& field,
+    const std::shared_ptr<Array>& data) :
+    field_(field) {
+  data_ = std::make_shared<ChunkedArray>(ArrayVector({data}));
+}
+
+Column::Column(const std::shared_ptr<Field>& field,
+    const std::shared_ptr<ChunkedArray>& data) :
+    field_(field),
+    data_(data) {}
+
+Status Column::ValidateData() {
+  for (int i = 0; i < data_->num_chunks(); ++i) {
+    const std::shared_ptr<DataType>& type = data_->chunk(i)->type();
+    if (!this->type()->Equals(type)) {
+      std::stringstream ss;
+      ss << "In chunk " << i << " expected type "
+         << this->type()->ToString()
+         << " but saw "
+         << type->ToString();
+      return Status::Invalid(ss.str());
+    }
+  }
+  return Status::OK();
+}
+
+} // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/column.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h
new file mode 100644
index 0000000..1ad97b2
--- /dev/null
+++ b/cpp/src/arrow/column.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_COLUMN_H
+#define ARROW_COLUMN_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/type.h"
+
+namespace arrow {
+
+class Array;
+class Status;
+
+typedef std::vector<std::shared_ptr<Array>> ArrayVector;
+
+// A data structure managing a list of primitive Arrow arrays logically as one
+// large array
+class ChunkedArray {
+ public:
+  explicit ChunkedArray(const ArrayVector& chunks);
+
+  // @returns: the total length of the chunked array; computed on construction
+  int64_t length() const {
+    return length_;
+  }
+
+  int64_t null_count() const {
+    return null_count_;
+  }
+
+  int num_chunks() const {
+    return chunks_.size();
+  }
+
+  const std::shared_ptr<Array>& chunk(int i) const {
+    return chunks_[i];
+  }
+
+ protected:
+  ArrayVector chunks_;
+  int64_t length_;
+  int64_t null_count_;
+};
+
+// An immutable column data structure consisting of a field (type metadata) and
+// a logical chunked data array (which can be validated as all being the same
+// type).
+class Column {
+ public:
+  Column(const std::shared_ptr<Field>& field, const ArrayVector& chunks);
+  Column(const std::shared_ptr<Field>& field,
+      const std::shared_ptr<ChunkedArray>& data);
+
+  Column(const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& data);
+
+  int64_t length() const {
+    return data_->length();
+  }
+
+  int64_t null_count() const {
+    return data_->null_count();
+  }
+
+  // @returns: the column's name in the passed metadata
+  const std::string& name() const {
+    return field_->name;
+  }
+
+  // @returns: the column's type according to the metadata
+  const std::shared_ptr<DataType>& type() const {
+    return field_->type;
+  }
+
+  // @returns: the column's data as a chunked logical array
+  const std::shared_ptr<ChunkedArray>& data() const {
+    return data_;
+  }
+  // Verify that the column's array data is consistent with the passed field's
+  // metadata
+  Status ValidateData();
+
+ protected:
+  std::shared_ptr<Field> field_;
+  std::shared_ptr<ChunkedArray> data_;
+};
+
+} // namespace arrow
+
+#endif  // ARROW_COLUMN_H

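For readers tracing how `ChunkedArray` and `Column::ValidateData` fit together, here is a minimal standalone sketch. It does not use Arrow: `ChunkSketch`, `ChunkedArraySketch` and `ValidateChunks` are hypothetical names, and a type is reduced to a string compared for equality in place of `DataType::Equals`. It shows that the totals are summed once at construction and that validation reports the first chunk whose type differs from the field's, mirroring the `ChunksInhomogeneous` test.

```cpp
#include <cstddef>
#include <cstdint>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an Arrow chunk: only the fields that
// ChunkedArray and Column::ValidateData look at
struct ChunkSketch {
  std::string type;  // e.g. "int32"; stands in for a DataType
  int64_t length;
  int64_t null_count;
};

struct ChunkedArraySketch {
  std::vector<ChunkSketch> chunks;
  int64_t length = 0;
  int64_t null_count = 0;

  explicit ChunkedArraySketch(std::vector<ChunkSketch> c) : chunks(std::move(c)) {
    // Totals are computed once, on construction
    for (const ChunkSketch& chunk : chunks) {
      length += chunk.length;
      null_count += chunk.null_count;
    }
  }
};

// Returns an empty string when every chunk matches the field type, otherwise
// an error message for the first mismatching chunk
std::string ValidateChunks(const std::string& field_type,
                           const ChunkedArraySketch& data) {
  for (std::size_t i = 0; i < data.chunks.size(); ++i) {
    if (data.chunks[i].type != field_type) {
      std::stringstream ss;
      ss << "In chunk " << i << " expected type " << field_type
         << " but saw " << data.chunks[i].type;
      return ss.str();
    }
  }
  return "";
}
```

Keeping the totals precomputed makes `length()` and `null_count()` constant-time accessors, at the cost of assuming the chunks never change after construction, which matches the "immutable column" description in the header.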
http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/.gitignore
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/.gitignore b/cpp/src/arrow/ipc/.gitignore
new file mode 100644
index 0000000..8150d7e
--- /dev/null
+++ b/cpp/src/arrow/ipc/.gitignore
@@ -0,0 +1 @@
+*_generated.h
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
new file mode 100644
index 0000000..383684f
--- /dev/null
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#######################################
+# arrow_ipc
+#######################################
+
+# Headers: top level
+install(FILES
+  adapter.h
+  metadata.h
+  memory.h
+  DESTINATION include/arrow/ipc)
+
+ADD_ARROW_TEST(ipc-adapter-test)
+ADD_ARROW_TEST(ipc-memory-test)
+ADD_ARROW_TEST(ipc-metadata-test)
+
+# make clean will delete the generated file
+set_source_files_properties(Message_generated.h PROPERTIES GENERATED TRUE)
+
+set(OUTPUT_DIR ${CMAKE_SOURCE_DIR}/src/arrow/ipc)
+set(FBS_OUTPUT_FILES "${OUTPUT_DIR}/Message_generated.h")
+
+set(FBS_SRC ${CMAKE_SOURCE_DIR}/../format/Message.fbs)
+get_filename_component(ABS_FBS_SRC ${FBS_SRC} ABSOLUTE)
+
+add_custom_command(
+  OUTPUT ${FBS_OUTPUT_FILES}
+  COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${ABS_FBS_SRC}
+  DEPENDS ${ABS_FBS_SRC}
+  COMMENT "Running flatc compiler on ${FBS_SRC}"
+  VERBATIM
+)
+
+add_custom_target(metadata_fbs DEPENDS ${FBS_OUTPUT_FILES})
+add_dependencies(arrow metadata_fbs)

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
new file mode 100644
index 0000000..7cdb965
--- /dev/null
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -0,0 +1,305 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/adapter.h"
+
+#include <cstdint>
+#include <cstring>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/ipc/memory.h"
+#include "arrow/ipc/Message_generated.h"
+#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/metadata-internal.h"
+#include "arrow/schema.h"
+#include "arrow/table.h"
+#include "arrow/type.h"
+#include "arrow/types/construct.h"
+#include "arrow/types/primitive.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+
+namespace flatbuf = apache::arrow::flatbuf;
+
+namespace ipc {
+
+static bool IsPrimitive(const DataType* type) {
+  switch (type->type) {
+    // NA is null type or "no type", considered primitive for now
+    case Type::NA:
+    case Type::BOOL:
+    case Type::UINT8:
+    case Type::INT8:
+    case Type::UINT16:
+    case Type::INT16:
+    case Type::UINT32:
+    case Type::INT32:
+    case Type::UINT64:
+    case Type::INT64:
+    case Type::FLOAT:
+    case Type::DOUBLE:
+      return true;
+    default:
+      return false;
+  }
+}
+
+// ----------------------------------------------------------------------
+// Row batch write path
+
+Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes,
+    std::vector<std::shared_ptr<Buffer>>* buffers) {
+  if (IsPrimitive(arr->type().get())) {
+    const PrimitiveArray* prim_arr = static_cast<const PrimitiveArray*>(arr);
+
+    field_nodes->push_back(
+        flatbuf::FieldNode(prim_arr->length(), prim_arr->null_count()));
+
+    if (prim_arr->null_count() > 0) {
+      buffers->push_back(prim_arr->nulls());
+    } else {
+      // Push a dummy zero-length buffer, not to be copied
+      buffers->push_back(std::make_shared<Buffer>(nullptr, 0));
+    }
+    buffers->push_back(prim_arr->data());
+  } else if (arr->type_enum() == Type::LIST) {
+    // TODO(wesm)
+    return Status::NotImplemented("List type");
+  } else if (arr->type_enum() == Type::STRUCT) {
+    // TODO(wesm)
+    return Status::NotImplemented("Struct type");
+  }
+
+  return Status::OK();
+}
+
+class RowBatchWriter {
+ public:
+  explicit RowBatchWriter(const RowBatch* batch) :
+      batch_(batch) {}
+
+  Status AssemblePayload() {
+    // Perform depth-first traversal of the row-batch
+    for (int i = 0; i < batch_->num_columns(); ++i) {
+      const Array* arr = batch_->column(i).get();
+      RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_));
+    }
+    return Status::OK();
+  }
+
+  Status Write(MemorySource* dst, int64_t position, int64_t* data_header_offset) {
+    // Write out all the buffers contiguously and compute the total size of the
+    // memory payload
+    int64_t offset = 0;
+    for (size_t i = 0; i < buffers_.size(); ++i) {
+      const Buffer* buffer = buffers_[i].get();
+      int64_t size = buffer->size();
+
+      // TODO(wesm): We currently have no notion of shared memory page IDs,
+      // but we've included it in the metadata IDL for when we have it in the
+      // future. Use page=0 for now
+      //
+      // Note that page ids are a bespoke notion for Arrow and not a feature we
+      // are using from any OS-level shared memory. The thought is that systems
+      // may (in the future) associate integer page IDs with physical memory
+      // pages (according to whatever is the desired shared memory mechanism)
+      buffer_meta_.push_back(flatbuf::Buffer(0, position + offset, size));
+
+      if (size > 0) {
+        RETURN_NOT_OK(dst->Write(position + offset, buffer->data(), size));
+        offset += size;
+      }
+    }
+
+    // Now that we have computed the locations of all of the buffers in shared
+    // memory, the data header can be converted to a flatbuffer and written out
+    //
+    // Note: The memory written here is prefixed by the size of the flatbuffer
+    // itself as an int32_t. On reading from a MemorySource, you will have to
+    // determine the data header size, then request a buffer such that you can
+    // construct the flatbuffer data accessor object (see arrow::ipc::Message)
+    std::shared_ptr<Buffer> data_header;
+    RETURN_NOT_OK(WriteDataHeader(batch_->num_rows(), offset,
+            field_nodes_, buffer_meta_, &data_header));
+
+    // Write the data header at the end
+    RETURN_NOT_OK(dst->Write(position + offset, data_header->data(),
+            data_header->size()));
+
+    *data_header_offset = position + offset;
+    return Status::OK();
+  }
+
+  // This must be called after invoking AssemblePayload
+  int64_t DataHeaderSize() {
+    // TODO(wesm): In case it is needed, compute the upper bound for the size
+    // of the buffer containing the flatbuffer data header.
+    return 0;
+  }
+
+  // Total footprint of buffers. This must be called after invoking
+  // AssemblePayload
+  int64_t TotalBytes() {
+    int64_t total = 0;
+    for (const std::shared_ptr<Buffer>& buffer : buffers_) {
+      total += buffer->size();
+    }
+    return total;
+  }
+
+ private:
+  const RowBatch* batch_;
+
+  std::vector<flatbuf::FieldNode> field_nodes_;
+  std::vector<flatbuf::Buffer> buffer_meta_;
+  std::vector<std::shared_ptr<Buffer>> buffers_;
+};
+
+Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
+    int64_t* header_offset) {
+  RowBatchWriter serializer(batch);
+  RETURN_NOT_OK(serializer.AssemblePayload());
+  return serializer.Write(dst, position, header_offset);
+}
+// ----------------------------------------------------------------------
+// Row batch read path
+
+static constexpr int64_t INIT_METADATA_SIZE = 4096;
+
+class RowBatchReader::Impl {
+ public:
+  Impl(MemorySource* source, const std::shared_ptr<RecordBatchMessage>& metadata) :
+      source_(source),
+      metadata_(metadata) {
+    num_buffers_ = metadata->num_buffers();
+    num_flattened_fields_ = metadata->num_fields();
+  }
+
+  Status AssembleBatch(const std::shared_ptr<Schema>& schema,
+      std::shared_ptr<RowBatch>* out) {
+    std::vector<std::shared_ptr<Array>> arrays(schema->num_fields());
+
+    // The field_index and buffer_index are incremented in NextArray based on
+    // how much of the batch is "consumed" (through nested data reconstruction,
+    // for example)
+    field_index_ = 0;
+    buffer_index_ = 0;
+    for (int i = 0; i < schema->num_fields(); ++i) {
+      const Field* field = schema->field(i).get();
+      RETURN_NOT_OK(NextArray(field, &arrays[i]));
+    }
+
+    *out = std::make_shared<RowBatch>(schema, metadata_->length(),
+        arrays);
+    return Status::OK();
+  }
+
+ private:
+  // Traverse the flattened record batch metadata and reassemble the
+  // corresponding array containers
+  Status NextArray(const Field* field, std::shared_ptr<Array>* out) {
+    const std::shared_ptr<DataType>& type = field->type;
+
+    // pop off a field
+    if (field_index_ >= num_flattened_fields_) {
+      return Status::Invalid("Ran out of field metadata, likely malformed");
+    }
+
+    // This only contains the length and null count, which we need to figure
+    // out what to do with the buffers. For example, if null_count == 0, then
+    // we can skip that buffer without reading from shared memory
+    FieldMetadata field_meta = metadata_->field(field_index_++);
+
+    if (IsPrimitive(type.get())) {
+      std::shared_ptr<Buffer> nulls;
+      std::shared_ptr<Buffer> data;
+      if (field_meta.null_count == 0) {
+        nulls = nullptr;
+        ++buffer_index_;
+      } else {
+        RETURN_NOT_OK(GetBuffer(buffer_index_++, &nulls));
+      }
+      if (field_meta.length > 0) {
+        RETURN_NOT_OK(GetBuffer(buffer_index_++, &data));
+      } else {
+        data.reset(new Buffer(nullptr, 0));
+      }
+      return MakePrimitiveArray(type, field_meta.length, data,
+          field_meta.null_count, nulls, out);
+    } else {
+      return Status::NotImplemented("Non-primitive types not complete yet");
+    }
+  }
+
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
+    BufferMetadata metadata = metadata_->buffer(buffer_index);
+    return source_->ReadAt(metadata.offset, metadata.length, out);
+  }
+
+  MemorySource* source_;
+  std::shared_ptr<RecordBatchMessage> metadata_;
+
+  int field_index_;
+  int buffer_index_;
+  int num_buffers_;
+  int num_flattened_fields_;
+};
+
+Status RowBatchReader::Open(MemorySource* source, int64_t position,
+    std::shared_ptr<RowBatchReader>* out) {
+  std::shared_ptr<Buffer> metadata;
+  RETURN_NOT_OK(source->ReadAt(position, INIT_METADATA_SIZE, &metadata));
+
+  int32_t metadata_size = *reinterpret_cast<const int32_t*>(metadata->data());
+
+  // We may not need to call source->ReadAt again
+  if (metadata_size > static_cast<int>(INIT_METADATA_SIZE - sizeof(int32_t))) {
+    // We don't have enough data, read the indicated metadata size.
+    RETURN_NOT_OK(source->ReadAt(position + sizeof(int32_t),
+            metadata_size, &metadata));
+  }
+
+  // TODO(wesm): buffer slicing here would be better in case ReadAt returns
+  // allocated memory
+
+  std::shared_ptr<Message> message;
+  RETURN_NOT_OK(Message::Open(metadata, &message));
+
+  if (message->type() != Message::RECORD_BATCH) {
+    return Status::Invalid("Metadata message is not a record batch");
+  }
+
+  std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();
+
+  std::shared_ptr<RowBatchReader> result(new RowBatchReader());
+  result->impl_.reset(new Impl(source, batch_meta));
+  *out = result;
+
+  return Status::OK();
+}
+
+Status RowBatchReader::GetRowBatch(const std::shared_ptr<Schema>& schema,
+    std::shared_ptr<RowBatch>* out) {
+  return impl_->AssembleBatch(schema, out);
+}
+
+
+} // namespace ipc
+} // namespace arrow

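The write and read paths in `adapter.cc` agree on a flattened order: `VisitArray` emits one field node (length, null count) and two buffer slots per primitive column, and `RowBatchReader::Impl::NextArray` walks the nodes in the same order, skipping the null bitmap when the null count is zero. A minimal sketch of that bookkeeping follows, with hypothetical types `FieldNodeSketch` and `ColumnSketch`, buffers reduced to byte counts, and an assumed unpadded bitmap of ceil(length / 8) bytes.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for flatbuf::FieldNode and a primitive column
struct FieldNodeSketch {
  int64_t length;
  int64_t null_count;
};
struct ColumnSketch {
  int64_t length;
  int64_t null_count;
};

// Write side: one node per column and two buffer slots per column
// (slot 0: null bitmap, or a zero-length placeholder when there are no nulls;
// slot 1: the values)
void FlattenSketch(const std::vector<ColumnSketch>& columns, int64_t value_width,
                   std::vector<FieldNodeSketch>* nodes,
                   std::vector<int64_t>* buffer_sizes) {
  for (const ColumnSketch& c : columns) {
    nodes->push_back({c.length, c.null_count});
    buffer_sizes->push_back(c.null_count > 0 ? (c.length + 7) / 8 : 0);
    buffer_sizes->push_back(c.length * value_width);
  }
}

// Read side: walk the nodes in the same order, advancing the buffer cursor by
// two slots per primitive column. Returns the indices of the buffers that
// actually need to be fetched from the memory source.
std::vector<int> BuffersToReadSketch(const std::vector<FieldNodeSketch>& nodes) {
  std::vector<int> to_read;
  int buffer_index = 0;
  for (const FieldNodeSketch& node : nodes) {
    if (node.null_count > 0) to_read.push_back(buffer_index);  // null bitmap
    ++buffer_index;
    if (node.length > 0) to_read.push_back(buffer_index);  // values
    ++buffer_index;
  }
  return to_read;
}
```

With one column of 1000 non-null `int32` values and a second with 12 nulls, only the second column's bitmap and both value buffers need to be fetched; the placeholder bitmap slot of the first column is skipped without touching shared memory.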
http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
new file mode 100644
index 0000000..26dea6d
--- /dev/null
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Public API for writing and accessing (with zero copy, if possible) Arrow
+// data in shared memory
+
+#ifndef ARROW_IPC_ADAPTER_H
+#define ARROW_IPC_ADAPTER_H
+
+#include <cstdint>
+#include <memory>
+
+namespace arrow {
+
+class Array;
+class RowBatch;
+class Schema;
+class Status;
+
+namespace ipc {
+
+class MemorySource;
+class RecordBatchMessage;
+
+// ----------------------------------------------------------------------
+// Write path
+
+// Write the RowBatch (collection of equal-length Arrow arrays) to the memory
+// source at the indicated position
+//
+// First, each of the memory buffers is written out end-to-end, starting at
+// the indicated position.
+//
+// Then, this function writes the batch metadata as a flatbuffer (see
+// format/Message.fbs -- the RecordBatch message type) like so:
+//
+// <int32: metadata size> <uint8*: metadata>
+//
+// Finally, the memory offset to the start of the metadata / data header is
+// returned in an out-variable
+Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
+    int64_t* header_offset);
+
+// int64_t GetRowBatchMetadata(const RowBatch* batch);
+
+// Compute the precise number of bytes needed in a contiguous memory segment to
+// write the row batch. This involves generating the complete serialized
+// Flatbuffers metadata.
+int64_t GetRowBatchSize(const RowBatch* batch);
+
+// ----------------------------------------------------------------------
+// "Read" path; does not copy data unless the MemorySource itself copies
+
+class RowBatchReader {
+ public:
+  static Status Open(MemorySource* source, int64_t position,
+      std::shared_ptr<RowBatchReader>* out);
+
+  // Reassemble the row batch. A Schema is required to be able to construct the
+  // right array containers
+  Status GetRowBatch(const std::shared_ptr<Schema>& schema,
+      std::shared_ptr<RowBatch>* out);
+
+ private:
+  class Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+} // namespace ipc
+} // namespace arrow
+
+#endif // ARROW_IPC_ADAPTER_H

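The layout described for `WriteRowBatch` (buffers end-to-end from `position`, followed by `<int32: metadata size> <uint8*: metadata>`) can be illustrated with a plain byte vector standing in for the `MemorySource`. This is a sketch under stated assumptions: the metadata is a raw array of hypothetical `BufferMetaSketch {offset, length}` records in host byte order rather than the RecordBatch flatbuffer, the destination is grown to fit instead of being a fixed-size memory map, and zero-length buffers are recorded but not copied, as in `RowBatchWriter::Write`.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical stand-in for flatbuf::Buffer: where a buffer landed
struct BufferMetaSketch {
  int64_t offset;  // absolute position of the buffer in the destination
  int64_t length;  // buffer size in bytes
};

// Writes buffers end-to-end starting at `position`, then the size-prefixed
// metadata. Returns the offset of the size prefix (the "header offset").
int64_t WriteLayoutSketch(std::vector<uint8_t>* dst, int64_t position,
                          const std::vector<std::vector<uint8_t>>& buffers) {
  // Grow the destination so payload and header fit (the real code writes
  // into a memory map that must already be large enough)
  int64_t payload = 0;
  for (const auto& buf : buffers) payload += static_cast<int64_t>(buf.size());
  const int32_t meta_size =
      static_cast<int32_t>(buffers.size() * sizeof(BufferMetaSketch));
  const int64_t needed = position + payload +
                         static_cast<int64_t>(sizeof(int32_t)) + meta_size;
  if (static_cast<int64_t>(dst->size()) < needed) {
    dst->resize(static_cast<std::size_t>(needed));
  }

  std::vector<BufferMetaSketch> meta;
  int64_t offset = 0;
  for (const auto& buf : buffers) {
    const int64_t size = static_cast<int64_t>(buf.size());
    // Every buffer gets a metadata entry; zero-length ones are not copied
    meta.push_back({position + offset, size});
    if (size > 0) {
      std::memcpy(dst->data() + position + offset, buf.data(),
                  static_cast<std::size_t>(size));
      offset += size;
    }
  }

  // Header goes right after the last buffer: int32 size, then the metadata
  const int64_t header_offset = position + offset;
  std::memcpy(dst->data() + header_offset, &meta_size, sizeof(int32_t));
  if (meta_size > 0) {
    std::memcpy(dst->data() + header_offset + sizeof(int32_t), meta.data(),
                static_cast<std::size_t>(meta_size));
  }
  return header_offset;
}

// Reads the size prefix at `header_offset`, then the metadata that follows it
std::vector<BufferMetaSketch> ReadLayoutSketch(const std::vector<uint8_t>& src,
                                               int64_t header_offset) {
  int32_t meta_size = 0;
  std::memcpy(&meta_size, src.data() + header_offset, sizeof(int32_t));
  std::vector<BufferMetaSketch> meta(static_cast<std::size_t>(meta_size) /
                                     sizeof(BufferMetaSketch));
  if (meta_size > 0) {
    std::memcpy(meta.data(), src.data() + header_offset + sizeof(int32_t),
                static_cast<std::size_t>(meta_size));
  }
  return meta;
}
```

Because the header offset points at the size prefix, a reader needs only that one offset to locate every buffer, which is what `RowBatchReader::Open` takes. The real reader first reads a fixed 4096-byte block and issues a second `ReadAt` only when the size prefix says the metadata does not fit in it.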
http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
new file mode 100644
index 0000000..d75998f
--- /dev/null
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <random>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/memory.h"
+#include "arrow/ipc/test-common.h"
+
+#include "arrow/test-util.h"
+#include "arrow/types/primitive.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace ipc {
+
+class TestWriteRowBatch : public ::testing::Test, public MemoryMapFixture {
+ public:
+  void SetUp() {
+    pool_ = default_memory_pool();
+  }
+  void TearDown() {
+    MemoryMapFixture::TearDown();
+  }
+
+  void InitMemoryMap(int64_t size) {
+    std::string path = "test-write-row-batch";
+    MemoryMapFixture::CreateFile(path, size);
+    ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &mmap_));
+  }
+
+ protected:
+  MemoryPool* pool_;
+  std::shared_ptr<MemoryMappedSource> mmap_;
+};
+
+const auto INT32 = std::make_shared<Int32Type>();
+
+TEST_F(TestWriteRowBatch, IntegerRoundTrip) {
+  const int length = 1000;
+
+  // Make the schema
+  auto f0 = std::make_shared<Field>("f0", INT32);
+  auto f1 = std::make_shared<Field>("f1", INT32);
+  std::shared_ptr<Schema> schema(new Schema({f0, f1}));
+
+  // Example data
+
+  auto data = std::make_shared<PoolBuffer>(pool_);
+  ASSERT_OK(data->Resize(length * sizeof(int32_t)));
+  test::rand_uniform_int(length, 0, 0, std::numeric_limits<int32_t>::max(),
+      reinterpret_cast<int32_t*>(data->mutable_data()));
+
+  auto nulls = std::make_shared<PoolBuffer>(pool_);
+  int null_bytes = util::bytes_for_bits(length);
+  ASSERT_OK(nulls->Resize(null_bytes));
+  test::random_bytes(null_bytes, 0, nulls->mutable_data());
+
+  auto a0 = std::make_shared<Int32Array>(length, data);
+  auto a1 = std::make_shared<Int32Array>(length, data,
+      test::bitmap_popcount(nulls->data(), length), nulls);
+
+  RowBatch batch(schema, length, {a0, a1});
+
+  // TODO(wesm): computing memory requirements for a row batch
+  // 64k is plenty of space
+  InitMemoryMap(1 << 16);
+
+  int64_t header_location;
+  ASSERT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location));
+
+  std::shared_ptr<RowBatchReader> result;
+  ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &result));
+
+  std::shared_ptr<RowBatch> batch_result;
+  ASSERT_OK(result->GetRowBatch(schema, &batch_result));
+  EXPECT_EQ(batch.num_rows(), batch_result->num_rows());
+
+  for (int i = 0; i < batch.num_columns(); ++i) {
+    EXPECT_TRUE(batch.column(i)->Equals(batch_result->column(i)))
+      << "column " << i << ": " << batch.column_name(i);
+  }
+}
+
+} // namespace ipc
+} // namespace arrow

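The test sizes the null bitmap with `util::bytes_for_bits` and derives the null count with `test::bitmap_popcount`, treating each set bit as a null. The helpers below are hypothetical re-implementations, not the Arrow functions, and assume least-significant-bit-first numbering within each byte. Counting only the first `length` bits matters when `length` is not a multiple of 8: `random_bytes` fills whole bytes, so trailing bits in the last byte would otherwise inflate the null count.

```cpp
#include <cstdint>

// Number of bytes needed to hold `bits` bits, rounded up
int64_t BytesForBitsSketch(int64_t bits) { return (bits + 7) / 8; }

// Counts set bits among the first `length` bits of `bitmap`, assuming
// least-significant-bit-first numbering within each byte
int64_t BitmapPopcountSketch(const uint8_t* bitmap, int64_t length) {
  int64_t count = 0;
  for (int64_t i = 0; i < length; ++i) {
    if (bitmap[i / 8] & (1 << (i % 8))) ++count;
  }
  return count;
}
```

A bit-at-a-time loop keeps the sketch obviously correct; a production version would more likely count whole bytes or words and mask only the final partial byte.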
http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/ipc-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-memory-test.cc b/cpp/src/arrow/ipc/ipc-memory-test.cc
new file mode 100644
index 0000000..332ad2a
--- /dev/null
+++ b/cpp/src/arrow/ipc/ipc-memory-test.cc
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/ipc/memory.h"
+#include "arrow/ipc/test-common.h"
+#include "arrow/test-util.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace ipc {
+
+class TestMemoryMappedSource : public ::testing::Test, public MemoryMapFixture {
+ public:
+  void TearDown() {
+    MemoryMapFixture::TearDown();
+  }
+};
+
+TEST_F(TestMemoryMappedSource, InvalidUsages) {
+}
+
+TEST_F(TestMemoryMappedSource, WriteRead) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+
+  test::random_bytes(buffer_size, 0, buffer.data());
+
+  const int reps = 5;
+
+  std::string path = "ipc-write-read-test";
+  CreateFile(path, reps * buffer_size);
+
+  std::shared_ptr<MemoryMappedSource> result;
+  ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &result));
+
+  int64_t position = 0;
+
+  std::shared_ptr<Buffer> out_buffer;
+  for (int i = 0; i < reps; ++i) {
+    ASSERT_OK(result->Write(position, buffer.data(), buffer_size));
+    ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
+
+    ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+
+    position += buffer_size;
+  }
+}
+
+TEST_F(TestMemoryMappedSource, InvalidFile) {
+  std::string non_existent_path = "invalid-file-name-asfd";
+
+  std::shared_ptr<MemoryMappedSource> result;
+  ASSERT_RAISES(IOError, MemoryMappedSource::Open(non_existent_path,
+          MemorySource::READ_ONLY, &result));
+}
+
+} // namespace ipc
+} // namespace arrow

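`MemoryMappedSource` itself is not shown in this part of the diff. As an illustration of what the `WriteRead` test exercises, here is a POSIX sketch that creates a file of `reps * payload.size()` bytes, maps it read-write with `mmap`, writes each copy at an advancing position and reads it back through the mapping. `MmapWriteReadSketch` is a hypothetical helper; it assumes a POSIX system with a writable `/tmp` and does not reproduce the `MemorySource` API or its error reporting.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

#include <stdlib.h>  // mkstemp
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>

// Writes `reps` copies of `payload` back to back into a freshly created,
// memory-mapped temp file and reads each one back through the mapping.
// Returns true if every read-back matched.
bool MmapWriteReadSketch(const std::vector<uint8_t>& payload, int reps) {
  char path[] = "/tmp/ipc-write-read-sketchXXXXXX";
  int fd = mkstemp(path);
  if (fd < 0) return false;

  const std::size_t total = payload.size() * static_cast<std::size_t>(reps);
  // Size the file first, as the test's CreateFile does
  bool ok = ftruncate(fd, static_cast<off_t>(total)) == 0;
  void* addr = MAP_FAILED;
  if (ok) addr = mmap(nullptr, total, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  ok = ok && addr != MAP_FAILED;

  if (ok) {
    uint8_t* base = static_cast<uint8_t*>(addr);
    std::size_t position = 0;
    for (int i = 0; i < reps && ok; ++i) {
      // "Write": copy into the mapping at the current position
      std::memcpy(base + position, payload.data(), payload.size());
      // "ReadAt": the mapped bytes are read in place, without a copy
      ok = std::memcmp(base + position, payload.data(), payload.size()) == 0;
      position += payload.size();
    }
    munmap(addr, total);
  }
  close(fd);
  unlink(path);
  return ok;
}
```

Reading directly from the mapping is what makes a zero-copy read path possible: a returned buffer can point into mapped memory instead of a freshly allocated copy.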
http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc
new file mode 100644
index 0000000..ceabec0
--- /dev/null
+++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include "arrow/ipc/metadata.h"
+#include "arrow/schema.h"
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+
+class Buffer;
+
+static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) {
+  if (!lhs->Equals(*rhs)) {
+    std::stringstream ss;
+    ss << "left schema: " << lhs->ToString() << std::endl
+       << "right schema: " << rhs->ToString() << std::endl;
+    FAIL() << ss.str();
+  }
+}
+
+class TestSchemaMessage : public ::testing::Test {
+ public:
+  void SetUp() {}
+
+  void CheckRoundtrip(const Schema* schema) {
+    std::shared_ptr<Buffer> buffer;
+    ASSERT_OK(ipc::WriteSchema(schema, &buffer));
+
+    std::shared_ptr<ipc::Message> message;
+    ASSERT_OK(ipc::Message::Open(buffer, &message));
+
+    ASSERT_EQ(ipc::Message::SCHEMA, message->type());
+
+    std::shared_ptr<ipc::SchemaMessage> schema_msg = message->GetSchema();
+    ASSERT_EQ(schema->num_fields(), schema_msg->num_fields());
+
+    std::shared_ptr<Schema> schema2;
+    ASSERT_OK(schema_msg->GetSchema(&schema2));
+
+    assert_schema_equal(schema, schema2.get());
+  }
+};
+
+const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
+
+TEST_F(TestSchemaMessage, PrimitiveFields) {
+  auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
+  auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
+  auto f2 = std::make_shared<Field>("f2", std::make_shared<Int32Type>());
+  auto f3 = std::make_shared<Field>("f3", std::make_shared<Int64Type>());
+  auto f4 = std::make_shared<Field>("f4", std::make_shared<UInt8Type>());
+  auto f5 = std::make_shared<Field>("f5", std::make_shared<UInt16Type>());
+  auto f6 = std::make_shared<Field>("f6", std::make_shared<UInt32Type>());
+  auto f7 = std::make_shared<Field>("f7", std::make_shared<UInt64Type>());
+  auto f8 = std::make_shared<Field>("f8", std::make_shared<FloatType>());
+  auto f9 = std::make_shared<Field>("f9", std::make_shared<DoubleType>());
+  auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>());
+
+  Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
+  CheckRoundtrip(&schema);
+}
+
+TEST_F(TestSchemaMessage, NestedFields) {
+  auto type = std::make_shared<ListType>(std::make_shared<Int32Type>());
+  auto f0 = std::make_shared<Field>("f0", type);
+
+  std::shared_ptr<StructType> type2(new StructType({
+      std::make_shared<Field>("k1", INT32),
+      std::make_shared<Field>("k2", INT32),
+      std::make_shared<Field>("k3", INT32)}));
+  auto f1 = std::make_shared<Field>("f1", type2);
+
+  Schema schema({f0, f1});
+  CheckRoundtrip(&schema);
+}
+
+} // namespace arrow
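
The NestedFields test covers the two nested shapes in this commit. A list is accepted by `TypeFromFlatbuffer` only when it has exactly one child field. A struct (`Type_Tuple` in the flatbuffer IDL draft) may have any number of children. The following is a minimal standalone sketch of that child-count rule. It uses a hypothetical `ToyField` model rather than Arrow's `Field`/`DataType` classes:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical toy model (not Arrow's API): each field has a name, a kind,
// and child fields. A std::vector of the enclosing, still-incomplete type as
// a member is supported since C++17.
enum class Kind { kInt32, kList, kStruct };

struct ToyField {
  std::string name;
  Kind kind;
  std::vector<ToyField> children;
};

// Recursively apply the rule from TypeFromFlatbuffer: a list must have
// exactly one child; a struct may have any number, including zero.
bool Validate(const ToyField& field) {
  if (field.kind == Kind::kList && field.children.size() != 1) {
    return false;
  }
  for (const ToyField& child : field.children) {
    if (!Validate(child)) {
      return false;
    }
  }
  return true;
}
```

The check recurses because a list's single child may itself be a list or struct.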

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc
new file mode 100644
index 0000000..e630ccd
--- /dev/null
+++ b/cpp/src/arrow/ipc/memory.cc
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/memory.h"
+
+#include <sys/mman.h> // For memory-mapping
+#include <algorithm>
+#include <cerrno>
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <sstream>
+#include <string>
+
+#include "arrow/util/buffer.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace ipc {
+
+MemorySource::MemorySource(AccessMode access_mode) :
+    access_mode_(access_mode) {}
+
+MemorySource::~MemorySource() {}
+
+// Implement MemoryMappedSource
+
+class MemoryMappedSource::Impl {
+ public:
+  Impl() :
+      file_(nullptr),
+      size_(0),
+      is_open_(false),
+      data_(nullptr) {}
+
+  ~Impl() {
+    if (is_open_) {
+      munmap(data_, size_);
+      fclose(file_);
+    }
+  }
+
+  Status Open(const std::string& path, MemorySource::AccessMode mode) {
+    if (is_open_) {
+      return Status::IOError("A file is already open");
+    }
+
+    path_ = path;
+
+    if (mode == MemorySource::READ_WRITE) {
+      file_ = fopen(path.c_str(), "r+b");
+    } else {
+      file_ = fopen(path.c_str(), "rb");
+    }
+    if (file_ == nullptr) {
+      std::stringstream ss;
+      ss << "Unable to open file, errno: " << errno;
+      return Status::IOError(ss.str());
+    }
+
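+    if (fseek(file_, 0L, SEEK_END) != 0) {
+      fclose(file_);
+      file_ = nullptr;
+      return Status::IOError("Unable to seek to end of file");
+    }
+    size_ = ftell(file_);
+    fseek(file_, 0L, SEEK_SET);
+
+    // A read-only source is mapped with PROT_READ only: requesting PROT_WRITE
+    // on a MAP_SHARED mapping of a file opened "rb" fails with EACCES
+    int prot = PROT_READ;
+    if (mode == MemorySource::READ_WRITE) {
+      prot |= PROT_WRITE;
+    }
+    void* result = mmap(nullptr, size_, prot, MAP_SHARED, fileno(file_), 0);
+    if (result == MAP_FAILED) {
+      // mmap reports failure with MAP_FAILED, never with nullptr
+      std::stringstream ss;
+      ss << "Memory mapping file failed, errno: " << errno;
+      fclose(file_);
+      file_ = nullptr;
+      return Status::IOError(ss.str());
+    }
+    data_ = reinterpret_cast<uint8_t*>(result);
+    // Only mark the source open once the mapping exists, so the destructor
+    // never calls munmap on an invalid address
+    is_open_ = true;
+
+    return Status::OK();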
+  }
+
+  int64_t size() const {
+    return size_;
+  }
+
+  uint8_t* data() {
+    return data_;
+  }
+
+ private:
+  std::string path_;
+  FILE* file_;
+  int64_t size_;
+  bool is_open_;
+
+  // The memory map
+  uint8_t* data_;
+};
+
+MemoryMappedSource::MemoryMappedSource(AccessMode access_mode) :
+    MemorySource(access_mode) {}
+
+Status MemoryMappedSource::Open(const std::string& path, AccessMode access_mode,
+    std::shared_ptr<MemoryMappedSource>* out) {
+  std::shared_ptr<MemoryMappedSource> result(new MemoryMappedSource(access_mode));
+
+  result->impl_.reset(new Impl());
+  RETURN_NOT_OK(result->impl_->Open(path, access_mode));
+
+  *out = result;
+  return Status::OK();
+}
+
+int64_t MemoryMappedSource::Size() const {
+  return impl_->size();
+}
+
+Status MemoryMappedSource::Close() {
+  // munmap handled in ::Impl dtor
+  return Status::OK();
+}
+
+Status MemoryMappedSource::ReadAt(int64_t position, int64_t nbytes,
+    std::shared_ptr<Buffer>* out) {
+  if (position < 0 || position >= impl_->size()) {
+    return Status::Invalid("position is out of bounds");
+  }
+
+  nbytes = std::min(nbytes, impl_->size() - position);
+  *out = std::make_shared<Buffer>(impl_->data() + position, nbytes);
+  return Status::OK();
+}
+
+Status MemoryMappedSource::Write(int64_t position, const uint8_t* data,
+    int64_t nbytes) {
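+  if (access_mode_ != READ_WRITE) {
+    return Status::IOError("memory map was opened read-only");
+  }
+  if (position < 0 || position >= impl_->size()) {
+    return Status::Invalid("position is out of bounds");
+  }
+  // Reject writes that would run past the end of the mapped region
+  if (nbytes < 0 || nbytes > impl_->size() - position) {
+    return Status::Invalid("write extends past the end of the memory map");
+  }
+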
+  uint8_t* dst = impl_->data() + position;
+  memcpy(dst, data, nbytes);
+
+  return Status::OK();
+}
+
+} // namespace ipc
+} // namespace arrow
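
For reference, here is a minimal POSIX sketch of the open/map/unmap sequence that `MemoryMappedSource::Impl` performs. It is written against raw file descriptors instead of `FILE*`. `ReadOnlyMap` is a hypothetical helper, not an Arrow class. It shows two details that are easy to get wrong. First, `mmap` signals failure with `MAP_FAILED` rather than `nullptr`. Second, a read-only mapping should request `PROT_READ` alone. Empty files are rejected up front because `mmap` with a length of 0 fails with `EINVAL`.

```cpp
#include <fcntl.h>     // open, O_RDONLY
#include <sys/mman.h>  // mmap, munmap, MAP_FAILED
#include <sys/stat.h>  // fstat
#include <unistd.h>    // close
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical RAII wrapper for a read-only shared memory map.
class ReadOnlyMap {
 public:
  ReadOnlyMap() = default;
  ReadOnlyMap(const ReadOnlyMap&) = delete;             // owns fd and mapping
  ReadOnlyMap& operator=(const ReadOnlyMap&) = delete;
  ~ReadOnlyMap() { Close(); }

  // Returns true on success; on failure nothing is left open.
  bool Open(const std::string& path) {
    Close();  // release any previous mapping first
    fd_ = ::open(path.c_str(), O_RDONLY);
    if (fd_ < 0) return false;

    struct stat st;
    if (::fstat(fd_, &st) != 0 || st.st_size == 0) {
      Close();
      return false;
    }
    size_ = static_cast<int64_t>(st.st_size);

    // PROT_READ only: PROT_WRITE with MAP_SHARED on an O_RDONLY fd fails.
    void* p = ::mmap(nullptr, static_cast<size_t>(size_), PROT_READ,
                     MAP_SHARED, fd_, 0);
    if (p == MAP_FAILED) {
      Close();
      return false;
    }
    data_ = static_cast<const uint8_t*>(p);
    return true;
  }

  void Close() {
    if (data_ != nullptr) {
      ::munmap(const_cast<uint8_t*>(data_), static_cast<size_t>(size_));
      data_ = nullptr;
    }
    if (fd_ >= 0) {
      ::close(fd_);
      fd_ = -1;
    }
    size_ = 0;
  }

  const uint8_t* data() const { return data_; }
  int64_t size() const { return size_; }

 private:
  int fd_ = -1;
  int64_t size_ = 0;
  const uint8_t* data_ = nullptr;
};
```

The file descriptor could be closed right after `mmap` succeeds, since the mapping stays valid. Keeping it open simply mirrors the lifetime of `file_` in the diff.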

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.h b/cpp/src/arrow/ipc/memory.h
new file mode 100644
index 0000000..0b4d834
--- /dev/null
+++ b/cpp/src/arrow/ipc/memory.h
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Public API for different interprocess memory sharing mechanisms
+
+#ifndef ARROW_IPC_MEMORY_H
+#define ARROW_IPC_MEMORY_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "arrow/util/macros.h"
+
+namespace arrow {
+
+class Buffer;
+class MutableBuffer;
+class Status;
+
+namespace ipc {
+
+// Abstract output stream
+class OutputStream {
+ public:
+  virtual ~OutputStream() {}
+  // Close the output stream
+  virtual Status Close() = 0;
+
+  // The current position in the output stream
+  virtual int64_t Tell() const = 0;
+
+  // Write bytes to the stream
+  virtual Status Write(const uint8_t* data, int64_t length) = 0;
+};
+
+// An output stream that writes to a MutableBuffer, such as one obtained from a
+// memory map
+class BufferOutputStream : public OutputStream {
+ public:
+  explicit BufferOutputStream(const std::shared_ptr<MutableBuffer>& buffer):
+      buffer_(buffer) {}
+
+  // Implement the OutputStream interface
+  Status Close() override;
+  int64_t Tell() const override;
+  Status Write(const uint8_t* data, int64_t length) override;
+
+  // Returns the number of bytes remaining in the buffer
+  int64_t bytes_remaining() const;
+
+ private:
+  std::shared_ptr<MutableBuffer> buffer_;
+  int64_t capacity_;
+  int64_t position_;
+};
+
+class MemorySource {
+ public:
+  // Indicates the access permissions of the memory source
+  enum AccessMode {
+    READ_ONLY,
+    READ_WRITE
+  };
+
+  virtual ~MemorySource();
+
+  // Retrieve a buffer of memory from the source, of the indicated size and at
+  // the indicated position
+  // @returns: arrow::Status indicating success / failure. The buffer is set
+  // into the *out argument
+  virtual Status ReadAt(int64_t position, int64_t nbytes,
+      std::shared_ptr<Buffer>* out) = 0;
+
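+  // Release the resources held by the memory source
+  virtual Status Close() = 0;
+
+  // Copy nbytes from data into the source, starting at the indicated position
+  virtual Status Write(int64_t position, const uint8_t* data, int64_t nbytes) = 0;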
+
+  // @return: the size in bytes of the memory source
+  virtual int64_t Size() const = 0;
+
+ protected:
+  explicit MemorySource(AccessMode access_mode = AccessMode::READ_WRITE);
+
+  AccessMode access_mode_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MemorySource);
+};
+
+// A memory source that uses memory-mapped files for memory interactions
+class MemoryMappedSource : public MemorySource {
+ public:
+  static Status Open(const std::string& path, AccessMode access_mode,
+      std::shared_ptr<MemoryMappedSource>* out);
+
+  Status Close() override;
+
+  Status ReadAt(int64_t position, int64_t nbytes,
+      std::shared_ptr<Buffer>* out) override;
+
+  Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override;
+
+  // @return: the size in bytes of the memory source
+  int64_t Size() const override;
+
+ private:
+  explicit MemoryMappedSource(AccessMode access_mode);
+  // Hide the internal details of this class for now
+  class Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+} // namespace ipc
+} // namespace arrow
+
+#endif // ARROW_IPC_MEMORY_H
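
The `MemorySource` contract can be illustrated without a file. Below is a hypothetical in-memory analogue, `InMemorySource`, which returns `bool` instead of `arrow::Status` so it stays self-contained. It mirrors `MemoryMappedSource::ReadAt` in three ways: it rejects a start position outside the source, clamps the length to what remains, and returns a pointer into existing memory instead of copying. Its `Write` also refuses writes that would run past the end.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical sketch of the ReadAt/Write semantics over a byte vector.
class InMemorySource {
 public:
  explicit InMemorySource(int64_t size) : bytes_(static_cast<size_t>(size), 0) {}

  int64_t Size() const { return static_cast<int64_t>(bytes_.size()); }

  // Zero-copy read: *out points into the source; *out_len is clamped so the
  // returned range never extends past the end.
  bool ReadAt(int64_t position, int64_t nbytes, const uint8_t** out,
              int64_t* out_len) const {
    if (position < 0 || position >= Size() || nbytes < 0) return false;
    *out_len = std::min(nbytes, Size() - position);
    *out = bytes_.data() + position;
    return true;
  }

  // Bounds-checked write: the whole [position, position + nbytes) range must
  // lie inside the source.
  bool Write(int64_t position, const uint8_t* data, int64_t nbytes) {
    if (position < 0 || position >= Size() || nbytes < 0 ||
        nbytes > Size() - position) {
      return false;
    }
    std::memcpy(bytes_.data() + position, data, static_cast<size_t>(nbytes));
    return true;
  }

 private:
  std::vector<uint8_t> bytes_;
};
```

Clamping on read but rejecting on write is a design choice. A short read is easy for the caller to detect from the returned length, while a partial write could silently lose data.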

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
new file mode 100644
index 0000000..14b1869
--- /dev/null
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/ipc/metadata-internal.h"
+
+#include <flatbuffers/flatbuffers.h>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include "arrow/ipc/Message_generated.h"
+#include "arrow/schema.h"
+#include "arrow/type.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/status.h"
+
+typedef flatbuffers::FlatBufferBuilder FBB;
+typedef flatbuffers::Offset<arrow::flatbuf::Field> FieldOffset;
+typedef flatbuffers::Offset<void> Offset;
+
+namespace arrow {
+
+namespace flatbuf = apache::arrow::flatbuf;
+
+namespace ipc {
+
+const std::shared_ptr<DataType> BOOL = std::make_shared<BooleanType>();
+const std::shared_ptr<DataType> INT8 = std::make_shared<Int8Type>();
+const std::shared_ptr<DataType> INT16 = std::make_shared<Int16Type>();
+const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
+const std::shared_ptr<DataType> INT64 = std::make_shared<Int64Type>();
+const std::shared_ptr<DataType> UINT8 = std::make_shared<UInt8Type>();
+const std::shared_ptr<DataType> UINT16 = std::make_shared<UInt16Type>();
+const std::shared_ptr<DataType> UINT32 = std::make_shared<UInt32Type>();
+const std::shared_ptr<DataType> UINT64 = std::make_shared<UInt64Type>();
+const std::shared_ptr<DataType> FLOAT = std::make_shared<FloatType>();
+const std::shared_ptr<DataType> DOUBLE = std::make_shared<DoubleType>();
+
+static Status IntFromFlatbuffer(const flatbuf::Int* int_data,
+    std::shared_ptr<DataType>* out) {
+  if (int_data->bitWidth() % 8 != 0) {
+    return Status::NotImplemented("Integers not in cstdint are not implemented");
+  } else if (int_data->bitWidth() > 64) {
+    return Status::NotImplemented("Integers with more than 64 bits not implemented");
+  }
+
+  switch (int_data->bitWidth()) {
+    case 8:
+      *out = int_data->is_signed() ? INT8 : UINT8;
+      break;
+    case 16:
+      *out = int_data->is_signed() ? INT16 : UINT16;
+      break;
+    case 32:
+      *out = int_data->is_signed() ? INT32 : UINT32;
+      break;
+    case 64:
+      *out = int_data->is_signed() ? INT64 : UINT64;
+      break;
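+    default:
+      // e.g. 0-, 24- or 40-bit integers, which have no cstdint equivalent
+      return Status::NotImplemented("Integer bit width is not supported");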
+  }
+  return Status::OK();
+}
+
+static Status FloatFromFlatbuffer(const flatbuf::FloatingPoint* float_data,
+    std::shared_ptr<DataType>* out) {
+  if (float_data->precision() == flatbuf::Precision_SINGLE) {
+    *out = FLOAT;
+  } else {
+    *out = DOUBLE;
+  }
+  return Status::OK();
+}
+
+static Status TypeFromFlatbuffer(flatbuf::Type type,
+    const void* type_data, const std::vector<std::shared_ptr<Field>>& children,
+    std::shared_ptr<DataType>* out) {
+  switch (type) {
+    case flatbuf::Type_NONE:
+      return Status::Invalid("Type metadata cannot be none");
+    case flatbuf::Type_Int:
+      return IntFromFlatbuffer(static_cast<const flatbuf::Int*>(type_data), out);
+    case flatbuf::Type_Bit:
+      return Status::NotImplemented("Type is not implemented");
+    case flatbuf::Type_FloatingPoint:
+      return FloatFromFlatbuffer(static_cast<const flatbuf::FloatingPoint*>(type_data),
+          out);
+    case flatbuf::Type_Binary:
+    case flatbuf::Type_Utf8:
+      return Status::NotImplemented("Type is not implemented");
+    case flatbuf::Type_Bool:
+      *out = BOOL;
+      return Status::OK();
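+    case flatbuf::Type_Decimal:
+    case flatbuf::Type_Timestamp:
+      // Not yet supported; without this return they would fall through to List
+      return Status::NotImplemented("Type is not implemented");
+    case flatbuf::Type_List: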
+      if (children.size() != 1) {
+        return Status::Invalid("List must have exactly 1 child field");
+      }
+      *out = std::make_shared<ListType>(children[0]);
+      return Status::OK();
+    case flatbuf::Type_Tuple:
+      *out = std::make_shared<StructType>(children);
+      return Status::OK();
+    case flatbuf::Type_Union:
+      return Status::NotImplemented("Type is not implemented");
+    default:
+      return Status::Invalid("Unrecognized type");
+  }
+}
+
+// Forward declaration
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+    FieldOffset* offset);
+
+static Offset IntToFlatbuffer(FBB& fbb, int bitWidth,
+    bool is_signed) {
+  return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union();
+}
+
+static Offset FloatToFlatbuffer(FBB& fbb,
+    flatbuf::Precision precision) {
+  return flatbuf::CreateFloatingPoint(fbb, precision).Union();
+}
+
+static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, Offset* offset) {
+  FieldOffset field;
+  RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(0), &field));
+  out_children->push_back(field);
+  *offset = flatbuf::CreateList(fbb).Union();
+  return Status::OK();
+}
+
+static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, Offset* offset) {
+  FieldOffset field;
+  for (int i = 0; i < type->num_children(); ++i) {
+    RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), &field));
+    out_children->push_back(field);
+  }
+  *offset = flatbuf::CreateTuple(fbb).Union();
+  return Status::OK();
+}
+
+#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED)            \
+  *out_type = flatbuf::Type_Int;                        \
+  *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \
+  break;
+
+
+static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* children,
+    flatbuf::Type* out_type, Offset* offset) {
+  switch (type->type) {
+    case Type::BOOL:
+      *out_type = flatbuf::Type_Bool;
+      *offset = flatbuf::CreateBool(fbb).Union();
+      break;
+    case Type::UINT8:
+      INT_TO_FB_CASE(8, false);
+    case Type::INT8:
+      INT_TO_FB_CASE(8, true);
+    case Type::UINT16:
+      INT_TO_FB_CASE(16, false);
+    case Type::INT16:
+      INT_TO_FB_CASE(16, true);
+    case Type::UINT32:
+      INT_TO_FB_CASE(32, false);
+    case Type::INT32:
+      INT_TO_FB_CASE(32, true);
+    case Type::UINT64:
+      INT_TO_FB_CASE(64, false);
+    case Type::INT64:
+      INT_TO_FB_CASE(64, true);
+    case Type::FLOAT:
+      *out_type = flatbuf::Type_FloatingPoint;
+      *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_SINGLE);
+      break;
+    case Type::DOUBLE:
+      *out_type = flatbuf::Type_FloatingPoint;
+      *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE);
+      break;
+    case Type::LIST:
+      *out_type = flatbuf::Type_List;
+      return ListToFlatbuffer(fbb, type, children, offset);
+    case Type::STRUCT:
+      *out_type = flatbuf::Type_Tuple;
+      return StructToFlatbuffer(fbb, type, children, offset);
+    default:
+      std::stringstream ss;
+      ss << "Unable to convert type: " << type->ToString()
+         << std::endl;
+      return Status::NotImplemented(ss.str());
+  }
+  return Status::OK();
+}
+
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+    FieldOffset* offset) {
+  auto fb_name = fbb.CreateString(field->name);
+
+  flatbuf::Type type_enum;
+  Offset type_data;
+  std::vector<FieldOffset> children;
+
+  RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data));
+  auto fb_children = fbb.CreateVector(children);
+
+  *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum,
+      type_data, fb_children);
+
+  return Status::OK();
+}
+
+Status FieldFromFlatbuffer(const flatbuf::Field* field,
+    std::shared_ptr<Field>* out) {
+  std::shared_ptr<DataType> type;
+
+  auto children = field->children();
+  std::vector<std::shared_ptr<Field>> child_fields(children->size());
+  for (size_t i = 0; i < children->size(); ++i) {
+    RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), &child_fields[i]));
+  }
+
+  RETURN_NOT_OK(TypeFromFlatbuffer(field->type_type(),
+          field->type(), child_fields, &type));
+
+  *out = std::make_shared<Field>(field->name()->str(), type, field->nullable());
+  return Status::OK();
+}
+
+// Implement MessageBuilder
+
+Status MessageBuilder::SetSchema(const Schema* schema) {
+  header_type_ = flatbuf::MessageHeader_Schema;
+
+  std::vector<FieldOffset> field_offsets;
+  for (int i = 0; i < schema->num_fields(); ++i) {
+    const std::shared_ptr<Field>& field = schema->field(i);
+    FieldOffset offset;
+    RETURN_NOT_OK(FieldToFlatbuffer(fbb_, field, &offset));
+    field_offsets.push_back(offset);
+  }
+
+  header_ = flatbuf::CreateSchema(fbb_, fbb_.CreateVector(field_offsets)).Union();
+  body_length_ = 0;
+  return Status::OK();
+}
+
+Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
+    const std::vector<flatbuf::FieldNode>& nodes,
+    const std::vector<flatbuf::Buffer>& buffers) {
+  header_type_ = flatbuf::MessageHeader_RecordBatch;
+  header_ = flatbuf::CreateRecordBatch(fbb_, length,
+      fbb_.CreateVectorOfStructs(nodes),
+      fbb_.CreateVectorOfStructs(buffers)).Union();
+  body_length_ = body_length;
+
+  return Status::OK();
+}
+
+
+Status WriteDataHeader(int32_t length, int64_t body_length,
+    const std::vector<flatbuf::FieldNode>& nodes,
+    const std::vector<flatbuf::Buffer>& buffers,
+    std::shared_ptr<Buffer>* out) {
+  MessageBuilder message;
+  RETURN_NOT_OK(message.SetRecordBatch(length, body_length, nodes, buffers));
+  RETURN_NOT_OK(message.Finish());
+  return message.GetBuffer(out);
+}
+
+Status MessageBuilder::Finish() {
+  auto message = flatbuf::CreateMessage(fbb_, header_type_, header_,
+      body_length_);
+  fbb_.Finish(message);
+  return Status::OK();
+}
+
+Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) {
+  // The message buffer is prefixed by the size of the complete flatbuffer as
+  // int32_t
+  // <int32_t: flatbuffer size><uint8_t*: flatbuffer data>
+  int32_t size = fbb_.GetSize();
+
+  auto result = std::make_shared<PoolBuffer>();
+  RETURN_NOT_OK(result->Resize(size + sizeof(int32_t)));
+
+  uint8_t* dst = result->mutable_data();
+  memcpy(dst, &size, sizeof(int32_t));
+  memcpy(dst + sizeof(int32_t), fbb_.GetBufferPointer(), size);
+
+  *out = result;
+  return Status::OK();
+}
+
+} // namespace ipc
+} // namespace arrow
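
The pre-check in `IntFromFlatbuffer` accepts any multiple of 8 up to 64, so widths such as 0 or 24 still reach the `switch`. Below is a minimal sketch of the same mapping that reports those widths as unsupported instead of handing back an empty type. `IntTypeName` and the type-name strings are hypothetical.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for IntFromFlatbuffer: map (bit width, signedness)
// to a type name. Only widths with a cstdint equivalent succeed.
bool IntTypeName(int bit_width, bool is_signed, std::string* out) {
  switch (bit_width) {
    case 8:  *out = is_signed ? "int8" : "uint8"; return true;
    case 16: *out = is_signed ? "int16" : "uint16"; return true;
    case 32: *out = is_signed ? "int32" : "uint32"; return true;
    case 64: *out = is_signed ? "int64" : "uint64"; return true;
    default: return false;  // e.g. 0, 24, 40, 48, 56: no cstdint type
  }
}
```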

http://git-wip-us.apache.org/repos/asf/arrow/blob/65db0da8/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
new file mode 100644
index 0000000..f7365d2
--- /dev/null
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IPC_METADATA_INTERNAL_H
+#define ARROW_IPC_METADATA_INTERNAL_H
+
+#include <flatbuffers/flatbuffers.h>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/ipc/Message_generated.h"
+
+namespace arrow {
+
+namespace flatbuf = apache::arrow::flatbuf;
+
+class Buffer;
+struct Field;
+class Schema;
+class Status;
+
+namespace ipc {
+
+Status FieldFromFlatbuffer(const flatbuf::Field* field,
+    std::shared_ptr<Field>* out);
+
+class MessageBuilder {
+ public:
+  Status SetSchema(const Schema* schema);
+
+  Status SetRecordBatch(int32_t length, int64_t body_length,
+      const std::vector<flatbuf::FieldNode>& nodes,
+      const std::vector<flatbuf::Buffer>& buffers);
+
+  Status Finish();
+
+  Status GetBuffer(std::shared_ptr<Buffer>* out);
+
+ private:
+  flatbuf::MessageHeader header_type_;
+  flatbuffers::Offset<void> header_;
+  int64_t body_length_;
+  flatbuffers::FlatBufferBuilder fbb_;
+};
+
+Status WriteDataHeader(int32_t length, int64_t body_length,
+    const std::vector<flatbuf::FieldNode>& nodes,
+    const std::vector<flatbuf::Buffer>& buffers,
+    std::shared_ptr<Buffer>* out);
+
+} // namespace ipc
+} // namespace arrow
+
+#endif // ARROW_IPC_METADATA_INTERNAL_H
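
`MessageBuilder::GetBuffer` frames each message as `<int32_t: flatbuffer size><uint8_t*: flatbuffer data>`. Below is a standalone sketch of writing and reading that framing, using hypothetical helper names `FrameMessage` and `UnframeMessage`. Like the original, it stores the prefix in host byte order. The reader checks the prefix against the bytes actually available before trusting it.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Prepend the payload size as a host-order int32_t.
std::vector<uint8_t> FrameMessage(const uint8_t* payload, int32_t size) {
  std::vector<uint8_t> out(sizeof(int32_t) + static_cast<size_t>(size));
  std::memcpy(out.data(), &size, sizeof(int32_t));
  if (size > 0) {
    std::memcpy(out.data() + sizeof(int32_t), payload, static_cast<size_t>(size));
  }
  return out;
}

// Read the prefix and return a pointer to the payload without copying.
// Returns false if the frame is too short or the prefix is inconsistent.
bool UnframeMessage(const uint8_t* frame, int64_t frame_len,
                    const uint8_t** payload, int32_t* size) {
  const int64_t prefix = static_cast<int64_t>(sizeof(int32_t));
  if (frame_len < prefix) return false;
  int32_t n;
  std::memcpy(&n, frame, sizeof(int32_t));  // memcpy avoids unaligned reads
  if (n < 0 || n > frame_len - prefix) return false;
  *payload = frame + prefix;
  *size = n;
  return true;
}
```

A reader such as `Message::Open` in the diff needs a step like this before handing the payload to the flatbuffers accessors. The sketch does not show that code.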

