arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pcmor...@apache.org
Subject [arrow] branch master updated: ARROW-1795: [Plasma] Create flag to make Plasma store use a single memory-mapped file.
Date Fri, 17 Nov 2017 05:56:10 GMT
This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new cacbacd  ARROW-1795: [Plasma] Create flag to make Plasma store use a single memory-mapped
file.
cacbacd is described below

commit cacbacd439919742a0b6fbec27ee73b5af29347f
Author: Robert Nishihara <robertnishihara@gmail.com>
AuthorDate: Thu Nov 16 21:56:03 2017 -0800

    ARROW-1795: [Plasma] Create flag to make Plasma store use a single memory-mapped file.
    
    This adds the `-f` flag which tells the plasma store to use a single memory-mapped file.
This is accomplished by simply calling `dlmemalign`/`dlfree` on the entire space at startup.
    
    Question: Why does the test pass? Given that `plasma_client` is constructed with a release
delay of 64, shouldn't the store be unable to evict some objects? Yet they all seem to get
evicted just fine.
    
    cc @pcmoritz @atumanov
    
    Author: Robert Nishihara <robertnishihara@gmail.com>
    
    Closes #1327 from robertnishihara/allocateupfront and squashes the following commits:
    
    6f3b953 [Robert Nishihara] Augment test.
    0daeae3 [Robert Nishihara] Remove hard-coded values and update test.
    a446fbd [Robert Nishihara] Add a test.
    c2374b6 [Robert Nishihara] Add flag to tell the plasma store to use a single memory-mapped
file.
---
 cpp/src/plasma/store.cc             | 27 ++++++++++++++++++++++-----
 python/pyarrow/tests/test_plasma.py | 29 +++++++++++++++++++++++++++--
 2 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 7094aed..c6a19a5 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -676,12 +676,22 @@ class PlasmaStoreRunner {
   PlasmaStoreRunner() {}
 
   void Start(char* socket_name, int64_t system_memory, std::string directory,
-             bool hugepages_enabled) {
+             bool hugepages_enabled, bool use_one_memory_mapped_file) {
     // Create the event loop.
     loop_.reset(new EventLoop);
     store_.reset(
         new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
     plasma_config = store_->get_plasma_store_info();
+
+    // If the store is configured to use a single memory-mapped file, then we
+    // achieve that by mallocing and freeing a single large amount of space
+    // (the maximum allowed size) up front.
+    if (use_one_memory_mapped_file) {
+      void* pointer = plasma::dlmemalign(BLOCK_SIZE, system_memory);
+      ARROW_CHECK(pointer != NULL);
+      plasma::dlfree(pointer);
+    }
+
     int socket = bind_ipc_sock(socket_name, true);
     // TODO(pcm): Check return value.
     ARROW_CHECK(socket >= 0);
@@ -716,14 +726,15 @@ void HandleSignal(int signal) {
 }
 
 void start_server(char* socket_name, int64_t system_memory, std::string plasma_directory,
-                  bool hugepages_enabled) {
+                  bool hugepages_enabled, bool use_one_memory_mapped_file) {
   // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
   // to a client that has already died, the store could die.
   signal(SIGPIPE, SIG_IGN);
 
   g_runner.reset(new PlasmaStoreRunner());
   signal(SIGTERM, HandleSignal);
-  g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled);
+  g_runner->Start(socket_name, system_memory, plasma_directory, hugepages_enabled,
+                  use_one_memory_mapped_file);
 }
 
 }  // namespace plasma
@@ -733,9 +744,11 @@ int main(int argc, char* argv[]) {
   // Directory where plasma memory mapped files are stored.
   std::string plasma_directory;
   bool hugepages_enabled = false;
+  // True if a single large memory-mapped file should be created at startup.
+  bool use_one_memory_mapped_file = false;
   int64_t system_memory = -1;
   int c;
-  while ((c = getopt(argc, argv, "s:m:d:h")) != -1) {
+  while ((c = getopt(argc, argv, "s:m:d:hf")) != -1) {
     switch (c) {
       case 'd':
         plasma_directory = std::string(optarg);
@@ -755,6 +768,9 @@ int main(int argc, char* argv[]) {
                         << "GB of memory.";
         break;
       }
+      case 'f':
+        use_one_memory_mapped_file = true;
+        break;
       default:
         exit(-1);
     }
@@ -808,5 +824,6 @@ int main(int argc, char* argv[]) {
   // available.
   plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
   ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
-  plasma::start_server(socket_name, system_memory, plasma_directory, hugepages_enabled);
+  plasma::start_server(socket_name, system_memory, plasma_directory, hugepages_enabled,
+                       use_one_memory_mapped_file);
 }
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index b73d92d..b28bd60 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -102,7 +102,8 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
 
 def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
                        use_valgrind=False, use_profiler=False,
-                       stdout_file=None, stderr_file=None):
+                       stdout_file=None, stderr_file=None,
+                       use_one_memory_mapped_file=False):
     """Start a plasma store process.
     Args:
         use_valgrind (bool): True if the plasma store should be started inside
@@ -113,6 +114,8 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
             no redirection should happen, then this should be None.
         stderr_file: A file handle opened for writing to redirect stderr to. If
             no redirection should happen, then this should be None.
+        use_one_memory_mapped_file: If True, then the store will use only a
+            single memory-mapped file.
     Return:
         A tuple of the name of the plasma store socket and the process ID of
             the plasma store process.
@@ -124,6 +127,8 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
     command = [plasma_store_executable,
                "-s", plasma_store_name,
                "-m", str(plasma_store_memory)]
+    if use_one_memory_mapped_file:
+        command += ["-f"]
     if use_valgrind:
         pid = subprocess.Popen(["valgrind",
                                 "--track-origins=yes",
@@ -147,10 +152,14 @@ def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
 class TestPlasmaClient(object):
 
     def setup_method(self, test_method):
+        use_one_memory_mapped_file = (test_method ==
+                                      self.test_use_one_memory_mapped_file)
+
         import pyarrow.plasma as plasma
         # Start Plasma store.
         plasma_store_name, self.p = start_plasma_store(
-            use_valgrind=os.getenv("PLASMA_VALGRIND") == "1")
+            use_valgrind=os.getenv("PLASMA_VALGRIND") == "1",
+            use_one_memory_mapped_file=use_one_memory_mapped_file)
         # Connect to Plasma.
         self.plasma_client = plasma.connect(plasma_store_name, "", 64)
         # For the eviction test
@@ -720,3 +729,19 @@ class TestPlasmaClient(object):
             assert object_ids[i] == recv_objid
             assert -1 == recv_dsize
             assert -1 == recv_msize
+
+    def test_use_one_memory_mapped_file(self):
+        # Fill the object store up with a large number of small objects and let
+        # them go out of scope.
+        for _ in range(100):
+            create_object(
+                self.plasma_client,
+                np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0)
+        # Create large objects that require the full object store size, and
+        # verify that they fit.
+        for _ in range(2):
+            create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY, 0)
+        # Verify that an object that is too large does not fit.
+        with pytest.raises(pa.lib.PlasmaStoreFull):
+            create_object(self.plasma_client, DEFAULT_PLASMA_STORE_MEMORY + 1,
+                          0)

-- 
To stop receiving notification emails like this one, please contact
['"commits@arrow.apache.org" <commits@arrow.apache.org>'].

Mime
View raw message