arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-1364: [C++] IPC support machinery for record batch roundtrips to GPU device memory
Date Mon, 28 Aug 2017 18:46:48 GMT
Repository: arrow
Updated Branches:
  refs/heads/master c027a03b3 -> 0728148ef


ARROW-1364: [C++] IPC support machinery for record batch roundtrips to GPU device memory

This additionally does a few things:

* Changes libarrow_gpu to use the CUDA driver API instead of the runtime API
* Adds code for exporting buffers using CUDA IPC on Linux, but this is not yet tested

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #1000 from wesm/ARROW-1364 and squashes the following commits:

e436755b [Wes McKinney] Add newline at end of file
a8812afc [Wes McKinney] Complete basic IPC message and record batch reads on GPU device memory
16d628f7 [Wes McKinney] More Arrow IPC scaffolding
591aceb0 [Wes McKinney] Draft SerializeRecordBatch for CUDA
84e45252 [Wes McKinney] Add classes and methods for simplifying use of CUDA IPC machinery. No tests yet
508febb5 [Wes McKinney] Test suite passing again
f3c724e0 [Wes McKinney] Get things compiling / linking using driver API
5d686fe9 [Wes McKinney] More progress
2840c60b [Wes McKinney] Progress
3a37fdfe [Wes McKinney] Start cuda context class
03d0baf7 [Wes McKinney] Start cuda_ipc file


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/0728148e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/0728148e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/0728148e

Branch: refs/heads/master
Commit: 0728148efef4f7d9e783f72db06b1d81e587c3d5
Parents: c027a03
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Mon Aug 28 14:46:43 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Mon Aug 28 14:46:43 2017 -0400

----------------------------------------------------------------------
 cpp/CMakeLists.txt                  |  10 +-
 cpp/src/arrow/gpu/.gitignore        |  18 ++
 cpp/src/arrow/gpu/CMakeLists.txt    |  95 +++--------
 cpp/src/arrow/gpu/cuda-benchmark.cc |   9 +-
 cpp/src/arrow/gpu/cuda-test.cc      | 114 ++++++++++++-
 cpp/src/arrow/gpu/cuda_api.h        |   4 +-
 cpp/src/arrow/gpu/cuda_arrow_ipc.cc | 110 ++++++++++++
 cpp/src/arrow/gpu/cuda_arrow_ipc.h  |  77 +++++++++
 cpp/src/arrow/gpu/cuda_common.h     |  17 +-
 cpp/src/arrow/gpu/cuda_context.cc   | 277 +++++++++++++++++++++++++++++++
 cpp/src/arrow/gpu/cuda_context.h    | 108 ++++++++++++
 cpp/src/arrow/gpu/cuda_memory.cc    | 120 +++++++++----
 cpp/src/arrow/gpu/cuda_memory.h     |  71 +++++---
 cpp/src/arrow/ipc/message.cc        |  60 ++++---
 cpp/src/arrow/ipc/message.h         |   9 +
 cpp/src/arrow/ipc/writer.cc         |  13 +-
 cpp/src/arrow/ipc/writer.h          |  12 ++
 python/pyarrow/plasma.pyx           |  14 +-
 python/pyarrow/tests/test_plasma.py |  25 +--
 19 files changed, 950 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 5d64333..cb7aa3a 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -712,15 +712,17 @@ endif()
 add_subdirectory(src/arrow)
 add_subdirectory(src/arrow/io)
 
+if (ARROW_GPU)
+  # IPC extensions required to build the GPU library
+  set(ARROW_IPC ON)
+  add_subdirectory(src/arrow/gpu)
+endif()
+
 if (ARROW_IPC)
   add_subdirectory(src/arrow/ipc)
   add_dependencies(arrow_dependencies metadata_fbs)
 endif()
 
-if (ARROW_GPU)
-  add_subdirectory(src/arrow/gpu)
-endif()
-
 set(ARROW_SRCS
   src/arrow/array.cc
   src/arrow/buffer.cc

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/.gitignore
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/.gitignore b/cpp/src/arrow/gpu/.gitignore
new file mode 100644
index 0000000..0ef3f98
--- /dev/null
+++ b/cpp/src/arrow/gpu/.gitignore
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cuda_version.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt
index cab0853..176916e 100644
--- a/cpp/src/arrow/gpu/CMakeLists.txt
+++ b/cpp/src/arrow/gpu/CMakeLists.txt
@@ -15,52 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-function(ADD_ARROW_CUDA_TEST REL_TEST_NAME)
-  set(options)
-  set(single_value_args)
-  set(multi_value_args STATIC_LINK_LIBS)
-  cmake_parse_arguments(ARG "${options}" "${one_value_args}" "${multi_value_args}" ${ARGN})
-  if(ARG_UNPARSED_ARGUMENTS)
-    message(SEND_ERROR "Error: unrecognized arguments: ${ARG_UNPARSED_ARGUMENTS}")
-  endif()
-
-  if(NO_TESTS OR NOT ARROW_BUILD_STATIC)
-    return()
-  endif()
-  get_filename_component(TEST_NAME ${REL_TEST_NAME} NAME_WE)
-
-  if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/${REL_TEST_NAME}.cc)
-    # This test has a corresponding .cc file, set it up as an executable.
-    set(TEST_PATH "${EXECUTABLE_OUTPUT_PATH}/${TEST_NAME}")
-    cuda_add_executable(${TEST_NAME} "${REL_TEST_NAME}.cc")
-
-    if (ARG_STATIC_LINK_LIBS)
-      # Customize link libraries
-      target_link_libraries(${TEST_NAME} ${ARG_STATIC_LINK_LIBS})
-    else()
-      target_link_libraries(${TEST_NAME} ${ARROW_TEST_LINK_LIBS})
-    endif()
-    add_dependencies(unittest ${TEST_NAME})
-  else()
-    # No executable, just invoke the test (probably a script) directly.
-    set(TEST_PATH ${CMAKE_CURRENT_SOURCE_DIR}/${REL_TEST_NAME})
-  endif()
-
-  if (ARROW_TEST_MEMCHECK)
-    SET_PROPERTY(TARGET ${TEST_NAME}
-      APPEND_STRING PROPERTY
-      COMPILE_FLAGS " -DARROW_VALGRIND")
-    add_test(${TEST_NAME}
-      bash -c "cd ${EXECUTABLE_OUTPUT_PATH}; valgrind --tool=memcheck --leak-check=full --leak-check-heuristics=stdstring --error-exitcode=1 ${TEST_PATH}")
-  elseif(MSVC)
-    add_test(${TEST_NAME} ${TEST_PATH})
-  else()
-    add_test(${TEST_NAME}
-      ${BUILD_SUPPORT_DIR}/run-test.sh ${CMAKE_BINARY_DIR} test ${TEST_PATH})
-  endif()
-  set_tests_properties(${TEST_NAME} PROPERTIES LABELS "unittest")
-endfunction()
-
 #######################################
 # arrow_gpu
 #######################################
@@ -73,36 +27,37 @@ find_package(CUDA REQUIRED)
 include_directories(SYSTEM ${CUDA_INCLUDE_DIRS})
 
 set(ARROW_GPU_SRCS
+  cuda_arrow_ipc.cc
+  cuda_context.cc
   cuda_memory.cc
 )
 
 set(ARROW_GPU_SHARED_LINK_LIBS
   arrow_shared
+  ${CUDA_LIBRARIES}
+  ${CUDA_CUDA_LIBRARY}
 )
 
