arrow-commits mailing list archives

From pcmor...@apache.org
Subject arrow git commit: ARROW-1372: [Plasma] enable HUGETLB support on Linux to improve plasma put performance
Date Sun, 20 Aug 2017 01:44:55 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 652fd36a7 -> 10f7158df


ARROW-1372: [Plasma] enable HUGETLB support on Linux to improve plasma put performance

This PR makes it possible to back the Plasma object store with a pre-mounted hugetlbfs, improving plasma put performance on Linux.

Author: Philipp Moritz <pcmoritz@gmail.com>
Author: Alexey Tumanov <atumanov@gmail.com>

Closes #974 from atumanov/putperf and squashes the following commits:

077b78f [Philipp Moritz] add more comments
5aa4b0d [Philipp Moritz] preflight script formatting changes
22188a6 [Philipp Moritz] formatting
ffb9916 [Philipp Moritz] address comments
225429b [Philipp Moritz] update documentation with Alexey's fix
713a0c4 [Philipp Moritz] add missing includes
4c976bb [Philipp Moritz] make format
fb8e1b4 [Philipp Moritz] add helpful error message
7260d59 [Philipp Moritz] expose number of threads to python and try out cleanups
98b603e [Alexey Tumanov] map_populate on linux; fall back to mlock/memset otherwise
ce90ef4 [Alexey Tumanov] documenting new plasma store info fields
c52f211 [Philipp Moritz] cleanups (TODO: See if memory locking helps)
4702703 [Philipp Moritz] preliminary documentation
3073a99 [Alexey Tumanov] reenable hashing
a20ca56 [Alexey Tumanov] fix bug
dd04b87 [Alexey Tumanov] [arrow][putperf] enable HUGETLBFS support on linux


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/10f7158d
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/10f7158d
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/10f7158d

Branch: refs/heads/master
Commit: 10f7158df46d838d32ef214b0573b6d719756516
Parents: 652fd36
Author: Philipp Moritz <pcmoritz@gmail.com>
Authored: Sat Aug 19 18:43:06 2017 -0700
Committer: Philipp Moritz <pcmoritz@gmail.com>
Committed: Sat Aug 19 18:43:06 2017 -0700

----------------------------------------------------------------------
 cpp/src/plasma/client.cc             |  3 +-
 cpp/src/plasma/common.cc             |  2 +
 cpp/src/plasma/common.h              |  5 ++
 cpp/src/plasma/malloc.cc             | 82 +++++++++++++++++++----------
 cpp/src/plasma/malloc.h              |  2 +
 cpp/src/plasma/plasma.h              |  8 +++
 cpp/src/plasma/store.cc              | 86 ++++++++++++++++++++++---------
 cpp/src/plasma/store.h               |  7 ++-
 python/doc/source/plasma.rst         | 47 +++++++++++++++++
 python/pyarrow/includes/libarrow.pxd |  4 ++
 python/pyarrow/io.pxi                | 15 ++++++
 11 files changed, 206 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 8ea62c6..5e28d4f 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -91,7 +91,8 @@ uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size
     if (result == MAP_FAILED) {
       ARROW_LOG(FATAL) << "mmap failed";
     }
-    close(fd);
+    close(fd);  // Closing this fd has an effect on performance.
+
     ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
     entry.pointer = result;
     entry.length = map_size;

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/common.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
index d7a7965..2de06d5 100644
--- a/cpp/src/plasma/common.cc
+++ b/cpp/src/plasma/common.cc
@@ -83,4 +83,6 @@ Status plasma_error_status(int plasma_error) {
 ARROW_EXPORT int ObjectStatusLocal = ObjectStatus_Local;
 ARROW_EXPORT int ObjectStatusRemote = ObjectStatus_Remote;
 
+const PlasmaStoreInfo* plasma_config;
+
 }  // namespace plasma

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/common.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index 2b71da6..66d5f30 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -95,6 +95,11 @@ enum ObjectRequestType {
 extern int ObjectStatusLocal;
 extern int ObjectStatusRemote;
 
+/// Globally accessible reference to plasma store configuration.
+/// TODO(pcm): This can be avoided with some refactoring of existing code
+/// by making it possible to pass a context object through dlmalloc.
+struct PlasmaStoreInfo;
+extern const PlasmaStoreInfo* plasma_config;
 }  // namespace plasma
 
 #endif  // PLASMA_COMMON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/malloc.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc
index 77a8afe..6b9bc62 100644
--- a/cpp/src/plasma/malloc.cc
+++ b/cpp/src/plasma/malloc.cc
@@ -25,9 +25,13 @@
 #include <sys/mman.h>
 #include <unistd.h>
 
+#include <cerrno>
+#include <string>
 #include <unordered_map>
+#include <vector>
 
 #include "plasma/common.h"
+#include "plasma/plasma.h"
 
 extern "C" {
 void* fake_mmap(size_t);
@@ -60,12 +64,12 @@ struct mmap_record {
 
 namespace {
 
-/** Hashtable that contains one entry per segment that we got from the OS
- *  via mmap. Associates the address of that segment with its file descriptor
- *  and size. */
+/// Hashtable that contains one entry per segment that we got from the OS
+/// via mmap. Associates the address of that segment with its file descriptor
+/// and size.
 std::unordered_map<void*, mmap_record> mmap_records;
 
-} /* namespace */
+}  // namespace
 
 constexpr int GRANULARITY_MULTIPLIER = 2;
 
@@ -77,10 +81,11 @@ static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) {
   return (unsigned char const*)pto - (unsigned char const*)pfrom;
 }
 
-/* Create a buffer. This is creating a temporary file and then
- * immediately unlinking it so we do not leave traces in the system. */
+// Create a buffer. This is creating a temporary file and then
+// immediately unlinking it so we do not leave traces in the system.
 int create_buffer(int64_t size) {
   int fd;
+  std::string file_template = plasma::plasma_config->directory;
 #ifdef _WIN32
   if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE,
                          (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))),
@@ -88,53 +93,72 @@ int create_buffer(int64_t size) {
     fd = -1;
   }
 #else
-#ifdef __linux__
-  constexpr char file_template[] = "/dev/shm/plasmaXXXXXX";
-#else
-  constexpr char file_template[] = "/tmp/plasmaXXXXXX";
-#endif
-  char file_name[32];
-  strncpy(file_name, file_template, 32);
-  fd = mkstemp(file_name);
-  if (fd < 0) return -1;
+  file_template += "/plasmaXXXXXX";
+  std::vector<char> file_name(file_template.begin(), file_template.end());
+  file_name.push_back('\0');
+  fd = mkstemp(&file_name[0]);
+  if (fd < 0) {
+    ARROW_LOG(FATAL) << "create_buffer failed to open file " << &file_name[0];
+    return -1;
+  }
+
   FILE* file = fdopen(fd, "a+");
   if (!file) {
     close(fd);
+    ARROW_LOG(FATAL) << "create_buffer: fdopen failed for " << &file_name[0];
     return -1;
   }
-  if (unlink(file_name) != 0) {
-    ARROW_LOG(FATAL) << "unlink error";
+  // Immediately unlink the file so we do not leave traces in the system.
+  if (unlink(&file_name[0]) != 0) {
+    ARROW_LOG(FATAL) << "failed to unlink file " << &file_name[0];
     return -1;
   }
-  if (ftruncate(fd, (off_t)size) != 0) {
-    ARROW_LOG(FATAL) << "ftruncate error";
-    return -1;
+  if (!plasma::plasma_config->hugepages_enabled) {
+    // Increase the size of the file to the desired size. This seems not to be
+    // needed for files that are backed by the huge page fs, see also
+    // http://www.mail-archive.com/kvm-devel@lists.sourceforge.net/msg14737.html
+    if (ftruncate(fd, (off_t)size) != 0) {
+      ARROW_LOG(FATAL) << "failed to ftruncate file " << &file_name[0];
+      return -1;
+    }
   }
 #endif
   return fd;
 }
 
 void* fake_mmap(size_t size) {
-  /* Add sizeof(size_t) so that the returned pointer is deliberately not
-   * page-aligned. This ensures that the segments of memory returned by
-   * fake_mmap are never contiguous. */
+  // Add sizeof(size_t) so that the returned pointer is deliberately not
+  // page-aligned. This ensures that the segments of memory returned by
+  // fake_mmap are never contiguous.
   size += sizeof(size_t);
 
   int fd = create_buffer(size);
   ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
+#ifdef __linux__
+  // MAP_POPULATE will pre-populate the page tables for this memory region
+  // which avoids work when accessing the pages later. Only supported on Linux.
+  void* pointer =
+      mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0);
+#else
   void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+#endif
   if (pointer == MAP_FAILED) {
+    ARROW_LOG(ERROR) << "mmap failed with error: " << std::strerror(errno);
+    if (errno == ENOMEM && plasma::plasma_config->hugepages_enabled) {
+      ARROW_LOG(ERROR)
+          << "  (this probably means you have to increase /proc/sys/vm/nr_hugepages)";
+    }
     return pointer;
   }
 
-  /* Increase dlmalloc's allocation granularity directly. */
+  // Increase dlmalloc's allocation granularity directly.
   mparams.granularity *= GRANULARITY_MULTIPLIER;
 
   mmap_record& record = mmap_records[pointer];
   record.fd = fd;
   record.size = size;
 
-  /* We lie to dlmalloc about where mapped memory actually lives. */
+  // We lie to dlmalloc about where mapped memory actually lives.
   pointer = pointer_advance(pointer, sizeof(size_t));
   ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
   return pointer;
@@ -148,8 +172,8 @@ int fake_munmap(void* addr, int64_t size) {
   auto entry = mmap_records.find(addr);
 
   if (entry == mmap_records.end() || entry->second.size != size) {
-    /* Reject requests to munmap that don't directly match previous
-     * calls to mmap, to prevent dlmalloc from trimming. */
+    // Reject requests to munmap that don't directly match previous
+    // calls to mmap, to prevent dlmalloc from trimming.
     return -1;
   }
 
@@ -163,7 +187,7 @@ int fake_munmap(void* addr, int64_t size) {
 }
 
 void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) {
-  /* TODO(rshin): Implement a more efficient search through mmap_records. */
+  // TODO(rshin): Implement a more efficient search through mmap_records.
   for (const auto& entry : mmap_records) {
    if (addr >= entry.first && addr < pointer_advance(entry.first, entry.second.size)) {
       *fd = entry.second.fd;
@@ -176,3 +200,5 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset
   *map_size = 0;
   *offset = 0;
 }
+
+void set_malloc_granularity(int value) { change_mparam(M_GRANULARITY, value); }
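
[Editor's note] The mechanism introduced in malloc.cc above can be illustrated outside of Plasma: create a temporary file in the configured directory, unlink it immediately so no trace is left, skip ftruncate when the file lives on hugetlbfs, and pass MAP_POPULATE to mmap on Linux so the page tables are pre-populated. The following is a minimal standalone C++ sketch of that pattern under those assumptions; the directory path, sizes, and function name are illustrative and not part of the Plasma code.

  // Minimal sketch (not Plasma code): unlinked temp file + eagerly populated mmap.
  #include <sys/mman.h>
  #include <unistd.h>

  #include <cstdlib>
  #include <cstring>
  #include <string>
  #include <vector>

  void* map_backing_file(const std::string& directory, size_t size, bool hugepages) {
    // Create a temporary file in the given directory (e.g. /dev/shm or a
    // hugetlbfs mount point) and unlink it right away so nothing is left behind.
    std::string file_template = directory + "/plasmaXXXXXX";
    std::vector<char> file_name(file_template.begin(), file_template.end());
    file_name.push_back('\0');
    int fd = mkstemp(&file_name[0]);
    if (fd < 0) return nullptr;
    unlink(&file_name[0]);
    // hugetlbfs-backed files do not need ftruncate; tmpfs/regular files do.
    if (!hugepages && ftruncate(fd, static_cast<off_t>(size)) != 0) {
      close(fd);
      return nullptr;
    }
  #ifdef __linux__
    // MAP_POPULATE pre-faults the mapping so later writes avoid page faults.
    void* ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_POPULATE, fd, 0);
  #else
    void* ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  #endif
    return ptr == MAP_FAILED ? nullptr : ptr;
  }

  int main() {
    // Assumed directory; with huge pages this might be /mnt/hugepages instead.
    const size_t kSize = size_t{1} << 21;  // 2 MB, one huge page on x86-64
    void* p = map_backing_file("/dev/shm", kSize, /*hugepages=*/false);
    if (p != nullptr) std::memset(p, 0, kSize);
    return p == nullptr ? 1 : 0;
  }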

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/malloc.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h
index b4af2c8..0df720d 100644
--- a/cpp/src/plasma/malloc.h
+++ b/cpp/src/plasma/malloc.h
@@ -23,4 +23,6 @@
 
 void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);
 
+void set_malloc_granularity(int value);
+
 #endif  // MALLOC_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/plasma.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index d60e5a8..476002f 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -27,6 +27,7 @@
 #include <string.h>
 #include <unistd.h>  // pid_t
 
+#include <string>
 #include <unordered_map>
 #include <unordered_set>
 
@@ -129,6 +130,13 @@ struct PlasmaStoreInfo {
   /// The amount of memory (in bytes) that we allow to be allocated in the
   /// store.
   int64_t memory_capacity;
+  /// Boolean flag indicating whether to start the object store with hugepages
+  /// support enabled. Huge pages are substantially larger than normal memory
+  /// pages (e.g. 2MB or 1GB instead of 4KB) and using them can reduce
+  /// bookkeeping overhead from the OS.
+  bool hugepages_enabled;
+  /// A (platform-dependent) directory where to create the memory-backed file.
+  std::string directory;
 };
 
 /// Get an entry from the object table and return NULL if the object_id

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/store.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 9f4b98c..aaa2bad 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -96,9 +96,12 @@ GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids)
 
 Client::Client(int fd) : fd(fd) {}
 
-PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory)
+PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory,
+                         bool hugepages_enabled)
     : loop_(loop), eviction_policy_(&store_info_) {
   store_info_.memory_capacity = system_memory;
+  store_info_.directory = directory;
+  store_info_.hugepages_enabled = hugepages_enabled;
 }
 
 // TODO(pcm): Get rid of this destructor by using RAII to clean up data.
@@ -114,6 +117,8 @@ PlasmaStore::~PlasmaStore() {
   }
 }
 
+const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info_; }
+
 // If this client is not already using the object, add the client to the
 // object's list of clients, otherwise do nothing.
 void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) {
@@ -633,10 +638,13 @@ class PlasmaStoreRunner {
  public:
   PlasmaStoreRunner() {}
 
-  void Start(char* socket_name, int64_t system_memory) {
+  void Start(char* socket_name, int64_t system_memory, std::string directory,
+             bool hugepages_enabled) {
     // Create the event loop.
     loop_.reset(new EventLoop);
-    store_.reset(new PlasmaStore(loop_.get(), system_memory));
+    store_.reset(
+        new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
+    plasma_config = store_->get_plasma_store_info();
     int socket = bind_ipc_sock(socket_name, true);
     // TODO(pcm): Check return value.
     ARROW_CHECK(socket >= 0);
@@ -670,7 +678,8 @@ void HandleSignal(int signal) {
   }
 }
 
-void start_server(char* socket_name, int64_t system_memory) {
+void start_server(char* socket_name, int64_t system_memory, std::string plasma_directory,
+                  bool hugepages_enabled) {
   // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
   // to a client that has already died, the store could die.
   signal(SIGPIPE, SIG_IGN);
@@ -678,17 +687,26 @@ void start_server(char* socket_name, int64_t system_memory) {
   PlasmaStoreRunner runner;
   g_runner = &runner;
   signal(SIGTERM, HandleSignal);
-  runner.Start(socket_name, system_memory);
+  runner.Start(socket_name, system_memory, plasma_directory, hugepages_enabled);
 }
 
 }  // namespace plasma
 
 int main(int argc, char* argv[]) {
   char* socket_name = NULL;
+  // Directory where plasma memory mapped files are stored.
+  std::string plasma_directory;
+  bool hugepages_enabled = false;
   int64_t system_memory = -1;
   int c;
-  while ((c = getopt(argc, argv, "s:m:")) != -1) {
+  while ((c = getopt(argc, argv, "s:m:d:h")) != -1) {
     switch (c) {
+      case 'd':
+        plasma_directory = std::string(optarg);
+        break;
+      case 'h':
+        hugepages_enabled = true;
+        break;
       case 's':
         socket_name = optarg;
         break;
@@ -705,36 +723,54 @@ int main(int argc, char* argv[]) {
         exit(-1);
     }
   }
+  // Sanity check command line options.
   if (!socket_name) {
     ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch";
   }
   if (system_memory == -1) {
     ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch";
   }
+  if (hugepages_enabled && plasma_directory.empty()) {
+    ARROW_LOG(FATAL) << "if you want to use hugepages, please specify path to huge pages "
+                        "filesystem with -d";
+  }
+  if (plasma_directory.empty()) {
 #ifdef __linux__
-  // On Linux, check that the amount of memory available in /dev/shm is large
-  // enough to accommodate the request. If it isn't, then fail.
-  int shm_fd = open("/dev/shm", O_RDONLY);
-  struct statvfs shm_vfs_stats;
-  fstatvfs(shm_fd, &shm_vfs_stats);
-  // The value shm_vfs_stats.f_bsize is the block size, and the value
-  // shm_vfs_stats.f_bavail is the number of available blocks.
-  int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
-  close(shm_fd);
-  if (system_memory > shm_mem_avail) {
-    ARROW_LOG(FATAL) << "System memory request exceeds memory available in /dev/shm. The "
-                        "request is for "
-                     << system_memory << " bytes, and the amount available is "
-                     << shm_mem_avail
-                     << " bytes. You may be able to free up space by deleting files in "
-                        "/dev/shm. If you are inside a Docker container, you may need to "
-                        "pass "
-                        "an argument with the flag '--shm-size' to 'docker run'.";
+    plasma_directory = "/dev/shm";
+#else
+    plasma_directory = "/tmp";
+#endif
+  }
+  ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory
+                  << " and huge page support "
+                  << (hugepages_enabled ? "enabled" : "disabled");
+#ifdef __linux__
+  if (!hugepages_enabled) {
+    // On Linux, check that the amount of memory available in /dev/shm is large
+    // enough to accommodate the request. If it isn't, then fail.
+    int shm_fd = open(plasma_directory.c_str(), O_RDONLY);
+    struct statvfs shm_vfs_stats;
+    fstatvfs(shm_fd, &shm_vfs_stats);
+    // The value shm_vfs_stats.f_bsize is the block size, and the value
+    // shm_vfs_stats.f_bavail is the number of available blocks.
+    int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
+    close(shm_fd);
+    if (system_memory > shm_mem_avail) {
+      ARROW_LOG(FATAL)
+          << "System memory request exceeds memory available in " << plasma_directory
+          << ". The request is for " << system_memory
+          << " bytes, and the amount available is " << shm_mem_avail
+          << " bytes. You may be able to free up space by deleting files in "
+             "/dev/shm. If you are inside a Docker container, you may need to "
+             "pass an argument with the flag '--shm-size' to 'docker run'.";
+    }
+  } else {
+    set_malloc_granularity(1024 * 1024 * 1024);  // 1 GB
   }
 #endif
   // Make it so dlmalloc fails if we try to request more memory than is
   // available.
   plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
   ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
-  plasma::start_server(socket_name, system_memory);
+  plasma::start_server(socket_name, system_memory, plasma_directory, hugepages_enabled);
 }
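
[Editor's note] When huge pages are disabled, the store.cc changes above keep the capacity check on the shared-memory directory: the available space is the filesystem block size multiplied by the number of available blocks reported by fstatvfs. A minimal standalone sketch of that check, with an illustrative helper name and directory (not Plasma code):

  #include <fcntl.h>
  #include <sys/statvfs.h>
  #include <unistd.h>

  #include <cstdint>
  #include <cstdio>

  // Return the number of bytes available on the filesystem holding `directory`,
  // or -1 on error.
  int64_t available_bytes(const char* directory) {
    int fd = open(directory, O_RDONLY);
    if (fd < 0) return -1;
    struct statvfs stats;
    int rc = fstatvfs(fd, &stats);
    close(fd);
    if (rc != 0) return -1;
    return static_cast<int64_t>(stats.f_bsize) * static_cast<int64_t>(stats.f_bavail);
  }

  int main() {
    std::printf("/dev/shm: %lld bytes available\n",
                static_cast<long long>(available_bytes("/dev/shm")));
    return 0;
  }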

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/cpp/src/plasma/store.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index fb732a1..61a3a24 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -19,6 +19,7 @@
 #define PLASMA_STORE_H
 
 #include <deque>
+#include <string>
 #include <vector>
 
 #include "plasma/common.h"
@@ -47,10 +48,14 @@ struct Client {
 
 class PlasmaStore {
  public:
-  PlasmaStore(EventLoop* loop, int64_t system_memory);
+  PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory,
+              bool hugetlbfs_enabled);
 
   ~PlasmaStore();
 
+  /// Get a const pointer to the internal PlasmaStoreInfo object.
+  const PlasmaStoreInfo* get_plasma_store_info();
+
   /// Create a new object. The client must do a call to release_object to tell
   /// the store when it is done with the object.
   ///

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/python/doc/source/plasma.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst
index 832d996..e4665d1 100644
--- a/python/doc/source/plasma.rst
+++ b/python/doc/source/plasma.rst
@@ -335,3 +335,50 @@ the original Pandas ``DataFrame`` structure.
 
   # Convert back into Pandas
   result = record_batch.to_pandas()
+
+Using Plasma with Huge Pages
+----------------------------
+
+On Linux it is possible to use the Plasma store with huge pages for increased
+throughput. You first need to create a file system and activate huge pages with
+
+.. code-block:: shell
+
+  sudo mkdir -p /mnt/hugepages
+  gid=`id -g`
+  uid=`id -u`
+  sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages
+  sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group"
+  sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages"
+
+Note that you only need root access to create the file system, not for
+running the object store. You can then start the Plasma store with the ``-d``
+flag for the mount point of the huge page file system and the ``-h`` flag
+which indicates that huge pages are activated:
+
+.. code-block:: shell
+
+  plasma_store -s /tmp/plasma -m 10000000000 -d /mnt/hugepages -h
+
+You can test this with the following script:
+
+.. code-block:: python
+
+  import numpy as np
+  import pyarrow as pa
+  import pyarrow.plasma as plasma
+  import time
+
+  client = plasma.connect("/tmp/plasma", "", 0)
+
+  data = np.random.randn(100000000)
+  tensor = pa.Tensor.from_numpy(data)
+
+  object_id = plasma.ObjectID(np.random.bytes(20))
+  buf = client.create(object_id, pa.get_tensor_size(tensor))
+
+  stream = pa.FixedSizeBufferOutputStream(buf)
+  stream.set_memcopy_threads(4)
+  a = time.time()
+  pa.write_tensor(tensor, stream)
+  print("Writing took ", time.time() - a)

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index eed9640..c6a9d9d 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -581,6 +581,10 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
             " arrow::io::FixedSizeBufferWriter"(WriteableFile):
         CFixedSizeBufferWriter(const shared_ptr[CBuffer]& buffer)
 
+        void set_memcopy_threads(int num_threads)
+        void set_memcopy_blocksize(int64_t blocksize)
+        void set_memcopy_threshold(int64_t threshold)
+
 
 cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     enum MessageType" arrow::ipc::Message::Type":

http://git-wip-us.apache.org/repos/asf/arrow/blob/10f7158d/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index eda8de7..061a7a9 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -542,6 +542,21 @@ cdef class FixedSizeBufferOutputStream(NativeFile):
         self.is_writeable = 1
         self.is_open = True
 
+    def set_memcopy_threads(self, int num_threads):
+        cdef CFixedSizeBufferWriter* writer = \
+            <CFixedSizeBufferWriter*> self.wr_file.get()
+        writer.set_memcopy_threads(num_threads)
+
+    def set_memcopy_blocksize(self, int64_t blocksize):
+        cdef CFixedSizeBufferWriter* writer = \
+            <CFixedSizeBufferWriter*> self.wr_file.get()
+        writer.set_memcopy_blocksize(blocksize)
+
+    def set_memcopy_threshold(self, int64_t threshold):
+        cdef CFixedSizeBufferWriter* writer = \
+            <CFixedSizeBufferWriter*> self.wr_file.get()
+        writer.set_memcopy_threshold(threshold)
+
 
 # ----------------------------------------------------------------------
 # Arrow buffers
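
[Editor's note] The three set_memcopy_* methods added to FixedSizeBufferOutputStream above simply forward to arrow::io::FixedSizeBufferWriter, whose declarations are exposed in the libarrow.pxd hunk. A hedged C++ sketch of using those same knobs directly; only the constructor and the three setters come from the declarations above, while the buffer setup and the chosen values are illustrative assumptions:

  #include <cstdint>
  #include <memory>
  #include <vector>

  #include "arrow/buffer.h"
  #include "arrow/io/memory.h"
  #include "arrow/status.h"

  int main() {
    // Back the writer with a mutable buffer over caller-owned memory.
    std::vector<uint8_t> scratch(1 << 20);
    auto buffer = std::make_shared<arrow::MutableBuffer>(
        scratch.data(), static_cast<int64_t>(scratch.size()));
    arrow::io::FixedSizeBufferWriter writer(buffer);

    // Same knobs as the new Python methods: 4 copy threads, 1 MB blocks,
    // and only parallelize copies of at least 1 MB.
    writer.set_memcopy_threads(4);
    writer.set_memcopy_blocksize(1 << 20);
    writer.set_memcopy_threshold(1 << 20);

    std::vector<uint8_t> payload(1 << 20, 42);
    arrow::Status st =
        writer.Write(payload.data(), static_cast<int64_t>(payload.size()));
    return st.ok() ? 0 : 1;
  }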

