mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bmah...@apache.org
Subject git commit: Fixed a resource accounting performance regression in the Master.
Date Tue, 07 Oct 2014 01:36:53 GMT
Repository: mesos
Updated Branches:
  refs/heads/master a62dc9dd3 -> f348070b7


Fixed a resource accounting performance regression in the Master.

Review: https://reviews.apache.org/r/26392


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f348070b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f348070b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f348070b

Branch: refs/heads/master
Commit: f348070b7c29cdd72cf3d87a77ee07f9fc46ea72
Parents: a62dc9d
Author: Benjamin Mahler <bmahler@twitter.com>
Authored: Mon Oct 6 11:55:52 2014 -0700
Committer: Benjamin Mahler <bmahler@twitter.com>
Committed: Mon Oct 6 18:26:53 2014 -0700

----------------------------------------------------------------------
 src/master/http.cpp   |  13 ++++-
 src/master/master.cpp |  53 ++++++++++++++------
 src/master/master.hpp | 118 +++++++++++++++++++++++++--------------------
 3 files changed, 113 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f348070b/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 16a6a01..3fd4b45 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -116,7 +116,16 @@ JSON::Object model(const Framework& framework)
   object.values["registered_time"] = framework.registeredTime.secs();
   object.values["unregistered_time"] = framework.unregisteredTime.secs();
   object.values["active"] = framework.active;
-  object.values["resources"] = model(framework.used());
+
+  // TODO(bmahler): Consider deprecating this in favor of the split
+  // used and offered resources below.
+  object.values["resources"] =
+    model(framework.usedResources + framework.offeredResources);
+
+  // TODO(bmahler): Use these in the webui.
+  object.values["used_resources"] = model(framework.usedResources);
+  object.values["offered_resources"] = model(framework.offeredResources);
+
   object.values["hostname"] = framework.info.hostname();
   object.values["webui_url"] = framework.info.webui_url();
 
@@ -406,7 +415,7 @@ Future<Response> Master::Http::stats(const Request& request)
         totalResources += resource;
       }
     }