-add_library(arrow_gpu_objlib OBJECT
-  ${ARROW_GPU_SRCS}
+ADD_ARROW_LIB(arrow_gpu
+  SOURCES ${ARROW_GPU_SRCS}
+  SHARED_LINK_FLAGS ""
+  SHARED_LINK_LIBS ${ARROW_GPU_SHARED_LINK_LIBS}
+  STATIC_LINK_LIBS ""
 )
-set_property(TARGET arrow_gpu_objlib PROPERTY POSITION_INDEPENDENT_CODE 1)
 
-if (ARROW_BUILD_SHARED)
-  cuda_add_library(arrow_gpu_shared SHARED $<TARGET_OBJECTS:arrow_gpu_objlib>)
-  install(TARGETS arrow_gpu_shared
-    RUNTIME DESTINATION bin
-    LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
-    ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
-endif()
+# CUDA build version
+configure_file(cuda_version.h.in
+  "${CMAKE_CURRENT_SOURCE_DIR}/cuda_version.h"
+  @ONLY)
 
-if (ARROW_BUILD_STATIC)
-  add_library(arrow_gpu_static STATIC $<TARGET_OBJECTS:arrow_gpu_objlib>)
-  install(TARGETS arrow_gpu_static
-    RUNTIME DESTINATION bin
-    LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
-    ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
-endif()
+install(FILES
+  "${CMAKE_CURRENT_SOURCE_DIR}/cuda_version.h"
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
 
 install(FILES
-  cuda_common.h
+  cuda_api.h
+  cuda_arrow_ipc.h
+  cuda_context.h
   cuda_memory.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
 
@@ -110,25 +65,17 @@ install(FILES
 configure_file(arrow-gpu.pc.in
   "${CMAKE_CURRENT_BINARY_DIR}/arrow-gpu.pc"
   @ONLY)
+
 install(
   FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-gpu.pc"
   DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
 
-# CUDA build version
-configure_file(cuda_version.h.in
-  "${CMAKE_CURRENT_BINARY_DIR}/cuda_version.h"
-  @ONLY)
-
-install(FILES
-  "${CMAKE_CURRENT_BINARY_DIR}/cuda_version.h"
-  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
-
 set(ARROW_GPU_TEST_LINK_LIBS
   arrow_gpu_shared
   ${ARROW_TEST_LINK_LIBS})
 
 if (ARROW_BUILD_TESTS)
-  ADD_ARROW_CUDA_TEST(cuda-test
+  ADD_ARROW_TEST(cuda-test
     STATIC_LINK_LIBS ${ARROW_GPU_TEST_LINK_LIBS})
 endif()
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda-benchmark.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda-benchmark.cc b/cpp/src/arrow/gpu/cuda-benchmark.cc
index 82caacc..805a044 100644
--- a/cpp/src/arrow/gpu/cuda-benchmark.cc
+++ b/cpp/src/arrow/gpu/cuda-benchmark.cc
@@ -25,7 +25,7 @@
 #include "arrow/memory_pool.h"
 #include "arrow/test-util.h"
 
-#include "arrow/gpu/cuda_memory.h"
+#include "arrow/gpu/cuda_api.h"
 
 namespace arrow {
 namespace gpu {
@@ -35,8 +35,13 @@ constexpr int64_t kGpuNumber = 0;
 static void CudaBufferWriterBenchmark(benchmark::State& state, const int64_t total_bytes,
                                       const int64_t chunksize,
                                       const int64_t buffer_size) {
+  CudaDeviceManager* manager;
+  ABORT_NOT_OK(CudaDeviceManager::GetInstance(&manager));
+  std::shared_ptr<CudaContext> context;
+  ABORT_NOT_OK(manager->GetContext(kGpuNumber, &context));
+
   std::shared_ptr<CudaBuffer> device_buffer;
-  ABORT_NOT_OK(AllocateCudaBuffer(kGpuNumber, total_bytes, &device_buffer));
+  ABORT_NOT_OK(context->Allocate(total_bytes, &device_buffer));
   CudaBufferWriter writer(device_buffer);
 
   if (buffer_size > 0) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index f479701..aa9d3ef 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -21,23 +21,39 @@
 
 #include "gtest/gtest.h"
 
+#include "arrow/ipc/api.h"
+#include "arrow/ipc/test-common.h"
 #include "arrow/status.h"
 #include "arrow/test-util.h"
 
-#include "arrow/gpu/cuda_memory.h"
+#include "arrow/gpu/cuda_api.h"
 
 namespace arrow {
 namespace gpu {
 
 constexpr int kGpuNumber = 0;
 
-class TestCudaBuffer : public ::testing::Test {};
+class TestCudaBufferBase : public ::testing::Test {
+ public:
+  void SetUp() {
+    ASSERT_OK(CudaDeviceManager::GetInstance(&manager_));
+    ASSERT_OK(manager_->GetContext(kGpuNumber, &context_));
+  }
+
+ protected:
+  CudaDeviceManager* manager_;
+  std::shared_ptr<CudaContext> context_;
+};
+
+class TestCudaBuffer : public TestCudaBufferBase {
+ public:
+  void SetUp() { TestCudaBufferBase::SetUp(); }
+};
 
 TEST_F(TestCudaBuffer, Allocate) {
   const int64_t kSize = 100;
   std::shared_ptr<CudaBuffer> buffer;
-
-  ASSERT_OK(AllocateCudaBuffer(kGpuNumber, kSize, &buffer));
+  ASSERT_OK(context_->Allocate(kSize, &buffer));
   ASSERT_EQ(kSize, buffer->size());
 }
 
@@ -52,7 +68,7 @@ void AssertCudaBufferEquals(const CudaBuffer& buffer, const uint8_t* host_data,
 TEST_F(TestCudaBuffer, CopyFromHost) {
   const int64_t kSize = 1000;
   std::shared_ptr<CudaBuffer> device_buffer;
-  ASSERT_OK(AllocateCudaBuffer(kGpuNumber, kSize, &device_buffer));
+  ASSERT_OK(context_->Allocate(kSize, &device_buffer));
 
   std::shared_ptr<PoolBuffer> host_buffer;
   ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
@@ -63,10 +79,49 @@ TEST_F(TestCudaBuffer, CopyFromHost) {
   AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
 }
 
-class TestCudaBufferWriter : public ::testing::Test {
+// IPC only supported on Linux
+#if defined(__linux)
+
+TEST_F(TestCudaBuffer, DISABLED_ExportForIpc) {
+  // For this test to work, a second process needs to be spawned
+  const int64_t kSize = 1000;
+  std::shared_ptr<CudaBuffer> device_buffer;
+  ASSERT_OK(context_->Allocate(kSize, &device_buffer));
+
+  std::shared_ptr<PoolBuffer> host_buffer;
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
+  ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), kSize));
+
+  // Export for IPC and serialize
+  std::unique_ptr<CudaIpcMemHandle> ipc_handle;
+  ASSERT_OK(device_buffer->ExportForIpc(&ipc_handle));
+
+  std::shared_ptr<Buffer> serialized_handle;
+  ASSERT_OK(ipc_handle->Serialize(default_memory_pool(), &serialized_handle));
+
+  // Deserialize IPC handle and open
+  std::unique_ptr<CudaIpcMemHandle> ipc_handle2;
+  ASSERT_OK(CudaIpcMemHandle::FromBuffer(serialized_handle->data(), &ipc_handle2));
+
+  std::shared_ptr<CudaBuffer> ipc_buffer;
+  ASSERT_OK(context_->OpenIpcBuffer(*ipc_handle2, &ipc_buffer));
+
+  ASSERT_EQ(kSize, ipc_buffer->size());
+
+  std::shared_ptr<MutableBuffer> ipc_data;
+  ASSERT_OK(AllocateBuffer(default_memory_pool(), kSize, &ipc_data));
+  ASSERT_OK(ipc_buffer->CopyToHost(0, kSize, ipc_data->mutable_data()));
+  ASSERT_EQ(0, std::memcmp(ipc_buffer->data(), host_buffer->data(), kSize));
+}
+
+#endif
+
+class TestCudaBufferWriter : public TestCudaBufferBase {
  public:
+  void SetUp() { TestCudaBufferBase::SetUp(); }
+
   void Allocate(const int64_t size) {
-    ASSERT_OK(AllocateCudaBuffer(kGpuNumber, size, &device_buffer_));
+    ASSERT_OK(context_->Allocate(size, &device_buffer_));
     writer_.reset(new CudaBufferWriter(device_buffer_));
   }
 
@@ -164,11 +219,16 @@ TEST_F(TestCudaBufferWriter, EdgeCases) {
   AssertCudaBufferEquals(*device_buffer_, host_data, 1000);
 }
 
-TEST(TestCudaBufferReader, Basics) {
+class TestCudaBufferReader : public TestCudaBufferBase {
+ public:
+  void SetUp() { TestCudaBufferBase::SetUp(); }
+};
+
+TEST_F(TestCudaBufferReader, Basics) {
   std::shared_ptr<CudaBuffer> device_buffer;
 
   const int64_t size = 1000;
-  ASSERT_OK(AllocateCudaBuffer(kGpuNumber, size, &device_buffer));
+  ASSERT_OK(context_->Allocate(size, &device_buffer));
 
   std::shared_ptr<PoolBuffer> buffer;
   ASSERT_OK(test::MakeRandomBytePoolBuffer(1000, default_memory_pool(), &buffer));
@@ -204,5 +264,41 @@ TEST(TestCudaBufferReader, Basics) {
   ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 925, 75));
 }
 
+class TestCudaArrowIpc : public TestCudaBufferBase {
+ public:
+  void SetUp() {
+    TestCudaBufferBase::SetUp();
+    pool_ = default_memory_pool();
+  }
+
+ protected:
+  MemoryPool* pool_;
+};
+
+TEST_F(TestCudaArrowIpc, BasicWriteRead) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(ipc::MakeIntRecordBatch(&batch));
+
+  std::shared_ptr<CudaBuffer> device_serialized;
+  ASSERT_OK(arrow::gpu::SerializeRecordBatch(*batch, context_.get(), &device_serialized));
+
+  // Test that ReadRecordBatch works properly
+  std::shared_ptr<RecordBatch> device_batch;
+  ASSERT_OK(ReadRecordBatch(batch->schema(), device_serialized, default_memory_pool(),
+                            &device_batch));
+
+  // Copy data from device, read batch, and compare
+  std::shared_ptr<MutableBuffer> host_buffer;
+  int64_t size = device_serialized->size();
+  ASSERT_OK(AllocateBuffer(pool_, size, &host_buffer));
+  ASSERT_OK(device_serialized->CopyToHost(0, size, host_buffer->mutable_data()));
+
+  std::shared_ptr<RecordBatch> cpu_batch;
+  io::BufferReader cpu_reader(host_buffer);
+  ASSERT_OK(ipc::ReadRecordBatch(batch->schema(), &cpu_reader, &cpu_batch));
+
+  ipc::CompareBatch(*batch, *cpu_batch);
+}
+
 }  // namespace gpu
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda_api.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_api.h b/cpp/src/arrow/gpu/cuda_api.h
index a70e0af..c63b77e 100644
--- a/cpp/src/arrow/gpu/cuda_api.h
+++ b/cpp/src/arrow/gpu/cuda_api.h
@@ -18,7 +18,9 @@
 #ifndef ARROW_GPU_CUDA_API_H
 #define ARROW_GPU_CUDA_API_H
 
+#include "arrow/gpu/cuda_arrow_ipc.h"
+#include "arrow/gpu/cuda_context.h"
 #include "arrow/gpu/cuda_memory.h"
 #include "arrow/gpu/cuda_version.h"
 
-#endif // ARROW_GPU_CUDA_API_H
+#endif  // ARROW_GPU_CUDA_API_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
new file mode 100644
index 0000000..669857d
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/gpu/cuda_arrow_ipc.h"
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/ipc/Message_generated.h"
+#include "arrow/ipc/message.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/gpu/cuda_context.h"
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
+namespace gpu {
+
+Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
+                            std::shared_ptr<CudaBuffer>* out) {
+  int64_t size = 0;
+  RETURN_NOT_OK(ipc::GetRecordBatchSize(batch, &size));
+
+  std::shared_ptr<CudaBuffer> buffer;
+  RETURN_NOT_OK(ctx->Allocate(size, &buffer));
+
+  CudaBufferWriter stream(buffer);
+
+  // Use 8MB buffering, which yields generally good performance
+  RETURN_NOT_OK(stream.SetBufferSize(1 << 23));
+
+  // We use the default memory pool here since any allocations are ephemeral
+  RETURN_NOT_OK(ipc::SerializeRecordBatch(batch, default_memory_pool(), &stream));
+  RETURN_NOT_OK(stream.Close());
+  *out = buffer;
+  return Status::OK();
+}
+
+Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
+                   std::unique_ptr<ipc::Message>* out) {
+  int32_t message_length = 0;
+  int64_t bytes_read = 0;
+
+  RETURN_NOT_OK(reader->Read(sizeof(int32_t), &bytes_read,
+                             reinterpret_cast<uint8_t*>(&message_length)));
+  if (bytes_read != sizeof(int32_t)) {
+    *out = nullptr;
+    return Status::OK();
+  }
+
+  if (message_length == 0) {
+    // Optional 0 EOS control message
+    *out = nullptr;
+    return Status::OK();
+  }
+
+  std::shared_ptr<MutableBuffer> metadata;
+  RETURN_NOT_OK(AllocateBuffer(pool, message_length, &metadata));
+  RETURN_NOT_OK(reader->Read(message_length, &bytes_read, metadata->mutable_data()));
+  if (bytes_read != message_length) {
+    std::stringstream ss;
+    ss << "Expected " << message_length << " metadata bytes, but only got " << bytes_read;
+    return Status::IOError(ss.str());
+  }
+
+  return ipc::Message::ReadFrom(metadata, reader, out);
+}
+
+Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
+                       const std::shared_ptr<CudaBuffer>& buffer, MemoryPool* pool,
+                       std::shared_ptr<RecordBatch>* out) {
+  CudaBufferReader cuda_reader(buffer);
+
+  std::unique_ptr<ipc::Message> message;
+  RETURN_NOT_OK(ReadMessage(&cuda_reader, pool, &message));
+
+  if (!message) {
+    return Status::Invalid("Message is length 0");
+  }
+
+  // Zero-copy read on device memory
+  return ipc::ReadRecordBatch(*message, schema, out);
+}
+
+}  // namespace gpu
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda_arrow_ipc.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.h b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
new file mode 100644
index 0000000..52dd924
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_GPU_CUDA_ARROW_IPC_H
+#define ARROW_GPU_CUDA_ARROW_IPC_H
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+
+class MemoryPool;
+class RecordBatch;
+class Schema;
+
+namespace ipc {
+
+class Message;
+
+}  // namespace ipc
+
+namespace gpu {
+
+/// \brief Write record batch message to GPU device memory
+/// \param[in] batch record batch to write
+/// \param[in] ctx CudaContext to allocate device memory from
+/// \param[out] out the returned device buffer which contains the record batch message
+/// \return Status
+ARROW_EXPORT
+Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
+                            std::shared_ptr<CudaBuffer>* out);
+
+/// \brief Read Arrow IPC message located on GPU device
+/// \param[in] reader a CudaBufferReader
+/// \param[in] pool a MemoryPool to allocate CPU memory for the metadata
+/// \param[out] message the deserialized message, body still on device
+///
+/// This function reads the message metadata into host memory, but leaves the
+/// message body on the device
+ARROW_EXPORT
+Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
+                   std::unique_ptr<ipc::Message>* message);
+
+/// \brief ReadRecordBatch specialized to handle metadata on CUDA device
+/// \param[in] schema the Schema for the record batch
+/// \param[in] buffer a CudaBuffer containing the complete IPC message
+/// \param[in] pool a MemoryPool to use for allocating space for the metadata
+/// \param[out] out the reconstructed RecordBatch, with device pointers
+ARROW_EXPORT
+Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
+                       const std::shared_ptr<CudaBuffer>& buffer, MemoryPool* pool,
+                       std::shared_ptr<RecordBatch>* out);
+
+}  // namespace gpu
+}  // namespace arrow
+
+#endif  // ARROW_GPU_CUDA_ARROW_IPC_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda_common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_common.h b/cpp/src/arrow/gpu/cuda_common.h
index 1d65f96..c06c1a2 100644
--- a/cpp/src/arrow/gpu/cuda_common.h
+++ b/cpp/src/arrow/gpu/cuda_common.h
@@ -22,7 +22,7 @@
 
 #include <sstream>
 
-#include <cuda_runtime_api.h>
+#include <cuda.h>
 
 namespace arrow {
 namespace gpu {
@@ -34,24 +34,13 @@ namespace gpu {
     (void)ret;            \
   } while (0)
 
-#define CUDA_RETURN_NOT_OK(STMT)                                       \
-  do {                                                                 \
-    cudaError_t ret = (STMT);                                          \
-    if (ret != cudaSuccess) {                                          \
-      std::stringstream ss;                                            \
-      ss << "Cuda API call in " << __FILE__ << " at line " << __LINE__ \
-         << " failed: " << #STMT;                                      \
-      return Status::IOError(ss.str());                                \
-    }                                                                  \
-  } while (0)
-
-#define CUDADRV_RETURN_NOT_OK(STMT)                                           \
+#define CU_RETURN_NOT_OK(STMT)                                                \
   do {                                                                        \
     CUresult ret = (STMT);                                                    \
     if (ret != CUDA_SUCCESS) {                                                \
       std::stringstream ss;                                                   \
       ss << "Cuda Driver API call in " << __FILE__ << " at line " << __LINE__ \
-         << " failed: " << #STMT;                                             \
+         << " failed with code " << ret << ": " << #STMT;                     \
       return Status::IOError(ss.str());                                       \
     }                                                                         \
   } while (0)

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda_context.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
new file mode 100644
index 0000000..430ecab
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/gpu/cuda_context.h"
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <cuda.h>
+
+#include "arrow/gpu/cuda_common.h"
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+namespace gpu {
+
+struct CudaDevice {
+  int device_num;
+  CUdevice handle;
+  int64_t total_memory;
+};
+
+class CudaContext::CudaContextImpl {
+ public:
+  CudaContextImpl() {}
+
+  Status Init(const CudaDevice& device) {
+    device_ = device;
+    CU_RETURN_NOT_OK(cuCtxCreate(&context_, 0, device_.handle));
+    is_open_ = true;
+    return Status::OK();
+  }
+
+  Status Close() {
+    if (is_open_ && own_context_) {
+      CU_RETURN_NOT_OK(cuCtxDestroy(context_));
+    }
+    is_open_ = false;
+    return Status::OK();
+  }
+
+  int64_t bytes_allocated() const { return bytes_allocated_.load(); }
+
+  Status Allocate(int64_t nbytes, uint8_t** out) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+
+    CUdeviceptr data;
+    CU_RETURN_NOT_OK(cuMemAlloc(&data, static_cast<size_t>(nbytes)));
+    *out = reinterpret_cast<uint8_t*>(data);
+    return Status::OK();
+  }
+
+  Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    CU_RETURN_NOT_OK(cuMemcpyHtoD(reinterpret_cast<CUdeviceptr>(dst),
+                                  reinterpret_cast<const void*>(src),
+                                  static_cast<size_t>(nbytes)));
+    return Status::OK();
+  }
+
+  Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    CU_RETURN_NOT_OK(cuMemcpyDtoH(dst, reinterpret_cast<const CUdeviceptr>(src),
+                                  static_cast<size_t>(nbytes)));
+    return Status::OK();
+  }
+
+  Status Free(uint8_t* device_ptr, int64_t nbytes) {
+    CU_RETURN_NOT_OK(cuMemFree(reinterpret_cast<CUdeviceptr>(device_ptr)));
+    return Status::OK();
+  }
+
+  Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    CUipcMemHandle cu_handle;
+    CU_RETURN_NOT_OK(cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(data)));
+    *handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(&cu_handle));
+    return Status::OK();
+  }
+
+  Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle, uint8_t** out) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    auto handle = reinterpret_cast<const CUipcMemHandle*>(ipc_handle.handle());
+
+    CUdeviceptr data;
+    CU_RETURN_NOT_OK(
+        cuIpcOpenMemHandle(&data, *handle, CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS));
+    *out = reinterpret_cast<uint8_t*>(data);
+    return Status::OK();
+  }
+
+  const CudaDevice device() const { return device_; }
+
+ private:
+  CudaDevice device_;
+  CUcontext context_;
+  bool is_open_;
+
+  // So that we can utilize a CUcontext that was created outside this library
+  bool own_context_;
+
+  std::atomic<int64_t> bytes_allocated_;
+};
+
+class CudaDeviceManager::CudaDeviceManagerImpl {  // private impl of CudaDeviceManager
+ public:
+  CudaDeviceManagerImpl() : num_devices_(0), host_bytes_allocated_(0) {}
+
+  Status Init() {  // start the driver API and record properties of each device
+    CU_RETURN_NOT_OK(cuInit(0));
+    CU_RETURN_NOT_OK(cuDeviceGetCount(&num_devices_));
+
+    devices_.resize(num_devices_);
+    for (int i = 0; i < num_devices_; ++i) {
+      RETURN_NOT_OK(GetDeviceProperties(i, &devices_[i]));
+    }
+    return Status::OK();
+  }
+
+  Status AllocateHost(int64_t nbytes, uint8_t** out) {  // host memory, cuMemHostAlloc
+    CU_RETURN_NOT_OK(cuMemHostAlloc(reinterpret_cast<void**>(out),
+                                    static_cast<size_t>(nbytes),
+                                    CU_MEMHOSTALLOC_PORTABLE));
+    host_bytes_allocated_ += nbytes;
+    return Status::OK();
+  }
+
+  Status FreeHost(uint8_t* data, int64_t nbytes) {  // nbytes only adjusts the counter
+    CU_RETURN_NOT_OK(cuMemFreeHost(data));
+    host_bytes_allocated_ -= nbytes;
+    return Status::OK();
+  }
+
+  Status GetDeviceProperties(int device_number, CudaDevice* device) {
+    device->device_num = device_number;
+    CU_RETURN_NOT_OK(cuDeviceGet(&device->handle, device_number));
+
+    size_t total_memory = 0;
+    CU_RETURN_NOT_OK(cuDeviceTotalMem(&total_memory, device->handle));
+    device->total_memory = total_memory;
+    return Status::OK();
+  }
+
+  Status CreateNewContext(int device_number, std::shared_ptr<CudaContext>* out) {
+    *out = std::shared_ptr<CudaContext>(new CudaContext());
+    return (*out)->impl_->Init(devices_[device_number]);  // index is not range-checked
+  }
+
+  Status GetContext(int device_number, std::shared_ptr<CudaContext>* out) {
+    auto it = contexts_.find(device_number);  // one cached context per device number
+    if (it == contexts_.end()) {
+      std::shared_ptr<CudaContext> new_context;
+      RETURN_NOT_OK(CreateNewContext(device_number, &new_context));
+      contexts_[device_number] = *out = new_context;
+    } else {
+      *out = it->second;
+    }
+    return Status::OK();
+  }
+
+  int num_devices() const { return num_devices_; }
+
+ private:
+  int num_devices_;  // filled in by Init(); 0 until then
+  std::vector<CudaDevice> devices_;
+
+  // device_number -> CudaContext
+  std::unordered_map<int, std::shared_ptr<CudaContext>> contexts_;
+
+  int64_t host_bytes_allocated_;  // 64-bit because it accumulates int64_t sizes
+};
+
+CudaDeviceManager::CudaDeviceManager() { impl_.reset(new CudaDeviceManagerImpl()); }
+
+std::unique_ptr<CudaDeviceManager> CudaDeviceManager::instance_ = nullptr;
+
+Status CudaDeviceManager::GetInstance(CudaDeviceManager** manager) {
+  if (!instance_) {
+    instance_.reset(new CudaDeviceManager());
+    RETURN_NOT_OK(instance_->impl_->Init());
+  }
+  *manager = instance_.get();
+  return Status::OK();
+}
+
+Status CudaDeviceManager::GetContext(int device_number,
+                                     std::shared_ptr<CudaContext>* out) {
+  return impl_->GetContext(device_number, out);
+}
+
+Status CudaDeviceManager::CreateNewContext(int device_number,
+                                           std::shared_ptr<CudaContext>* out) {
+  return impl_->CreateNewContext(device_number, out);
+}
+
+Status CudaDeviceManager::AllocateHost(int64_t nbytes,
+                                       std::shared_ptr<CudaHostBuffer>* out) {
+  uint8_t* data = nullptr;
+  RETURN_NOT_OK(impl_->AllocateHost(nbytes, &data));
+  *out = std::make_shared<CudaHostBuffer>(data, nbytes);
+  return Status::OK();
+}
+
+Status CudaDeviceManager::FreeHost(uint8_t* data, int64_t nbytes) {
+  return impl_->FreeHost(data, nbytes);
+}
+
+int CudaDeviceManager::num_devices() const { return impl_->num_devices(); }
+
+// ----------------------------------------------------------------------
+// CudaContext public API
+
+CudaContext::CudaContext() { impl_.reset(new CudaContextImpl()); }
+
+CudaContext::~CudaContext() {}
+
+Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out) {
+  uint8_t* data = nullptr;
+  RETURN_NOT_OK(impl_->Allocate(nbytes, &data));
+  *out = std::make_shared<CudaBuffer>(data, nbytes, this->shared_from_this(), true);
+  return Status::OK();
+}
+
+Status CudaContext::ExportIpcBuffer(uint8_t* data,
+                                    std::unique_ptr<CudaIpcMemHandle>* handle) {
+  return impl_->ExportIpcBuffer(data, handle);
+}
+
+Status CudaContext::CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
+  return impl_->CopyHostToDevice(dst, src, nbytes);
+}
+
+Status CudaContext::CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
+  return impl_->CopyDeviceToHost(dst, src, nbytes);
+}
+
+Status CudaContext::Close() { return impl_->Close(); }
+
+Status CudaContext::Free(uint8_t* device_ptr, int64_t nbytes) {
+  return impl_->Free(device_ptr, nbytes);
+}
+
+Status CudaContext::OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,
+                                  std::shared_ptr<CudaBuffer>* out) {
+  uint8_t* data = nullptr;
+  RETURN_NOT_OK(impl_->OpenIpcBuffer(ipc_handle, &data));
+
+  // Need to ask the device how big the buffer is
+  size_t allocation_size = 0;
+  CU_RETURN_NOT_OK(cuMemGetAddressRange(nullptr, &allocation_size,
+                                        reinterpret_cast<CUdeviceptr>(data)));
+
+  *out = std::make_shared<CudaBuffer>(data, allocation_size, this->shared_from_this(),
+                                      true, true);
+  return Status::OK();
+}
+
+}  // namespace gpu
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda_context.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
new file mode 100644
index 0000000..6471059
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_GPU_CUDA_CONTEXT_H
+#define ARROW_GPU_CUDA_CONTEXT_H
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+namespace gpu {
+
+// Forward declaration
+class CudaContext;
+
+class ARROW_EXPORT CudaDeviceManager {
+ public:
+  static Status GetInstance(CudaDeviceManager** manager);
+
+  /// \brief Get the shared CUDA driver context for a particular device
+  Status GetContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
+
+  /// \brief Create a new context for a given device number
+  ///
+  /// In general code will use GetContext
+  Status CreateNewContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
+
+  Status AllocateHost(int64_t nbytes, std::shared_ptr<CudaHostBuffer>* buffer);
+
+  Status FreeHost(uint8_t* data, int64_t nbytes);
+
+  int num_devices() const;
+
+ private:
+  CudaDeviceManager();
+  static std::unique_ptr<CudaDeviceManager> instance_;
+
+  class CudaDeviceManagerImpl;
+  std::unique_ptr<CudaDeviceManagerImpl> impl_;
+
+  friend CudaContext;
+};
+
+struct ARROW_EXPORT CudaDeviceInfo {};
+
+/// \class CudaContext
+/// \brief Friendlier interface to the CUDA driver API
+class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext> {
+ public:
+  ~CudaContext();
+
+  Status Close();
+
+  /// \brief Allocate CUDA memory on GPU device for this context
+  /// \param[in] nbytes number of bytes
+  /// \param[out] out the allocated buffer
+  /// \return Status
+  Status Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out);
+
+  /// \brief Open existing CUDA IPC memory handle
+  /// \param[in] ipc_handle a CudaIpcMemHandle wrapping a CUipcMemHandle (driver API)
+  /// \param[out] buffer a CudaBuffer referencing the opened IPC device memory
+  /// \return Status
+  Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,
+                       std::shared_ptr<CudaBuffer>* buffer);
+
+  int64_t bytes_allocated() const;
+
+ private:
+  CudaContext();
+
+  Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle);
+  Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes);
+  Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes);
+  Status Free(uint8_t* device_ptr, int64_t nbytes);
+
+  class CudaContextImpl;
+  std::unique_ptr<CudaContextImpl> impl_;
+
+  friend CudaBuffer;
+  friend CudaBufferReader;
+  friend CudaBufferWriter;
+  friend CudaDeviceManager::CudaDeviceManagerImpl;
+};
+
+}  // namespace gpu
+}  // namespace arrow
+
+#endif  // ARROW_GPU_CUDA_CONTEXT_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda_memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 4870813..3c88fe2 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -19,68 +19,124 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <cstdlib>
 #include <memory>
 
