Return-Path: X-Original-To: apmail-mesos-commits-archive@www.apache.org Delivered-To: apmail-mesos-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A872B17D82 for ; Tue, 7 Oct 2014 01:36:53 +0000 (UTC) Received: (qmail 30144 invoked by uid 500); 7 Oct 2014 01:36:53 -0000 Delivered-To: apmail-mesos-commits-archive@mesos.apache.org Received: (qmail 30121 invoked by uid 500); 7 Oct 2014 01:36:53 -0000 Mailing-List: contact commits-help@mesos.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mesos.apache.org Delivered-To: mailing list commits@mesos.apache.org Received: (qmail 30112 invoked by uid 99); 7 Oct 2014 01:36:53 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Oct 2014 01:36:53 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4B5988B454B; Tue, 7 Oct 2014 01:36:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bmahler@apache.org To: commits@mesos.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: Fixed a resource accounting performance regression in the Master. Date: Tue, 7 Oct 2014 01:36:53 +0000 (UTC) Repository: mesos Updated Branches: refs/heads/master a62dc9dd3 -> f348070b7 Fixed a resource accounting performance regression in the Master. 
Review: https://reviews.apache.org/r/26392 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f348070b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f348070b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f348070b Branch: refs/heads/master Commit: f348070b7c29cdd72cf3d87a77ee07f9fc46ea72 Parents: a62dc9d Author: Benjamin Mahler Authored: Mon Oct 6 11:55:52 2014 -0700 Committer: Benjamin Mahler Committed: Mon Oct 6 18:26:53 2014 -0700 ---------------------------------------------------------------------- src/master/http.cpp | 13 ++++- src/master/master.cpp | 53 ++++++++++++++------ src/master/master.hpp | 118 +++++++++++++++++++++++++-------------------- 3 files changed, 113 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f348070b/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 16a6a01..3fd4b45 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -116,7 +116,16 @@ JSON::Object model(const Framework& framework) object.values["registered_time"] = framework.registeredTime.secs(); object.values["unregistered_time"] = framework.unregisteredTime.secs(); object.values["active"] = framework.active; - object.values["resources"] = model(framework.used()); + + // TODO(bmahler): Consider deprecating this in favor of the split + // used and offered resources below. + object.values["resources"] = + model(framework.usedResources + framework.offeredResources); + + // TODO(bmahler): Use these in the webui. 
+ object.values["used_resources"] = model(framework.usedResources); + object.values["offered_resources"] = model(framework.offeredResources); + object.values["hostname"] = framework.info.hostname(); object.values["webui_url"] = framework.info.webui_url(); @@ -406,7 +415,7 @@ Future Master::Http::stats(const Request& request) totalResources += resource; } } - foreach (const Resource& resource, slave->used()) { + foreach (const Resource& resource, slave->usedResources) { if (resource.type() == Value::SCALAR) { usedResources += resource; } http://git-wip-us.apache.org/repos/asf/mesos/blob/f348070b/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index f05275b..86dc544 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3947,7 +3947,13 @@ void Master::addFramework(Framework* framework) message.mutable_master_info()->MergeFrom(info_); send(framework->pid, message); - allocator->frameworkAdded(framework->id, framework->info, framework->used()); + // There should be no offered resources yet! + CHECK_EQ(Resources(), framework->offeredResources); + + allocator->frameworkAdded( + framework->id, + framework->info, + framework->usedResources); // Export framework metrics. @@ -4424,6 +4430,8 @@ void Master::_removeSlave( void Master::updateTask(Task* task, const TaskStatus& status) { + CHECK_NOTNULL(task); + // Out-of-order updates should not occur, however in case they // do (e.g. MESOS-1799), prevent them here to ensure that the // resource accounting is not affected. @@ -4435,16 +4443,38 @@ void Master::updateTask(Task* task, const TaskStatus& status) return; } + bool terminated = + !protobuf::isTerminalState(task->state()) && + protobuf::isTerminalState(status.state()); + + // TODO(brenden) Consider wiping the `data` and `message` fields? 
+ if (task->statuses_size() > 0 && + task->statuses(task->statuses_size() - 1).state() == status.state()) { + task->mutable_statuses()->RemoveLast(); + } + task->add_statuses()->CopyFrom(status); + task->set_state(status.state()); + + stats.tasks[status.state()]++; + // Once the task becomes terminal, we recover the resources. - if (!protobuf::isTerminalState(task->state()) && - protobuf::isTerminalState(status.state())) { + if (terminated) { allocator->resourcesRecovered( task->framework_id(), task->slave_id(), task->resources(), None()); - switch (status.state()) { + // The slave owns the Task object and cannot be NULL. + Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id())); + slave->taskTerminated(task); + + Framework* framework = getFramework(task->framework_id()); + if (framework != NULL) { + framework->taskTerminated(task); + } + + switch (task->state()) { case TASK_FINISHED: ++metrics.tasks_finished; break; case TASK_FAILED: ++metrics.tasks_failed; break; case TASK_KILLED: ++metrics.tasks_killed; break; @@ -4452,16 +4482,6 @@ void Master::updateTask(Task* task, const TaskStatus& status) default: break; } } - - // TODO(brenden) Consider wiping the `data` and `message` fields? - if (task->statuses_size() > 0 && - task->statuses(task->statuses_size() - 1).state() == status.state()) { - task->mutable_statuses()->RemoveLast(); - } - task->add_statuses()->CopyFrom(status); - task->set_state(status.state()); - - stats.tasks[status.state()]++; } @@ -4469,6 +4489,7 @@ void Master::removeTask(Task* task) { CHECK_NOTNULL(task); + // The slave owns the Task object and cannot be NULL. Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id())); if (!protobuf::isTerminalState(task->state())) { @@ -4479,7 +4500,7 @@ void Master::removeTask(Task* task) << " in non-terminal state " << task->state(); // If the task is not terminal, then the resources have - // not yet been released. + // not yet been recovered. 
allocator->resourcesRecovered( task->framework_id(), task->slave_id(), @@ -5125,7 +5146,7 @@ double Master::_resources_used(const std::string& name) double used = 0.0; foreachvalue (Slave* slave, slaves.registered) { - foreach (const Resource& resource, slave->used()) { + foreach (const Resource& resource, slave->usedResources) { if (resource.name() == name && resource.type() == Value::SCALAR) { used += resource.scalar().value(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/f348070b/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 0bf4546..5c0f224 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -843,17 +843,40 @@ struct Slave << " of framework " << task->framework_id(); tasks[task->framework_id()][task->task_id()] = task; + + if (!protobuf::isTerminalState(task->state())) { + usedResources += task->resources(); + } + LOG(INFO) << "Adding task " << task->task_id() << " with resources " << task->resources() << " on slave " << id << " (" << info.hostname() << ")"; } + // Notification of task termination, for resource accounting. + // TODO(bmahler): This is a hack for performance. We need to + // maintain resource counters because computing task resources + // functionally for all tasks is expensive, for now. 
+ void taskTerminated(Task* task) + { + CHECK(protobuf::isTerminalState(task->state())); + CHECK(tasks[task->framework_id()].contains(task->task_id())) + << "Unknown task " << task->task_id() + << " of framework " << task->framework_id(); + + usedResources -= task->resources(); + } + void removeTask(Task* task) { CHECK(tasks[task->framework_id()].contains(task->task_id())) << "Unknown task " << task->task_id() << " of framework " << task->framework_id(); + if (!protobuf::isTerminalState(task->state())) { + usedResources -= task->resources(); + } + tasks[task->framework_id()].erase(task->task_id()); if (tasks[task->framework_id()].empty()) { tasks.erase(task->framework_id()); @@ -865,19 +888,17 @@ struct Slave void addOffer(Offer* offer) { CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); + offers.insert(offer); - VLOG(1) << "Adding offer " << offer->id() - << " with resources " << offer->resources() - << " on slave " << id << " (" << info.hostname() << ")"; + offeredResources += offer->resources(); } void removeOffer(Offer* offer) { CHECK(offers.contains(offer)) << "Unknown offer " << offer->id(); + + offeredResources -= offer->resources(); offers.erase(offer); - VLOG(1) << "Removing offer " << offer->id() - << " with resources " << offer->resources() - << " on slave " << id << " (" << info.hostname() << ")"; } bool hasExecutor(const FrameworkID& frameworkId, @@ -895,6 +916,7 @@ struct Slave << " of framework " << frameworkId; executors[frameworkId][executorInfo.executor_id()] = executorInfo; + usedResources += executorInfo.resources(); } void removeExecutor(const FrameworkID& frameworkId, @@ -903,34 +925,13 @@ struct Slave CHECK(hasExecutor(frameworkId, executorId)) << "Unknown executor " << executorId << " of framework " << frameworkId; + usedResources -= executors[frameworkId][executorId].resources(); executors[frameworkId].erase(executorId); if (executors[frameworkId].empty()) { executors.erase(frameworkId); } } - Resources used() const - { - 
Resources used; - - foreachkey (const FrameworkID& frameworkId, tasks) { - foreachvalue (const Task* task, tasks.find(frameworkId)->second) { - if (!protobuf::isTerminalState(task->state())) { - used += task->resources(); - } - } - } - - foreachkey (const FrameworkID& frameworkId, executors) { - foreachvalue (const ExecutorInfo& executorInfo, - executors.find(frameworkId)->second) { - used += executorInfo.resources(); - } - } - - return used; - } - const SlaveID id; const SlaveInfo info; @@ -963,6 +964,9 @@ struct Slave // Active offers on this slave. hashset<Offer*> offers; + Resources usedResources; // Active task / executor resources. + Resources offeredResources; // Offered resources. + SlaveObserver* observer; private: @@ -1014,6 +1018,24 @@ struct Framework << " of framework " << task->framework_id(); tasks[task->task_id()] = task; + + if (!protobuf::isTerminalState(task->state())) { + usedResources += task->resources(); + } + } + + // Notification of task termination, for resource accounting. + // TODO(bmahler): This is a hack for performance. We need to + // maintain resource counters because computing task resources + // functionally for all tasks is expensive, for now. 
+ void taskTerminated(Task* task) + { + CHECK(protobuf::isTerminalState(task->state())); + CHECK(tasks.contains(task->task_id())) + << "Unknown task " << task->task_id() + << " of framework " << task->framework_id(); + + usedResources -= task->resources(); } void addCompletedTask(const Task& task) @@ -1028,6 +1050,10 @@ struct Framework << "Unknown task " << task->task_id() << " of framework " << task->framework_id(); + if (!protobuf::isTerminalState(task->state())) { + usedResources -= task->resources(); + } + addCompletedTask(*task); tasks.erase(task->task_id()); @@ -1037,6 +1063,7 @@ struct Framework { CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id(); offers.insert(offer); + offeredResources += offer->resources(); } void removeOffer(Offer* offer) @@ -1044,6 +1071,7 @@ struct Framework CHECK(offers.find(offer) != offers.end()) << "Unknown offer " << offer->id(); + offeredResources -= offer->resources(); offers.erase(offer); } @@ -1062,6 +1090,7 @@ struct Framework << " on slave " << slaveId; executors[slaveId][executorInfo.executor_id()] = executorInfo; + usedResources += executorInfo.resources(); } void removeExecutor(const SlaveID& slaveId, @@ -1072,36 +1101,13 @@ struct Framework << " of framework " << id << " of slave " << slaveId; + usedResources -= executors[slaveId][executorId].resources(); executors[slaveId].erase(executorId); if (executors[slaveId].empty()) { executors.erase(slaveId); } } - Resources used() const - { - Resources used; - - foreach (Offer* offer, offers) { - used += offer->resources(); - } - - foreachvalue (const Task* task, tasks) { - if (!protobuf::isTerminalState(task->state())) { - used += task->resources(); - } - } - - foreachkey (const SlaveID& slaveId, executors) { - foreachvalue (const ExecutorInfo& executorInfo, - executors.find(slaveId)->second) { - used += executorInfo.resources(); - } - } - - return used; - } - const FrameworkID id; // TODO(benh): Store this in 'info'. 
const FrameworkInfo info; @@ -1135,6 +1141,11 @@ struct Framework hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo> > executors; + // TODO(bmahler): Summing set and ranges resources across slaves + // does not yield meaningful totals. + Resources usedResources; // Active task / executor resources. + Resources offeredResources; // Offered resources. + private: Framework(const Framework&); // No copying. Framework& operator = (const Framework&); // No assigning. @@ -1161,7 +1172,8 @@ struct Role { Resources resources; foreachvalue (Framework* framework, frameworks) { - resources += framework->used(); + resources += framework->usedResources; + resources += framework->offeredResources; } return resources;