arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-1392: [C++] Add GPU IO interfaces for CUDA
Date Wed, 23 Aug 2017 13:42:03 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 3c70ff1d0 -> 2c3a5f4e9


ARROW-1392: [C++] Add GPU IO interfaces for CUDA

This makes it easy to write from host to device and read from device to host. We also need
a zero-copy device reader for IPC purposes (where we don't want to move any data to the host),
can do that in a subsequent patch.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #985 from wesm/ARROW-1392 and squashes the following commits:

ae24cb53 [Wes McKinney] Add section to C++ README about building libarrow_gpu
229a2687 [Wes McKinney] Refactor CudaBufferReader to return zero-copy device pointers. Add
unit tests
415157ac [Wes McKinney] Make Tell overrides in arrow-glib const
5daa59eb [Wes McKinney] Add cuda-benchmark module
1cf1196c [Wes McKinney] Test CudaBuffer::CopyFromHost
a2708f2c [Wes McKinney] Implement IO interfaces for CUDA buffers


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2c3a5f4e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2c3a5f4e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2c3a5f4e

Branch: refs/heads/master
Commit: 2c3a5f4e9dbe9d89545e6033a43b273ac1499870
Parents: 3c70ff1
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Wed Aug 23 09:41:58 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Wed Aug 23 09:41:58 2017 -0400

----------------------------------------------------------------------
 c_glib/arrow-glib/input-stream.cpp  |   2 +-
 c_glib/arrow-glib/output-stream.cpp |   2 +-
 cpp/README.md                       |  19 +++-
 cpp/apidoc/Doxyfile                 |   2 +-
 cpp/src/arrow/builder.h             |  27 ++---
 cpp/src/arrow/gpu/CMakeLists.txt    |  34 ++++++-
 cpp/src/arrow/gpu/cuda-benchmark.cc |  93 +++++++++++++++++
 cpp/src/arrow/gpu/cuda-test.cc      | 169 ++++++++++++++++++++++++++++++-
 cpp/src/arrow/gpu/cuda_common.h     |  17 ++--
 cpp/src/arrow/gpu/cuda_memory.cc    | 116 ++++++++++++++++++++-
 cpp/src/arrow/gpu/cuda_memory.h     |  89 +++++++++++++++-
 cpp/src/arrow/io/file.cc            |   6 +-
 cpp/src/arrow/io/file.h             |   6 +-
 cpp/src/arrow/io/hdfs.cc            |   4 +-
 cpp/src/arrow/io/hdfs.h             |   4 +-
 cpp/src/arrow/io/interfaces.h       |   4 +-
 cpp/src/arrow/io/memory.cc          |  16 +--
 cpp/src/arrow/io/memory.h           |  13 ++-
 cpp/src/arrow/memory_pool.h         |   6 ++
 cpp/src/arrow/python/io.cc          |   4 +-
 cpp/src/arrow/python/io.h           |   4 +-
 cpp/src/arrow/util/io-util.h        |  16 +--
 22 files changed, 574 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/c_glib/arrow-glib/input-stream.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp
index 645ce8a..db3efd4 100644
--- a/c_glib/arrow-glib/input-stream.cpp
+++ b/c_glib/arrow-glib/input-stream.cpp
@@ -405,7 +405,7 @@ namespace garrow {
       }
     }
 
