aurora-commits mailing list archives

From jsir...@apache.org
Subject aurora git commit: Thermos: Add ability to specify process outputs destination
Date Fri, 08 Jan 2016 16:18:21 GMT
Repository: aurora
Updated Branches:
  refs/heads/master 206a48bf8 -> 024bac9dc


Thermos: Add ability to specify process outputs destination

This patch provides a way to **optionally** specify where a running process's output is sent.
The implementation builds on top of https://reviews.apache.org/r/30695/

**What was changed:**

A new `destination` parameter is available both as a cluster-wide default and on each `Process`
(through its `Logger`). The possible options are `file` (default), `console` to stream output to the
parent process's stdout/stderr, `both` to split output between files and the stream, and `none` to
discard any logs produced by the running process.
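
As a rough illustration, the mapping from each destination to the places where output ends up can be sketched in plain Python, using the option names from the patch's `LoggerDestination` enum (`file`, `console`, `both`, `none`). `sinks_for_destination` is a hypothetical helper, not code from this commit; the patch itself wraps these sinks in handler objects (`StreamHandler`, `FileHandler`, `TeeHandler`).

```python
import io


def sinks_for_destination(destination, stream, open_log_file):
    """Return the writable sinks for one output channel (stdout or stderr).

    destination   -- one of 'file', 'console', 'both', 'none'
    stream        -- the runner's own stream, used for 'console' and 'both'
    open_log_file -- zero-argument callable that opens the sandbox log file
    """
    if destination == 'none':
        return []  # discard all output
    if destination == 'console':
        return [stream]
    if destination == 'both':
        return [stream, open_log_file()]
    if destination == 'file':
        return [open_log_file()]
    raise ValueError('Logger destination %s is invalid.' % destination)


# Usage: with 'both', every write lands in the console stream and in the log file.
console, log_file = io.StringIO(), io.StringIO()
for sink in sinks_for_destination('both', console, lambda: log_file):
    sink.write('hello\n')
assert console.getvalue() == log_file.getvalue() == 'hello\n'
```

The log file is opened lazily through a callable so that `console` and `none` never create files in the sandbox.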

Testing Done:
Unit test coverage is provided for the new functionality.

I also tested manually with Mesos/Docker and verified that logs are written to the
expected files and that the same output reaches the Docker daemon.

Bugs closed: AURORA-1548

Reviewed at https://reviews.apache.org/r/40922/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/024bac9d
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/024bac9d
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/024bac9d

Branch: refs/heads/master
Commit: 024bac9dcb8f37e4b31210e3a0a7aea2345a16ab
Parents: 206a48b
Author: Martin Hrabovcin <mhrabovcin@gmail.com>
Authored: Fri Jan 8 09:18:11 2016 -0700
Committer: john Sirois <jsirois@apache.org>
Committed: Fri Jan 8 09:18:11 2016 -0700

----------------------------------------------------------------------
 NEWS                                            |   3 +
 docs/configuration-reference.md                 |  25 ++-
 docs/deploying-aurora-scheduler.md              |  18 +-
 .../executor/bin/thermos_executor_main.py       |  19 +-
 .../aurora/executor/thermos_task_runner.py      |   6 +
 .../python/apache/thermos/config/schema_base.py |   6 +-
 src/main/python/apache/thermos/core/process.py  | 212 ++++++++++++++-----
 src/main/python/apache/thermos/core/runner.py   |  38 ++--
 .../apache/thermos/runner/thermos_runner.py     |  11 +-
 .../python/apache/thermos/core/test_process.py  | 169 ++++++++++++++-
 10 files changed, 422 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/NEWS
----------------------------------------------------------------------
diff --git a/NEWS b/NEWS
index e2b26d9..ecb26d2 100644
--- a/NEWS
+++ b/NEWS
@@ -13,6 +13,9 @@
 - Added support for jobs to specify arbitrary ZooKeeper paths for service registration. See
   https://github.com/apache/aurora/blob/master/docs/configuration-reference.md#announcer-objects
   for details.
+- Log destination is configurable for the thermos runner. See the configuration reference for details
+  on how to configure the destination per process. Command line options may also be passed through the
+  scheduler in order to configure the global default behavior.
 
 0.11.0
 ------

http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/docs/configuration-reference.md
----------------------------------------------------------------------
diff --git a/docs/configuration-reference.md b/docs/configuration-reference.md
index bea99a7..02fb4e6 100644
--- a/docs/configuration-reference.md
+++ b/docs/configuration-reference.md
@@ -148,21 +148,27 @@ schedule.
 
 #### logger
 
-The default behavior of Thermos is to allow stderr/stdout logs to grow unbounded. In the event
-that you have large log volume, you may want to configure Thermos to automatically rotate logs
+The default behavior of Thermos is to store stderr/stdout logs in files which grow unbounded.
+In the event that you have large log volume, you may want to configure Thermos to automatically rotate logs
 after they grow to a certain size, which can prevent your job from using more than its allocated
 disk space.
 
-A Logger union consists of a mode enum and a rotation policy. Rotation policies only apply to
-loggers whose mode is `rotate`. The acceptable values for the LoggerMode enum are `standard`
-and `rotate`. The rotation policy applies to both stderr and stdout.
+A Logger union consists of a destination enum, a mode enum and a rotation policy.
+The `destination` attribute sets where the process logs are sent. The default
+option is `file`. It's also possible to specify `console` to send log output
+to stdout/stderr, `none` to suppress all log output, or `both` to send logs to both files and
+console output. When `none` or `console` is used, the rotation attributes are ignored.
+Rotation policies only apply to loggers whose mode is `rotate`. The acceptable values
+for the LoggerMode enum are `standard` and `rotate`. The rotation policy applies to both
+stderr and stdout.
 
 By default, all processes use the `standard` LoggerMode.
 
-  **Attribute Name**  | **Type**     | **Description**
-  ------------------- | :----------: | ---------------------------------
-   **mode**           | LoggerMode   | Mode of the logger. (Required)
-   **rotate**         | RotatePolicy | An optional rotation policy.
+  **Attribute Name**  | **Type**          | **Description**
+  ------------------- | :---------------: | ---------------------------------
+   **destination**    | LoggerDestination | Destination of logs. (Default: `file`)
+   **mode**           | LoggerMode        | Mode of the logger. (Default: `standard`)
+   **rotate**         | RotatePolicy      | An optional rotation policy.
 
 A RotatePolicy describes log rotation behavior for when `mode` is set to `rotate`. It is ignored
 otherwise.
@@ -177,6 +183,7 @@ An example process configuration is as follows:
         process = Process(
           name='process',
           logger=Logger(
+            destination=LoggerDestination('both'),
             mode=LoggerMode('rotate'),
             rotate=RotatePolicy(log_size=5*MB, backups=5)
           )

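The rule stated in the updated `logger` documentation, that rotation settings only take effect for file-backed destinations in `rotate` mode, can be captured in a small predicate. This is a hypothetical plain-Python mirror of the `Logger` struct (the real one is a pystachio `Struct` in `schema_base.py`); `LoggerConfig` and `rotation_applies` are illustrative names, not Aurora APIs.

```python
from dataclasses import dataclass

DESTINATIONS = ('file', 'console', 'both', 'none')
MODES = ('standard', 'rotate')


@dataclass
class LoggerConfig:
    # Defaults match the documented attribute table: destination=file, mode=standard.
    destination: str = 'file'
    mode: str = 'standard'

    def __post_init__(self):
        if self.destination not in DESTINATIONS:
            raise ValueError('Logger destination %s is invalid.' % self.destination)
        if self.mode not in MODES:
            raise ValueError('Logger mode %s is invalid.' % self.mode)

    def rotation_applies(self):
        # Rotation needs a log file to rotate, and only happens in 'rotate' mode.
        return self.mode == 'rotate' and self.destination in ('file', 'both')
```

For example, `LoggerConfig('console', 'rotate').rotation_applies()` is `False`, while `LoggerConfig('both', 'rotate').rotation_applies()` is `True`.
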
http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/docs/deploying-aurora-scheduler.md
----------------------------------------------------------------------
diff --git a/docs/deploying-aurora-scheduler.md b/docs/deploying-aurora-scheduler.md
index c0988e8..c62354e 100644
--- a/docs/deploying-aurora-scheduler.md
+++ b/docs/deploying-aurora-scheduler.md
@@ -15,7 +15,7 @@ machines.  This guide helps you get the scheduler set up and troubleshoot some c
   - [Considerations for running jobs in docker](#considerations-for-running-jobs-in-docker)
   - [Security Considerations](#security-considerations)
   - [Configuring Resource Oversubscription](#configuring-resource-oversubscription)
-  - [Process Log Rotation](#process-log-rotation)
+  - [Process Logs](#process-logs)
 - [Running Aurora](#running-aurora)
   - [Maintaining an Aurora Installation](#maintaining-an-aurora-installation)
   - [Monitoring](#monitoring)
@@ -172,7 +172,21 @@ tuples. For example `-global_container_mounts=/opt/secret_keys_dir:/mnt/secret_k
 In order to correctly execute processes inside a job, the docker container must have python 2.7
 installed.
 
-### Process Log Rotation
+### Process Logs
+
+#### Log destination
+By default, Thermos will write process stdout/stderr to log files in the sandbox. Process object configuration
+allows specifying alternate log destinations, such as streaming to stdout/stderr or suppressing all log output.
+The default behavior can be configured for the entire cluster with the following flag (passed through the
+-thermos_executor_flags argument to the Aurora scheduler):
+
+    --runner-logger-destination=both
+
+With `both`, logs are written to files and also streamed to the parent process's stdout/stderr.
+
+See [this document](configuration-reference.md#logger) for all destination options.
+
+#### Log rotation
 By default, Thermos will not rotate the stdout/stderr logs from child processes and they will grow
 without bound. An individual user may change this behavior via configuration on the Process object,
 but it may also be desirable to change the default configuration for the entire cluster.

http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
index cfade22..1272693 100644
--- a/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
+++ b/src/main/python/apache/aurora/executor/bin/thermos_executor_main.py
@@ -28,7 +28,7 @@ import traceback
 from twitter.common import app, log
 from twitter.common.log.options import LogOptions
 
-from apache.aurora.config.schema.base import LoggerMode
+from apache.aurora.config.schema.base import LoggerDestination, LoggerMode
 from apache.aurora.executor.aurora_executor import AuroraExecutor
 from apache.aurora.executor.common.announcer import DefaultAnnouncerCheckerProvider
 from apache.aurora.executor.common.executor_timeout import ExecutorTimeout
@@ -55,7 +55,8 @@ LogOptions.set_simple(True)
 LogOptions.set_disk_log_level('DEBUG')
 LogOptions.set_log_dir(CWD)
 
-_LOGGER_TYPES = ', '.join(LoggerMode.VALUES)
+_LOGGER_DESTINATIONS = ', '.join(LoggerDestination.VALUES)
+_LOGGER_MODES = ', '.join(LoggerMode.VALUES)
 
 
 app.add_option(
@@ -107,11 +108,21 @@ app.add_option(
 
 
 app.add_option(
+    '--runner-logger-destination',
+    dest='runner_logger_destination',
+    type=str,
+    default='file',
+    help='The logger destination [%s] to use for all processes run by thermos.'
+      % _LOGGER_DESTINATIONS)
+
+
+app.add_option(
     '--runner-logger-mode',
     dest='runner_logger_mode',
     type=str,
     default=None,
-    help='The type of logger [%s] to use for all processes run by thermos.' % _LOGGER_TYPES)
+    help='The logger mode [%s] to use for all processes run by thermos.' % _LOGGER_MODES)
+
 
 app.add_option(
     '--runner-rotate-log-size-mb',
@@ -184,6 +195,7 @@ def initialize(options):
       dump_runner_pex(),
       checkpoint_root,
       artifact_dir=cwd_path,
+      process_logger_destination=options.runner_logger_destination,
       process_logger_mode=options.runner_logger_mode,
       rotate_log_size_mb=options.runner_rotate_log_size_mb,
       rotate_log_backups=options.runner_rotate_log_backups,
@@ -201,6 +213,7 @@ def initialize(options):
       dump_runner_pex(),
       checkpoint_root,
       artifact_dir=cwd_path,
+      process_logger_destination=options.runner_logger_destination,
       process_logger_mode=options.runner_logger_mode,
       rotate_log_size_mb=options.runner_rotate_log_size_mb,
       rotate_log_backups=options.runner_rotate_log_backups,

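The executor registers `--runner-logger-destination` as a free-form `type=str` option, so an invalid value is only rejected once the runner constructs a `Process`. A sketch of the same two flags with parse-time validation, using the standard library's `argparse` (with its `choices` parameter) instead of the `twitter.common.app` options the executor actually uses; `build_parser` is a hypothetical name.

```python
import argparse

LOGGER_DESTINATIONS = ('file', 'console', 'both', 'none')
LOGGER_MODES = ('standard', 'rotate')


def build_parser():
    parser = argparse.ArgumentParser(prog='thermos_executor')
    # choices= makes argparse reject unknown destinations before anything is launched.
    parser.add_argument(
        '--runner-logger-destination',
        dest='runner_logger_destination',
        choices=LOGGER_DESTINATIONS,
        default='file',
        help='The logger destination [%s] to use for all processes run by thermos.'
             % ', '.join(LOGGER_DESTINATIONS))
    parser.add_argument(
        '--runner-logger-mode',
        dest='runner_logger_mode',
        choices=LOGGER_MODES,
        default=None,
        help='The logger mode [%s] to use for all processes run by thermos.'
             % ', '.join(LOGGER_MODES))
    return parser
```

For example, `build_parser().parse_args(['--runner-logger-destination=both'])` yields `runner_logger_destination == 'both'`, while `=mixed` makes argparse print a usage error and exit with status 2.
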
http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/src/main/python/apache/aurora/executor/thermos_task_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/thermos_task_runner.py b/src/main/python/apache/aurora/executor/thermos_task_runner.py
index c019fc9..3896e38 100644
--- a/src/main/python/apache/aurora/executor/thermos_task_runner.py
+++ b/src/main/python/apache/aurora/executor/thermos_task_runner.py
@@ -73,6 +73,7 @@ class ThermosTaskRunner(TaskRunner):
                artifact_dir=None,
                clock=time,
                hostname=None,
+               process_logger_destination=None,
                process_logger_mode=None,
                rotate_log_size_mb=None,
                rotate_log_backups=None,
@@ -104,6 +105,7 @@ class ThermosTaskRunner(TaskRunner):
     self._clock = clock
     self._artifact_dir = artifact_dir or safe_mkdtemp()
     self._hostname = hostname or socket.gethostname()
+    self._process_logger_destination = process_logger_destination
     self._process_logger_mode = process_logger_mode
     self._rotate_log_size_mb = rotate_log_size_mb
     self._rotate_log_backups = rotate_log_backups
@@ -243,6 +245,7 @@ class ThermosTaskRunner(TaskRunner):
                   task_id=self._task_id,
                   thermos_json=self._task_filename,
                   hostname=self._hostname,
+                  process_logger_destination=self._process_logger_destination,
                   process_logger_mode=self._process_logger_mode,
                   rotate_log_size_mb=self._rotate_log_size_mb,
                   rotate_log_backups=self._rotate_log_backups)
@@ -359,6 +362,7 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
                preemption_wait=Amount(1, Time.MINUTES),
                poll_interval=Amount(500, Time.MILLISECONDS),
                clock=time,
+               process_logger_destination=None,
                process_logger_mode=None,
                rotate_log_size_mb=None,
                rotate_log_backups=None):
@@ -371,6 +375,7 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
     self._poll_interval = poll_interval
     self._preemption_wait = preemption_wait
     self._task_runner_class = task_runner_class
+    self._process_logger_destination = process_logger_destination
     self._process_logger_mode = process_logger_mode
     self._rotate_log_size_mb = rotate_log_size_mb
     self._rotate_log_backups = rotate_log_backups
@@ -403,6 +408,7 @@ class DefaultThermosTaskRunnerProvider(TaskRunnerProvider):
         artifact_dir=self._artifact_dir,
         clock=self._clock,
         hostname=assigned_task.slaveHost,
+        process_logger_destination=self._process_logger_destination,
         process_logger_mode=self._process_logger_mode,
         rotate_log_size_mb=self._rotate_log_size_mb,
         rotate_log_backups=self._rotate_log_backups,

http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/src/main/python/apache/thermos/config/schema_base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/config/schema_base.py b/src/main/python/apache/thermos/config/schema_base.py
index 5552108..a6768e6 100644
--- a/src/main/python/apache/thermos/config/schema_base.py
+++ b/src/main/python/apache/thermos/config/schema_base.py
@@ -62,11 +62,15 @@ class RotatePolicy(Struct):
   backups = Default(Integer, 5)
 
 
+LoggerDestination = Enum('file', 'console', 'both', 'none')
+
+
 LoggerMode = Enum('standard', 'rotate')
 
 
 class Logger(Struct):
-  mode = Required(LoggerMode)
+  destination = Default(LoggerDestination, LoggerDestination('file'))
+  mode = Default(LoggerMode, LoggerMode('standard'))
   rotate = RotatePolicy
 
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/src/main/python/apache/thermos/core/process.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/process.py b/src/main/python/apache/thermos/core/process.py
index 8a181b0..c343b2d 100644
--- a/src/main/python/apache/thermos/core/process.py
+++ b/src/main/python/apache/thermos/core/process.py
@@ -57,6 +57,19 @@ class Platform(Interface):
     pass
 
 
+class LoggerDestination(object):
+  FILE = 'file'
+  CONSOLE = 'console'
+  BOTH = 'both'
+  NONE = 'none'
+
+  _ALL_DESTINATIONS = [FILE, CONSOLE, BOTH, NONE]
+
+  @staticmethod
+  def is_valid(destination):
+    return destination in LoggerDestination._ALL_DESTINATIONS
+
+
 class LoggerMode(object):
   STANDARD = 'standard'
   ROTATE = 'rotate'
@@ -82,7 +95,8 @@ class ProcessBase(object):
   MAXIMUM_CONTROL_WAIT = Amount(1, Time.MINUTES)
 
   def __init__(self, name, cmdline, sequence, pathspec, sandbox_dir, user=None, platform=None,
-               logger_mode=LoggerMode.STANDARD, rotate_log_size=None, rotate_log_backups=None):
+               logger_destination=LoggerDestination.FILE, logger_mode=LoggerMode.STANDARD,
+               rotate_log_size=None, rotate_log_backups=None):
     """
       required:
         name        = name of the process
@@ -96,6 +110,7 @@ class ProcessBase(object):
         user               = the user to run as (if unspecified, will default to current user.)
                              if specified to a user that is not the current user, you must have root
                              access
+        logger_destination = The destination for log output.
         logger_mode        = The type of logger to use for the process.
         rotate_log_size    = The maximum size of the rotated stdout/stderr logs.
         rotate_log_backups = The maximum number of rotated stdout/stderr log backups.
@@ -115,10 +130,14 @@ class ProcessBase(object):
     if platform is None:
       raise ValueError("Platform must be specified")
     self._platform = platform
+    self._logger_destination = logger_destination
     self._logger_mode = logger_mode
     self._rotate_log_size = rotate_log_size
     self._rotate_log_backups = rotate_log_backups
 
+    if not LoggerDestination.is_valid(self._logger_destination):
+      raise ValueError("Logger destination %s is invalid." % self._logger_destination)
+
     if not LoggerMode.is_valid(self._logger_mode):
       raise ValueError("Logger mode %s is invalid." % self._logger_mode)
 
@@ -390,16 +409,15 @@ class Process(ProcessBase):
       'pathspec': self._pathspec
     }
 
-    if self._logger_mode == LoggerMode.ROTATE:
-      log_size = int(self._rotate_log_size.as_(Data.BYTES))
-      self._log('Starting subprocess with log rotation. Size: %s, Backups: %s' % (
-        log_size, self._rotate_log_backups))
-      executor = LogRotatingSubprocessExecutor(max_bytes=log_size,
-                                               max_backups=self._rotate_log_backups,
-                                               **subprocess_args)
-    else:
-      self._log('Starting subprocess with no log rotation.')
-      executor = SubprocessExecutor(**subprocess_args)
+    log_destination_resolver = LogDestinationResolver(self._pathspec,
+                                                       destination=self._logger_destination,
+                                                       mode=self._logger_mode,
+                                                       rotate_log_size=self._rotate_log_size,
+                                                       rotate_log_backups=self._rotate_log_backups)
+    stdout, stderr = log_destination_resolver.get_handlers()
+    executor = PipedSubprocessExecutor(stdout=stdout,
+                                       stderr=stderr,
+                                       **subprocess_args)
 
     pid = executor.start()
 
@@ -448,9 +466,6 @@ class SubprocessExecutorBase(object):
     self._pathspec = pathspec
     self._popen = None
 
-  def _get_log_path(self, log_name):
-    return self._pathspec.with_filename(log_name).getpath('process_logdir')
-
   def _start_subprocess(self, stderr, stdout):
     return subprocess.Popen(self._args,
                             stderr=stderr,
@@ -468,57 +483,26 @@ class SubprocessExecutorBase(object):
     raise NotImplementedError()
 
 
-class SubprocessExecutor(SubprocessExecutorBase):
+class PipedSubprocessExecutor(SubprocessExecutorBase):
   """
-  Basic implementation of a SubprocessExecutor that writes stderr/stdout unconstrained log files.
-  """
-
-  def __init__(self, args, close_fds, cwd, env, pathspec):
-    """See SubprocessExecutorBase.__init__"""
-    self._stderr = None
-    self._stdout = None
-    super(SubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec)
-
-  def start(self):
-    self._stderr = safe_open(self._get_log_path('stderr'), 'a')
-    self._stdout = safe_open(self._get_log_path('stdout'), 'a')
-
-    self._popen = self._start_subprocess(self._stderr, self._stdout)
-    return self._popen.pid
-
-  def wait(self):
-    return self._popen.wait()
-
-
-class LogRotatingSubprocessExecutor(SubprocessExecutorBase):
-  """
-  Implementation of a SubprocessExecutor that implements log rotation for stderr/stdout.
+  Implementation of SubprocessExecutorBase that passes logs to the provided destinations.
   """
 
   READ_BUFFER_SIZE = 2 ** 16
 
-  def __init__(self, args, close_fds, cwd, env, pathspec, max_bytes, max_backups):
+  def __init__(self, args, close_fds, cwd, env, pathspec, stdout=None, stderr=None):
     """
     See SubprocessExecutorBase.__init__
 
     Takes additional arguments:
-      max_bytes   = The maximum size of an individual log file.
-      max_backups = The maximum number of log file backups to create.
+      stdout = Destination handler for stdout output. Default is /dev/null.
+      stderr = Destination handler for stderr output. Default is /dev/null.
     """
-    self._max_bytes = max_bytes
-    self._max_backups = max_backups
-    self._stderr = None
-    self._stdout = None
-    super(LogRotatingSubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec)
+    super(PipedSubprocessExecutor, self).__init__(args, close_fds, cwd, env, pathspec)
+    self._stderr = stderr
+    self._stdout = stdout
 
   def start(self):
-    self._stderr = RotatingFileHandler(self._get_log_path('stderr'),
-                                       self._max_bytes,
-                                       self._max_backups)
-    self._stdout = RotatingFileHandler(self._get_log_path('stdout'),
-                                       self._max_bytes,
-                                       self._max_backups)
-
     self._popen = self._start_subprocess(subprocess.PIPE, subprocess.PIPE)
     return self._popen.pid
 
@@ -548,6 +532,83 @@ class LogRotatingSubprocessExecutor(SubprocessExecutorBase):
     return rc
 
 
+class LogDestinationResolver(object):
+  """
+  Resolves correct stdout/stderr destinations based on process configuration
+  """
+
+  STDOUT = 'stdout'
+  STDERR = 'stderr'
+
+  def __init__(self, pathspec, destination=LoggerDestination.FILE, mode=LoggerMode.STANDARD,
+               rotate_log_size=None, rotate_log_backups=None):
+    """
+    pathspec           = TaskPath object for synthesizing path names.
+    destination        = Log destination.
+    mode               = The logger mode to use for the process.
+    rotate_log_size    = The maximum size of the rotated stdout/stderr logs.
+    rotate_log_backups = The maximum number of rotated stdout/stderr log backups.
+    """
+    self._pathspec = pathspec
+    self._destination = destination
+    self._mode = mode
+    self._rotate_log_size = rotate_log_size
+    self._rotate_log_backups = rotate_log_backups
+
+    if not LoggerDestination.is_valid(self._destination):
+      raise ValueError("Logger destination %s is invalid." % self._destination)
+
+    if not LoggerMode.is_valid(self._mode):
+      raise ValueError("Logger mode %s is invalid." % self._mode)
+
+  def get_handlers(self):
+    """
+    Creates the stdout/stderr handlers for the provided configuration.
+    """
+    return self._get_handler(self.STDOUT), self._get_handler(self.STDERR)
+
+  def _get_handler(self, name):
+    """
+    Constructs the correct handler for the provided configuration.
+    """
+
+    # With no destination, discard logs by writing them to /dev/null
+    if self._destination == LoggerDestination.NONE:
+      return StreamHandler(safe_open(os.devnull, 'w'), close=True)
+
+    # Stream logs to the runner's own stdout/stderr
+    if self._destination == LoggerDestination.CONSOLE:
+      return self._get_stream(name)
+
+    # Streaming AND file logs are required
+    if self._destination == LoggerDestination.BOTH:
+      return TeeHandler(self._get_stream(name), self._get_file(name))
+
+    # File only logs are required
+    return self._get_file(name)
+
+  def _get_file(self, name):
+    if self._mode == LoggerMode.STANDARD:
+      return FileHandler(self._get_log_path(name))
+    if self._mode == LoggerMode.ROTATE:
+      log_size = int(self._rotate_log_size.as_(Data.BYTES))
+      return RotatingFileHandler(self._get_log_path(name),
+                                 log_size,
+                                 self._rotate_log_backups)
+
+  def _get_stream(self, name):
+    """
+    Returns OS stream by name
+    """
+    if name == self.STDOUT:
+      return StreamHandler(sys.stdout)
+    if name == self.STDERR:
+      return StreamHandler(sys.stderr)
+
+  def _get_log_path(self, log_name):
+    return self._pathspec.with_filename(log_name).getpath('process_logdir')
+
+
 class FileHandler(object):
   """
   Base file handler.
@@ -637,3 +698,48 @@ class RotatingFileHandler(FileHandler):
 
     self.swap_files(self.filename, self.make_indexed_filename(1))
     self.file = safe_open(self.filename, mode='w')
+
+
+class StreamHandler(object):
+  """
+  Stream handler wraps stream objects and allows configuration of whether objects
+  should be closed when ending a subprocess.
+  """
+
+  def __init__(self, stream, close=False):
+    """
+    stream = Wrapped stream object.
+    """
+    self._stream = stream
+    self._close = close
+
+  def write(self, b):
+    self._stream.write(b)
+    self._stream.flush()
+
+  def close(self):
+    if self._close:
+      self._stream.close()
+
+
+class TeeHandler(object):
+  """
+  Tee handler mimics the unix tee command and splits output between two destinations
+  """
+
+  def __init__(self, first, second):
+    """
+      required:
+        first  = First destination
+        second = Second destination
+    """
+    self._first = first
+    self._second = second
+
+  def write(self, b):
+    self._first.write(b)
+    self._second.write(b)
+
+  def close(self):
+    self._first.close()
+    self._second.close()

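In the new code path, `Process` resolves a pair of handlers and hands them to `PipedSubprocessExecutor`, which starts the child with `subprocess.PIPE` for both streams; the loop that copies data from the pipes into the handlers is not shown in this diff. The following is a minimal, POSIX-only sketch of such a copy loop using the standard `selectors` module, with a simplified `TeeHandler` modeled on the one added above. `pump` and `demo` are hypothetical names, and the actual executor's read loop may be structured differently.

```python
import io
import os
import selectors
import subprocess
import sys


class TeeHandler(object):
    """Writes every chunk to each wrapped sink, like the Unix tee command."""

    def __init__(self, *sinks):
        self._sinks = sinks

    def write(self, data):
        for sink in self._sinks:
            sink.write(data)


def pump(args, stdout_handler, stderr_handler, read_buffer_size=2 ** 16):
    """Run args, forward its stdout/stderr bytes to the handlers, return the exit code."""
    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    selector = selectors.DefaultSelector()
    selector.register(proc.stdout, selectors.EVENT_READ, stdout_handler)
    selector.register(proc.stderr, selectors.EVENT_READ, stderr_handler)
    # Keep reading until both pipes report EOF, so neither pipe fills up and blocks the child.
    while selector.get_map():
        for key, _ in selector.select():
            chunk = os.read(key.fd, read_buffer_size)
            if chunk:
                key.data.write(chunk)
            else:  # EOF: the child closed this stream
                selector.unregister(key.fileobj)
                key.fileobj.close()
    selector.close()
    return proc.wait()


def demo():
    # stdout is teed into two buffers, stderr goes to a single buffer.
    out, err, copy = io.BytesIO(), io.BytesIO(), io.BytesIO()
    child = [sys.executable, '-c',
             'import sys; sys.stdout.write("out"); sys.stderr.write("err")']
    rc = pump(child, TeeHandler(out, copy), err)
    return rc, out.getvalue(), copy.getvalue(), err.getvalue()
```

Reading both pipes through one selector matters: draining only stdout while the child fills the stderr pipe buffer could block the child indefinitely.
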
http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/src/main/python/apache/thermos/core/runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/core/runner.py b/src/main/python/apache/thermos/core/runner.py
index 5623dce..3ebf86e 100644
--- a/src/main/python/apache/thermos/core/runner.py
+++ b/src/main/python/apache/thermos/core/runner.py
@@ -70,7 +70,7 @@ from apache.thermos.config.schema import ThermosContext
 
 from .helper import TaskRunnerHelper
 from .muxer import ProcessMuxer
-from .process import LoggerMode, Process
+from .process import LoggerDestination, LoggerMode, Process
 
 from gen.apache.thermos.ttypes import (
     ProcessState,
@@ -419,7 +419,8 @@ class TaskRunner(object):
   def __init__(self, task, checkpoint_root, sandbox, log_dir=None,
                task_id=None, portmap=None, user=None, chroot=False, clock=time,
                universal_handler=None, planner_class=TaskPlanner, hostname=None,
-               process_logger_mode=None, rotate_log_size_mb=None, rotate_log_backups=None,
+               process_logger_destination=None, process_logger_mode=None,
+               rotate_log_size_mb=None, rotate_log_backups=None,
                preserve_env=False):
     """
       required:
@@ -442,7 +443,8 @@ class TaskRunner(object):
         universal_handler = checkpoint record handler (only used for testing)
         planner_class (TaskPlanner class) = TaskPlanner class to use for constructing the task
                             planning policy.
-        process_logger_mode (string) = The type of logger to use for all processes.
+        process_logger_destination (string) = The logger destination to use for all processes.
+        process_logger_mode (string) = The mode of logger to use for all processes.
         rotate_log_size_mb (integer) = The maximum size of the rotated stdout/stderr logs in MiB.
         rotate_log_backups (integer) = The maximum number of rotated stdout/stderr log backups.
         preserve_env (boolean) = whether or not env variables for the runner should be in the
@@ -469,6 +471,7 @@ class TaskRunner(object):
     self._portmap = portmap or {}
     self._launch_time = launch_time
     self._log_dir = log_dir or os.path.join(sandbox, '.logs')
+    self._process_logger_destination = process_logger_destination
     self._process_logger_mode = process_logger_mode
     self._rotate_log_size_mb = rotate_log_size_mb
     self._rotate_log_backups = rotate_log_backups
@@ -699,7 +702,10 @@ class TaskRunner(object):
         self._ckpt.close()
       return pid
 
-    logger_mode, rotate_log_size, rotate_log_backups = self._build_process_logger_args(process)
+    (logger_destination,
+     logger_mode,
+     rotate_log_size,
+     rotate_log_backups) = self._build_process_logger_args(process)
 
     return Process(
       process.name().get(),
@@ -710,6 +716,7 @@ class TaskRunner(object):
       self._user,
       chroot=self._chroot,
       fork=close_ckpt_and_fork,
+      logger_destination=logger_destination,
       logger_mode=logger_mode,
       rotate_log_size=rotate_log_size,
       rotate_log_backups=rotate_log_backups,
@@ -723,23 +730,28 @@ class TaskRunner(object):
       If no configuration (neither flags nor process config), default to
       "standard" mode.
     """
+    destination, mode, size, backups = None, None, None, None
     logger = process.logger()
     if logger is Empty:
+      if self._process_logger_destination:
+        destination = self._process_logger_destination
+      else:
+        destination = LoggerDestination.FILE
+
       if self._process_logger_mode:
-        return (
-          self._process_logger_mode,
-          Amount(self._rotate_log_size_mb, Data.MB),
-          self._rotate_log_backups
-        )
+        mode = self._process_logger_mode
+        size = Amount(self._rotate_log_size_mb, Data.MB)
+        backups = self._rotate_log_backups
       else:
-        return LoggerMode.STANDARD, None, None
+        mode = LoggerMode.STANDARD
     else:
+      destination = logger.destination().get()
       mode = logger.mode().get()
       if mode == LoggerMode.ROTATE:
         rotate = logger.rotate()
-        return mode, Amount(rotate.log_size().get(), Data.BYTES), rotate.backups().get()
-      else:
-        return mode, None, None
+        size = Amount(rotate.log_size().get(), Data.BYTES)
+        backups = rotate.backups().get()
+    return destination, mode, size, backups
 
   def deadlocked(self, plan=None):
     """Check whether a plan is deadlocked, i.e. there are no running/runnable processes, and the

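`_build_process_logger_args` lets a `Logger` on the process override the runner-wide flags, and falls back to `file`/`standard` when neither is set. A standalone sketch of that precedence, using a plain dict (or `None`) in place of the pystachio `Logger` object and plain byte counts in place of `Amount`; `build_logger_args` and its parameter names are hypothetical. It also makes the return types explicit: in Python, `mode = value,` binds a one-element tuple rather than the string, so the assignments carry no trailing comma.

```python
MB = 1024 * 1024


def build_logger_args(process_logger=None, flag_destination=None, flag_mode=None,
                      flag_rotate_log_size_mb=None, flag_rotate_log_backups=None):
    """Return (destination, mode, rotate_log_size_bytes, rotate_log_backups).

    process_logger is None when the Process has no Logger, otherwise a dict such as
    {'destination': 'both', 'mode': 'rotate', 'log_size': 5 * MB, 'backups': 5}.
    """
    destination, mode, size, backups = None, None, None, None
    if process_logger is None:
        # No per-process Logger: use the runner-wide flags, else the defaults.
        destination = flag_destination or 'file'
        if flag_mode:
            mode = flag_mode  # a trailing comma here would make mode a tuple
            if flag_rotate_log_size_mb is not None:
                size = flag_rotate_log_size_mb * MB
            backups = flag_rotate_log_backups
        else:
            mode = 'standard'
    else:
        # A per-process Logger wins over the flags; missing keys use the schema defaults.
        destination = process_logger.get('destination', 'file')
        mode = process_logger.get('mode', 'standard')
        if mode == 'rotate':
            size = process_logger.get('log_size')
            backups = process_logger.get('backups', 5)
    return destination, mode, size, backups
```

For example, `build_logger_args({'destination': 'console'}, flag_destination='both')` returns `('console', 'standard', None, None)`: the process-level setting takes precedence over the cluster flag.
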
http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/src/main/python/apache/thermos/runner/thermos_runner.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/runner/thermos_runner.py b/src/main/python/apache/thermos/runner/thermos_runner.py
index 3dacd45..0d06e8e 100644
--- a/src/main/python/apache/thermos/runner/thermos_runner.py
+++ b/src/main/python/apache/thermos/runner/thermos_runner.py
@@ -111,11 +111,19 @@ app.add_option(
 
 
 app.add_option(
+    '--process_logger_destination',
+    dest='process_logger_destination',
+    type=str,
+    default=None,
+    help='The logger destination to use for all processes run by thermos.')
+
+
+app.add_option(
     '--process_logger_mode',
     dest='process_logger_mode',
     type=str,
     default=None,
-    help='The type of logger to use for all processes run by thermos.')
+    help='The logger mode to use for all processes run by thermos.')
 
 
 app.add_option(
@@ -199,6 +207,7 @@ def proxy_main(args, opts):
       chroot=opts.chroot,
       planner_class=CappedTaskPlanner,
       hostname=opts.hostname,
+      process_logger_destination=opts.process_logger_destination,
       process_logger_mode=opts.process_logger_mode,
       rotate_log_size_mb=opts.rotate_log_size_mb,
       rotate_log_backups=opts.rotate_log_backups,

http://git-wip-us.apache.org/repos/asf/aurora/blob/024bac9d/src/test/python/apache/thermos/core/test_process.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/core/test_process.py b/src/test/python/apache/thermos/core/test_process.py
index da4c494..c339c91 100644
--- a/src/test/python/apache/thermos/core/test_process.py
+++ b/src/test/python/apache/thermos/core/test_process.py
@@ -16,17 +16,28 @@ import grp
 import os
 import pwd
 import random
+import StringIO
+import sys
 import time
 
 import mock
 import pytest
-from twitter.common.contextutil import temporary_dir
+from twitter.common.contextutil import mutable_sys, temporary_dir
 from twitter.common.dirutil import safe_mkdir
 from twitter.common.quantity import Amount, Data
 from twitter.common.recordio import ThriftRecordReader
 
 from apache.thermos.common.path import TaskPath
-from apache.thermos.core.process import LoggerMode, LogRotatingSubprocessExecutor, Process
+from apache.thermos.core.process import (
+    FileHandler,
+    LogDestinationResolver,
+    LoggerDestination,
+    LoggerMode,
+    PipedSubprocessExecutor,
+    Process,
+    StreamHandler,
+    TeeHandler
+)
 
 from gen.apache.thermos.ttypes import RunnerCkpt
 
@@ -206,7 +217,7 @@ def test_log_standard():
 
 def test_log_rotation():
   # During testing, read one byte at a time to make the file sizes deterministic.
-  LogRotatingSubprocessExecutor.READ_BUFFER_SIZE = 1
+  PipedSubprocessExecutor.READ_BUFFER_SIZE = 1
 
   def assert_stderr(taskpath, solo=True):
     if solo:
@@ -290,3 +301,155 @@ def test_preserve_env(*mocks):
 
       assert rc == 0
       assert_log_content(taskpath, 'stdout', expectation + '\n')
+
+
+def test_tee_class():
+  fileout = StringIO.StringIO()
+  stdout = StringIO.StringIO()
+  tee = TeeHandler(fileout, stdout)
+
+  tee.write("TEST")
+  assert fileout.getvalue() == "TEST"
+  assert stdout.getvalue() == "TEST"
+
+  tee.write("SECOND")
+  assert fileout.getvalue() == "TESTSECOND"
+  assert stdout.getvalue() == "TESTSECOND"
+
+
+def assert_log_file_exists(taskpath, log_name):
+  log = taskpath.with_filename(log_name).getpath('process_logdir')
+  assert os.path.exists(log)
+
+
+def test_resolver_none_output():
+  with temporary_dir() as td:
+    taskpath = make_taskpath(td)
+    r = LogDestinationResolver(taskpath, destination=LoggerDestination.NONE)
+    stdout, stderr = r.get_handlers()
+    assert type(stdout) == StreamHandler
+    assert type(stderr) == StreamHandler
+
+
+def test_resolver_console_output():
+  with temporary_dir() as td:
+    taskpath = make_taskpath(td)
+    r = LogDestinationResolver(taskpath, destination=LoggerDestination.CONSOLE)
+    stdout, stderr = r.get_handlers()
+    assert type(stdout) == StreamHandler
+    assert type(stderr) == StreamHandler
+    assert stdout._stream == sys.stdout
+    assert stderr._stream == sys.stderr
+
+
+def test_resolver_file_output():
+  with temporary_dir() as td:
+    taskpath = make_taskpath(td)
+    r = LogDestinationResolver(taskpath, destination=LoggerDestination.FILE)
+    stdout, stderr = r.get_handlers()
+    assert type(stdout) == FileHandler
+    assert type(stderr) == FileHandler
+    assert_log_file_exists(taskpath, 'stdout')
+    assert_log_file_exists(taskpath, 'stderr')
+
+
+def test_resolver_both_output():
+  with temporary_dir() as td:
+    taskpath = make_taskpath(td)
+    r = LogDestinationResolver(taskpath, destination=LoggerDestination.BOTH)
+    stdout, stderr = r.get_handlers()
+    assert type(stdout) == TeeHandler
+    assert type(stderr) == TeeHandler
+    assert_log_file_exists(taskpath, 'stdout')
+    assert_log_file_exists(taskpath, 'stderr')
+
+
+def test_resolver_both_with_rotation_output():
+  with temporary_dir() as td:
+    taskpath = make_taskpath(td)
+    r = LogDestinationResolver(taskpath, destination=LoggerDestination.BOTH,
+                               mode=LoggerMode.ROTATE,
+                               rotate_log_size=Amount(70, Data.BYTES),
+                               rotate_log_backups=2)
+    stdout, stderr = r.get_handlers()
+    assert type(stdout) == TeeHandler
+    assert type(stderr) == TeeHandler
+    assert_log_file_exists(taskpath, 'stdout')
+    assert_log_file_exists(taskpath, 'stderr')
+
+
+def test_log_none():
+  with temporary_dir() as td:
+    taskpath = make_taskpath(td)
+    sandbox = setup_sandbox(td, taskpath)
+
+    p = TestProcess('process', 'echo hello world', 0, taskpath, sandbox,
+                    logger_destination=LoggerDestination.NONE)
+    p.start()
+    rc = wait_for_rc(taskpath.getpath('process_checkpoint'))
+
+    assert rc == 0
+    assert_log_dne(taskpath, 'stdout')
+    assert_log_dne(taskpath, 'stderr')
+
+
+def test_log_console():
+  with temporary_dir() as td:
+    taskpath = make_taskpath(td)
+    sandbox = setup_sandbox(td, taskpath)
+
+    # Create file stdout for capturing output. We can't use StringIO mock
+    # because TestProcess is running fork.
+    with open(os.path.join(td, 'sys_stdout'), 'w+') as stdout:
+      with open(os.path.join(td, 'sys_stderr'), 'w+') as stderr:
+        with mutable_sys():
+          sys.stdout, sys.stderr = stdout, stderr
+
+          p = TestProcess('process', 'echo hello world; echo >&2 hello stderr', 0,
+                          taskpath, sandbox, logger_destination=LoggerDestination.CONSOLE)
+          p.start()
+          rc = wait_for_rc(taskpath.getpath('process_checkpoint'))
+
+          assert rc == 0
+          # Check no log files were created in std path
+          assert_log_dne(taskpath, 'stdout')
+          assert_log_dne(taskpath, 'stderr')
+
+          # Check mock stdout
+          stdout.seek(0)
+          assert stdout.read() == 'hello world\n'
+
+          # Check mock stderr
+          stderr.seek(0)
+          assert stderr.read() == 'hello stderr\n'
+
+
+def test_log_tee():
+  with temporary_dir() as td:
+    taskpath = make_taskpath(td)
+    sandbox = setup_sandbox(td, taskpath)
+
+    # Create file stdout for capturing output. We can't use StringIO mock
+    # because TestProcess is running fork.
+    with open(os.path.join(td, 'sys_stdout'), 'w+') as stdout:
+      with open(os.path.join(td, 'sys_stderr'), 'w+') as stderr:
+        with mutable_sys():
+          sys.stdout, sys.stderr = stdout, stderr
+
+          p = TestProcess('process', 'echo hello world; echo >&2 hello stderr', 0,
+                          taskpath, sandbox, logger_destination=LoggerDestination.BOTH)
+          p.start()
+          rc = wait_for_rc(taskpath.getpath('process_checkpoint'))
+
+          assert rc == 0
+          # Check log files were created in std path with correct content
+          assert_log_content(taskpath, 'stdout', 'hello world\n')
+          assert_log_content(taskpath, 'stderr', 'hello stderr\n')
+
+          # Check mock stdout
+          stdout.seek(0)
+          assert stdout.read() == 'hello world\n'
+
+          # Check mock stderr
+          stderr.seek(0)
+          assert stderr.read() == 'hello stderr\n'

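`test_tee_class` checks that every write to a `TeeHandler` lands in both wrapped streams. A minimal sketch of a class with that behavior follows. `TeeWriter` is a hypothetical name, and the real `TeeHandler` in `apache.thermos.core.process` may have a different constructor and additional methods.

```python
class TeeWriter(object):
  """Hypothetical sketch: forward every write to two file-like objects."""

  def __init__(self, first, second):
    self._first = first
    self._second = second

  def write(self, data):
    # Same bytes go to both targets, in order.
    self._first.write(data)
    self._second.write(data)

  def flush(self):
    self._first.flush()
    self._second.flush()
```

Given two `io.StringIO` objects, writing `"TEST"` and then `"SECOND"` leaves `"TESTSECOND"` in both, matching the assertions in `test_tee_class`.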

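The `test_resolver_*` cases pin down which handler each destination yields:
- file handlers, with log files on disk, for `FILE`;
- the parent's `sys.stdout`/`sys.stderr` for `CONSOLE`;
- a tee for `BOTH`;
- no log files for `NONE`.

The function below is a hypothetical sketch of that mapping, not `LogDestinationResolver` itself. It assumes lowercase destination strings, returns plain file objects instead of the Thermos handler classes, and sends `none` to `os.devnull`.

```python
import os
import sys


class _Tee(object):
  """Write the same data to several file-like objects."""

  def __init__(self, *streams):
    self._streams = streams

  def write(self, data):
    for stream in self._streams:
      stream.write(data)


def resolve_handlers(log_dir, destination):
  """Return (stdout_handler, stderr_handler) for a destination string.

  Hypothetical helper: 'file' opens <log_dir>/stdout and <log_dir>/stderr,
  'console' reuses sys.stdout/sys.stderr, 'both' tees file and console,
  'none' discards output via os.devnull without creating log files.
  """
  def open_log(name):
    return open(os.path.join(log_dir, name), 'a')

  if destination == 'file':
    return open_log('stdout'), open_log('stderr')
  if destination == 'console':
    return sys.stdout, sys.stderr
  if destination == 'both':
    return _Tee(open_log('stdout'), sys.stdout), _Tee(open_log('stderr'), sys.stderr)
  if destination == 'none':
    devnull = open(os.devnull, 'w')
    return devnull, devnull
  raise ValueError('unknown destination: %r' % destination)
```

In this sketch the caller owns the returned file objects and is responsible for closing them.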