+#include <cuda.h>
+
 #include "arrow/buffer.h"
 #include "arrow/io/memory.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
 
 #include "arrow/gpu/cuda_common.h"
+#include "arrow/gpu/cuda_context.h"
 
 namespace arrow {
 namespace gpu {
 
-CudaBuffer::~CudaBuffer() {
+// ----------------------------------------------------------------------
+// CUDA IPC memory handle
+
+struct CudaIpcMemHandle::CudaIpcMemHandleImpl {
+  explicit CudaIpcMemHandleImpl(const void* handle) {
+    memcpy(&ipc_handle, handle, sizeof(CUipcMemHandle));
+  }
+
+  CUipcMemHandle ipc_handle;
+};
+
+CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) {
+  impl_.reset(new CudaIpcMemHandleImpl(handle));
+}
+
+CudaIpcMemHandle::~CudaIpcMemHandle() {}
+
+Status CudaIpcMemHandle::FromBuffer(const void* opaque_handle,
+                                    std::unique_ptr<CudaIpcMemHandle>* handle) {
+  *handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(opaque_handle));
+  return Status::OK();
+}
+
+Status CudaIpcMemHandle::Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const {
+  std::shared_ptr<MutableBuffer> buffer;
+  constexpr size_t kHandleSize = sizeof(CUipcMemHandle);
+  RETURN_NOT_OK(AllocateBuffer(pool, static_cast<int64_t>(kHandleSize), &buffer));
+  memcpy(buffer->mutable_data(), &impl_->ipc_handle, kHandleSize);
+  *out = buffer;
+  return Status::OK();
+}
+
+const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }
+
+// ----------------------------------------------------------------------
+
+CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
+                       const std::shared_ptr<CudaContext>& context, bool own_data,
+                       bool is_ipc)
+    : Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
+  is_mutable_ = true;
+  mutable_data_ = data;
+}
+
+CudaBuffer::~CudaBuffer() { DCHECK(Close().ok()); }
+
+Status CudaBuffer::Close() {
   if (own_data_) {
-    CUDA_DCHECK(cudaFree(mutable_data_));
+    if (is_ipc_) {
+      CU_RETURN_NOT_OK(cuIpcCloseMemHandle(reinterpret_cast<CUdeviceptr>(mutable_data_)));
+    } else {
+      return context_->Free(mutable_data_, size_);
+    }
   }
+  return Status::OK();
 }
 
 CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
                        const int64_t size)
