airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1821] Enhance default logging config by removing extra loggers
Date Fri, 22 Dec 2017 13:07:35 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1c8972df4 -> c0dffb57c


[AIRFLOW-1821] Enhance default logging config by removing extra loggers

Closes #2793 from jgao54/logging-enhancement


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c0dffb57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c0dffb57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c0dffb57

Branch: refs/heads/master
Commit: c0dffb57c211e4dc7441de36d5671473201cd69c
Parents: 1c8972d
Author: Joy Gao <joyg@wepay.com>
Authored: Fri Dec 22 14:07:29 2017 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Fri Dec 22 14:07:29 2017 +0100

----------------------------------------------------------------------
 UPDATING.md                                     |  61 +------
 .../config_templates/airflow_local_settings.py  | 109 +++++++-----
 airflow/config_templates/default_airflow.cfg    |   8 +-
 airflow/jobs.py                                 |   2 +-
 airflow/task/__init__.py                        |  13 ++
 airflow/task/task_runner/__init__.py            |  38 +++++
 airflow/task/task_runner/base_task_runner.py    | 170 +++++++++++++++++++
 airflow/task/task_runner/bash_task_runner.py    |  39 +++++
 airflow/task_runner/__init__.py                 |  38 -----
 airflow/task_runner/base_task_runner.py         | 170 -------------------
 airflow/task_runner/bash_task_runner.py         |  39 -----
 tests/jobs.py                                   |   3 +-
 tests/utils/test_log_handlers.py                |   2 +-
 tests/www/test_views.py                         |   4 +-
 14 files changed, 336 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 2fec5e9..9c39634 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -63,66 +63,7 @@ logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
 
 The logging configuration file needs to be on the `PYTHONPATH`, for example `$AIRFLOW_HOME/config`.
This directory is loaded by default. Of course you are free to add any directory to the `PYTHONPATH`,
this might be handy when you have the config in another directory or you mount a volume in
case of Docker. 
 
-You can take the config from `airflow/config_templates/airflow_local_settings.py` as a starting
point. Copy the contents to `${AIRFLOW_HOME}/config/airflow_local_settings.py`,  and alter
the config as you like.
-
-```
-LOGGING_CONFIG = {
-    'version': 1,
-    'disable_existing_loggers': False,
-    'formatters': {
-        'airflow.task': {
-            'format': LOG_FORMAT,
-        },
-    },
-    'handlers': {
-        'console': {
-            'class': 'logging.StreamHandler',
-            'formatter': 'airflow.task',
-            'stream': 'ext://sys.stdout'
-        },
-        'file.task': {
-            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
-            'formatter': 'airflow.task',
-            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-            'filename_template': FILENAME_TEMPLATE,
-        },
-        # When using s3 or gcs, provide a customized LOGGING_CONFIG
-        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
-        # for details
-        's3.task': {
-            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
-            'formatter': 'airflow.task',
-            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-            's3_log_folder': S3_LOG_FOLDER,
-            'filename_template': FILENAME_TEMPLATE,
-        },
-        'gcs.task': {
-            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
-            'formatter': 'airflow.task',
-            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-            'gcs_log_folder': GCS_LOG_FOLDER,
-            'filename_template': FILENAME_TEMPLATE,
-        },
-    },
-    'loggers': {
-        'airflow.task': {
-            'handlers': ['file.task'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
-        'airflow.task_runner': {
-            'handlers': ['file.task'],
-            'level': LOG_LEVEL,
-            'propagate': True,
-        },
-        'airflow': {
-            'handlers': ['console'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
-    }
-}
-```
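+You can take the config from `airflow/config_templates/airflow_local_settings.py` as a starting
point. Copy the contents to `${AIRFLOW_HOME}/config/airflow_local_settings.py` and alter
the config as you like. To log remotely, set `remote_logging = True` in `airflow.cfg` and set `REMOTE_BASE_LOG_FOLDER` in the copied file to an `s3://` or `gs://` bucket URL.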
 
 If you want to customize the logging (for example, use logging rotate), you can do this by
defining one or more of the logging handles that [Python has to offer](https://docs.python.org/3/library/logging.handlers.html).
For more details about the Python logging, please refer to the [official logging documentation](https://docs.python.org/3/library/logging.html).
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/config_templates/airflow_local_settings.py
----------------------------------------------------------------------
diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index e5f4198..899e815 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -20,87 +20,106 @@ from airflow import configuration as conf
 # in this file instead of from airflow.cfg. Currently
 # there are other log format and level configurations in
 # settings.py and cli.py. Please see AIRFLOW-1455.
-
 LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
-LOG_FORMAT = conf.get('core', 'log_format')
+
+LOG_FORMAT = conf.get('core', 'LOG_FORMAT')
 
 BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
-PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')
+
+PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
 
 FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
+
 PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'
 
+# Storage bucket URL for remote logging: "s3://..." for S3, "gs://..." for GCS.
+# The handlers in REMOTE_HANDLERS overwrite the local 'task' and 'processor'
+# handlers only when [core] remote_logging is set and this URL matches a prefix.
+REMOTE_BASE_LOG_FOLDER = ''
+
 DEFAULT_LOGGING_CONFIG = {
     'version': 1,
     'disable_existing_loggers': False,
     'formatters': {
-        'airflow.task': {
-            'format': LOG_FORMAT,
-        },
-        'airflow.processor': {
+        'airflow': {
             'format': LOG_FORMAT,
         },
     },
     'handlers': {
         'console': {
             'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
-            'formatter': 'airflow.task',
+            'formatter': 'airflow',
             'stream': 'sys.stdout'
         },
-        'file.task': {
+        'task': {
             'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
-            'formatter': 'airflow.task',
+            'formatter': 'airflow',
             'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
             'filename_template': FILENAME_TEMPLATE,
         },
-        'file.processor': {
+        'processor': {
             'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
-            'formatter': 'airflow.processor',
+            'formatter': 'airflow',
             'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
             'filename_template': PROCESSOR_FILENAME_TEMPLATE,
-        }
-        # When using s3 or gcs, provide a customized LOGGING_CONFIG
-        # in airflow_local_settings within your PYTHONPATH, see UPDATING.md
-        # for details
-        # 's3.task': {
-        #     'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
-        #     'formatter': 'airflow.task',
-        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-        #     's3_log_folder': S3_LOG_FOLDER,
-        #     'filename_template': FILENAME_TEMPLATE,
-        # },
-        # 'gcs.task': {
-        #     'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
-        #     'formatter': 'airflow.task',
-        #     'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
-        #     'gcs_log_folder': GCS_LOG_FOLDER,
-        #     'filename_template': FILENAME_TEMPLATE,
-        # },
+        },
     },
     'loggers': {
-        '': {
-            'handlers': ['console'],
-            'level': LOG_LEVEL
-        },
-        'airflow': {
-            'handlers': ['console'],
-            'level': LOG_LEVEL,
-            'propagate': False,
-        },
         'airflow.processor': {
-            'handlers': ['file.processor'],
+            'handlers': ['processor'],
             'level': LOG_LEVEL,
             'propagate': False,
         },
         'airflow.task': {
-            'handlers': ['file.task'],
+            'handlers': ['task'],
             'level': LOG_LEVEL,
             'propagate': False,
         },
-        'airflow.task_runner': {
-            'handlers': ['file.task'],
-            'level': LOG_LEVEL,
-            'propagate': True,
+    },
+    'root': {
+        'handlers': ['console'],
+        'level': LOG_LEVEL,
+    }
+}
+
+REMOTE_HANDLERS = {
+    's3': {
+        'task': {
+            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+        },
+    },
+    'gcs': {
+        'task': {
+            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
         },
     }
 }
+
+REMOTE_LOGGING = conf.get('core', 'remote_logging')
+
+if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
+        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
+elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
+        DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 4564117..a7a3b7d 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -37,7 +37,9 @@ base_log_folder = {AIRFLOW_HOME}/logs
 
 # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
 # must supply an Airflow connection id that provides access to the storage
-# location.
+# location. If remote_logging is set to true, see UPDATING.md for additional
+# configuration requirements.
+remote_logging = False
 remote_log_conn_id =
 encrypt_s3_logs = False
 
@@ -137,8 +139,8 @@ secure_mode = False
 unit_test_mode = False
 
 # Name of handler to read task instance logs.
-# Default to use file task handler.
-task_log_reader = file.task
+# Default to use task handler.
+task_log_reader = task
 
 # Whether to enable pickling for xcom (note that this is insecure and allows for
 # RCE exploits). This will be deprecated in Airflow 2.0 (be forced to False).

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index e1ff035..106859f 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -46,7 +46,7 @@ from airflow.exceptions import AirflowException
 from airflow.logging_config import configure_logging
 from airflow.models import DAG, DagRun
 from airflow.settings import Stats
-from airflow.task_runner import get_task_runner
+from airflow.task.task_runner import get_task_runner
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
 from airflow.utils import asciiart, timezone
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/task/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/task/__init__.py b/airflow/task/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/task/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/task/task_runner/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/task/task_runner/__init__.py b/airflow/task/task_runner/__init__.py
new file mode 100644
index 0000000..50636d0
--- /dev/null
+++ b/airflow/task/task_runner/__init__.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow import configuration
+from airflow.task.task_runner.bash_task_runner import BashTaskRunner
+from airflow.exceptions import AirflowException
+
+_TASK_RUNNER = configuration.get('core', 'TASK_RUNNER')
+
+
+def get_task_runner(local_task_job):
+    """
+    Get the task runner that can be used to run the given job.
+
+    :param local_task_job: The LocalTaskJob associated with the TaskInstance
+    that needs to be executed.
+    :type local_task_job: airflow.jobs.LocalTaskJob
+    :return: The task runner to use to run the task.
+    :rtype: airflow.task.task_runner.base_task_runner.BaseTaskRunner
+    """
+    if _TASK_RUNNER == "BashTaskRunner":
+        return BashTaskRunner(local_task_job)
+    elif _TASK_RUNNER == "CgroupTaskRunner":
+        from airflow.contrib.task_runner.cgroup_task_runner import CgroupTaskRunner
+        return CgroupTaskRunner(local_task_job)
+    else:
+        raise AirflowException("Unknown task runner type {}".format(_TASK_RUNNER))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/task/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task/task_runner/base_task_runner.py b/airflow/task/task_runner/base_task_runner.py
new file mode 100644
index 0000000..664a873
--- /dev/null
+++ b/airflow/task/task_runner/base_task_runner.py
@@ -0,0 +1,170 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import unicode_literals
+
+import getpass
+import os
+import json
+import subprocess
+import threading
+
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+from airflow import configuration as conf
+from tempfile import mkstemp
+
+
+PYTHONPATH_VAR = 'PYTHONPATH'
+
+
+class BaseTaskRunner(LoggingMixin):
+    """
+    Runs Airflow task instances by invoking the `airflow run` command with raw
+    mode enabled in a subprocess.
+    """
+
+    def __init__(self, local_task_job):
+        """
+        :param local_task_job: The local task job associated with running the
+        associated task instance.
+        :type local_task_job: airflow.jobs.LocalTaskJob
+        """
+        # Pass task instance context into log handlers to setup the logger.
+        super(BaseTaskRunner, self).__init__(local_task_job.task_instance)
+        self._task_instance = local_task_job.task_instance
+
+        popen_prepend = []
+        cfg_path = None
+        if self._task_instance.run_as_user:
+            self.run_as_user = self._task_instance.run_as_user
+        else:
+            try:
+                self.run_as_user = conf.get('core', 'default_impersonation')
+            except conf.AirflowConfigException:
+                self.run_as_user = None
+
+        # Add sudo commands to change user if we need to. Needed to handle SubDagOperator
+        # case using a SequentialExecutor.
+        if self.run_as_user and (self.run_as_user != getpass.getuser()):
+            self.log.debug("Planning to run as the %s user", self.run_as_user)
+            cfg_dict = conf.as_dict(display_sensitive=True)
+            cfg_subset = {
+                'core': cfg_dict.get('core', {}),
+                'smtp': cfg_dict.get('smtp', {}),
+                'scheduler': cfg_dict.get('scheduler', {}),
+                'webserver': cfg_dict.get('webserver', {}),
+            }
+            temp_fd, cfg_path = mkstemp()
+
+            # Give ownership of file to user; only they can read and write
+            subprocess.call(
+                ['sudo', 'chown', self.run_as_user, cfg_path],
+                close_fds=True
+            )
+            subprocess.call(
+                ['sudo', 'chmod', '600', cfg_path],
+                close_fds=True
+            )
+
+            with os.fdopen(temp_fd, 'w') as temp_file:
+                json.dump(cfg_subset, temp_file)
+
+            # propagate PYTHONPATH environment variable
+            pythonpath_value = os.environ.get(PYTHONPATH_VAR, '')
+            popen_prepend = ['sudo', '-H', '-u', self.run_as_user]
+
+            if pythonpath_value:
+                popen_prepend.append('{}={}'.format(PYTHONPATH_VAR, pythonpath_value))
+
+        self._cfg_path = cfg_path
+        self._command = popen_prepend + self._task_instance.command_as_list(
+            raw=True,
+            pickle_id=local_task_job.pickle_id,
+            mark_success=local_task_job.mark_success,
+            job_id=local_task_job.id,
+            pool=local_task_job.pool,
+            cfg_path=cfg_path,
+        )
+        self.process = None
+
+    def _read_task_logs(self, stream):
+        while True:
+            line = stream.readline()
+            if isinstance(line, bytes):
+                line = line.decode('utf-8')
+            if len(line) == 0:
+                break
+            self.log.info(u'Job {}: Subtask {} %s'.format(
+                self._task_instance.job_id, self._task_instance.task_id),
+                line.rstrip('\n'))
+
+    def run_command(self, run_with, join_args=False):
+        """
+        Run the task command
+
+        :param run_with: list of tokens to run the task command with
+        E.g. ['bash', '-c']
+        :type run_with: list
+        :param join_args: whether to concatenate the list of command tokens
+        E.g. ['airflow', 'run'] vs ['airflow run']
+        :param join_args: bool
+        :return: the process that was run
+        :rtype: subprocess.Popen
+        """
+        cmd = [" ".join(self._command)] if join_args else self._command
+        full_cmd = run_with + cmd
+        self.log.info('Running: %s', full_cmd)
+        proc = subprocess.Popen(
+            full_cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            universal_newlines=True,
+            close_fds=True,
+        )
+
+        # Start daemon thread to read subprocess logging output
+        log_reader = threading.Thread(
+            target=self._read_task_logs,
+            args=(proc.stdout,),
+        )
+        log_reader.daemon = True
+        log_reader.start()
+        return proc
+
+    def start(self):
+        """
+        Start running the task instance in a subprocess.
+        """
+        raise NotImplementedError()
+
+    def return_code(self):
+        """
+        :return: The return code associated with running the task instance or
+        None if the task is not yet done.
+        :rtype int:
+        """
+        raise NotImplementedError()
+
+    def terminate(self):
+        """
+        Kill the running task instance.
+        """
+        raise NotImplementedError()
+
+    def on_finish(self):
+        """
+        A callback that should be called when this is done running.
+        """
+        if self._cfg_path and os.path.isfile(self._cfg_path):
+            subprocess.call(['sudo', 'rm', self._cfg_path], close_fds=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/task/task_runner/bash_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task/task_runner/bash_task_runner.py b/airflow/task/task_runner/bash_task_runner.py
new file mode 100644
index 0000000..ba0d57b
--- /dev/null
+++ b/airflow/task/task_runner/bash_task_runner.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import psutil
+
+from airflow.task.task_runner.base_task_runner import BaseTaskRunner
+from airflow.utils.helpers import kill_process_tree
+
+
+class BashTaskRunner(BaseTaskRunner):
+    """
+    Runs the raw Airflow task by invoking through the Bash shell.
+    """
+    def __init__(self, local_task_job):
+        super(BashTaskRunner, self).__init__(local_task_job)
+
+    def start(self):
+        self.process = self.run_command(['bash', '-c'], join_args=True)
+
+    def return_code(self):
+        return self.process.poll()
+
+    def terminate(self):
+        if self.process and psutil.pid_exists(self.process.pid):
+            kill_process_tree(self.log, self.process.pid)
+
+    def on_finish(self):
+        super(BashTaskRunner, self).on_finish()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/task_runner/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/__init__.py b/airflow/task_runner/__init__.py
deleted file mode 100644
index e0527cb..0000000
--- a/airflow/task_runner/__init__.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from airflow import configuration
-from airflow.task_runner.bash_task_runner import BashTaskRunner
-from airflow.exceptions import AirflowException
-
-_TASK_RUNNER = configuration.get('core', 'TASK_RUNNER')
-
-
-def get_task_runner(local_task_job):
-    """
-    Get the task runner that can be used to run the given job.
-
-    :param local_task_job: The LocalTaskJob associated with the TaskInstance
-    that needs to be executed.
-    :type local_task_job: airflow.jobs.LocalTaskJob
-    :return: The task runner to use to run the task.
-    :rtype: airflow.task_runner.base_task_runner.BaseTaskRunner
-    """
-    if _TASK_RUNNER == "BashTaskRunner":
-        return BashTaskRunner(local_task_job)
-    elif _TASK_RUNNER == "CgroupTaskRunner":
-        from airflow.contrib.task_runner.cgroup_task_runner import CgroupTaskRunner
-        return CgroupTaskRunner(local_task_job)
-    else:
-        raise AirflowException("Unknown task runner type {}".format(_TASK_RUNNER))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
deleted file mode 100644
index 664a873..0000000
--- a/airflow/task_runner/base_task_runner.py
+++ /dev/null
@@ -1,170 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from __future__ import unicode_literals
-
-import getpass
-import os
-import json
-import subprocess
-import threading
-
-from airflow.utils.log.logging_mixin import LoggingMixin
-
-from airflow import configuration as conf
-from tempfile import mkstemp
-
-
-PYTHONPATH_VAR = 'PYTHONPATH'
-
-
-class BaseTaskRunner(LoggingMixin):
-    """
-    Runs Airflow task instances by invoking the `airflow run` command with raw
-    mode enabled in a subprocess.
-    """
-
-    def __init__(self, local_task_job):
-        """
-        :param local_task_job: The local task job associated with running the
-        associated task instance.
-        :type local_task_job: airflow.jobs.LocalTaskJob
-        """
-        # Pass task instance context into log handlers to setup the logger.
-        super(BaseTaskRunner, self).__init__(local_task_job.task_instance)
-        self._task_instance = local_task_job.task_instance
-
-        popen_prepend = []
-        cfg_path = None
-        if self._task_instance.run_as_user:
-            self.run_as_user = self._task_instance.run_as_user
-        else:
-            try:
-                self.run_as_user = conf.get('core', 'default_impersonation')
-            except conf.AirflowConfigException:
-                self.run_as_user = None
-
-        # Add sudo commands to change user if we need to. Needed to handle SubDagOperator
-        # case using a SequentialExecutor.
-        if self.run_as_user and (self.run_as_user != getpass.getuser()):
-            self.log.debug("Planning to run as the %s user", self.run_as_user)
-            cfg_dict = conf.as_dict(display_sensitive=True)
-            cfg_subset = {
-                'core': cfg_dict.get('core', {}),
-                'smtp': cfg_dict.get('smtp', {}),
-                'scheduler': cfg_dict.get('scheduler', {}),
-                'webserver': cfg_dict.get('webserver', {}),
-            }
-            temp_fd, cfg_path = mkstemp()
-
-            # Give ownership of file to user; only they can read and write
-            subprocess.call(
-                ['sudo', 'chown', self.run_as_user, cfg_path],
-                close_fds=True
-            )
-            subprocess.call(
-                ['sudo', 'chmod', '600', cfg_path],
-                close_fds=True
-            )
-
-            with os.fdopen(temp_fd, 'w') as temp_file:
-                json.dump(cfg_subset, temp_file)
-
-            # propagate PYTHONPATH environment variable
-            pythonpath_value = os.environ.get(PYTHONPATH_VAR, '')
-            popen_prepend = ['sudo', '-H', '-u', self.run_as_user]
-
-            if pythonpath_value:
-                popen_prepend.append('{}={}'.format(PYTHONPATH_VAR, pythonpath_value))
-
-        self._cfg_path = cfg_path
-        self._command = popen_prepend + self._task_instance.command_as_list(
-            raw=True,
-            pickle_id=local_task_job.pickle_id,
-            mark_success=local_task_job.mark_success,
-            job_id=local_task_job.id,
-            pool=local_task_job.pool,
-            cfg_path=cfg_path,
-        )
-        self.process = None
-
-    def _read_task_logs(self, stream):
-        while True:
-            line = stream.readline()
-            if isinstance(line, bytes):
-                line = line.decode('utf-8')
-            if len(line) == 0:
-                break
-            self.log.info(u'Job {}: Subtask {} %s'.format(
-                self._task_instance.job_id, self._task_instance.task_id),
-                line.rstrip('\n'))
-
-    def run_command(self, run_with, join_args=False):
-        """
-        Run the task command
-
-        :param run_with: list of tokens to run the task command with
-        E.g. ['bash', '-c']
-        :type run_with: list
-        :param join_args: whether to concatenate the list of command tokens
-        E.g. ['airflow', 'run'] vs ['airflow run']
-        :param join_args: bool
-        :return: the process that was run
-        :rtype: subprocess.Popen
-        """
-        cmd = [" ".join(self._command)] if join_args else self._command
-        full_cmd = run_with + cmd
-        self.log.info('Running: %s', full_cmd)
-        proc = subprocess.Popen(
-            full_cmd,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-            universal_newlines=True,
-            close_fds=True,
-        )
-
-        # Start daemon thread to read subprocess logging output
-        log_reader = threading.Thread(
-            target=self._read_task_logs,
-            args=(proc.stdout,),
-        )
-        log_reader.daemon = True
-        log_reader.start()
-        return proc
-
-    def start(self):
-        """
-        Start running the task instance in a subprocess.
-        """
-        raise NotImplementedError()
-
-    def return_code(self):
-        """
-        :return: The return code associated with running the task instance or
-        None if the task is not yet done.
-        :rtype int:
-        """
-        raise NotImplementedError()
-
-    def terminate(self):
-        """
-        Kill the running task instance.
-        """
-        raise NotImplementedError()
-
-    def on_finish(self):
-        """
-        A callback that should be called when this is done running.
-        """
-        if self._cfg_path and os.path.isfile(self._cfg_path):
-            subprocess.call(['sudo', 'rm', self._cfg_path], close_fds=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/airflow/task_runner/bash_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/bash_task_runner.py b/airflow/task_runner/bash_task_runner.py
deleted file mode 100644
index 109b44c..0000000
--- a/airflow/task_runner/bash_task_runner.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import psutil
-
-from airflow.task_runner.base_task_runner import BaseTaskRunner
-from airflow.utils.helpers import kill_process_tree
-
-
-class BashTaskRunner(BaseTaskRunner):
-    """
-    Runs the raw Airflow task by invoking through the Bash shell.
-    """
-    def __init__(self, local_task_job):
-        super(BashTaskRunner, self).__init__(local_task_job)
-
-    def start(self):
-        self.process = self.run_command(['bash', '-c'], join_args=True)
-
-    def return_code(self):
-        return self.process.poll()
-
-    def terminate(self):
-        if self.process and psutil.pid_exists(self.process.pid):
-            kill_process_tree(self.log, self.process.pid)
-
-    def on_finish(self):
-        super(BashTaskRunner, self).on_finish()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index bdaccab..3ddcd73 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -36,8 +36,9 @@ from airflow.jobs import BackfillJob, SchedulerJob, LocalTaskJob
 from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
-from airflow.task_runner.base_task_runner import BaseTaskRunner
+from airflow.task.task_runner.base_task_runner import BaseTaskRunner
 from airflow.utils import timezone
+
 from airflow.utils.dates import days_ago
 from airflow.utils.db import provide_session
 from airflow.utils.state import State

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/tests/utils/test_log_handlers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py
index f64bb32..b4a9ae2 100644
--- a/tests/utils/test_log_handlers.py
+++ b/tests/utils/test_log_handlers.py
@@ -30,7 +30,7 @@ from airflow.utils.state import State
 
 DEFAULT_DATE = datetime(2016, 1, 1)
 TASK_LOGGER = 'airflow.task'
-FILE_TASK_HANDLER = 'file.task'
+FILE_TASK_HANDLER = 'task'
 
 
 class TestFileTaskLogHandler(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c0dffb57/tests/www/test_views.py
----------------------------------------------------------------------
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 0051848..017176d 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -326,9 +326,9 @@ class TestLogView(unittest.TestCase):
         configuration.load_test_config()
         logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
         current_dir = os.path.dirname(os.path.abspath(__file__))
-        logging_config['handlers']['file.task']['base_log_folder'] = os.path.normpath(
+        logging_config['handlers']['task']['base_log_folder'] = os.path.normpath(
             os.path.join(current_dir, 'test_logs'))
-        logging_config['handlers']['file.task']['filename_template'] = \
+        logging_config['handlers']['task']['filename_template'] = \
             '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log'
 
         # Write the custom logging configuration to a file


Mime
View raw message