mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vinodk...@apache.org
Subject svn commit: r1459092 - in /incubator/mesos/trunk/src: sched/sched.cpp tests/slave_recovery_tests.cpp
Date Wed, 20 Mar 2013 22:53:15 GMT
Author: vinodkone
Date: Wed Mar 20 22:53:14 2013
New Revision: 1459092

URL: http://svn.apache.org/r1459092
Log:
Added a more comprehensive slave recovery test to confirm that the
restarted slave can communicate with scheduler, executor and master.

Review: https://reviews.apache.org/r/9992

Modified:
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1459092&r1=1459091&r2=1459092&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Wed Mar 20 22:53:14 2013
@@ -324,9 +324,7 @@ protected:
       return;
     }
 
-    VLOG(1) << "Status update: task " << status.task_id()
-            << " of framework " << update.framework_id()
-            << " is now in state " << status.state();
+    VLOG(1) << "Received status update " << update << " from " <<
pid;
 
     CHECK(framework.id() == update.framework_id());
 
@@ -359,6 +357,8 @@ protected:
       return;
     }
 
+    VLOG(1) << "Sending ACK for status update " << update << " to " <<
pid;
+
     StatusUpdateAcknowledgementMessage message;
     message.mutable_framework_id()->MergeFrom(framework.id());
     message.mutable_slave_id()->MergeFrom(update.slave_id());

Modified: incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp?rev=1459092&r1=1459091&r2=1459092&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp Wed Mar 20 22:53:14 2013
@@ -34,6 +34,7 @@
 
 #include "common/process_utils.hpp"
 #include "common/protobuf_utils.hpp"
+#include "common/resources.hpp"
 
 #include "detector/detector.hpp"
 
@@ -318,7 +319,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSla
 
   EXPECT_EQ(TASK_RUNNING, update.update().status().state());
 
-  // Capture the ack.
+  // Capture the ACK.
   WAIT_UNTIL(statusUpdateAckMsg);
   StatusUpdateAcknowledgementMessage ack;
   ack.ParseFromString(message4.body);
@@ -411,7 +412,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSla
 }
 
 
-// The slave is killed before the ACK for a status update is received.
+// The slave is killed before the update reaches the scheduler.
 // When the slave comes back up it resends the unacknowledged update.
 TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
 {
@@ -424,17 +425,17 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSta
         Trigger(&registerExecutorMsg),
         Return(false)));
 
+  trigger statusUpdateMsg;
+  EXPECT_MESSAGE(Eq(StatusUpdateMessage().GetTypeName()), _, _)
+  .WillOnce(DoAll(Trigger(&statusUpdateMsg),
+                  Return(true))) // Drop the first update from the executor.
+  .WillRepeatedly(Return(false));
+
   trigger updateFrameworkMsg;
   EXPECT_MESSAGE(Eq(UpdateFrameworkMessage().GetTypeName()), _, _)
     .WillOnce(DoAll(Trigger(&updateFrameworkMsg),
                     Return(false)));
 
-  trigger statusUpdateAckMsg;
-  EXPECT_MESSAGE(Eq(StatusUpdateAcknowledgementMessage().GetTypeName()), _, _)
-    .WillOnce(DoAll(Trigger(&statusUpdateAckMsg),
-                    Return(true))) // Drop the first ACK message.
-    .WillRepeatedly(Return(false));
-
   // Scheduler expectations.
   MockScheduler sched;
   FrameworkID frameworkId;
@@ -451,9 +452,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSta
   TaskStatus status;
   trigger statusUpdateCall;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(Return())
     .WillOnce(DoAll(SaveArg<1>(&status), // This is the update after recovery.
-                    Trigger(&statusUpdateCall)));
+                    Trigger(&statusUpdateCall)))
+    .WillRepeatedly(Return());
 
   // Enable checkpointing for the framework.
   FrameworkInfo frameworkInfo;
@@ -477,8 +478,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSta
   WAIT_UNTIL(registerExecutorMsg);
   UPID executorPid = message.from;
 
-  // Capture the ack.
-  WAIT_UNTIL(statusUpdateAckMsg);
+  // Wait for the update.
+  WAIT_UNTIL(statusUpdateMsg);
 
   this->stopSlave();
 
@@ -522,13 +523,13 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectE
 
   // Scheduler expectations.
   MockScheduler sched;
