mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject [13/14] mesos git commit: Introduced ResourceConversion to represent conversion to Resources.
Date Mon, 06 Nov 2017 23:07:03 GMT
Introduced ResourceConversion to represent conversion to Resources.

Currently, we couple offer operations with resource conversions. For
instance, we have interfaces like `Resources::apply` that take an
offer operation. This is becoming less ideal because, with new offer
operations like `CREATE_VOLUME` being introduced, an offer operation
might not represent a `conversion` anymore.

This patch decoupled resource conversions from offer operations.
`Resources::apply` will now take a `ResourceConversion` object. We
also provide some helpers to get a `ResourceConversion` from an offer
operation (if supported).

Review: https://reviews.apache.org/r/63511


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e55309ad
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e55309ad
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e55309ad

Branch: refs/heads/master
Commit: e55309ad8274164e23782a5cb53de93ffa0bc24e
Parents: 2f1efb9
Author: Jie Yu <yujie.jay@gmail.com>
Authored: Thu Nov 2 14:19:19 2017 +0100
Committer: Jie Yu <yujie.jay@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 include/mesos/resources.hpp    |  60 +++++--
 include/mesos/v1/resources.hpp |  60 +++++--
 src/common/resources.cpp       | 306 ++++++------------------------------
 src/common/resources_utils.cpp | 131 +++++++++++++++
 src/common/resources_utils.hpp |  17 ++
 src/v1/resources.cpp           | 306 ++++++------------------------------
 6 files changed, 332 insertions(+), 548 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/include/mesos/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp
index 53152b3..08c544d 100644
--- a/include/mesos/resources.hpp
+++ b/include/mesos/resources.hpp
@@ -36,6 +36,7 @@
 #include <stout/hashmap.hpp>
 #include <stout/json.hpp>
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/try.hpp>
 
