mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mp...@apache.org
Subject [4/7] mesos git commit: Fixed `convertResourceFormat` uses with `validateAndUpgradeResources`.
Date Thu, 22 Jun 2017 22:48:18 GMT
Fixed `convertResourceFormat` uses with `validateAndUpgradeResources`.

Initially, calling `convertResourceFormat` after operation
validation seemed safe, since the operation validations
themselves performed `Resources::validate` internally.

However, the rest of the operation validation code relies on
the fact that the resources have been validated, and uses
functions such as `isDynamicallyReserved`. Since functions such as
`isDynamicallyReserved` now require the "post-reservation-refinement"
format, we must perform this conversion earlier.

In this patch, we use `validateAndUpgradeResources` to perform
resource validation __and__ convert the resources before going
into the operation and task validation.

We really need a better plan for this going forward. MESOS-7702.

Review: https://reviews.apache.org/r/60283


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6524371c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6524371c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6524371c

Branch: refs/heads/master
Commit: 6524371cafeeaba16af3e7010c8228eac84ad8a4
Parents: 0d9baca
Author: Michael Park <mpark@apache.org>
Authored: Wed Jun 21 03:47:21 2017 -0700
Committer: Michael Park <mpark@apache.org>
Committed: Thu Jun 22 15:47:39 2017 -0700