-    foreach (const Resource& resource, slave->used()) {
+    foreach (const Resource& resource, slave->usedResources) {
       if (resource.type() == Value::SCALAR) {
         usedResources += resource;
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f348070b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f05275b..86dc544 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3947,7 +3947,13 @@ void Master::addFramework(Framework* framework)
   message.mutable_master_info()->MergeFrom(info_);
   send(framework->pid, message);
 
-  allocator->frameworkAdded(framework->id, framework->info, framework->used());
+  // There should be no offered resources yet!
+  CHECK_EQ(Resources(), framework->offeredResources);
+
+  allocator->frameworkAdded(
+      framework->id,
+      framework->info,
+      framework->usedResources);
 
   // Export framework metrics.
 
@@ -4424,6 +4430,8 @@ void Master::_removeSlave(
 
 void Master::updateTask(Task* task, const TaskStatus& status)
 {
+  CHECK_NOTNULL(task);
+
   // Out-of-order updates should not occur, however in case they
   // do (e.g. MESOS-1799), prevent them here to ensure that the
   // resource accounting is not affected.
@@ -4435,16 +4443,38 @@ void Master::updateTask(Task* task, const TaskStatus& status)
     return;
   }
 
+  bool terminated =
+    !protobuf::isTerminalState(task->state()) &&
+    protobuf::isTerminalState(status.state());
+
+  // TODO(brenden) Consider wiping the `data` and `message` fields?
+  if (task->statuses_size() > 0 &&
+      task->statuses(task->statuses_size() - 1).state() == status.state()) {
+    task->mutable_statuses()->RemoveLast();
+  }
+  task->add_statuses()->CopyFrom(status);
+  task->set_state(status.state());
+
+  stats.tasks[status.state()]++;
+
   // Once the task becomes terminal, we recover the resources.
-  if (!protobuf::isTerminalState(task->state()) &&
-      protobuf::isTerminalState(status.state())) {
+  if (terminated) {
     allocator->resourcesRecovered(
         task->framework_id(),
         task->slave_id(),
         task->resources(),
         None());
 
-    switch (status.state()) {
+    // The slave owns the Task object and cannot be NULL.
+    Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id()));
+    slave->taskTerminated(task);
+
+    Framework* framework = getFramework(task->framework_id());
+    if (framework != NULL) {
+      framework->taskTerminated(task);
+    }
+
+    switch (task->state()) {
       case TASK_FINISHED: ++metrics.tasks_finished; break;
       case TASK_FAILED:   ++metrics.tasks_failed;   break;
       case TASK_KILLED:   ++metrics.tasks_killed;   break;
@@ -4452,16 +4482,6 @@ void Master::updateTask(Task* task, const TaskStatus& status)
       default: break;
     }
   }
-
-  // TODO(brenden) Consider wiping the `data` and `message` fields?
-  if (task->statuses_size() > 0 &&
-      task->statuses(task->statuses_size() - 1).state() == status.state()) {
-    task->mutable_statuses()->RemoveLast();
-  }
-  task->add_statuses()->CopyFrom(status);
-  task->set_state(status.state());
-
-  stats.tasks[status.state()]++;
 }
 
 
@@ -4469,6 +4489,7 @@ void Master::removeTask(Task* task)
 {
   CHECK_NOTNULL(task);
 
+  // The slave owns the Task object and cannot be NULL.
   Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id()));
 
   if (!protobuf::isTerminalState(task->state())) {
@@ -4479,7 +4500,7 @@ void Master::removeTask(Task* task)
                  << " in non-terminal state " << task->state();
 
     // If the task is not terminal, then the resources have
-    // not yet been released.
+    // not yet been recovered.
     allocator->resourcesRecovered(
         task->framework_id(),
         task->slave_id(),
@@ -5125,7 +5146,7 @@ double Master::_resources_used(const std::string& name)
   double used = 0.0;
 
   foreachvalue (Slave* slave, slaves.registered) {
-    foreach (const Resource& resource, slave->used()) {
+    foreach (const Resource& resource, slave->usedResources) {
       if (resource.name() == name && resource.type() == Value::SCALAR) {
         used += resource.scalar().value();
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/f348070b/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 0bf4546..5c0f224 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -843,17 +843,40 @@ struct Slave
       << " of framework " << task->framework_id();
 
     tasks[task->framework_id()][task->task_id()] = task;
+
+    if (!protobuf::isTerminalState(task->state())) {
+      usedResources += task->resources();
+    }
+
     LOG(INFO) << "Adding task " << task->task_id()
               << " with resources " << task->resources()
               << " on slave " << id << " (" << info.hostname() << ")";
   }
 
+  // Notification of task termination, for resource accounting.
+  // TODO(bmahler): This is a hack for performance. We need to
+  // maintain resource counters because computing task resources
+  // functionally for all tasks is expensive, for now.
+  void taskTerminated(Task* task)
+  {
+    CHECK(protobuf::isTerminalState(task->state()));
+    CHECK(tasks[task->framework_id()].contains(task->task_id()))
+      << "Unknown task " << task->task_id()
+      << " of framework " << task->framework_id();
+
+    usedResources -= task->resources();
+  }
+
   void removeTask(Task* task)
   {
     CHECK(tasks[task->framework_id()].contains(task->task_id()))
       << "Unknown task " << task->task_id()
       << " of framework " << task->framework_id();
 
+    if (!protobuf::isTerminalState(task->state())) {
+      usedResources -= task->resources();
+    }
+
     tasks[task->framework_id()].erase(task->task_id());
     if (tasks[task->framework_id()].empty()) {
       tasks.erase(task->framework_id());
@@ -865,19 +888,17 @@ struct Slave
   void addOffer(Offer* offer)
   {
     CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
+
     offers.insert(offer);
-    VLOG(1) << "Adding offer " << offer->id()
-            << " with resources " << offer->resources()
-            << " on slave " << id << " (" << info.hostname() << ")";
+    offeredResources += offer->resources();
   }
 
   void removeOffer(Offer* offer)
   {
     CHECK(offers.contains(offer)) << "Unknown offer " << offer->id();
+
+    offeredResources -= offer->resources();
     offers.erase(offer);
-    VLOG(1) << "Removing offer " << offer->id()
-            << " with resources " << offer->resources()
-            << " on slave " << id << " (" << info.hostname() << ")";
   }
 
   bool hasExecutor(const FrameworkID& frameworkId,
@@ -895,6 +916,7 @@ struct Slave
       << " of framework " << frameworkId;
 
     executors[frameworkId][executorInfo.executor_id()] = executorInfo;
+    usedResources += executorInfo.resources();
   }
 
   void removeExecutor(const FrameworkID& frameworkId,
@@ -903,34 +925,13 @@ struct Slave
     CHECK(hasExecutor(frameworkId, executorId))
       << "Unknown executor " << executorId << " of framework " << frameworkId;
 
+    usedResources -= executors[frameworkId][executorId].resources();
     executors[frameworkId].erase(executorId);
     if (executors[frameworkId].empty()) {
       executors.erase(frameworkId);
     }
   }
 
-  Resources used() const
-  {
-    Resources used;
-
-    foreachkey (const FrameworkID& frameworkId, tasks) {
-      foreachvalue (const Task* task, tasks.find(frameworkId)->second) {
-        if (!protobuf::isTerminalState(task->state())) {
-          used += task->resources();
-        }
-      }
-    }
-
-    foreachkey (const FrameworkID& frameworkId, executors) {
-      foreachvalue (const ExecutorInfo& executorInfo,
-                    executors.find(frameworkId)->second) {
-        used += executorInfo.resources();
-      }
-    }
-
-    return used;
-  }
-
   const SlaveID id;
   const SlaveInfo info;
 
@@ -963,6 +964,9 @@ struct Slave
   // Active offers on this slave.
   hashset<Offer*> offers;
 
+  Resources usedResources;    // Active task / executor resources.
+  Resources offeredResources; // Offered resources.
+
   SlaveObserver* observer;
 
 private:
@@ -1014,6 +1018,24 @@ struct Framework
       << " of framework " << task->framework_id();
 
     tasks[task->task_id()] = task;
+
+    if (!protobuf::isTerminalState(task->state())) {
+      usedResources += task->resources();
+    }
+  }
+
+  // Notification of task termination, for resource accounting.
+  // TODO(bmahler): This is a hack for performance. We need to
+  // maintain resource counters because computing task resources
+  // functionally for all tasks is expensive, for now.
+  void taskTerminated(Task* task)
+  {
+    CHECK(protobuf::isTerminalState(task->state()));
+    CHECK(tasks.contains(task->task_id()))
+      << "Unknown task " << task->task_id()
+      << " of framework " << task->framework_id();
+
+    usedResources -= task->resources();
   }
 
   void addCompletedTask(const Task& task)
@@ -1028,6 +1050,10 @@ struct Framework
       << "Unknown task " << task->task_id()
       << " of framework " << task->framework_id();
 
+    if (!protobuf::isTerminalState(task->state())) {
+      usedResources -= task->resources();
+    }
+
     addCompletedTask(*task);
 
     tasks.erase(task->task_id());
@@ -1037,6 +1063,7 @@ struct Framework
   {
     CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
     offers.insert(offer);
+    offeredResources += offer->resources();
   }
 
   void removeOffer(Offer* offer)
@@ -1044,6 +1071,7 @@ struct Framework
     CHECK(offers.find(offer) != offers.end())
       << "Unknown offer " << offer->id();
 
+    offeredResources -= offer->resources();
     offers.erase(offer);
   }
 
@@ -1062,6 +1090,7 @@ struct Framework
       << " on slave " << slaveId;
 
     executors[slaveId][executorInfo.executor_id()] = executorInfo;
+    usedResources += executorInfo.resources();
   }
 
   void removeExecutor(const SlaveID& slaveId,
@@ -1072,36 +1101,13 @@ struct Framework
       << " of framework " << id
       << " of slave " << slaveId;
 
+    usedResources -= executors[slaveId][executorId].resources();
     executors[slaveId].erase(executorId);
     if (executors[slaveId].empty()) {
       executors.erase(slaveId);
     }
   }
 
-  Resources used() const
-  {
-    Resources used;
-
-    foreach (Offer* offer, offers) {
-      used += offer->resources();
-    }
-
-    foreachvalue (const Task* task, tasks) {
-      if (!protobuf::isTerminalState(task->state())) {
-        used += task->resources();
-      }
-    }
-
-    foreachkey (const SlaveID& slaveId, executors) {
-      foreachvalue (const ExecutorInfo& executorInfo,
-                    executors.find(slaveId)->second) {
-        used += executorInfo.resources();
-      }
-    }
-
-    return used;
-  }
-
   const FrameworkID id; // TODO(benh): Store this in 'info'.
 
   const FrameworkInfo info;
@@ -1135,6 +1141,11 @@ struct Framework
 
   hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo> > executors;
 
+  // TODO(bmahler): Summing set and ranges resources across slaves
+  // does not yield meaningful totals.
+  Resources usedResources;    // Active task / executor resources.
+  Resources offeredResources; // Offered resources.
+
 private:
   Framework(const Framework&);              // No copying.
   Framework& operator = (const Framework&); // No assigning.
@@ -1161,7 +1172,8 @@ struct Role
   {
     Resources resources;
     foreachvalue (Framework* framework, frameworks) {
-      resources += framework->used();
+      resources += framework->usedResources;
+      resources += framework->offeredResources;
     }
 
     return resources;


Mime
View raw message