aurora-commits mailing list archives

From wfar...@apache.org
Subject aurora git commit: Implements log rotation in the Thermos runner.
Date Thu, 17 Dec 2015 22:03:02 GMT
Repository: aurora
Updated Branches:
  refs/heads/master c0fdcdcc6 -> 3c33f663f


Implements log rotation in the Thermos runner.

Bugs closed: AURORA-95

Reviewed at https://reviews.apache.org/r/30695/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3c33f663
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3c33f663
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3c33f663

Branch: refs/heads/master
Commit: 3c33f663f70c9ef9026a35e3ca2e2b9360719b06
Parents: c0fdcdc
Author: George Sirois <george.sirois@gmail.com>
Authored: Thu Dec 17 14:02:59 2015 -0800
Committer: Bill Farner <wfarner@apache.org>
Committed: Thu Dec 17 14:02:59 2015 -0800

----------------------------------------------------------------------
 NEWS                                            |   3 +
 docs/configuration-reference.md                 |  38 +++
 docs/deploying-aurora-scheduler.md              |  18 +-
 .../executor/bin/thermos_executor_main.py       |  34 ++-
 .../aurora/executor/thermos_task_runner.py      |  29 +-
 .../python/apache/thermos/config/schema_base.py |  28 +-
 src/main/python/apache/thermos/core/process.py  | 300 +++++++++++++++++--
 src/main/python/apache/thermos/core/runner.py   |  49 ++-
 .../apache/thermos/runner/thermos_runner.py     |  27 ++
 .../python/apache/thermos/core/test_process.py  |  92 +++++-
 10 files changed, 574 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/NEWS
----------------------------------------------------------------------
diff --git a/NEWS b/NEWS
index 8b5a1a1..066925e 100644
--- a/NEWS
+++ b/NEWS
@@ -16,6 +16,9 @@
  - Added support for taking in an executor configuration in JSON via a command line argument
   `--custom_executor_config` which will override all the other command line arguments and default
   values pertaining to the executor.
+- Log rotation has been added to the thermos runner. See the configuration reference for details
+  on how to configure rotation per-process. Command line options may also be passed through the
+  scheduler in order to configure the global default behavior.
 
 0.10.0
 ------

http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/docs/configuration-reference.md
----------------------------------------------------------------------
diff --git a/docs/configuration-reference.md b/docs/configuration-reference.md
index 12c8021..cf63cfa 100644
--- a/docs/configuration-reference.md
+++ b/docs/configuration-reference.md
@@ -12,6 +12,7 @@ Aurora + Thermos Configuration Reference
       - [ephemeral](#ephemeral)
       - [min_duration](#min_duration)
       - [final](#final)
+      - [logger](#logger)
 - [Task Schema](#task-schema)
     - [Task Object](#task-object)
       - [name](#name-1)
@@ -72,6 +73,7 @@ behavior with its optional attributes. Remember, Processes are handled by Thermo
   **ephemeral**      | Boolean     | When True, this is an ephemeral process. (Default: False)
   **min_duration**   | Integer     | Minimum duration between process restarts in seconds. (Default: 15)
   **final**          | Boolean     | When True, this process is a finalizing one that should run last. (Default: False)
+   **logger**         | Logger      | Struct defining the log behavior for the process. (Default: Empty)
 
 #### name
 
@@ -144,6 +146,42 @@ vice-versa, however finalizing processes may depend upon other
 finalizing processes and otherwise run as a typical process
 schedule.
 
+#### logger
+
+The default behavior of Thermos is to allow stderr/stdout logs to grow unbounded. In the event
+that you have large log volume, you may want to configure Thermos to automatically rotate logs
+after they grow to a certain size, which can prevent your job from using more than its allocated
+disk space.
+
+A Logger union consists of a mode enum and a rotation policy. Rotation policies only apply to
+loggers whose mode is `rotate`. The acceptable values for the LoggerMode enum are `standard`
+and `rotate`. The rotation policy applies to both stderr and stdout.
+
+By default, all processes use the `standard` LoggerMode.
+
+  **Attribute Name**  | **Type**     | **Description**
+  ------------------- | :----------: | ---------------------------------
+   **mode**           | LoggerMode   | Mode of the logger. (Required)
+   **rotate**         | RotatePolicy | An optional rotation policy.
+
+A RotatePolicy describes log rotation behavior for when `mode` is set to `rotate`. It is ignored
+otherwise.
+
+  **Attribute Name**  | **Type**     | **Description**
+  ------------------- | :----------: | ---------------------------------
+   **log_size**       | Integer      | Maximum size (in bytes) of an individual log file. (Default: 100 MiB)
+   **backups**        | Integer      | The maximum number of backups to retain. (Default: 5)
+
+An example process configuration is as follows:
+
+        process = Process(
+          name='process',
+          logger=Logger(
+            mode=LoggerMode('rotate'),
+            rotate=RotatePolicy(log_size=5*MB, backups=5)
+          )
+        )
+
 Task Schema
 ===========
 

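As a sketch of what the `rotate` example above works out to (not part of the commit; assumes Thermos's schema size constants build up from `BYTES = 1` in powers of 1024, and `rotated_log_names` is an illustrative helper mirroring the `%s.%d` backup naming used by the runner):

```python
# Size constants as assumed from the Thermos schema (BYTES = 1, KiB/MiB multiples).
BYTES = 1
KB = 1024 * BYTES
MB = 1024 * KB

def rotated_log_names(base, backups):
    """File names a rotating logger cycles through: base, base.1, ..., base.N."""
    return [base] + ['%s.%d' % (base, i) for i in range(1, backups + 1)]

# The documentation example: RotatePolicy(log_size=5*MB, backups=5).
log_size = 5 * MB   # rotation threshold in bytes
backups = 5         # maximum retained backups per stream

print(log_size)                          # 5242880
print(rotated_log_names('stdout', backups))
```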
http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/docs/deploying-aurora-scheduler.md
----------------------------------------------------------------------
diff --git a/docs/deploying-aurora-scheduler.md b/docs/deploying-aurora-scheduler.md
index 29fafca..caaf42b 100644
--- a/docs/deploying-aurora-scheduler.md
+++ b/docs/deploying-aurora-scheduler.md
@@ -15,6 +15,7 @@ machines.  This guide helps you get the scheduler set up and troubleshoot some c
   - [Considerations for running jobs in docker](#considerations-for-running-jobs-in-docker)
   - [Security Considerations](#security-considerations)
   - [Configuring Resource Oversubscription](#configuring-resource-oversubscription)
+  - [Process Log Rotation](#process-log-rotation)
 - [Running Aurora](#running-aurora)
   - [Maintaining an Aurora Installation](#maintaining-an-aurora-installation)
   - [Monitoring](#monitoring)
@@ -166,13 +167,28 @@ script does not access resources outside of the sandbox, as when the script is r
 docker container those resources will not exist.
 
 A scheduler flag, `-global_container_mounts` allows mounting paths from the host (i.e., the slave)
-into all containers on that host. The format is a comma seperated list of host_path:container_path[:mode]
+into all containers on that host. The format is a comma separated list of host_path:container_path[:mode]
 tuples. For example `-global_container_mounts=/opt/secret_keys_dir:/mnt/secret_keys_dir:ro` mounts
 `/opt/secret_keys_dir` from the slaves into all launched containers. Valid modes are `ro` and `rw`.
 
 In order to correctly execute processes inside a job, the docker container must have python 2.7
 installed.
 
+### Process Log Rotation
+By default, Thermos will not rotate the stdout/stderr logs from child processes and they will grow
+without bound. An individual user may change this behavior via configuration on the Process object,
+but it may also be desirable to change the default configuration for the entire cluster.
+In order to enable rotation by default, the following flags can be applied to Thermos (through the
+-thermos_executor_flags argument to the Aurora scheduler):
+
+    --runner-logger-mode=rotate
+    --runner-rotate-log-size-mb=100
+    --runner-rotate-log-backups=10
+
+In the above example, each instance of the Thermos runner will rotate stderr/stdout logs once they
+reach 100 MiB in size and keep a maximum of 10 backups. If a user has provided a custom setting for
+their process, it will override these default settings.
+
 ## Running Aurora
 Configure a supervisor like [Monit](http://mmonit.com/monit/) or
 [supervisord](http://supervisord.org/) to run the created `scheduler.sh` file and restart it

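The precedence described above (a per-process Logger setting overrides the cluster-wide flag defaults, which in turn override the standard mode) can be sketched as plain Python; `resolve_logger` and its dict-based config are illustrative stand-ins, not the actual Thermos API:

```python
# Sketch of the documented precedence: process config > scheduler flags > 'standard'.
def resolve_logger(process_logger, default_mode, default_size_mb, default_backups):
    """Return (mode, size_mb, backups), preferring the process's own config."""
    if process_logger is not None:
        # A user-supplied Logger on the Process wins outright.
        return (process_logger['mode'],
                process_logger.get('size_mb', default_size_mb),
                process_logger.get('backups', default_backups))
    if default_mode:
        # Otherwise fall back to the cluster-wide executor flags, if any.
        return (default_mode, default_size_mb, default_backups)
    # No configuration anywhere: logs grow unbounded in 'standard' mode.
    return ('standard', None, None)

# Cluster default rotates at 100 MiB / 10 backups; this process overrides the size.
print(resolve_logger({'mode': 'rotate', 'size_mb': 5}, 'rotate', 100, 10))
# → ('rotate', 5, 10)
```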
http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index 0d02dc1..7b7ef4b 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -28,6 +28,7 @@ import traceback
 from twitter.common import app, log
 from twitter.common.log.options import LogOptions
 
+from apache.aurora.config.schema.base import LoggerMode
 from apache.aurora.executor.aurora_executor import AuroraExecutor
 from apache.aurora.executor.common.announcer import DefaultAnnouncerCheckerProvider
 from apache.aurora.executor.common.executor_timeout import ExecutorTimeout
@@ -54,6 +55,8 @@ LogOptions.set_simple(True)
 LogOptions.set_disk_log_level('DEBUG')
 LogOptions.set_log_dir(CWD)
 
+_LOGGER_TYPES = ', '.join(LoggerMode.VALUES)
+
 
 app.add_option(
     '--announcer-enable',
@@ -96,6 +99,27 @@ app.add_option(
     default=False)
 
 
+app.add_option(
+    '--runner-logger-mode',
+    dest='runner_logger_mode',
+    type=str,
+    default=None,
+    help='The type of logger [%s] to use for all processes run by thermos.' % _LOGGER_TYPES)
+
+app.add_option(
+    '--runner-rotate-log-size-mb',
+    dest='runner_rotate_log_size_mb',
+    type=int,
+    help='Maximum size of the rotated stdout/stderr logs emitted by the thermos runner in MiB.')
+
+
+app.add_option(
+    '--runner-rotate-log-backups',
+    dest='runner_rotate_log_backups',
+    type=int,
+    help='Maximum number of rotated stdout/stderr logs emitted by the thermos runner.')
+
+
 # TODO(wickman) Consider just having the OSS version require pip installed
 # thermos_runner binaries on every machine and instead of embedding the pex
 # as a resource, shell out to one on the PATH.
@@ -141,7 +165,10 @@ def initialize(options):
     thermos_runner_provider = UserOverrideThermosTaskRunnerProvider(
       dump_runner_pex(),
       checkpoint_root,
-      artifact_dir=cwd_path
+      artifact_dir=cwd_path,
+      process_logger_mode=options.runner_logger_mode,
+      rotate_log_size_mb=options.runner_rotate_log_size_mb,
+      rotate_log_backups=options.runner_rotate_log_backups
     )
     thermos_runner_provider.set_role(None)
 
@@ -154,7 +181,10 @@ def initialize(options):
     thermos_runner_provider = DefaultThermosTaskRunnerProvider(
       dump_runner_pex(),
       checkpoint_root,
-      artifact_dir=cwd_path
+      artifact_dir=cwd_path,
+      process_logger_mode=options.runner_logger_mode,
+      rotate_log_size_mb=options.runner_rotate_log_size_mb,
+      rotate_log_backups=options.runner_rotate_log_backups
     )
 
     thermos_executor = AuroraExecutor(

http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/aurora/executor/thermos_task_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py
index 14e8b4b..25fcca2 100644
--- a/src/main/python/apache/aurora/executor/thermos_task_runner.py
+++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py
@@ -72,7 +72,10 @@ class ThermosTaskRunner(TaskRunner):
                checkpoint_root,
                artifact_dir=None,
                clock=time,
-               hostname=None):
+               hostname=None,
+               process_logger_mode=None,
+               rotate_log_size_mb=None,
+               rotate_log_backups=None):
     """
       runner_pex       location of the thermos_runner pex that this task runner should use
       task_id          task_id assigned by scheduler
@@ -98,6 +101,9 @@ class ThermosTaskRunner(TaskRunner):
     self._clock = clock
     self._artifact_dir = artifact_dir or safe_mkdtemp()
     self._hostname = hostname or socket.gethostname()
+    self._process_logger_mode = process_logger_mode
+    self._rotate_log_size_mb = rotate_log_size_mb
+    self._rotate_log_backups = rotate_log_backups
 
     # wait events
     self._dead = threading.Event()
@@ -233,13 +239,17 @@ class ThermosTaskRunner(TaskRunner):
                   sandbox=host_sandbox or self._root,
                   task_id=self._task_id,
                   thermos_json=self._task_filename,
-                  hostname=self._hostname)
+                  hostname=self._hostname,
+                  process_logger_mode=self._process_logger_mode,
+                  rotate_log_size_mb=self._rotate_log_size_mb,
+                  rotate_log_backups=self._rotate_log_backups)
 
     if getpass.getuser() == 'root' and self._role:
       params.update(setuid=self._role)
 
     cmdline_args = [sys.executable, self._runner_pex]
-    cmdline_args.extend('--%s=%s' % (flag, value) for flag, value in params.items())
+    cmdline_args.extend(
+        '--%s=%s' % (flag, value) for flag, value in params.items() if value is not None)
     if self._enable_chroot:
       cmdline_args.extend(['--enable_chroot'])
     for name, port in self._ports.items():
@@ -342,7 +352,10 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
                max_wait=Amount(1, Time.MINUTES),
                preemption_wait=Amount(1, Time.MINUTES),
                poll_interval=Amount(500, Time.MILLISECONDS),
-               clock=time):
+               clock=time,
+               process_logger_mode=None,
+               rotate_log_size_mb=None,
+               rotate_log_backups=None):
     self._artifact_dir = artifact_dir or safe_mkdtemp()
     self._checkpoint_root = checkpoint_root
     self._clock = clock
@@ -351,6 +364,9 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
     self._poll_interval = poll_interval
     self._preemption_wait = preemption_wait
     self._task_runner_class = task_runner_class
+    self._process_logger_mode = process_logger_mode
+    self._rotate_log_size_mb = rotate_log_size_mb
+    self._rotate_log_backups = rotate_log_backups
 
   def _get_role(self, assigned_task):
     return None if assigned_task.task.container.docker else assigned_task.task.job.role
@@ -379,7 +395,10 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
         self._checkpoint_root,
         artifact_dir=self._artifact_dir,
         clock=self._clock,
-        hostname=assigned_task.slaveHost)
+        hostname=assigned_task.slaveHost,
+        process_logger_mode=self._process_logger_mode,
+        rotate_log_size_mb=self._rotate_log_size_mb,
+        rotate_log_backups=self._rotate_log_backups)
 
     return HttpLifecycleManager.wrap(runner, mesos_task, mesos_ports)
 

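The `if value is not None` change above means unset logger options never reach the runner's command line, so the runner's own defaults apply. A minimal sketch of that filtering (`build_flags` and the sample params are illustrative, not the actual runner interface; sorted here only for deterministic output):

```python
# Only parameters that were actually set become --flag=value arguments.
def build_flags(params):
    return ['--%s=%s' % (flag, value)
            for flag, value in sorted(params.items())
            if value is not None]

params = {
    'task_id': 'job-0',
    'process_logger_mode': 'rotate',
    'rotate_log_size_mb': None,    # unset: omitted from the command line
}
print(build_flags(params))
# → ['--process_logger_mode=rotate', '--task_id=job-0']
```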
http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/thermos/config/schema_base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/schema_base.py b/src/main/python/apache/thermos/config/schema_base.py
index f9143cc..5552108 100644
--- a/src/main/python/apache/thermos/config/schema_base.py
+++ b/src/main/python/apache/thermos/config/schema_base.py
@@ -14,7 +14,19 @@
 
 # checkstyle: noqa
 
-from pystachio import Boolean, Default, Empty, Float, Integer, List, Map, Required, String, Struct
+from pystachio import (
+    Boolean,
+    Default,
+    Empty,
+    Enum,
+    Float,
+    Integer,
+    List,
+    Map,
+    Required,
+    String,
+    Struct
+)
 
 # Define constants for resources
 BYTES = 1
@@ -45,6 +57,19 @@ class Constraint(Struct):
   order = List(String)
 
 
+class RotatePolicy(Struct):
+  log_size = Default(Integer, 100*MB)
+  backups = Default(Integer, 5)
+
+
+LoggerMode = Enum('standard', 'rotate')
+
+
+class Logger(Struct):
+  mode = Required(LoggerMode)
+  rotate = RotatePolicy
+
+
 class Process(Struct):
   cmdline = Required(String)
   name    = Required(String)
@@ -60,6 +85,7 @@ class Process(Struct):
   min_duration  = Default(Integer, 5)      # integer seconds
   final         = Default(Boolean, False)  # if this process should be a finalizing process
                                            # that should always be run after regular processes
+  logger        = Default(Logger, Empty)
 
 
 class Task(Struct):

http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/thermos/core/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
index f214bcc..8efdfdc 100644
--- a/src/main/python/apache/thermos/core/process.py
+++ b/src/main/python/apache/thermos/core/process.py
@@ -20,9 +20,11 @@ commandline in a subprocess of its own.
 
 """
 
+import errno
 import grp
 import os
 import pwd
+import select
 import signal
 import subprocess
 import sys
@@ -30,9 +32,9 @@ import time
 from abc import abstractmethod
 
 from twitter.common import log
-from twitter.common.dirutil import lock_file, safe_mkdir, safe_open
+from twitter.common.dirutil import lock_file, safe_delete, safe_mkdir, safe_open
 from twitter.common.lang import Interface
-from twitter.common.quantity import Amount, Time
+from twitter.common.quantity import Amount, Data, Time
 from twitter.common.recordio import ThriftRecordReader, ThriftRecordWriter
 
 from gen.apache.thermos.ttypes import ProcessState, ProcessStatus, RunnerCkpt
@@ -54,6 +56,17 @@ class Platform(Interface):
     pass
 
 
+class LoggerMode(object):
+  STANDARD = 'standard'
+  ROTATE = 'rotate'
+
+  _ALL_MODES = [STANDARD, ROTATE]
+
+  @staticmethod
+  def is_valid(mode):
+    return mode in LoggerMode._ALL_MODES
+
+
 class ProcessBase(object):
   """
     Encapsulate a running process for a task.
@@ -67,7 +80,8 @@ class ProcessBase(object):
   CONTROL_WAIT_CHECK_INTERVAL = Amount(100, Time.MILLISECONDS)
   MAXIMUM_CONTROL_WAIT = Amount(1, Time.MINUTES)
 
-  def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None):
+  def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None,
+               logger_mode=LoggerMode.STANDARD, rotate_log_size=None, rotate_log_backups=None):
     """
       required:
         name        = name of the process
@@ -78,8 +92,12 @@ class ProcessBase(object):
         platform    = Platform providing fork, clock, getpid
 
       optional:
-        user        = the user to run as (if unspecified, will default to current user.)
-                      if specified to a user that is not the current user, you must have root access
+        user               = the user to run as (if unspecified, will default to current user.)
+                             if specified to a user that is not the current user, you must have root
+                             access
+        logger_mode        = The type of logger to use for the process.
+        rotate_log_size    = The maximum size of the rotated stdout/stderr logs.
+        rotate_log_backups = The maximum number of rotated stdout/stderr log backups.
     """
     self._name = name
     self._cmdline = cmdline
@@ -90,14 +108,24 @@ class ProcessBase(object):
       safe_mkdir(self._sandbox)
     self._pid = None
     self._fork_time = None
-    self._stdout = None
-    self._stderr = None
     self._user = user
     self._ckpt = None
     self._ckpt_head = -1
     if platform is None:
       raise ValueError("Platform must be specified")
     self._platform = platform
+    self._logger_mode = logger_mode
+    self._rotate_log_size = rotate_log_size
+    self._rotate_log_backups = rotate_log_backups
+
+    if not LoggerMode.is_valid(self._logger_mode):
+      raise ValueError("Logger mode %s is invalid." % self._logger_mode)
+
+    if self._logger_mode == LoggerMode.ROTATE:
+      if self._rotate_log_size.as_(Data.BYTES) <= 0:
+        raise ValueError('Log size cannot be less than one byte.')
+      if self._rotate_log_backups <= 0:
+        raise ValueError('Log backups cannot be less than one.')
 
   def _log(self, msg):
     log.debug('[process:%5s=%s]: %s' % (self._pid, self.name(), msg))
@@ -151,6 +179,9 @@ class ProcessBase(object):
   def ckpt_file(self):
     return self._pathspec.getpath('process_checkpoint')
 
+  def process_logdir(self):
+    return self._pathspec.getpath('process_logdir')
+
   def _setup_ckpt(self):
     """Set up the checkpoint: must be run on the parent."""
     self._log('initializing checkpoint file: %s' % self.ckpt_file())
@@ -207,11 +238,9 @@ class ProcessBase(object):
         raise self.PermissionError('Must be root to run processes as other users!')
     self._fork_time = self._platform.clock().time()
     self._setup_ckpt()
-    self._stdout = safe_open(self._pathspec.with_filename('stdout').getpath('process_logdir'), "a")
-    self._stderr = safe_open(self._pathspec.with_filename('stderr').getpath('process_logdir'), "a")
-    uid, gid = user.pw_uid, user.pw_gid
-    os.chown(self._stdout.name, uid, gid)
-    os.chown(self._stderr.name, uid, gid)
+    # Since the forked process is responsible for creating log files, it needs to own the log dir.
+    safe_mkdir(self.process_logdir())
+    os.chown(self.process_logdir(), user.pw_uid, user.pw_gid)
 
   def _finalize_fork(self):
     self._write_initial_update()
@@ -315,10 +344,6 @@ class Process(ProcessBase):
 
   def execute(self):
     """Perform final initialization and launch target process commandline in a subprocess."""
-    if not self._stderr:
-      raise RuntimeError('self._stderr not set up!')
-    if not self._stdout:
-      raise RuntimeError('self._stdout not set up!')
 
     user, _ = self._getpwuid()
     username, homedir = user.pw_name, user.pw_dir
@@ -348,19 +373,32 @@ class Process(ProcessBase):
     if os.path.exists(thermos_profile):
       env.update(BASH_ENV=thermos_profile)
 
-    self._popen = subprocess.Popen(["/bin/bash", "-c", self.cmdline()],
-                                   stderr=self._stderr,
-                                   stdout=self._stdout,
-                                   close_fds=self.FD_CLOEXEC,
-                                   cwd=sandbox,
-                                   env=env)
+    subprocess_args = {
+      'args': ["/bin/bash", "-c", self.cmdline()],
+      'close_fds': self.FD_CLOEXEC,
+      'cwd': sandbox,
+      'env': env,
+      'pathspec': self._pathspec
+    }
+
+    if self._logger_mode == LoggerMode.ROTATE:
+      log_size = int(self._rotate_log_size.as_(Data.BYTES))
+      self._log('Starting subprocess with log rotation. Size: %s, Backups: %s' % (
+        log_size, self._rotate_log_backups))
+      executor = LogRotatingSubprocessExecutor(max_bytes=log_size,
+                                               max_backups=self._rotate_log_backups,
+                                               **subprocess_args)
+    else:
+      self._log('Starting subprocess with no log rotation.')
+      executor = SubprocessExecutor(**subprocess_args)
+
+    pid = executor.start()
 
     self._write_process_update(state=ProcessState.RUNNING,
-                               pid=self._popen.pid,
+                               pid=pid,
                                start_time=start_time)
 
-    # wait for job to finish
-    rc = self._popen.wait()
+    rc = executor.wait()
 
     # indicate that we have finished/failed
     if rc < 0:
@@ -378,3 +416,215 @@ class Process(ProcessBase):
   def finish(self):
     self._log('Coordinator exiting.')
     sys.exit(0)
+
+
+class SubprocessExecutorBase(object):
+  """
+  Encapsulate execution of a subprocess.
+  """
+
+  def __init__(self, args, close_fds, cwd, env, pathspec):
+    """
+      required:
+        args        = The arguments to pass to the subprocess.
+        close_fds   = Close file descriptors argument to Popen.
+        cwd         = The current working directory.
+        env         = Environment variables to be passed to the subprocess.
+        pathspec    = TaskPath object for synthesizing path names.
+    """
+    self._args = args
+    self._close_fds = close_fds
+    self._cwd = cwd
+    self._env = env
+    self._pathspec = pathspec
+    self._popen = None
+
+  def _get_log_path(self, log_name):
+    return self._pathspec.with_filename(log_name).getpath('process_logdir')
+
+  def _start_subprocess(self, stderr, stdout):
+    return subprocess.Popen(self._args,
+                            stderr=stderr,
+                            stdout=stdout,
+                            close_fds=self._close_fds,
+                            cwd=self._cwd,
+                            env=self._env)
+
+  def start(self):
+    """Start the subprocess and immediately return the resulting pid."""
+    raise NotImplementedError()
+
+  def wait(self):
+    """Wait for the subprocess to finish executing and return the return code."""
+    raise NotImplementedError()
+
+
+class SubprocessExecutor(SubprocessExecutorBase):
+  """
+  Basic implementation of a SubprocessExecutor that writes stderr/stdout unconstrained log files.
+  """
+
+  def __init__(self, args, close_fds, cwd, env, pathspec):
+    """See SubprocessExecutorBase.__init__"""
+    self._stderr = None
+    self._stdout = None
+    super(SubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec)
+
+  def start(self):
+    self._stderr = safe_open(self._get_log_path('stderr'), 'a')
+    self._stdout = safe_open(self._get_log_path('stdout'), 'a')
+
+    self._popen = self._start_subprocess(self._stderr, self._stdout)
+    return self._popen.pid
+
+  def wait(self):
+    return self._popen.wait()
+
+
+class LogRotatingSubprocessExecutor(SubprocessExecutorBase):
+  """
+  Implementation of a SubprocessExecutor that implements log rotation for stderr/stdout.
+  """
+
+  READ_BUFFER_SIZE = 2 ** 16
+
+  def __init__(self, args, close_fds, cwd, env, pathspec, max_bytes, max_backups):
+    """
+    See SubprocessExecutorBase.__init__
+
+    Takes additional arguments:
+      max_bytes   = The maximum size of an individual log file.
+      max_backups = The maximum number of log file backups to create.
+    """
+    self._max_bytes = max_bytes
+    self._max_backups = max_backups
+    self._stderr = None
+    self._stdout = None
+    super(LogRotatingSubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec)
+
+  def start(self):
+    self._stderr = RotatingFileHandler(self._get_log_path('stderr'),
+                                       self._max_bytes,
+                                       self._max_backups)
+    self._stdout = RotatingFileHandler(self._get_log_path('stdout'),
+                                       self._max_bytes,
+                                       self._max_backups)
+
+    self._popen = self._start_subprocess(subprocess.PIPE, subprocess.PIPE)
+    return self._popen.pid
+
+  def wait(self):
+    stdout = self._popen.stdout.fileno()
+    stderr = self._popen.stderr.fileno()
+    pipes = {
+      stderr: self._stderr,
+      stdout: self._stdout
+    }
+
+    rc = None
+    # Read until there is a return code AND both of the pipes have reached EOF.
+    while rc is None or pipes:
+      rc = self._popen.poll()
+
+      read_results, _, _ = select.select(pipes.keys(), [], [], 1)
+      for fd in read_results:
+        handler = pipes[fd]
+        buf = os.read(fd, self.READ_BUFFER_SIZE)
+
+        if len(buf) == 0:
+          del pipes[fd]
+        else:
+          handler.write(buf)
+
+    return rc
+
+
+class FileHandler(object):
+  """
+  Base file handler.
+  """
+
+  def __init__(self, filename, mode='w'):
+    """
+      required:
+        filename = The file name.
+
+      optional:
+        mode = Mode to open the file in.
+    """
+    self.file = safe_open(filename, mode=mode)
+    self.filename = filename
+    self.mode = mode
+    self.closed = False
+
+  def close(self):
+    if not self.closed:
+      self.file.close()
+      self.closed = True
+
+  def write(self, b):
+    self.file.write(b)
+    self.file.flush()
+
+
+class RotatingFileHandler(FileHandler):
+  """
+  File handler that implements max size/rotation.
+  """
+
+  def __init__(self, filename, max_bytes, max_backups, mode='w'):
+    """
+      required:
+        filename    = The file name.
+        max_bytes   = The maximum size of an individual log file.
+        max_backups = The maximum number of log file backups to create.
+
+      optional:
+        mode = Mode to open the file in.
+    """
+    if max_bytes > 0 and max_backups <= 0:
+      raise ValueError('A positive value for max_backups must be specified if max_bytes > 0.')
+    self._max_bytes = max_bytes
+    self._max_backups = max_backups
+    super(RotatingFileHandler, self).__init__(filename, mode)
+
+  def write(self, b):
+    super(RotatingFileHandler, self).write(b)
+    if self.should_rollover():
+      self.rollover()
+
+  def swap_files(self, src, tgt):
+    if os.path.exists(tgt):
+      safe_delete(tgt)
+
+    try:
+      os.rename(src, tgt)
+    except OSError as e:
+      if e.errno != errno.ENOENT:
+        raise
+
+  def make_indexed_filename(self, index):
+    return '%s.%d' % (self.filename, index)
+
+  def should_rollover(self):
+    if self._max_bytes <= 0 or self._max_backups <= 0:
+      return False
+
+    if self.file.tell() >= self._max_bytes:
+      return True
+
+    return False
+
+  def rollover(self):
+    """
+    Perform the rollover of the log.
+    """
+    self.file.close()
+    for i in range(self._max_backups - 1, 0, -1):
+      src = self.make_indexed_filename(i)
+      tgt = self.make_indexed_filename(i + 1)
+      if os.path.exists(src):
+        self.swap_files(src, tgt)
+
+    self.swap_files(self.filename, self.make_indexed_filename(1))
+    self.file = safe_open(self.filename, mode='w')

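The rollover scheme `RotatingFileHandler.rollover` implements above (backups shift up by one index, the oldest is dropped, the live file becomes `.1`, and writing resumes on a fresh file) can be demonstrated standalone; this is a simplified re-implementation for illustration, using `os.replace` in place of the commit's `safe_delete` + `os.rename` pair:

```python
import os
import tempfile

def rollover(filename, max_backups):
    """Shift backups file.1 -> file.2, ..., dropping the oldest, then rotate the live file."""
    for i in range(max_backups - 1, 0, -1):
        src, tgt = '%s.%d' % (filename, i), '%s.%d' % (filename, i + 1)
        if os.path.exists(src):
            os.replace(src, tgt)   # overwrites tgt if present, like delete-then-rename
    os.replace(filename, filename + '.1')

d = tempfile.mkdtemp()
log = os.path.join(d, 'stdout')
for n in range(3):
    with open(log, 'w') as f:
        f.write('run %d' % n)
    rollover(log, max_backups=2)

print(sorted(os.listdir(d)))    # → ['stdout.1', 'stdout.2']
print(open(log + '.1').read())  # → run 2  (most recent rotated content)
```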
http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
index f949f27..11c06a8 100644
--- a/src/main/python/apache/thermos/core/runner.py
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -47,10 +47,10 @@ import time
 import traceback
 from contextlib import contextmanager
 
-from pystachio import Environment
+from pystachio import Empty, Environment
 from twitter.common import log
 from twitter.common.dirutil import safe_mkdir
-from twitter.common.quantity import Amount, Time
+from twitter.common.quantity import Amount, Data, Time
 from twitter.common.recordio import ThriftRecordReader
 
 from apache.thermos.common.ckpt import (
@@ -70,7 +70,7 @@ from apache.thermos.config.schema import ThermosContext
 
 from .helper import TaskRunnerHelper
 from .muxer import ProcessMuxer
-from .process import Process
+from .process import LoggerMode, Process
 
 from gen.apache.thermos.ttypes import (
     ProcessState,
@@ -418,7 +418,8 @@ class TaskRunner(object):
 
   def __init__(self, task, checkpoint_root, sandbox, log_dir=None,
                task_id=None, portmap=None, user=None, chroot=False, clock=time,
-               universal_handler=None, planner_class=TaskPlanner, hostname=None):
+               universal_handler=None, planner_class=TaskPlanner, hostname=None,
+               process_logger_mode=None, rotate_log_size_mb=None, rotate_log_backups=None):
     """
       required:
         task (config.Task) = the task to run
@@ -440,6 +441,9 @@ class TaskRunner(object):
         universal_handler = checkpoint record handler (only used for testing)
         planner_class (TaskPlanner class) = TaskPlanner class to use for constructing the task
                             planning policy.
+        process_logger_mode (string) = The type of logger to use for all processes.
+        rotate_log_size_mb (integer) = The maximum size of the rotated stdout/stderr logs in MiB.
+        rotate_log_backups (integer) = The maximum number of rotated stdout/stderr log backups.
     """
     if not issubclass(planner_class, TaskPlanner):
       raise TypeError('planner_class must be a TaskPlanner.')
@@ -462,6 +466,9 @@ class TaskRunner(object):
     self._portmap = portmap or {}
     self._launch_time = launch_time
     self._log_dir = log_dir or os.path.join(sandbox, '.logs')
+    self._process_logger_mode = process_logger_mode
+    self._rotate_log_size_mb = rotate_log_size_mb
+    self._rotate_log_backups = rotate_log_backups
     self._pathspec = TaskPath(root=checkpoint_root, task_id=self._task_id, log_dir=self._log_dir)
     self._hostname = hostname or socket.gethostname()
     try:
@@ -687,6 +694,9 @@ class TaskRunner(object):
       if pid == 0 and self._ckpt is not None:
         self._ckpt.close()
       return pid
+
+    logger_mode, rotate_log_size, rotate_log_backups = self._build_process_logger_args(process)
+
     return Process(
       process.name().get(),
       process.cmdline().get(),
@@ -695,7 +705,36 @@ class TaskRunner(object):
       self._sandbox,
       self._user,
       chroot=self._chroot,
-      fork=close_ckpt_and_fork)
+      fork=close_ckpt_and_fork,
+      logger_mode=logger_mode,
+      rotate_log_size=rotate_log_size,
+      rotate_log_backups=rotate_log_backups)
+
+  def _build_process_logger_args(self, process):
+    """
+      Build the appropriate logging configuration based on flags + process
+      configuration settings.
+
+      If no configuration (neither flags nor process config), default to
+      "standard" mode.
+    """
+    logger = process.logger()
+    if logger is Empty:
+      if self._process_logger_mode:
+        return (
+          self._process_logger_mode,
+          Amount(self._rotate_log_size_mb, Data.MB),
+          self._rotate_log_backups
+        )
+      else:
+        return LoggerMode.STANDARD, None, None
+    else:
+      mode = logger.mode().get()
+      if mode == LoggerMode.ROTATE:
+        rotate = logger.rotate()
+        return mode, Amount(rotate.log_size().get(), Data.BYTES), rotate.backups().get()
+      else:
+        return mode, None, None
 
   def deadlocked(self, plan=None):
    """Check whether a plan is deadlocked, i.e. there are no running/runnable processes, and the

http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/main/python/apache/thermos/runner/thermos_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/runner/thermos_runner.py b/src/main/python/apache/thermos/runner/thermos_runner.py
index bd8cf7f..a36bd2a 100644
--- a/src/main/python/apache/thermos/runner/thermos_runner.py
+++ b/src/main/python/apache/thermos/runner/thermos_runner.py
@@ -102,6 +102,30 @@ app.add_option(
          "the locally-resolved hostname.")
 
 
+app.add_option(
+    '--process_logger_mode',
+    dest='process_logger_mode',
+    type=str,
+    default=None,
+    help='The type of logger to use for all processes run by thermos.')
+
+
+app.add_option(
+    '--rotate_log_size_mb',
+    dest='rotate_log_size_mb',
+    type=int,
+    default=None,
+    help='Maximum size of the rotated stdout/stderr logs emitted by the thermos runner in MiB.')
+
+
+app.add_option(
+    '--rotate_log_backups',
+    dest='rotate_log_backups',
+    type=int,
+    default=None,
+    help='Maximum number of rotated stdout/stderr logs emitted by the thermos runner.')
+
+
 def get_task_from_options(opts):
   tasks = ThermosConfigLoader.load_json(opts.thermos_json)
   if len(tasks.tasks()) == 0:
@@ -167,6 +191,9 @@ def proxy_main(args, opts):
       chroot=opts.chroot,
       planner_class=CappedTaskPlanner,
       hostname=opts.hostname,
+      process_logger_mode=opts.process_logger_mode,
+      rotate_log_size_mb=opts.rotate_log_size_mb,
+      rotate_log_backups=opts.rotate_log_backups
   )
 
   for sig in (signal.SIGUSR1, signal.SIGUSR2):

http://git-wip-us.apache.org/repos/asf/aurora/blob/3c33f663/src/test/python/apache/thermos/core/test_process.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/test_process.py b/src/test/python/apache/thermos/core/test_process.py
index 5e6ad2f..261371d 100644
--- a/src/test/python/apache/thermos/core/test_process.py
+++ b/src/test/python/apache/thermos/core/test_process.py
@@ -22,10 +22,11 @@ import mock
 import pytest
 from twitter.common.contextutil import temporary_dir
 from twitter.common.dirutil import safe_mkdir
+from twitter.common.quantity import Amount, Data
 from twitter.common.recordio import ThriftRecordReader
 
 from apache.thermos.common.path import TaskPath
-from apache.thermos.core.process import Process
+from apache.thermos.core.process import LoggerMode, LogRotatingSubprocessExecutor, Process
 
 from gen.apache.thermos.ttypes import RunnerCkpt
 
@@ -86,10 +87,7 @@ def test_simple_process():
     rc = wait_for_rc(taskpath.getpath('process_checkpoint'))
 
     assert rc == 0
-    stdout = taskpath.with_filename('stdout').getpath('process_logdir')
-    assert os.path.exists(stdout)
-    with open(stdout, 'r') as fp:
-      assert fp.read() == 'hello world\n'
+    assert_log_content(taskpath, 'stdout', 'hello world\n')
 
 
 @mock.patch('os.chown')
@@ -185,3 +183,87 @@ def test_cloexec():
 
   assert run_with_class(TestWithoutCloexec) == 0
   assert run_with_class(TestProcess) != 0
+
+
+STDERR = 'for i in {1..31};do echo "stderr" 1>&2; done;'
+STDOUT = 'for i in {1..31};do echo "stdout";done;'
+
+
+def test_log_standard():
+  with temporary_dir() as td:
+    taskpath = make_taskpath(td)
+    sandbox = setup_sandbox(td, taskpath)
+
+    script = STDERR + STDOUT
+    p = TestProcess('process', script, 0, taskpath, sandbox)
+    p.start()
+
+    rc = wait_for_rc(taskpath.getpath('process_checkpoint'))
+    assert rc == 0
+    assert_log_content(taskpath, 'stdout', 'stdout\n' * 31)
+    assert_log_content(taskpath, 'stderr', 'stderr\n' * 31)
+
+
+def test_log_rotation():
+  # During testing, read one byte at a time to make the file sizes deterministic.
+  LogRotatingSubprocessExecutor.READ_BUFFER_SIZE = 1
+
+  def assert_stderr(taskpath, solo=True):
+    if solo:
+      assert_log_content(taskpath, 'stdout', '')
+
+    assert_log_content(taskpath, 'stderr', 'stderr\n')
+    assert_log_content(taskpath, 'stderr.1', 'stderr\n' * 10)
+    assert_log_content(taskpath, 'stderr.2', 'stderr\n' * 10)
+    assert_log_dne(taskpath, 'stderr.3')
+
+  def assert_stdout(taskpath, solo=True):
+    if solo:
+      assert_log_content(taskpath, 'stderr', '')
+
+    assert_log_content(taskpath, 'stdout', 'stdout\n')
+    assert_log_content(taskpath, 'stdout.1', 'stdout\n' * 10)
+    assert_log_content(taskpath, 'stdout.2', 'stdout\n' * 10)
+    assert_log_dne(taskpath, 'stdout.3')
+
+  def assert_both(taskpath):
+    assert_stderr(taskpath, solo=False)
+    assert_stdout(taskpath, solo=False)
+
+  scenarios = [
+    (STDERR + STDOUT, assert_both),
+    (STDERR, assert_stderr),
+    (STDOUT, assert_stdout)
+  ]
+
+  for script, assertion in scenarios:
+    with temporary_dir() as td:
+      taskpath = make_taskpath(td)
+      sandbox = setup_sandbox(td, taskpath)
+
+      p = TestProcess(
+          'process',
+          script,
+          0,
+          taskpath,
+          sandbox,
+          logger_mode=LoggerMode.ROTATE,
+          rotate_log_size=Amount(70, Data.BYTES),
+          rotate_log_backups=2)
+      p.start()
+
+      rc = wait_for_rc(taskpath.getpath('process_checkpoint'))
+      assert rc == 0
+      assertion(taskpath)
+
+
+def assert_log_content(taskpath, log_name, expected_content):
+  log = taskpath.with_filename(log_name).getpath('process_logdir')
+  assert os.path.exists(log)
+  with open(log, 'r') as fp:
+    assert fp.read() == expected_content
+
+
+def assert_log_dne(taskpath, log_name):
+  log = taskpath.with_filename(log_name).getpath('process_logdir')
+  assert not os.path.exists(log)

