mesos-commits mailing list archives

From b...@apache.org
Subject [1/7] mesos git commit: Performance optimizations for message passing.
Date Wed, 19 Jul 2017 20:21:20 GMT
Repository: mesos
Updated Branches:
  refs/heads/master 4e6eb4ec5 -> 8f42d0c11


Performance optimizations for message passing.

Optimizations include:

* Factored out the run queue and introduced a lock-free implementation.
  This also required adding the concept of an `epoch` to support proper
  settling, as well as refactoring the increments/decrements of
  `running` to make them easier to reason about.

* Replaced the use of a condition variable (the `Gate`) with a
  semaphore (a simplified sketch of the resulting protocol follows
  below).
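
The following is a minimal, self-contained sketch (not the patch itself) of
the enqueue/wait/settle protocol described above. It uses only the POSIX
`sem_t` path for brevity; the names `SketchQueue` and `settled` are
illustrative and do not appear in the patch.

    #include <atomic>
    #include <cerrno>
    #include <deque>
    #include <mutex>

    #include <semaphore.h> // POSIX only; the patch also covers Mach/Windows.

    class SketchQueue
    {
    public:
      SketchQueue() { sem_init(&semaphore, 0, 0); }
      ~SketchQueue() { sem_destroy(&semaphore); }

      void enqueue(int item)
      {
        {
          std::lock_guard<std::mutex> lock(mutex);
          items.push_back(item);
        }
        epoch.fetch_add(1);   // Record that the queue changed (for settling).
        sem_post(&semaphore); // Let one waiter through (or let the next
                              // `wait` return immediately).
      }

      // Contract (as in run_queue.hpp): call `wait` before `dequeue`, so
      // the queue is guaranteed to be non-empty when we dequeue.
      void wait()
      {
        while (sem_wait(&semaphore) != 0 && errno == EINTR) {}
      }

      int dequeue()
      {
        std::lock_guard<std::mutex> lock(mutex);
        int item = items.front();
        items.pop_front();
        return item;
      }

      bool empty()
      {
        std::lock_guard<std::mutex> lock(mutex);
        return items.empty();
      }

      std::atomic<long> epoch{0};

    private:
      std::deque<int> items;
      std::mutex mutex;
      sem_t semaphore;
    };


    // Settling: only declare quiescence if no workers are running, the
    // queue is empty, and the epoch did not advance while we were
    // looking (i.e., no enqueue raced with the two checks above).
    bool settled(SketchQueue& queue, const std::atomic<long>& running)
    {
      long old = queue.epoch.load();

      if (running.load() > 0) {
        return false;
      }

      if (!queue.empty()) {
        return false;
      }

      return old == queue.epoch.load();
    }

The real run_queue.hpp and semaphore.hpp below additionally support
extracting a specific process (for thread donation in `wait`),
decommissioning the semaphore during finalization, and an optional
lock-free queue built on moodycamel::ConcurrentQueue.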

Review: https://reviews.apache.org/r/60825


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6076dbc2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6076dbc2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6076dbc2

Branch: refs/heads/master
Commit: 6076dbc226de80d597a8e21ea392ecf4ef3027c1
Parents: 4e6eb4e
Author: Benjamin Hindman <benjamin.hindman@gmail.com>
Authored: Sun Jun 18 13:34:45 2017 -0700
Committer: Benjamin Hindman <benjamin.hindman@gmail.com>
Committed: Wed Jul 19 13:18:39 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   2 +
 .../libprocess/cmake/ProcessConfigure.cmake     |   4 +
 3rdparty/libprocess/configure.ac                |  10 +-
 3rdparty/libprocess/src/CMakeLists.txt          |   2 +
 3rdparty/libprocess/src/process.cpp             | 213 +++++++++++--------
 3rdparty/libprocess/src/run_queue.hpp           | 182 ++++++++++++++++
 3rdparty/libprocess/src/semaphore.hpp           | 183 ++++++++++++++++
 7 files changed, 501 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index fb08b6a..4031297 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -180,6 +180,8 @@ libprocess_la_SOURCES =		\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\
+  src/run_queue.hpp		\
+  src/semaphore.hpp		\
   src/socket.cpp		\
   src/subprocess.cpp		\
   src/subprocess_posix.cpp	\

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/cmake/ProcessConfigure.cmake
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/cmake/ProcessConfigure.cmake b/3rdparty/libprocess/cmake/ProcessConfigure.cmake
index dd3be57..f9e70bd 100755
--- a/3rdparty/libprocess/cmake/ProcessConfigure.cmake
+++ b/3rdparty/libprocess/cmake/ProcessConfigure.cmake
@@ -40,6 +40,10 @@
 # party libraries, and where in the directory tree you need to look to get the
 # actual libraries.
 
+if (ENABLE_LOCK_FREE_RUN_QUEUE)
+  add_definitions(-DLOCK_FREE_RUN_QUEUE)
+endif ()
+
 if (ENABLE_SSL)
   find_package(OpenSSL REQUIRED)
 endif ()

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/configure.ac
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/configure.ac b/3rdparty/libprocess/configure.ac
index e849b7d..cb2cf4f 100644
--- a/3rdparty/libprocess/configure.ac
+++ b/3rdparty/libprocess/configure.ac
@@ -69,6 +69,11 @@ AC_CONFIG_FILES([3rdparty/gmock_sources.cc])
 # Optional features.
 ###############################################################################
 
+AC_ARG_ENABLE([lock_free_run_queue],
+              AS_HELP_STRING([--enable-lock-free-run-queue],
+                             [enables the lock free run queue]),
+                             [], [enable_lock_free_run_queue=no])
+
 AC_ARG_ENABLE([hardening],
               AS_HELP_STRING([--disable-hardening],
                              [disables security measures such as stack
@@ -230,11 +235,14 @@ AC_ARG_WITH([svn],
 ###############################################################################
 AC_ARG_VAR([TEST_DRIVER], [executable and arguments of a test driver])
 
-
 ###############################################################################
 # Compiler checks.
 ###############################################################################
 
+# Check if we should use the lock free run queue.
+AS_IF([test "x$enable_lock_free_run_queue" = "xyes"],
+      [AC_DEFINE([LOCK_FREE_RUN_QUEUE])])
+
 # Check to see if we should harden or not.
 AM_CONDITIONAL([ENABLE_HARDENING], [test x"$enable_hardening" = "xyes"])
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/CMakeLists.txt b/3rdparty/libprocess/src/CMakeLists.txt
index 670dc15..03959ae 100644
--- a/3rdparty/libprocess/src/CMakeLists.txt
+++ b/3rdparty/libprocess/src/CMakeLists.txt
@@ -55,6 +55,8 @@ set(PROCESS_SRC
   process.cpp
   process_reference.hpp
   reap.cpp
+  run_queue.hpp
+  semaphore.hpp
   socket.cpp
   subprocess.cpp
   time.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index b4d7791..dff78b0 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -115,6 +115,7 @@
 #include "event_loop.hpp"
 #include "gate.hpp"
 #include "process_reference.hpp"
+#include "run_queue.hpp"
 
 namespace inet = process::network::inet;
 namespace inet4 = process::network::inet4;
@@ -571,9 +572,11 @@ private:
   // Gates for waiting threads (protected by processes_mutex).
   map<ProcessBase*, Gate*> gates;
 
-  // Queue of runnable processes (implemented using list).
-  list<ProcessBase*> runq;
-  std::recursive_mutex runq_mutex;
+  // Queue of runnable processes.
+  //
+  // See run_queue.hpp for more information about the RUN_QUEUE
+  // preprocessor definition.
+  RUN_QUEUE runq;
 
   // Number of running processes, to support Clock::settle operation.
   std::atomic_long running;
@@ -593,6 +596,7 @@ private:
   std::atomic_bool finalizing;
 };
 
+
 static internal::Flags* libprocess_flags = new internal::Flags();
 
 // Synchronization primitives for `initialize`.
@@ -627,9 +631,6 @@ static SocketManager* socket_manager = nullptr;
 // Active ProcessManager (eventually will probably be thread-local).
 static ProcessManager* process_manager = nullptr;
 
-// Scheduling gate that threads wait at when there is nothing to run.
-static Gate* gate = new Gate();
-
 // Used for authenticating HTTP requests.
 static AuthenticatorManager* authenticator_manager = nullptr;
 
@@ -2756,7 +2757,7 @@ void ProcessManager::finalize()
 
   // Send signal to all processing threads to stop running.
   joining_threads.store(true);
-  gate->open();
+  runq.decomission();
   EventLoop::stop();
 
   // Join all threads.
@@ -2811,43 +2812,29 @@ long ProcessManager::init_threads()
 
   threads.reserve(num_worker_threads + 1);
 
-  struct
-  {
-    void operator()() const
-    {
-      do {
-        ProcessBase* process = process_manager->dequeue();
-        if (process == nullptr) {
-          Gate::state_t old = gate->approach();
-          process = process_manager->dequeue();
-          if (process == nullptr) {
-            if (joining_threads.load()) {
-              break;
-            }
-            gate->arrive(old); // Wait at gate if idle.
-            continue;
-          } else {
-            gate->leave();
-          }
-        }
-        process_manager->resume(process);
-      } while (true);
-
-      // Threads are joining. Delete the thread local `_executor_`
-      // pointer to prevent a memory leak.
-      delete _executor_;
-      _executor_ = nullptr;
-    }
-
-    // We hold a constant reference to `joining_threads` to make it clear that
-    // this value is only being tested (read), and not manipulated.
-    const std::atomic_bool& joining_threads;
-  } worker{joining_threads};
-
   // Create processing threads.
   for (long i = 0; i < num_worker_threads; i++) {
     // Retain the thread handles so that we can join when shutting down.
-    threads.emplace_back(new std::thread(worker));
+    threads.emplace_back(new std::thread(
+        [this]() {
+          running.fetch_add(1);
+          do {
+            ProcessBase* process = dequeue();
+            if (process == nullptr) {
+              if (joining_threads.load()) {
+                break;
+              }
+            } else {
+              resume(process);
+            }
+          } while (true);
+          running.fetch_sub(1);
+
+          // Threads are joining. Delete the thread local `_executor_`
+          // pointer to prevent a memory leak.
+          delete _executor_;
+          _executor_ = nullptr;
+        }));
   }
 
   // Create a thread for the event loop.
@@ -3281,9 +3268,6 @@ void ProcessManager::resume(ProcessBase* process)
   }
 
   __process__ = nullptr;
-
-  CHECK_GE(running.load(), 1);
-  running.fetch_sub(1);
 }
 
 
@@ -3463,23 +3447,22 @@ bool ProcessManager::wait(const UPID& pid)
       // Check if it is runnable in order to donate this thread.
       if (process->state == ProcessBase::BOTTOM ||
           process->state == ProcessBase::READY) {
-        synchronized (runq_mutex) {
-          list<ProcessBase*>::iterator it =
-            find(runq.begin(), runq.end(), process);
-          if (it != runq.end()) {
-            // Found it! Remove it from the run queue since we'll be
-            // donating our thread and also increment 'running' before
-            // leaving this 'runq' protected critical section so that
-            // everyone that is waiting for the processes to settle
-            // continue to wait (otherwise they could see nothing in
-            // 'runq' and 'running' equal to 0 between when we exit
-            // this critical section and increment 'running').
-            runq.erase(it);
-            running.fetch_add(1);
-          } else {
-            // Another thread has resumed the process ...
-            process = nullptr;
-          }
+        // Assume that we'll be able to successfully extract the
+        // process from the run queue and optimistically increment
+        // `running` so that `Clock::settle` properly waits. In the
+        // event that we aren't able to extract the process from the
+        // run queue then we'll decrement `running`. Note that we
+        // can't assume that `running` is already non-zero because any
+        // thread may call `wait`, and thus we can't assume that we're
+        // calling it from a process that is already running.
+        running.fetch_add(1);
+
+        // Try and extract the process from the run queue. This may
+        // fail because another thread might resume the process first
+        // or the run queue might not support arbitrary extraction.
+        if (!runq.extract(process)) {
+          running.fetch_sub(1);
+          process = nullptr;
         }
       } else {
         // Process is not runnable, so no need to donate ...
@@ -3491,12 +3474,21 @@ bool ProcessManager::wait(const UPID& pid)
   if (process != nullptr) {
     VLOG(2) << "Donating thread to " << process->pid << " while waiting";
     ProcessBase* donator = __process__;
-    process_manager->resume(process);
+    resume(process);
+    running.fetch_sub(1);
     __process__ = donator;
   }
 
+  // NOTE: `process` is possibly deleted at this point and we must not
+  // use it!
+
   // TODO(benh): Donating only once may not be sufficient, so we might
   // still deadlock here ... perhaps warn if that's the case?
+  //
+  // In fact, we might want to support the ability to donate a thread
+  // to any process for a limited number of messages while we wait
+  // (i.e., donate for a message, check and see if our gate is open,
+  // if not, keep donating).
 
   // Now arrive at the gate and wait until it opens.
   if (gate != nullptr) {
@@ -3571,13 +3563,7 @@ void ProcessManager::enqueue(ProcessBase* process)
   // it's not running. Otherwise, check and see which thread this
   // process was last running on, and put it on that threads runq.
 
-  synchronized (runq_mutex) {
-    CHECK(find(runq.begin(), runq.end(), process) == runq.end());
-    runq.push_back(process);
-  }
-
-  // Wake up the processing thread if necessary.
-  gate->open();
+  runq.enqueue(process);
 }
 
 
@@ -3587,44 +3573,83 @@ ProcessBase* ProcessManager::dequeue()
   // are no processes to run, and this is not a dedicated thread, then
   // steal one from another threads runq.
 
-  ProcessBase* process = nullptr;
+  running.fetch_sub(1);
 
-  synchronized (runq_mutex) {
-    if (!runq.empty()) {
-      process = runq.front();
-      runq.pop_front();
-      // Increment the running count of processes in order to support
-      // the Clock::settle() operation (this must be done atomically
-      // with removing the process from the runq).
-      running.fetch_add(1);
-    }
-  }
+  runq.wait();
+
+  // Need to increment `running` before we dequeue from `runq` so that
+  // `Clock::settle` properly waits.
+  running.fetch_add(1);
 
-  return process;
+  ////////////////////////////////////////////////////////////
+  // NOTE: contract with the run queue is that we'll always //
+  // call `wait` _BEFORE_ we call `dequeue`.                //
+  ////////////////////////////////////////////////////////////
+  return runq.dequeue();
 }
 
 
+// NOTE: it's possible that a thread not controlled by libprocess is
+// trying to enqueue a process (e.g., due to `spawn` or because it's
+// doing a `dispatch` or `send`) and thus we'll settle when in fact we
+// should not have. There is nothing easy we can do to prevent this
+// and it hasn't been a problem historically in the usage we've seen
+// in the Mesos project.
 void ProcessManager::settle()
 {
   bool done = true;
   do {
     done = true; // Assume to start that we are settled.
 
-    synchronized (runq_mutex) {
-      if (!runq.empty()) {
-        done = false;
-        continue;
-      }
+    // See comments below as to how `epoch` helps us mitigate races
+    // with `running` and `runq`.
+    long old = runq.epoch.load();
 
-      if (running.load() > 0) {
-        done = false;
-        continue;
-      }
+    if (running.load() > 0) {
+      done = false;
+      continue;
+    }
 
-      if (!Clock::settled()) {
-        done = false;
-        continue;
-      }
+    // Race #1: it's possible that a thread starts running here
+    // because the semaphore had been signaled but nobody has woken
+    // up yet.
+
+    if (!runq.empty()) {
+      done = false;
+      continue;
+    }
+
+    // Race #2: it's possible that `runq` will get added to at this
+    // point given some threads might be running due to 'Race #1'.
+
+    if (running.load() > 0) {
+      done = false;
+      continue;
+    }
+
+    // If at this point _no_ threads are running then it must be the
+    // case that either nothing has been added to `runq` (and thus
+    // nothing really is running or will be about to run) OR
+    // `runq.epoch` must have been incremented (because the thread
+    // that enqueued something into `runq` now isn't running, so it
+    // must have incremented `runq.epoch` before it decremented
+    // `running`).
+    //
+    // Note that we check `runq.epoch` _after_ we check the clock
+    // because it's possible that the clock will also add to the
+    // `runq` but in so doing it will also increment `runq.epoch`
+    // which will guarantee that we don't settle (and
+    // `Clock::settled()` takes care to atomically ensure that
+    // `runq.epoch` is incremented before it returns).
+
+    if (!Clock::settled()) {
+      done = false;
+      continue;
+    }
+
+    if (old != runq.epoch.load()) {
+      done = false;
+      continue;
     }
   } while (!done);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/src/run_queue.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/run_queue.hpp b/3rdparty/libprocess/src/run_queue.hpp
new file mode 100644
index 0000000..109c300
--- /dev/null
+++ b/3rdparty/libprocess/src/run_queue.hpp
@@ -0,0 +1,182 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_RUN_QUEUE_HPP__
+#define __PROCESS_RUN_QUEUE_HPP__
+
+// At _configuration_ (i.e., build) time you can specify
+// RUN_QUEUE=... as an environment variable (i.e., just like CC or
+// CXXFLAGS) to pick the run queue implementation. If nothing is
+// specified we'll default to the LockingRunQueue.
+//
+// Alternatively we could have made this be a _runtime_ decision but
+// for performance reasons we wanted the run queue implementation to
+// be compile-time optimized (e.g., inlined, etc).
+//
+// Note that care should be taken not to reconfigure with a different
+// value of RUN_QUEUE when reusing a build directory!
+#define RUN_QUEUE LockingRunQueue
+
+#ifdef LOCK_FREE_RUN_QUEUE
+#define RUN_QUEUE LockFreeRunQueue
+#include <concurrentqueue.h>
+#endif // LOCK_FREE_RUN_QUEUE
+
+#include <algorithm>
+#include <list>
+
+#include <process/process.hpp>
+
+#include <stout/synchronized.hpp>
+
+#include "semaphore.hpp"
+
+namespace process {
+
+class LockingRunQueue
+{
+public:
+  bool extract(ProcessBase* process)
+  {
+    synchronized (mutex) {
+      std::list<ProcessBase*>::iterator it = std::find(
+          processes.begin(),
+          processes.end(),
+          process);
+
+      if (it != processes.end()) {
+        processes.erase(it);
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  void wait()
+  {
+    semaphore.wait();
+  }
+
+  void enqueue(ProcessBase* process)
+  {
+    synchronized (mutex) {
+      processes.push_back(process);
+    }
+    epoch.fetch_add(1);
+    semaphore.signal();
+  }
+
+  // Precondition: `wait` must get called before `dequeue`!
+  ProcessBase* dequeue()
+  {
+    synchronized (mutex) {
+      if (!processes.empty()) {
+        ProcessBase* process = processes.front();
+        processes.pop_front();
+        return process;
+      }
+    }
+
+    return nullptr;
+  }
+
+  // NOTE: this function can't be const because `synchronized (mutex)`
+  // is not const ...
+  bool empty()
+  {
+    synchronized (mutex) {
+      return processes.empty();
+    }
+  }
+
+  void decomission()
+  {
+    semaphore.decomission();
+  }
+
+  // Epoch used to capture changes to the run queue when settling.
+  std::atomic_long epoch = ATOMIC_VAR_INIT(0L);
+
+private:
+  std::list<ProcessBase*> processes;
+  std::mutex mutex;
+
+  // Semaphore used for threads to wait.
+  DecomissionableKernelSemaphore semaphore;
+};
+
+
+#ifdef LOCK_FREE_RUN_QUEUE
+class LockFreeRunQueue
+{
+public:
+  bool extract(ProcessBase*)
+  {
+    // NOTE: moodycamel::ConcurrentQueue does not provide a way to
+    // implement extract so we simply return false here.
+    return false;
+  }
+
+  void wait()
+  {
+    semaphore.wait();
+  }
+
+  void enqueue(ProcessBase* process)
+  {
+    queue.enqueue(process);
+    epoch.fetch_add(1);
+    semaphore.signal();
+  }
+
+  // Precondition: `wait` must get called before `dequeue`!
+  ProcessBase* dequeue()
+  {
+    // NOTE: we loop _forever_ until we actually dequeue a process
+    // because the contract for using the run queue is that `wait`
+    // must be called first so we know that there is something to be
+    // dequeued or the run queue has been decommissioned and we should
+    // just return `nullptr`.
+    ProcessBase* process = nullptr;
+    while (!queue.try_dequeue(process)) {
+      if (semaphore.decomissioned()) {
+        break;
+      }
+    }
+    return process;
+  }
+
+  bool empty() const
+  {
+    return queue.size_approx() == 0;
+  }
+
+  void decomission()
+  {
+    semaphore.decomission();
+  }
+
+  // Epoch used to capture changes to the run queue when settling.
+  std::atomic_long epoch = ATOMIC_VAR_INIT(0L);
+
+private:
+  moodycamel::ConcurrentQueue<ProcessBase*> queue;
+
+  // Semaphore used for threads to wait for the queue.
+  DecomissionableKernelSemaphore semaphore;
+};
+#endif // LOCK_FREE_RUN_QUEUE
+
+} // namespace process {
+
+#endif // __PROCESS_RUN_QUEUE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/6076dbc2/3rdparty/libprocess/src/semaphore.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/semaphore.hpp b/3rdparty/libprocess/src/semaphore.hpp
new file mode 100644
index 0000000..0143883
--- /dev/null
+++ b/3rdparty/libprocess/src/semaphore.hpp
@@ -0,0 +1,183 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __PROCESS_SEMAPHORE_HPP__
+#define __PROCESS_SEMAPHORE_HPP__
+
+#ifdef __MACH__
+#include <mach/mach.h>
+#elif __WINDOWS__
+#include <stout/windows.hpp>
+#else
+#include <semaphore.h>
+#endif // __MACH__
+
+#include <stout/check.hpp>
+
+// TODO(benh): Introduce a user-level semaphore that _only_ traps into
+// the kernel if the thread would actually need to wait.
+
+// TODO(benh): Add tests for these!
+
+#ifdef __MACH__
+class KernelSemaphore
+{
+public:
+  KernelSemaphore()
+  {
+    CHECK_EQ(
+        KERN_SUCCESS,
+        semaphore_create(mach_task_self(), &semaphore, SYNC_POLICY_FIFO, 0));
+  }
+
+  KernelSemaphore(const KernelSemaphore& other) = delete;
+
+  ~KernelSemaphore()
+  {
+    CHECK_EQ(KERN_SUCCESS, semaphore_destroy(mach_task_self(), semaphore));
+  }
+
+  KernelSemaphore& operator=(const KernelSemaphore& other) = delete;
+
+  void wait()
+  {
+    CHECK_EQ(KERN_SUCCESS, semaphore_wait(semaphore));
+  }
+
+  void signal()
+  {
+    CHECK_EQ(KERN_SUCCESS, semaphore_signal(semaphore));
+  }
+
+private:
+  semaphore_t semaphore;
+};
+#elif __WINDOWS__
+class KernelSemaphore
+{
+public:
+  KernelSemaphore()
+  {
+    semaphore = CHECK_NOTNULL(CreateSemaphore(nullptr, 0, LONG_MAX, nullptr));
+  }
+
+  KernelSemaphore(const KernelSemaphore& other) = delete;
+
+  ~KernelSemaphore()
+  {
+    CHECK(CloseHandle(semaphore));
+  }
+
+  KernelSemaphore& operator=(const KernelSemaphore& other) = delete;
+
+  void wait()
+  {
+    CHECK_EQ(WAIT_OBJECT_0, WaitForSingleObject(semaphore, INFINITE));
+  }
+
+  void signal()
+  {
+    CHECK(ReleaseSemaphore(semaphore, 1, nullptr));
+  }
+
+private:
+  HANDLE semaphore;
+};
+#else
+class KernelSemaphore
+{
+public:
+  KernelSemaphore()
+  {
+    PCHECK(sem_init(&semaphore, 0, 0) == 0);
+  }
+
+  KernelSemaphore(const KernelSemaphore& other) = delete;
+
+  ~KernelSemaphore()
+  {
+    PCHECK(sem_destroy(&semaphore) == 0);
+  }
+
+  KernelSemaphore& operator=(const KernelSemaphore& other) = delete;
+
+  void wait()
+  {
+    int result = sem_wait(&semaphore);
+
+    while (result != 0 && errno == EINTR) {
+      result = sem_wait(&semaphore);
+    }
+
+    PCHECK(result == 0);
+  }
+
+  void signal()
+  {
+    PCHECK(sem_post(&semaphore) == 0);
+  }
+
+private:
+  sem_t semaphore;
+};
+#endif // __MACH__
+
+
+// Provides a "decomissionable" kernel semaphore which allows us to
+// effectively flush all waiters and keep any future threads from
+// waiting. In order to be able to decomission the semaphore we need
+// to keep around the number of waiters so we can signal them all.
+class DecomissionableKernelSemaphore : public KernelSemaphore
+{
+public:
+  void wait()
+  {
+    // NOTE: we must check `commissioned` AFTER we have incremented
+    // `waiters` otherwise we might race with `decomission()` and fail
+    // to properly get signaled.
+    waiters.fetch_add(1);
+
+    if (!comissioned.load()) {
+      waiters.fetch_sub(1);
+      return;
+    }
+
+    KernelSemaphore::wait();
+
+    waiters.fetch_sub(1);
+  }
+
+  void decomission()
+  {
+    comissioned.store(false);
+
+    // Now signal all the waiters so they wake up and stop
+    // waiting. Note that this may do more `signal()` than necessary
+    // but since no future threads will wait that doesn't matter (it
+    // would only matter if we cared about the value of the semaphore
+    // which in the current implementation we don't).
+    for (size_t i = waiters.load(); i > 0; i--) {
+      signal();
+    }
+  }
+
+  bool decomissioned() const
+  {
+    return !comissioned.load();
+  }
+
+private:
+  std::atomic<bool> comissioned = ATOMIC_VAR_INIT(true);
+  std::atomic<size_t> waiters = ATOMIC_VAR_INIT(0);
+};
+
+#endif // __PROCESS_SEMAPHORE_HPP__

