mesos-commits mailing list archives

From: vinodk...@apache.org
Subject: svn commit: r1457101 - in /incubator/mesos/trunk/src: master/master.cpp master/master.hpp tests/slave_recovery_tests.cpp
Date: Fri, 15 Mar 2013 20:37:56 GMT
Author: vinodkone
Date: Fri Mar 15 20:37:56 2013
New Revision: 1457101

URL: http://svn.apache.org/r1457101
Log:
Fixed master to properly remove a non-checkpointing framework when
a checkpointing slave exits.

Review: https://reviews.apache.org/r/9945
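
In brief: when a slave disconnects, the master now checks whether the slave and
the frameworks running on it are checkpointing, instead of unconditionally
removing the slave. A condensed sketch of the new branch in Master::exited(),
pulled from the master.cpp diff below (logging and offer bookkeeping elided):

    foreachvalue (Slave* slave, slaves) {
      if (slave->pid == pid) {
        if (!slave->info.checkpoint()) {
          // The slave is not checkpointing: remove it outright;
          // removeSlave() transitions its tasks to TASK_LOST.
          removeSlave(slave);
          return;
        }

        // The slave is checkpointing: checkpointing frameworks are left
        // alone (the slave observer may still remove the slave after its
        // ~75s timeout); non-checkpointing frameworks are detached from
        // this slave via the new removeFramework(Slave*, Framework*).
        hashset<FrameworkID> ids;
        foreachvalue (Task* task, utils::copy(slave->tasks)) {
          if (!ids.contains(task->framework_id())) {
            ids.insert(task->framework_id());
            Framework* framework = getFramework(task->framework_id());
            if (framework != NULL && !framework->info.checkpoint()) {
              removeFramework(slave, framework);
            }
          }
        }
      }
    }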

Modified:
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1457101&r1=1457100&r2=1457101&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Fri Mar 15 20:37:56 2013
@@ -501,35 +501,67 @@ void Master::exited(const UPID& pid)
                 << failoverTimeout << " to failover";
 
       // Delay dispatching a message to ourselves for the timeout.
-      delay(failoverTimeout, self(),
-            &Master::frameworkFailoverTimeout,
-            framework->id, framework->reregisteredTime);
+    delay(failoverTimeout,
+          self(),
+          &Master::frameworkFailoverTimeout,
+          framework->id,
+          framework->reregisteredTime);
 
       // Remove the framework's offers.
       foreach (Offer* offer, utils::copy(framework->offers)) {
-        allocator->resourcesRecovered(offer->framework_id(),
-                                      offer->slave_id(),
-                                      Resources(offer->resources()));
+        allocator->resourcesRecovered(
+            offer->framework_id(),
+            offer->slave_id(),
+            Resources(offer->resources()));
+
         removeOffer(offer);
       }
       return;
     }
   }
 
+  // The semantics when a slave gets disconnected are as follows:
+  // 1) If the slave is not checkpointing, the slave is immediately
+  //    removed and all tasks running on it are transitioned to LOST.
+  //    No resources are recovered, because the slave is removed.
+  // 2) If the slave is checkpointing, the frameworks running on it
+  //    fall into one of the 2 cases:
+  //    2.1) Framework is checkpointing: No immediate action is taken.
+  //         The slave is given a chance to reconnect until the slave
+  //         observer times out (75s) and removes the slave (Case 1).
+  //    2.2) Framework is not-checkpointing: The slave is not removed
+  //         but the framework is removed from the slave's structs,
+  //         its tasks transitioned to LOST and resources recovered.
   foreachvalue (Slave* slave, slaves) {
     if (slave->pid == pid) {
       LOG(INFO) << "Slave " << slave->id << "(" << slave->info.hostname()
                 << ") disconnected";
 
       // Remove the slave, if it is not checkpointing.
-      // TODO(vinod): Even if a slave is checkpointing, transition all
-      // tasks of frameworks that have disabled checkpointing.
       if (!slave->info.checkpoint()) {
         LOG(INFO) << "Removing disconnected slave " << slave->id
                   << "(" << slave->info.hostname() << ") "
                   << "because it is not checkpointing!";
         removeSlave(slave);
         return;
+      } else {
+        // If a slave is checkpointing, remove frameworks from this
+        // slave that have disabled checkpointing.
+        hashset<FrameworkID> ids;
+        foreachvalue (Task* task, utils::copy(slave->tasks)) {
+          if (!ids.contains(task->framework_id())) {
+            ids.insert(task->framework_id());
+            Framework* framework = getFramework(task->framework_id());
+            if (framework != NULL && !framework->info.checkpoint()) {
+              LOG(INFO) << "Removing framework " << task->framework_id()
+                        << " from disconnected slave " << slave->id
+                        << "(" << slave->info.hostname() << ") "
+                        << "because it is not checkpointing!";
+
+              removeFramework(slave, framework);
+            }
+          }
+        }
       }
     }
   }
@@ -1760,6 +1792,68 @@ void Master::removeFramework(Framework* 
 }
 
 
+void Master::removeFramework(Slave* slave, Framework* framework)
+{
+  CHECK_NOTNULL(slave);
+  CHECK_NOTNULL(framework);
+
+  // Remove pointers to framework's tasks in slaves, and send status updates.
+  foreachvalue (Task* task, utils::copy(framework->tasks)) {
+    // A framework might not actually exist because the master failed
+    // over and the framework hasn't reconnected yet. For more info
+    // please see the comments in 'removeFramework(Framework*)'.
+    StatusUpdateMessage message;
+    StatusUpdate* update = message.mutable_update();
+    update->mutable_framework_id()->MergeFrom(task->framework_id());
+
+    if (task->has_executor_id()) {
+      update->mutable_executor_id()->MergeFrom(task->executor_id());
+    }
+
+    update->mutable_slave_id()->MergeFrom(task->slave_id());
+    TaskStatus* status = update->mutable_status();
+    status->mutable_task_id()->MergeFrom(task->task_id());
+    status->set_state(TASK_LOST);
+    status->set_message("Slave " + slave->info.hostname() + " disconnected");
+    update->set_timestamp(Clock::now());
+    update->set_uuid(UUID::random().toBytes());
+    send(framework->pid, message);
+
+    // Remove the task from slave and framework.
+    removeTask(task);
+  }
+
+  // Remove and rescind offers from this slave given to this framework.
+  foreach (Offer* offer, utils::copy(slave->offers)) {
+    if (framework->offers.contains(offer)) {
+      allocator->resourcesRecovered(
+          offer->framework_id(),
+          offer->slave_id(),
+          Resources(offer->resources()));
+
+      // Remove the offer from slave and framework.
+      removeOffer(offer, true); // Rescind.
+    }
+  }
+
+  // Remove the framework's executors from the slave and framework
+  // for proper resource accounting.
+  if (slave->executors.contains(framework->id)) {
+    foreachkey (const ExecutorID& executorId,
+                utils::copy(slave->executors[framework->id])) {
+
+      allocator->resourcesRecovered(
+          framework->id,
+          slave->id,
+          slave->executors[framework->id][executorId].resources());
+
+      framework->removeExecutor(slave->id, executorId);
+      slave->removeExecutor(framework->id, executorId);
+    }
+  }
+}
+
+
 void Master::addSlave(Slave* slave, bool reregister)
 {
   CHECK(slave != NULL);
@@ -1869,6 +1963,7 @@ void Master::removeSlave(Slave* slave)
   // Remove pointers to slave's tasks in frameworks, and send status updates
   foreachvalue (Task* task, utils::copy(slave->tasks)) {
     Framework* framework = getFramework(task->framework_id());
+
     // A framework might not actually exist because the master failed
     // over and the framework hasn't reconnected. This can be a tricky
     // situation for frameworks that want to have high-availability,
@@ -1956,9 +2051,8 @@ void Master::removeTask(Task* task)
   slave->removeTask(task);
 
   // Tell the allocator about the recovered resources.
-  allocator->resourcesRecovered(task->framework_id(),
-                                task->slave_id(),
-                                Resources(task->resources()));
+  allocator->resourcesRecovered(
+      task->framework_id(), task->slave_id(), Resources(task->resources()));
 
   delete task;
 }

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1457101&r1=1457100&r2=1457101&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Fri Mar 15 20:37:56 2013
@@ -154,6 +154,10 @@ protected:
   // reschedule offers that were assigned to this framework.
   void removeFramework(Framework* framework);
 
+  // Remove a framework from the slave, i.e., kill all of its tasks,
+  // remove its offers and reallocate its resources.
+  void removeFramework(Slave* slave, Framework* framework);
+
   // Add a slave.
   void addSlave(Slave* slave, bool reregister = false);
 

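The new overload complements rather than replaces removeFramework(Framework*):
the single-argument form still tears a framework down everywhere, while the
(Slave*, Framework*) form only detaches it from one slave. A minimal call-site
sketch, mirroring the master.cpp hunk above:

    Framework* framework = getFramework(task->framework_id());
    if (framework != NULL && !framework->info.checkpoint()) {
      // Detach this non-checkpointing framework from the disconnected slave
      // only: its tasks there become TASK_LOST, its offers from the slave
      // are rescinded, and executor resources are recovered, but the
      // framework itself stays registered with the master.
      removeFramework(slave, framework);
    }
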
Modified: incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp?rev=1457101&r1=1457100&r2=1457101&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp Fri Mar 15 20:37:56 2013
@@ -141,10 +141,6 @@ public:
   static void SetUpTestCase()
   {
     IsolationTest<T>::SetUpTestCase();
-
-    // Enable checkpointing for the framework.
-    frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-    frameworkInfo.set_checkpoint(true);
   }
 
   virtual void SetUp()
@@ -183,12 +179,17 @@ protected:
     isolationModule = new T();
     s = new Slave(this->slaveFlags, true, isolationModule, &files);
     slave = process::spawn(s);
-
     detector = new BasicMasterDetector(master, slave, true);
+
+    running = true;
   }
 
   void stopSlave(bool shutdown = false)
   {
+    if (!running) {
+      return;
+    }
+
     delete detector;
 
     if (shutdown) {
@@ -198,8 +199,9 @@ protected:
     }
     process::wait(slave);
     delete s;
-
     delete isolationModule;
+
+    running = false;
   }
 
   HierarchicalDRFAllocatorProcess allocator;
@@ -209,17 +211,11 @@ protected:
   Slave* s;
   Files files;
   BasicMasterDetector* detector;
-  MockScheduler sched;
-  TaskStatus status;
   PID<Master> master;
   PID<Slave> slave;
-  static FrameworkInfo frameworkInfo;
+  bool running; // Is the slave running?
 };
 
-// Initialize static members here.
-template <typename T>
-FrameworkInfo SlaveRecoveryTest<T>::frameworkInfo;
-
 
 #ifdef __linux__
 typedef ::testing::Types<ProcessBasedIsolationModule, CgroupsIsolationModule>
@@ -269,21 +265,27 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSla
         Return(false)));
 
   // Scheduler expectations.
+  MockScheduler sched;
   FrameworkID frameworkId;
-  EXPECT_CALL(this->sched, registered(_, _, _))
+  EXPECT_CALL(sched, registered(_, _, _))
     .WillOnce(SaveArg<1>(&frameworkId));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
-  EXPECT_CALL(this->sched, resourceOffers(_, _))
+  EXPECT_CALL(sched, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers),
                     Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
-  EXPECT_CALL(this->sched, statusUpdate(_, _))
+  EXPECT_CALL(sched, statusUpdate(_, _))
     .WillRepeatedly(Return());
 
-  MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
 
   driver.start();
 
@@ -435,25 +437,31 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSta
     .WillRepeatedly(Return(false));
 
   // Scheduler expectations.
+  MockScheduler sched;
   FrameworkID frameworkId;
-  EXPECT_CALL(this->sched, registered(_, _, _))
+  EXPECT_CALL(sched, registered(_, _, _))
     .WillOnce(SaveArg<1>(&frameworkId));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
-  EXPECT_CALL(this->sched, resourceOffers(_, _))
+  EXPECT_CALL(sched, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers),
                     Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
   trigger statusUpdateCall;
-  EXPECT_CALL(this->sched, statusUpdate(_, _))
+  EXPECT_CALL(sched, statusUpdate(_, _))
     .WillOnce(Return())
     .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
                     Trigger(&statusUpdateCall)));
 
-  MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
 
   driver.start();
 
@@ -514,23 +522,29 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectE
       Return(false)));
 
   // Scheduler expectations.
+  MockScheduler sched;
   FrameworkID frameworkId;
-  EXPECT_CALL(this->sched, registered(_, _, _));
+  EXPECT_CALL(sched, registered(_, _, _));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
-  EXPECT_CALL(this->sched, resourceOffers(_, _))
+  EXPECT_CALL(sched, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
   trigger statusUpdateCall;
-  EXPECT_CALL(this->sched, statusUpdate(_, _))
+  EXPECT_CALL(sched, statusUpdate(_, _))
     .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
                     Trigger(&statusUpdateCall)))
     .WillRepeatedly(Return());
 
-  MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
 
   driver.start();
 
@@ -590,23 +604,29 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnr
         Return(true))); // Drop the executor registration message.
 
   // Scheduler expectations.
+  MockScheduler sched;
   FrameworkID frameworkId;
-  EXPECT_CALL(this->sched, registered(_, _, _));
+  EXPECT_CALL(sched, registered(_, _, _));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
-  EXPECT_CALL(this->sched, resourceOffers(_, _))
+  EXPECT_CALL(sched, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
   trigger statusUpdateCall;
-  EXPECT_CALL(this->sched, statusUpdate(_, _))
+  EXPECT_CALL(sched, statusUpdate(_, _))
     .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
                     Trigger(&statusUpdateCall)))
     .WillRepeatedly(Return());
 
-  MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
 
   driver.start();
 
@@ -655,23 +675,29 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTer
         Return(false)));
 
   // Scheduler expectations.
+  MockScheduler sched;
   FrameworkID frameworkId;
-  EXPECT_CALL(this->sched, registered(_, _, _));
+  EXPECT_CALL(sched, registered(_, _, _));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
-  EXPECT_CALL(this->sched, resourceOffers(_, _))
+  EXPECT_CALL(sched, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
   trigger statusUpdateCall1, statusUpdateCall2;
-  EXPECT_CALL(this->sched, statusUpdate(_, _))
+  EXPECT_CALL(sched, statusUpdate(_, _))
     .WillOnce(Trigger(&statusUpdateCall1))
     .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
                     Trigger(&statusUpdateCall2)));
 
-  MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
 
   driver.start();
 
@@ -716,23 +742,29 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTer
 TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
 {
   // Scheduler expectations.
+  MockScheduler sched;
   FrameworkID frameworkId;
-  EXPECT_CALL(this->sched, registered(_, _, _));
+  EXPECT_CALL(sched, registered(_, _, _));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
-  EXPECT_CALL(this->sched, resourceOffers(_, _))
+  EXPECT_CALL(sched, resourceOffers(_, _))
     .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
   trigger statusUpdateCall1, statusUpdateCall2;
-  EXPECT_CALL(this->sched, statusUpdate(_, _))
+  EXPECT_CALL(sched, statusUpdate(_, _))
     .WillOnce(Trigger(&statusUpdateCall1))
     .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
                     Trigger(&statusUpdateCall2)));
 
-  MesosSchedulerDriver driver(&this->sched, this->frameworkInfo, this->master);
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
 
   driver.start();
 
@@ -745,7 +777,7 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExe
   tasks.push_back(task); // Long-running task.
   driver.launchTasks(offers[0].id(), tasks);
 
-  // Stop the slave before the executor is registered.
+  // Wait for TASK_RUNNING update.
   WAIT_UNTIL(statusUpdateCall1);
 
   sleep(1); // Give enough time for the ACK to be checkpointed.
@@ -763,3 +795,57 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExe
   driver.stop();
   driver.join();
 }
+
+
+// This test checks whether a non-checkpointing framework is
+// properly removed, when a checkpointing slave is disconnected.
+TYPED_TEST(SlaveRecoveryTest, NonCheckpointingFramework)
+{
+  // Scheduler expectations.
+  MockScheduler sched;
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  trigger resourceOffersCall;
+  vector<Offer> offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillRepeatedly(Return());
+
+  TaskStatus status;
+  trigger statusUpdateCall1, statusUpdateCall2;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(Trigger(&statusUpdateCall1))
+    .WillOnce(DoAll(SaveArg<1>(&status), // Update after slave exited.
+                    Trigger(&statusUpdateCall2)));
+
+  // Disable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(false);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOffersCall);
+
+  EXPECT_NE(0u, offers.size());
+
+  TaskInfo task = createTask(offers[0], "sleep 1000");
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Long-running task.
+  driver.launchTasks(offers[0].id(), tasks);
+
+  // Wait for TASK_RUNNING update.
+  WAIT_UNTIL(statusUpdateCall1);
+
+  this->stopSlave();
+
+  // Scheduler should receive the TASK_LOST update.
+  WAIT_UNTIL(statusUpdateCall2);
+  ASSERT_EQ(TASK_LOST, status.state());
+
+  driver.stop();
+  driver.join();
+}


