mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [8/9] mesos git commit: Added logic for subscribing to task added/updated events.
Date Tue, 21 Jun 2016 04:09:48 GMT
Added logic for subscribing to task added/updated events.

This change puts the initial scaffolding in place, allowing
a client to subscribe to events via the '/api/vX' endpoint.
Currently, only two events are supported: `TASK_ADDED` and
`TASK_UPDATED`.

Review: https://reviews.apache.org/r/48879/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e878e16b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e878e16b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e878e16b

Branch: refs/heads/master
Commit: e878e16ba54419ae39c7fbcd3466afe1cc42a93a
Parents: f384ac9
Author: Anand Mazumdar <anand@apache.org>
Authored: Mon Jun 20 20:50:07 2016 -0700
Committer: Anand Mazumdar <anand@apache.org>
Committed: Mon Jun 20 20:50:07 2016 -0700

----------------------------------------------------------------------
 src/master/http.cpp   | 16 +++++++++++--
 src/master/master.cpp | 60 ++++++++++++++++++++++++++++++++++++++++++++++
 src/master/master.hpp | 54 +++++++++++++++++++++++++++++++++--------
 3 files changed, 118 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e878e16b/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 4bee2f1..3a53ef7 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -685,8 +685,20 @@ Future<Response> Master::Http::api(
     case mesos::master::Call::REMOVE_QUOTA:
       return NotImplemented();
 
-    case mesos::master::Call::SUBSCRIBE:
-      return NotImplemented();
+    case mesos::master::Call::SUBSCRIBE: {
+      Pipe pipe;
+      OK ok;
+
+      ok.headers["Content-Type"] = stringify(acceptType);
+      ok.type = Response::PIPE;
+      ok.reader = pipe.reader();
+
+      HttpConnection http {pipe.writer(), acceptType, UUID::random()};
+
+      master->subscribe(http);
+
+      return ok;
+    }
   }
 
   UNREACHABLE();

http://git-wip-us.apache.org/repos/asf/mesos/blob/e878e16b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 93eaded..8def715 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4716,6 +4716,7 @@ void Master::_registerSlave(
     machineId.set_ip(stringify(pid.address.ip));
 
     Slave* slave = new Slave(
+        this,
         slaveInfo,
         pid,
         machineId,
@@ -4960,6 +4961,7 @@ void Master::_reregisterSlave(
     machineId.set_ip(stringify(pid.address.ip));
 
     Slave* slave = new Slave(
+        this,
         slaveInfo,
         pid,
         machineId,
@@ -6850,6 +6852,14 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
     // If the task has already transitioned to a terminal state,
     // do not update its state.
     if (!protobuf::isTerminalState(task->state())) {
+      // Send a notification to all subscribers if the task transitioned
+      // to a new state.
+      if (!subscribers.subscribed.empty() &&
+          latestState.get() != task->state()) {
+        subscribers.send(protobuf::master::event::createTaskUpdated(
+            *task, latestState.get()));
+      }
+
       task->set_state(latestState.get());
     }
   } else {
@@ -6860,6 +6870,13 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
     // its state. Note that we are being defensive here because this should not
     // happen unless there is a bug in the master code.
     if (!protobuf::isTerminalState(task->state())) {
+      // Send a notification to all subscribers if the task transitioned
+      // to a new state.
+      if (!subscribers.subscribed.empty() && task->state() != status.state()) {
+        subscribers.send(protobuf::master::event::createTaskUpdated(
+            *task, status.state()));
+      }
+
       task->set_state(status.state());
     }
   }
@@ -7489,11 +7506,54 @@ void Slave::addTask(Task* task)
     usedResources[frameworkId] += task->resources();
   }
 
+  if (!master->subscribers.subscribed.empty()) {
+    master->subscribers.send(protobuf::master::event::createTaskAdded(*task));
+  }
+
   LOG(INFO) << "Adding task " << taskId
             << " with resources " << task->resources()
             << " on agent " << id << " (" << info.hostname() << ")";
 }
 