-    arrow::Status Tell(int64_t *position) override {
+    arrow::Status Tell(int64_t *position) const override {
       if (!G_IS_SEEKABLE(input_stream_)) {
         std::string message("[gio-input-stream][tell] "
                             "not seekable input stream: <");

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/c_glib/arrow-glib/output-stream.cpp
----------------------------------------------------------------------
diff --git a/c_glib/arrow-glib/output-stream.cpp b/c_glib/arrow-glib/output-stream.cpp
index 7f37864..739992f 100644
--- a/c_glib/arrow-glib/output-stream.cpp
+++ b/c_glib/arrow-glib/output-stream.cpp
@@ -312,7 +312,7 @@ namespace garrow {
       }
     }
 
-    arrow::Status Tell(int64_t *position) override {
+    arrow::Status Tell(int64_t *position) const override {
       if (!G_IS_SEEKABLE(output_stream_)) {
         std::string message("[gio-output-stream][tell] "
                             "not seekable output stream: <");

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/README.md
----------------------------------------------------------------------
diff --git a/cpp/README.md b/cpp/README.md
index 0228faf..c05e03f 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -101,16 +101,27 @@ all of these variables. Note that `ARROW_BUILD_TOOLCHAIN` will not set
 `BOOST_ROOT`, so if you have custom Boost installation, you must set this
 environment variable separately.
 
-### Building Python integration library
+### Building Python integration library (optional)
 
-The `arrow_python` shared library can be built by passing `-DARROW_PYTHON=on`
-to CMake. This must be installed or in your library load path to be able to
-build pyarrow, the Arrow Python bindings.
+The optional `arrow_python` shared library can be built by passing
+`-DARROW_PYTHON=on` to CMake. This must be installed or in your library load
+path to be able to build pyarrow, the Arrow Python bindings.
 
 The Python library must be built against the same Python version for which you
 are building pyarrow, e.g. Python 2.7 or Python 3.6. NumPy must also be
 installed.
 
+### Building GPU extension library (optional)
+
+The optional `arrow_gpu` shared library can be built by passing
+`-DARROW_GPU=on`. This requires a CUDA installation to build, and to use many
+of the functions you must have a functioning GPU. Currently only CUDA
+functionality is supported, though if there is demand we can also add OpenCL
+interfaces in this library as needed.
+
+The CUDA toolchain used to build the library can be customized by using the
+`$CUDA_HOME` environment variable.
+
 ### API documentation
 
 To generate the (html) API documentation, run the following command in the apidoc

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/apidoc/Doxyfile
----------------------------------------------------------------------
diff --git a/cpp/apidoc/Doxyfile b/cpp/apidoc/Doxyfile
index 94156d5..fadb6d5 100644
--- a/cpp/apidoc/Doxyfile
+++ b/cpp/apidoc/Doxyfile
@@ -2084,7 +2084,7 @@ PREDEFINED = __attribute__(x)= \
 # definition found in the source code.
 # This tag requires that the tag ENABLE_PREPROCESSING is set to YES.
 
-EXPAND_AS_DEFINED = ARROW_MEMORY_POOL_ARG
+EXPAND_AS_DEFINED = ARROW_MEMORY_POOL_DEFAULT
 
 # If the SKIP_FUNCTION_MACROS tag is set to YES then doxygen's preprocessor will
 # remove all references to function-like macros that are alone on a line, have

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 687286d..4e3cd92 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -54,12 +54,6 @@ struct Decimal;
 
 static constexpr int64_t kMinBuilderCapacity = 1 << 5;
 
-#ifdef ARROW_NO_DEFAULT_MEMORY_POOL
-#define ARROW_MEMORY_POOL_ARG pool
-#else
-#define ARROW_MEMORY_POOL_ARG pool = default_memory_pool()
-#endif
-
 /// Base class for all data array builders.
 //
 /// This class provides a facilities for incrementally building the null bitmap
@@ -167,7 +161,8 @@ class ARROW_EXPORT ArrayBuilder {
 
 class ARROW_EXPORT NullBuilder : public ArrayBuilder {
  public:
-  explicit NullBuilder(MemoryPool* ARROW_MEMORY_POOL_ARG) : ArrayBuilder(null(), pool) {}
+  explicit NullBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT)
+      : ArrayBuilder(null(), pool) {}
 
   Status AppendNull() {
     ++null_count_;
@@ -236,8 +231,8 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
 
   template <typename T1 = T>
   explicit NumericBuilder(
-      typename std::enable_if<TypeTraits<T1>::is_parameter_free, MemoryPool*>::type
-          ARROW_MEMORY_POOL_ARG)
+      typename std::enable_if<TypeTraits<T1>::is_parameter_free, MemoryPool*>::type
pool
+          ARROW_MEMORY_POOL_DEFAULT)
       : PrimitiveBuilder<T1>(TypeTraits<T1>::type_singleton(), pool) {}
 
   using PrimitiveBuilder<T>::Append;
@@ -368,7 +363,7 @@ inline uint8_t ExpandedUIntSize(uint64_t val, uint8_t current_int_size)
{
 
 class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase {
  public:
-  explicit AdaptiveUIntBuilder(MemoryPool* ARROW_MEMORY_POOL_ARG);
+  explicit AdaptiveUIntBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
   using ArrayBuilder::Advance;
 
@@ -427,7 +422,7 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase
 
 class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase {
  public:
-  explicit AdaptiveIntBuilder(MemoryPool* ARROW_MEMORY_POOL_ARG);
+  explicit AdaptiveIntBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
   using ArrayBuilder::Advance;
 
@@ -486,7 +481,7 @@ class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase
 
 class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
  public:
-  explicit BooleanBuilder(MemoryPool* ARROW_MEMORY_POOL_ARG);
+  explicit BooleanBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
   explicit BooleanBuilder(const std::shared_ptr<DataType>& type, MemoryPool* pool);
 
@@ -607,7 +602,7 @@ class ARROW_EXPORT ListBuilder : public ArrayBuilder {
 /// \brief Builder class for variable-length binary data
 class ARROW_EXPORT BinaryBuilder : public ArrayBuilder {
  public:
-  explicit BinaryBuilder(MemoryPool* ARROW_MEMORY_POOL_ARG);
+  explicit BinaryBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
 #ifndef ARROW_NO_DEPRECATED_API
   /// \deprecated Since 0.6.0
@@ -656,7 +651,7 @@ class ARROW_EXPORT BinaryBuilder : public ArrayBuilder {
 class ARROW_EXPORT StringBuilder : public BinaryBuilder {
  public:
   using BinaryBuilder::BinaryBuilder;
-  explicit StringBuilder(MemoryPool* ARROW_MEMORY_POOL_ARG);
+  explicit StringBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
   using BinaryBuilder::Append;
 
@@ -676,7 +671,7 @@ class ARROW_EXPORT FixedSizeBinaryBuilder : public ArrayBuilder {
 #endif
 
   FixedSizeBinaryBuilder(const std::shared_ptr<DataType>& type,
-                         MemoryPool* ARROW_MEMORY_POOL_ARG);
+                         MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
   Status Append(const uint8_t* value);
   Status Append(const uint8_t* data, int64_t length,
@@ -699,7 +694,7 @@ class ARROW_EXPORT FixedSizeBinaryBuilder : public ArrayBuilder {
 class ARROW_EXPORT DecimalBuilder : public FixedSizeBinaryBuilder {
  public:
   explicit DecimalBuilder(const std::shared_ptr<DataType>& type,
-                          MemoryPool* ARROW_MEMORY_POOL_ARG);
+                          MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT);
 
 #ifndef ARROW_NO_DEPRECATED_API
   /// \deprecated Since 0.6.0

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/gpu/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt
index 97de492..9fc9efa 100644
--- a/cpp/src/arrow/gpu/CMakeLists.txt
+++ b/cpp/src/arrow/gpu/CMakeLists.txt
@@ -80,9 +80,26 @@ set(ARROW_GPU_SHARED_LINK_LIBS
   arrow_shared
 )
 
-cuda_add_library(arrow_gpu SHARED
+add_library(arrow_gpu_objlib OBJECT
   ${ARROW_GPU_SRCS}
 )
+set_property(TARGET arrow_gpu_objlib PROPERTY POSITION_INDEPENDENT_CODE 1)
+
+if (ARROW_BUILD_SHARED)
+  cuda_add_library(arrow_gpu_shared SHARED $<TARGET_OBJECTS:arrow_gpu_objlib>)
+  install(TARGETS arrow_gpu_shared
+    RUNTIME DESTINATION bin
+    LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
+    ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
+endif()
+
+if (ARROW_BUILD_STATIC)
+  add_library(arrow_gpu_static STATIC $<TARGET_OBJECTS:arrow_gpu_objlib>)
+  install(TARGETS arrow_gpu_static
+    RUNTIME DESTINATION bin
+    LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
+    ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
+endif()
 
 install(FILES
   cuda_common.h
@@ -97,10 +114,19 @@ install(
   FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-gpu.pc"
   DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
 
+set(ARROW_GPU_TEST_LINK_LIBS
+  arrow_gpu_shared
+  ${ARROW_TEST_LINK_LIBS})
+
 if (ARROW_BUILD_TESTS)
-  set(ARROW_GPU_TEST_LINK_LIBS
-    ${ARROW_TEST_LINK_LIBS}
-    arrow_gpu)
   ADD_ARROW_CUDA_TEST(cuda-test
     STATIC_LINK_LIBS ${ARROW_GPU_TEST_LINK_LIBS})
 endif()
+
+if (ARROW_BUILD_BENCHMARKS)
+  cuda_add_executable(cuda-benchmark cuda-benchmark.cc)
+  target_link_libraries(cuda-benchmark
+    arrow_gpu_shared
+    gtest
+    ${ARROW_BENCHMARK_LINK_LIBS})
+endif()

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/gpu/cuda-benchmark.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda-benchmark.cc b/cpp/src/arrow/gpu/cuda-benchmark.cc
new file mode 100644
index 0000000..82caacc
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda-benchmark.cc
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/memory_pool.h"
+#include "arrow/test-util.h"
+
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+namespace gpu {
+
+constexpr int64_t kGpuNumber = 0;
+
+static void CudaBufferWriterBenchmark(benchmark::State& state, const int64_t total_bytes,
+                                      const int64_t chunksize,
+                                      const int64_t buffer_size) {
+  std::shared_ptr<CudaBuffer> device_buffer;
+  ABORT_NOT_OK(AllocateCudaBuffer(kGpuNumber, total_bytes, &device_buffer));
+  CudaBufferWriter writer(device_buffer);
+
+  if (buffer_size > 0) {
+    ABORT_NOT_OK(writer.SetBufferSize(buffer_size));
+  }
+
+  std::shared_ptr<PoolBuffer> buffer;
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(total_bytes, default_memory_pool(), &buffer));
+
+  const uint8_t* host_data = buffer->data();
+  while (state.KeepRunning()) {
+    int64_t bytes_written = 0;
+    ABORT_NOT_OK(writer.Seek(0));
+    while (bytes_written < total_bytes) {
+      int64_t bytes_to_write = std::min(chunksize, total_bytes - bytes_written);
+      ABORT_NOT_OK(writer.Write(host_data + bytes_written, bytes_to_write));
+      bytes_written += bytes_to_write;
+    }
+  }
+  state.SetBytesProcessed(int64_t(state.iterations()) * total_bytes);
+}
+
+static void BM_Writer_Buffered(benchmark::State& state) {
+  // 128MB
+  const int64_t kTotalBytes = 1 << 27;
+
+  // 8MB
+  const int64_t kBufferSize = 1 << 23;
+
+  CudaBufferWriterBenchmark(state, kTotalBytes, state.range(0), kBufferSize);
+}
+
+static void BM_Writer_Unbuffered(benchmark::State& state) {
+  // 128MB
+  const int64_t kTotalBytes = 1 << 27;
+  CudaBufferWriterBenchmark(state, kTotalBytes, state.range(0), 0);
+}
+
+// Vary chunk write size from 256 bytes to 64K
+BENCHMARK(BM_Writer_Buffered)
+    ->RangeMultiplier(16)
+    ->Range(1 << 8, 1 << 16)
+    ->MinTime(1.0)
+    ->UseRealTime();
+
+BENCHMARK(BM_Writer_Unbuffered)
+    ->RangeMultiplier(4)
+    ->RangeMultiplier(16)
+    ->Range(1 << 8, 1 << 16)
+    ->MinTime(1.0)
+    ->UseRealTime();
+
+}  // namespace gpu
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/gpu/cuda-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 9dc8e1a..f479701 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -29,17 +29,180 @@
 namespace arrow {
 namespace gpu {
 
+constexpr int kGpuNumber = 0;
+
 class TestCudaBuffer : public ::testing::Test {};
 
 TEST_F(TestCudaBuffer, Allocate) {
-  const int device = 0;
-
   const int64_t kSize = 100;
   std::shared_ptr<CudaBuffer> buffer;
 
-  ASSERT_OK(AllocateCudaBuffer(device, kSize, &buffer));
+  ASSERT_OK(AllocateCudaBuffer(kGpuNumber, kSize, &buffer));
   ASSERT_EQ(kSize, buffer->size());
 }
 
+void AssertCudaBufferEquals(const CudaBuffer& buffer, const uint8_t* host_data,
+                            const int64_t nbytes) {
+  std::shared_ptr<MutableBuffer> result;
+  ASSERT_OK(AllocateBuffer(default_memory_pool(), nbytes, &result));
+  ASSERT_OK(buffer.CopyToHost(0, buffer.size(), result->mutable_data()));
+  ASSERT_EQ(0, std::memcmp(result->data(), host_data, nbytes));
+}
+
+TEST_F(TestCudaBuffer, CopyFromHost) {
+  const int64_t kSize = 1000;
+  std::shared_ptr<CudaBuffer> device_buffer;
+  ASSERT_OK(AllocateCudaBuffer(kGpuNumber, kSize, &device_buffer));
+
+  std::shared_ptr<PoolBuffer> host_buffer;
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
+
+  ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), 500));
+  ASSERT_OK(device_buffer->CopyFromHost(500, host_buffer->data() + 500, kSize - 500));
+
+  AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
+}
+
+class TestCudaBufferWriter : public ::testing::Test {
+ public:
+  void Allocate(const int64_t size) {
+    ASSERT_OK(AllocateCudaBuffer(kGpuNumber, size, &device_buffer_));
+    writer_.reset(new CudaBufferWriter(device_buffer_));
+  }
+
+  void TestWrites(const int64_t total_bytes, const int64_t chunksize,
+                  const int64_t buffer_size = 0) {
+    std::shared_ptr<PoolBuffer> buffer;
+    ASSERT_OK(
+        test::MakeRandomBytePoolBuffer(total_bytes, default_memory_pool(), &buffer));
+
+    if (buffer_size > 0) {
+      ASSERT_OK(writer_->SetBufferSize(buffer_size));
+    }
+
+    int64_t position = 0;
+    ASSERT_OK(writer_->Tell(&position));
+    ASSERT_EQ(0, position);
+
+    const uint8_t* host_data = buffer->data();
+    ASSERT_OK(writer_->Write(host_data, chunksize));
+    ASSERT_OK(writer_->Tell(&position));
+    ASSERT_EQ(chunksize, position);
+
+    ASSERT_OK(writer_->Seek(0));
+    ASSERT_OK(writer_->Tell(&position));
+    ASSERT_EQ(0, position);
+
+    while (position < total_bytes) {
+      int64_t bytes_to_write = std::min(chunksize, total_bytes - position);
+      ASSERT_OK(writer_->Write(host_data + position, bytes_to_write));
+      position += bytes_to_write;
+    }
+
+    ASSERT_OK(writer_->Flush());
+
+    AssertCudaBufferEquals(*device_buffer_, buffer->data(), total_bytes);
+  }
+
+ protected:
+  std::shared_ptr<CudaBuffer> device_buffer_;
+  std::unique_ptr<CudaBufferWriter> writer_;
+};
+
+TEST_F(TestCudaBufferWriter, UnbufferedWrites) {
+  const int64_t kTotalSize = 1 << 16;
+  Allocate(kTotalSize);
+  TestWrites(kTotalSize, 1000);
+}
+
+TEST_F(TestCudaBufferWriter, BufferedWrites) {
+  const int64_t kTotalSize = 1 << 16;
+  Allocate(kTotalSize);
+  TestWrites(kTotalSize, 1000, 1 << 12);
+}
+
+TEST_F(TestCudaBufferWriter, EdgeCases) {
+  Allocate(1000);
+
+  std::shared_ptr<PoolBuffer> buffer;
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(1000, default_memory_pool(), &buffer));
+  const uint8_t* host_data = buffer->data();
+
+  ASSERT_EQ(0, writer_->buffer_size());
+  ASSERT_OK(writer_->SetBufferSize(100));
+  ASSERT_EQ(100, writer_->buffer_size());
+
+  // Write 0 bytes
+  int64_t position = 0;
+  ASSERT_OK(writer_->Write(host_data, 0));
+  ASSERT_OK(writer_->Tell(&position));
+  ASSERT_EQ(0, position);
+
+  // Write some data, then change buffer size
+  ASSERT_OK(writer_->Write(host_data, 10));
+  ASSERT_OK(writer_->SetBufferSize(200));
+  ASSERT_EQ(200, writer_->buffer_size());
+
+  ASSERT_EQ(0, writer_->num_bytes_buffered());
+
+  // Write more than buffer size
+  ASSERT_OK(writer_->Write(host_data + 10, 300));
+  ASSERT_EQ(0, writer_->num_bytes_buffered());
+
+  // Write exactly buffer size
+  ASSERT_OK(writer_->Write(host_data + 310, 200));
+  ASSERT_EQ(0, writer_->num_bytes_buffered());
+
+  // Write rest of bytes
+  ASSERT_OK(writer_->Write(host_data + 510, 390));
+  ASSERT_OK(writer_->Write(host_data + 900, 100));
+
+  // Close flushes
+  ASSERT_OK(writer_->Close());
+
+  // Check that everything was written
+  AssertCudaBufferEquals(*device_buffer_, host_data, 1000);
+}
+
+TEST(TestCudaBufferReader, Basics) {
+  std::shared_ptr<CudaBuffer> device_buffer;
+
+  const int64_t size = 1000;
+  ASSERT_OK(AllocateCudaBuffer(kGpuNumber, size, &device_buffer));
+
+  std::shared_ptr<PoolBuffer> buffer;
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(1000, default_memory_pool(), &buffer));
+  const uint8_t* host_data = buffer->data();
+
+  ASSERT_OK(device_buffer->CopyFromHost(0, host_data, 1000));
+
+  CudaBufferReader reader(device_buffer);
+
+  // Read to host memory
+  uint8_t stack_buffer[100] = {0};
+  int64_t bytes_read = 0;
+  ASSERT_OK(reader.Seek(950));
+
+  int64_t position = 0;
+  ASSERT_OK(reader.Tell(&position));
+  ASSERT_EQ(950, position);
+
+  ASSERT_OK(reader.Read(100, &bytes_read, stack_buffer));
+  ASSERT_EQ(50, bytes_read);
+  ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 950, 50));
+  ASSERT_OK(reader.Tell(&position));
+  ASSERT_EQ(1000, position);
+
+  ASSERT_OK(reader.Seek(925));
+  std::shared_ptr<Buffer> tmp;
+  ASSERT_OK(reader.Read(100, &tmp));
+  ASSERT_EQ(75, tmp->size());
+  ASSERT_OK(reader.Tell(&position));
+  ASSERT_EQ(1000, position);
+
+  ASSERT_OK(std::dynamic_pointer_cast<CudaBuffer>(tmp)->CopyToHost(0, 75, stack_buffer));
+  ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 925, 75));
+}
+
 }  // namespace gpu
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/gpu/cuda_common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_common.h b/cpp/src/arrow/gpu/cuda_common.h
index 75f41c9..fae2f6e 100644
--- a/cpp/src/arrow/gpu/cuda_common.h
+++ b/cpp/src/arrow/gpu/cuda_common.h
@@ -20,6 +20,8 @@
 #ifndef ARROW_GPU_CUDA_COMMON_H
 #define ARROW_GPU_CUDA_COMMON_H
 
+#include <sstream>
+
 #include <cuda_runtime_api.h>
 
 namespace arrow {
@@ -32,12 +34,15 @@ namespace gpu {
     (void)ret;            \
   } while (0)
 
-#define CUDA_RETURN_NOT_OK(STMT)                              \
-  do {                                                        \
-    cudaError_t ret = (STMT);                                 \
-    if (ret != cudaSuccess) {                                 \
-      return Status::IOError("Cuda API call failed: " #STMT); \
-    }                                                         \
+#define CUDA_RETURN_NOT_OK(STMT)                                       \
+  do {                                                                 \
+    cudaError_t ret = (STMT);                                          \
+    if (ret != cudaSuccess) {                                          \
+      std::stringstream ss;                                            \
+      ss << "Cuda API call in " << __FILE__ << " at line " << __LINE__
\
+         << " failed: " << #STMT;                                      \
+      return Status::IOError(ss.str());                                \
+    }                                                                  \
   } while (0)
 
 }  // namespace gpu

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/gpu/cuda_memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index cf37bda..d669619 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -17,10 +17,12 @@
 
 #include "arrow/gpu/cuda_memory.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <memory>
 
 #include "arrow/buffer.h"
+#include "arrow/io/memory.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
 
@@ -35,8 +37,21 @@ CudaBuffer::~CudaBuffer() {
   }
 }
 
-Status CudaBuffer::CopyHost(uint8_t* out) {
-  CUDA_RETURN_NOT_OK(cudaMemcpy(out, data_, size_, cudaMemcpyDeviceToHost));
+CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t
offset,
+                       const int64_t size)
+    : Buffer(parent, offset, size), gpu_number_(parent->gpu_number()) {}
+
+Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
+                              uint8_t* out) const {
+  CUDA_RETURN_NOT_OK(cudaMemcpy(out, data_ + position, nbytes, cudaMemcpyDeviceToHost));
+  return Status::OK();
+}
+
+Status CudaBuffer::CopyFromHost(const int64_t position, const uint8_t* data,
+                                int64_t nbytes) {
+  DCHECK_LE(nbytes, size_ - position) << "Copy would overflow buffer";
+  CUDA_RETURN_NOT_OK(
+      cudaMemcpy(mutable_data_ + position, data, nbytes, cudaMemcpyHostToDevice));
   return Status::OK();
 }
 
@@ -52,8 +67,101 @@ Status AllocateCudaBuffer(int gpu_number, const int64_t size,
 
 CudaHostBuffer::~CudaHostBuffer() { CUDA_DCHECK(cudaFreeHost(mutable_data_)); }
 
-Status AllocateCudaHostBuffer(const int gpu_number, const int64_t size,
-                              std::shared_ptr<CudaHostBuffer>* out) {
+// ----------------------------------------------------------------------
+// CudaBufferReader
+
+CudaBufferReader::CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer)
+    : io::BufferReader(buffer), cuda_buffer_(buffer) {}
+
+CudaBufferReader::~CudaBufferReader() {}
+
+Status CudaBufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
+  nbytes = std::min(nbytes, size_ - position_);
+  CUDA_RETURN_NOT_OK(
+      cudaMemcpy(buffer, data_ + position_, nbytes, cudaMemcpyDeviceToHost));
+  *bytes_read = nbytes;
+  position_ += nbytes;
+  return Status::OK();
+}
+
+Status CudaBufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  int64_t size = std::min(nbytes, size_ - position_);
+  *out = std::make_shared<CudaBuffer>(cuda_buffer_, position_, size);
+  position_ += size;
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// CudaBufferWriter
+
+CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer)
+    : io::FixedSizeBufferWriter(buffer), buffer_size_(0), buffer_position_(0) {}
+
+CudaBufferWriter::~CudaBufferWriter() {}
+
+Status CudaBufferWriter::Close() { return Flush(); }
+
+Status CudaBufferWriter::Flush() {
+  if (buffer_size_ > 0 && buffer_position_ > 0) {
+    // Only need to flush when the write has been buffered
+    CUDA_RETURN_NOT_OK(cudaMemcpy(mutable_data_ + position_ - buffer_position_,
+                                  host_buffer_data_, buffer_position_,
+                                  cudaMemcpyHostToDevice));
+    buffer_position_ = 0;
+  }
+  return Status::OK();
+}
+
+Status CudaBufferWriter::Seek(int64_t position) {
+  if (buffer_position_ > 0) {
+    RETURN_NOT_OK(Flush());
+  }
+  return io::FixedSizeBufferWriter::Seek(position);
+}
+
+Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
+  if (memcopy_num_threads_ > 1) {
+    return Status::Invalid("parallel CUDA memcpy not supported");
+  }
+
+  if (nbytes == 0) {
+    return Status::OK();
+  }
+
+  if (buffer_size_ > 0) {
+    if (nbytes + buffer_position_ >= buffer_size_) {
+      // Reach end of buffer, write everything
+      RETURN_NOT_OK(Flush());
+      CUDA_RETURN_NOT_OK(
+          cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice));
+    } else {
+      // Write bytes to buffer
+      std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
+      buffer_position_ += nbytes;
+    }
+  } else {
+    // Unbuffered write
+    CUDA_RETURN_NOT_OK(
+        cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice));
+  }
+  position_ += nbytes;
+  return Status::OK();
+}
+
+Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
+  if (buffer_position_ > 0) {
+    // Flush any buffered data
+    RETURN_NOT_OK(Flush());
+  }
+  RETURN_NOT_OK(AllocateCudaHostBuffer(buffer_size, &host_buffer_));
+  host_buffer_data_ = host_buffer_->mutable_data();
+  buffer_size_ = buffer_size;
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+
+Status AllocateCudaHostBuffer(const int64_t size, std::shared_ptr<CudaHostBuffer>*
out) {
   uint8_t* data = nullptr;
   CUDA_RETURN_NOT_OK(
       cudaMallocHost(reinterpret_cast<void**>(&data), static_cast<size_t>(size)));

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/gpu/cuda_memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 885373a..acf65e6 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -22,25 +22,41 @@
 #include <memory>
 
 #include "arrow/buffer.h"
+#include "arrow/io/memory.h"
+#include "arrow/memory_pool.h"
 #include "arrow/status.h"
 
 namespace arrow {
 namespace gpu {
 
+/// \class CudaBuffer
 /// \brief An Arrow buffer located on a GPU device
 ///
 /// Be careful using this in any Arrow code which may not be GPU-aware
-class ARROW_EXPORT CudaBuffer : public MutableBuffer {
+class ARROW_EXPORT CudaBuffer : public Buffer {
  public:
   CudaBuffer(uint8_t* data, int64_t size, const int gpu_number, bool own_data = false)
-      : MutableBuffer(data, size), gpu_number_(gpu_number), own_data_(own_data) {}
+      : Buffer(data, size), gpu_number_(gpu_number), own_data_(own_data) {
+    is_mutable_ = true;
+    mutable_data_ = data;
+  }
+
+  CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
+             const int64_t size);
 
   ~CudaBuffer();
 
   /// \brief Copy memory from GPU device to CPU host
   /// \param[out] out a pre-allocated output buffer
   /// \return Status
-  Status CopyHost(uint8_t* out);
+  Status CopyToHost(const int64_t position, const int64_t nbytes, uint8_t* out) const;
+
+  /// \brief Copy memory to device at position
+  /// \param[in] position start position to copy bytes
+  /// \param[in] data the host data to copy
+  /// \param[in] nbytes number of bytes to copy
+  /// \return Status
+  Status CopyFromHost(const int64_t position, const uint8_t* data, int64_t nbytes);
 
   int gpu_number() const { return gpu_number_; }
 
@@ -49,6 +65,7 @@ class ARROW_EXPORT CudaBuffer : public MutableBuffer {
   bool own_data_;
 };
 
+/// \class CudaHostBuffer
 /// \brief Device-accessible CPU memory created using cudaHostAlloc
 class ARROW_EXPORT CudaHostBuffer : public MutableBuffer {
  public:
@@ -56,6 +73,72 @@ class ARROW_EXPORT CudaHostBuffer : public MutableBuffer {
   ~CudaHostBuffer();
 };
 
+/// \class CudaBufferReader
+/// \brief File interface for zero-copy read from CUDA buffers
+///
+/// Note: Reads return pointers to device memory. This means you must be
+/// careful using this interface with any Arrow code which may expect to be
+/// able to do anything other than pointer arithmetic on the returned buffers
+class ARROW_EXPORT CudaBufferReader : public io::BufferReader {
+ public:
+  explicit CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer);
+  ~CudaBufferReader();
+
+  /// \brief Read bytes into pre-allocated host memory
+  /// \param[in] nbytes number of bytes to read
+  /// \param[out] bytes_read actual number of bytes read
+  /// \param[out] buffer pre-allocated memory to write into
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+
+  /// \brief Zero-copy read from device memory
+  /// \param[in] nbytes number of bytes to read
+  /// \param[out] out a Buffer referencing device memory
+  /// \return Status
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+ private:
+  // In case we need to access anything GPU-specific, like device number
+  std::shared_ptr<CudaBuffer> cuda_buffer_;
+};
+
+/// \class CudaBufferWriter
+/// \brief File interface for writing to CUDA buffers, with optional buffering
+class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
+ public:
+  explicit CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer);
+  ~CudaBufferWriter();
+
+  /// \brief Close writer and flush buffered bytes to GPU
+  Status Close() override;
+
+  /// \brief Flush buffered bytes to GPU
+  Status Flush() override;
+
+  // Seek requires flushing if any bytes are buffered
+  Status Seek(int64_t position) override;
+  Status Write(const uint8_t* data, int64_t nbytes) override;
+
+  /// \brief Set CPU buffer size to limit calls to cudaMemcpy
+  /// \param[in] buffer_size the size of CPU buffer to allocate
+  /// \return Status
+  ///
+  /// By default writes are unbuffered
+  Status SetBufferSize(const int64_t buffer_size);
+
+  /// \brief Returns size of host (CPU) buffer, 0 for unbuffered
+  int64_t buffer_size() const { return buffer_size_; }
+
+  /// \brief Returns number of bytes buffered on host
+  int64_t num_bytes_buffered() const { return buffer_position_; }
+
+ private:
+  // Pinned host buffer for buffering writes on CPU before calling cudaMalloc
+  int64_t buffer_size_;
+  int64_t buffer_position_;
+  std::shared_ptr<CudaHostBuffer> host_buffer_;
+  uint8_t* host_buffer_data_;
+};
+
 /// \brief Allocate CUDA memory on a GPU device
 /// \param[in] gpu_number Device number to allocate
 /// \param[in] size number of bytes

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 85caee1..3957852 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -440,7 +440,7 @@ Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool,
 
 Status ReadableFile::Close() { return impl_->Close(); }
 
-Status ReadableFile::Tell(int64_t* pos) { return impl_->Tell(pos); }
+Status ReadableFile::Tell(int64_t* pos) const { return impl_->Tell(pos); }
 
 Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
   return impl_->Read(nbytes, bytes_read, out);
@@ -492,7 +492,7 @@ Status FileOutputStream::Open(const std::string& path, bool append,
 
 Status FileOutputStream::Close() { return impl_->Close(); }
 
-Status FileOutputStream::Tell(int64_t* pos) { return impl_->Tell(pos); }
+Status FileOutputStream::Tell(int64_t* pos) const { return impl_->Tell(pos); }
 
 Status FileOutputStream::Write(const uint8_t* data, int64_t length) {
   return impl_->Write(data, length);
@@ -612,7 +612,7 @@ Status MemoryMappedFile::GetSize(int64_t* size) {
   return Status::OK();
 }
 
-Status MemoryMappedFile::Tell(int64_t* position) {
+Status MemoryMappedFile::Tell(int64_t* position) const {
   *position = memory_map_->position();
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 2a0e89c..f26d905 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -57,7 +57,7 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
 
   // OutputStream interface
   Status Close() override;
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
 
   // Write bytes to the stream. Thread-safe
   Status Write(const uint8_t* data, int64_t nbytes) override;
@@ -91,7 +91,7 @@ class ARROW_EXPORT ReadableFile : public RandomAccessFile {
                      std::shared_ptr<ReadableFile>* file);
 
   Status Close() override;
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
 
   // Read bytes from the file. Thread-safe
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
@@ -130,7 +130,7 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
 
   Status Close() override;
 
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
 
   Status Seek(int64_t position) override;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index ba446b5..8b143c3 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -230,7 +230,7 @@ Status HdfsReadableFile::GetSize(int64_t* size) { return impl_->GetSize(size);
}
 
 Status HdfsReadableFile::Seek(int64_t position) { return impl_->Seek(position); }
 
-Status HdfsReadableFile::Tell(int64_t* position) { return impl_->Tell(position); }
+Status HdfsReadableFile::Tell(int64_t* position) const { return impl_->Tell(position);
}
 
 // ----------------------------------------------------------------------
 // File writing
@@ -284,7 +284,7 @@ Status HdfsOutputStream::Write(const uint8_t* buffer, int64_t nbytes)
{
 
 Status HdfsOutputStream::Flush() { return impl_->Flush(); }
 
-Status HdfsOutputStream::Tell(int64_t* position) { return impl_->Tell(position); }
+Status HdfsOutputStream::Tell(int64_t* position) const { return impl_->Tell(position);
}
 
 // ----------------------------------------------------------------------
 // HDFS client

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index ca88953..82e3f71 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -199,7 +199,7 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
   bool supports_zero_copy() const override;
 
   Status Seek(int64_t position) override;
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
 
   void set_memory_pool(MemoryPool* pool);
 
@@ -228,7 +228,7 @@ class ARROW_EXPORT HdfsOutputStream : public OutputStream {
 
   Status Flush() override;
 
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
 
  private:
   class ARROW_NO_EXPORT HdfsOutputStreamImpl;

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 4bb7ebe..3388acb 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -72,7 +72,7 @@ class ARROW_EXPORT FileInterface {
  public:
   virtual ~FileInterface() = 0;
   virtual Status Close() = 0;
-  virtual Status Tell(int64_t* position) = 0;
+  virtual Status Tell(int64_t* position) const = 0;
 
   FileMode::type mode() const { return mode_; }
 
@@ -94,7 +94,7 @@ class ARROW_EXPORT Writeable {
  public:
   virtual Status Write(const uint8_t* data, int64_t nbytes) = 0;
 
-  // Default implementation is a no-op
+  /// \brief Flush buffered bytes, if any
   virtual Status Flush();
 
   Status Write(const std::string& data);

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 50f3ddf..532c2c6 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -75,7 +75,7 @@ Status BufferOutputStream::Finish(std::shared_ptr<Buffer>* result)
{
   return Status::OK();
 }
 
-Status BufferOutputStream::Tell(int64_t* position) {
+Status BufferOutputStream::Tell(int64_t* position) const {
   *position = position_;
   return Status::OK();
 }
@@ -104,9 +104,12 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
 // ----------------------------------------------------------------------
 // OutputStream that doesn't write anything
 
-Status MockOutputStream::Close() { return Status::OK(); }
+Status MockOutputStream::Close() {
+  // no-op
+  return Status::OK();
+}
 
-Status MockOutputStream::Tell(int64_t* position) {
+Status MockOutputStream::Tell(int64_t* position) const {
   *position = extent_bytes_written_;
   return Status::OK();
 }
@@ -128,6 +131,7 @@ FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>&
buff
     : memcopy_num_threads_(kMemcopyDefaultNumThreads),
       memcopy_blocksize_(kMemcopyDefaultBlocksize),
       memcopy_threshold_(kMemcopyDefaultThreshold) {
+  DCHECK(buffer) << "Buffer was nullptr";
   buffer_ = buffer;
   DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
   mutable_data_ = buffer->mutable_data();
@@ -138,7 +142,7 @@ FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>&
buff
 FixedSizeBufferWriter::~FixedSizeBufferWriter() {}
 
 Status FixedSizeBufferWriter::Close() {
-  // No-op
+  // no-op
   return Status::OK();
 }
 
@@ -150,7 +154,7 @@ Status FixedSizeBufferWriter::Seek(int64_t position) {
   return Status::OK();
 }
 
-Status FixedSizeBufferWriter::Tell(int64_t* position) {
+Status FixedSizeBufferWriter::Tell(int64_t* position) const {
   *position = position_;
   return Status::OK();
 }
@@ -201,7 +205,7 @@ Status BufferReader::Close() {
   return Status::OK();
 }
 
-Status BufferReader::Tell(int64_t* position) {
+Status BufferReader::Tell(int64_t* position) const {
   *position = position_;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 1f81774..667a7f0 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -51,7 +51,7 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
 
   // Implement the OutputStream interface
   Status Close() override;
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
   Status Write(const uint8_t* data, int64_t nbytes) override;
 
   /// Close the stream and return the buffer
@@ -74,7 +74,7 @@ class ARROW_EXPORT MockOutputStream : public OutputStream {
 
   // Implement the OutputStream interface
   Status Close() override;
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
   Status Write(const uint8_t* data, int64_t nbytes) override;
 
   int64_t GetExtentBytesWritten() const { return extent_bytes_written_; }
@@ -93,7 +93,7 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
 
   Status Close() override;
   Status Seek(int64_t position) override;
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
   Status Write(const uint8_t* data, int64_t nbytes) override;
   Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;
 
@@ -101,7 +101,7 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
   void set_memcopy_blocksize(int64_t blocksize);
   void set_memcopy_threshold(int64_t threshold);
 
- private:
+ protected:
   std::mutex lock_;
   std::shared_ptr<Buffer> buffer_;
   uint8_t* mutable_data_;
@@ -120,8 +120,7 @@ class ARROW_EXPORT BufferReader : public RandomAccessFile {
   virtual ~BufferReader();
 
   Status Close() override;
-  Status Tell(int64_t* position) override;
-
+  Status Tell(int64_t* position) const override;
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
 
   // Zero copy read
@@ -134,7 +133,7 @@ class ARROW_EXPORT BufferReader : public RandomAccessFile {
 
   std::shared_ptr<Buffer> buffer() const { return buffer_; }
 
- private:
+ protected:
   std::shared_ptr<Buffer> buffer_;
   const uint8_t* data_;
   int64_t size_;

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/memory_pool.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 90bc593..5bb2b56 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -109,6 +109,12 @@ class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
 
 ARROW_EXPORT MemoryPool* default_memory_pool();
 
+#ifdef ARROW_NO_DEFAULT_MEMORY_POOL
+#define ARROW_MEMORY_POOL_DEFAULT
+#else
+#define ARROW_MEMORY_POOL_DEFAULT = default_memory_pool()
+#endif
+
 }  // namespace arrow
 
 #endif  // ARROW_MEMORY_POOL_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/python/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc
index 4c73fd6..99b99f7 100644
--- a/cpp/src/arrow/python/io.cc
+++ b/cpp/src/arrow/python/io.cc
@@ -113,7 +113,7 @@ Status PyReadableFile::Seek(int64_t position) {
   return file_->Seek(position, 0);
 }
 
-Status PyReadableFile::Tell(int64_t* position) {
+Status PyReadableFile::Tell(int64_t* position) const {
   PyAcquireGIL lock;
   return file_->Tell(position);
 }
@@ -177,7 +177,7 @@ Status PyOutputStream::Close() {
   return file_->Close();
 }
 
-Status PyOutputStream::Tell(int64_t* position) {
+Status PyOutputStream::Tell(int64_t* position) const {
   *position = position_;
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/python/io.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h
index ebd4c5a..eda7346 100644
--- a/cpp/src/arrow/python/io.h
+++ b/cpp/src/arrow/python/io.h
@@ -63,7 +63,7 @@ class ARROW_EXPORT PyReadableFile : public io::RandomAccessFile {
 
   Status Seek(int64_t position) override;
 
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
 
   bool supports_zero_copy() const override;
 
@@ -77,7 +77,7 @@ class ARROW_EXPORT PyOutputStream : public io::OutputStream {
   virtual ~PyOutputStream();
 
   Status Close() override;
-  Status Tell(int64_t* position) override;
+  Status Tell(int64_t* position) const override;
   Status Write(const uint8_t* data, int64_t nbytes) override;
 
  private:

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c3a5f4e/cpp/src/arrow/util/io-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/io-util.h b/cpp/src/arrow/util/io-util.h
index 34bee18..6fe3a5c 100644
--- a/cpp/src/arrow/util/io-util.h
+++ b/cpp/src/arrow/util/io-util.h
@@ -33,13 +33,14 @@ class StdoutStream : public OutputStream {
   StdoutStream() : pos_(0) { set_mode(FileMode::WRITE); }
   virtual ~StdoutStream() {}
 
-  Status Close() { return Status::OK(); }
-  Status Tell(int64_t* position) {
+  Status Close() override { return Status::OK(); }
+
+  Status Tell(int64_t* position) const override {
     *position = pos_;
     return Status::OK();
   }
 
-  Status Write(const uint8_t* data, int64_t nbytes) {
+  Status Write(const uint8_t* data, int64_t nbytes) override {
     pos_ += nbytes;
     std::cout.write(reinterpret_cast<const char*>(data), nbytes);
     return Status::OK();
@@ -55,13 +56,14 @@ class StdinStream : public InputStream {
   StdinStream() : pos_(0) { set_mode(FileMode::READ); }
   virtual ~StdinStream() {}
 
-  Status Close() { return Status::OK(); }
-  Status Tell(int64_t* position) {
+  Status Close() override { return Status::OK(); }
+
+  Status Tell(int64_t* position) const override {
     *position = pos_;
     return Status::OK();
   }
 
-  virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override {
     std::cin.read(reinterpret_cast<char*>(out), nbytes);
     if (std::cin) {
       *bytes_read = nbytes;
@@ -72,7 +74,7 @@ class StdinStream : public InputStream {
     return Status::OK();
   }
 
-  virtual Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override {
     auto buffer = std::make_shared<PoolBuffer>(nullptr);
     RETURN_NOT_OK(buffer->Resize(nbytes));
     int64_t bytes_read;


Mime
View raw message