airflow-commits mailing list archives

From bo...@apache.org
Subject [3/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log
Date Tue, 19 Sep 2017 08:17:32 GMT
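The hunks below are mechanical: every `self.logger` call on operators, hooks, sensors, executors and the task runner becomes `self.log`, and imports move from `airflow.utils.log.LoggingMixin` to `airflow.utils.log.logging_mixin`. As a reading aid, here is a minimal sketch of the pattern the codebase converges on, assuming the mixin lazily caches a logger named after the concrete class; only the `log` property name and the new module path are confirmed by this diff, the class names below are hypothetical.

    import logging

    class LoggingMixin(object):
        """Sketch of the mixin that backs self.log after this commit."""

        @property
        def log(self):
            # Lazily create and cache a logger named after the concrete class.
            if not hasattr(self, '_log'):
                self._log = logging.getLogger(
                    self.__class__.__module__ + '.' + self.__class__.__name__)
            return self._log

    class MyOperator(LoggingMixin):
        # Hypothetical subclass for illustration only.
        def execute(self, context):
            # Post-AIRFLOW-1604 style: self.log, not self.logger.
            self.log.info('Executing task for %s', context.get('ds'))
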
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/sftp_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py
index 5abfc51..44ea66d 100644
--- a/airflow/contrib/operators/sftp_operator.py
+++ b/airflow/contrib/operators/sftp_operator.py
@@ -81,12 +81,12 @@ class SFTPOperator(BaseOperator):
             if self.operation.lower() == SFTPOperation.GET:
                 file_msg = "from {0} to {1}".format(self.remote_filepath,
                                                     self.local_filepath)
-                self.logger.debug("Starting to transfer %s", file_msg)
+                self.log.debug("Starting to transfer %s", file_msg)
                 sftp_client.get(self.remote_filepath, self.local_filepath)
             else:
                 file_msg = "from {0} to {1}".format(self.local_filepath,
                                                     self.remote_filepath)
-                self.logger.debug("Starting to transfer file %s", file_msg)
+                self.log.debug("Starting to transfer file %s", file_msg)
                 sftp_client.put(self.local_filepath, self.remote_filepath)
 
         except Exception as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/vertica_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py
index fc9cf3b..7a319f2 100644
--- a/airflow/contrib/operators/vertica_operator.py
+++ b/airflow/contrib/operators/vertica_operator.py
@@ -39,6 +39,6 @@ class VerticaOperator(BaseOperator):
         self.sql = sql
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = VerticaHook(vertica_conn_id=self.vertica_conn_id)
         hook.run(self.sql)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/operators/vertica_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py
index 35ff3c6..5e769fc 100644
--- a/airflow/contrib/operators/vertica_to_hive.py
+++ b/airflow/contrib/operators/vertica_to_hive.py
@@ -103,7 +103,7 @@ class VerticaToHiveTransfer(BaseOperator):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         vertica = VerticaHook(vertica_conn_id=self.vertica_conn_id)
 
-        self.logger.info("Dumping Vertica query results to local file")
+        self.log.info("Dumping Vertica query results to local file")
         conn = vertica.get_conn()
         cursor = conn.cursor()
         cursor.execute(self.sql)
@@ -119,7 +119,7 @@ class VerticaToHiveTransfer(BaseOperator):
             f.flush()
             cursor.close()
             conn.close()
-            self.logger.info("Loading file into Hive")
+            self.log.info("Loading file into Hive")
             hive.load_file(
                 f.name,
                 self.hive_table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/bigquery_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index 630cebe..90a3264 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -59,7 +59,7 @@ class BigQueryTableSensor(BaseSensorOperator):
 
     def poke(self, context):
         table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
-        self.logger.info('Sensor checks existence of table: %s', table_uri)
+        self.log.info('Sensor checks existence of table: %s', table_uri)
         hook = BigQueryHook(
             bigquery_conn_id=self.bigquery_conn_id,
             delegate_to=self.delegate_to)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/datadog_sensor.py b/airflow/contrib/sensors/datadog_sensor.py
index 4ee45f9..e1a9169 100644
--- a/airflow/contrib/sensors/datadog_sensor.py
+++ b/airflow/contrib/sensors/datadog_sensor.py
@@ -67,7 +67,7 @@ class DatadogSensor(BaseSensorOperator):
             tags=self.tags)
 
         if isinstance(response, dict) and response.get('status', 'ok') != 'ok':
-            self.logger.error("Unexpected Datadog result: %s", response)
+            self.log.error("Unexpected Datadog result: %s", response)
             raise AirflowException("Datadog returned unexpected result")
 
         if self.response_check:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
index 034fcb6..3ecaa42 100644
--- a/airflow/contrib/sensors/emr_base_sensor.py
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -36,11 +36,11 @@ class EmrBaseSensor(BaseSensorOperator):
         response = self.get_emr_response()
 
         if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
-            self.logger.info('Bad HTTP response: %s', response)
+            self.log.info('Bad HTTP response: %s', response)
             return False
 
         state = self.state_from_response(response)
-        self.logger.info('Job flow currently %s', state)
+        self.log.info('Job flow currently %s', state)
 
         if state in self.NON_TERMINAL_STATES:
             return False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
index e5610a1..87b65c8 100644
--- a/airflow/contrib/sensors/emr_job_flow_sensor.py
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -41,7 +41,7 @@ class EmrJobFlowSensor(EmrBaseSensor):
     def get_emr_response(self):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        self.logger.info('Poking cluster %s', self.job_flow_id)
+        self.log.info('Poking cluster %s', self.job_flow_id)
         return emr.describe_cluster(ClusterId=self.job_flow_id)
 
     def state_from_response(self, response):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
index e131d77..003d2d1 100644
--- a/airflow/contrib/sensors/emr_step_sensor.py
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -45,7 +45,7 @@ class EmrStepSensor(EmrBaseSensor):
     def get_emr_response(self):
         emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
 
-        self.logger.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id)
+        self.log.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id)
         return emr.describe_step(ClusterId=self.job_flow_id, StepId=self.step_id)
 
     def state_from_response(self, response):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/ftp_sensor.py b/airflow/contrib/sensors/ftp_sensor.py
index 2e604e9..bd66c32 100644
--- a/airflow/contrib/sensors/ftp_sensor.py
+++ b/airflow/contrib/sensors/ftp_sensor.py
@@ -42,7 +42,7 @@ class FTPSensor(BaseSensorOperator):
 
     def poke(self, context):
         with self._create_hook() as hook:
-            self.logger.info('Poking for %s', self.path)
+            self.log.info('Poking for %s', self.path)
             try:
                 hook.get_mod_time(self.path)
             except ftplib.error_perm as e:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/gcs_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/gcs_sensor.py b/airflow/contrib/sensors/gcs_sensor.py
index 800c1bd..384e26f 100644
--- a/airflow/contrib/sensors/gcs_sensor.py
+++ b/airflow/contrib/sensors/gcs_sensor.py
@@ -54,7 +54,7 @@ class GoogleCloudStorageObjectSensor(BaseSensorOperator):
         self.delegate_to = delegate_to
 
     def poke(self, context):
-        self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+        self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
         hook = GoogleCloudStorageHook(
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)
@@ -116,7 +116,7 @@ class GoogleCloudStorageObjectUpdatedSensor(BaseSensorOperator):
         self.delegate_to = delegate_to
 
     def poke(self, context):
-        self.logger.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
+        self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
         hook = GoogleCloudStorageHook(
             google_cloud_storage_conn_id=self.google_cloud_conn_id,
             delegate_to=self.delegate_to)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/hdfs_sensors.py b/airflow/contrib/sensors/hdfs_sensors.py
index 11e8b07..1893f01 100644
--- a/airflow/contrib/sensors/hdfs_sensors.py
+++ b/airflow/contrib/sensors/hdfs_sensors.py
@@ -28,7 +28,7 @@ class HdfsSensorRegex(HdfsSensor):
         :return: Bool depending on the search criteria
         """
         sb = self.hook(self.hdfs_conn_id).get_conn()
-        self.logger.info(
+        self.log.info(
             'Poking for {self.filepath} to be a directory with files matching {self.regex.pattern}'.format(**locals())
         )
         result = [f for f in sb.ls([self.filepath], include_toplevel=False) if
@@ -56,10 +56,10 @@ class HdfsSensorFolder(HdfsSensor):
         result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
         result = self.filter_for_filesize(result, self.file_size)
         if self.be_empty:
-            self.logger.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
+            self.log.info('Poking for filepath {self.filepath} to a empty directory'.format(**locals()))
             return len(result) == 1 and result[0]['path'] == self.filepath
         else:
-            self.logger.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
+            self.log.info('Poking for filepath {self.filepath} to a non empty directory'.format(**locals()))
             result.pop(0)
             return bool(result) and result[0]['file_type'] == 'f'
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/jira_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/jira_sensor.py b/airflow/contrib/sensors/jira_sensor.py
index 4cbc676..1dc7b50 100644
--- a/airflow/contrib/sensors/jira_sensor.py
+++ b/airflow/contrib/sensors/jira_sensor.py
@@ -97,7 +97,7 @@ class JiraTicketSensor(JiraSensor):
                                                *args, **kwargs)
 
     def poke(self, context):
-        self.logger.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
+        self.log.info('Jira Sensor checking for change in ticket: %s', self.ticket_id)
 
         self.jira_operator.method_name = "issue"
         self.jira_operator.jira_method_args = {
@@ -123,19 +123,19 @@ class JiraTicketSensor(JiraSensor):
                             and getattr(field_value, 'name'):
                         result = self.expected_value.lower() == field_value.name.lower()
                     else:
-                        self.logger.warning(
+                        self.log.warning(
                             "Not implemented checker for issue field %s which "
                             "is neither string nor list nor Jira Resource",
                             self.field
                         )
 
         except JIRAError as jira_error:
-            self.logger.error("Jira error while checking with expected value: %s", jira_error)
+            self.log.error("Jira error while checking with expected value: %s", jira_error)
         except Exception as e:
-            self.logger.error("Error while checking with expected value %s:", self.expected_value)
-            self.logger.exception(e)
+            self.log.error("Error while checking with expected value %s:", self.expected_value)
+            self.log.exception(e)
         if result is True:
-            self.logger.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
+            self.log.info("Issue field %s has expected value %s, returning success", self.field, self.expected_value)
         else:
-            self.logger.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
+            self.log.info("Issue field %s don't have expected value %s yet.", self.field, self.expected_value)
         return result

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/redis_key_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/redis_key_sensor.py b/airflow/contrib/sensors/redis_key_sensor.py
index 220d766..6cc314b 100644
--- a/airflow/contrib/sensors/redis_key_sensor.py
+++ b/airflow/contrib/sensors/redis_key_sensor.py
@@ -38,5 +38,5 @@ class RedisKeySensor(BaseSensorOperator):
         self.key = key
 
     def poke(self, context):
-        self.logger.info('Sensor check existence of key: %s', self.key)
+        self.log.info('Sensor check existence of key: %s', self.key)
         return RedisHook(self.redis_conn_id).key_exists(self.key)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/sensors/wasb_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/wasb_sensor.py b/airflow/contrib/sensors/wasb_sensor.py
index 1a54e12..4295a25 100644
--- a/airflow/contrib/sensors/wasb_sensor.py
+++ b/airflow/contrib/sensors/wasb_sensor.py
@@ -47,7 +47,7 @@ class WasbBlobSensor(BaseSensorOperator):
         self.check_options = check_options
 
     def poke(self, context):
-        self.logger.info(
+        self.log.info(
             'Poking for blob: {self.blob_name}\n'
             'in wasb://{self.container_name}'.format(**locals())
         )
@@ -85,7 +85,7 @@ class WasbPrefixSensor(BaseSensorOperator):
         self.check_options = check_options
 
     def poke(self, context):
-        self.logger.info(
+        self.log.info(
             'Poking for prefix: {self.prefix}\n'
             'in wasb://{self.container_name}'.format(**locals())
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/contrib/task_runner/cgroup_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/task_runner/cgroup_task_runner.py b/airflow/contrib/task_runner/cgroup_task_runner.py
index 5d2518d..0022fd6 100644
--- a/airflow/contrib/task_runner/cgroup_task_runner.py
+++ b/airflow/contrib/task_runner/cgroup_task_runner.py
@@ -72,10 +72,10 @@ class CgroupTaskRunner(BaseTaskRunner):
         for path_element in path_split:
             name_to_node = {x.name: x for x in node.children}
             if path_element not in name_to_node:
-                self.logger.debug("Creating cgroup %s in %s", path_element, node.path)
+                self.log.debug("Creating cgroup %s in %s", path_element, node.path)
                 node = node.create_cgroup(path_element)
             else:
-                self.logger.debug(
+                self.log.debug(
                     "Not creating cgroup %s in %s since it already exists",
                     path_element, node.path
                 )
@@ -94,20 +94,20 @@ class CgroupTaskRunner(BaseTaskRunner):
         for path_element in path_split:
             name_to_node = {x.name: x for x in node.children}
             if path_element not in name_to_node:
-                self.logger.warning("Cgroup does not exist: %s", path)
+                self.log.warning("Cgroup does not exist: %s", path)
                 return
             else:
                 node = name_to_node[path_element]
         # node is now the leaf node
         parent = node.parent
-        self.logger.debug("Deleting cgroup %s/%s", parent, node.name)
+        self.log.debug("Deleting cgroup %s/%s", parent, node.name)
         parent.delete_cgroup(node.name)
 
     def start(self):
         # Use bash if it's already in a cgroup
         cgroups = self._get_cgroup_names()
         if cgroups["cpu"] != "/" or cgroups["memory"] != "/":
-            self.logger.debug(
+            self.log.debug(
                 "Already running in a cgroup (cpu: %s memory: %s) so not creating another one",
                 cgroups.get("cpu"), cgroups.get("memory")
             )
@@ -133,7 +133,7 @@ class CgroupTaskRunner(BaseTaskRunner):
         mem_cgroup_node = self._create_cgroup(self.mem_cgroup_name)
         self._created_mem_cgroup = True
         if self._mem_mb_limit > 0:
-            self.logger.debug(
+            self.log.debug(
                 "Setting %s with %s MB of memory",
                 self.mem_cgroup_name, self._mem_mb_limit
             )
@@ -143,14 +143,14 @@ class CgroupTaskRunner(BaseTaskRunner):
         cpu_cgroup_node = self._create_cgroup(self.cpu_cgroup_name)
         self._created_cpu_cgroup = True
         if self._cpu_shares > 0:
-            self.logger.debug(
+            self.log.debug(
                 "Setting %s with %s CPU shares",
                 self.cpu_cgroup_name, self._cpu_shares
             )
             cpu_cgroup_node.controller.shares = self._cpu_shares
 
         # Start the process w/ cgroups
-        self.logger.debug(
+        self.log.debug(
             "Starting task process with cgroups cpu,memory: %s",
             cgroup_name
         )
@@ -168,7 +168,7 @@ class CgroupTaskRunner(BaseTaskRunner):
         # I wasn't able to track down the root cause of the package install failures, but
         # we might want to revisit that approach at some other point.
         if return_code == 137:
-            self.logger.warning("Task failed with return code of 137. This may indicate "
+            self.log.warning("Task failed with return code of 137. This may indicate "
                               "that it was killed due to excessive memory usage. "
                               "Please consider optimizing your task or using the "
                               "resources argument to reserve more memory for your task")
@@ -176,7 +176,7 @@ class CgroupTaskRunner(BaseTaskRunner):
 
     def terminate(self):
         if self.process and psutil.pid_exists(self.process.pid):
-            kill_process_tree(self.logger, self.process.pid)
+            kill_process_tree(self.log, self.process.pid)
 
     def on_finish(self):
         # Let the OOM watcher thread know we're done to avoid false OOM alarms

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 7812f96..c387eeb 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -19,7 +19,7 @@ from airflow.executors.local_executor import LocalExecutor
 from airflow.executors.sequential_executor import SequentialExecutor
 
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 DEFAULT_EXECUTOR = None
 
@@ -41,7 +41,7 @@ def GetDefaultExecutor():
 
     DEFAULT_EXECUTOR = _get_executor(executor_name)
 
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
     log.info("Using executor %s", executor_name)
 
     return DEFAULT_EXECUTOR
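
Module-level code with no class to mix into follows the same convention: it instantiates the mixin just to borrow its logger, as in `log = LoggingMixin().log` above. A hedged equivalent using only the stdlib, for readers outside the Airflow codebase:

    import logging

    # Roughly what LoggingMixin().log yields at module level: a named stdlib logger.
    log = logging.getLogger(__name__)
    log.info("Using executor %s", "SequentialExecutor")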

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 1197958..410a558 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -14,7 +14,7 @@
 from builtins import range
 
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 PARALLELISM = configuration.getint('core', 'PARALLELISM')
@@ -46,7 +46,7 @@ class BaseExecutor(LoggingMixin):
     def queue_command(self, task_instance, command, priority=1, queue=None):
         key = task_instance.key
         if key not in self.queued_tasks and key not in self.running:
-            self.logger.info("Adding to queue: %s", command)
+            self.log.info("Adding to queue: %s", command)
             self.queued_tasks[key] = (command, priority, queue, task_instance)
 
     def queue_task_instance(
@@ -99,9 +99,9 @@ class BaseExecutor(LoggingMixin):
         else:
             open_slots = self.parallelism - len(self.running)
 
-        self.logger.debug("%s running task instances", len(self.running))
-        self.logger.debug("%s in queue", len(self.queued_tasks))
-        self.logger.debug("%s open slots", open_slots)
+        self.log.debug("%s running task instances", len(self.running))
+        self.log.debug("%s in queue", len(self.queued_tasks))
+        self.log.debug("%s open slots", open_slots)
 
         sorted_queue = sorted(
             [(k, v) for k, v in self.queued_tasks.items()],
@@ -122,13 +122,13 @@ class BaseExecutor(LoggingMixin):
                 self.running[key] = command
                 self.execute_async(key, command=command, queue=queue)
             else:
-                self.logger.debug(
+                self.log.debug(
                     'Task is already running, not sending to executor: %s',
                     key
                 )
 
         # Calling child class sync method
-        self.logger.debug("Calling the %s sync method", self.__class__)
+        self.log.debug("Calling the %s sync method", self.__class__)
         self.sync()
 
     def change_state(self, key, state):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 39c895c..360a276 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -24,7 +24,7 @@ from celery import states as celery_states
 from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 PARALLELISM = configuration.get('core', 'PARALLELISM')
 
@@ -53,7 +53,7 @@ class CeleryConfig(object):
     try:
         celery_ssl_active = configuration.getboolean('celery', 'CELERY_SSL_ACTIVE')
     except AirflowConfigException as e:
-        log = LoggingMixin().logger
+        log = LoggingMixin().log
         log.warning("Celery Executor will run without SSL")
 
     try:
@@ -76,7 +76,7 @@ app = Celery(
 
 @app.task
 def execute_command(command):
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
     log.info("Executing command in Celery: %s", command)
     try:
         subprocess.check_call(command, shell=True)
@@ -99,13 +99,13 @@ class CeleryExecutor(BaseExecutor):
         self.last_state = {}
 
     def execute_async(self, key, command, queue=DEFAULT_QUEUE):
-        self.logger.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
+        self.log.info("[celery] queuing {key} through celery, queue={queue}".format(**locals()))
         self.tasks[key] = execute_command.apply_async(
             args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING
 
     def sync(self):
-        self.logger.debug("Inquiring about %s celery task(s)", len(self.tasks))
+        self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
         for key, async in list(self.tasks.items()):
             try:
                 state = async.state
@@ -123,11 +123,11 @@ class CeleryExecutor(BaseExecutor):
                         del self.tasks[key]
                         del self.last_state[key]
                     else:
-                        self.logger.info("Unexpected state: %s", async.state)
+                        self.log.info("Unexpected state: %s", async.state)
                     self.last_state[key] = async.state
             except Exception as e:
-                self.logger.error("Error syncing the celery executor, ignoring it:")
-                self.logger.exception(e)
+                self.log.error("Error syncing the celery executor, ignoring it:")
+                self.log.exception(e)
 
     def end(self, synchronous=False):
         if synchronous:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/dask_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py
index 8a56506..07b8a82 100644
--- a/airflow/executors/dask_executor.py
+++ b/airflow/executors/dask_executor.py
@@ -53,10 +53,10 @@ class DaskExecutor(BaseExecutor):
         if future.done():
             key = self.futures[future]
             if future.exception():
-                self.logger.error("Failed to execute task: %s", repr(future.exception()))
+                self.log.error("Failed to execute task: %s", repr(future.exception()))
                 self.fail(key)
             elif future.cancelled():
-                self.logger.error("Failed to execute task")
+                self.log.error("Failed to execute task")
                 self.fail(key)
             else:
                 self.success(key)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index 9730737..f9eceb3 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -20,7 +20,7 @@ from builtins import range
 
 from airflow import configuration
 from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 PARALLELISM = configuration.get('core', 'PARALLELISM')
@@ -40,14 +40,14 @@ class LocalWorker(multiprocessing.Process, LoggingMixin):
                 # Received poison pill, no more tasks to run
                 self.task_queue.task_done()
                 break
-            self.logger.info("%s running %s", self.__class__.__name__, command)
+            self.log.info("%s running %s", self.__class__.__name__, command)
             command = "exec bash -c '{0}'".format(command)
             try:
                 subprocess.check_call(command, shell=True)
                 state = State.SUCCESS
             except subprocess.CalledProcessError as e:
                 state = State.FAILED
-                self.logger.error("Failed to execute task %s.", str(e))
+                self.log.error("Failed to execute task %s.", str(e))
                 # TODO: Why is this commented out?
                 # raise e
             self.result_queue.put((key, state))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/executors/sequential_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 7d08a4b..a15450d 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -37,14 +37,14 @@ class SequentialExecutor(BaseExecutor):
 
     def sync(self):
         for key, command in self.commands_to_run:
-            self.logger.info("Executing command: %s", command)
+            self.log.info("Executing command: %s", command)
 
             try:
                 subprocess.check_call(command, shell=True)
                 self.change_state(key, State.SUCCESS)
             except subprocess.CalledProcessError as e:
                 self.change_state(key, State.FAILED)
-                self.logger.error("Failed to execute task %s.", str(e))
+                self.log.error("Failed to execute task %s.", str(e))
 
         self.commands_to_run = []
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 2f7e6ee..c405001 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -16,7 +16,7 @@ from __future__ import division
 
 from future import standard_library
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 standard_library.install_aliases()
 import re
@@ -87,7 +87,7 @@ def _parse_s3_config(config_file_name, config_format='boto', profile=None):
             if Config.has_option(cred_section, 'calling_format'):
                 calling_format = Config.get(cred_section, 'calling_format')
         except:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.warning("Option Error in parsing s3 config file")
             raise
         return (access_key, secret_key, calling_format)
@@ -378,7 +378,7 @@ class S3Hook(BaseHook):
                     offset = chunk * multipart_bytes
                     bytes = min(multipart_bytes, key_size - offset)
                     with FileChunkIO(filename, 'r', offset=offset, bytes=bytes) as fp:
-                        self.logger.info('Sending chunk %s of %s...', chunk + 1, total_chunks)
+                        self.log.info('Sending chunk %s of %s...', chunk + 1, total_chunks)
                         mp.upload_part_from_file(fp, part_num=chunk + 1)
             except:
                 mp.cancel_upload()
@@ -391,7 +391,7 @@ class S3Hook(BaseHook):
             key_size = key_obj.set_contents_from_filename(filename,
                                                           replace=replace,
                                                           encrypt_key=encrypt)
-        self.logger.info(
+        self.log.info(
             "The key {key} now contains {key_size} bytes".format(**locals())
         )
 
@@ -432,6 +432,6 @@ class S3Hook(BaseHook):
         key_size = key_obj.set_contents_from_string(string_data,
                                                     replace=replace,
                                                     encrypt_key=encrypt)
-        self.logger.info(
+        self.log.info(
             "The key {key} now contains {key_size} bytes".format(**locals())
         )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/base_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/base_hook.py b/airflow/hooks/base_hook.py
index 4617b98..92313ca 100644
--- a/airflow/hooks/base_hook.py
+++ b/airflow/hooks/base_hook.py
@@ -23,7 +23,7 @@ import random
 from airflow import settings
 from airflow.models import Connection
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 CONN_ENV_PREFIX = 'AIRFLOW_CONN_'
 
@@ -76,7 +76,7 @@ class BaseHook(LoggingMixin):
     def get_connection(cls, conn_id):
         conn = random.choice(cls.get_connections(conn_id))
         if conn.host:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.info("Using connection to: %s", conn.host)
         return conn
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/dbapi_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/dbapi_hook.py b/airflow/hooks/dbapi_hook.py
index 85eebd0..bbdedd7 100644
--- a/airflow/hooks/dbapi_hook.py
+++ b/airflow/hooks/dbapi_hook.py
@@ -158,7 +158,7 @@ class DbApiHook(BaseHook):
                 for s in sql:
                     if sys.version_info[0] < 3:
                         s = s.encode('utf-8')
-                    self.logger.info(s)
+                    self.log.info(s)
                     if parameters is not None:
                         cur.execute(s, parameters)
                     else:
@@ -216,12 +216,12 @@ class DbApiHook(BaseHook):
                     cur.execute(sql, values)
                     if commit_every and i % commit_every == 0:
                         conn.commit()
-                        self.logger.info(
+                        self.log.info(
                             "Loaded {i} into {table} rows so far".format(**locals())
                         )
 
             conn.commit()
-        self.logger.info(
+        self.log.info(
             "Done loading. Loaded a total of {i} rows".format(**locals()))
 
     @staticmethod

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index af3ae9b..0b13670 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -68,7 +68,7 @@ class DruidHook(BaseHook):
         while running:
             req_status = requests.get("{0}/{1}/status".format(url, druid_task_id))
 
-            self.logger.info("Job still running for %s seconds...", sec)
+            self.log.info("Job still running for %s seconds...", sec)
 
             sec = sec + 1
 
@@ -87,4 +87,4 @@ class DruidHook(BaseHook):
             else:
                 raise AirflowException('Could not get status of the job, got %s', status)
 
-        self.logger.info('Successful index')
+        self.log.info('Successful index')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 70d7642..7c73491 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -201,7 +201,7 @@ class HiveCliHook(BaseHook):
                 hive_cmd.extend(['-f', f.name])
 
                 if verbose:
-                    self.logger.info(" ".join(hive_cmd))
+                    self.log.info(" ".join(hive_cmd))
                 sp = subprocess.Popen(
                     hive_cmd,
                     stdout=subprocess.PIPE,
@@ -215,7 +215,7 @@ class HiveCliHook(BaseHook):
                         break
                     stdout += line.decode('UTF-8')
                     if verbose:
-                        self.logger.info(line.decode('UTF-8').strip())
+                        self.log.info(line.decode('UTF-8').strip())
                 sp.wait()
 
                 if sp.returncode:
@@ -246,7 +246,7 @@ class HiveCliHook(BaseHook):
             for query in query_set:
 
                 query_preview = ' '.join(query.split())[:50]
-                self.logger.info("Testing HQL [%s (...)]", query_preview)
+                self.log.info("Testing HQL [%s (...)]", query_preview)
                 if query_set == insert:
                     query = other + '; explain ' + query
                 else:
@@ -255,16 +255,16 @@ class HiveCliHook(BaseHook):
                     self.run_cli(query, verbose=False)
                 except AirflowException as e:
                     message = e.args[0].split('\n')[-2]
-                    self.logger.info(message)
+                    self.log.info(message)
                     error_loc = re.search('(\d+):(\d+)', message)
                     if error_loc and error_loc.group(1).isdigit():
                         l = int(error_loc.group(1))
                         begin = max(l-2, 0)
                         end = min(l+3, len(query.split('\n')))
                         context = '\n'.join(query.split('\n')[begin:end])
-                        self.logger.info("Context :\n %s", context)
+                        self.log.info("Context :\n %s", context)
                 else:
-                    self.logger.info("SUCCESS")
+                    self.log.info("SUCCESS")
 
     def load_df(
             self,
@@ -397,7 +397,7 @@ class HiveCliHook(BaseHook):
                 hql += "TBLPROPERTIES({tprops})\n"
         hql += ";"
         hql = hql.format(**locals())
-        self.logger.info(hql)
+        self.log.info(hql)
         self.run_cli(hql)
         hql = "LOAD DATA LOCAL INPATH '{filepath}' "
         if overwrite:
@@ -408,7 +408,7 @@ class HiveCliHook(BaseHook):
                 ["{0}='{1}'".format(k, v) for k, v in partition.items()])
             hql += "PARTITION ({pvals});"
         hql = hql.format(**locals())
-        self.logger.info(hql)
+        self.log.info(hql)
         self.run_cli(hql)
 
     def kill(self):
@@ -662,7 +662,7 @@ class HiveServer2Hook(BaseHook):
 
         # impyla uses GSSAPI instead of KERBEROS as a auth_mechanism identifier
         if auth_mechanism == 'KERBEROS':
-            self.logger.warning(
+            self.log.warning(
                 "Detected deprecated 'KERBEROS' for authMechanism for %s. Please use 'GSSAPI' instead",
                 self.hiveserver2_conn_id
             )
@@ -696,7 +696,7 @@ class HiveServer2Hook(BaseHook):
                     # may be `SET` or DDL
                     records = cur.fetchall()
                 except ProgrammingError:
-                    self.logger.debug("get_results returned no records")
+                    self.log.debug("get_results returned no records")
                 if records:
                     results = {
                         'data': records,
@@ -716,7 +716,7 @@ class HiveServer2Hook(BaseHook):
         schema = schema or 'default'
         with self.get_conn(schema) as conn:
             with conn.cursor() as cur:
-                self.logger.info("Running query: %s", hql)
+                self.log.info("Running query: %s", hql)
                 cur.execute(hql)
                 schema = cur.description
                 with open(csv_filepath, 'wb') as f:
@@ -734,8 +734,8 @@ class HiveServer2Hook(BaseHook):
 
                         writer.writerows(rows)
                         i += len(rows)
-                        self.logger.info("Written %s rows so far.", i)
-                    self.logger.info("Done. Loaded a total of %s rows.", i)
+                        self.log.info("Written %s rows so far.", i)
+                    self.log.info("Done. Loaded a total of %s rows.", i)
 
     def get_records(self, hql, schema='default'):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/http_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/http_hook.py b/airflow/hooks/http_hook.py
index f168bc8..b8075a0 100644
--- a/airflow/hooks/http_hook.py
+++ b/airflow/hooks/http_hook.py
@@ -82,7 +82,7 @@ class HttpHook(BaseHook):
                                    headers=headers)
 
         prepped_request = session.prepare_request(req)
-        self.logger.info("Sending '%s' to url: %s", self.method, url)
+        self.log.info("Sending '%s' to url: %s", self.method, url)
         return self.run_and_check(session, prepped_request, extra_options)
 
     def run_and_check(self, session, prepped_request, extra_options):
@@ -107,12 +107,12 @@ class HttpHook(BaseHook):
             # Tried rewrapping, but not supported. This way, it's possible
             # to get reason and code for failure by checking first 3 chars
             # for the code, or do a split on ':'
-            self.logger.error("HTTP error: %s", response.reason)
+            self.log.error("HTTP error: %s", response.reason)
             if self.method not in ('GET', 'HEAD'):
                 # The sensor uses GET, so this prevents filling up the log
                 # with the body every time the GET 'misses'.
                 # That's ok to do, because GETs should be repeatable and
                 # all data should be visible in the log (no post data)
-                self.logger.error(response.text)
+                self.log.error(response.text)
             raise AirflowException(str(response.status_code)+":"+response.reason)
         return response

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/oracle_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/oracle_hook.py b/airflow/hooks/oracle_hook.py
index f439daa..71c67e0 100644
--- a/airflow/hooks/oracle_hook.py
+++ b/airflow/hooks/oracle_hook.py
@@ -101,11 +101,11 @@ class OracleHook(DbApiHook):
             cur.execute(sql)
             if i % commit_every == 0:
                 conn.commit()
-                self.logger.info('Loaded {i} into {table} rows so far'.format(**locals()))
+                self.log.info('Loaded {i} into {table} rows so far'.format(**locals()))
         conn.commit()
         cur.close()
         conn.close()
-        self.logger.info('Done loading. Loaded a total of {i} rows'.format(**locals()))
+        self.log.info('Done loading. Loaded a total of {i} rows'.format(**locals()))
 
     def bulk_insert_rows(self, table, rows, target_fields=None, commit_every=5000):
         """A performant bulk insert for cx_Oracle that uses prepared statements via `executemany()`.
@@ -129,13 +129,13 @@ class OracleHook(DbApiHook):
                 cursor.prepare(prepared_stm)
                 cursor.executemany(None, row_chunk)
                 conn.commit()
-                self.logger.info('[%s] inserted %s rows', table, row_count)
+                self.log.info('[%s] inserted %s rows', table, row_count)
                 # Empty chunk
                 row_chunk = []
         # Commit the leftover chunk
         cursor.prepare(prepared_stm)
         cursor.executemany(None, row_chunk)
         conn.commit()
-        self.logger.info('[%s] inserted %s rows', table, row_count)
+        self.log.info('[%s] inserted %s rows', table, row_count)
         cursor.close()
         conn.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/pig_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/pig_hook.py b/airflow/hooks/pig_hook.py
index 29beb54..276b37a 100644
--- a/airflow/hooks/pig_hook.py
+++ b/airflow/hooks/pig_hook.py
@@ -62,7 +62,7 @@ class PigCliHook(BaseHook):
                     pig_properties_list = self.pig_properties.split()
                     pig_cmd.extend(pig_properties_list)
                 if verbose:
-                    self.logger.info(" ".join(pig_cmd))
+                    self.log.info(" ".join(pig_cmd))
                 sp = subprocess.Popen(
                     pig_cmd,
                     stdout=subprocess.PIPE,
@@ -73,7 +73,7 @@ class PigCliHook(BaseHook):
                 for line in iter(sp.stdout.readline, ''):
                     stdout += line
                     if verbose:
-                        self.logger.info(line.strip())
+                        self.log.info(line.strip())
                 sp.wait()
 
                 if sp.returncode:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/webhdfs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/webhdfs_hook.py b/airflow/hooks/webhdfs_hook.py
index e7df328..4510d29 100644
--- a/airflow/hooks/webhdfs_hook.py
+++ b/airflow/hooks/webhdfs_hook.py
@@ -17,14 +17,14 @@ from airflow import configuration
 
 from hdfs import InsecureClient, HdfsError
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 _kerberos_security_mode = configuration.get("core", "security") == "kerberos"
 if _kerberos_security_mode:
     try:
         from hdfs.ext.kerberos import KerberosClient
     except ImportError:
-        log = LoggingMixin().logger
+        log = LoggingMixin().log
         log.error("Could not load the Kerberos extension for the WebHDFSHook.")
         raise
 from airflow.exceptions import AirflowException
@@ -49,7 +49,7 @@ class WebHDFSHook(BaseHook):
         nn_connections = self.get_connections(self.webhdfs_conn_id)
         for nn in nn_connections:
             try:
-                self.logger.debug('Trying namenode %s', nn.host)
+                self.log.debug('Trying namenode %s', nn.host)
                 connection_str = 'http://{nn.host}:{nn.port}'.format(nn=nn)
                 if _kerberos_security_mode:
                     client = KerberosClient(connection_str)
@@ -57,10 +57,10 @@ class WebHDFSHook(BaseHook):
                     proxy_user = self.proxy_user or nn.login
                     client = InsecureClient(connection_str, user=proxy_user)
                 client.status('/')
-                self.logger.debug('Using namenode %s for hook', nn.host)
+                self.log.debug('Using namenode %s for hook', nn.host)
                 return client
             except HdfsError as e:
-                self.logger.debug(
+                self.log.debug(
                     "Read operation on namenode {nn.host} failed witg error: {e.message}".format(**locals())
                 )
         nn_hosts = [c.host for c in nn_connections]
@@ -101,4 +101,4 @@ class WebHDFSHook(BaseHook):
                  overwrite=overwrite,
                  n_threads=parallelism,
                  **kwargs)
-        self.logger.debug("Uploaded file %s to %s", source, destination)
+        self.log.debug("Uploaded file %s to %s", source, destination)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/hooks/zendesk_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/zendesk_hook.py b/airflow/hooks/zendesk_hook.py
index 4634b22..533e9d0 100644
--- a/airflow/hooks/zendesk_hook.py
+++ b/airflow/hooks/zendesk_hook.py
@@ -37,7 +37,7 @@ class ZendeskHook(BaseHook):
         """
         retry_after = int(
             rate_limit_exception.response.headers.get('Retry-After', 60))
-        self.logger.info(
+        self.log.info(
             "Hit Zendesk API rate limit. Pausing for %s seconds",
             retry_after
         )
@@ -75,7 +75,7 @@ class ZendeskHook(BaseHook):
                     # `github.zendesk...`
                     # in it, but the call function needs it removed.
                     next_url = next_page.split(self.__url)[1]
-                    self.logger.info("Calling %s", next_url)
+                    self.log.info("Calling %s", next_url)
                     more_res = zendesk.call(next_url)
                     results.extend(more_res[key])
                     if next_page == more_res['next_page']:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index f855320..3c79ed9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -52,7 +52,7 @@ from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           list_py_file_paths)
 from airflow.utils.db import provide_session, pessimistic_connection_handling
 from airflow.utils.email import send_email
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.state import State
 
 Base = models.Base
@@ -116,7 +116,7 @@ class BaseJob(Base, LoggingMixin):
         try:
             self.on_kill()
         except:
-            self.logger.error('on_kill() method failed')
+            self.log.error('on_kill() method failed')
         session.merge(job)
         session.commit()
         session.close()
@@ -179,7 +179,7 @@ class BaseJob(Base, LoggingMixin):
 
         self.heartbeat_callback(session=session)
         session.close()
-        self.logger.debug('[heart] Boom.')
+        self.log.debug('[heart] Boom.')
 
     def run(self):
         Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
@@ -271,7 +271,7 @@ class BaseJob(Base, LoggingMixin):
             ["{}".format(x) for x in reset_tis])
         session.commit()
 
-        self.logger.info(
+        self.log.info(
             "Reset the following %s TaskInstances:\n\t%s",
             len(reset_tis), task_instance_str
         )
@@ -358,7 +358,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             # responsive file tailing
             parent_dir, _ = os.path.split(log_file)
 
-            _log = LoggingMixin().logger
+            _log = LoggingMixin().log
 
             # Create the parent directory for the log file if necessary.
             if not os.path.isdir(parent_dir):
@@ -438,7 +438,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
         # Arbitrarily wait 5s for the process to die
         self._process.join(5)
         if sigkill and self._process.is_alive():
-            self.logger.warning("Killing PID %s", self._process.pid)
+            self.log.warning("Killing PID %s", self._process.pid)
             os.kill(self._process.pid, signal.SIGKILL)
 
     @property
@@ -478,7 +478,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
         if not self._result_queue.empty():
             self._result = self._result_queue.get_nowait()
             self._done = True
-            self.logger.debug("Waiting for %s", self._process)
+            self.log.debug("Waiting for %s", self._process)
             self._process.join()
             return True
 
@@ -488,7 +488,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             # Get the object from the queue or else join() can hang.
             if not self._result_queue.empty():
                 self._result = self._result_queue.get_nowait()
-            self.logger.debug("Waiting for %s", self._process)
+            self.log.debug("Waiting for %s", self._process)
             self._process.join()
             return True
 
@@ -578,7 +578,7 @@ class SchedulerJob(BaseJob):
         self.using_sqlite = False
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
             if self.max_threads > 1:
-                self.logger.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1")
+                self.log.error("Cannot use more than 1 thread when using sqlite. Setting max_threads to 1")
             self.max_threads = 1
             self.using_sqlite = True
 
@@ -610,7 +610,7 @@ class SchedulerJob(BaseJob):
         tasks that should have succeeded in the past hour.
         """
         if not any([ti.sla for ti in dag.tasks]):
-            self.logger.info(
+            self.log.info(
                 "Skipping SLA check for %s because no tasks in DAG have SLAs",
                 dag
             )
@@ -693,7 +693,7 @@ class SchedulerJob(BaseJob):
             notification_sent = False
             if dag.sla_miss_callback:
                 # Execute the alert callback
-                self.logger.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
+                self.log.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
                 dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
                 notification_sent = True
             email_content = """\
@@ -843,7 +843,7 @@ class SchedulerJob(BaseJob):
                 task_start_dates = [t.start_date for t in dag.tasks]
                 if task_start_dates:
                     next_run_date = dag.normalize_schedule(min(task_start_dates))
-                    self.logger.debug(
+                    self.log.debug(
                         "Next run date based on tasks %s",
                         next_run_date
                     )
@@ -863,7 +863,7 @@ class SchedulerJob(BaseJob):
                 if next_run_date == dag.start_date:
                     next_run_date = dag.normalize_schedule(dag.start_date)
 
-                self.logger.debug(
+                self.log.debug(
                     "Dag start date: %s. Next run date: %s",
                     dag.start_date, next_run_date
                 )
@@ -914,17 +914,17 @@ class SchedulerJob(BaseJob):
         dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
         active_dag_runs = []
         for run in dag_runs:
-            self.logger.info("Examining DAG run %s", run)
+            self.log.info("Examining DAG run %s", run)
             # don't consider runs that are executed in the future
             if run.execution_date > datetime.now():
-                self.logger.error(
+                self.log.error(
                     "Execution date is in future: %s",
                     run.execution_date
                 )
                 continue
 
             if len(active_dag_runs) >= dag.max_active_runs:
-                self.logger.info("Active dag runs > max_active_run.")
+                self.log.info("Active dag runs > max_active_run.")
                 continue
 
             # skip backfill dagruns for now as long as they are not really scheduled
@@ -941,7 +941,7 @@ class SchedulerJob(BaseJob):
                 active_dag_runs.append(run)
 
         for run in active_dag_runs:
-            self.logger.debug("Examining active DAG run: %s", run)
+            self.log.debug("Examining active DAG run: %s", run)
             # this needs a fresh session sometimes tis get detached
             tis = run.get_task_instances(state=(State.NONE,
                                                 State.UP_FOR_RETRY))
@@ -962,7 +962,7 @@ class SchedulerJob(BaseJob):
                 if ti.are_dependencies_met(
                         dep_context=DepContext(flag_upstream_failed=True),
                         session=session):
-                    self.logger.debug('Queuing task: %s', ti)
+                    self.log.debug('Queuing task: %s', ti)
                     queue.append(ti.key)
 
         session.close()
@@ -1020,7 +1020,7 @@ class SchedulerJob(BaseJob):
             session.commit()
 
         if tis_changed > 0:
-            self.logger.warning(
+            self.log.warning(
                 "Set %s task instances to state=%s as their associated DagRun was not in RUNNING state",
                 tis_changed, new_state
             )
@@ -1069,13 +1069,13 @@ class SchedulerJob(BaseJob):
         task_instances_to_examine = ti_query.all()
 
         if len(task_instances_to_examine) == 0:
-            self.logger.info("No tasks to consider for execution.")
+            self.log.info("No tasks to consider for execution.")
             return executable_tis
 
         # Put one task instance on each line
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in task_instances_to_examine])
-        self.logger.info("Tasks up for execution:\n\t%s", task_instance_str)
+        self.log.info("Tasks up for execution:\n\t%s", task_instance_str)
 
         # Get the pool settings
         pools = {p.pool: p for p in session.query(models.Pool).all()}
@@ -1096,7 +1096,7 @@ class SchedulerJob(BaseJob):
                 open_slots = pools[pool].open_slots(session=session)
 
             num_queued = len(task_instances)
-            self.logger.info(
+            self.log.info(
                 "Figuring out tasks to run in Pool(name={pool}) with {open_slots} "
                 "open slots and {num_queued} task instances in queue".format(
                     **locals()
@@ -1111,7 +1111,7 @@ class SchedulerJob(BaseJob):
 
             for task_instance in priority_sorted_task_instances:
                 if open_slots <= 0:
-                    self.logger.info(
+                    self.log.info(
                         "Not scheduling since there are %s open slots in pool %s",
                         open_slots, pool
                     )
@@ -1133,12 +1133,12 @@ class SchedulerJob(BaseJob):
 
                 current_task_concurrency = dag_id_to_possibly_running_task_count[dag_id]
                 task_concurrency_limit = simple_dag_bag.get_dag(dag_id).concurrency
-                self.logger.info(
+                self.log.info(
                     "DAG %s has %s/%s running and queued tasks",
                     dag_id, current_task_concurrency, task_concurrency_limit
                 )
                 if current_task_concurrency >= task_concurrency_limit:
-                    self.logger.info(
+                    self.log.info(
                         "Not executing %s since the number of tasks running or queued from DAG %s"
                         " is >= to the DAG's task concurrency limit of %s",
                         task_instance, dag_id, task_concurrency_limit
@@ -1146,7 +1146,7 @@ class SchedulerJob(BaseJob):
                     continue
 
                 if self.executor.has_task(task_instance):
-                    self.logger.debug(
+                    self.log.debug(
                         "Not handling task %s as the executor reports it is running",
                         task_instance.key
                     )
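
The hunks above log two independent caps applied before a task instance is picked for execution: the open slots in its pool and the owning DAG's task-concurrency limit. A hedged sketch of that double gate; the data shapes and names here are illustrative, not the scheduler's real signatures:

def pick_executable(task_instances, open_slots, dag_concurrency_limit,
                    running_per_dag):
    """Return the subset of (dag_id, task_id) pairs that may run now."""
    executable = []
    for dag_id, task_id in task_instances:
        if open_slots <= 0:
            # Pool exhausted: nothing else from this pool can be scheduled.
            break
        if running_per_dag.get(dag_id, 0) >= dag_concurrency_limit:
            # DAG already at its task-concurrency limit; skip this TI.
            continue
        executable.append((dag_id, task_id))
        open_slots -= 1
        running_per_dag[dag_id] = running_per_dag.get(dag_id, 0) + 1
    return executable

print(pick_executable([("etl", "extract"), ("etl", "load"), ("ml", "train")],
                      open_slots=2, dag_concurrency_limit=1,
                      running_per_dag={}))
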
@@ -1157,7 +1157,7 @@ class SchedulerJob(BaseJob):
 
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in executable_tis])
-        self.logger.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
+        self.log.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
         # so these dont expire on commit
         for ti in executable_tis:
             copy_dag_id = ti.dag_id
@@ -1208,7 +1208,7 @@ class SchedulerJob(BaseJob):
             .with_for_update()
             .all())
         if len(tis_to_set_to_queued) == 0:
-            self.logger.info("No tasks were able to have their state changed to queued.")
+            self.log.info("No tasks were able to have their state changed to queued.")
             session.commit()
             return []
 
@@ -1236,7 +1236,7 @@ class SchedulerJob(BaseJob):
 
         task_instance_str = "\n\t".join(
             ["{}".format(x) for x in tis_to_be_queued])
-        self.logger.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
+        self.log.info("Setting the follow tasks to queued state:\n\t%s", task_instance_str)
         return tis_to_be_queued
 
     def _enqueue_task_instances_with_queued_state(self, simple_dag_bag, task_instances):
@@ -1268,7 +1268,7 @@ class SchedulerJob(BaseJob):
 
             priority = task_instance.priority_weight
             queue = task_instance.queue
-            self.logger.info(
+            self.log.info(
                 "Sending %s to executor with priority %s and queue %s",
                 task_instance.key, priority, queue
             )
@@ -1357,18 +1357,18 @@ class SchedulerJob(BaseJob):
         for dag in dags:
             dag = dagbag.get_dag(dag.dag_id)
             if dag.is_paused:
-                self.logger.info("Not processing DAG %s since it's paused", dag.dag_id)
+                self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
                 continue
 
             if not dag:
-                self.logger.error("DAG ID %s was not found in the DagBag", dag.dag_id)
+                self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)
                 continue
 
-            self.logger.info("Processing %s", dag.dag_id)
+            self.log.info("Processing %s", dag.dag_id)
 
             dag_run = self.create_dag_run(dag)
             if dag_run:
-                self.logger.info("Created %s", dag_run)
+                self.log.info("Created %s", dag_run)
             self._process_task_instances(dag, tis_out)
             self.manage_slas(dag)
 
@@ -1384,7 +1384,7 @@ class SchedulerJob(BaseJob):
         """
         for key, executor_state in list(self.executor.get_event_buffer().items()):
             dag_id, task_id, execution_date = key
-            self.logger.info(
+            self.log.info(
                 "Executor reports %s.%s execution_date=%s as %s",
                 dag_id, task_id, execution_date, executor_state
             )
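
The hunk above logs each (dag_id, task_id, execution_date) -> state event pulled from the executor's buffer. A tiny sketch of that drain-and-log pattern; the buffer here is just a plain dict, and the real executor's get_event_buffer may behave differently:

import logging
from datetime import datetime

log = logging.getLogger("scheduler")
logging.basicConfig(level=logging.INFO)

# Hypothetical event buffer as the scheduler might see it after a heartbeat.
event_buffer = {
    ("etl", "extract", datetime(2017, 9, 19)): "success",
    ("etl", "load", datetime(2017, 9, 19)): "failed",
}

def process_executor_events(buffer):
    # Iterate over a snapshot so the buffer can be mutated while draining.
    for key, executor_state in list(buffer.items()):
        dag_id, task_id, execution_date = key
        log.info("Executor reports %s.%s execution_date=%s as %s",
                 dag_id, task_id, execution_date, executor_state)
        buffer.pop(key)

process_executor_events(event_buffer)
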
@@ -1453,10 +1453,10 @@ class SchedulerJob(BaseJob):
                    "\n" +
                    "=" * 80)
 
-        self.logger.info(log_str)
+        self.log.info(log_str)
 
     def _execute(self):
-        self.logger.info("Starting the scheduler")
+        self.log.info("Starting the scheduler")
         pessimistic_connection_handling()
 
         # DAGs can be pickled for easier remote execution by some executors
@@ -1469,16 +1469,16 @@ class SchedulerJob(BaseJob):
         # DAGs in parallel. By processing them in separate processes,
         # we can get parallelism and isolation from potentially harmful
         # user code.
-        self.logger.info("Processing files using up to %s processes at a time", self.max_threads)
-        self.logger.info("Running execute loop for %s seconds", self.run_duration)
-        self.logger.info("Processing each file at most %s times", self.num_runs)
-        self.logger.info("Process each file at most once every %s seconds", self.file_process_interval)
-        self.logger.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
+        self.log.info("Processing files using up to %s processes at a time", self.max_threads)
+        self.log.info("Running execute loop for %s seconds", self.run_duration)
+        self.log.info("Processing each file at most %s times", self.num_runs)
+        self.log.info("Process each file at most once every %s seconds", self.file_process_interval)
+        self.log.info("Checking for new files in %s every %s seconds", self.subdir, self.dag_dir_list_interval)
 
         # Build up a list of Python files that could contain DAGs
-        self.logger.info("Searching for files in %s", self.subdir)
+        self.log.info("Searching for files in %s", self.subdir)
         known_file_paths = list_py_file_paths(self.subdir)
-        self.logger.info("There are %s files in %s", len(known_file_paths), self.subdir)
+        self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
 
         def processor_factory(file_path, log_file_path):
             return DagFileProcessor(file_path,
@@ -1497,7 +1497,7 @@ class SchedulerJob(BaseJob):
         try:
             self._execute_helper(processor_manager)
         finally:
-            self.logger.info("Exited execute loop")
+            self.log.info("Exited execute loop")
 
             # Kill all child processes on exit since we don't want to leave
             # them as orphaned.
@@ -1511,22 +1511,22 @@ class SchedulerJob(BaseJob):
                 child_processes = [x for x in this_process.children(recursive=True)
                                    if x.is_running() and x.pid in pids_to_kill]
                 for child in child_processes:
-                    self.logger.info("Terminating child PID: %s", child.pid)
+                    self.log.info("Terminating child PID: %s", child.pid)
                     child.terminate()
                 # TODO: Remove magic number
                 timeout = 5
-                self.logger.info("Waiting up to %s seconds for processes to exit...", timeout)
+                self.log.info("Waiting up to %s seconds for processes to exit...", timeout)
                 try:
                     psutil.wait_procs(child_processes, timeout)
                 except psutil.TimeoutExpired:
-                    self.logger.debug("Ran out of time while waiting for processes to exit")
+                    self.log.debug("Ran out of time while waiting for processes to exit")
 
                 # Then SIGKILL
                 child_processes = [x for x in this_process.children(recursive=True)
                                    if x.is_running() and x.pid in pids_to_kill]
                 if len(child_processes) > 0:
                     for child in child_processes:
-                        self.logger.info("Killing child PID: %s", child.pid)
+                        self.log.info("Killing child PID: %s", child.pid)
                         child.kill()
                         child.wait()
 
@@ -1539,7 +1539,7 @@ class SchedulerJob(BaseJob):
         self.executor.start()
 
         session = settings.Session()
-        self.logger.info("Resetting orphaned tasks for active dag runs")
+        self.log.info("Resetting orphaned tasks for active dag runs")
         self.reset_state_for_orphaned_tasks(session=session)
         session.close()
 
@@ -1558,7 +1558,7 @@ class SchedulerJob(BaseJob):
         # For the execute duration, parse and schedule DAGs
         while (datetime.now() - execute_start_time).total_seconds() < \
                 self.run_duration or self.run_duration < 0:
-            self.logger.debug("Starting Loop...")
+            self.log.debug("Starting Loop...")
             loop_start_time = time.time()
 
             # Traverse the DAG directory for Python files containing DAGs
@@ -1568,23 +1568,23 @@ class SchedulerJob(BaseJob):
 
             if elapsed_time_since_refresh > self.dag_dir_list_interval:
                 # Build up a list of Python files that could contain DAGs
-                self.logger.info("Searching for files in %s", self.subdir)
+                self.log.info("Searching for files in %s", self.subdir)
                 known_file_paths = list_py_file_paths(self.subdir)
                 last_dag_dir_refresh_time = datetime.now()
-                self.logger.info("There are %s files in %s", len(known_file_paths), self.subdir)
+                self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
                 processor_manager.set_file_paths(known_file_paths)
 
-                self.logger.debug("Removing old import errors")
+                self.log.debug("Removing old import errors")
                 self.clear_nonexistent_import_errors(known_file_paths=known_file_paths)
 
             # Kick off new processes and collect results from finished ones
-            self.logger.info("Heartbeating the process manager")
+            self.log.info("Heartbeating the process manager")
             simple_dags = processor_manager.heartbeat()
 
             if self.using_sqlite:
                 # For the sqlite case w/ 1 thread, wait until the processor
                 # is finished to avoid concurrent access to the DB.
-                self.logger.debug("Waiting for processors to finish since we're using sqlite")
+                self.log.debug("Waiting for processors to finish since we're using sqlite")
                 processor_manager.wait_until_finished()
 
             # Send tasks for execution if available
@@ -1613,7 +1613,7 @@ class SchedulerJob(BaseJob):
                                              (State.SCHEDULED,))
 
             # Call heartbeats
-            self.logger.info("Heartbeating the executor")
+            self.log.info("Heartbeating the executor")
             self.executor.heartbeat()
 
             # Process events from the executor
@@ -1623,7 +1623,7 @@ class SchedulerJob(BaseJob):
             time_since_last_heartbeat = (datetime.now() -
                                          last_self_heartbeat_time).total_seconds()
             if time_since_last_heartbeat > self.heartrate:
-                self.logger.info("Heartbeating the scheduler")
+                self.log.info("Heartbeating the scheduler")
                 self.heartbeat()
                 last_self_heartbeat_time = datetime.now()
 
@@ -1636,13 +1636,13 @@ class SchedulerJob(BaseJob):
                 last_stat_print_time = datetime.now()
 
             loop_end_time = time.time()
-            self.logger.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
-            self.logger.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
+            self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
+            self.log.debug("Sleeping for %.2f seconds", self._processor_poll_interval)
             time.sleep(self._processor_poll_interval)
 
             # Exit early for a test mode
             if processor_manager.max_runs_reached():
-                self.logger.info("Exiting loop as all files have been processed %s times", self.num_runs)
+                self.log.info("Exiting loop as all files have been processed %s times", self.num_runs)
                 break
 
         # Stop any processors
@@ -1657,7 +1657,7 @@ class SchedulerJob(BaseJob):
                 all_files_processed = False
                 break
         if all_files_processed:
-            self.logger.info(
+            self.log.info(
                 "Deactivating DAGs that haven't been touched since %s",
                 execute_start_time.isoformat()
             )
@@ -1693,21 +1693,21 @@ class SchedulerJob(BaseJob):
         :return: a list of SimpleDags made from the Dags found in the file
         :rtype: list[SimpleDag]
         """
-        self.logger.info("Processing file %s for tasks to queue", file_path)
+        self.log.info("Processing file %s for tasks to queue", file_path)
         # As DAGs are parsed from this file, they will be converted into SimpleDags
         simple_dags = []
 
         try:
             dagbag = models.DagBag(file_path)
         except Exception:
-            self.logger.exception("Failed at reloading the DAG file %s", file_path)
+            self.log.exception("Failed at reloading the DAG file %s", file_path)
             Stats.incr('dag_file_refresh_error', 1, 1)
             return []
 
         if len(dagbag.dags) > 0:
-            self.logger.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
+            self.log.info("DAG(s) %s retrieved from %s", dagbag.dags.keys(), file_path)
         else:
-            self.logger.warning("No viable dags retrieved from %s", file_path)
+            self.log.warning("No viable dags retrieved from %s", file_path)
             self.update_import_errors(session, dagbag)
             return []
 
@@ -1777,7 +1777,7 @@ class SchedulerJob(BaseJob):
                 ti.state = State.SCHEDULED
 
             # Also save this task instance to the DB.
-            self.logger.info("Creating / updating %s in ORM", ti)
+            self.log.info("Creating / updating %s in ORM", ti)
             session.merge(ti)
             session.commit()
 
@@ -1785,11 +1785,11 @@ class SchedulerJob(BaseJob):
         try:
             self.update_import_errors(session, dagbag)
         except Exception:
-            self.logger.exception("Error logging import errors!")
+            self.log.exception("Error logging import errors!")
         try:
             dagbag.kill_zombies()
         except Exception:
-            self.logger.exception("Error killing zombies!")
+            self.log.exception("Error killing zombies!")
 
         return simple_dags
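
process_file treats every DAG file as untrusted: parsing failures are logged and recorded as import errors rather than allowed to crash the scheduler. A generic sketch of that defensive loading, using importlib directly; the real DagBag does considerably more:

import importlib.util
import logging

log = logging.getLogger("dag_processor")
logging.basicConfig(level=logging.INFO)

def load_dag_module(file_path):
    """Load a user-supplied Python file, never letting it crash the caller."""
    try:
        spec = importlib.util.spec_from_file_location("user_dag_file", file_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)        # user code runs here
    except Exception:
        log.exception("Failed at reloading the DAG file %s", file_path)
        return None                            # caller records an import error
    return module

module = load_dag_module("/tmp/definitely_missing_dag.py")
print("parsed" if module else "import error recorded")
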
 
@@ -1908,22 +1908,22 @@ class BackfillJob(BaseJob):
             ti.refresh_from_db()
             if ti.state == State.SUCCESS:
                 ti_status.succeeded.add(key)
-                self.logger.debug("Task instance %s succeeded. Don't rerun.", ti)
+                self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                 ti_status.started.pop(key)
                 continue
             elif ti.state == State.SKIPPED:
                 ti_status.skipped.add(key)
-                self.logger.debug("Task instance %s skipped. Don't rerun.", ti)
+                self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                 ti_status.started.pop(key)
                 continue
             elif ti.state == State.FAILED:
-                self.logger.error("Task instance %s failed", ti)
+                self.log.error("Task instance %s failed", ti)
                 ti_status.failed.add(key)
                 ti_status.started.pop(key)
                 continue
             # special case: if the task needs to run again put it back
             elif ti.state == State.UP_FOR_RETRY:
-                self.logger.warning("Task instance %s is up for retry", ti)
+                self.log.warning("Task instance %s is up for retry", ti)
                 ti_status.started.pop(key)
                 ti_status.to_run[key] = ti
             # special case: The state of the task can be set to NONE by the task itself
@@ -1932,7 +1932,7 @@ class BackfillJob(BaseJob):
             # for that as otherwise those tasks would fall outside of the scope of
             # the backfill suddenly.
             elif ti.state == State.NONE:
-                self.logger.warning(
+                self.log.warning(
                     "FIXME: task instance %s state was set to none externally or "
                     "reaching concurrency limits. Re-adding task to queue.",
                     ti
@@ -1953,7 +1953,7 @@ class BackfillJob(BaseJob):
 
         for key, state in list(executor.get_event_buffer().items()):
             if key not in started:
-                self.logger.warning(
+                self.log.warning(
                     "%s state %s not in started=%s",
                     key, state, started.values()
                 )
@@ -1962,14 +1962,14 @@ class BackfillJob(BaseJob):
             ti = started[key]
             ti.refresh_from_db()
 
-            self.logger.debug("Executor state: %s task %s", state, ti)
+            self.log.debug("Executor state: %s task %s", state, ti)
 
             if state == State.FAILED or state == State.SUCCESS:
                 if ti.state == State.RUNNING or ti.state == State.QUEUED:
                     msg = ("Executor reports task instance {} finished ({}) "
                            "although the task says its {}. Was the task "
                            "killed externally?".format(ti, state, ti.state))
-                    self.logger.error(msg)
+                    self.log.error(msg)
                     ti.handle_failure(msg)
 
     @provide_session
@@ -2083,9 +2083,9 @@ class BackfillJob(BaseJob):
             len(ti_status.skipped),
             len(ti_status.deadlocked),
             len(ti_status.not_ready))
-        self.logger.info(msg)
+        self.log.info(msg)
 
-        self.logger.debug(
+        self.log.debug(
             "Finished dag run loop iteration. Remaining tasks %s",
             ti_status.to_run.values()
         )
@@ -2118,7 +2118,7 @@ class BackfillJob(BaseJob):
 
         while ((len(ti_status.to_run) > 0 or len(ti_status.started) > 0) and
                 len(ti_status.deadlocked) == 0):
-            self.logger.debug("*** Clearing out not_ready list ***")
+            self.log.debug("*** Clearing out not_ready list ***")
             ti_status.not_ready.clear()
 
             # we need to execute the tasks bottom to top
@@ -2138,12 +2138,12 @@ class BackfillJob(BaseJob):
                     ignore_depends_on_past = (
                         self.ignore_first_depends_on_past and
                         ti.execution_date == (start_date or ti.start_date))
-                    self.logger.debug("Task instance to run %s state %s", ti, ti.state)
+                    self.log.debug("Task instance to run %s state %s", ti, ti.state)
 
                     # guard against externally modified tasks instances or
                     # in case max concurrency has been reached at task runtime
                     if ti.state == State.NONE:
-                        self.logger.warning(
+                        self.log.warning(
                             "FIXME: task instance {} state was set to None externally. This should not happen"
                         )
                         ti.set_state(State.SCHEDULED, session=session)
@@ -2152,27 +2152,27 @@ class BackfillJob(BaseJob):
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
                         ti_status.succeeded.add(key)
-                        self.logger.debug("Task instance %s succeeded. Don't rerun.", ti)
+                        self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         continue
                     elif ti.state == State.SKIPPED:
                         ti_status.skipped.add(key)
-                        self.logger.debug("Task instance %s skipped. Don't rerun.", ti)
+                        self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         continue
                     elif ti.state == State.FAILED:
-                        self.logger.error("Task instance %s failed", ti)
+                        self.log.error("Task instance %s failed", ti)
                         ti_status.failed.add(key)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         continue
                     elif ti.state == State.UPSTREAM_FAILED:
-                        self.logger.error("Task instance %s upstream failed", ti)
+                        self.log.error("Task instance %s upstream failed", ti)
                         ti_status.failed.add(key)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
@@ -2194,12 +2194,12 @@ class BackfillJob(BaseJob):
                         ti.refresh_from_db(lock_for_update=True, session=session)
                         if ti.state == State.SCHEDULED or ti.state == State.UP_FOR_RETRY:
                             if executor.has_task(ti):
-                                self.logger.debug(
+                                self.log.debug(
                                     "Task Instance %s already in executor waiting for queue to clear",
                                     ti
                                 )
                             else:
-                                self.logger.debug('Sending %s to executor', ti)
+                                self.log.debug('Sending %s to executor', ti)
                                 # Skip scheduled state, we are executing immediately
                                 ti.state = State.QUEUED
                                 session.merge(ti)
@@ -2216,7 +2216,7 @@ class BackfillJob(BaseJob):
                         continue
 
                     if ti.state == State.UPSTREAM_FAILED:
-                        self.logger.error("Task instance %s upstream failed", ti)
+                        self.log.error("Task instance %s upstream failed", ti)
                         ti_status.failed.add(key)
                         ti_status.to_run.pop(key)
                         if key in ti_status.started:
@@ -2225,14 +2225,14 @@ class BackfillJob(BaseJob):
 
                     # special case
                     if ti.state == State.UP_FOR_RETRY:
-                        self.logger.debug("Task instance %s retry period not expired yet", ti)
+                        self.log.debug("Task instance %s retry period not expired yet", ti)
                         if key in ti_status.started:
                             ti_status.started.pop(key)
                         ti_status.to_run[key] = ti
                         continue
 
                     # all remaining tasks
-                    self.logger.debug('Adding %s to not_ready', ti)
+                    self.log.debug('Adding %s to not_ready', ti)
                     ti_status.not_ready.add(key)
 
             # execute the tasks in the queue
@@ -2245,7 +2245,7 @@ class BackfillJob(BaseJob):
             if (ti_status.not_ready and
                     ti_status.not_ready == set(ti_status.to_run) and
                     len(ti_status.started) == 0):
-                self.logger.warning(
+                self.log.warning(
                     "Deadlock discovered for ti_status.to_run=%s",
                     ti_status.to_run.values()
                 )
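
The deadlock warning above fires when every remaining task was examined, none of them became ready, and nothing is in flight. A distilled version of that condition over sets of hypothetical task-instance keys:

def is_deadlocked(to_run, not_ready, started):
    """Backfill-style deadlock test over sets of task-instance keys."""
    return bool(not_ready) and not_ready == set(to_run) and len(started) == 0

# Everything left is blocked and nothing is running: deadlock.
print(is_deadlocked(to_run={"t1", "t2"}, not_ready={"t1", "t2"}, started=set()))
# One task is still running, so progress is still possible.
print(is_deadlocked(to_run={"t1", "t2"}, not_ready={"t1", "t2"}, started={"t3"}))
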
@@ -2364,7 +2364,7 @@ class BackfillJob(BaseJob):
         run_dates = self.dag.get_run_dates(start_date=start_date,
                                            end_date=self.bf_end_date)
         if len(run_dates) == 0:
-            self.logger.info("No run dates were found for the given dates and dag interval.")
+            self.log.info("No run dates were found for the given dates and dag interval.")
             return
 
         # picklin'
@@ -2402,7 +2402,7 @@ class BackfillJob(BaseJob):
                     raise AirflowException(err)
 
                 if remaining_dates > 0:
-                    self.logger.info(
+                    self.log.info(
                         "max_active_runs limit for dag %s has been reached "
                         " - waiting for other dag runs to finish",
                         self.dag_id
@@ -2413,7 +2413,7 @@ class BackfillJob(BaseJob):
             session.commit()
             session.close()
 
-        self.logger.info("Backfill done. Exiting.")
+        self.log.info("Backfill done. Exiting.")
 
 
 class LocalTaskJob(BaseJob):
@@ -2453,7 +2453,7 @@ class LocalTaskJob(BaseJob):
 
         def signal_handler(signum, frame):
             """Setting kill signal handler"""
-            self.logger.error("Killing subprocess")
+            self.log.error("Killing subprocess")
             self.on_kill()
             raise AirflowException("LocalTaskJob received SIGTERM signal")
         signal.signal(signal.SIGTERM, signal_handler)
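
LocalTaskJob installs a SIGTERM handler that tears down the subprocess and re-raises as an exception. A self-contained sketch of that pattern; the class name and cleanup body below are placeholders, not the real task runner:

import signal

class TaskSupervisor:                      # hypothetical, illustrative only
    def on_kill(self):
        print("Killing subprocess")        # real code terminates the task runner

    def install_handlers(self):
        def signal_handler(signum, frame):
            self.on_kill()
            raise RuntimeError("received SIGTERM signal")
        signal.signal(signal.SIGTERM, signal_handler)

supervisor = TaskSupervisor()
supervisor.install_handlers()
# signal.raise_signal(signal.SIGTERM)  # would trigger the handler (Python 3.8+)
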
@@ -2466,7 +2466,7 @@ class LocalTaskJob(BaseJob):
                 ignore_ti_state=self.ignore_ti_state,
                 job_id=self.id,
                 pool=self.pool):
-            self.logger.info("Task is not able to be run")
+            self.log.info("Task is not able to be run")
             return
 
         try:
@@ -2479,7 +2479,7 @@ class LocalTaskJob(BaseJob):
                 # Monitor the task to see if it's done
                 return_code = self.task_runner.return_code()
                 if return_code is not None:
-                    self.logger.info("Task exited with return code %s", return_code)
+                    self.log.info("Task exited with return code %s", return_code)
                     return
 
                 # Periodically heartbeat so that the scheduler doesn't think this
@@ -2489,7 +2489,7 @@ class LocalTaskJob(BaseJob):
                     last_heartbeat_time = time.time()
                 except OperationalError:
                     Stats.incr('local_task_job_heartbeat_failure', 1, 1)
-                    self.logger.exception(
+                    self.log.exception(
                         "Exception while trying to heartbeat! Sleeping for %s seconds",
                         self.heartrate
                     )
@@ -2500,7 +2500,7 @@ class LocalTaskJob(BaseJob):
                 time_since_last_heartbeat = time.time() - last_heartbeat_time
                 if time_since_last_heartbeat > heartbeat_time_limit:
                     Stats.incr('local_task_job_prolonged_heartbeat_failure', 1, 1)
-                    self.logger.error("Heartbeat time limited exceeded!")
+                    self.log.error("Heartbeat time limited exceeded!")
                     raise AirflowException("Time since last heartbeat({:.2f}s) "
                                            "exceeded limit ({}s)."
                                            .format(time_since_last_heartbeat,
@@ -2530,18 +2530,18 @@ class LocalTaskJob(BaseJob):
 
         if ti.state == State.RUNNING:
             if not same_hostname:
-                self.logger.warning("The recorded hostname {ti.hostname} "
+                self.log.warning("The recorded hostname {ti.hostname} "
                                 "does not match this instance's hostname "
                                 "{fqdn}".format(**locals()))
                 raise AirflowException("Hostname of job runner does not match")
             elif not same_process:
                 current_pid = os.getpid()
-                self.logger.warning("Recorded pid {ti.pid} does not match the current pid "
+                self.log.warning("Recorded pid {ti.pid} does not match the current pid "
                                 "{current_pid}".format(**locals()))
                 raise AirflowException("PID of job runner does not match")
         elif (self.task_runner.return_code() is None
               and hasattr(self.task_runner, 'process')):
-            self.logger.warning(
+            self.log.warning(
                 "State of this instance has been externally set to %s. Taking the poison pill.",
                 ti.state
             )
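
Taken together, the hunks in this patch are a mechanical rename from self.logger to self.log on the job classes. A hedged sketch of the kind of mixin that makes such an attribute available on every class, illustrative only; Airflow's actual LoggingMixin may differ in detail:

import logging

class LoggingMixin:
    """Expose a lazily created, per-class logger as `self.log`."""
    @property
    def log(self):
        try:
            return self._log
        except AttributeError:
            name = self.__class__.__module__ + '.' + self.__class__.__name__
            self._log = logging.getLogger(name)
            return self._log

class SchedulerJob(LoggingMixin):          # toy stand-in, not the real class
    def heartbeat(self):
        # %-style lazy formatting, as in the patch: arguments are only
        # interpolated if the record is actually emitted.
        self.log.info("Heartbeating the scheduler")

logging.basicConfig(level=logging.INFO)
SchedulerJob().heartbeat()
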

