Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 240FD200D5A for ; Wed, 8 Nov 2017 17:01:25 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 22AAD160BDA; Wed, 8 Nov 2017 16:01:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id F3640160C05 for ; Wed, 8 Nov 2017 17:01:23 +0100 (CET) Received: (qmail 41177 invoked by uid 500); 8 Nov 2017 16:01:23 -0000 Mailing-List: contact commits-help@mesos.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@mesos.apache.org Delivered-To: mailing list commits@mesos.apache.org Received: (qmail 41080 invoked by uid 99); 8 Nov 2017 16:01:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Nov 2017 16:01:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 87F5BDFFB5; Wed, 8 Nov 2017 16:01:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bbannier@apache.org To: commits@mesos.apache.org Date: Wed, 08 Nov 2017 16:01:28 -0000 Message-Id: <0521082e5f054e20bb4f05ea1860562a@git.apache.org> In-Reply-To: <444ab7d37d194dadadbc39343cd0f949@git.apache.org> References: <444ab7d37d194dadadbc39343cd0f949@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [7/9] mesos git commit: Synchronized agent resource versions via 'UpdateSlaveMessage'. archived-at: Wed, 08 Nov 2017 16:01:25 -0000 Synchronized agent resource versions via 'UpdateSlaveMessage'. This commit introduces agent resource versions to the master and agents. Agents are responsible for maintaining their resource versions. The resource versions are synchronized with the master via 'UpdateSlaveMessage'. Review: https://reviews.apache.org/r/63492/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51a15496 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51a15496 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51a15496 Branch: refs/heads/master Commit: 51a1549611fbfcf1ad1a1eb5e3efe0f1221acb36 Parents: b1c9b46 Author: Benjamin Bannier Authored: Wed Nov 8 00:52:08 2017 +0100 Committer: Benjamin Bannier Committed: Wed Nov 8 15:51:20 2017 +0100 ---------------------------------------------------------------------- src/common/protobuf_utils.cpp | 46 ++++++++++++++++++++++++++++++++++++++ src/common/protobuf_utils.hpp | 11 +++++++++ src/master/master.cpp | 10 +++++++++ src/master/master.hpp | 2 ++ src/slave/slave.cpp | 31 +++++++++++++++++++++++++ src/slave/slave.hpp | 1 + 6 files changed, 101 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index 7a4b87b..5739a63 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -755,6 +755,52 @@ void stripAllocationInfo(Offer::Operation* operation) } +RepeatedPtrField createResourceVersions( + const hashmap, UUID>& resourceVersions) +{ + RepeatedPtrField result; + + foreachpair ( + const Option& resourceProviderId, + const UUID& uuid, + resourceVersions) { + ResourceVersionUUID* entry = result.Add(); + + if (resourceProviderId.isSome()) { + entry->mutable_resource_provider_id()->CopyFrom(resourceProviderId.get()); + } + entry->set_uuid(uuid.toBytes()); + } + + return result; +} + + +hashmap, UUID> parseResourceVersions( + const RepeatedPtrField& resourceVersionUUIDs) +{ + hashmap, UUID> result; + + foreach ( + const ResourceVersionUUID& resourceVersionUUID, + resourceVersionUUIDs) { + const Option resourceProviderId = + resourceVersionUUID.has_resource_provider_id() + ? resourceVersionUUID.resource_provider_id() + : Option::none(); + + CHECK(!result.contains(resourceProviderId)); + + const Try uuid = UUID::fromBytes(resourceVersionUUID.uuid()); + CHECK_SOME(uuid); + + result.insert({std::move(resourceProviderId), std::move(uuid.get())}); + } + + return result; +} + + TimeInfo getCurrentTime() { TimeInfo timeInfo; http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/common/protobuf_utils.hpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp index 95f57da..0ca4c6d 100644 --- a/src/common/protobuf_utils.hpp +++ b/src/common/protobuf_utils.hpp @@ -192,6 +192,17 @@ void injectAllocationInfo( void stripAllocationInfo(Offer::Operation* operation); +// Helper function to pack a protobuf list of resource versions. +google::protobuf::RepeatedPtrField createResourceVersions( + const hashmap, UUID>& resourceVersions); + + +// Helper function to unpack a protobuf list of resource versions. +hashmap, UUID> parseResourceVersions( + const google::protobuf::RepeatedPtrField& + resourceVersionUUIDs); + + // Helper function that fills in a TimeInfo from the current time. TimeInfo getCurrentTime(); http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index ee212c1..01675ed 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -126,6 +126,8 @@ using process::http::authentication::Principal; using process::metrics::Counter; +using google::protobuf::RepeatedPtrField; + namespace mesos { namespace internal { namespace master { @@ -140,6 +142,7 @@ using mesos::master::detector::MasterDetector; static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo); + class SlaveObserver : public ProtobufProcess { public: @@ -7089,6 +7092,13 @@ void Master::updateSlave(const UpdateSlaveMessage& message) newTotal.getOrElse(slave->totalResources.nonRevocable()) + newOversubscribed.getOrElse(slave->totalResources.revocable()); + // Agents which can support resource providers always update the + // master on their resource versions uuids via `UpdateSlaveMessage`. + if (slave->capabilities.resourceProvider) { + slave->resourceVersions = + protobuf::parseResourceVersions(message.resource_version_uuids()); + } + if (newSlaveResources == slave->totalResources) { LOG(INFO) << "Ignoring update on agent " << *slave << " as it reports no changes"; http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 0c1253a..adabc59 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -267,6 +267,8 @@ struct Slave SlaveObserver* observer; + hashmap, UUID> resourceVersions; + private: Slave(const Slave&); // No copying. Slave& operator=(const Slave&); // No assigning. http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 494d793..be46ebd 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -223,6 +223,7 @@ Slave::Slave(const string& id, resourceEstimator(_resourceEstimator), qosController(_qosController), authorizer(_authorizer), + resourceVersions({{Option::none(), UUID::random()}}), secretGenerator(nullptr) {} @@ -1277,6 +1278,8 @@ void Slave::registered( UpdateSlaveMessage message; message.mutable_slave_id()->CopyFrom(info.id()); + message.mutable_resource_version_uuids()->CopyFrom( + protobuf::createResourceVersions(resourceVersions)); if (capabilities.resourceProvider) { LOG(INFO) << "Forwarding total resources " << totalResources; @@ -1377,6 +1380,9 @@ void Slave::reregistered( UpdateSlaveMessage message; message.mutable_slave_id()->CopyFrom(info.id()); + message.mutable_resource_version_uuids()->CopyFrom( + protobuf::createResourceVersions(resourceVersions)); + if (capabilities.resourceProvider) { LOG(INFO) << "Forwarding total resources " << totalResources; @@ -6646,11 +6652,23 @@ void Slave::_forwardOversubscribed(const Future& oversubscribable) LOG(INFO) << "Forwarding total oversubscribed resources " << oversubscribed; + // We do not update the agent's resource version since + // oversubscribed resources cannot be used for any operations + // but launches. Since oversubscription is run at regular + // intervals updating the version could cause a lot of offer + // operation churn. + // + // TODO(bbannier): Revisit this if we modify the operations + // possible on oversubscribed resources. + UpdateSlaveMessage message; message.mutable_slave_id()->CopyFrom(info.id()); message.mutable_resource_categories()->set_oversubscribed(true); message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed); + message.mutable_resource_version_uuids()->CopyFrom( + protobuf::createResourceVersions(resourceVersions)); + CHECK_SOME(master); send(master.get(), message); } @@ -6706,6 +6724,15 @@ void Slave::handleResourceProviderMessage( totalResources -= oldTotal; totalResources += newTotal; + const UUID& resourceVersionUuid = + message->updateTotalResources->resourceVersionUuid; + + if (resourceVersions.contains(resourceProviderId)) { + resourceVersions.at(resourceProviderId) = resourceVersionUuid; + } else { + resourceVersions.insert({resourceProviderId, resourceVersionUuid}); + } + // Send the updated resources to the master if the agent is running. Note // that since we have already updated our copy of the latest resource // provider resources, it is safe to consume this message and wait for the @@ -6725,9 +6752,13 @@ void Slave::handleResourceProviderMessage( UpdateSlaveMessage updateSlaveMessage; updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id()); updateSlaveMessage.mutable_resource_categories()->set_total(true); + updateSlaveMessage.mutable_total_resources()->CopyFrom( totalResources); + updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom( + protobuf::createResourceVersions(resourceVersions)); + send(master.get(), updateSlaveMessage); break; http://git-wip-us.apache.org/repos/asf/mesos/blob/51a15496/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index b2dc002..0124df4 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -665,6 +665,7 @@ private: ResourceProviderManager resourceProviderManager; process::Owned localResourceProviderDaemon; + hashmap, UUID> resourceVersions; protected: // Made protected for testing purposes.