mesos-commits mailing list archives

From b...@apache.org
Subject [3/5] git commit: Added some slave recovery DockerContainerizer tests.
Date Sat, 16 Aug 2014 15:26:48 GMT
Added some slave recovery DockerContainerizer tests.

Review: https://reviews.apache.org/r/24765


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fd553815
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fd553815
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fd553815

Branch: refs/heads/master
Commit: fd55381510c96ae7bb48b36cae2bcc329d535382
Parents: 6d50275
Author: Benjamin Hindman <benjamin.hindman@gmail.com>
Authored: Fri Aug 15 20:56:17 2014 -0700
Committer: Benjamin Hindman <benjamin.hindman@gmail.com>
Committed: Sat Aug 16 08:25:39 2014 -0700

----------------------------------------------------------------------
 src/tests/docker_containerizer_tests.cpp | 364 +++++++++++++++++++++++++-
 1 file changed, 354 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fd553815/src/tests/docker_containerizer_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/docker_containerizer_tests.cpp b/src/tests/docker_containerizer_tests.cpp
index 0d7c3b1..3a55f5e 100644
--- a/src/tests/docker_containerizer_tests.cpp
+++ b/src/tests/docker_containerizer_tests.cpp
@@ -20,14 +20,18 @@
 #include <gtest/gtest.h>
 
 #include <process/future.hpp>
+#include <process/gmock.hpp>
 #include <process/subprocess.hpp>
 
 #include "linux/cgroups.hpp"
 
+#include "messages/messages.hpp"
+
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
 #include "slave/containerizer/docker.hpp"
+#include "slave/paths.hpp"
 #include "slave/slave.hpp"
 #include "slave/state.hpp"
 
@@ -43,7 +47,9 @@ using mesos::internal::slave::Slave;
 using mesos::internal::slave::DockerContainerizer;
 
 using process::Future;
+using process::Message;
 using process::PID;
+using process::UPID;
 
 using std::vector;
 using std::list;
@@ -51,6 +57,7 @@ using std::string;
 
 using testing::_;
 using testing::DoDefault;
+using testing::Eq;
 using testing::Invoke;
 using testing::Return;
 
@@ -189,7 +196,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch_Executor)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -299,7 +306,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Launch)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -390,7 +397,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Kill)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -618,7 +625,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Update)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -855,7 +862,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Logs)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -962,7 +969,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1057,7 +1064,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD)
 }
 
 
-// The following test uses a docker image (mesosphere/inky) that has
+// The following test uses a Docker image (mesosphere/inky) that has
 // an entrypoint "echo" and a default command "inky".
 TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
 {
@@ -1070,7 +1077,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1170,7 +1177,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Override)
 }
 
 
-// The following test uses a docker image (mesosphere/inky) that has
+// The following test uses a Docker image (mesosphere/inky) that has
 // an entrypoint "echo" and a default command "inky".
 TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
 {
@@ -1183,7 +1190,7 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
 
   MockDockerContainerizer dockerContainerizer(flags, docker);
 
-  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer);
+  Try<PID<Slave> > slave = StartSlave(&dockerContainerizer, flags);
   ASSERT_SOME(slave);
 
   MockScheduler sched;
@@ -1282,3 +1289,340 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_Default_CMD_Args)
 
   Shutdown();
 }
+
+
+// The slave is stopped before the first update for a task is received
+// from the executor. When it comes back up, we make sure the executor
+// re-registers and the slave properly sends the update.
+TEST_F(DockerContainerizerTest, ROOT_DOCKER_SlaveRecoveryTaskContainer)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // Set up recovery slave flags.
+  flags.checkpoint = true;
+  flags.recover = "reconnect";
+  flags.strict = true;
+
+  Docker docker = Docker::create(tests::flags.docker, false).get();
+
+  // We put the containerizer on the heap so we can more easily
+  // control its lifetime, i.e., when we invoke the destructor.
+  MockDockerContainerizer* dockerContainerizer1 =
+    new MockDockerContainerizer(flags, docker);
+
+  Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
+  ASSERT_SOME(slave1);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("busybox");
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+
+  task.mutable_command()->CopyFrom(command);
+  task.mutable_container()->CopyFrom(containerInfo);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<ContainerID> containerId;
+  EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    Invoke(dockerContainerizer1,
+                           &MockDockerContainerizer::_launch)));
+
+  // Drop the first update from the executor.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(containerId);
+
+  // Stop the slave before the status update is received.
+  AWAIT_READY(statusUpdateMessage);
+
+  Stop(slave1.get());
+
+  delete dockerContainerizer1;
+
+  Future<Message> reregisterExecutorMessage =
+    FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  MockDockerContainerizer* dockerContainerizer2 =
+    new MockDockerContainerizer(flags, docker);
+
+  Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
+  ASSERT_SOME(slave2);
+
+  // Ensure the executor re-registers.
+  AWAIT_READY(reregisterExecutorMessage);
+  UPID executorPid = reregisterExecutorMessage.get().from;
+
+  ReregisterExecutorMessage reregister;
+  reregister.ParseFromString(reregisterExecutorMessage.get().body);
+
+  // Executor should report the unacknowledged update.
+  ASSERT_EQ(1, reregister.updates_size());
+  const StatusUpdate& update = reregister.updates(0);
+  ASSERT_EQ(task.task_id(), update.status().task_id());
+  ASSERT_EQ(TASK_RUNNING, update.status().state());
+
+  // Scheduler should receive the recovered update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+  // Make sure the container is still running.
+  Future<list<Docker::Container> > containers =
+    docker.ps(true, slave::DOCKER_NAME_PREFIX);
+
+  AWAIT_READY(containers);
+
+  ASSERT_TRUE(exists(containers.get(), containerId.get()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+
+  delete dockerContainerizer2;
+}
+
+
+// The slave is stopped before the first update for a task is received
+// from the executor. When it comes back up, we make sure the executor
+// re-registers and the slave properly sends the update.
+//
+// TODO(benh): This test is currently disabled because the executor
+// inside the image mesosphere/test-executor does not properly set the
+// executor PID that it uses during registration, so when the new
+// slave recovers it can't reconnect and instead destroys that
+// container. In particular, it uses '0' for its IP, which we properly
+// parse and can even properly use for sending other messages, but the
+// current implementation of 'UPID::operator bool ()' fails if the IP
+// component of a PID is '0'.
+TEST_F(DockerContainerizerTest, DISABLED_ROOT_DOCKER_SlaveRecoveryExecutorContainer)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  slave::Flags flags = CreateSlaveFlags();
+
+  // Set up recovery slave flags.
+  flags.checkpoint = true;
+  flags.recover = "reconnect";
+  flags.strict = true;
+
+  Docker docker = Docker::create(tests::flags.docker, false).get();
+
+  MockDockerContainerizer* dockerContainerizer1 =
+    new MockDockerContainerizer(flags, docker);
+
+  Try<PID<Slave> > slave1 = StartSlave(dockerContainerizer1, flags);
+  ASSERT_SOME(slave1);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->CopyFrom(offer.slave_id());
+  task.mutable_resources()->CopyFrom(offer.resources());
+
+  ExecutorInfo executorInfo;
+  ExecutorID executorId;
+  executorId.set_value("e1");
+  executorInfo.mutable_executor_id()->CopyFrom(executorId);
+
+  CommandInfo command;
+  command.set_value("test-executor");
+  executorInfo.mutable_command()->CopyFrom(command);
+
+  ContainerInfo containerInfo;
+  containerInfo.set_type(ContainerInfo::DOCKER);
+
+  ContainerInfo::DockerInfo dockerInfo;
+  dockerInfo.set_image("mesosphere/test-executor");
+
+  containerInfo.mutable_docker()->CopyFrom(dockerInfo);
+  executorInfo.mutable_container()->CopyFrom(containerInfo);
+
+  task.mutable_executor()->CopyFrom(executorInfo);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  Future<ContainerID> containerId;
+  Future<SlaveID> slaveId;
+  EXPECT_CALL(*dockerContainerizer1, launch(_, _, _, _, _, _, _))
+    .WillOnce(DoAll(FutureArg<0>(&containerId),
+                    FutureArg<4>(&slaveId),
+                    Invoke(dockerContainerizer1,
+                           &MockDockerContainerizer::_launchExecutor)));
+
+  // The test-executor in the image immediately sends a TASK_RUNNING
+  // followed by TASK_FINISHED (no sleep/delay in between) so we need
+  // to drop the first TWO updates that come from the executor rather
+  // than only the first update as above, where we can control the
+  // length of the task.
+  Future<StatusUpdateMessage> statusUpdateMessage1 =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  // Drop the second update from the executor as well.
+  Future<StatusUpdateMessage> statusUpdateMessage2 =
+    DROP_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(containerId);
+  AWAIT_READY(slaveId);
+
+  // We also need to wait until the container's pid has been
+  // checkpointed so that when the next slave recovers it won't treat
+  // the executor as lost!
+  string path = slave::paths::getForkedPidPath(
+      slave::paths::getMetaRootDir(flags.work_dir),
+      slaveId.get(),
+      frameworkId.get(),
+      executorId,
+      containerId.get());
+
+  Duration waited = Duration::zero();
+  do {
+    if (os::exists(path)) {
+      Try<string> read = os::read(path);
+      if (read.isSome() && read.get() != "") {
+        break;
+      }
+    }
+    os::sleep(Milliseconds(100));
+    waited += Milliseconds(100);
+  } while (waited < Seconds(3));
+
+  ASSERT_TRUE(os::exists(path));
+  ASSERT_SOME_NE("", os::read(path));
+
+  // Stop the slave before the status update is received.
+  AWAIT_READY(statusUpdateMessage1);
+  AWAIT_READY(statusUpdateMessage2);
+
+  Stop(slave1.get());
+
+  delete dockerContainerizer1;
+
+  Future<Message> reregisterExecutorMessage =
+    FUTURE_MESSAGE(Eq(ReregisterExecutorMessage().GetTypeName()), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return());       // Ignore subsequent updates.
+
+  MockDockerContainerizer* dockerContainerizer2 =
+    new MockDockerContainerizer(flags, docker);
+
+  Try<PID<Slave> > slave2 = StartSlave(dockerContainerizer2, flags);
+  ASSERT_SOME(slave2);
+
+  // Ensure the executor re-registers.
+  AWAIT_READY(reregisterExecutorMessage);
+  UPID executorPid = reregisterExecutorMessage.get().from;
+
+  ReregisterExecutorMessage reregister;
+  reregister.ParseFromString(reregisterExecutorMessage.get().body);
+
+  // Executor should report the unacknowledged update.
+  ASSERT_EQ(1, reregister.updates_size());
+  const StatusUpdate& update = reregister.updates(0);
+  ASSERT_EQ(task.task_id(), update.status().task_id());
+  ASSERT_EQ(TASK_RUNNING, update.status().state());
+
+  // Scheduler should receive the recovered update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_RUNNING, status.get().state());
+
+  // Make sure the container is still running.
+  Future<list<Docker::Container> > containers =
+    docker.ps(true, slave::DOCKER_NAME_PREFIX);
+
+  AWAIT_READY(containers);
+
+  ASSERT_TRUE(exists(containers.get(), containerId.get()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+
+  delete dockerContainerizer2;
+}
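
----------------------------------------------------------------------

The second test above waits for the executor's forked pid to be
checkpointed before stopping the slave, polling the pid file until it
is non-empty. Below is a minimal standalone sketch of that same
"poll until the file is non-empty" pattern, assuming only the C++
standard library; the test itself uses stout's os::exists/os::read and
builds the real path via slave::paths::getForkedPidPath(), so the path
and helper name here are purely illustrative. (Note also that the
ROOT_DOCKER_* tests only run when the test suite is executed as root
with a working Docker daemon.)

  // Illustrative, stout-free sketch of "wait until a checkpointed pid
  // file exists and is non-empty, or give up after a timeout".
  #include <chrono>
  #include <fstream>
  #include <iostream>
  #include <iterator>
  #include <string>
  #include <thread>

  static bool waitForNonEmptyFile(
      const std::string& path,
      std::chrono::milliseconds timeout)
  {
    const auto deadline = std::chrono::steady_clock::now() + timeout;

    while (std::chrono::steady_clock::now() < deadline) {
      std::ifstream file(path);
      if (file) {
        std::string contents(
            (std::istreambuf_iterator<char>(file)),
            std::istreambuf_iterator<char>());
        if (!contents.empty()) {
          return true;  // The pid has been checkpointed.
        }
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    return false;  // Timed out waiting for the checkpoint.
  }

  int main()
  {
    // Placeholder path; the real test derives it from the slave's meta
    // directory plus the slave, framework, executor and container IDs.
    const bool checkpointed = waitForNonEmptyFile(
        "/tmp/mesos/meta/forked.pid", std::chrono::seconds(3));

    std::cout << (checkpointed ? "checkpointed" : "timed out")
              << std::endl;
    return 0;
  }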