@@ -53,6 +54,10 @@
 
 namespace mesos {
 
+// Forward declaration.
+class ResourceConversion;
+
+
 // NOTE: Resource objects stored in the class are always valid, are in
 // the "post-reservation-refinement" format, and kept combined if possible.
 // It is the caller's responsibility to validate any Resource object or
@@ -464,26 +469,30 @@ public:
   // example frameworks to leverage.
   Option<Resources> find(const Resources& targets) const;
 
-  // Certain offer operations alter the offered resources. The
-  // following methods provide a convenient way to get the transformed
-  // resources by applying the given offer operation(s). Returns an
-  // Error if the offer operation(s) cannot be applied.
-  Try<Resources> apply(
-      const Offer::Operation& operation,
-      const Option<Resources>& convertedResources = None()) const;
+  // Applies a resource conversion by taking out the `consumed`
+  // resources and adding back the `converted` resources. Returns an
+  // Error if the conversion cannot be applied.
+  Try<Resources> apply(const ResourceConversion& conversion) const;
+
+  // Obtains the conversion from the given operation and applies the
+  // conversion. This method serves as syntactic sugar for applying a
+  // resource conversion.
+  // TODO(jieyu): Consider removing this method once we have updated
+  // all the call sites.
+  Try<Resources> apply(const Offer::Operation& operation) const;
 
   template <typename Iterable>
-  Try<Resources> apply(const Iterable& operations) const
+  Try<Resources> apply(const Iterable& iterable) const
   {
     Resources result = *this;
 
-    foreach (const Offer::Operation& operation, operations) {
-      Try<Resources> transformed = result.apply(operation);
-      if (transformed.isError()) {
-        return Error(transformed.error());
+    foreach (const auto& t, iterable) {
+      Try<Resources> converted = result.apply(t);
+      if (converted.isError()) {
+        return Error(converted.error());
       }
 
-      result = transformed.get();
+      result = converted.get();
     }
 
     return result;
@@ -663,6 +672,31 @@ hashmap<Key, Resources> operator+(
   return result;
 }
 
+
+/**
+ * Represents a resource conversion, usually as a result of an offer
+ * operation. See more details in `Resources::apply` method.
+ */
+class ResourceConversion
+{
+public:
+  typedef lambda::function<Try<Nothing>(const Resources&)> PostValidation;
+
+  ResourceConversion(
+      const Resources& _consumed,
+      const Resources& _converted,
+      const Option<PostValidation>& _postValidation = None())
+    : consumed(_consumed),
+      converted(_converted),
+      postValidation(_postValidation) {}
+
+  Try<Resources> apply(const Resources& resources) const;
+
+  Resources consumed;
+  Resources converted;
+  Option<PostValidation> postValidation;
+};
+
 } // namespace mesos {
 
 #endif // __RESOURCES_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/include/mesos/v1/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resources.hpp b/include/mesos/v1/resources.hpp
index 6c2191c..d59fa35 100644
--- a/include/mesos/v1/resources.hpp
+++ b/include/mesos/v1/resources.hpp
@@ -35,6 +35,7 @@
 #include <stout/hashmap.hpp>
 #include <stout/json.hpp>
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 #include <stout/option.hpp>
 #include <stout/try.hpp>
 
@@ -53,6 +54,10 @@
 namespace mesos {
 namespace v1 {
 
+// Forward declaration.
+class ResourceConversion;
+
+
 // NOTE: Resource objects stored in the class are always valid, are in
 // the "post-reservation-refinement" format, and kept combined if possible.
 // It is the caller's responsibility to validate any Resource object or
@@ -464,26 +469,30 @@ public:
   // example frameworks to leverage.
   Option<Resources> find(const Resources& targets) const;
 
-  // Certain offer operations alter the offered resources. The
-  // following methods provide a convenient way to get the transformed
-  // resources by applying the given offer operation(s). Returns an
-  // Error if the offer operation(s) cannot be applied.
-  Try<Resources> apply(
-      const Offer::Operation& operation,
-      const Option<Resources>& convertedResources = None()) const;
+  // Applies a resource conversion by taking out the `consumed`
+  // resources and adding back the `converted` resources. Returns an
+  // Error if the conversion cannot be applied.
+  Try<Resources> apply(const ResourceConversion& conversion) const;
+
+  // Obtains the conversion from the given operation and applies the
+  // conversion. This method serves as syntactic sugar for applying a
+  // resource conversion.
+  // TODO(jieyu): Consider removing this method once we have updated
+  // all the call sites.
+  Try<Resources> apply(const Offer::Operation& operation) const;
 
   template <typename Iterable>
-  Try<Resources> apply(const Iterable& operations) const
+  Try<Resources> apply(const Iterable& iterable) const
   {
     Resources result = *this;
 
-    foreach (const Offer::Operation& operation, operations) {
-      Try<Resources> transformed = result.apply(operation);
-      if (transformed.isError()) {
-        return Error(transformed.error());
+    foreach (const auto& t, iterable) {
+      Try<Resources> converted = result.apply(t);
+      if (converted.isError()) {
+        return Error(converted.error());
       }
 
-      result = transformed.get();
+      result = converted.get();
     }
 
     return result;
@@ -663,6 +672,31 @@ hashmap<Key, Resources> operator+(
   return result;
 }
 
+
+/**
+ * Represents a resource conversion, usually as a result of an offer
+ * operation. See more details in `Resources::apply` method.
+ */
+class ResourceConversion
+{
+public:
+  typedef lambda::function<Try<Nothing>(const Resources&)> PostValidation;
+
+  ResourceConversion(
+      const Resources& _consumed,
+      const Resources& _converted,
+      const Option<PostValidation>& _postValidation = None())
+    : consumed(_consumed),
+      converted(_converted),
+      postValidation(_postValidation) {}
+
+  Try<Resources> apply(const Resources& resources) const;
+
+  Resources consumed;
+  Resources converted;
+  Option<PostValidation> postValidation;
+};
+
 } // namespace v1 {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/src/common/resources.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index 914d3e2..4ccfdc1 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -1623,277 +1623,35 @@ Option<Resources> Resources::find(const Resources& targets) const
 }
 
 
-Try<Resources> Resources::apply(
-    const Offer::Operation& operation,
-    const Option<Resources>& convertedResources) const
+Try<Resources> Resources::apply(const ResourceConversion& conversion) const
 {
-  Resources result = *this;
-
-  switch (operation.type()) {
-    case Offer::Operation::LAUNCH:
-      return Error("Cannot apply LAUNCH Operation");
-
-    case Offer::Operation::LAUNCH_GROUP:
-      return Error("Cannot apply LAUNCH_GROUP Operation");
-
-    case Offer::Operation::RESERVE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for RESERVE Operation");
-      }
-
-      Option<Error> error = validate(operation.reserve().resources());
-      if (error.isSome()) {
-        return Error("Invalid RESERVE Operation: " + error->message);
-      }
-
-      foreach (const Resource& reserved, operation.reserve().resources()) {
-        if (!Resources::isReserved(reserved)) {
-          return Error("Invalid RESERVE Operation: Resource must be reserved");
-        } else if (!Resources::isDynamicallyReserved(reserved)) {
-          return Error(
-              "Invalid RESERVE Operation: Resource must be"
-              " dynamically reserved");
-        }
-
-        // Note that we only allow "pushing" a single reservation at time.
-        Resources resources = Resources(reserved).popReservation();
-
-        if (!result.contains(resources)) {
-          return Error("Invalid RESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(resources));
-        }
-
-        result -= resources;
-        result.add(reserved);
-      }
-      break;
-    }
-
-    case Offer::Operation::UNRESERVE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for UNRESERVE Operation");
-      }
-
-      Option<Error> error = validate(operation.unreserve().resources());
-      if (error.isSome()) {
-        return Error("Invalid UNRESERVE Operation: " + error->message);
-      }
-
-      foreach (const Resource& reserved, operation.unreserve().resources()) {
-        if (!Resources::isReserved(reserved)) {
-          return Error("Invalid UNRESERVE Operation: Resource is not reserved");
-        } else if (!Resources::isDynamicallyReserved(reserved)) {
-          return Error(
-              "Invalid UNRESERVE Operation: Resource is not"
-              " dynamically reserved");
-        }
-
-        if (!result.contains(reserved)) {
-          return Error("Invalid UNRESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(reserved));
-        }
-
-        // Note that we only allow "popping" a single reservation at time.
-        Resources resources = Resources(reserved).popReservation();
-
-        result.subtract(reserved);
-        result += resources;
-      }
-      break;
-    }
-
-    case Offer::Operation::CREATE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for CREATE Operation");
-      }
-
-      Option<Error> error = validate(operation.create().volumes());
-      if (error.isSome()) {
-        return Error("Invalid CREATE Operation: " + error->message);
-      }
-
-      foreach (const Resource& volume, operation.create().volumes()) {
-        if (!volume.has_disk()) {
-          return Error("Invalid CREATE Operation: Missing 'disk'");
-        } else if (!volume.disk().has_persistence()) {
-          return Error("Invalid CREATE Operation: Missing 'persistence'");
-        }
-
-        // Strip persistence and volume from the disk info so that we
-        // can subtract it from the original resources.
-        // TODO(jieyu): Non-persistent volumes are not supported for
-        // now. Persistent volumes can only be be created from regular
-        // disk resources. Revisit this once we start to support
-        // non-persistent volumes.
-        Resource stripped = volume;
-
-        if (stripped.disk().has_source()) {
-          stripped.mutable_disk()->clear_persistence();
-          stripped.mutable_disk()->clear_volume();
-        } else {
-          stripped.clear_disk();
-        }
-
-        // Since we only allow persistent volumes to be shared, the
-        // original resource must be non-shared.
-        stripped.clear_shared();
-
-        if (!result.contains(stripped)) {
-          return Error("Invalid CREATE Operation: Insufficient disk resources"
-                       " for persistent volume " + stringify(volume));
-        }
-
-        result.subtract(stripped);
-        result.add(volume);
-      }
-      break;
-    }
-
-    case Offer::Operation::DESTROY: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for DESTROY Operation");
-      }
-
-      Option<Error> error = validate(operation.destroy().volumes());
-      if (error.isSome()) {
-        return Error("Invalid DESTROY Operation: " + error->message);
-      }
-
-      foreach (const Resource& volume, operation.destroy().volumes()) {
-        if (!volume.has_disk()) {
-          return Error("Invalid DESTROY Operation: Missing 'disk'");
-        } else if (!volume.disk().has_persistence()) {
-          return Error("Invalid DESTROY Operation: Missing 'persistence'");
-        }
-
-        if (!result.contains(volume)) {
-          return Error(
-              "Invalid DESTROY Operation: Persistent volume does not exist");
-        }
-
-        result.subtract(volume);
-
-        if (result.contains(volume)) {
-          return Error(
-              "Invalid DESTROY Operation: Persistent volume " +
-              stringify(volume) + " cannot be removed due to additional " +
-              "shared copies");
-        }
-
-        // Strip persistence and volume from the disk info so that we
-        // can subtract it from the original resources.
-        Resource stripped = volume;
-
-        if (stripped.disk().has_source()) {
-          stripped.mutable_disk()->clear_persistence();
-          stripped.mutable_disk()->clear_volume();
-        } else {
-          stripped.clear_disk();
-        }
-
-        // Since we only allow persistent volumes to be shared, we
-        // return the resource to non-shared state after destroy.
-        stripped.clear_shared();
-
-        result.add(stripped);
-      }
-      break;
-    }
-
-    case Offer::Operation::CREATE_VOLUME: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for CREATE_VOLUME Operation");
-      }
-
-      const Resource& consumed = operation.create_volume().source();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid CREATE_VOLUME Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::DESTROY_VOLUME: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for DESTROY_VOLUME Operation");
-      }
-
-      const Resource& consumed = operation.destroy_volume().volume();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid DESTROY_VOLUME Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::CREATE_BLOCK: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for CREATE_BLOCK Operation");
-      }
-
-      const Resource& consumed = operation.create_block().source();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid CREATE_BLOCK Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::DESTROY_BLOCK: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for DESTROY_BLOCK Operation");
-      }
+  return conversion.apply(*this);
+}
 
