aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wick...@apache.org
Subject incubator-aurora git commit: Patch ResourceManager into OSS Aurora.
Date Tue, 13 Jan 2015 00:44:16 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 196188715 -> c8154b2dc


Patch ResourceManager into OSS Aurora.

Testing Done:
e2e

Bugs closed: AURORA-1002

Reviewed at https://reviews.apache.org/r/29828/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/c8154b2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/c8154b2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/c8154b2d

Branch: refs/heads/master
Commit: c8154b2dc05247b56e2608ecf7d03195510dfc49
Parents: 1961887
Author: Brian Wickman <wickman@apache.org>
Authored: Mon Jan 12 16:43:50 2015 -0800
Committer: Brian Wickman <wickman@apache.org>
Committed: Mon Jan 12 16:43:50 2015 -0800

----------------------------------------------------------------------
 .../python/apache/aurora/executor/bin/BUILD     |  2 +
 .../executor/bin/thermos_executor_main.py       | 15 ++-
 .../python/apache/aurora/executor/common/BUILD  | 16 ++++
 .../aurora/executor/common/resource_manager.py  | 99 ++++++++++++++++++++
 .../python/apache/aurora/executor/common/BUILD  | 26 +++++
 .../executor/common/test_resource_manager.py    | 50 ++++++++++
 .../common/test_resource_manager_integration.py | 94 +++++++++++++++++++
 7 files changed, 301 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c8154b2d/src/main/python/apache/aurora/executor/bin/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/BUILD b/src/main/python/apache/aurora/executor/bin/BUILD
index 0434c7b..4d4f3ab 100644
--- a/src/main/python/apache/aurora/executor/bin/BUILD
+++ b/src/main/python/apache/aurora/executor/bin/BUILD
@@ -25,11 +25,13 @@ python_binary(
     'src/main/python/apache/aurora/executor/common:announcer',
     'src/main/python/apache/aurora/executor/common:executor_timeout',
     'src/main/python/apache/aurora/executor/common:health_checker',
+    'src/main/python/apache/aurora/executor/common:resource_manager',
     'src/main/python/apache/aurora/executor/common:sandbox',
     'src/main/python/apache/aurora/executor:executor_detector',
     'src/main/python/apache/aurora/executor:executor_vars',
     'src/main/python/apache/aurora/executor:aurora_executor',
     'src/main/python/apache/aurora/executor:thermos_task_runner',
+    'src/main/python/apache/thermos/common:path',
   ]
 )
 

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c8154b2d/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index 9df9b4b..f7d8977 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -29,7 +29,9 @@ from apache.aurora.executor.aurora_executor import AuroraExecutor
 from apache.aurora.executor.common.announcer import DefaultAnnouncerCheckerProvider
 from apache.aurora.executor.common.executor_timeout import ExecutorTimeout
 from apache.aurora.executor.common.health_checker import HealthCheckerProvider
+from apache.aurora.executor.common.resource_manager import ResourceManagerProvider
 from apache.aurora.executor.thermos_task_runner import DefaultThermosTaskRunnerProvider
+from apache.thermos.common.path import TaskPath
 
 app.configure(debug=True)
 LogOptions.set_simple(True)
@@ -63,6 +65,14 @@ app.add_option(
          'be of the form $ROOT/$ROLE/$ENVIRONMENT/$JOBNAME.')
 
 
+app.add_option(
+    '--checkpoint-root',
+    dest='checkpoint_root',
+    metavar='PATH',
+    default=TaskPath.DEFAULT_CHECKPOINT_ROOT,
+    help='The checkpoint root where Thermos task checkpoints are stored.')
+
+
 # TODO(wickman) Consider just having the OSS version require pip installed
 # thermos_runner binaries on every machine and instead of embedding the pex
 # as a resource, shell out to one on the PATH.
@@ -86,7 +96,10 @@ def proxy_main():
     )
 
     # status providers:
-    status_providers = [HealthCheckerProvider()]
+    status_providers = [
+        HealthCheckerProvider(),
+        ResourceManagerProvider(checkpoint_root=options.checkpoint_root)
+    ]
 
     if options.announcer_enable:
       if options.announcer_ensemble is None:

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c8154b2d/src/main/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/BUILD b/src/main/python/apache/aurora/executor/common/BUILD
index 142ec0d..e64362e 100644
--- a/src/main/python/apache/aurora/executor/common/BUILD
+++ b/src/main/python/apache/aurora/executor/common/BUILD
@@ -99,3 +99,19 @@ python_library(
     'api/src/main/thrift/org/apache/aurora/gen:py-thrift',
   ]
 )
+
+python_library(
+  name = 'resource_manager',
+  sources = ['resource_manager.py'],
+  dependencies = [
+    '3rdparty/python:psutil',
+    '3rdparty/python:twitter.common.dirutil',
+    '3rdparty/python:twitter.common.exceptions',
+    '3rdparty/python:twitter.common.log',
+    '3rdparty/python:twitter.common.quantity',
+    'api/src/main/thrift/org/apache/aurora/gen:py-thrift',
+    'src/main/python/apache/aurora/executor/common:status_checker',
+    'src/main/python/apache/aurora/executor/common:task_info',
+    'src/main/python/apache/thermos/monitoring',
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c8154b2d/src/main/python/apache/aurora/executor/common/resource_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/resource_manager.py b/src/main/python/apache/aurora/executor/common/resource_manager.py
new file mode 100644
index 0000000..bf69e7e
--- /dev/null
+++ b/src/main/python/apache/aurora/executor/common/resource_manager.py
@@ -0,0 +1,99 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from mesos.interface import mesos_pb2
+from twitter.common.metrics import LambdaGauge
+
+from apache.aurora.executor.common.status_checker import (
+    StatusChecker,
+    StatusCheckerProvider,
+    StatusResult
+)
+from apache.aurora.executor.common.task_info import mesos_task_instance_from_assigned_task
+from apache.thermos.common.path import TaskPath
+from apache.thermos.monitoring.monitor import TaskMonitor
+from apache.thermos.monitoring.resource import TaskResourceMonitor
+
+
+class ResourceManager(StatusChecker):
+  """ Manage resources consumed by a Task """
+
+  def __init__(self, resources, resource_monitor):
+    """
+      resources: Resources object specifying cpu, ram, disk limits for the task
+      resource_monitor: The ResourceMonitor to monitor resources
+    """
+    self._resource_monitor = resource_monitor
+    # TODO(wickman) Remove cpu/ram reporting if MESOS-1458 is resolved.
+    self._max_cpu = resources.cpu().get()
+    self._max_ram = resources.ram().get()
+    self._max_disk = resources.disk().get()
+    self._kill_reason = None
+
+  @property
+  def _num_procs(self):
+    """ Total number of processes the task consists of (including child processes) """
+    return self._resource_monitor.sample()[1].num_procs
+
+  @property
+  def _ps_sample(self):
+    """ ProcessSample representing the aggregate resource consumption of the Task's processes """
+    return self._resource_monitor.sample()[1].process_sample
+
+  @property
+  def _disk_sample(self):
+    """ Integer in bytes representing the disk consumption in the Task's sandbox """
+    return self._resource_monitor.sample()[1].disk_usage
+
+  @property
+  def status(self):
+    sample = self._disk_sample
+    if sample > self._max_disk:
+      return StatusResult('Disk limit exceeded.  Reserved %s bytes vs used %s bytes.' % (
+          self._max_disk, sample), mesos_pb2.TASK_FAILED)
+
+  def name(self):
+    return 'resource_manager'
+
+  def register_metrics(self):
+    self.metrics.register(LambdaGauge('disk_used', lambda: self._disk_sample))
+    self.metrics.register(LambdaGauge('disk_reserved', lambda: self._max_disk))
+    self.metrics.register(LambdaGauge('disk_percent',
+        lambda: 1.0 * self._disk_sample / self._max_disk))
+    self.metrics.register(LambdaGauge('cpu_used', lambda: self._ps_sample.rate))
+    self.metrics.register(LambdaGauge('cpu_reserved', lambda: self._max_cpu))
+    self.metrics.register(LambdaGauge('cpu_percent',
+        lambda: 1.0 * self._ps_sample.rate / self._max_cpu))
+    self.metrics.register(LambdaGauge('ram_used', lambda: self._ps_sample.rss))
+    self.metrics.register(LambdaGauge('ram_reserved', lambda: self._max_ram))
+    self.metrics.register(LambdaGauge('ram_percent',
+        lambda: 1.0 * self._ps_sample.rss / self._max_ram))
+
+  def start(self):
+    super(ResourceManager, self).start()
+    self.register_metrics()
+    self._resource_monitor.start()
+
+
+class ResourceManagerProvider(StatusCheckerProvider):
+  def __init__(self, checkpoint_root):
+    self._checkpoint_root = checkpoint_root
+
+  def from_assigned_task(self, assigned_task, sandbox):
+    task_id = assigned_task.taskId
+    resources = mesos_task_instance_from_assigned_task(assigned_task).task().resources()
+    task_path = TaskPath(root=self._checkpoint_root, task_id=task_id)
+    task_monitor = TaskMonitor(task_path, task_id)
+    resource_monitor = TaskResourceMonitor(task_monitor, sandbox.root)
+    return ResourceManager(resources, resource_monitor)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c8154b2d/src/test/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/BUILD b/src/test/python/apache/aurora/executor/common/BUILD
index 2bf6b2d..395ced2 100644
--- a/src/test/python/apache/aurora/executor/common/BUILD
+++ b/src/test/python/apache/aurora/executor/common/BUILD
@@ -109,3 +109,29 @@ python_tests(
     'src/main/python/apache/aurora/executor/common:executor_timeout',
   ],
 )
+
+python_tests(
+  name = 'resource_manager',
+  sources = [ 'test_resource_manager.py' ],
+  dependencies = [
+    '3rdparty/python:mock',
+    'src/main/python/apache/aurora/config:schema',
+    'src/main/python/apache/aurora/executor/common:resource_manager',
+    'src/main/python/apache/thermos/config',
+    'src/main/python/apache/thermos/monitoring',
+  ]
+)
+
+python_tests(
+  name = 'resource_manager_integration',
+  sources = [ 'test_resource_manager_integration.py' ],
+  dependencies = [
+    '3rdparty/python:mesos.interface',
+    '3rdparty/python:twitter.common.contextutil',
+    '3rdparty/python:twitter.common.dirutil',
+    'api/src/main/thrift/org/apache/aurora/gen:py-thrift',
+    'src/main/python/apache/aurora/config:schema',
+    'src/main/python/apache/aurora/executor/common:sandbox',
+    'src/main/python/apache/aurora/executor/common:resource_manager',
+  ]
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c8154b2d/src/test/python/apache/aurora/executor/common/test_resource_manager.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_resource_manager.py b/src/test/python/apache/aurora/executor/common/test_resource_manager.py
new file mode 100644
index 0000000..a898e4d
--- /dev/null
+++ b/src/test/python/apache/aurora/executor/common/test_resource_manager.py
@@ -0,0 +1,50 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import mock
+from mesos.interface import mesos_pb2
+
+from apache.aurora.executor.common.resource_manager import ResourceManager
+from apache.thermos.config.schema import Resources
+from apache.thermos.monitoring.process import ProcessSample
+from apache.thermos.monitoring.resource import ResourceMonitorBase
+
+
+def _mock_resource_monitor(num_procs=0, process_sample=ProcessSample.empty(), disk_usage=0):
+  mock_resource_monitor = mock.Mock(spec=ResourceMonitorBase)
+  mock_resource_monitor.sample.return_value = (
+      12345,  # timestamp
+      ResourceMonitorBase.ResourceResult(num_procs, process_sample, disk_usage))
+
+  return mock_resource_monitor
+
+
+def test_resource_manager():
+  resource_manager = ResourceManager(
+      Resources(cpu=1, ram=128, disk=256),
+      _mock_resource_monitor())
+
+  assert resource_manager.status is None
+
+
+def test_resource_manager_disk_exceeded():
+  resources = Resources(cpu=1, ram=128, disk=256)
+  resource_manager = ResourceManager(
+      resources,
+      _mock_resource_monitor(disk_usage=resources.disk().get() * 2))
+
+  result = resource_manager.status
+  assert result is not None
+  assert result.reason.startswith('Disk limit exceeded')
+  assert result.status == mesos_pb2.TASK_FAILED

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/c8154b2d/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py b/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
new file mode 100644
index 0000000..e3c766f
--- /dev/null
+++ b/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
@@ -0,0 +1,94 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import time
+
+from mesos.interface import mesos_pb2
+from twitter.common.contextutil import temporary_dir
+from twitter.common.dirutil import safe_open
+
+from apache.aurora.executor.common.resource_manager import ResourceManagerProvider
+from apache.aurora.executor.common.sandbox import DirectorySandbox
+
+
+# TODO(jcohen): There should really be a single canonical source for creating test jobs/tasks
+def make_assigned_task(thermos_config, assigned_ports=None):
+  from gen.apache.aurora.api.constants import AURORA_EXECUTOR_NAME
+  from gen.apache.aurora.api.ttypes import AssignedTask, ExecutorConfig, Identity, TaskConfig
+
+  assigned_ports = assigned_ports or {}
+  executor_config = ExecutorConfig(name=AURORA_EXECUTOR_NAME, data=thermos_config.json_dumps())
+  task_config = TaskConfig(
+    owner=Identity(role=thermos_config.role().get(), user=thermos_config.role().get()),
+    environment=thermos_config.environment().get(),
+    jobName=thermos_config.name().get(),
+    executorConfig=executor_config)
+
+  return AssignedTask(instanceId=12345, task=task_config, assignedPorts=assigned_ports)
+
+
+def make_job(role, environment, name, primary_port, portmap):
+  from apache.aurora.config.schema.base import (
+    Announcer,
+    Job,
+    Process,
+    Resources,
+    Task,
+  )
+  task = Task(
+    name='ignore2',
+    processes=[Process(name='ignore3', cmdline='ignore4')],
+    resources=Resources(cpu=1, ram=1, disk=1))
+  job = Job(
+    role=role,
+    environment=environment,
+    name=name,
+    cluster='ignore1',
+    task=task,
+    announce=Announcer(primary_port=primary_port, portmap=portmap))
+  return job
+
+
+def test_resource_manager():
+  with temporary_dir() as td:
+    sandbox = os.path.join(td, 'sandbox')
+    root = os.path.join(td, 'thermos')
+
+    rm = ResourceManagerProvider(root).from_assigned_task(
+        make_assigned_task(
+            make_job('some-role', 'some-env', 'some-job', 'http', portmap={'http': 80})),
+        DirectorySandbox(sandbox))
+
+    assert rm.status is None
+
+    try:
+      rm.start()
+
+      with safe_open(os.path.join(sandbox, 'foo.txt'), 'w') as fp:
+        fp.write(4097 * 'x')
+
+      # N.B. The sleep below makes this test inherently vulnerable to flakiness
+      # TODO(jcohen): Investigate using threading.event to avoid the need for a timeout entirely.
+      timeout = 0
+      while rm.status is None and timeout < 5:
+        timeout += 0.1
+        time.sleep(0.1)
+
+      result = rm.status
+      assert result is not None
+      assert result.reason.startswith('Disk limit exceeded')
+      assert result.status == mesos_pb2.TASK_FAILED
+    finally:
+      rm._resource_monitor.kill()


Mime
View raw message