----------------------------------------------------------------------
 src/master/master.cpp               | 139 ++++++----
 src/tests/master_tests.cpp          | 463 +++++++++++++++++++++++++++++++
 src/tests/resource_offers_tests.cpp |   3 +-
 3 files changed, 555 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6524371c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index b0818e1..9961e70 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4202,8 +4202,8 @@ void Master::_accept(
   CHECK_READY(_authorizations);
   list<Future<bool>> authorizations = _authorizations.get();
 
-  // We iterate by copy here since we call `convertResourceFormat` on it
-  // after validation which modifies the `Operation`.
+  // We iterate by copy here since we call `validateAndUpgradeResources`
+  // on it which modifies the `Operation`.
   foreach (Offer::Operation operation, accept.operations()) {
     switch (operation.type()) {
       // The RESERVE operation allows a principal to reserve resources.
@@ -4231,16 +4231,21 @@ void Master::_accept(
           continue;
         }
 
-        Option<Principal> principal = framework->info.has_principal()
-          ? Principal(framework->info.principal())
-          : Option<Principal>::none();
+        Option<Error> error = validateAndUpgradeResources(
+            operation.mutable_reserve()->mutable_resources());
 
-        // Make sure this reserve operation is valid.
-        Option<Error> error = validation::operation::validate(
-            operation.reserve(),
-            principal,
-            slave->capabilities,
-            framework->info);
+        if (error.isNone()) {
+          Option<Principal> principal = framework->info.has_principal()
+            ? Principal(framework->info.principal())
+            : Option<Principal>::none();
+
+          // Make sure this reserve operation is valid.
+          error = validation::operation::validate(
+              operation.reserve(),
+              principal,
+              slave->capabilities,
+              framework->info);
+        }
 
         if (error.isSome()) {
           drop(
@@ -4250,8 +4255,6 @@ void Master::_accept(
           continue;
         }
 
-        convertResourceFormat(&operation, POST_RESERVATION_REFINEMENT);
-
         // Test the given operation on the included resources.
         Try<Resources> resources = _offeredResources.apply(operation);
         if (resources.isError()) {
@@ -4298,17 +4301,19 @@ void Master::_accept(
           continue;
         }
 
-        // Make sure this unreserve operation is valid.
-        Option<Error> error = validation::operation::validate(
-            operation.unreserve());
+        Option<Error> error = validateAndUpgradeResources(
+            operation.mutable_unreserve()->mutable_resources());
+
+        if (error.isNone()) {
+          // Make sure this unreserve operation is valid.
+          error = validation::operation::validate(operation.unreserve());
+        }
 
         if (error.isSome()) {
           drop(framework, operation, error->message);
           continue;
         }
 
-        convertResourceFormat(&operation, POST_RESERVATION_REFINEMENT);
-
         // Test the given operation on the included resources.
         Try<Resources> resources = _offeredResources.apply(operation);
         if (resources.isError()) {
@@ -4354,17 +4359,22 @@ void Master::_accept(
           continue;
         }
 
-        Option<Principal> principal = framework->info.has_principal()
-          ? Principal(framework->info.principal())
-          : Option<Principal>::none();
-
-        // Make sure this create operation is valid.
-        Option<Error> error = validation::operation::validate(
-            operation.create(),
-            slave->checkpointedResources,
-            principal,
-            slave->capabilities,
-            framework->info);
+        Option<Error> error = validateAndUpgradeResources(
+            operation.mutable_create()->mutable_volumes());
+
+        if (error.isNone()) {
+          Option<Principal> principal = framework->info.has_principal()
+            ? Principal(framework->info.principal())
+            : Option<Principal>::none();
+
+          // Make sure this create operation is valid.
+          error = validation::operation::validate(
+              operation.create(),
+              slave->checkpointedResources,
+              principal,
+              slave->capabilities,
+              framework->info);
+        }
 
         if (error.isSome()) {
           drop(
@@ -4374,8 +4384,6 @@ void Master::_accept(
           continue;
         }
 
-        convertResourceFormat(&operation, POST_RESERVATION_REFINEMENT);
-
         Try<Resources> resources = _offeredResources.apply(operation);
         if (resources.isError()) {
           drop(framework, operation, resources.error());
@@ -4421,20 +4429,23 @@ void Master::_accept(
           continue;
         }
 
-        // Make sure this destroy operation is valid.
-        Option<Error> error = validation::operation::validate(
-            operation.destroy(),
-            slave->checkpointedResources,
-            slave->usedResources,
-            slave->pendingTasks);
+        Option<Error> error = validateAndUpgradeResources(
+            operation.mutable_destroy()->mutable_volumes());
+
+        if (error.isNone()) {
+          // Make sure this destroy operation is valid.
+          error = validation::operation::validate(
+              operation.destroy(),
+              slave->checkpointedResources,
+              slave->usedResources,
+              slave->pendingTasks);
+        }
 
         if (error.isSome()) {
           drop(framework, operation, error->message);
           continue;
         }
 
-        convertResourceFormat(&operation, POST_RESERVATION_REFINEMENT);
-
         // If any offer from this slave contains a volume that needs
         // to be destroyed, we should process it, but we should also
         // rescind those offers.
@@ -4589,13 +4600,20 @@ void Master::_accept(
           Resources available =
             _offeredResources.nonShared() + offeredSharedResources;
 
-          const Option<Error>& validationError = validation::task::validate(
-              task,
-              framework,
-              slave,
-              available);
+          Option<Error> error =
+            validateAndUpgradeResources(task.mutable_resources());
+
+          if (error.isNone() && task.has_executor()) {
+            error = validateAndUpgradeResources(
+                task.mutable_executor()->mutable_resources());
+          }
+
+          if (error.isNone()) {
+            error =
+              validation::task::validate(task, framework, slave, available);
+          }
 
-          if (validationError.isSome()) {
+          if (error.isSome()) {
             const StatusUpdate& update = protobuf::createStatusUpdate(
                 framework->id(),
                 task.slave_id(),
@@ -4603,7 +4621,7 @@ void Master::_accept(
                 TASK_ERROR,
                 TaskStatus::SOURCE_MASTER,
                 None(),
-                validationError.get().message,
+                error->message,
                 TaskStatus::REASON_TASK_INVALID);
 
             metrics->tasks_error++;
@@ -4706,6 +4724,30 @@ void Master::_accept(
           }
         }
 
+        Offer::Operation::LaunchGroup* launchGroup =
+          operation.mutable_launch_group();
+
+        Option<Error> error;
+
+        if (launchGroup->has_executor()) {
+          error = validateAndUpgradeResources(
+              launchGroup->mutable_executor()->mutable_resources());
+        }
+
+        foreach (
+            TaskInfo& task,
+            *launchGroup->mutable_task_group()->mutable_tasks()) {
+          if (error.isSome()) {
+            break;
+          }
+
+          error = validateAndUpgradeResources(task.mutable_resources());
+          if (error.isNone() && task.has_executor()) {
+            error = validateAndUpgradeResources(
+                task.mutable_executor()->mutable_resources());
+          }
+        }
+
         // Note that we do not fill in the `ExecutorInfo.framework_id`
         // since we do not have to support backwards compatibility like
         // in the `Launch` operation case.
@@ -4720,9 +4762,10 @@ void Master::_accept(
         // TODO(anindya_sinha): If task group uses shared resources, this
         // validation needs to be enhanced to accommodate multiple copies
         // of shared resources across tasks within the task group.
-        Option<Error> error =
-          validation::task::group::validate(
+        if (error.isNone()) {
+          error = validation::task::group::validate(
               taskGroup, executor, framework, slave, _offeredResources);
+        }
 
         Option<TaskStatus::Reason> reason = None();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6524371c/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 652f37a..6cd4be7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -73,6 +73,8 @@
 #include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
+using google::protobuf::RepeatedPtrField;
+
 using mesos::internal::master::Master;
 
 using mesos::internal::master::allocator::MesosAllocatorProcess;
@@ -115,6 +117,7 @@ using testing::Eq;
 using testing::Not;
 using testing::Return;
 using testing::SaveArg;
+using testing::WithParamInterface;
 
 namespace mesos {
 namespace internal {
@@ -7120,6 +7123,466 @@ TEST_F(MasterTest, IgnoreOldAgentReregistration)
   Clock::settle();
 }
 
+
+class MasterTestPrePostReservationRefinement
+  : public MasterTest,
+    public WithParamInterface<bool> {
+public:
+  Resources inboundResources(RepeatedPtrField<Resource> resources)
+  {
+    // If reservation refinement is enabled, inbound resources are already
+    // in the "post-reservation-refinement" format and should not need to
+    // be upgraded.
+    if (GetParam()) {
+      return resources;
+    }
+
+    CHECK_NONE(validateAndUpgradeResources(&resources));
+    return resources;
+  }
+
+  RepeatedPtrField<Resource> outboundResources(
+      RepeatedPtrField<Resource> resources)
+  {
+    // If reservation refinement is enabled, outbound resources are already
+    // in the "post-reservation-refinement" format and should not need to
+    // be downgraded.
+    if (GetParam()) {
+      return resources;
+    }
+
+    CHECK_SOME(downgradeResources(&resources));
+    return resources;
+  }
+};
+
+
+// Parameterized on reservation-refinement.
+INSTANTIATE_TEST_CASE_P(
+    bool,
+    MasterTestPrePostReservationRefinement,
+    ::testing::Values(true, false));
+
+
+// This tests that a framework can launch a task with
+// and without the RESERVATION_REFINEMENT capability.
+TEST_P(MasterTestPrePostReservationRefinement, LaunchTask)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+
+  // TODO(mpark): Remove this once `RESERVATION_REFINEMENT`
+  // is removed from `DEFAULT_FRAMEWORK_INFO`.
+  frameworkInfo.clear_capabilities();
+
+  if (GetParam()) {
+    frameworkInfo.add_capabilities()->set_type(
+        FrameworkInfo::Capability::RESERVATION_REFINEMENT);
+  }
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers->size());
+  Offer offer = offers->front();
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offer.slave_id());
+  task.mutable_resources()->MergeFrom(offer.resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<Nothing> update;
+  EXPECT_CALL(containerizer, update(_, inboundResources(offer.resources())))
+    .WillOnce(DoAll(FutureSatisfy(&update), Return(Nothing())));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_TRUE(status->has_executor_id());
+  EXPECT_EQ(exec.id, status->executor_id());
+
+  AWAIT_READY(update);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This tests that a framework can launch a task group
+// with and without the RESERVATION_REFINEMENT capability.
+TEST_P(MasterTestPrePostReservationRefinement, LaunchGroup)
+{
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+
+  // TODO(mpark): Remove this once `RESERVATION_REFINEMENT`
+  // is removed from `DEFAULT_FRAMEWORK_INFO`.
+  frameworkInfo.clear_capabilities();
+
+  if (GetParam()) {
+    frameworkInfo.add_capabilities()->set_type(
+        v1::FrameworkInfo::Capability::RESERVATION_REFINEMENT);
+  }
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+#ifndef USE_SSL_SOCKET
+  // Disable operator API authentication for the default executor. Executor
+  // authentication currently has SSL as a dependency, so we cannot require
+  // executors to authenticate with the agent operator API if Mesos was not
+  // built with SSL support.
+  flags.authenticate_http_readwrite = false;
+#endif // USE_SSL_SOCKET
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  RepeatedPtrField<Resource> resources =
+    Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+  executorInfo.mutable_resources()->CopyFrom(
+      evolve<v1::Resource>(outboundResources(resources)));
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID& agentId = offer.agent_id();
+
+  v1::TaskInfo taskInfo = v1::createTask(
+      agentId, evolve<v1::Resource>(resources), SLEEP_COMMAND(1000));
+
+  taskInfo.mutable_resources()->CopyFrom(evolve<v1::Resource>(
+      outboundResources(devolve<Resource>(taskInfo.resources()))));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  Future<v1::scheduler::Event::Update> update;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+    v1::Offer::Operation::LaunchGroup* launchGroup =
+      operation->mutable_launch_group();
+
+    launchGroup->mutable_executor()->CopyFrom(executorInfo);
+    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(update);
+
+  EXPECT_EQ(TASK_RUNNING, update->status().state());
+  EXPECT_EQ(taskInfo.task_id(), update->status().task_id());
+  EXPECT_TRUE(update->status().has_timestamp());
+
+  // Ensure that the task sandbox symbolic link is created.
+  EXPECT_TRUE(os::exists(path::join(
+      slave::paths::getExecutorLatestRunPath(
+          flags.work_dir,
+          devolve(agentId),
+          devolve(frameworkId),
+          devolve(executorInfo.executor_id())),
+      "tasks",
+      taskInfo.task_id().value())));
+
+  // Verify that the executor's type is exposed in the agent's state
+  // endpoint.
+  Future<Response> response = process::http::get(
+      slave.get()->pid,
+      "state",
+      None(),
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+  Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+  ASSERT_SOME(parse);
+  JSON::Object state = parse.get();
+
+  EXPECT_SOME_EQ(
+      JSON::String(v1::ExecutorInfo::Type_Name(executorInfo.type())),
+      state.find<JSON::String>("frameworks[0].executors[0].type"));
+}
+
+
+// This tests that a framework can perform the operations
+// RESERVE, CREATE, DESTROY, and UNRESERVE in that order
+// with and without the RESERVATION_REFINEMENT capability.
+TEST_P(MasterTestPrePostReservationRefinement,
+       ReserveCreateLaunchDestroyUnreserve)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role(DEFAULT_TEST_ROLE);
+
+  // TODO(mpark): Remove this once `RESERVATION_REFINEMENT`
+  // is removed from `DEFAULT_FRAMEWORK_INFO`.
+  frameworkInfo.clear_capabilities();
+
+  if (GetParam()) {
+    frameworkInfo.add_capabilities()->set_type(
+        FrameworkInfo::Capability::RESERVATION_REFINEMENT);
+  }
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(5);
+  masterFlags.roles = frameworkInfo.role();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:8;disk:512";
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  Resources unreservedCpus = Resources::parse("cpus:8").get();
+  Resources unreservedDisk = Resources::parse("disk:512").get();
+
+  Resources reservedCpus =
+    unreservedCpus.pushReservation(createDynamicReservationInfo(
+        frameworkInfo.role(), frameworkInfo.principal()));
+
+  Resources reservedDisk =
+    unreservedDisk.pushReservation(createDynamicReservationInfo(
+        frameworkInfo.role(), frameworkInfo.principal()));
+
+  Resources volume = createPersistentVolume(
+      createDiskResource("512", DEFAULT_TEST_ROLE, None(), None()),
+      "id1",
+      "path1",
+      frameworkInfo.principal(),
+      frameworkInfo.principal());
+
+  // We use this to capture offers from 'resourceOffers'.
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The expectation for the first offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  // In the first offer, expect an offer with unreserved resources.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers->size());
+  Offer offer = offers->front();
+
+  EXPECT_TRUE(inboundResources(offer.resources())
+                .contains(allocatedResources(
+                    unreservedCpus + unreservedDisk, frameworkInfo.role())));
+
+  // The expectation for the next offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // We don't use the `RESERVE` helper function here currently because it
+  // takes `Resources` as its parameter, and the result of `outboundResources`
+  // may be in the "pre-reservation-refinement" format.
+  Offer::Operation reserve;
+  reserve.set_type(Offer::Operation::RESERVE);
+  reserve.mutable_reserve()->mutable_resources()->CopyFrom(
+      outboundResources(reservedCpus + reservedDisk));
+
+  // Reserve the resources.
+  driver.acceptOffers({offer.id()}, {reserve}, filters);
+
+  // In the next offer, expect an offer with reserved resources.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers->size());
+  offer = offers->front();
+
+  EXPECT_TRUE(inboundResources(offer.resources())
+                .contains(allocatedResources(
+                    reservedCpus + reservedDisk, frameworkInfo.role())));
+
+  // The expectation for the next offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // We don't use the `CREATE` helper function here currently because it
+  // takes `Resources` as its parameter, and the result of `outboundResources`
+  // may be in the "pre-reservation-refinement" format.
+  Offer::Operation create;
+  create.set_type(Offer::Operation::CREATE);
+  create.mutable_create()->mutable_volumes()->CopyFrom(
+      outboundResources(volume));
+
+  // Create a volume.
+  driver.acceptOffers({offer.id()}, {create}, filters);
+
+  // In the next offer, expect an offer with reserved resources.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers->size());
+  offer = offers->front();
+
+  EXPECT_TRUE(inboundResources(offer.resources())
+                .contains(allocatedResources(
+                    reservedCpus + volume, frameworkInfo.role())));
+
+  // The expectation for the next offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // We don't use the `DESTROY` helper function here currently because it
+  // takes `Resources` as its parameter, and the result of `outboundResources`
+  // may be in the "pre-reservation-refinement" format.
+  Offer::Operation destroy;
+  destroy.set_type(Offer::Operation::DESTROY);
+  destroy.mutable_destroy()->mutable_volumes()->CopyFrom(
+      outboundResources(volume));
+
+  // Destroy the volume.
+  driver.acceptOffers({offer.id()}, {destroy}, filters);
+
+  // In the next offer, expect an offer with reserved resources and no volume.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers->size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(inboundResources(offer.resources())
+                .contains(allocatedResources(
+                    reservedCpus + reservedDisk, frameworkInfo.role())));
+
+  // The expectation for the next offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // We don't use the `UNRESERVE` helper function here currently because it
+  // takes `Resources` as its parameter, and the result of `outboundResources`
+  // may be in the "pre-reservation-refinement" format.
+  Offer::Operation unreserve;
+  unreserve.set_type(Offer::Operation::UNRESERVE);
+  unreserve.mutable_unreserve()->mutable_resources()->CopyFrom(
+      outboundResources(reservedCpus + reservedDisk));
+
+  // Unreserve the resources.
+  driver.acceptOffers({offer.id()}, {unreserve}, filters);
+
+  // In the next offer, expect an offer with unreserved resources.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers->size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(inboundResources(offer.resources())
+                .contains(allocatedResources(
+                    unreservedCpus + unreservedDisk, frameworkInfo.role())));
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6524371c/src/tests/resource_offers_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp
index c2bbf83..427a652 100644
--- a/src/tests/resource_offers_tests.cpp
+++ b/src/tests/resource_offers_tests.cpp
@@ -255,8 +255,7 @@ TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError)
   EXPECT_EQ(TASK_ERROR, status->state());
   EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status->reason());
   EXPECT_TRUE(status->has_message());
-  EXPECT_TRUE(strings::startsWith(
-        status->message(), "Task uses invalid resources"))
+  EXPECT_TRUE(strings::contains(status->message(), "Invalid scalar resource"))
     << status->message();
 
   MockScheduler sched2;


Mime
View raw message