mesos-commits mailing list archives

From qianzh...@apache.org
Subject [mesos] 16/21: Added a test `CgroupsIsolatorTest.ROOT_CGROUPS_CFS_TaskGroupLimits`.
Date Fri, 20 Mar 2020 09:03:13 GMT
This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a06ff8ee57c60b95e84cab1930b21c6e8a6f95a1
Author: Qian Zhang <zhq527725@gmail.com>
AuthorDate: Sat Jan 11 10:10:42 2020 +0800

    Added a test `CgroupsIsolatorTest.ROOT_CGROUPS_CFS_TaskGroupLimits`.
    
    Review: https://reviews.apache.org/r/71983
---
 src/tests/containerizer/cgroups_isolator_tests.cpp | 443 +++++++++++++++++++++
 1 file changed, 443 insertions(+)

diff --git a/src/tests/containerizer/cgroups_isolator_tests.cpp b/src/tests/containerizer/cgroups_isolator_tests.cpp
index 3578f88..f4425f0 100644
--- a/src/tests/containerizer/cgroups_isolator_tests.cpp
+++ b/src/tests/containerizer/cgroups_isolator_tests.cpp
@@ -28,6 +28,7 @@
 #include "slave/gc_process.hpp"
 
 #include "slave/containerizer/mesos/containerizer.hpp"
+#include "slave/containerizer/mesos/paths.hpp"
 
 #include "slave/containerizer/mesos/isolators/cgroups/constants.hpp"
 #include "slave/containerizer/mesos/isolators/cgroups/subsystems/net_cls.hpp"
@@ -66,10 +67,14 @@ using mesos::internal::slave::NetClsHandle;
 using mesos::internal::slave::NetClsHandleManager;
 using mesos::internal::slave::Slave;
 
+using mesos::internal::slave::containerizer::paths::getCgroupPath;
+
 using mesos::master::detector::MasterDetector;
 
+using mesos::v1::scheduler::Call;
 using mesos::v1::scheduler::Event;
 
+using process::Clock;
 using process::Future;
 using process::Owned;
 using process::Queue;
@@ -82,6 +87,7 @@ using std::string;
 using std::vector;
 
 using testing::_;
+using testing::AllOf;
 using testing::DoAll;
 using testing::InvokeWithoutArgs;
 using testing::Return;
@@ -849,6 +855,443 @@ TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_CommandTaskInfiniteLimits)
 }
 
 
+// This test verifies that the default executor container's CPU and memory
+// soft & hard limits are updated correctly when launching task groups and
+// killing tasks, and also verifies that each task's CPU and memory soft &
+// hard limits are set correctly.
+TEST_F(CgroupsIsolatorTest, ROOT_CGROUPS_CFS_TaskGroupLimits)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "cgroups/cpu,cgroups/mem";
+  flags.cgroups_enable_cfs = true;
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher(flags);
+
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, true, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<MesosContainerizer> containerizer(_containerizer.get());
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), flags);
+
+  ASSERT_SOME(slave);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers1;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  v1::ExecutorInfo executorInfo = v1::createExecutorInfo(
+      v1::DEFAULT_EXECUTOR_ID,
+      None(),
+      "cpus:0.1;mem:32;disk:32",
+      v1::ExecutorInfo::DEFAULT,
+      frameworkId);
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+
+  AWAIT_READY(offers1);
+  ASSERT_FALSE(offers1->offers().empty());
+
+  const v1::Offer& offer1 = offers1->offers(0);
+  const v1::AgentID& agentId = offer1.agent_id();
+
+  // Launch the first task group which has two tasks: task1 has no resource
+  // limits specified, while task2 does.
+  v1::TaskInfo taskInfo1 = v1::createTask(
+      agentId,
+      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
+      SLEEP_COMMAND(1000));
+
+  mesos::v1::Value::Scalar cpuLimit, memLimit;
+  cpuLimit.set_value(0.5);
+  memLimit.set_value(64);
+
+  google::protobuf::Map<string, mesos::v1::Value::Scalar> resourceLimits;
+  resourceLimits.insert({"cpus", cpuLimit});
+  resourceLimits.insert({"mem", memLimit});
+
+  v1::TaskInfo taskInfo2 = v1::createTask(
+      agentId,
+      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
+      SLEEP_COMMAND(1000),
+      None(),
+      "test-task",
+      id::UUID::random().toString(),
+      resourceLimits);
+
+  taskInfo1.mutable_container()->set_type(mesos::v1::ContainerInfo::MESOS);
+  taskInfo1.mutable_container()->mutable_linux_info()->set_share_cgroups(false);
+  taskInfo2.mutable_container()->set_type(mesos::v1::ContainerInfo::MESOS);
+  taskInfo2.mutable_container()->mutable_linux_info()->set_share_cgroups(false);
+
+  Future<v1::scheduler::Event::Update> startingUpdate1;
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> killedUpdate1;
+
+  testing::Sequence task1;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo1.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task1)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate1),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Update> startingUpdate2;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  Future<v1::scheduler::Event::Update> killedUpdate2;
+
+  testing::Sequence task2;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo2.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_KILLED))))
+    .InSequence(task2)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&killedUpdate2),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  Future<v1::scheduler::Event::Offers> offers2;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return());
+
+  {
+    v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP(
+        executorInfo,
+        v1::createTaskGroupInfo({taskInfo1, taskInfo2}));
+
+    Call call = v1::createCallAccept(frameworkId, offer1, {launchGroup});
+
+    // Set a 0s filter to immediately get another offer to launch
+    // the second task group.
+    call.mutable_accept()->mutable_filters()->set_refuse_seconds(0);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(startingUpdate1);
+  AWAIT_READY(runningUpdate1);
+
+  AWAIT_READY(startingUpdate2);
+  AWAIT_READY(runningUpdate2);
+
+  Future<hashset<ContainerID>> containers = containerizer->containers();
+  AWAIT_READY(containers);
+  ASSERT_EQ(3u, containers->size());
+
+  // Get task container IDs.
+  const v1::ContainerStatus& containerStatus1 =
+    runningUpdate1->status().container_status();
+
+  ASSERT_TRUE(containerStatus1.has_container_id());
+  ASSERT_TRUE(containerStatus1.container_id().has_parent());
+
+  const v1::ContainerID& taskContainerId1 = containerStatus1.container_id();
+
+  const v1::ContainerStatus& containerStatus2 =
+    runningUpdate2->status().container_status();
+
+  ASSERT_TRUE(containerStatus2.has_container_id());
+  ASSERT_TRUE(containerStatus2.container_id().has_parent());
+
+  const v1::ContainerID& taskContainerId2 = containerStatus2.container_id();
+
+  EXPECT_EQ(taskContainerId1.parent(), taskContainerId2.parent());
+
+  // Get the executor container ID.
+  const v1::ContainerID& executorContainerId = taskContainerId1.parent();
+
+  Result<string> cpuHierarchy = cgroups::hierarchy("cpu");
+  ASSERT_SOME(cpuHierarchy);
+
+  Result<string> memoryHierarchy = cgroups::hierarchy("memory");
+  ASSERT_SOME(memoryHierarchy);
+
+  const string& executorCgroup =
+    path::join(flags.cgroups_root, executorContainerId.value());
+
+  const string& taskCgroup1 =
+    getCgroupPath(flags.cgroups_root, devolve(taskContainerId1));
+
+  const string& taskCgroup2 =
+    getCgroupPath(flags.cgroups_root, devolve(taskContainerId2));
+
+  // The CPU shares of the executor container is the sum of its own CPU
+  // request (0.1) + task1's CPU request (0.1) + task2's CPU request (0.1),
+  // i.e. 0.3.
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.3),
+      cgroups::cpu::shares(cpuHierarchy.get(), executorCgroup));
+
+  // The CPU shares of task1 is its CPU request (0.1).
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.1),
+      cgroups::cpu::shares(cpuHierarchy.get(), taskCgroup1));
+
+  // The CPU shares of task2 is its CPU request (0.1).
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.1),
+      cgroups::cpu::shares(cpuHierarchy.get(), taskCgroup2));
+
+  // The CFS quota of the executor container is the sum of its own CPU
+  // request (0.1) + task1's CPU request (0.1) + task2's CPU limit (0.5),
+  // i.e. 0.7.
+  Try<Duration> cfsQuota =
+    cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), executorCgroup);
+
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.7 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The CFS quota of task1 is its CPU request (0.1).
+  cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), taskCgroup1);
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.1 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The CFS quota of task2 is its CPU limit (0.5).
+  cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), taskCgroup2);
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.5 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The memory soft limit of the executor container is the sum of its
+  // own memory request (32MB) + task1's memory request (32MB) + task2's
+  // memory request (32MB), i.e. 96MB.
+  EXPECT_SOME_EQ(
+      Megabytes(96),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), executorCgroup));
+
+  // The memory soft limit of task1 is its memory request (32MB).
+  EXPECT_SOME_EQ(
+      Megabytes(32),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), taskCgroup1));
+
+  // The memory soft limit of task2 is its memory request (32MB).
+  EXPECT_SOME_EQ(
+      Megabytes(32),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), taskCgroup2));
+
+  // The memory hard limit of the executor container is the sum of its
+  // own memory request (32MB) + task1's memory request (32MB) + task2's
+  // memory limit (64MB), i.e. 128MB.
+  EXPECT_SOME_EQ(
+      Megabytes(128),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), executorCgroup));
+
+  // The memory hard limit of task1 is its memory request (32MB).
+  EXPECT_SOME_EQ(
+      Megabytes(32),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), taskCgroup1));
+
+  // The memory hard limit of task2 is its memory limit (64MB).
+  EXPECT_SOME_EQ(
+      Megabytes(64),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), taskCgroup2));
+
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+  Clock::resume();
+
+  AWAIT_READY(offers2);
+  ASSERT_FALSE(offers2->offers().empty());
+
+  const v1::Offer& offer2 = offers2->offers(0);
+
+  // Launch the second task group which has only one task: task3, and this
+  // task has no resource limits specified.
+  v1::TaskInfo taskInfo3 = v1::createTask(
+      agentId,
+      v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(),
+      SLEEP_COMMAND(1000));
+
+  taskInfo3.mutable_container()->set_type(mesos::v1::ContainerInfo::MESOS);
+  taskInfo3.mutable_container()->mutable_linux_info()->set_share_cgroups(false);
+
+  Future<v1::scheduler::Event::Update> startingUpdate3;
+  Future<v1::scheduler::Event::Update> runningUpdate3;
+
+  testing::Sequence task3;
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_STARTING))))
+    .InSequence(task3)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&startingUpdate3),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  EXPECT_CALL(
+      *scheduler,
+      update(_, AllOf(
+          TaskStatusUpdateTaskIdEq(taskInfo3.task_id()),
+          TaskStatusUpdateStateEq(v1::TASK_RUNNING))))
+    .InSequence(task3)
+    .WillOnce(
+        DoAll(
+            FutureArg<1>(&runningUpdate3),
+            v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  mesos.send(
+      v1::createCallAccept(
+          frameworkId,
+          offer2,
+          {v1::LAUNCH_GROUP(
+              executorInfo, v1::createTaskGroupInfo({taskInfo3}))}));
+
+  AWAIT_READY(startingUpdate3);
+  AWAIT_READY(runningUpdate3);
+
+  // The CPU shares of the executor container is the sum of its own CPU
+  // request (0.1) + task1's CPU request (0.1) + task2's CPU request (0.1)
+  // + task3's CPU request (0.1), i.e. 0.4.
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.4),
+      cgroups::cpu::shares(cpuHierarchy.get(), executorCgroup));
+
+  // The CFS quota of the executor container is the sum of its own CPU
+  // request (0.1) + task1's CPU request (0.1) + task2's CPU limit (0.5)
+  // + task3's CPU request (0.1), i.e. 0.8.
+  cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), executorCgroup);
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.8 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The memory soft limit of the executor container is the sum of its
+  // own memory request (32MB) + task1's memory request (32MB) + task2's
+  // memory request (32MB) + task3's memory request (32MB), i.e. 128MB.
+  EXPECT_SOME_EQ(
+      Megabytes(128),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), executorCgroup));
+
+  // The memory hard limit of the executor container is the sum of its
+  // own memory request (32MB) + task1's memory request (32MB) + task2's
+  // memory limit (64MB) + task3's memory request (32MB), i.e. 160MB.
+  EXPECT_SOME_EQ(
+      Megabytes(160),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), executorCgroup));
+
+  // Now kill a task in the first task group.
+  mesos.send(v1::createCallKill(frameworkId, taskInfo1.task_id()));
+
+  // Both tasks in the first task group will be killed.
+  AWAIT_READY(killedUpdate1);
+  AWAIT_READY(killedUpdate2);
+
+  // The CPU shares of the executor container is the sum of its own CPU
+  // request (0.1) + task3's CPU request (0.1), i.e. 0.2.
+  EXPECT_SOME_EQ(
+      (uint64_t)(CPU_SHARES_PER_CPU * 0.2),
+      cgroups::cpu::shares(cpuHierarchy.get(), executorCgroup));
+
+  // The CFS quota of the executor container is also the sum of its own CPU
+  // request (0.1) + task3's CPU request (0.1), i.e. 0.2.
+  cfsQuota = cgroups::cpu::cfs_quota_us(cpuHierarchy.get(), executorCgroup);
+  ASSERT_SOME(cfsQuota);
+  EXPECT_EQ(0.2 * CPU_CFS_PERIOD.ms(), cfsQuota->ms());
+
+  // The memory soft limit of the executor container is the sum of its
+  // own memory request (32MB) + task3's memory request (32MB), i.e. 64MB.
+  EXPECT_SOME_EQ(
+      Megabytes(64),
+      cgroups::memory::soft_limit_in_bytes(
+          memoryHierarchy.get(), executorCgroup));
+
+  // We only update the memory hard limit if it is the first time or when
+  // we're raising the existing limit (see `MemorySubsystemProcess::update`
+  // for details). So now the memory hard limit of the executor container
+  // should still be 160MB.
+  EXPECT_SOME_EQ(
+      Megabytes(160),
+      cgroups::memory::limit_in_bytes(memoryHierarchy.get(), executorCgroup));
+}
+
+
 // This test verifies the limit swap functionality. Note that we use
 // the default executor here in order to exercise both the increasing
 // and decreasing of the memory limit.
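
For readers following the expected values above: the asserted cgroup numbers
fall out of a simple sum over the executor's own request plus each task's
request (for soft limits) or limit (for hard limits, when one is set). A
minimal standalone sketch of that arithmetic follows; it is not part of the
commit, and it assumes the usual defaults of 1024 CPU shares per CPU and a
100ms CFS period (the actual constants are defined in the cgroups isolator's
constants.hpp included by the test file).

// Standalone sketch (not from this commit): reproduces the arithmetic
// behind the cgroup values the test asserts for the first task group,
// assuming 1024 CPU shares per CPU and a 100ms CFS period.
#include <cstdint>
#include <iostream>

int main()
{
  const uint64_t SHARES_PER_CPU = 1024;  // assumed CPU_SHARES_PER_CPU
  const double CFS_PERIOD_MS = 100.0;    // assumed CPU_CFS_PERIOD

  // From the test: the executor requests 0.1 cpus/32MB, task1 requests
  // 0.1 cpus/32MB with no limits, and task2 requests 0.1 cpus/32MB with
  // limits of 0.5 cpus/64MB.
  const double cpuSoft = 0.1 + 0.1 + 0.1;  // requests only        -> 0.3
  const double cpuHard = 0.1 + 0.1 + 0.5;  // task2 uses its limit -> 0.7
  const int memSoftMB  = 32 + 32 + 32;     //                      -> 96
  const int memHardMB  = 32 + 32 + 64;     //                      -> 128

  // cpu.shares is derived from the soft limit, cfs_quota_us from the hard one.
  std::cout << "cpu.shares       = "
            << static_cast<uint64_t>(SHARES_PER_CPU * cpuSoft) << "\n"   // 307
            << "cfs quota        = " << cpuHard * CFS_PERIOD_MS << "ms\n" // 70ms
            << "memory soft/hard = " << memSoftMB << "MB / "
            << memHardMB << "MB\n";                                 // 96/128MB

  return 0;
}

The same sums explain the later assertions: adding task3's 0.1 cpus/32MB
raises the executor container's values to 0.4 cpus of shares, a 0.8-cpu CFS
quota and 128MB/160MB of memory, and after the first task group is killed the
shares, quota and memory soft limit drop back to the executor-plus-task3 sums
while the memory hard limit stays at its 160MB high-water mark, since (as the
test comment notes) `MemorySubsystemProcess::update` only ever raises the
hard limit.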