-      const Resource& consumed = operation.destroy_block().block();
 
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid DESTROY_BLOCK Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
+Try<Resources> Resources::apply(const Offer::Operation& operation) const
+{
+  Try<vector<ResourceConversion>> conversions =
+    getResourceConversions(operation);
 
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
+  if (conversions.isError()) {
+    return Error("Cannot get conversions: " + conversions.error());
+  }
 
-    case Offer::Operation::UNKNOWN:
-      return Error("Unknown offer operation");
+  Try<Resources> result = apply(conversions.get());
+  if (result.isError()) {
+    return Error(result.error());
   }
 
-  // The following are sanity checks to ensure the amount of each type of
-  // resource does not change.
+  // The following are sanity checks to ensure the amount of each type
+  // of resource does not change.
   // TODO(jieyu): Currently, we only check known resource types like
   // cpus, gpus, mem, disk, ports, etc. We should generalize this.
-
-  CHECK(result.cpus() == cpus());
-  CHECK(result.gpus() == gpus());
-  CHECK(result.mem() == mem());
-  CHECK(result.disk() == disk());
-  CHECK(result.ports() == ports());
+  CHECK(result->cpus() == cpus());
+  CHECK(result->gpus() == gpus());
+  CHECK(result->mem() == mem());
+  CHECK(result->disk() == disk());
+  CHECK(result->ports() == ports());
 
   return result;
 }
