mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vinodk...@apache.org
Subject [5/6] git commit: Updated master to update task unacknowledged state properly.
Date Tue, 21 Oct 2014 22:47:34 GMT
Updated master to update task unacknowledged state properly.

Review: https://reviews.apache.org/r/26701


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c4e3fdf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c4e3fdf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c4e3fdf

Branch: refs/heads/master
Commit: 3c4e3fdf73fdbb2081e58fe3e9831b15d67bd440
Parents: da66970
Author: Vinod Kone <vinodkone@gmail.com>
Authored: Fri Oct 10 18:50:07 2014 -0700
Committer: Vinod Kone <vinodkone@gmail.com>
Committed: Tue Oct 21 15:47:09 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp      | 94 +++++++++++++++++++++++++++++++----------
 src/master/master.hpp      |  2 +-
 src/tests/master_tests.cpp | 84 ++++++++++++++++++++++++++++++++++++
 3 files changed, 156 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3c4e3fdf/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f04c085..9743eab 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2844,19 +2844,39 @@ void Master::statusUpdateAcknowledgement(
 
   Task* task = slave->getTask(frameworkId, taskId);
 
-  if (task != NULL && protobuf::isTerminalState(task->state())) {
-    removeTask(task);
+  if (task != NULL) {
+    // Status update state and uuid should be either set or unset
+    // together.
+    CHECK_EQ(task->has_status_update_uuid(), task->has_status_update_state());
+
+    if (!task->has_status_update_state()) {
+      // Task should have status update state set because it must have
+      // been set when the update corresponding to this
+      // acknowledgement was processed by the master. But in case this
+      // acknowledgement was intended for the old run of the master
+      // and the task belongs to a 0.20.0 slave, we could be here.
+      // Dropping the acknowledgement is safe because the slave will
+      // retry the update, at which point the master will set the
+      // status update state.
+      LOG(ERROR)
+        << "Ignoring status update acknowledgement message for task " << taskId
+        << " of framework " << *framework << " to slave " << *slave
+        << " because it no update was sent by this master";
+      metrics.invalid_status_update_acknowledgements++;
+      return;
+    }
+
+    // Remove the task once the terminal update is acknowledged.
+    if (protobuf::isTerminalState(task->status_update_state()) &&
+        task->status_update_uuid() == uuid) {
+      removeTask(task);
+     }
   }
 
   LOG(INFO) << "Forwarding status update acknowledgement "
             << UUID::fromBytes(uuid) << " for task " << taskId
             << " of framework " << *framework << " to slave " << *slave;
 
-  // TODO(bmahler): Once we store terminal unacknowledged updates in
-  // the master per MESOS-1410, this is where we'll find the
-  // unacknowledged task and remove it if present.
-  // Also, be sure to confirm Master::reconcile is still correct!
-
   StatusUpdateAcknowledgementMessage message;
   message.mutable_slave_id()->CopyFrom(slaveId);
   message.mutable_framework_id()->CopyFrom(frameworkId);
@@ -3294,12 +3314,8 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 }
 
 
-// TODO(bmahler): The master will not release resources until the
-// slave receives acknowlegements for all non-terminal updates. This
-// means if a framework is down, the resources will remain allocated
-// even though the tasks are terminal on the slaves!
-// TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid' because
-// the status updates will be sent by the slave.
+// TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid'
+// because the status updates will be sent by the slave.
 void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 {
   ++metrics.messages_status_update;
@@ -3359,7 +3375,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 
   LOG(INFO) << "Status update " << update << " from slave " << *slave;
 
-  updateTask(task, update.status());
+  updateTask(task, update);
 
   // If the task is terminal and no acknowledgement is needed,
   // then remove the task now.
@@ -3947,7 +3963,7 @@ void Master::reconcile(
               TASK_LOST,
               "Task is unknown to the slave");
 
-          updateTask(task, update.status());
+          updateTask(task, update);
           removeTask(task);
 
           Framework* framework = getFramework(frameworkId);
@@ -4279,7 +4295,7 @@ void Master::removeFramework(Slave* slave, Framework* framework)
         (task->has_executor_id()
             ? Option<ExecutorID>(task->executor_id()) : None()));
 
-      updateTask(task, update.status());
+      updateTask(task, update);
       removeTask(task);
       forward(update, UPID(), framework);
     }
@@ -4396,7 +4412,7 @@ void Master::removeSlave(Slave* slave)
           (task->has_executor_id() ?
               Option<ExecutorID>(task->executor_id()) : None()));
 
-      updateTask(task, update.status());
+      updateTask(task, update);
       removeTask(task);
 
       updates.push_back(update);
@@ -4495,12 +4511,15 @@ void Master::_removeSlave(
 }
 
 
