airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [2/4] incubator-airflow git commit: [AIRFLOW-1604] Rename logger to log
Date Tue, 19 Sep 2017 08:17:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f690fb4..28dcc04 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -77,7 +77,7 @@ from airflow.utils.operator_resources import Resources
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.trigger_rule import TriggerRule
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 Base = declarative_base()
 ID_LEN = 250
@@ -184,7 +184,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         if executor is None:
             executor = GetDefaultExecutor()
         dag_folder = dag_folder or settings.DAGS_FOLDER
-        self.logger.info("Filling up the DagBag from %s", dag_folder)
+        self.log.info("Filling up the DagBag from %s", dag_folder)
         self.dag_folder = dag_folder
         self.dags = {}
         # the file's last modified timestamp when we last read it
@@ -257,7 +257,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                 return found_dags
 
         except Exception as e:
-            self.logger.exception(e)
+            self.log.exception(e)
             return found_dags
 
         mods = []
@@ -269,7 +269,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                         self.file_last_changed[filepath] = file_last_changed_on_disk
                         return found_dags
 
-            self.logger.debug("Importing %s", filepath)
+            self.log.debug("Importing %s", filepath)
             org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
             mod_name = ('unusual_prefix_' +
                         hashlib.sha1(filepath.encode('utf-8')).hexdigest() +
@@ -283,7 +283,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                     m = imp.load_source(mod_name, filepath)
                     mods.append(m)
                 except Exception as e:
-                    self.logger.exception("Failed to import: %s", filepath)
+                    self.log.exception("Failed to import: %s", filepath)
                     self.import_errors[filepath] = str(e)
                     self.file_last_changed[filepath] = file_last_changed_on_disk
 
@@ -294,10 +294,10 @@ class DagBag(BaseDagBag, LoggingMixin):
                 mod_name, ext = os.path.splitext(mod.filename)
                 if not head and (ext == '.py' or ext == '.pyc'):
                     if mod_name == '__init__':
-                        self.logger.warning("Found __init__.%s at root of %s", ext, filepath)
+                        self.log.warning("Found __init__.%s at root of %s", ext, filepath)
                     if safe_mode:
                         with zip_file.open(mod.filename) as zf:
-                            self.logger.debug("Reading %s from %s", mod.filename, filepath)
+                            self.log.debug("Reading %s from %s", mod.filename, filepath)
                             content = zf.read()
                             if not all([s in content for s in (b'DAG', b'airflow')]):
                                 self.file_last_changed[filepath] = (
@@ -313,7 +313,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                         m = importlib.import_module(mod_name)
                         mods.append(m)
                     except Exception as e:
-                        self.logger.exception("Failed to import: %s", filepath)
+                        self.log.exception("Failed to import: %s", filepath)
                         self.import_errors[filepath] = str(e)
                         self.file_last_changed[filepath] = file_last_changed_on_disk
 
@@ -336,11 +336,11 @@ class DagBag(BaseDagBag, LoggingMixin):
         Fails tasks that haven't had a heartbeat in too long
         """
         from airflow.jobs import LocalTaskJob as LJ
-        self.logger.info("Finding 'running' jobs without a recent heartbeat")
+        self.log.info("Finding 'running' jobs without a recent heartbeat")
         TI = TaskInstance
         secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold')
         limit_dttm = datetime.now() - timedelta(seconds=secs)
-        self.logger.info("Failing jobs without heartbeat after %s", limit_dttm)
+        self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
 
         tis = (
             session.query(TI)
@@ -361,7 +361,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                     task = dag.get_task(ti.task_id)
                     ti.task = task
                     ti.handle_failure("{} killed as zombie".format(str(ti)))
-                    self.logger.info('Marked zombie job %s as failed', ti)
+                    self.log.info('Marked zombie job %s as failed', ti)
                     Stats.incr('zombies_killed')
         session.commit()
 
@@ -381,7 +381,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             subdag.parent_dag = dag
             subdag.is_subdag = True
             self.bag_dag(subdag, parent_dag=dag, root_dag=root_dag)
-        self.logger.debug('Loaded DAG {dag}'.format(**locals()))
+        self.log.debug('Loaded DAG {dag}'.format(**locals()))
 
     def collect_dags(
             self,
@@ -439,7 +439,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                                 str([dag.dag_id for dag in found_dags]),
                             ))
                     except Exception as e:
-                        self.logger.warning(e)
+                        self.log.warning(e)
         Stats.gauge(
             'collect_dags', (datetime.now() - start_dttm).total_seconds(), 1)
         Stats.gauge(
@@ -606,7 +606,7 @@ class Connection(Base, LoggingMixin):
                 self._password = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_encrypted = True
             except AirflowException:
-                self.logger.exception("Failed to load fernet while encrypting value, "
+                self.log.exception("Failed to load fernet while encrypting value, "
                                     "using non-encrypted value.")
                 self._password = value
                 self.is_encrypted = False
@@ -635,7 +635,7 @@ class Connection(Base, LoggingMixin):
                 self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_extra_encrypted = True
             except AirflowException:
-                self.logger.exception("Failed to load fernet while encrypting value, "
+                self.log.exception("Failed to load fernet while encrypting value, "
                                     "using non-encrypted value.")
                 self._extra = value
                 self.is_extra_encrypted = False
@@ -706,8 +706,8 @@ class Connection(Base, LoggingMixin):
             try:
                 obj = json.loads(self.extra)
             except Exception as e:
-                self.logger.exception(e)
-                self.logger.error("Failed parsing the json for conn_id %s", self.conn_id)
+                self.log.exception(e)
+                self.log.error("Failed parsing the json for conn_id %s", self.conn_id)
 
         return obj
 
@@ -1001,7 +1001,7 @@ class TaskInstance(Base, LoggingMixin):
         """
         Forces the task instance's state to FAILED in the database.
         """
-        self.logger.error("Recording the task instance as FAILED")
+        self.log.error("Recording the task instance as FAILED")
         self.state = State.FAILED
         session.merge(self)
         session.commit()
@@ -1152,7 +1152,7 @@ class TaskInstance(Base, LoggingMixin):
                 session=session):
             failed = True
             if verbose:
-                self.logger.info(
+                self.log.info(
                     "Dependencies not met for %s, dependency '%s' FAILED: %s",
                     self, dep_status.dep_name, dep_status.reason
                 )
@@ -1161,7 +1161,7 @@ class TaskInstance(Base, LoggingMixin):
             return False
 
         if verbose:
-            self.logger.info("Dependencies all met for %s", self)
+            self.log.info("Dependencies all met for %s", self)
 
         return True
 
@@ -1177,7 +1177,7 @@ class TaskInstance(Base, LoggingMixin):
                     session,
                     dep_context):
 
-                self.logger.debug(
+                self.log.debug(
                     "%s dependency '%s' PASSED: %s, %s",
                     self, dep_status.dep_name, dep_status.passed, dep_status.reason
                 )
@@ -1354,10 +1354,10 @@ class TaskInstance(Base, LoggingMixin):
                    "runtime. Attempt {attempt} of {total}. State set to NONE.").format(
                 attempt=self.try_number + 1,
                 total=self.max_tries + 1)
-            self.logger.warning(hr + msg + hr)
+            self.log.warning(hr + msg + hr)
 
             self.queued_dttm = datetime.now()
-            self.logger.info("Queuing into pool %s", self.pool)
+            self.log.info("Queuing into pool %s", self.pool)
             session.merge(self)
             session.commit()
             return False
@@ -1366,12 +1366,12 @@ class TaskInstance(Base, LoggingMixin):
         # the current worker process was blocked on refresh_from_db
         if self.state == State.RUNNING:
             msg = "Task Instance already running {}".format(self)
-            self.logger.warning(msg)
+            self.log.warning(msg)
             session.commit()
             return False
 
         # print status message
-        self.logger.info(hr + msg + hr)
+        self.log.info(hr + msg + hr)
         self.try_number += 1
 
         if not test_mode:
@@ -1389,10 +1389,10 @@ class TaskInstance(Base, LoggingMixin):
         if verbose:
             if mark_success:
                 msg = "Marking success for {} on {}".format(self.task, self.execution_date)
-                self.logger.info(msg)
+                self.log.info(msg)
             else:
                 msg = "Executing {} on {}".format(self.task, self.execution_date)
-                self.logger.info(msg)
+                self.log.info(msg)
         return True
 
     @provide_session
@@ -1434,7 +1434,7 @@ class TaskInstance(Base, LoggingMixin):
 
                 def signal_handler(signum, frame):
                     """Setting kill signal handler"""
-                    self.logger.error("Killing subprocess")
+                    self.log.error("Killing subprocess")
                     task_copy.on_kill()
                     raise AirflowException("Task received SIGTERM signal")
                 signal.signal(signal.SIGTERM, signal_handler)
@@ -1513,8 +1513,8 @@ class TaskInstance(Base, LoggingMixin):
             if task.on_success_callback:
                 task.on_success_callback(context)
         except Exception as e3:
-            self.logger.error("Failed when executing success callback")
-            self.logger.exception(e3)
+            self.log.error("Failed when executing success callback")
+            self.log.exception(e3)
 
         session.commit()
 
@@ -1559,7 +1559,7 @@ class TaskInstance(Base, LoggingMixin):
         task_copy.dry_run()
 
     def handle_failure(self, error, test_mode=False, context=None):
-        self.logger.exception(error)
+        self.log.exception(error)
         task = self.task
         session = settings.Session()
         self.end_date = datetime.now()
@@ -1580,20 +1580,20 @@ class TaskInstance(Base, LoggingMixin):
             # next task instance try_number exceeds the max_tries.
             if task.retries and self.try_number <= self.max_tries:
                 self.state = State.UP_FOR_RETRY
-                self.logger.info('Marking task as UP_FOR_RETRY')
+                self.log.info('Marking task as UP_FOR_RETRY')
                 if task.email_on_retry and task.email:
                     self.email_alert(error, is_retry=True)
             else:
                 self.state = State.FAILED
                 if task.retries:
-                    self.logger.info('All retries failed; marking task as FAILED')
+                    self.log.info('All retries failed; marking task as FAILED')
                 else:
-                    self.logger.info('Marking task as FAILED.')
+                    self.log.info('Marking task as FAILED.')
                 if task.email_on_failure and task.email:
                     self.email_alert(error, is_retry=False)
         except Exception as e2:
-            self.logger.error('Failed to send email to: %s', task.email)
-            self.logger.exception(e2)
+            self.log.error('Failed to send email to: %s', task.email)
+            self.log.exception(e2)
 
         # Handling callbacks pessimistically
         try:
@@ -1602,13 +1602,13 @@ class TaskInstance(Base, LoggingMixin):
             if self.state == State.FAILED and task.on_failure_callback:
                 task.on_failure_callback(context)
         except Exception as e3:
-            self.logger.error("Failed at executing callback")
-            self.logger.exception(e3)
+            self.log.error("Failed at executing callback")
+            self.log.exception(e3)
 
         if not test_mode:
             session.merge(self)
         session.commit()
-        self.logger.error(str(error))
+        self.log.error(str(error))
 
     @provide_session
     def get_template_context(self, session=None):
@@ -1898,7 +1898,7 @@ class Log(Base):
         self.owner = owner or task_owner
 
 
-class SkipMixin(object):
+class SkipMixin(LoggingMixin):
     def skip(self, dag_run, execution_date, tasks):
         """
         Sets tasks instances to skipped from the same dag run.
@@ -1926,7 +1926,7 @@ class SkipMixin(object):
         else:
             assert execution_date is not None, "Execution date is None and no dag run"
 
-            self.logger.warning("No DAG RUN present this should not happen")
+            self.log.warning("No DAG RUN present this should not happen")
             # this is defensive against dag runs that are not complete
             for task in tasks:
                 ti = TaskInstance(task, execution_date=execution_date)
@@ -2121,7 +2121,7 @@ class BaseOperator(LoggingMixin):
         self.email_on_failure = email_on_failure
         self.start_date = start_date
         if start_date and not isinstance(start_date, datetime):
-            self.logger.warning("start_date for %s isn't datetime.datetime", self)
+            self.log.warning("start_date for %s isn't datetime.datetime", self)
         self.end_date = end_date
         if not TriggerRule.is_valid(trigger_rule):
             raise AirflowException(
@@ -2137,7 +2137,7 @@ class BaseOperator(LoggingMixin):
             self.depends_on_past = True
 
         if schedule_interval:
-            self.logger.warning(
+            self.log.warning(
                 "schedule_interval is used for {}, though it has "
                 "been deprecated as a task parameter, you need to "
                 "specify it as a DAG parameter instead",
@@ -2155,7 +2155,7 @@ class BaseOperator(LoggingMixin):
         if isinstance(retry_delay, timedelta):
             self.retry_delay = retry_delay
         else:
-            self.logger.debug("Retry_delay isn't timedelta object, assuming secs")
+            self.log.debug("Retry_delay isn't timedelta object, assuming secs")
             self.retry_delay = timedelta(seconds=retry_delay)
         self.retry_exponential_backoff = retry_exponential_backoff
         self.max_retry_delay = max_retry_delay
@@ -2455,7 +2455,7 @@ class BaseOperator(LoggingMixin):
                 try:
                     setattr(self, attr, env.loader.get_source(env, content)[0])
                 except Exception as e:
-                    self.logger.exception(e)
+                    self.log.exception(e)
         self.prepare_template()
 
     @property
@@ -2574,12 +2574,12 @@ class BaseOperator(LoggingMixin):
                 ignore_ti_state=ignore_ti_state)
 
     def dry_run(self):
-        self.logger.info('Dry run')
+        self.log.info('Dry run')
         for attr in self.template_fields:
             content = getattr(self, attr)
             if content and isinstance(content, six.string_types):
-                self.logger.info('Rendering template for %s', attr)
-                self.logger.info(content)
+                self.log.info('Rendering template for %s', attr)
+                self.log.info(content)
 
     def get_direct_relatives(self, upstream=False):
         """
@@ -3517,7 +3517,7 @@ class DAG(BaseDag, LoggingMixin):
             d['pickle_len'] = len(pickled)
             d['pickling_duration'] = "{}".format(datetime.now() - dttm)
         except Exception as e:
-            self.logger.debug(e)
+            self.log.debug(e)
             d['is_picklable'] = False
             d['stacktrace'] = traceback.format_exc()
         return d
@@ -3754,7 +3754,7 @@ class DAG(BaseDag, LoggingMixin):
             DagModel).filter(DagModel.dag_id == self.dag_id).first()
         if not orm_dag:
             orm_dag = DagModel(dag_id=self.dag_id)
-            self.logger.info("Creating ORM DAG for %s", self.dag_id)
+            self.log.info("Creating ORM DAG for %s", self.dag_id)
         orm_dag.fileloc = self.fileloc
         orm_dag.is_subdag = self.is_subdag
         orm_dag.owners = owner
@@ -3797,11 +3797,11 @@ class DAG(BaseDag, LoggingMixin):
         :type expiration_date: datetime
         :return: None
         """
-        logger = LoggingMixin().logger
+        log = LoggingMixin().log
         for dag in session.query(
                 DagModel).filter(DagModel.last_scheduler_run < expiration_date,
                                  DagModel.is_active).all():
-            logger.info(
+            log.info(
                 "Deactivating DAG ID %s since it was last touched by the scheduler at %s",
                 dag.dag_id, dag.last_scheduler_run.isoformat()
             )
@@ -3930,7 +3930,7 @@ class Variable(Base, LoggingMixin):
                 self._val = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_encrypted = True
             except AirflowException:
-                self.logger.exception(
+                self.log.exception(
                     "Failed to load fernet while encrypting value, using non-encrypted value."
                 )
                 self._val = value
@@ -4052,7 +4052,7 @@ class XCom(Base, LoggingMixin):
             try:
                 value = json.dumps(value).encode('UTF-8')
             except ValueError:
-                log = LoggingMixin().logger
+                log = LoggingMixin().log
                 log.error("Could not serialize the XCOM value into JSON. "
                           "If you are using pickles instead of JSON "
                           "for XCOM, then you need to enable pickle "
@@ -4123,7 +4123,7 @@ class XCom(Base, LoggingMixin):
                 try:
                     return json.loads(result.value.decode('UTF-8'))
                 except ValueError:
-                    log = LoggingMixin().logger
+                    log = LoggingMixin().log
                     log.error("Could not serialize the XCOM value into JSON. "
                               "If you are using pickles instead of JSON "
                               "for XCOM, then you need to enable pickle "
@@ -4173,7 +4173,7 @@ class XCom(Base, LoggingMixin):
                 try:
                     result.value = json.loads(result.value.decode('UTF-8'))
                 except ValueError:
-                    log = LoggingMixin().logger
+                    log = LoggingMixin().log
                     log.error("Could not serialize the XCOM value into JSON. "
                               "If you are using pickles instead of JSON "
                               "for XCOM, then you need to enable pickle "
@@ -4229,7 +4229,7 @@ class DagStat(Base):
             session.commit()
         except Exception as e:
             session.rollback()
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.warning("Could not update dag stats for %s", dag_id)
             log.exception(e)
 
@@ -4282,7 +4282,7 @@ class DagStat(Base):
             session.commit()
         except Exception as e:
             session.rollback()
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.warning("Could not update dag stat table")
             log.exception(e)
 
@@ -4306,7 +4306,7 @@ class DagStat(Base):
                     session.commit()
                 except Exception as e:
                     session.rollback()
-                    log = LoggingMixin().logger
+                    log = LoggingMixin().log
                     log.warning("Could not create stat record")
                     log.exception(e)
 
@@ -4524,7 +4524,7 @@ class DagRun(Base, LoggingMixin):
 
         tis = self.get_task_instances(session=session)
 
-        self.logger.info("Updating state for %s considering %s task(s)", self, len(tis))
+        self.log.info("Updating state for %s considering %s task(s)", self, len(tis))
 
         for ti in list(tis):
             # skip in db?
@@ -4570,18 +4570,18 @@ class DagRun(Base, LoggingMixin):
             # if all roots finished and at least on failed, the run failed
             if (not unfinished_tasks and
                     any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
-                self.logger.info('Marking run %s failed', self)
+                self.log.info('Marking run %s failed', self)
                 self.state = State.FAILED
 
             # if all roots succeeded and no unfinished tasks, the run succeeded
             elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
                                               for r in roots):
-                self.logger.info('Marking run %s successful', self)
+                self.log.info('Marking run %s successful', self)
                 self.state = State.SUCCESS
 
             # if *all tasks* are deadlocked, the run failed
             elif unfinished_tasks and none_depends_on_past and no_dependencies_met:
-                self.logger.info('Deadlock; marking run %s failed', self)
+                self.log.info('Deadlock; marking run %s failed', self)
                 self.state = State.FAILED
 
             # finally, if the roots aren't done, the dag is still running

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 63321fb..ff2ed51 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -67,7 +67,7 @@ class BashOperator(BaseOperator):
         which will be cleaned afterwards
         """
         bash_command = self.bash_command
-        self.logger.info("Tmp dir root location: \n %s", gettempdir())
+        self.log.info("Tmp dir root location: \n %s", gettempdir())
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
             with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
 
@@ -75,11 +75,11 @@ class BashOperator(BaseOperator):
                 f.flush()
                 fname = f.name
                 script_location = tmp_dir + "/" + fname
-                self.logger.info(
+                self.log.info(
                     "Temporary script location: %s",
                     script_location
                 )
-                self.logger.info("Running command: %s", bash_command)
+                self.log.info("Running command: %s", bash_command)
                 sp = Popen(
                     ['bash', fname],
                     stdout=PIPE, stderr=STDOUT,
@@ -88,13 +88,13 @@ class BashOperator(BaseOperator):
 
                 self.sp = sp
 
-                self.logger.info("Output:")
+                self.log.info("Output:")
                 line = ''
                 for line in iter(sp.stdout.readline, b''):
                     line = line.decode(self.output_encoding).strip()
-                    self.logger.info(line)
+                    self.log.info(line)
                 sp.wait()
-                self.logger.info(
+                self.log.info(
                     "Command exited with return code %s",
                     sp.returncode
                 )
@@ -106,6 +106,6 @@ class BashOperator(BaseOperator):
             return line
 
     def on_kill(self):
-        self.logger.info('Sending SIGTERM signal to bash process group')
+        self.log.info('Sending SIGTERM signal to bash process group')
         os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index f263a2c..ff82539 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -71,15 +71,15 @@ class CheckOperator(BaseOperator):
         self.sql = sql
 
     def execute(self, context=None):
-        self.logger.info('Executing SQL check: %s', self.sql)
+        self.log.info('Executing SQL check: %s', self.sql)
         records = self.get_db_hook().get_first(self.sql)
-        self.logger.info('Record: %s', records)
+        self.log.info('Record: %s', records)
         if not records:
             raise AirflowException("The query returned None")
         elif not all([bool(r) for r in records]):
             exceptstr = "Test failed.\nQuery:\n{q}\nResults:\n{r!s}"
             raise AirflowException(exceptstr.format(q=self.sql, r=records))
-        self.logger.info("Success.")
+        self.log.info("Success.")
 
     def get_db_hook(self):
         return BaseHook.get_hook(conn_id=self.conn_id)
@@ -134,7 +134,7 @@ class ValueCheckOperator(BaseOperator):
         self.has_tolerance = self.tol is not None
 
     def execute(self, context=None):
-        self.logger.info('Executing SQL check: %s', self.sql)
+        self.log.info('Executing SQL check: %s', self.sql)
         records = self.get_db_hook().get_first(self.sql)
         if not records:
             raise AirflowException("The query returned None")
@@ -208,9 +208,9 @@ class IntervalCheckOperator(BaseOperator):
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-        self.logger.info('Executing SQL check: %s', self.sql2)
+        self.log.info('Executing SQL check: %s', self.sql2)
         row2 = hook.get_first(self.sql2)
-        self.logger.info('Executing SQL check: %s', self.sql1)
+        self.log.info('Executing SQL check: %s', self.sql1)
         row1 = hook.get_first(self.sql1)
         if not row2:
             raise AirflowException("The query {q} returned None".format(q=self.sql2))
@@ -230,20 +230,20 @@ class IntervalCheckOperator(BaseOperator):
             else:
                 ratio = float(max(current[m], reference[m])) / \
                     min(current[m], reference[m])
-            self.logger.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
+            self.log.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
             ratios[m] = ratio
             test_results[m] = ratio < self.metrics_thresholds[m]
         if not all(test_results.values()):
             failed_tests = [it[0] for it in test_results.items() if not it[1]]
             j = len(failed_tests)
             n = len(self.metrics_sorted)
-            self.logger.warning(countstr.format(**locals()))
+            self.log.warning(countstr.format(**locals()))
             for k in failed_tests:
-                self.logger.warning(
+                self.log.warning(
                     fstr.format(k=k, r=ratios[k], tr=self.metrics_thresholds[k])
                 )
             raise AirflowException(estr.format(", ".join(failed_tests)))
-        self.logger.info("All tests have passed")
+        self.log.info("All tests have passed")
 
     def get_db_hook(self):
         return BaseHook.get_hook(conn_id=self.conn_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index bd2862b..3a952cd 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -69,9 +69,9 @@ class TriggerDagRunOperator(BaseOperator):
                 state=State.RUNNING,
                 conf=dro.payload,
                 external_trigger=True)
-            self.logger.info("Creating DagRun %s", dr)
+            self.log.info("Creating DagRun %s", dr)
             session.add(dr)
             session.commit()
             session.close()
         else:
-            self.logger.info("Criteria not met, moving on")
+            self.log.info("Criteria not met, moving on")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/docker_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index 8a333d6..3011f1c 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -134,7 +134,7 @@ class DockerOperator(BaseOperator):
         self.container = None
 
     def execute(self, context):
-        self.logger.info('Starting docker container from image %s', self.image)
+        self.log.info('Starting docker container from image %s', self.image)
 
         tls_config = None
         if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
@@ -155,10 +155,10 @@ class DockerOperator(BaseOperator):
             image = self.image
 
         if self.force_pull or len(self.cli.images(name=image)) == 0:
-            self.logger.info('Pulling docker image %s', image)
+            self.log.info('Pulling docker image %s', image)
             for l in self.cli.pull(image, stream=True):
                 output = json.loads(l.decode('utf-8'))
-                self.logger.info("%s", output['status'])
+                self.log.info("%s", output['status'])
 
         cpu_shares = int(round(self.cpus * 1024))
 
@@ -184,7 +184,7 @@ class DockerOperator(BaseOperator):
                 line = line.strip()
                 if hasattr(line, 'decode'):
                     line = line.decode('utf-8')
-                self.logger.info(line)
+                self.log.info(line)
 
             exit_code = self.cli.wait(self.container['Id'])
             if exit_code != 0:
@@ -202,5 +202,5 @@ class DockerOperator(BaseOperator):
 
     def on_kill(self):
         if self.cli is not None:
-            self.logger.info('Stopping docker container')
+            self.log.info('Stopping docker container')
             self.cli.stop(self.container['Id'])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/generic_transfer.py
----------------------------------------------------------------------
diff --git a/airflow/operators/generic_transfer.py b/airflow/operators/generic_transfer.py
index 790749a..c8a2a58 100644
--- a/airflow/operators/generic_transfer.py
+++ b/airflow/operators/generic_transfer.py
@@ -61,15 +61,15 @@ class GenericTransfer(BaseOperator):
     def execute(self, context):
         source_hook = BaseHook.get_hook(self.source_conn_id)
 
-        self.logger.info("Extracting data from %s", self.source_conn_id)
-        self.logger.info("Executing: \n %s", self.sql)
+        self.log.info("Extracting data from %s", self.source_conn_id)
+        self.log.info("Executing: \n %s", self.sql)
         results = source_hook.get_records(self.sql)
 
         destination_hook = BaseHook.get_hook(self.destination_conn_id)
         if self.preoperator:
-            self.logger.info("Running preoperator")
-            self.logger.info(self.preoperator)
+            self.log.info("Running preoperator")
+            self.log.info(self.preoperator)
             destination_hook.run(self.preoperator)
 
-        self.logger.info("Inserting rows into %s", self.destination_conn_id)
+        self.log.info("Inserting rows into %s", self.destination_conn_id)
         destination_hook.insert_rows(table=self.destination_table, rows=results)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 983069b..221feeb 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -93,7 +93,7 @@ class HiveOperator(BaseOperator):
             self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.hql)
+        self.log.info('Executing: %s', self.hql)
         self.hook = self.get_hook()
         self.hook.run_cli(hql=self.hql, schema=self.schema,
                           hive_conf=context_to_airflow_vars(context))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_stats_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py
index 025e427..896547e 100644
--- a/airflow/operators/hive_stats_operator.py
+++ b/airflow/operators/hive_stats_operator.py
@@ -139,15 +139,15 @@ class HiveStatsCollectionOperator(BaseOperator):
         """.format(**locals())
 
         hook = PrestoHook(presto_conn_id=self.presto_conn_id)
-        self.logger.info('Executing SQL check: %s', sql)
+        self.log.info('Executing SQL check: %s', sql)
         row = hook.get_first(hql=sql)
-        self.logger.info("Record: %s", row)
+        self.log.info("Record: %s", row)
         if not row:
             raise AirflowException("The query returned None")
 
         part_json = json.dumps(self.partition, sort_keys=True)
 
-        self.logger.info("Deleting rows from previous runs if they exist")
+        self.log.info("Deleting rows from previous runs if they exist")
         mysql = MySqlHook(self.mysql_conn_id)
         sql = """
         SELECT 1 FROM hive_stats
@@ -167,7 +167,7 @@ class HiveStatsCollectionOperator(BaseOperator):
             """.format(**locals())
             mysql.run(sql)
 
-        self.logger.info("Pivoting and loading cells into the Airflow db")
+        self.log.info("Pivoting and loading cells into the Airflow db")
         rows = [
             (self.ds, self.dttm, self.table, part_json) +
             (r[0][0], r[0][1], r[1])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index d7b1b82..e420dfd 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -92,7 +92,7 @@ class HiveToDruidTransfer(BaseOperator):
 
     def execute(self, context):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
-        self.logger.info("Extracting data from Hive")
+        self.log.info("Extracting data from Hive")
         hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
         sql = self.sql.strip().strip(';')
         tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()])
@@ -107,7 +107,7 @@ class HiveToDruidTransfer(BaseOperator):
         AS
         {sql}
         """.format(**locals())
-        self.logger.info("Running command:\n %s", hql)
+        self.log.info("Running command:\n %s", hql)
         hive.run_cli(hql)
 
         m = HiveMetastoreHook(self.metastore_conn_id)
@@ -131,13 +131,13 @@ class HiveToDruidTransfer(BaseOperator):
                 columns=columns,
             )
 
-            self.logger.info("Inserting rows into Druid, hdfs path: %s", static_path)
+            self.log.info("Inserting rows into Druid, hdfs path: %s", static_path)
 
             druid.submit_indexing_job(index_spec)
 
-            self.logger.info("Load seems to have succeeded!")
+            self.log.info("Load seems to have succeeded!")
         finally:
-            self.logger.info(
+            self.log.info(
                 "Cleaning up by dropping the temp Hive table %s",
                 hive_table
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py
index e82a099..d2d9d0c 100644
--- a/airflow/operators/hive_to_mysql.py
+++ b/airflow/operators/hive_to_mysql.py
@@ -77,7 +77,7 @@ class HiveToMySqlTransfer(BaseOperator):
 
     def execute(self, context):
         hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
-        self.logger.info("Extracting data from Hive: %s", self.sql)
+        self.log.info("Extracting data from Hive: %s", self.sql)
 
         if self.bulk_load:
             tmpfile = NamedTemporaryFile()
@@ -88,10 +88,10 @@ class HiveToMySqlTransfer(BaseOperator):
 
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
         if self.mysql_preoperator:
-            self.logger.info("Running MySQL preoperator")
+            self.log.info("Running MySQL preoperator")
             mysql.run(self.mysql_preoperator)
 
-        self.logger.info("Inserting rows into MySQL")
+        self.log.info("Inserting rows into MySQL")
 
         if self.bulk_load:
             mysql.bulk_load(table=self.mysql_table, tmp_file=tmpfile.name)
@@ -100,7 +100,7 @@ class HiveToMySqlTransfer(BaseOperator):
             mysql.insert_rows(table=self.mysql_table, rows=results)
 
         if self.mysql_postoperator:
-            self.logger.info("Running MySQL postoperator")
+            self.log.info("Running MySQL postoperator")
             mysql.run(self.mysql_postoperator)
 
-        self.logger.info("Done.")
+        self.log.info("Done.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/hive_to_samba_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_samba_operator.py b/airflow/operators/hive_to_samba_operator.py
index d6e6dec..93ebec1 100644
--- a/airflow/operators/hive_to_samba_operator.py
+++ b/airflow/operators/hive_to_samba_operator.py
@@ -53,7 +53,7 @@ class Hive2SambaOperator(BaseOperator):
         samba = SambaHook(samba_conn_id=self.samba_conn_id)
         hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
         tmpfile = tempfile.NamedTemporaryFile()
-        self.logger.info("Fetching file from Hive")
+        self.log.info("Fetching file from Hive")
         hive.to_csv(hql=self.hql, csv_filepath=tmpfile.name)
-        self.logger.info("Pushing to samba")
+        self.log.info("Pushing to samba")
         samba.push_from_local(self.destination_filepath, tmpfile.name)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index d92c931..63b892c 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -74,7 +74,7 @@ class SimpleHttpOperator(BaseOperator):
     def execute(self, context):
         http = HttpHook(self.method, http_conn_id=self.http_conn_id)
 
-        self.logger.info("Calling HTTP method")
+        self.log.info("Calling HTTP method")
 
         response = http.run(self.endpoint,
                             self.data,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/jdbc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py
index 942e312..4ec2fa0 100644
--- a/airflow/operators/jdbc_operator.py
+++ b/airflow/operators/jdbc_operator.py
@@ -55,6 +55,6 @@ class JdbcOperator(BaseOperator):
         self.autocommit = autocommit
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
         self.hook.run(self.sql, self.autocommit, parameters=self.parameters)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index 58f7e67..a1e2a0c 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -32,29 +32,29 @@ class LatestOnlyOperator(BaseOperator, SkipMixin):
         # If the DAG Run is externally triggered, then return without
         # skipping downstream tasks
         if context['dag_run'] and context['dag_run'].external_trigger:
-            self.logger.info("Externally triggered DAG_Run: allowing execution to proceed.")
+            self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
             return
 
         now = datetime.datetime.now()
         left_window = context['dag'].following_schedule(
             context['execution_date'])
         right_window = context['dag'].following_schedule(left_window)
-        self.logger.info(
+        self.log.info(
             'Checking latest only with left_window: %s right_window: %s now: %s',
             left_window, right_window, now
         )
 
         if not left_window < now <= right_window:
-            self.logger.info('Not latest execution, skipping downstream.')
+            self.log.info('Not latest execution, skipping downstream.')
 
             downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-            self.logger.debug("Downstream task_ids %s", downstream_tasks)
+            self.log.debug("Downstream task_ids %s", downstream_tasks)
 
             if downstream_tasks:
                 self.skip(context['dag_run'],
                           context['ti'].execution_date,
                           downstream_tasks)
 
-            self.logger.info('Done.')
+            self.log.info('Done.')
         else:
-            self.logger.info('Latest, allowing execution to proceed.')
+            self.log.info('Latest, allowing execution to proceed.')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_operator.py b/airflow/operators/mssql_operator.py
index bc0822f..3455232 100644
--- a/airflow/operators/mssql_operator.py
+++ b/airflow/operators/mssql_operator.py
@@ -44,7 +44,7 @@ class MsSqlOperator(BaseOperator):
         self.database = database
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id,
                          schema=self.database)
         hook.run(self.sql, autocommit=self.autocommit,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mssql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py
index 719ddd2..c2c858d 100644
--- a/airflow/operators/mssql_to_hive.py
+++ b/airflow/operators/mssql_to_hive.py
@@ -102,7 +102,7 @@ class MsSqlToHiveTransfer(BaseOperator):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         mssql = MsSqlHook(mssql_conn_id=self.mssql_conn_id)
 
-        self.logger.info("Dumping Microsoft SQL Server query results to local file")
+        self.log.info("Dumping Microsoft SQL Server query results to local file")
         conn = mssql.get_conn()
         cursor = conn.cursor()
         cursor.execute(self.sql)
@@ -118,7 +118,7 @@ class MsSqlToHiveTransfer(BaseOperator):
             f.flush()
             cursor.close()
             conn.close()
-            self.logger.info("Loading file into Hive")
+            self.log.info("Loading file into Hive")
             hive.load_file(
                 f.name,
                 self.hive_table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py
index 923eaf8..20f1b7e 100644
--- a/airflow/operators/mysql_operator.py
+++ b/airflow/operators/mysql_operator.py
@@ -46,7 +46,7 @@ class MySqlOperator(BaseOperator):
         self.database = database
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                          schema=self.database)
         hook.run(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index fde92b5..cd472a8 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -110,7 +110,7 @@ class MySqlToHiveTransfer(BaseOperator):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
 
-        self.logger.info("Dumping MySQL query results to local file")
+        self.log.info("Dumping MySQL query results to local file")
         conn = mysql.get_conn()
         cursor = conn.cursor()
         cursor.execute(self.sql)
@@ -123,7 +123,7 @@ class MySqlToHiveTransfer(BaseOperator):
             f.flush()
             cursor.close()
             conn.close()
-            self.logger.info("Loading file into Hive")
+            self.log.info("Loading file into Hive")
             hive.load_file(
                 f.name,
                 self.hive_table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/oracle_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/oracle_operator.py b/airflow/operators/oracle_operator.py
index f87bbf9..9a35267 100644
--- a/airflow/operators/oracle_operator.py
+++ b/airflow/operators/oracle_operator.py
@@ -42,7 +42,7 @@ class OracleOperator(BaseOperator):
         self.parameters = parameters
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
         hook.run(
             self.sql,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/pig_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/pig_operator.py b/airflow/operators/pig_operator.py
index cdce48a..a4e4e5c 100644
--- a/airflow/operators/pig_operator.py
+++ b/airflow/operators/pig_operator.py
@@ -59,7 +59,7 @@ class PigOperator(BaseOperator):
                 "(\$([a-zA-Z_][a-zA-Z0-9_]*))", "{{ \g<2> }}", self.pig)
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.pig)
+        self.log.info('Executing: %s', self.pig)
         self.hook = self.get_hook()
         self.hook.run_cli(pig=self.pig)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/postgres_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py
index 55c1573..c93dc7b 100644
--- a/airflow/operators/postgres_operator.py
+++ b/airflow/operators/postgres_operator.py
@@ -49,7 +49,7 @@ class PostgresOperator(BaseOperator):
         self.database = database
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                  schema=self.database)
         self.hook.run(self.sql, self.autocommit, parameters=self.parameters)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/presto_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/presto_to_mysql.py b/airflow/operators/presto_to_mysql.py
index 48158ca..d0c323a 100644
--- a/airflow/operators/presto_to_mysql.py
+++ b/airflow/operators/presto_to_mysql.py
@@ -61,14 +61,14 @@ class PrestoToMySqlTransfer(BaseOperator):
 
     def execute(self, context):
         presto = PrestoHook(presto_conn_id=self.presto_conn_id)
-        self.logger.info("Extracting data from Presto: %s", self.sql)
+        self.log.info("Extracting data from Presto: %s", self.sql)
         results = presto.get_records(self.sql)
 
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
         if self.mysql_preoperator:
-            self.logger.info("Running MySQL preoperator")
-            self.logger.info(self.mysql_preoperator)
+            self.log.info("Running MySQL preoperator")
+            self.log.info(self.mysql_preoperator)
             mysql.run(self.mysql_preoperator)
 
-        self.logger.info("Inserting rows into MySQL")
+        self.log.info("Inserting rows into MySQL")
         mysql.insert_rows(table=self.mysql_table, rows=results)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 56837ec..718c88f 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -87,7 +87,7 @@ class PythonOperator(BaseOperator):
             self.op_kwargs = context
 
         return_value = self.execute_callable()
-        self.logger.info("Done. Returned value was: %s", return_value)
+        self.log.info("Done. Returned value was: %s", return_value)
         return return_value
 
     def execute_callable(self):
@@ -115,17 +115,17 @@ class BranchPythonOperator(PythonOperator, SkipMixin):
     """
     def execute(self, context):
         branch = super(BranchPythonOperator, self).execute(context)
-        self.logger.info("Following branch %s", branch)
-        self.logger.info("Marking other directly downstream tasks as skipped")
+        self.log.info("Following branch %s", branch)
+        self.log.info("Marking other directly downstream tasks as skipped")
 
         downstream_tasks = context['task'].downstream_list
-        self.logger.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream task_ids %s", downstream_tasks)
 
         skip_tasks = [t for t in downstream_tasks if t.task_id != branch]
         if downstream_tasks:
             self.skip(context['dag_run'], context['ti'].execution_date, skip_tasks)
 
-        self.logger.info("Done.")
+        self.log.info("Done.")
 
 
 class ShortCircuitOperator(PythonOperator, SkipMixin):
@@ -142,21 +142,21 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     def execute(self, context):
         condition = super(ShortCircuitOperator, self).execute(context)
-        self.logger.info("Condition result is %s", condition)
+        self.log.info("Condition result is %s", condition)
 
         if condition:
-            self.logger.info('Proceeding with downstream tasks...')
+            self.log.info('Proceeding with downstream tasks...')
             return
 
-        self.logger.info('Skipping downstream tasks...')
+        self.log.info('Skipping downstream tasks...')
 
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-        self.logger.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream task_ids %s", downstream_tasks)
 
         if downstream_tasks:
             self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
 
-        self.logger.info("Done.")
+        self.log.info("Done.")
 
 class PythonVirtualenvOperator(PythonOperator):
     """
@@ -233,7 +233,7 @@ class PythonVirtualenvOperator(PythonOperator):
             # generate filenames
             input_filename = os.path.join(tmp_dir, 'script.in')
             output_filename = os.path.join(tmp_dir, 'script.out')
-            string_args_filename = os.path.join(tmp_dir, 'string_args.txt') 
+            string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
             script_filename = os.path.join(tmp_dir, 'script.py')
 
             # set up virtualenv
@@ -261,12 +261,12 @@ class PythonVirtualenvOperator(PythonOperator):
 
     def _execute_in_subprocess(self, cmd):
         try:
-            self.logger.info("Executing cmd\n{}".format(cmd))
+            self.log.info("Executing cmd\n{}".format(cmd))
             output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
             if output:
-                self.logger.info("Got output\n{}".format(output))
+                self.log.info("Got output\n{}".format(output))
         except subprocess.CalledProcessError as e:
-            self.logger.info("Got error output\n{}".format(e.output))
+            self.log.info("Got error output\n{}".format(e.output))
             raise
 
     def _write_string_args(self, filename):
@@ -294,14 +294,14 @@ class PythonVirtualenvOperator(PythonOperator):
                 else:
                     return pickle.load(f)
             except ValueError:
-                self.logger.error("Error deserializing result. Note that result deserialization "
+                self.log.error("Error deserializing result. Note that result deserialization "
                               "is not supported across major Python versions.")
                 raise
 
     def _write_script(self, script_filename):
         with open(script_filename, 'w') as f:
             python_code = self._generate_python_code()
-            self.logger.debug('Writing code to file\n{}'.format(python_code))
+            self.log.debug('Writing code to file\n{}'.format(python_code))
             f.write(python_code)
 
     def _generate_virtualenv_cmd(self, tmp_dir):
@@ -323,7 +323,7 @@ class PythonVirtualenvOperator(PythonOperator):
     def _generate_python_cmd(self, tmp_dir, script_filename, input_filename, output_filename, string_args_filename):
         # direct path alleviates need to activate
         return ['{}/bin/python'.format(tmp_dir), script_filename, input_filename, output_filename, string_args_filename]
-            
+
     def _generate_python_code(self):
         if self.use_dill:
             pickling_library = 'dill'
@@ -354,3 +354,5 @@ class PythonVirtualenvOperator(PythonOperator):
                 python_callable_name=fn.__name__,
                 pickling_library=pickling_library)
 
+        self.log.info("Done.")
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/redshift_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py
index e25d613..683ff9c 100644
--- a/airflow/operators/redshift_to_s3_operator.py
+++ b/airflow/operators/redshift_to_s3_operator.py
@@ -70,7 +70,7 @@ class RedshiftToS3Transfer(BaseOperator):
         a_key, s_key = self.s3.get_credentials()
         unload_options = '\n\t\t\t'.join(self.unload_options)
 
-        self.logger.info("Retrieving headers from %s.%s...", self.schema, self.table)
+        self.log.info("Retrieving headers from %s.%s...", self.schema, self.table)
 
         columns_query = """SELECT column_name
                             FROM information_schema.columns
@@ -99,6 +99,6 @@ class RedshiftToS3Transfer(BaseOperator):
                         """.format(column_names, column_castings, self.schema, self.table,
                                 self.s3_bucket, self.s3_key, a_key, s_key, unload_options)
 
-        self.logger.info('Executing UNLOAD command...')
+        self.log.info('Executing UNLOAD command...')
         self.hook.run(unload_query, self.autocommit)
-        self.logger.info("UNLOAD command complete...")
+        self.log.info("UNLOAD command complete...")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index 5de5127..68c733c 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -74,12 +74,12 @@ class S3FileTransformOperator(BaseOperator):
     def execute(self, context):
         source_s3 = S3Hook(s3_conn_id=self.source_s3_conn_id)
         dest_s3 = S3Hook(s3_conn_id=self.dest_s3_conn_id)
-        self.logger.info("Downloading source S3 file %s", self.source_s3_key)
+        self.log.info("Downloading source S3 file %s", self.source_s3_key)
         if not source_s3.check_for_key(self.source_s3_key):
             raise AirflowException("The source key {0} does not exist".format(self.source_s3_key))
         source_s3_key_object = source_s3.get_key(self.source_s3_key)
         with NamedTemporaryFile("w") as f_source, NamedTemporaryFile("w") as f_dest:
-            self.logger.info(
+            self.log.info(
                 "Dumping S3 file %s contents to local file %s",
                 self.source_s3_key, f_source.name
             )
@@ -90,20 +90,20 @@ class S3FileTransformOperator(BaseOperator):
                 [self.transform_script, f_source.name, f_dest.name],
                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
-            self.logger.info("Transform script stdout %s", transform_script_stdoutdata)
+            self.log.info("Transform script stdout %s", transform_script_stdoutdata)
             if transform_script_process.returncode > 0:
                 raise AirflowException("Transform script failed %s", transform_script_stderrdata)
             else:
-                self.logger.info(
+                self.log.info(
                     "Transform script successful. Output temporarily located at %s",
                     f_dest.name
                 )
-            self.logger.info("Uploading transformed file to S3")
+            self.log.info("Uploading transformed file to S3")
             f_dest.flush()
             dest_s3.load_file(
                 filename=f_dest.name,
                 key=self.dest_s3_key,
                 replace=self.replace
             )
-            self.logger.info("Upload successful")
+            self.log.info("Upload successful")
             dest_s3.connection.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 68fe903..2b4aceb 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -129,7 +129,7 @@ class S3ToHiveTransfer(BaseOperator):
         # Downloading file from S3
         self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
         self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
-        self.logger.info("Downloading S3 file")
+        self.log.info("Downloading S3 file")
 
         if self.wildcard_match:
             if not self.s3.check_for_wildcard_key(self.s3_key):
@@ -146,13 +146,13 @@ class S3ToHiveTransfer(BaseOperator):
                 NamedTemporaryFile(mode="w",
                                    dir=tmp_dir,
                                    suffix=file_ext) as f:
-            self.logger.info("Dumping S3 key {0} contents to local file {1}"
-                             .format(s3_key_object.key, f.name))
+            self.log.info("Dumping S3 key {0} contents to local file {1}"
+                          .format(s3_key_object.key, f.name))
             s3_key_object.get_contents_to_file(f)
             f.flush()
             self.s3.connection.close()
             if not self.headers:
-                self.logger.info("Loading file %s into Hive", f.name)
+                self.log.info("Loading file %s into Hive", f.name)
                 self.hive.load_file(
                     f.name,
                     self.hive_table,
@@ -165,11 +165,11 @@ class S3ToHiveTransfer(BaseOperator):
             else:
                 # Decompressing file
                 if self.input_compressed:
-                    self.logger.info("Uncompressing file %s", f.name)
+                    self.log.info("Uncompressing file %s", f.name)
                     fn_uncompressed = uncompress_file(f.name,
                                                       file_ext,
                                                       tmp_dir)
-                    self.logger.info("Uncompressed to %s", fn_uncompressed)
+                    self.log.info("Uncompressed to %s", fn_uncompressed)
                     # uncompressed file available now so deleting
                     # compressed file to save disk space
                     f.close()
@@ -178,19 +178,19 @@ class S3ToHiveTransfer(BaseOperator):
 
                 # Testing if header matches field_dict
                 if self.check_headers:
-                    self.logger.info("Matching file header against field_dict")
+                    self.log.info("Matching file header against field_dict")
                     header_list = self._get_top_row_as_list(fn_uncompressed)
                     if not self._match_headers(header_list):
                         raise AirflowException("Header check failed")
 
                 # Deleting top header row
-                self.logger.info("Removing header from file %s", fn_uncompressed)
+                self.log.info("Removing header from file %s", fn_uncompressed)
                 headless_file = (
                     self._delete_top_row_and_compress(fn_uncompressed,
                                                       file_ext,
                                                       tmp_dir))
-                self.logger.info("Headless file %s", headless_file)
-                self.logger.info("Loading file %s into Hive", headless_file)
+                self.log.info("Headless file %s", headless_file)
+                self.log.info("Loading file %s into Hive", headless_file)
                 self.hive.load_file(headless_file,
                                     self.hive_table,
                                     field_dict=self.field_dict,
@@ -211,7 +211,7 @@ class S3ToHiveTransfer(BaseOperator):
             raise AirflowException("Unable to retrieve header row from file")
         field_names = self.field_dict.keys()
         if len(field_names) != len(header_list):
-            self.logger.warning("Headers count mismatch"
+            self.log.warning("Headers count mismatch"
                               "File headers:\n {header_list}\n"
                               "Field names: \n {field_names}\n"
                               "".format(**locals()))
@@ -219,7 +219,7 @@ class S3ToHiveTransfer(BaseOperator):
         test_field_match = [h1.lower() == h2.lower()
                             for h1, h2 in zip(header_list, field_names)]
         if not all(test_field_match):
-            self.logger.warning("Headers do not match field names"
+            self.log.warning("Headers do not match field names"
                               "File headers:\n {header_list}\n"
                               "Field names: \n {field_names}\n"
                               "".format(**locals()))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index ea301dc..b5c43c2 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -15,7 +15,7 @@
 from __future__ import print_function
 from future import standard_library
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 standard_library.install_aliases()
 from builtins import str
@@ -82,7 +82,7 @@ class BaseSensorOperator(BaseOperator):
                 else:
                     raise AirflowSensorTimeout('Snap. Time is OUT.')
             sleep(self.poke_interval)
-        self.logger.info("Success criteria met. Exiting.")
+        self.log.info("Success criteria met. Exiting.")
 
 
 class SqlSensor(BaseSensorOperator):
@@ -108,7 +108,7 @@ class SqlSensor(BaseSensorOperator):
     def poke(self, context):
         hook = BaseHook.get_connection(self.conn_id).get_hook()
 
-        self.logger.info('Poking: %s', self.sql)
+        self.log.info('Poking: %s', self.sql)
         records = hook.get_records(self.sql)
         if not records:
             return False
@@ -237,7 +237,7 @@ class ExternalTaskSensor(BaseSensorOperator):
         serialized_dttm_filter = ','.join(
             [datetime.isoformat() for datetime in dttm_filter])
 
-        self.logger.info(
+        self.log.info(
             'Poking for '
             '{self.external_dag_id}.'
             '{self.external_task_id} on '
@@ -313,7 +313,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
 
             schema, table, partition = self.parse_partition_name(partition)
 
-            self.logger.info(
+            self.log.info(
                 'Poking for {schema}.{table}/{partition}'.format(**locals())
             )
             return self.hook.check_for_named_partition(
@@ -371,7 +371,7 @@ class HivePartitionSensor(BaseSensorOperator):
     def poke(self, context):
         if '.' in self.table:
             self.schema, self.table = self.table.split('.')
-        self.logger.info(
+        self.log.info(
             'Poking for table {self.schema}.{self.table}, '
             'partition {self.partition}'.format(**locals()))
         if not hasattr(self, 'hook'):
@@ -417,7 +417,7 @@ class HdfsSensor(BaseSensorOperator):
         :return: (bool) depending on the matching criteria
         """
         if size:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
             size *= settings.MEGABYTE
             result = [x for x in result if x['length'] >= size]
@@ -435,7 +435,7 @@ class HdfsSensor(BaseSensorOperator):
         :return: (list) of dicts which were not removed
         """
         if ignore_copying:
-            log = LoggingMixin().logger
+            log = LoggingMixin().log
             regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
             ignored_extentions_regex = re.compile(regex_builder)
             log.debug(
@@ -448,20 +448,20 @@ class HdfsSensor(BaseSensorOperator):
 
     def poke(self, context):
         sb = self.hook(self.hdfs_conn_id).get_conn()
-        self.logger.info('Poking for file {self.filepath}'.format(**locals()))
+        self.log.info('Poking for file {self.filepath}'.format(**locals()))
         try:
             # IMOO it's not right here, as there no raise of any kind.
             # if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',
             # it's not correct as the directory exists and sb does not raise any error
             # here is a quick fix
             result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
-            self.logger.debug('HdfsSensor.poke: result is %s', result)
+            self.log.debug('HdfsSensor.poke: result is %s', result)
             result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
             result = self.filter_for_filesize(result, self.file_size)
             return bool(result)
         except:
             e = sys.exc_info()
-            self.logger.debug("Caught an exception !: %s", str(e))
+            self.log.debug("Caught an exception !: %s", str(e))
             return False
 
 
@@ -484,7 +484,7 @@ class WebHdfsSensor(BaseSensorOperator):
     def poke(self, context):
         from airflow.hooks.webhdfs_hook import WebHDFSHook
         c = WebHDFSHook(self.webhdfs_conn_id)
-        self.logger.info('Poking for file {self.filepath}'.format(**locals()))
+        self.log.info('Poking for file {self.filepath}'.format(**locals()))
         return c.check_for_path(hdfs_path=self.filepath)
 
 
@@ -535,7 +535,7 @@ class S3KeySensor(BaseSensorOperator):
         from airflow.hooks.S3_hook import S3Hook
         hook = S3Hook(s3_conn_id=self.s3_conn_id)
         full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
-        self.logger.info('Poking for key : {full_url}'.format(**locals()))
+        self.log.info('Poking for key : {full_url}'.format(**locals()))
         if self.wildcard_match:
             return hook.check_for_wildcard_key(self.bucket_key,
                                                self.bucket_name)
@@ -577,7 +577,7 @@ class S3PrefixSensor(BaseSensorOperator):
         self.s3_conn_id = s3_conn_id
 
     def poke(self, context):
-        self.logger.info('Poking for prefix : {self.prefix}\n'
+        self.log.info('Poking for prefix : {self.prefix}\n'
                      'in bucket s3://{self.bucket_name}'.format(**locals()))
         from airflow.hooks.S3_hook import S3Hook
         hook = S3Hook(s3_conn_id=self.s3_conn_id)
@@ -602,7 +602,7 @@ class TimeSensor(BaseSensorOperator):
         self.target_time = target_time
 
     def poke(self, context):
-        self.logger.info('Checking if the time (%s) has come', self.target_time)
+        self.log.info('Checking if the time (%s) has come', self.target_time)
         return datetime.now().time() > self.target_time
 
 
@@ -627,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator):
         dag = context['dag']
         target_dttm = dag.following_schedule(context['execution_date'])
         target_dttm += self.delta
-        self.logger.info('Checking if the time (%s) has come', target_dttm)
+        self.log.info('Checking if the time (%s) has come', target_dttm)
         return datetime.now() > target_dttm
 
 
@@ -679,7 +679,7 @@ class HttpSensor(BaseSensorOperator):
             http_conn_id=http_conn_id)
 
     def poke(self, context):
-        self.logger.info('Poking: %s', self.endpoint)
+        self.log.info('Poking: %s', self.endpoint)
         try:
             response = self.hook.run(self.endpoint,
                                      data=self.request_params,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/slack_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py
index 4f2d7bc..8b21211 100644
--- a/airflow/operators/slack_operator.py
+++ b/airflow/operators/slack_operator.py
@@ -67,7 +67,7 @@ class SlackAPIOperator(BaseOperator):
         rc = sc.api_call(self.method, **self.api_params)
         if not rc['ok']:
             msg = "Slack API call failed (%s)".format(rc['error'])
-            self.logger.error(msg)
+            self.log.error(msg)
             raise AirflowException(msg)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/operators/sqlite_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 7c85847..b32837d 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -41,6 +41,6 @@ class SqliteOperator(BaseOperator):
         self.parameters = parameters or []
 
     def execute(self, context):
-        self.logger.info('Executing: %s', self.sql)
+        self.log.info('Executing: %s', self.sql)
         hook = SqliteHook(sqlite_conn_id=self.sqlite_conn_id)
         hook.run(self.sql, parameters=self.parameters)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 7c1d246..d5c3407 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -25,9 +25,9 @@ import re
 import sys
 
 from airflow import configuration
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 class AirflowPluginException(Exception):
     pass

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/security/kerberos.py
----------------------------------------------------------------------
diff --git a/airflow/security/kerberos.py b/airflow/security/kerberos.py
index a9687b3..7a169b2 100644
--- a/airflow/security/kerberos.py
+++ b/airflow/security/kerberos.py
@@ -24,7 +24,7 @@ from airflow import configuration, LoggingMixin
 
 NEED_KRB181_WORKAROUND = None
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 
 def renew_from_kt():

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index cf1eca4..1e5e614 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -27,9 +27,9 @@ from sqlalchemy.orm import scoped_session, sessionmaker
 from sqlalchemy.pool import NullPool
 
 from airflow import configuration as conf
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 class DummyStatsLogger(object):
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/base_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/base_task_runner.py b/airflow/task_runner/base_task_runner.py
index 7794f4a..6a07db2 100644
--- a/airflow/task_runner/base_task_runner.py
+++ b/airflow/task_runner/base_task_runner.py
@@ -19,7 +19,7 @@ import json
 import subprocess
 import threading
 
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 from airflow import configuration as conf
 from tempfile import mkstemp
@@ -39,7 +39,7 @@ class BaseTaskRunner(LoggingMixin):
         """
         # Pass task instance context into log handlers to setup the logger.
         self._task_instance = local_task_job.task_instance
-        self.set_logger_contexts(self._task_instance)
+        self.set_log_contexts(self._task_instance)
 
         popen_prepend = []
         cfg_path = None
@@ -54,7 +54,7 @@ class BaseTaskRunner(LoggingMixin):
         # Add sudo commands to change user if we need to. Needed to handle SubDagOperator
         # case using a SequentialExecutor.
         if self.run_as_user and (self.run_as_user != getpass.getuser()):
-            self.logger.debug("Planning to run as the %s user", self.run_as_user)
+            self.log.debug("Planning to run as the %s user", self.run_as_user)
             cfg_dict = conf.as_dict(display_sensitive=True)
             cfg_subset = {
                 'core': cfg_dict.get('core', {}),
@@ -95,7 +95,7 @@ class BaseTaskRunner(LoggingMixin):
                 line = line.decode('utf-8')
             if len(line) == 0:
                 break
-            self.logger.info('Subtask: %s', line.rstrip('\n'))
+            self.log.info('Subtask: %s', line.rstrip('\n'))
 
     def run_command(self, run_with, join_args=False):
         """
@@ -112,7 +112,7 @@ class BaseTaskRunner(LoggingMixin):
         """
         cmd = [" ".join(self._command)] if join_args else self._command
         full_cmd = run_with + cmd
-        self.logger.info('Running: %s', full_cmd)
+        self.log.info('Running: %s', full_cmd)
         proc = subprocess.Popen(
             full_cmd,
             stdout=subprocess.PIPE,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/task_runner/bash_task_runner.py
----------------------------------------------------------------------
diff --git a/airflow/task_runner/bash_task_runner.py b/airflow/task_runner/bash_task_runner.py
index b73e258..109b44c 100644
--- a/airflow/task_runner/bash_task_runner.py
+++ b/airflow/task_runner/bash_task_runner.py
@@ -33,7 +33,7 @@ class BashTaskRunner(BaseTaskRunner):
 
     def terminate(self):
         if self.process and psutil.pid_exists(self.process.pid):
-            kill_process_tree(self.logger, self.process.pid)
+            kill_process_tree(self.log, self.process.pid)
 
     def on_finish(self):
         super(BashTaskRunner, self).on_finish()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 6497fcc..cc64f68 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -27,7 +27,7 @@ from datetime import datetime
 
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 class SimpleDag(BaseDag):
@@ -205,7 +205,7 @@ def list_py_file_paths(directory, safe_mode=True):
 
                     file_paths.append(file_path)
                 except Exception:
-                    log = LoggingMixin().logger
+                    log = LoggingMixin().log
                     log.exception("Error while examining %s", f)
     return file_paths
 
@@ -443,7 +443,7 @@ class DagFileProcessorManager(LoggingMixin):
             if file_path in new_file_paths:
                 filtered_processors[file_path] = processor
             else:
-                self.logger.warning("Stopping processor for %s", file_path)
+                self.log.warning("Stopping processor for %s", file_path)
                 processor.stop()
         self._processors = filtered_processors
 
@@ -519,7 +519,7 @@ class DagFileProcessorManager(LoggingMixin):
                     os.symlink(log_directory, latest_log_directory_path)
             elif (os.path.isdir(latest_log_directory_path) or
                     os.path.isfile(latest_log_directory_path)):
-                self.logger.warning(
+                self.log.warning(
                     "%s already exists as a dir/file. Skip creating symlink.",
                     latest_log_directory_path
                 )
@@ -558,7 +558,7 @@ class DagFileProcessorManager(LoggingMixin):
 
         for file_path, processor in self._processors.items():
             if processor.done:
-                self.logger.info("Processor for %s finished", file_path)
+                self.log.info("Processor for %s finished", file_path)
                 now = datetime.now()
                 finished_processors[file_path] = processor
                 self._last_runtime[file_path] = (now -
@@ -573,7 +573,7 @@ class DagFileProcessorManager(LoggingMixin):
         simple_dags = []
         for file_path, processor in finished_processors.items():
             if processor.result is None:
-                self.logger.warning(
+                self.log.warning(
                     "Processor for %s exited with return code %s. See %s for details.",
                     processor.file_path, processor.exit_code, processor.log_file
                 )
@@ -606,12 +606,12 @@ class DagFileProcessorManager(LoggingMixin):
                                         set(files_paths_at_run_limit))
 
             for file_path, processor in self._processors.items():
-                self.logger.debug(
+                self.log.debug(
                     "File path %s is still being processed (started: %s)",
                     processor.file_path, processor.start_time.isoformat()
                 )
 
-            self.logger.debug(
+            self.log.debug(
                 "Queuing the following files for processing:\n\t%s",
                 "\n\t".join(files_paths_to_queue)
             )
@@ -626,7 +626,7 @@ class DagFileProcessorManager(LoggingMixin):
             processor = self._processor_factory(file_path, log_file_path)
 
             processor.start()
-            self.logger.info(
+            self.log.info(
                 "Started a process (PID: %s) to generate tasks for %s - logging into %s",
                 processor.pid, file_path, log_file_path
             )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index c7e58e7..ef2560f 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -25,9 +25,9 @@ from sqlalchemy import event, exc
 from sqlalchemy.pool import Pool
 
 from airflow import settings
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
-log = LoggingMixin().logger
+log = LoggingMixin().log
 
 def provide_session(func):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/email.py
----------------------------------------------------------------------
diff --git a/airflow/utils/email.py b/airflow/utils/email.py
index f252d55..fadd4d5 100644
--- a/airflow/utils/email.py
+++ b/airflow/utils/email.py
@@ -31,7 +31,7 @@ from email.utils import formatdate
 
 from airflow import configuration
 from airflow.exceptions import AirflowConfigException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
 def send_email(to, subject, html_content, files=None, dryrun=False, cc=None, bcc=None, mime_subtype='mixed'):
@@ -88,7 +88,7 @@ def send_email_smtp(to, subject, html_content, files=None, dryrun=False, cc=None
 
 
 def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
-    log = LoggingMixin().logger
+    log = LoggingMixin().log
 
     SMTP_HOST = configuration.get('smtp', 'SMTP_HOST')
     SMTP_PORT = configuration.getint('smtp', 'SMTP_PORT')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/LoggingMixin.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/LoggingMixin.py b/airflow/utils/log/LoggingMixin.py
deleted file mode 100644
index 4572d63..0000000
--- a/airflow/utils/log/LoggingMixin.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-from __future__ import unicode_literals
-
-import logging
-from builtins import object
-
-
-class LoggingMixin(object):
-    """
-    Convenience super-class to have a logger configured with the class name
-    """
-
-    @property
-    def logger(self):
-        try:
-            return self._logger
-        except AttributeError:
-            self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__)
-            return self._logger
-
-    def set_logger_contexts(self, task_instance):
-        """
-        Set the context for all handlers of current logger.
-        """
-        for handler in self.logger.handlers:
-            try:
-                handler.set_context(task_instance)
-            except AttributeError:
-                pass

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb2f5890/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index 0bc0b5e..dcdaf6d 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -15,7 +15,7 @@ import os
 
 from airflow import configuration
 from airflow.exceptions import AirflowException
-from airflow.utils.log.LoggingMixin import LoggingMixin
+from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.log.file_task_handler import FileTaskHandler
 
 
@@ -40,7 +40,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
                 google_cloud_storage_conn_id=remote_conn_id
             )
         except:
-            self.logger.error(
+            self.log.error(
                 'Could not create a GoogleCloudStorageHook with connection id '
                 '"%s". Please make sure that airflow[gcp_api] is installed '
                 'and the GCS connection exists.', remote_conn_id
@@ -137,7 +137,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             # return error if needed
             if return_error:
                 msg = 'Could not read logs from {}'.format(remote_log_location)
-                self.logger.error(msg)
+                self.log.error(msg)
                 return msg
 
     def gcs_write(self, log, remote_log_location, append=True):
@@ -167,7 +167,7 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
                 tmpfile.flush()
                 self.hook.upload(bkt, blob, tmpfile.name)
         except:
-            self.logger.error('Could not write logs to %s', remote_log_location)
+            self.log.error('Could not write logs to %s', remote_log_location)
 
     def parse_gcs_url(self, gsurl):
         """


Mime
View raw message