@@ -2525,4 +2283,28 @@ ostream& operator<<(
   return stream << JSON::protobuf(resources);
 }
 
+
+Try<Resources> ResourceConversion::apply(const Resources& resources) const
+{
+  Resources result = resources;
+
+  if (!result.contains(consumed)) {
+    return Error(
+        stringify(result) + " does not contain " +
+        stringify(consumed));
+  }
+
+  result -= consumed;
+  result += converted;
+
+  if (postValidation.isSome()) {
+    Try<Nothing> validation = postValidation.get()(result);
+    if (validation.isError()) {
+      return Error(validation.error());
+    }
+  }
+
+  return result;
+}
+
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 8304da4..7c48704 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -19,6 +19,8 @@
 
 #include "common/resources_utils.hpp"
 
+using std::vector;
+
 using google::protobuf::RepeatedPtrField;
 
 namespace mesos {
@@ -91,6 +93,135 @@ Try<Resources> applyCheckpointedResources(
 }
 
 
+namespace internal {
+
+// NOTE: Use template here so that it works for both internal and v1.
+template <typename TResources,
+          typename TResource,
+          typename TResourceConversion,
+          typename TOfferOperation>
+Try<vector<TResourceConversion>> getResourceConversions(
+    const TOfferOperation& operation)
+{
+  vector<TResourceConversion> conversions;
+
+  switch (operation.type()) {
+    case TOfferOperation::UNKNOWN:
+      return Error("Unknown offer operation");
+
+    case TOfferOperation::LAUNCH:
+    case TOfferOperation::LAUNCH_GROUP:
+    case TOfferOperation::CREATE_VOLUME:
+    case TOfferOperation::DESTROY_VOLUME:
+    case TOfferOperation::CREATE_BLOCK:
+    case TOfferOperation::DESTROY_BLOCK:
+      return Error("Offer operation not supported");
+
+    case TOfferOperation::RESERVE: {
+      foreach (const TResource& reserved, operation.reserve().resources()) {
+        // Note that we only allow "pushing" a single reservation at a time.
+        TResources consumed = TResources(reserved).popReservation();
+        conversions.emplace_back(consumed, reserved);
+      }
+      break;
+    }
+
+    case TOfferOperation::UNRESERVE: {
+      foreach (const TResource& reserved, operation.unreserve().resources()) {
+        // Note that we only allow "popping" a single reservation at a time.
+        TResources converted = TResources(reserved).popReservation();
+        conversions.emplace_back(reserved, converted);
+      }
+      break;
+    }
+
+    case TOfferOperation::CREATE: {
+      foreach (const TResource& volume, operation.create().volumes()) {
+        // Strip persistence and volume from the disk info so that we
+        // can subtract it from the original resources.
+        // TODO(jieyu): Non-persistent volumes are not supported for
+        // now. Persistent volumes can only be created from regular
+        // disk resources. Revisit this once we start to support
+        // non-persistent volumes.
+        TResource stripped = volume;
+
+        if (stripped.disk().has_source()) {
+          stripped.mutable_disk()->clear_persistence();
+          stripped.mutable_disk()->clear_volume();
+        } else {
+          stripped.clear_disk();
+        }
+
+        // Since we only allow persistent volumes to be shared, the
+        // original resource must be non-shared.
+        stripped.clear_shared();
+
+        conversions.emplace_back(stripped, volume);
+      }
+      break;
+    }
+
+    case TOfferOperation::DESTROY: {
+      foreach (const TResource& volume, operation.destroy().volumes()) {
+        // Strip persistence and volume from the disk info so that we
+        // can subtract it from the original resources.
+        TResource stripped = volume;
+
+        if (stripped.disk().has_source()) {
+          stripped.mutable_disk()->clear_persistence();
+          stripped.mutable_disk()->clear_volume();
+        } else {
+          stripped.clear_disk();
+        }
+
+        // Since we only allow persistent volumes to be shared, we
+        // return the resource to non-shared state after destroy.
+        stripped.clear_shared();
+
+        conversions.emplace_back(
+            volume,
+            stripped,
+            [volume](const TResources& resources) -> Try<Nothing> {
+              if (resources.contains(volume)) {
+                return Error(
+                  "Persistent volume " + stringify(volume) + " cannot be "
+                  "removed due to additional shared copies");
+              }
+              return Nothing();
+            });
+      }
+      break;
+    }
+  }
+
+  return conversions;
+}
+
+} // namespace internal {
+
+
+Try<vector<ResourceConversion>> getResourceConversions(
+    const Offer::Operation& operation)
+{
+  return internal::getResourceConversions<
+      Resources,
+      Resource,
+      ResourceConversion,
+      Offer::Operation>(operation);
+}
+
+
+Try<vector<v1::ResourceConversion>> getResourceConversions(
+    const v1::Offer::Operation& operation)
+{
+  return internal::getResourceConversions<
+      v1::Resources,
+      v1::Resource,
+      v1::ResourceConversion,
+      v1::Offer::Operation>(operation);
+}
+
+
 Result<ResourceProviderID> getResourceProviderId(
     const Offer::Operation& operation)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/src/common/resources_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.hpp b/src/common/resources_utils.hpp
index b8c29b3..5b74ff2 100644
--- a/src/common/resources_utils.hpp
+++ b/src/common/resources_utils.hpp
@@ -24,6 +24,9 @@
 #include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
+#include <mesos/v1/mesos.hpp>
+#include <mesos/v1/resources.hpp>
+
 #include <stout/error.hpp>
 #include <stout/nothing.hpp>
 #include <stout/option.hpp>
@@ -55,6 +58,20 @@ Result<ResourceProviderID> getResourceProviderId(
     const Offer::Operation& operation);
 
 
+// Returns the resource conversions from the given offer operation.
+// This helper assumes that the given operation has already been
+// validated.
+Try<std::vector<ResourceConversion>> getResourceConversions(
+    const Offer::Operation& operation);
+
+
+// Returns the resource conversions from the given offer operation.
+// This helper assumes that the given operation has already been
+// validated.
+Try<std::vector<v1::ResourceConversion>> getResourceConversions(
+    const v1::Offer::Operation& operation);
+
+
 // Resource format options to be used with the `convertResourceFormat` function.
 //
 // The preconditions of the options are asymmetric, centered around the

http://git-wip-us.apache.org/repos/asf/mesos/blob/e55309ad/src/v1/resources.cpp
----------------------------------------------------------------------
diff --git a/src/v1/resources.cpp b/src/v1/resources.cpp
index 58568b8..43d9b0f 100644
--- a/src/v1/resources.cpp
+++ b/src/v1/resources.cpp
@@ -39,6 +39,8 @@
 #include <stout/strings.hpp>
 #include <stout/unreachable.hpp>
 
+#include "common/resources_utils.hpp"
+
 using std::map;
 using std::ostream;
 using std::set;
@@ -1654,275 +1656,35 @@ Option<Resources> Resources::find(const Resources& targets) const
 }
 
 