-void Master::updateTask(Task* task, const TaskStatus& status)
+void Master::updateTask(Task* task, const StatusUpdate& update)
 {
   CHECK_NOTNULL(task);
 
+  // Get the unacknowledged status.
+  const TaskStatus& status  = update.status();
+
   // Out-of-order updates should not occur, however in case they
-  // do (e.g. MESOS-1799), prevent them here to ensure that the
+  // do (e.g., due to bugs), prevent them here to ensure that the
   // resource accounting is not affected.
   if (protobuf::isTerminalState(task->state()) &&
       !protobuf::isTerminalState(status.state())) {
@@ -4511,9 +4530,33 @@ void Master::updateTask(Task* task, const TaskStatus& status)
     return;
   }
 
-  bool terminated =
-    !protobuf::isTerminalState(task->state()) &&
-    protobuf::isTerminalState(status.state());
+  // Get the latest state.
+  Option<TaskState> latestState;
+  if (update.has_latest_state()) {
+    latestState = update.latest_state();
+  }
+
+  // Set 'terminated' to true if this is the first time the task
+  // transitioned to terminal state. Also set the latest state.
+  bool terminated;
+  if (latestState.isSome()) {
+    // This update must be from >= 0.21.0 slave.
+    terminated = !protobuf::isTerminalState(task->state()) &&
+                 protobuf::isTerminalState(latestState.get());
+
+    task->set_state(latestState.get());
+  } else {
+    // This update must be from a pre 0.21.0 slave or generated by the
+    // master.
+    terminated = !protobuf::isTerminalState(task->state()) &&
+                 protobuf::isTerminalState(status.state());
+
+    task->set_state(status.state());
+  }
+
+  // Set the status update state and uuid for the task.
+  task->set_status_update_state(status.state());
+  task->set_status_update_uuid(update.uuid());
 
   // TODO(brenden) Consider wiping the `message` field?
   if (task->statuses_size() > 0 &&
@@ -4530,7 +4573,12 @@ void Master::updateTask(Task* task, const TaskStatus& status)
   // MESOS-1746.
   task->mutable_statuses(task->statuses_size() - 1)->clear_data();
 
-  task->set_state(status.state());
+  LOG(INFO) << "Updating the latest state of task " << task->task_id()
+            << " of framework " << task->framework_id()
+            << " to " << task->state()
+            << (task->state() != status.state()
+                ? " (status update state: " + stringify(status.state()) + ")"
+                : "");
 
   stats.tasks[status.state()]++;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c4e3fdf/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 18898e9..b1a2cd0 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -378,7 +378,7 @@ protected:
 
   // Transitions the task, and recovers resources if the task becomes
   // terminal.
-  void updateTask(Task* task, const TaskStatus& status);
+  void updateTask(Task* task, const StatusUpdate& update);
 
   // Removes the task.
   void removeTask(Task* task);

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c4e3fdf/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index d9dc40c..f60e376 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -62,6 +62,7 @@ using namespace mesos::internal::tests;
 using mesos::internal::master::Master;
 
 using mesos::internal::master::allocator::AllocatorProcess;
+using mesos::internal::master::allocator::HierarchicalDRFAllocatorProcess;
 
 using mesos::internal::slave::GarbageCollectorProcess;
 using mesos::internal::slave::Slave;
@@ -2359,6 +2360,89 @@ TEST_F(MasterTest, UnacknowledgedTerminalTask)
 }
 
 
+// This test ensures that the master releases resources for a
+// terminated task even when it receives a non-terminal update (with
+// latest state set).
+TEST_F(MasterTest, ReleaseResourcesForTerminalTaskWithPendingUpdates)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:1;mem:64";
+  Try<PID<Slave> > slave = StartSlave(&containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 64, "*"))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Drop all the updates from master to scheduler.
+  DROP_PROTOBUFS(StatusUpdateMessage(), master.get(), _);
+
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+  driver.start();
+
+  // Wait until TASK_RUNNING is sent to the master.
+  AWAIT_READY(statusUpdateMessage);
+
+  // Ensure status update manager handles TASK_RUNNING update.
+  AWAIT_READY(__statusUpdate);
+
+  Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate);
+
+  // Now send TASK_FINISHED update.
+  TaskStatus finishedStatus;
+  finishedStatus = statusUpdateMessage.get().update().status();
+  finishedStatus.set_state(TASK_FINISHED);
+  execDriver->sendStatusUpdate(finishedStatus);
+
+  // Ensure status update manager handles TASK_FINISHED update.
+  AWAIT_READY(__statusUpdate2);
+
+  Future<Nothing> resourcesRecovered = FUTURE_DISPATCH(
+      _, &HierarchicalDRFAllocatorProcess::resourcesRecovered);
+
+  // Advance the clock so that the status update manager resends
+  // TASK_RUNNING update with 'latest_state' as TASK_FINISHED.
+  Clock::pause();
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::resume();
+
+  // Ensure the resources are recovered.
+  AWAIT_READY(resourcesRecovered);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // This test ensures that the web UI of a framework is included in the
 // state.json endpoint, if provided by the framework.
 TEST_F(MasterTest, FrameworkWebUIUrl)


Mime
View raw message