-    : Buffer(parent, offset, size), gpu_number_(parent->gpu_number()) {}
+    : Buffer(parent, offset, size),
+      context_(parent->context()),
+      own_data_(false),
+      is_ipc_(false) {}
 
 Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
                               uint8_t* out) const {
-  CUDA_RETURN_NOT_OK(cudaMemcpy(out, data_ + position, nbytes, cudaMemcpyDeviceToHost));
-  return Status::OK();
+  return context_->CopyDeviceToHost(out, data_ + position, nbytes);
 }
 
 Status CudaBuffer::CopyFromHost(const int64_t position, const uint8_t* data,
                                 int64_t nbytes) {
   DCHECK_LE(nbytes, size_ - position) << "Copy would overflow buffer";
-  CUDA_RETURN_NOT_OK(
-      cudaMemcpy(mutable_data_ + position, data, nbytes, cudaMemcpyHostToDevice));
-  return Status::OK();
+  return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes);
 }
 
-Status AllocateCudaBuffer(int gpu_number, const int64_t size,
-                          std::shared_ptr<CudaBuffer>* out) {
-  CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number));
-  uint8_t* data = nullptr;
-  CUDA_RETURN_NOT_OK(
-      cudaMalloc(reinterpret_cast<void**>(&data), static_cast<size_t>(size)));
-  *out = std::make_shared<CudaBuffer>(data, size, gpu_number, true);
+Status CudaBuffer::ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle) {
+  if (is_ipc_) {
+    return Status::Invalid("Buffer has already been exported for IPC");
+  }
+  RETURN_NOT_OK(context_->ExportIpcBuffer(mutable_data_, handle));
+  own_data_ = false;
   return Status::OK();
 }
 