+
+void Master::Subscribers::send(const mesos::master::Event& event)
+{
+  VLOG(1) << "Notifying all active subscribers about " << event.type() << " "
+          << "event";
+
+  foreachvalue (Subscriber subscriber, subscribed) {
+    subscriber.http.send<mesos::master::Event, v1::master::Event>(event);
+  }
+}
+
+
+void Master::exited(const UUID& id)
+{
+  if (!subscribers.subscribed.contains(id)) {
+    LOG(WARNING) << "Unknown subscriber" << id << " disconnected";
+    return;
+  }
+
+  subscribers.subscribed.erase(id);
+}
+
+
+void Master::subscribe(HttpConnection http)
+{
+  Subscribers::Subscriber subscriber{http};
+
+  subscribers.subscribed.put(http.streamId, subscriber);
+
+  LOG(INFO) << "Added subscriber: " << subscriber.http.streamId << " to the "
+            << "list of active subscribers";
+
+  subscriber.http.closed()
+    .onAny(defer(self(),
+           [this, subscriber](const Future<Nothing>&) {
+             exited(subscriber.http.streamId);
+           }));
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e878e16b/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 5874f89..2064f84 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -99,6 +99,7 @@ class WhitelistWatcher;
 
 namespace master {
 
+class Master;
 class SlaveObserver;
 
 struct BoundedRateLimiter;
@@ -108,7 +109,8 @@ struct Role;
 
 struct Slave
 {
-  Slave(const SlaveInfo& _info,
+  Slave(Master* const _master,
+        const SlaveInfo& _info,
         const process::UPID& _pid,
         const MachineID& _machineId,
         const std::string& _version,
@@ -118,7 +120,8 @@ struct Slave
           std::vector<ExecutorInfo>(),
         const std::vector<Task> tasks =
           std::vector<Task>())
-    : id(_info.id()),
+    : master(_master),
+      id(_info.id()),
       info(_info),
       machineId(_machineId),
       pid(_pid),
@@ -277,6 +280,7 @@ struct Slave
     checkpointedResources = totalResources.filter(needCheckpointing);
   }
 
+  Master* const master;
   const SlaveID id;
   const SlaveInfo info;
 
@@ -347,7 +351,8 @@ inline std::ostream& operator<<(std::ostream& stream, const Slave& slave)
 }
 
 
-// Represents the streaming HTTP connection to a framework.
+// Represents the streaming HTTP connection to a framework or a client
+// subscribed to the '/api/vX' endpoint.
 struct HttpConnection
 {
   HttpConnection(const process::http::Pipe::Writer& _writer,
@@ -355,15 +360,16 @@ struct HttpConnection
                  UUID _streamId)
     : writer(_writer),
       contentType(_contentType),
-      streamId(_streamId),
-      encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
+      streamId(_streamId) {}
 
-  // Converts the message to an Event before sending.
-  template <typename Message>
+  // We need to evolve the internal old style message/unversioned event into a
+  // versioned event e.g., `v1::scheduler::Event` or `v1::master::Event`.
+  template <typename Message, typename Event = v1::scheduler::Event>
   bool send(const Message& message)
   {
-    // We need to evolve the internal 'message' into a
-    // 'v1::scheduler::Event'.
+    ::recordio::Encoder<Event> encoder (lambda::bind(
+        serialize, contentType, lambda::_1));
+
     return writer.write(encoder.encode(evolve(message)));
   }
 
@@ -380,7 +386,6 @@ struct HttpConnection
   process::http::Pipe::Writer writer;
   ContentType contentType;
   UUID streamId;
-  ::recordio::Encoder<v1::scheduler::Event> encoder;
 };
 
 
@@ -572,6 +577,9 @@ protected:
   void exited(const FrameworkID& frameworkId, const HttpConnection& http);
   void _exited(Framework* framework);
 
+  // Invoked upon noticing a subscriber disconnection.
+  void exited(const UUID& id);
+
   // Invoked when the message is ready to be executed after
   // being throttled.
   // 'principal' being None indicates it is throttled by
@@ -885,6 +893,9 @@ private:
       bool force,
       const process::Future<bool>& authorized);
 
+  // Subscribes a client to the 'api/vX' endpoint.
+  void subscribe(HttpConnection http);
+
   void teardown(Framework* framework);
 
   void accept(
@@ -1406,6 +1417,7 @@ private:
 
   friend struct Framework;
   friend struct Metrics;
+  friend struct Slave;
 
   // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
   // protected, we need to make the following functions friends.
@@ -1601,6 +1613,28 @@ private:
     Option<process::Owned<BoundedRateLimiter>> defaultLimiter;
   } frameworks;
 
+  struct Subscribers
+  {
+    // Represents a client subscribed to the 'api/vX' endpoint.
+    //
+    // TODO(anand): Add support for filtering. Some subscribers
+    // might only be interested in a subset of events.
+    struct Subscriber
+    {
+      Subscriber(const HttpConnection& _http)
+        : http(_http) {}
+
+      HttpConnection http;
+    };
+
+    // Sends the event to all subscribers connected to the 'api/vX' endpoint.
+    void send(const mesos::master::Event& event);
+
+    // Active subscribers to the 'api/vX' endpoint keyed by the stream
+    // identifier.
+    hashmap<UUID, Subscriber> subscribed;
+  } subscribers;
+
   hashmap<OfferID, Offer*> offers;
   hashmap<OfferID, process::Timer> offerTimers;
 


Mime
View raw message