-  FrameworkID frameworkId;
   EXPECT_CALL(sched, registered(_, _, _));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillOnce(DoAll(SaveArg<1>(&offers),
+                    Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
@@ -604,13 +605,13 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnr
 
   // Scheduler expectations.
   MockScheduler sched;
-  FrameworkID frameworkId;
   EXPECT_CALL(sched, registered(_, _, _));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillOnce(DoAll(SaveArg<1>(&offers),
+                    Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
@@ -675,13 +676,13 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTer
 
   // Scheduler expectations.
   MockScheduler sched;
-  FrameworkID frameworkId;
   EXPECT_CALL(sched, registered(_, _, _));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillOnce(DoAll(SaveArg<1>(&offers),
+                    Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
@@ -709,7 +710,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTer
   tasks.push_back(task); // Long-running task.
   driver.launchTasks(offers[0].id(), tasks);
 
-  // Capture the executor's pid.
+  // Capture the executor pid.
   WAIT_UNTIL(registerExecutorMsg);
   UPID executorPid = message.from;
 
@@ -736,19 +737,19 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTer
 
 
 // The slave is stopped after a non-terminal update is received.
-// Slave is restarted in recovery=cleanup mode. It kills the command executor,
-// and transitions the task to FAILED.
+// Slave is restarted in recovery=cleanup mode. It kills the command
+// executor, and transitions the task to FAILED.
 TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
 {
   // Scheduler expectations.
   MockScheduler sched;
-  FrameworkID frameworkId;
   EXPECT_CALL(sched, registered(_, _, _));
 
   trigger resourceOffersCall;
   vector<Offer> offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillOnce(DoAll(SaveArg<1>(&offers),
+                    Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
@@ -807,7 +808,8 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonC
   trigger resourceOffersCall;
   vector<Offer> offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillOnce(DoAll(SaveArg<1>(&offers),
+                    Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
@@ -862,13 +864,14 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpo
   trigger resourceOffersCall;
   vector<Offer> offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(DoAll(SaveArg<1>(&offers), Trigger(&resourceOffersCall)))
+    .WillOnce(DoAll(SaveArg<1>(&offers),
+                    Trigger(&resourceOffersCall)))
     .WillRepeatedly(Return());
 
   TaskStatus status;
-  trigger statusUpdateCall1, statusUpdateCall2;
+  trigger statusUpdateCall;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(Trigger(&statusUpdateCall1))
+    .WillOnce(Trigger(&statusUpdateCall))
     .WillRepeatedly(Return());
 
   // Disable checkpointing for the framework.
@@ -890,7 +893,7 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpo
   driver.launchTasks(offers[0].id(), tasks);
 
   // Wait for TASK_RUNNING update.
-  WAIT_UNTIL(statusUpdateCall1);
+  WAIT_UNTIL(statusUpdateCall);
 
   // Simulate a 'UpdateFrameworkMessage' to ensure framework pid is
   // not being checkpointed.
@@ -909,3 +912,91 @@ TYPED_TEST(SlaveRecoveryTest, NonCheckpo
   driver.stop();
   driver.join();
 }
+
+
+// Scheduler asks a restarted slave to kill a task that has been
+// running before the slave restarted. This test ensures that a
+// restarted slave is able to communicate with all components
+// (scheduler, master, executor).
+//
+// The first status update from the executor is dropped, so the
+// scheduler only observes the (retried) TASK_RUNNING update after
+// the slave has been restarted and has re-registered.
+TYPED_TEST(SlaveRecoveryTest, KillTask)
+{
+  // Message expectations.
+  trigger statusUpdateMsg;
+  EXPECT_MESSAGE(Eq(StatusUpdateMessage().GetTypeName()), _, _)
+    .WillOnce(DoAll(Trigger(&statusUpdateMsg),
+                    Return(true))) // Drop the first update from the executor.
+    .WillRepeatedly(Return(false));
+
+  trigger reregisterSlaveMsg;
+  EXPECT_MESSAGE(Eq(ReregisterSlaveMessage().GetTypeName()), _, _)
+    .WillOnce(DoAll(Trigger(&reregisterSlaveMsg),
+                    Return(false)));
+
+  // Scheduler expectations.
+  MockScheduler sched;
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  // The first offer is used to launch the task; the second one is
+  // expected after the task is killed and its resources are reoffered.
+  trigger resourceOffersCall1, resourceOffersCall2;
+  vector<Offer> offers1, offers2;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&offers1),
+                    Trigger(&resourceOffersCall1)))
+    .WillOnce(DoAll(SaveArg<1>(&offers2),
+                    Trigger(&resourceOffersCall2)))
+    .WillRepeatedly(Return());
+
+  TaskStatus status1, status2;
+  trigger statusUpdateCall1, statusUpdateCall2;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(DoAll(SaveArg<1>(&status1), // TASK_RUNNING update.
+                    Trigger(&statusUpdateCall1)))
+    .WillOnce(DoAll(SaveArg<1>(&status2), // TASK_FAILED update.
+                    Trigger(&statusUpdateCall2)));
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
+
+  driver.start();
+
+  WAIT_UNTIL(resourceOffersCall1);
+
+  // Fatal check: 'offers1[0]' is dereferenced below, so the test
+  // must stop here rather than index into an empty vector.
+  ASSERT_NE(0u, offers1.size());
+
+  TaskInfo task = createTask(offers1[0], "sleep 1000");
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Long-running task.
+  driver.launchTasks(offers1[0].id(), tasks);
+
+  // Wait for TASK_RUNNING update.
+  WAIT_UNTIL(statusUpdateMsg);
+
+  // Restart the slave.
+  this->stopSlave();
+  this->startSlave();
+
+  // Wait for the slave to re-register.
+  WAIT_UNTIL(reregisterSlaveMsg);
+
+  // Wait for retried TASK_RUNNING update.
+  WAIT_UNTIL(statusUpdateCall1);
+  ASSERT_EQ(TASK_RUNNING, status1.state());
+
+  // Kill the task.
+  driver.killTask(task.task_id());
+
+  // Wait for TASK_FAILED update.
+  WAIT_UNTIL(statusUpdateCall2);
+  ASSERT_EQ(TASK_FAILED, status2.state());
+
+  // Make sure all slave resources are reoffered.
+  WAIT_UNTIL(resourceOffersCall2);
+  ASSERT_NE(0u, offers2.size()); // Guard 'offers2[0]' below.
+  ASSERT_EQ(Resources(offers1[0].resources()),
+            Resources(offers2[0].resources()));
+
+  driver.stop();
+  driver.join();
+}



Mime
View raw message