-CudaHostBuffer::~CudaHostBuffer() { CUDA_DCHECK(cudaFreeHost(mutable_data_)); }
+CudaHostBuffer::~CudaHostBuffer() {
+  CudaDeviceManager* manager = nullptr;
+  DCHECK(CudaDeviceManager::GetInstance(&manager).ok());
+  DCHECK(manager->FreeHost(mutable_data_, size_).ok());
+}
 
 // ----------------------------------------------------------------------
 // CudaBufferReader
 
 CudaBufferReader::CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer)
-    : io::BufferReader(buffer), cuda_buffer_(buffer), gpu_number_(buffer->gpu_number()) {}
+    : io::BufferReader(buffer), cuda_buffer_(buffer), context_(buffer->context()) {}
 
 CudaBufferReader::~CudaBufferReader() {}
 
 Status CudaBufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
   nbytes = std::min(nbytes, size_ - position_);
-  CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
-  CUDA_RETURN_NOT_OK(
-      cudaMemcpy(buffer, data_ + position_, nbytes, cudaMemcpyDeviceToHost));
   *bytes_read = nbytes;
+  RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, data_ + position_, nbytes));
   position_ += nbytes;
   return Status::OK();
 }
@@ -97,7 +153,7 @@ Status CudaBufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
 
 CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer)
     : io::FixedSizeBufferWriter(buffer),