-Try<Resources> Resources::apply(
-    const Offer::Operation& operation,
-    const Option<Resources>& convertedResources) const
+Try<Resources> Resources::apply(const ResourceConversion& conversion) const
 {
-  Resources result = *this;
-
-  switch (operation.type()) {
-    case Offer::Operation::LAUNCH:
-      return Error("Cannot apply LAUNCH Operation");
-
-    case Offer::Operation::LAUNCH_GROUP:
-      return Error("Cannot apply LAUNCH_GROUP Operation");
-
-    case Offer::Operation::RESERVE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for RESERVE Operation");
-      }
-
-      Option<Error> error = validate(operation.reserve().resources());
-      if (error.isSome()) {
-        return Error("Invalid RESERVE Operation: " + error->message);
-      }
-
-      foreach (const Resource& reserved, operation.reserve().resources()) {
-        if (!Resources::isReserved(reserved)) {
-          return Error("Invalid RESERVE Operation: Resource must be reserved");
-        } else if (!Resources::isDynamicallyReserved(reserved)) {
-          return Error(
-              "Invalid RESERVE Operation: Resource must be"
-              " dynamically reserved");
-        }
-
-        Resources resources = Resources(reserved).popReservation();
-
-        if (!result.contains(resources)) {
-          return Error("Invalid RESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(resources));
-        }
-
-        result -= resources;
-        result.add(reserved);
-      }
-      break;
-    }
-
-    case Offer::Operation::UNRESERVE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for UNRESERVE Operation");
-      }
-
-      Option<Error> error = validate(operation.unreserve().resources());
-      if (error.isSome()) {
-        return Error("Invalid UNRESERVE Operation: " + error->message);
-      }
-
-      foreach (const Resource& reserved, operation.unreserve().resources()) {
-        if (!Resources::isReserved(reserved)) {
-          return Error("Invalid UNRESERVE Operation: Resource is not reserved");
-        } else if (!Resources::isDynamicallyReserved(reserved)) {
-          return Error(
-              "Invalid UNRESERVE Operation: Resource is not"
-              " dynamically reserved");
-        }
-
-        if (!result.contains(reserved)) {
-          return Error("Invalid UNRESERVE Operation: " + stringify(result) +
-                       " does not contain " + stringify(reserved));
-        }
-
-        Resources resources = Resources(reserved).popReservation();
-
-        result.subtract(reserved);
-        result += resources;
-      }
-      break;
-    }
-
-    case Offer::Operation::CREATE: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for CREATE Operation");
-      }
-
-      Option<Error> error = validate(operation.create().volumes());
-      if (error.isSome()) {
-        return Error("Invalid CREATE Operation: " + error->message);
-      }
-
-      foreach (const Resource& volume, operation.create().volumes()) {
-        if (!volume.has_disk()) {
-          return Error("Invalid CREATE Operation: Missing 'disk'");
-        } else if (!volume.disk().has_persistence()) {
-          return Error("Invalid CREATE Operation: Missing 'persistence'");
-        }
-
-        // Strip persistence and volume from the disk info so that we
-        // can subtract it from the original resources.
-        // TODO(jieyu): Non-persistent volumes are not supported for
-        // now. Persistent volumes can only be be created from regular
-        // disk resources. Revisit this once we start to support
-        // non-persistent volumes.
-        Resource stripped = volume;
-
-        if (stripped.disk().has_source()) {
-          stripped.mutable_disk()->clear_persistence();
-          stripped.mutable_disk()->clear_volume();
-        } else {
-          stripped.clear_disk();
-        }
-
-        // Since we only allow persistent volumes to be shared, the
-        // original resource must be non-shared.
-        stripped.clear_shared();
-
-        if (!result.contains(stripped)) {
-          return Error("Invalid CREATE Operation: Insufficient disk resources"
-                       " for persistent volume " + stringify(volume));
-        }
-
-        result.subtract(stripped);
-        result.add(volume);
-      }
-      break;
-    }
-
-    case Offer::Operation::DESTROY: {
-      if (convertedResources.isSome()) {
-        return Error(
-            "Converted resources not expected for DESTROY Operation");
-      }
-
-      Option<Error> error = validate(operation.destroy().volumes());
-      if (error.isSome()) {
-        return Error("Invalid DESTROY Operation: " + error->message);
-      }
-
-      foreach (const Resource& volume, operation.destroy().volumes()) {
-        if (!volume.has_disk()) {
-          return Error("Invalid DESTROY Operation: Missing 'disk'");
-        } else if (!volume.disk().has_persistence()) {
-          return Error("Invalid DESTROY Operation: Missing 'persistence'");
-        }
-
-        if (!result.contains(volume)) {
-          return Error(
-              "Invalid DESTROY Operation: Persistent volume does not exist");
-        }
-
-        result.subtract(volume);
-
-        if (result.contains(volume)) {
-          return Error(
-              "Invalid DESTROY Operation: Persistent volume " +
-              stringify(volume) + " cannot be removed due to additional " +
-              "shared copies");
-        }
-
-        // Strip persistence and volume from the disk info so that we
-        // can subtract it from the original resources.
-        Resource stripped = volume;
-
-        if (stripped.disk().has_source()) {
-          stripped.mutable_disk()->clear_persistence();
-          stripped.mutable_disk()->clear_volume();
-        } else {
-          stripped.clear_disk();
-        }
-
-        // Since we only allow persistent volumes to be shared, we
-        // return the resource to non-shared state after destroy.
-        stripped.clear_shared();
-
-        result.add(stripped);
-      }
-      break;
-    }
-
-    case Offer::Operation::CREATE_VOLUME: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for CREATE_VOLUME Operation");
-      }
-
-      const Resource& consumed = operation.create_volume().source();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid CREATE_VOLUME Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::DESTROY_VOLUME: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for DESTROY_VOLUME Operation");
-      }
-
-      const Resource& consumed = operation.destroy_volume().volume();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid DESTROY_VOLUME Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::CREATE_BLOCK: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for CREATE_BLOCK Operation");
-      }
-
-      const Resource& consumed = operation.create_block().source();
-
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid CREATE_BLOCK Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
-
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
-
-    case Offer::Operation::DESTROY_BLOCK: {
-      if (convertedResources.isNone()) {
-        return Error(
-            "Converted resources not specified for DESTROY_BLOCK Operation");
-      }
+  return conversion.apply(*this);
+}
 
