mesos-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [10/11] mesos git commit: Added a test for default executor reconnect upon an agent process restart.
Date Tue, 27 Sep 2016 04:21:16 GMT
Added a test for default executor reconnect upon an agent process restart.

This change adds a test for validating that the default executor
can reconnect with the agent upon an agent process restart if
framework checkpointing is enabled.

This test is disabled for now but would be enabled when the
launcher changes land.

Review: https://reviews.apache.org/r/52287/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7e89791c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7e89791c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7e89791c

Branch: refs/heads/master
Commit: 7e89791c45de10f48f0256a63e39b905f396b159
Parents: 70595be
Author: Anand Mazumdar <anand@apache.org>
Authored: Mon Sep 26 21:14:22 2016 -0700
Committer: Anand Mazumdar <anand@apache.org>
Committed: Mon Sep 26 21:20:39 2016 -0700

----------------------------------------------------------------------
 src/tests/slave_recovery_tests.cpp | 197 ++++++++++++++++++++++++++++++++
 1 file changed, 197 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7e89791c/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 618019c..c8275cb 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -21,6 +21,8 @@
 
 #include <gtest/gtest.h>
 
+#include <mesos/v1/agent/agent.hpp>
+
 #include <mesos/v1/executor/executor.hpp>
 
 #include <mesos/executor.hpp>
@@ -35,6 +37,7 @@
 #include <process/owned.hpp>
 #include <process/reap.hpp>
 
+#include <stout/hashset.hpp>
 #include <stout/none.hpp>
 #include <stout/numify.hpp>
 #include <stout/option.hpp>
@@ -515,6 +518,200 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectHTTPExecutor)
 }
 
 
+// The agent is stopped after dropping the updates for all tasks in the
+// task group from the default executor. When it comes back up with
+// recovery=reconnect, make sure the executor subscribes and the agent
+// properly sends the updates.
+//
+// TODO(anand): Remove the `ROOT_CGROUPS` prefix once the posix isolator
+// is nested aware.
+TYPED_TEST(SlaveRecoveryTest, DISABLED_ROOT_CGROUPS_ReconnectDefaultExecutor)
+{
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = this->CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  Try<TypeParam*> _containerizer = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+
+  // Start the slave with a static process ID. This allows the executor to
+  // reconnect with the slave upon a process restart.
+  const string id("agent");
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    this->StartSlave(detector.get(), containerizer.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true);
+
+  Resources resources =
+    Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  ExecutorInfo executorInfo;
+  executorInfo.set_type(ExecutorInfo::DEFAULT);
+
+  executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::TestV1Mesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    v1::scheduler::Call call;
+    call.set_type(v1::scheduler::Call::SUBSCRIBE);
+    v1::scheduler::Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo));
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId));
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  const v1::Offer& offer = offers->offers(0);
+  const SlaveID slaveId = devolve(offer.agent_id());
+
+  v1::TaskInfo taskInfo1 =
+    evolve(createTask(slaveId, resources, "sleep 1000"));
+
+  v1::TaskInfo taskInfo2 =
+    evolve(createTask(slaveId, resources, "sleep 1000"));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo1);
+  taskGroup.add_tasks()->CopyFrom(taskInfo2);
+
+  {
+    v1::scheduler::Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(v1::scheduler::Call::ACCEPT);
+
+    v1::scheduler::Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+    v1::Offer::Operation::LaunchGroup* launchGroup =
+      operation->mutable_launch_group();
+
+    launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo));
+    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+    mesos.send(call);
+  }
+
+  Future<v1::executor::Call> updateCall1 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  Future<v1::executor::Call> updateCall2 =
+    DROP_HTTP_CALL(Call(), Call::UPDATE, _, ContentType::PROTOBUF);
+
+  Future<v1::agent::Call> waitCall1 = FUTURE_HTTP_CALL(
+      v1::agent::Call(),
+      v1::agent::Call::WAIT_NESTED_CONTAINER,
+      _,
+      ContentType::PROTOBUF);
+
+  Future<v1::agent::Call> waitCall2 = FUTURE_HTTP_CALL(
+      v1::agent::Call(),
+      v1::agent::Call::WAIT_NESTED_CONTAINER,
+      _,
+      ContentType::PROTOBUF);
+
+  // Stop the agent after dropping the update calls and upon receiving the
+  // wait calls. We can't drop the wait calls as doing so results in a
+  // '500 Internal Server Error' for the default executor leading to it
+  // failing fast.
+  AWAIT_READY(updateCall1);
+  AWAIT_READY(updateCall2);
+  AWAIT_READY(waitCall1);
+  AWAIT_READY(waitCall2);
+
+  slave.get()->terminate();
+
+  // The TASK_RUNNING updates for the tasks in a task group can be
+  // received in any order.
+  hashset<v1::TaskID> tasks;
+
+  tasks.insert(taskInfo1.task_id());
+  tasks.insert(taskInfo2.task_id());
+
+  Future<v1::executor::Call> subscribeCall =
+    FUTURE_HTTP_CALL(Call(), Call::SUBSCRIBE, _, ContentType::PROTOBUF);
+
+  Future<v1::scheduler::Event::Update> update1;
+  Future<v1::scheduler::Event::Update> update2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&update1))
+    .WillOnce(FutureArg<1>(&update2))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  // Restart the slave (use same flags) with a new containerizer.
+  _containerizer = TypeParam::create(flags, true, &fetcher);
+  ASSERT_SOME(_containerizer);
+  containerizer.reset(_containerizer.get());
+
+  slave = this->StartSlave(detector.get(), containerizer.get(), id, flags);
+  ASSERT_SOME(slave);
+
+  // Ensure that the executor subscribes again.
+  AWAIT_READY(subscribeCall);
+
+  EXPECT_EQ(2, subscribeCall->subscribe().unacknowledged_updates().size());
+  EXPECT_EQ(2, subscribeCall->subscribe().unacknowledged_tasks().size());
+
+  // Scheduler should receive the recovered update.
+  AWAIT_READY(update1);
+  AWAIT_READY(update2);
+
+  EXPECT_EQ(v1::TASK_RUNNING, update1->status().state());
+  ASSERT_TRUE(tasks.contains(update1->status().task_id()));
+
+  tasks.erase(update1->status().task_id());
+
+  EXPECT_EQ(v1::TASK_RUNNING, update2->status().state());
+  ASSERT_TRUE(tasks.contains(update2->status().task_id()));
+}
+
+
 // The slave is stopped before the first update for a task is received
 // from the executor. When it comes back up with recovery=reconnect, make
 // sure the executor re-registers and the slave properly sends the update.


Mime
View raw message