-      gpu_number_(buffer->gpu_number()),
+      context_(buffer->context()),
       buffer_size_(0),
       buffer_position_(0) {}
 
@@ -108,10 +164,8 @@ Status CudaBufferWriter::Close() { return Flush(); }
 Status CudaBufferWriter::Flush() {
   if (buffer_size_ > 0 && buffer_position_ > 0) {
     // Only need to flush when the write has been buffered
-    CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
-    CUDA_RETURN_NOT_OK(cudaMemcpy(mutable_data_ + position_ - buffer_position_,
-                                  host_buffer_data_, buffer_position_,
-                                  cudaMemcpyHostToDevice));
+    RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_ - buffer_position_,
+                                             host_buffer_data_, buffer_position_));
     buffer_position_ = 0;
   }
   return Status::OK();
@@ -137,9 +191,7 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
     if (nbytes + buffer_position_ >= buffer_size_) {
       // Reach end of buffer, write everything
       RETURN_NOT_OK(Flush());
-      CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
-      CUDA_RETURN_NOT_OK(
-          cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice));
+      RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
     } else {
       // Write bytes to buffer
       std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
@@ -147,9 +199,7 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
     }
   } else {
     // Unbuffered write
-    CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
-    CUDA_RETURN_NOT_OK(
-        cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice));
+    RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
   }
   position_ += nbytes;
   return Status::OK();
