mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [12/16] mesos git commit: Removed ProcessBase::inject to support future event queues.
Date Tue, 01 Aug 2017 21:03:10 GMT
Removed ProcessBase::inject to support future event queues.

However, to support immediate termination, a TerminateEvent can still
be injected, and it is handled appropriately.

Review: https://reviews.apache.org/r/61057


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/535e77ed
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/535e77ed
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/535e77ed

Branch: refs/heads/master
Commit: 535e77ed2b0702988af0ccf41eaee8bd5e9f6e69
Parents: e122312
Author: Benjamin Hindman <benjamin.hindman@gmail.com>
Authored: Fri Jul 21 00:05:19 2017 -0700
Committer: Benjamin Hindman <benjamin.hindman@gmail.com>
Committed: Tue Aug 1 14:01:51 2017 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/event.hpp   |  5 +-
 3rdparty/libprocess/include/process/process.hpp | 16 ++----
 3rdparty/libprocess/src/process.cpp             | 51 ++++++++++++--------
 3rdparty/libprocess/src/tests/process_tests.cpp |  2 +-
 4 files changed, 40 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/535e77ed/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index 6ae4207..6532bf4 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -197,8 +197,8 @@ private:
 
 struct TerminateEvent : Event
 {
-  explicit TerminateEvent(const UPID& _from)
-    : from(_from) {}
+  TerminateEvent(const UPID& _from, bool _inject)
+    : from(_from), inject(_inject) {}
 
   virtual void visit(EventVisitor* visitor) const
   {
@@ -206,6 +206,7 @@ struct TerminateEvent : Event
   }
 
   const UPID from;
+  const bool inject;
 
 private:
   // Not copyable, not assignable.

http://git-wip-us.apache.org/repos/asf/mesos/blob/535e77ed/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index b506096..9a41ed5 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -133,17 +133,6 @@ protected:
   virtual void lost(const UPID&) {}
 
   /**
-   * Puts the message at front of this process's message queue.
-   *
-   * @see process::Message
-   */
-  void inject(
-      const UPID& from,
-      const std::string& name,
-      const char* data = nullptr,
-      size_t length = 0);
-
-  /**
    * Sends the message to the specified `UPID`.
    *
    * @see process::Message
@@ -408,8 +397,11 @@ private:
 
   std::atomic<State> state = ATOMIC_VAR_INIT(State::BOTTOM);
 
+  // Flag for indicating that a terminate event has been injected.
+  std::atomic<bool> termination = ATOMIC_VAR_INIT(false);
+
   // Enqueue the specified message, request, or function call.
-  void enqueue(Event* event, bool inject = false);
+  void enqueue(Event* event);
 
   // Delegates for messages.
   std::map<std::string, UPID> delegates;

http://git-wip-us.apache.org/repos/asf/mesos/blob/535e77ed/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 47a2ea3..7395f00 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -3227,6 +3227,19 @@ void ProcessManager::resume(ProcessBase* process)
     if (!blocked) {
       CHECK_NOTNULL(event);
 
+      // Before serving this event check if we've triggered a
+      // terminate and if so purge all events until we get to the
+      // terminate event.
+      terminate = process->termination.load();
+      if (terminate) {
+        // Now purge all events until the terminate event.
+        while (!event->is<TerminateEvent>()) {
+          delete event;
+          event = process->events->dequeue();
+          CHECK_NOTNULL(event);
+        }
+      }
+
       // Determine if we should filter this event.
       //
       // NOTE: we use double-checked locking here to avoid
@@ -3395,9 +3408,9 @@ void ProcessManager::terminate(
     }
 
     if (sender != nullptr) {
-      process->enqueue(new TerminateEvent(sender->self()), inject);
+      process->enqueue(new TerminateEvent(sender->self(), inject));
     } else {
-      process->enqueue(new TerminateEvent(UPID()), inject);
+      process->enqueue(new TerminateEvent(UPID(), inject));
     }
   }
 }
@@ -3711,23 +3724,38 @@ size_t ProcessBase::eventCount<TerminateEvent>()
 }
 
 
-void ProcessBase::enqueue(Event* event, bool inject)
+void ProcessBase::enqueue(Event* event)
 {
   CHECK_NOTNULL(event);
 
   State old = state.load();
 
+  // Need to check if this is a terminate event _BEFORE_ we enqueue
+  // because it's possible that it'll get deleted after we enqueue it
+  // and before we use it again!
+  bool terminate =
+    event->is<TerminateEvent>() &&
+    event->as<TerminateEvent>().inject;
+
   switch (old) {
     case State::BOTTOM:
     case State::READY:
     case State::BLOCKED:
-      events->enqueue(event, inject);
+      events->enqueue(event);
       break;
     case State::TERMINATING:
       delete event;
       return;
   }
 
+  // We need to store terminate _AFTER_ we enqueue the event because
+  // the code in `ProcessManager::resume` assumes that if it sees
+  // `termination` as true then there must be at least one event in
+  // the queue.
+  if (terminate) {
+    termination.store(true);
+  }
+
   // If we're BLOCKED then we need to try and enqueue us into the run
   // queue. It's possible that in the time we enqueued the event and
   // are attempting to enqueue us in the run queue another thread has
@@ -3746,21 +3774,6 @@ void ProcessBase::enqueue(Event* event, bool inject)
 }
 
 
-void ProcessBase::inject(
-    const UPID& from,
-    const string& name,
-    const char* data,
-    size_t length)
-{
-  if (!from)
-    return;
-
-  Message message = encode(from, pid, name, data, length);
-
-  enqueue(new MessageEvent(std::move(message)), true);
-}
-
-
 void ProcessBase::send(
     const UPID& to,
     const string& name,

http://git-wip-us.apache.org/repos/asf/mesos/blob/535e77ed/3rdparty/libprocess/src/tests/process_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index ed11909..30d0fb8 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -110,7 +110,7 @@ using testing::ReturnArg;
 
 TEST(ProcessTest, Event)
 {
-  Owned<Event> event(new TerminateEvent(UPID()));
+  Owned<Event> event(new TerminateEvent(UPID(), false));
   EXPECT_FALSE(event->is<MessageEvent>());
   EXPECT_FALSE(event->is<ExitedEvent>());
   EXPECT_TRUE(event->is<TerminateEvent>());


Mime
View raw message