-      const Resource& consumed = operation.destroy_block().block();
 
-      if (!result.contains(consumed)) {
-        return Error(
-            "Invalid DESTROY_BLOCK Operation: " + stringify(result) +
-            " does not contain " + stringify(consumed));
-      }
+Try<Resources> Resources::apply(const Offer::Operation& operation) const
+{
+  Try<vector<ResourceConversion>> conversions =
+    getResourceConversions(operation);
 
-      result.subtract(consumed);
-      result += convertedResources.get();
-      break;
-    }
+  if (conversions.isError()) {
+    return Error("Cannot get conversions: " + conversions.error());
+  }
 
-    case Offer::Operation::UNKNOWN:
-      return Error("Unknown offer operation");
+  Try<Resources> result = apply(conversions.get());
+  if (result.isError()) {
+    return Error(result.error());
   }
 
-  // The following are sanity checks to ensure the amount of each type of
-  // resource does not change.
+  // The following are sanity checks to ensure the amount of each type
+  // of resource does not change.
   // TODO(jieyu): Currently, we only check known resource types like
   // cpus, gpus, mem, disk, ports, etc. We should generalize this.
-
-  CHECK(result.cpus() == cpus());
-  CHECK(result.gpus() == gpus());
-  CHECK(result.mem() == mem());
-  CHECK(result.disk() == disk());
-  CHECK(result.ports() == ports());
+  CHECK(result->cpus() == cpus());
+  CHECK(result->gpus() == gpus());
+  CHECK(result->mem() == mem());
+  CHECK(result->disk() == disk());
+  CHECK(result->ports() == ports());
 
   return result;
 }
@@ -2554,5 +2316,29 @@ ostream& operator<<(
   return stream << JSON::protobuf(resources);
 }
 
+
+Try<Resources> ResourceConversion::apply(const Resources& resources) const
+{
+  Resources result = resources;
+
+  if (!result.contains(consumed)) {
+    return Error(
+        stringify(result) + " does not contain " +
+        stringify(consumed));
+  }
+
+  result -= consumed;
+  result += converted;
+
+  if (postValidation.isSome()) {
+    Try<Nothing> validation = postValidation.get()(result);
+    if (validation.isError()) {
+      return Error(validation.error());
+    }
+  }
+
+  return result;
+}
+
 } // namespace v1 {
 } // namespace mesos {


Mime
View raw message