@@ -169,11 +219,9 @@ Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
 // ----------------------------------------------------------------------
 
 Status AllocateCudaHostBuffer(const int64_t size, std::shared_ptr<CudaHostBuffer>* out) {
-  uint8_t* data = nullptr;
-  CUDA_RETURN_NOT_OK(
-      cudaMallocHost(reinterpret_cast<void**>(&data), static_cast<size_t>(size)));
-  *out = std::make_shared<CudaHostBuffer>(data, size);
-  return Status::OK();
+  CudaDeviceManager* manager = nullptr;
+  RETURN_NOT_OK(CudaDeviceManager::GetInstance(&manager));
+  return manager->AllocateHost(size, out);
 }
 
 }  // namespace gpu

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/gpu/cuda_memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index bd8b89a..d540737 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -29,17 +29,17 @@
 namespace arrow {
 namespace gpu {
 
+class CudaContext;
+class CudaIpcMemHandle;
+
 /// \class CudaBuffer
 /// \brief An Arrow buffer located on a GPU device
 ///
 /// Be careful using this in any Arrow code which may not be GPU-aware
 class ARROW_EXPORT CudaBuffer : public Buffer {
  public:
-  CudaBuffer(uint8_t* data, int64_t size, const int gpu_number, bool own_data = false)
-      : Buffer(data, size), gpu_number_(gpu_number), own_data_(own_data) {
-    is_mutable_ = true;
-    mutable_data_ = data;
-  }
+  CudaBuffer(uint8_t* data, int64_t size, const std::shared_ptr<CudaContext>& context,
+             bool own_data = false, bool is_ipc = false);
 
   CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
              const int64_t size);
@@ -58,11 +58,22 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
   /// \return Status
   Status CopyFromHost(const int64_t position, const uint8_t* data, int64_t nbytes);
 
-  int gpu_number() const { return gpu_number_; }
+  /// \brief Expose this device buffer as IPC memory which can be used in other processes
+  /// \param[out] handle the exported IPC handle
+  /// \return Status
+  ///
+  /// \note After calling this function, this device memory will not be freed
+  /// when the CudaBuffer is destructed
+  virtual Status ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle);
+
+  std::shared_ptr<CudaContext> context() const { return context_; }
 
- private:
-  const int gpu_number_;
+ protected:
+  std::shared_ptr<CudaContext> context_;
   bool own_data_;
+  bool is_ipc_;
+
+  virtual Status Close();
 };
 
 /// \class CudaHostBuffer
@@ -73,6 +84,37 @@ class ARROW_EXPORT CudaHostBuffer : public MutableBuffer {
   ~CudaHostBuffer();
 };
 
+/// \class CudaIpcMemHandle
+/// \brief A container for a CUDA IPC handle
+class ARROW_EXPORT CudaIpcMemHandle {
+ public:
+  ~CudaIpcMemHandle();
+
+  /// \brief Create CudaIpcMemHandle from opaque buffer (e.g. from another process)
+  /// \param[in] opaque_handle a CUipcMemHandle as a const void*
+  /// \param[out] handle the CudaIpcMemHandle instance
+  /// \return Status
+  static Status FromBuffer(const void* opaque_handle,
+                           std::unique_ptr<CudaIpcMemHandle>* handle);
+
+  /// \brief Write CudaIpcMemHandle to a Buffer
+  /// \param[in] pool a MemoryPool to allocate memory from
+  /// \param[out] out the serialized buffer
+  /// \return Status
+  Status Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const;
+
+ private:
+  explicit CudaIpcMemHandle(const void* handle);
+
+  struct CudaIpcMemHandleImpl;
+  std::unique_ptr<CudaIpcMemHandleImpl> impl_;
+
+  const void* handle() const;
+
+  friend CudaBuffer;
+  friend CudaContext;
+};
+
 /// \class CudaBufferReader
 /// \brief File interface for zero-copy read from CUDA buffers
 ///
@@ -98,7 +140,7 @@ class ARROW_EXPORT CudaBufferReader : public io::BufferReader {
 
  private:
   std::shared_ptr<CudaBuffer> cuda_buffer_;
-  int gpu_number_;
+  std::shared_ptr<CudaContext> context_;
 };
 
 /// \class CudaBufferWriter
@@ -132,7 +174,7 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
   int64_t num_bytes_buffered() const { return buffer_position_; }
 
  private:
-  int gpu_number_;
+  std::shared_ptr<CudaContext> context_;
 
   // Pinned host buffer for buffering writes on CPU before calling cudaMalloc
   int64_t buffer_size_;
@@ -141,15 +183,6 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
   uint8_t* host_buffer_data_;
 };
 
-/// \brief Allocate CUDA memory on a GPU device
-/// \param[in] gpu_number Device number to allocate
-/// \param[in] size number of bytes
-/// \param[out] out the allocated buffer
-/// \return Status
-ARROW_EXPORT
-Status AllocateCudaBuffer(const int gpu_number, const int64_t size,
-                          std::shared_ptr<CudaBuffer>* out);
-
 /// \brief Allocate CUDA-accessible memory on CPU host
 /// \param[in] size number of bytes
 /// \param[out] out the allocated buffer

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/ipc/message.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index eb06aaf..53f0203 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -149,6 +149,24 @@ bool Message::Equals(const Message& other) const {
   }
 }
 
+Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
+                         std::unique_ptr<Message>* out) {
+  auto fb_message = flatbuf::GetMessage(metadata->data());
+
+  int64_t body_length = fb_message->bodyLength();
+
+  std::shared_ptr<Buffer> body;
+  RETURN_NOT_OK(stream->Read(body_length, &body));
+  if (body->size() < body_length) {
+    std::stringstream ss;
+    ss << "Expected to be able to read " << body_length << " bytes for message body, got "
+       << body->size();
+    return Status::IOError(ss.str());
+  }
+
+  return Message::Open(metadata, body, out);
+}
+
 Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const {
   int32_t metadata_length = 0;
   RETURN_NOT_OK(WriteMessage(*metadata(), file, &metadata_length));
@@ -178,29 +196,6 @@ std::string FormatMessageType(Message::Type type) {
   return "unknown";
 }
 
-// ----------------------------------------------------------------------
-// Read and write messages
-
-static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata,
-                              io::InputStream* stream,
-                              std::unique_ptr<Message>* message) {
-  auto fb_message = flatbuf::GetMessage(metadata->data());
-
-  int64_t body_length = fb_message->bodyLength();
-
-  std::shared_ptr<Buffer> body;
-  RETURN_NOT_OK(stream->Read(body_length, &body));
-
-  if (body->size() < body_length) {
-    std::stringstream ss;
-    ss << "Expected to be able to read " << body_length << " bytes for message body, got "
-       << body->size();
-    return Status::IOError(ss.str());
-  }
-
-  return Message::Open(metadata, body, message);
-}
-
 Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
                    std::unique_ptr<Message>* message) {
   std::shared_ptr<Buffer> buffer;
@@ -216,32 +211,33 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
   }
 
   auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
-  return ReadFullMessage(metadata, file, message);
+  return Message::ReadFrom(metadata, file, message);
 }
 
 Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
-  std::shared_ptr<Buffer> buffer;
+  int32_t message_length = 0;
+  int64_t bytes_read = 0;
+  RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
+                           reinterpret_cast<uint8_t*>(&message_length)));
 
