airflow-commits mailing list archives

From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1776] Capture stdout and stderr for logging
Date Wed, 01 Nov 2017 22:40:12 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test 6ecdac701 -> 590d9fef7


[AIRFLOW-1776] Capture stdout and stderr for logging

The new logging framework was not properly
capturing stdout/stderr output. Redirecting
these streams to the correct logging facility
is required.

Closes #2745 from bolkedebruin/redirect_std

(cherry picked from commit 5b06b6666296abc8ade524ff5d438ab1210d2938)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>
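
In outline, the change adds a StreamLogWriter class plus redirect_stdout/redirect_stderr
context managers in airflow/utils/log/logging_mixin.py and wraps task execution in
cli.run() with them. A minimal standalone sketch of the same pattern (illustrative
names only, not the Airflow API):

----------------------------------------------------------------------
# Illustrative sketch only -- not the Airflow API. Replace sys.stdout with a
# file-like object that forwards completed lines to a logging.Logger, then
# restore the real stream when the block exits.
import logging
import sys
from contextlib import contextmanager


class LogWriter(object):
    """File-like shim that buffers writes and emits them through a logger."""

    def __init__(self, logger, level):
        self.logger = logger
        self.level = level
        self._buffer = ""

    def write(self, message):
        # Accumulate until a newline arrives, then emit one log record.
        self._buffer += message
        if message.endswith("\n"):
            self.logger.log(self.level, self._buffer)
            self._buffer = ""

    def flush(self):
        # Emit whatever is left in the buffer.
        if self._buffer:
            self.logger.log(self.level, self._buffer)
            self._buffer = ""

    def isatty(self):
        # Callers may probe for a tty; a log-backed stream never is one.
        return False


@contextmanager
def capture_stdout(logger, level):
    sys.stdout = LogWriter(logger, level)
    try:
        yield
    finally:
        sys.stdout = sys.__stdout__


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with capture_stdout(logging.getLogger("demo"), logging.INFO):
        print("routed through the 'demo' logger instead of plain stdout")
----------------------------------------------------------------------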


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/590d9fef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/590d9fef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/590d9fef

Branch: refs/heads/v1-9-test
Commit: 590d9fef706d9d7a734e3b748c39ae1fee52ced2
Parents: 6ecdac7
Author: Bolke de Bruin <bolke@xs4all.nl>
Authored: Wed Nov 1 23:39:52 2017 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Wed Nov 1 23:40:06 2017 +0100

----------------------------------------------------------------------
 airflow/bin/cli.py                              | 104 ++++++++++---------
 .../config_templates/airflow_local_settings.py  |  18 ++--
 airflow/jobs.py                                 |  13 ++-
 airflow/models.py                               |   2 +
 airflow/utils/log/logging_mixin.py              |  68 ++++++++++++
 tests/utils/test_logging_mixin.py               |  51 +++++++++
 6 files changed, 197 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
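
A hedged usage sketch of the new helpers, assuming an Airflow install that
includes this commit (import path and signature as in the logging_mixin.py
diff below):

----------------------------------------------------------------------
import logging

from airflow.utils.log.logging_mixin import redirect_stderr, redirect_stdout

log = logging.getLogger("airflow.task")

# Inside the block, print() output goes to the logger at INFO and writes to
# stderr go to the logger at WARN, mirroring how cli.run() wraps the task.
with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN):
    print("captured by the airflow.task logger")
----------------------------------------------------------------------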


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/590d9fef/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 3943aeb..91ec49b 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -54,7 +54,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
 
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
 from airflow.utils import db as db_utils
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin, redirect_stderr, redirect_stdout
 from airflow.www.app import cached_app
 
 from sqlalchemy import func
@@ -327,6 +327,7 @@ def run(args, dag=None):
     # Disable connection pooling to reduce the # of connections on the DB
     # while it's waiting for the task to finish.
     settings.configure_orm(disable_connection_pool=True)
+
     db_utils.pessimistic_connection_handling()
     if dag:
         args.dag_id = dag.dag_id
@@ -377,56 +378,57 @@ def run(args, dag=None):
     hostname = socket.getfqdn()
     log.info("Running on host %s", hostname)
 
-    if args.local:
-        run_job = jobs.LocalTaskJob(
-            task_instance=ti,
-            mark_success=args.mark_success,
-            pickle_id=args.pickle,
-            ignore_all_deps=args.ignore_all_dependencies,
-            ignore_depends_on_past=args.ignore_depends_on_past,
-            ignore_task_deps=args.ignore_dependencies,
-            ignore_ti_state=args.force,
-            pool=args.pool)
-        run_job.run()
-    elif args.raw:
-        ti._run_raw_task(
-            mark_success=args.mark_success,
-            job_id=args.job_id,
-            pool=args.pool,
-        )
-    else:
-        pickle_id = None
-        if args.ship_dag:
-            try:
-                # Running remotely, so pickling the DAG
-                session = settings.Session()
-                pickle = DagPickle(dag)
-                session.add(pickle)
-                session.commit()
-                pickle_id = pickle.id
-                # TODO: This should be written to a log
-                print((
-                          'Pickled dag {dag} '
-                          'as pickle_id:{pickle_id}').format(**locals()))
-            except Exception as e:
-                print('Could not pickle the DAG')
-                print(e)
-                raise e
-
-        executor = GetDefaultExecutor()
-        executor.start()
-        print("Sending to executor.")
-        executor.queue_task_instance(
-            ti,
-            mark_success=args.mark_success,
-            pickle_id=pickle_id,
-            ignore_all_deps=args.ignore_all_dependencies,
-            ignore_depends_on_past=args.ignore_depends_on_past,
-            ignore_task_deps=args.ignore_dependencies,
-            ignore_ti_state=args.force,
-            pool=args.pool)
-        executor.heartbeat()
-        executor.end()
+    with redirect_stdout(log, logging.INFO), redirect_stderr(log, logging.WARN):
+        if args.local:
+            run_job = jobs.LocalTaskJob(
+                task_instance=ti,
+                mark_success=args.mark_success,
+                pickle_id=args.pickle,
+                ignore_all_deps=args.ignore_all_dependencies,
+                ignore_depends_on_past=args.ignore_depends_on_past,
+                ignore_task_deps=args.ignore_dependencies,
+                ignore_ti_state=args.force,
+                pool=args.pool)
+            run_job.run()
+        elif args.raw:
+            ti._run_raw_task(
+                mark_success=args.mark_success,
+                job_id=args.job_id,
+                pool=args.pool,
+            )
+        else:
+            pickle_id = None
+            if args.ship_dag:
+                try:
+                    # Running remotely, so pickling the DAG
+                    session = settings.Session()
+                    pickle = DagPickle(dag)
+                    session.add(pickle)
+                    session.commit()
+                    pickle_id = pickle.id
+                    # TODO: This should be written to a log
+                    print((
+                              'Pickled dag {dag} '
+                              'as pickle_id:{pickle_id}').format(**locals()))
+                except Exception as e:
+                    print('Could not pickle the DAG')
+                    print(e)
+                    raise e
+
+            executor = GetDefaultExecutor()
+            executor.start()
+            print("Sending to executor.")
+            executor.queue_task_instance(
+                ti,
+                mark_success=args.mark_success,
+                pickle_id=pickle_id,
+                ignore_all_deps=args.ignore_all_dependencies,
+                ignore_depends_on_past=args.ignore_depends_on_past,
+                ignore_task_deps=args.ignore_dependencies,
+                ignore_ti_state=args.force,
+                pool=args.pool)
+            executor.heartbeat()
+            executor.end()
 
     # Child processes should not flush or upload to remote
     if args.raw:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/590d9fef/airflow/config_templates/airflow_local_settings.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 28c263e..020df8d 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -78,11 +78,20 @@ DEFAULT_LOGGING_CONFIG = {
         # },
     },
     'loggers': {
-        'airflow.processor' : {
-            'handlers': ['file.processor'],
+        '': {
+            'handlers': ['console'],
+            'level': LOG_LEVEL
+        },
+        'airflow': {
+            'handlers': ['console'],
             'level': LOG_LEVEL,
             'propagate': False,
         },
+        'airflow.processor': {
+            'handlers': ['file.processor'],
+            'level': LOG_LEVEL,
+            'propagate': True,
+        },
         'airflow.task': {
             'handlers': ['file.task'],
             'level': LOG_LEVEL,
@@ -93,10 +102,5 @@ DEFAULT_LOGGING_CONFIG = {
             'level': LOG_LEVEL,
             'propagate': True,
         },
-        'airflow': {
-            'handlers': ['console'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/590d9fef/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 7a7e564..aef28f1 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -54,7 +54,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           list_py_file_paths)
 from airflow.utils.db import provide_session, pessimistic_connection_handling
 from airflow.utils.email import send_email
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter
 from airflow.utils.state import State
 
 Base = models.Base
@@ -344,6 +344,10 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
         def helper():
             # This helper runs in the newly created process
             log = logging.getLogger("airflow.processor")
+
+            stdout = StreamLogWriter(log, logging.INFO)
+            stderr = StreamLogWriter(log, logging.WARN)
+
             for handler in log.handlers:
                 try:
                     handler.set_context(file_path)
@@ -353,6 +357,10 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
                     pass
 
             try:
+                # redirect stdout/stderr to log
+                sys.stdout = stdout
+                sys.stderr = stderr
+
                 # Re-configure the ORM engine as there are issues with multiple processes
                 settings.configure_orm()
 
@@ -376,6 +384,9 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
                 # Log exceptions through the logging framework.
                 log.exception("Got an exception! Propagating...")
                 raise
+            finally:
+                sys.stdout = sys.__stdout__
+                sys.stderr = sys.__stderr__
 
         p = multiprocessing.Process(target=helper,
                                     args=(),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/590d9fef/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index e5bf857..4d11c5b 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -34,6 +34,7 @@ import itertools
 import zipfile
 import jinja2
 import json
+import logging
 import os
 import pickle
 import re
@@ -798,6 +799,7 @@ class TaskInstance(Base, LoggingMixin):
             self.state = state
         self.hostname = ''
         self.init_on_load()
+        self._log = logging.getLogger("airflow.task")
 
     @reconstructor
     def init_on_load(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/590d9fef/airflow/utils/log/logging_mixin.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index a3aad5b..c12bb8b 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -18,8 +18,11 @@ from __future__ import print_function
 from __future__ import unicode_literals
 
 import logging
+import sys
 import warnings
+
 from builtins import object
+from contextlib import contextmanager
 
 
 class LoggingMixin(object):
@@ -59,3 +62,68 @@ class LoggingMixin(object):
                 handler.set_context(task_instance)
             except AttributeError:
                 pass
+
+
+class StreamLogWriter(object):
+    encoding = False
+
+    """
+    Allows to redirect stdout and stderr to logger
+    """
+    def __init__(self, logger, level):
+        """
+        :param log: The log level method to write to, ie. log.debug, log.warning
+        :return:
+        """
+        self.logger = logger
+        self.level = level
+        self._buffer = str()
+
+    def write(self, message):
+        """
+        Do whatever it takes to actually log the specified logging record
+        :param message: message to log
+        """
+        if not message.endswith("\n"):
+            self._buffer += message
+        else:
+            self._buffer += message
+            self.logger.log(self.level, self._buffer)
+            self._buffer = str()
+
+    def flush(self):
+        """
+        Ensure all logging output has been flushed
+        """
+        if len(self._buffer) > 0:
+            self.logger.log(self.level, self._buffer)
+            self._buffer = str()
+
+    def isatty(self):
+        """
+        Returns False to indicate the fd is not connected to a tty(-like) device.
+        For compatibility reasons.
+        """
+        return False
+
+
+@contextmanager
+def redirect_stdout(logger, level):
+    writer = StreamLogWriter(logger, level)
+    try:
+        sys.stdout = writer
+        yield
+    finally:
+        sys.stdout = sys.__stdout__
+
+
+@contextmanager
+def redirect_stderr(logger, level):
+    writer = StreamLogWriter(logger, level)
+    try:
+        sys.stderr = writer
+        yield
+    finally:
+        sys.stderr = sys.__stderr__
+
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/590d9fef/tests/utils/test_logging_mixin.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_logging_mixin.py b/tests/utils/test_logging_mixin.py
index bf9e225..52d8b45 100644
--- a/tests/utils/test_logging_mixin.py
+++ b/tests/utils/test_logging_mixin.py
@@ -12,10 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import mock
 import unittest
 import warnings
 
 from airflow.operators.bash_operator import BashOperator
+from airflow.utils.log.logging_mixin import StreamLogWriter
 from tests.test_utils.reset_warning_registry import reset_warning_registry
 
 
@@ -48,3 +50,52 @@ class TestLoggingMixin(unittest.TestCase):
 
     def tearDown(self):
         warnings.resetwarnings()
+
+
+class TestStreamLogWriter(unittest.TestCase):
+    def test_write(self):
+        logger = mock.MagicMock()
+        logger.log = mock.MagicMock()
+
+        log = StreamLogWriter(logger, 1)
+
+        msg = "test_message"
+        log.write(msg)
+
+        self.assertEqual(log._buffer, msg)
+
+        log.write("\n")
+        logger.log.assert_called_once_with(1, msg + "\n")
+
+        self.assertEqual(log._buffer, "")
+
+    def test_flush(self):
+        logger = mock.MagicMock()
+        logger.log = mock.MagicMock()
+
+        log = StreamLogWriter(logger, 1)
+
+        msg = "test_message"
+
+        log.write(msg)
+        self.assertEqual(log._buffer, msg)
+
+        log.flush()
+        logger.log.assert_called_once_with(1, msg)
+
+        self.assertEqual(log._buffer, "")
+
+    def test_isatty(self):
+        logger = mock.MagicMock()
+        logger.log = mock.MagicMock()
+
+        log = StreamLogWriter(logger, 1)
+        self.assertFalse(log.isatty())
+
+    def test_encoding(self):
+        logger = mock.MagicMock()
+        logger.log = mock.MagicMock()
+
+        log = StreamLogWriter(logger, 1)
+        self.assertFalse(log.encoding)
+