-  RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
-  if (buffer->size() != sizeof(int32_t)) {
+  if (bytes_read != sizeof(int32_t)) {
     *message = nullptr;
     return Status::OK();
   }
 
-  int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
-
   if (message_length == 0) {
     // Optional 0 EOS control message
     *message = nullptr;
     return Status::OK();
   }
 
-  RETURN_NOT_OK(file->Read(message_length, &buffer));
-  if (buffer->size() != message_length) {
+  std::shared_ptr<Buffer> metadata;
+  RETURN_NOT_OK(file->Read(message_length, &metadata));
+  if (metadata->size() != message_length) {
     return Status::IOError("Unexpected end of stream trying to read message");
   }
 
-  return ReadFullMessage(buffer, file, message);
+  return Message::ReadFrom(metadata, file, message);
 }
 
 // ----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/ipc/message.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index dce4e27..dbc50d8 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -79,6 +79,15 @@ class ARROW_EXPORT Message {
   static Status Open(const std::shared_ptr<Buffer>& metadata,
                      const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out);
 
+  /// \brief Read message body and create Message given Flatbuffer metadata
+  /// \param[in] metadata containing a serialized Message flatbuffer
+  /// \param[in] stream an InputStream
+  /// \param[out] out the created Message
+  ///
+  /// \note If stream supports zero-copy, this is zero-copy
+  static Status ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
+                         std::unique_ptr<Message>* out);
+
   /// \brief Write length-prefixed metadata and body to output stream
   ///
   /// \param[in] file output stream to write to

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 9c05cba..e17b974 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -901,14 +901,19 @@ Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
   RETURN_NOT_OK(AllocateBuffer(pool, size, &buffer));
 
   io::FixedSizeBufferWriter stream(buffer);
-  int32_t metadata_length = 0;
-  int64_t body_length = 0;
-  RETURN_NOT_OK(WriteRecordBatch(batch, 0, &stream, &metadata_length, &body_length, pool,
-                                 kMaxNestingDepth, true));
+  RETURN_NOT_OK(SerializeRecordBatch(batch, pool, &stream));
   *out = buffer;
   return Status::OK();
 }
 
+Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
+                            io::OutputStream* out) {
+  int32_t metadata_length = 0;
+  int64_t body_length = 0;
+  return WriteRecordBatch(batch, 0, out, &metadata_length, &body_length, pool,
+                          kMaxNestingDepth, true);
+}
+
 Status SerializeSchema(const Schema& schema, MemoryPool* pool,
                        std::shared_ptr<Buffer>* out) {
   std::shared_ptr<io::BufferOutputStream> stream;

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index d867982..3f110fe 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -177,6 +177,18 @@ ARROW_EXPORT
 Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
                             std::shared_ptr<Buffer>* out);
 
+/// \brief Write record batch to OutputStream
+///
+/// \param[in] batch the record batch to write
+/// \param[in] out the OutputStream to write the output to
+/// \return Status
+///
+/// If writing to pre-allocated memory, you can use
+/// arrow::ipc::GetRecordBatchSize to compute how much space is required
+ARROW_EXPORT
+Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
+                            io::OutputStream* out);
+
 /// \brief Serialize schema using stream writer as a sequence of one or more
 /// IPC messages
 ///

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/python/pyarrow/plasma.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index aebef1b..515b600 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -386,7 +386,8 @@ cdef class PlasmaClient:
         -------
         The object ID associated to the Python object.
         """
-        cdef ObjectID target_id = object_id if object_id else ObjectID.from_random()
+        cdef ObjectID target_id = (object_id if object_id
+                                   else ObjectID.from_random())
         # TODO(pcm): Make serialization code support non-sequences and
         # get rid of packing the value into a list here (and unpacking in get)
         serialized = pyarrow.serialize([value])
@@ -404,8 +405,8 @@ cdef class PlasmaClient:
         Parameters
         ----------
         object_ids : list or ObjectID
-            Object ID or list of object IDs associated to the values we get from
-            the store.
+            Object ID or list of object IDs associated to the values we get
+            from the store.
         timeout_ms : int, default -1
             The number of milliseconds that the get call should block before
             timing out and returning. Pass -1 if the call should block and 0
@@ -415,14 +416,15 @@ cdef class PlasmaClient:
         -------
         list or object
             Python value or list of Python values for the data associated with
-            the object_ids and ObjectNotAvailable if the object was not available.
+            the object_ids and ObjectNotAvailable if the object was not
+            available.
         """
         if isinstance(object_ids, collections.Sequence):
             results = []
             buffers = self.get_buffers(object_ids, timeout_ms)
             for i in range(len(object_ids)):
-                # buffers[i] is None if this object was not available within the
-                # timeout
+                # buffers[i] is None if this object was not available within
+                # the timeout
                 if buffers[i]:
                     value, = pyarrow.deserialize(buffers[i])
                     results.append(value)

http://git-wip-us.apache.org/repos/asf/arrow/blob/0728148e/python/pyarrow/tests/test_plasma.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index d729c1e..a831ef2 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -187,8 +187,8 @@ class TestPlasmaClient(object):
         # Seal the object.
         self.plasma_client.seal(object_id)
         # Get the object.
-        memory_buffer = np.frombuffer(self.plasma_client.get_buffers([object_id])[0],
-                                      dtype="uint8")
+        memory_buffer = np.frombuffer(
+            self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
         for i in range(length):
             assert memory_buffer[i] == i % 256
 
@@ -241,7 +241,8 @@ class TestPlasmaClient(object):
         # Test timing out of get with various timeouts.
         for timeout in [0, 10, 100, 1000]:
             object_ids = [random_object_id() for _ in range(num_object_ids)]
-            results = self.plasma_client.get_buffers(object_ids, timeout_ms=timeout)
+            results = self.plasma_client.get_buffers(object_ids,
+                                                     timeout_ms=timeout)
             assert results == num_object_ids * [None]
 
         data_buffers = []
@@ -257,7 +258,7 @@ class TestPlasmaClient(object):
         # timeouts.
         for timeout in [0, 10, 100, 1000]:
             data_results = self.plasma_client.get_buffers(object_ids,
-                                                         timeout_ms=timeout)
+                                                          timeout_ms=timeout)
             # metadata_results = self.plasma_client.get_metadata(
             #     object_ids, timeout_ms=timeout)
             for i in range(num_object_ids):
@@ -275,16 +276,16 @@ class TestPlasmaClient(object):
 
     def test_put_and_get(self):
         for value in [["hello", "world", 3, 1.0], None, "hello"]:
-             object_id = self.plasma_client.put(value)
-             [result] = self.plasma_client.get([object_id])
-             assert result == value
+            object_id = self.plasma_client.put(value)
+            [result] = self.plasma_client.get([object_id])
+            assert result == value
 
-             result = self.plasma_client.get(object_id)
-             assert result == value
+            result = self.plasma_client.get(object_id)
+            assert result == value
 
-             object_id = pa.plasma.ObjectID.from_random()
-             [result] = self.plasma_client.get([object_id], timeout_ms=0)
-             assert result == pa.plasma.ObjectNotAvailable
+            object_id = pa.plasma.ObjectID.from_random()
+            [result] = self.plasma_client.get([object_id], timeout_ms=0)
+            assert result == pa.plasma.ObjectNotAvailable
 
     def test_store_arrow_objects(self):
         data = np.random.randn(10, 4)


Mime
View raw message