airflow-commits mailing list archives

From bo...@apache.org
Subject [2/5] incubator-airflow git commit: [AIRFLOW-1582] Improve logging within Airflow
Date Wed, 13 Sep 2017 07:37:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 3078f4e..39e65e8 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -35,7 +35,6 @@ import inspect
 import zipfile
 import jinja2
 import json
-import logging
 import os
 import pickle
 import re
@@ -75,11 +74,11 @@ from airflow.utils.decorators import apply_defaults
 from airflow.utils.email import send_email
 from airflow.utils.helpers import (
     as_tuple, is_container, is_in, validate_key, pprinttable)
-from airflow.utils.logging import LoggingMixin
 from airflow.utils.operator_resources import Resources
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
 from airflow.utils.trigger_rule import TriggerRule
+from airflow.utils.log.LoggingMixin import LoggingMixin
 
 Base = declarative_base()
 ID_LEN = 250
@@ -87,7 +86,6 @@ XCOM_RETURN_KEY = 'return_value'
 
 Stats = settings.Stats
 
-
 def get_fernet():
     """
     Deferred load of Fernet key.
@@ -180,6 +178,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         by the scheduler job only
     :type sync_to_db: bool
     """
+
     def __init__(
             self,
             dag_folder=None,
@@ -190,7 +189,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         if executor is None:
             executor = GetDefaultExecutor()
         dag_folder = dag_folder or settings.DAGS_FOLDER
-        self.logger.info("Filling up the DagBag from {}".format(dag_folder))
+        self.logger.info("Filling up the DagBag from %s", dag_folder)
         self.dag_folder = dag_folder
         self.dags = {}
         # the file's last modified timestamp when we last read it
@@ -263,7 +262,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                 return found_dags
 
         except Exception as e:
-            logging.exception(e)
+            self.logger.exception(e)
             return found_dags
 
         mods = []
@@ -275,7 +274,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                         self.file_last_changed[filepath] = file_last_changed_on_disk
                         return found_dags
 
-            self.logger.debug("Importing {}".format(filepath))
+            self.logger.debug("Importing %s", filepath)
             org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
             mod_name = ('unusual_prefix_' +
                         hashlib.sha1(filepath.encode('utf-8')).hexdigest() +
@@ -289,7 +288,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                     m = imp.load_source(mod_name, filepath)
                     mods.append(m)
                 except Exception as e:
-                    self.logger.exception("Failed to import: " + filepath)
+                    self.logger.exception("Failed to import: %s", filepath)
                     self.import_errors[filepath] = str(e)
                     self.file_last_changed[filepath] = file_last_changed_on_disk
 
@@ -300,13 +299,10 @@ class DagBag(BaseDagBag, LoggingMixin):
                 mod_name, ext = os.path.splitext(mod.filename)
                 if not head and (ext == '.py' or ext == '.pyc'):
                     if mod_name == '__init__':
-                        self.logger.warning("Found __init__.{0} at root of {1}".
-                                            format(ext, filepath))
-
+                        self.logger.warning("Found __init__.%s at root of %s", ext, filepath)
                     if safe_mode:
                         with zip_file.open(mod.filename) as zf:
-                            self.logger.debug("Reading {} from {}".
-                                              format(mod.filename, filepath))
+                            self.logger.debug("Reading %s from %s", mod.filename, filepath)
                             content = zf.read()
                             if not all([s in content for s in (b'DAG', b'airflow')]):
                                 self.file_last_changed[filepath] = (
@@ -322,7 +318,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                         m = importlib.import_module(mod_name)
                         mods.append(m)
                     except Exception as e:
-                        self.logger.exception("Failed to import: " + filepath)
+                        self.logger.exception("Failed to import: %s", filepath)
                         self.import_errors[filepath] = str(e)
                         self.file_last_changed[filepath] = file_last_changed_on_disk
 
@@ -347,11 +343,9 @@ class DagBag(BaseDagBag, LoggingMixin):
         from airflow.jobs import LocalTaskJob as LJ
         self.logger.info("Finding 'running' jobs without a recent heartbeat")
         TI = TaskInstance
-        secs = (
-            configuration.getint('scheduler', 'scheduler_zombie_task_threshold'))
+        secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold')
         limit_dttm = datetime.now() - timedelta(seconds=secs)
-        self.logger.info(
-            "Failing jobs without heartbeat after {}".format(limit_dttm))
+        self.logger.info("Failing jobs without heartbeat after %s", limit_dttm)
 
         tis = (
             session.query(TI)
@@ -371,9 +365,8 @@ class DagBag(BaseDagBag, LoggingMixin):
                 if ti.task_id in dag.task_ids:
                     task = dag.get_task(ti.task_id)
                     ti.task = task
-                    ti.handle_failure("{} killed as zombie".format(ti))
-                    self.logger.info(
-                        'Marked zombie job {} as failed'.format(ti))
+                    ti.handle_failure("{} killed as zombie".format(str(ti)))
+                    self.logger.info('Marked zombie job %s as failed', ti)
                     Stats.incr('zombies_killed')
         session.commit()
 
@@ -451,7 +444,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                                 str([dag.dag_id for dag in found_dags]),
                             ))
                     except Exception as e:
-                        logging.warning(e)
+                        self.logger.warning(e)
         Stats.gauge(
             'collect_dags', (datetime.now() - start_dttm).total_seconds(), 1)
         Stats.gauge(
@@ -619,7 +612,7 @@ class Connection(Base):
                 self.is_encrypted = True
             except AirflowException:
                 self.logger.exception("Failed to load fernet while encrypting value, "
-                                      "using non-encrypted value.")
+                                    "using non-encrypted value.")
                 self._password = value
                 self.is_encrypted = False
 
@@ -648,7 +641,7 @@ class Connection(Base):
                 self.is_extra_encrypted = True
             except AirflowException:
                 self.logger.exception("Failed to load fernet while encrypting value, "
-                                      "using non-encrypted value.")
+                                    "using non-encrypted value.")
                 self._extra = value
                 self.is_extra_encrypted = False
 
@@ -718,8 +711,8 @@ class Connection(Base):
             try:
                 obj = json.loads(self.extra)
             except Exception as e:
-                logging.exception(e)
-                logging.error("Failed parsing the json for conn_id %s", self.conn_id)
+                self.logger.exception(e)
+                self.logger.error("Failed parsing the json for conn_id %s", self.conn_id)
 
         return obj
 
@@ -750,7 +743,7 @@ class DagPickle(Base):
         self.pickle = dag
 
 
-class TaskInstance(Base):
+class TaskInstance(Base, LoggingMixin):
     """
     Task instances store the state of a task instance. This table is the
     authority and single source of truth around what tasks have run and the
@@ -764,7 +757,6 @@ class TaskInstance(Base):
     even while multiple schedulers may be firing task instances.
     """
 
-
     __tablename__ = "task_instance"
 
     task_id = Column(String(ID_LEN), primary_key=True)
@@ -1014,7 +1006,7 @@ class TaskInstance(Base):
         """
         Forces the task instance's state to FAILED in the database.
         """
-        logging.error("Recording the task instance as FAILED")
+        self.logger.error("Recording the task instance as FAILED")
         self.state = State.FAILED
         session.merge(self)
         session.commit()
@@ -1165,14 +1157,16 @@ class TaskInstance(Base):
                 session=session):
             failed = True
             if verbose:
-                logging.info("Dependencies not met for {}, dependency '{}' FAILED: {}"
-                             .format(self, dep_status.dep_name, dep_status.reason))
+                self.logger.info(
+                    "Dependencies not met for %s, dependency '%s' FAILED: %s",
+                    self, dep_status.dep_name, dep_status.reason
+                )
 
         if failed:
             return False
 
         if verbose:
-            logging.info("Dependencies all met for {}".format(self))
+            self.logger.info("Dependencies all met for %s", self)
 
         return True
 
@@ -1188,11 +1182,10 @@ class TaskInstance(Base):
                     session,
                     dep_context):
 
-                logging.debug("{} dependency '{}' PASSED: {}, {}"
-                              .format(self,
-                                      dep_status.dep_name,
-                                      dep_status.passed,
-                                      dep_status.reason))
+                self.logger.debug(
+                    "%s dependency '%s' PASSED: %s, %s",
+                    self, dep_status.dep_name, dep_status.passed, dep_status.reason
+                )
 
                 if not dep_status.passed:
                     yield dep_status
@@ -1335,6 +1328,7 @@ class TaskInstance(Base):
             session.commit()
             return False
 
+        #TODO: Logging needs cleanup, not clear what is being printed
         hr = "\n" + ("-" * 80) + "\n"  # Line break
 
         # For reporting purposes, we report based on 1-indexed,
@@ -1365,11 +1359,10 @@ class TaskInstance(Base):
                    "runtime. Attempt {attempt} of {total}. State set to NONE.").format(
                 attempt=self.try_number + 1,
                 total=self.max_tries + 1)
-            logging.warning(hr + msg + hr)
+            self.logger.warning(hr + msg + hr)
 
             self.queued_dttm = datetime.now()
-            msg = "Queuing into pool {}".format(self.pool)
-            logging.info(msg)
+            self.logger.info("Queuing into pool %s", self.pool)
             session.merge(self)
             session.commit()
             return False
@@ -1378,12 +1371,12 @@ class TaskInstance(Base):
         # the current worker process was blocked on refresh_from_db
         if self.state == State.RUNNING:
             msg = "Task Instance already running {}".format(self)
-            logging.warning(msg)
+            self.logger.warning(msg)
             session.commit()
             return False
 
         # print status message
-        logging.info(hr + msg + hr)
+        self.logger.info(hr + msg + hr)
         self.try_number += 1
 
         if not test_mode:
@@ -1401,10 +1394,10 @@ class TaskInstance(Base):
         if verbose:
             if mark_success:
                 msg = "Marking success for {} on {}".format(self.task, self.execution_date)
-                logging.info(msg)
+                self.logger.info(msg)
             else:
                 msg = "Executing {} on {}".format(self.task, self.execution_date)
-                logging.info(msg)
+                self.logger.info(msg)
         return True
 
     @provide_session
@@ -1445,8 +1438,8 @@ class TaskInstance(Base):
                 self.task = task_copy
 
                 def signal_handler(signum, frame):
-                    '''Setting kill signal handler'''
-                    logging.error("Killing subprocess")
+                    """Setting kill signal handler"""
+                    self.logger.error("Killing subprocess")
                     task_copy.on_kill()
                     raise AirflowException("Task received SIGTERM signal")
                 signal.signal(signal.SIGTERM, signal_handler)
@@ -1525,8 +1518,8 @@ class TaskInstance(Base):
             if task.on_success_callback:
                 task.on_success_callback(context)
         except Exception as e3:
-            logging.error("Failed when executing success callback")
-            logging.exception(e3)
+            self.logger.error("Failed when executing success callback")
+            self.logger.exception(e3)
 
         session.commit()
 
@@ -1571,7 +1564,7 @@ class TaskInstance(Base):
         task_copy.dry_run()
 
     def handle_failure(self, error, test_mode=False, context=None):
-        logging.exception(error)
+        self.logger.exception(error)
         task = self.task
         session = settings.Session()
         self.end_date = datetime.now()
@@ -1592,21 +1585,20 @@ class TaskInstance(Base):
             # next task instance try_number exceeds the max_tries.
             if task.retries and self.try_number <= self.max_tries:
                 self.state = State.UP_FOR_RETRY
-                logging.info('Marking task as UP_FOR_RETRY')
+                self.logger.info('Marking task as UP_FOR_RETRY')
                 if task.email_on_retry and task.email:
                     self.email_alert(error, is_retry=True)
             else:
                 self.state = State.FAILED
                 if task.retries:
-                    logging.info('All retries failed; marking task as FAILED')
+                    self.logger.info('All retries failed; marking task as FAILED')
                 else:
-                    logging.info('Marking task as FAILED.')
+                    self.logger.info('Marking task as FAILED.')
                 if task.email_on_failure and task.email:
                     self.email_alert(error, is_retry=False)
         except Exception as e2:
-            logging.error(
-                'Failed to send email to: ' + str(task.email))
-            logging.exception(e2)
+            self.logger.error('Failed to send email to: %s', task.email)
+            self.logger.exception(e2)
 
         # Handling callbacks pessimistically
         try:
@@ -1615,13 +1607,13 @@ class TaskInstance(Base):
             if self.state == State.FAILED and task.on_failure_callback:
                 task.on_failure_callback(context)
         except Exception as e3:
-            logging.error("Failed at executing callback")
-            logging.exception(e3)
+            self.logger.error("Failed at executing callback")
+            self.logger.exception(e3)
 
         if not test_mode:
             session.merge(self)
         session.commit()
-        logging.error(str(error))
+        self.logger.error(str(error))
 
     @provide_session
     def get_template_context(self, session=None):
@@ -1939,7 +1931,7 @@ class SkipMixin(object):
         else:
             assert execution_date is not None, "Execution date is None and no dag run"
 
-            logging.warning("No DAG RUN present this should not happen")
+            self.logger.warning("No DAG RUN present this should not happen")
             # this is defensive against dag runs that are not complete
             for task in tasks:
                 ti = TaskInstance(task, execution_date=execution_date)
@@ -1953,7 +1945,7 @@ class SkipMixin(object):
 
 
 @functools.total_ordering
-class BaseOperator(object):
+class BaseOperator(LoggingMixin):
     """
     Abstract base class for all operators. Since operators create objects that
     become node in the dag, BaseOperator contains many recursive methods for
@@ -2134,8 +2126,7 @@ class BaseOperator(object):
         self.email_on_failure = email_on_failure
         self.start_date = start_date
         if start_date and not isinstance(start_date, datetime):
-            logging.warning(
-                "start_date for {} isn't datetime.datetime".format(self))
+            self.logger.warning("start_date for %s isn't datetime.datetime", self)
         self.end_date = end_date
         if not TriggerRule.is_valid(trigger_rule):
             raise AirflowException(
@@ -2151,10 +2142,12 @@ class BaseOperator(object):
             self.depends_on_past = True
 
         if schedule_interval:
-            logging.warning(
+            self.logger.warning(
                 "schedule_interval is used for {}, though it has "
                 "been deprecated as a task parameter, you need to "
-                "specify it as a DAG parameter instead".format(self))
+                "specify it as a DAG parameter instead",
+                self
+            )
         self._schedule_interval = schedule_interval
         self.retries = retries
         self.queue = queue
@@ -2167,7 +2160,7 @@ class BaseOperator(object):
         if isinstance(retry_delay, timedelta):
             self.retry_delay = retry_delay
         else:
-            logging.debug("retry_delay isn't timedelta object, assuming secs")
+            self.logger.debug("Retry_delay isn't timedelta object, assuming secs")
             self.retry_delay = timedelta(seconds=retry_delay)
         self.retry_exponential_backoff = retry_exponential_backoff
         self.max_retry_delay = max_retry_delay
@@ -2467,7 +2460,7 @@ class BaseOperator(object):
                 try:
                     setattr(self, attr, env.loader.get_source(env, content)[0])
                 except Exception as e:
-                    logging.exception(e)
+                    self.logger.exception(e)
         self.prepare_template()
 
     @property
@@ -2586,12 +2579,12 @@ class BaseOperator(object):
                 ignore_ti_state=ignore_ti_state)
 
     def dry_run(self):
-        logging.info('Dry run')
+        self.logger.info('Dry run')
         for attr in self.template_fields:
             content = getattr(self, attr)
             if content and isinstance(content, six.string_types):
-                logging.info('Rendering template for {0}'.format(attr))
-                logging.info(content)
+                self.logger.info('Rendering template for %s', attr)
+                self.logger.info(content)
 
     def get_direct_relatives(self, upstream=False):
         """
@@ -2753,7 +2746,7 @@ class DagModel(Base):
 
 
 @functools.total_ordering
-class DAG(BaseDag, LoggingMixin):
+class DAG(BaseDag):
     """
     A dag (directed acyclic graph) is a collection of tasks with directional
     dependencies. A dag also has a schedule, a start end an end date
@@ -3529,7 +3522,8 @@ class DAG(BaseDag, LoggingMixin):
             d['pickle_len'] = len(pickled)
             d['pickling_duration'] = "{}".format(datetime.now() - dttm)
         except Exception as e:
-            logging.debug(e)
+            log = LoggingMixin().logger
+            log.debug(e)
             d['is_picklable'] = False
             d['stacktrace'] = traceback.format_exc()
         return d
@@ -3757,12 +3751,13 @@ class DAG(BaseDag, LoggingMixin):
         :type sync_time: datetime
         :return: None
         """
+
         orm_dag = session.query(
             DagModel).filter(DagModel.dag_id == dag.dag_id).first()
         if not orm_dag:
             orm_dag = DagModel(dag_id=dag.dag_id)
-            logging.info("Creating ORM DAG for %s",
-                         dag.dag_id)
+            log = LoggingMixin().logger
+            log.info("Creating ORM DAG for %s", dag.dag_id)
         orm_dag.fileloc = dag.fileloc
         orm_dag.is_subdag = dag.is_subdag
         orm_dag.owners = owner
@@ -3805,13 +3800,14 @@ class DAG(BaseDag, LoggingMixin):
         :type expiration_date: datetime
         :return: None
         """
+        logger = LoggingMixin().logger
         for dag in session.query(
                 DagModel).filter(DagModel.last_scheduler_run < expiration_date,
                                  DagModel.is_active).all():
-            logging.info("Deactivating DAG ID %s since it was last touched "
-                         "by the scheduler at %s",
-                         dag.dag_id,
-                         dag.last_scheduler_run.isoformat())
+            logger.info(
+                "Deactivating DAG ID %s since it was last touched by the scheduler at %s",
+                dag.dag_id, dag.last_scheduler_run.isoformat()
+            )
             dag.is_active = False
             session.merge(dag)
             session.commit()
@@ -3901,7 +3897,7 @@ class KnownEvent(Base):
         return self.label
 
 
-class Variable(Base):
+class Variable(Base, LoggingMixin):
     __tablename__ = "variable"
 
     id = Column(Integer, primary_key=True)
@@ -3937,8 +3933,9 @@ class Variable(Base):
                 self._val = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_encrypted = True
             except AirflowException:
-                self.logger.exception("Failed to load fernet while encrypting value, "
-                                      "using non-encrypted value.")
+                self.logger.exception(
+                    "Failed to load fernet while encrypting value, using non-encrypted value."
+                )
                 self._val = value
                 self.is_encrypted = False
 
@@ -4005,7 +4002,7 @@ class Variable(Base):
         session.flush()
 
 
-class XCom(Base):
+class XCom(Base, LoggingMixin):
     """
     Base class for XCom objects.
     """
@@ -4061,10 +4058,11 @@ class XCom(Base):
             try:
                 value = json.dumps(value).encode('UTF-8')
             except ValueError:
-                logging.error("Could not serialize the XCOM value into JSON. "
-                              "If you are using pickles instead of JSON "
-                              "for XCOM, then you need to enable pickle "
-                              "support for XCOM in your airflow config.")
+                log = LoggingMixin().logger
+                log.error("Could not serialize the XCOM value into JSON. "
+                          "If you are using pickles instead of JSON "
+                          "for XCOM, then you need to enable pickle "
+                          "support for XCOM in your airflow config.")
                 raise
 
         # remove any duplicate XComs
@@ -4131,10 +4129,11 @@ class XCom(Base):
                 try:
                     return json.loads(result.value.decode('UTF-8'))
                 except ValueError:
-                    logging.error("Could not serialize the XCOM value into JSON. "
-                                  "If you are using pickles instead of JSON "
-                                  "for XCOM, then you need to enable pickle "
-                                  "support for XCOM in your airflow config.")
+                    log = LoggingMixin().logger
+                    log.error("Could not serialize the XCOM value into JSON. "
+                              "If you are using pickles instead of JSON "
+                              "for XCOM, then you need to enable pickle "
+                              "support for XCOM in your airflow config.")
                     raise
 
     @classmethod
@@ -4180,10 +4179,11 @@ class XCom(Base):
                 try:
                     result.value = json.loads(result.value.decode('UTF-8'))
                 except ValueError:
-                    logging.error("Could not serialize the XCOM value into JSON. "
-                                    "If you are using pickles instead of JSON "
-                                    "for XCOM, then you need to enable pickle "
-                                    "support for XCOM in your airflow config.")
+                    log = LoggingMixin().logger
+                    log.error("Could not serialize the XCOM value into JSON. "
+                              "If you are using pickles instead of JSON "
+                              "for XCOM, then you need to enable pickle "
+                              "support for XCOM in your airflow config.")
                     raise
         return results
 
@@ -4235,8 +4235,9 @@ class DagStat(Base):
             session.commit()
         except Exception as e:
             session.rollback()
-            logging.warning("Could not update dag stats for {}".format(dag_id))
-            logging.exception(e)
+            log = LoggingMixin().logger
+            log.warning("Could not update dag stats for %s", dag_id)
+            log.exception(e)
 
     @staticmethod
     @provide_session
@@ -4287,8 +4288,9 @@ class DagStat(Base):
             session.commit()
         except Exception as e:
             session.rollback()
-            logging.warning("Could not update dag stat table")
-            logging.exception(e)
+            log = LoggingMixin().logger
+            log.warning("Could not update dag stat table")
+            log.exception(e)
 
     @staticmethod
     @provide_session
@@ -4310,11 +4312,12 @@ class DagStat(Base):
                     session.commit()
                 except Exception as e:
                     session.rollback()
-                    logging.warning("Could not create stat record")
-                    logging.exception(e)
+                    log = LoggingMixin().logger
+                    log.warning("Could not create stat record")
+                    log.exception(e)
 
 
-class DagRun(Base):
+class DagRun(Base, LoggingMixin):
     """
     DagRun describes an instance of a Dag. It can be created
     by the scheduler (for regular runs) or by an external trigger
@@ -4527,8 +4530,7 @@ class DagRun(Base):
 
         tis = self.get_task_instances(session=session)
 
-        logging.info("Updating state for {} considering {} task(s)"
-                     .format(self, len(tis)))
+        self.logger.info("Updating state for %s considering %s task(s)", self, len(tis))
 
         for ti in list(tis):
             # skip in db?
@@ -4574,19 +4576,18 @@ class DagRun(Base):
             # if all roots finished and at least on failed, the run failed
             if (not unfinished_tasks and
                     any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
-                logging.info('Marking run {} failed'.format(self))
+                self.logger.info('Marking run %s failed', self)
                 self.state = State.FAILED
 
             # if all roots succeeded and no unfinished tasks, the run succeeded
             elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
                                               for r in roots):
-                logging.info('Marking run {} successful'.format(self))
+                self.logger.info('Marking run %s successful', self)
                 self.state = State.SUCCESS
 
             # if *all tasks* are deadlocked, the run failed
             elif unfinished_tasks and none_depends_on_past and no_dependencies_met:
-                logging.info(
-                    'Deadlock; marking run {} failed'.format(self))
+                self.logger.info('Deadlock; marking run %s failed', self)
                 self.state = State.FAILED
 
             # finally, if the roots aren't done, the dag is still running

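The models.py changes above replace module-level logging calls with a per-class self.logger supplied by LoggingMixin; where no instance is available (static methods, module-level helpers), the commit instantiates the mixin directly and uses its logger. A minimal sketch of that pattern, with hypothetical class and function names, assuming the real mixin in airflow/utils/log/LoggingMixin.py behaves roughly like this:

    import logging


    class LoggingMixin(object):
        """Expose a per-class logger as ``self.logger`` (illustrative sketch only)."""

        @property
        def logger(self):
            # Name the logger after the concrete class, so records read as
            # e.g. "airflow.models.DagBag" instead of the root logger.
            name = '.'.join([self.__class__.__module__, self.__class__.__name__])
            return logging.getLogger(name)


    class DagBagLike(LoggingMixin):          # hypothetical stand-in for DagBag
        def collect(self, dag_folder):
            self.logger.info("Filling up the DagBag from %s", dag_folder)


    def module_level_helper(dag_id):
        # Static methods and module-level code have no ``self``; the diff
        # falls back to an anonymous mixin instance at those call sites.
        log = LoggingMixin().logger
        log.info("Creating ORM DAG for %s", dag_id)

The real mixin may cache the logger or tie into Airflow's logging configuration; the point is only that self.logger is named after the concrete class, so the messages emitted by the hunks above carry their class context.
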
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/bash_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py
index 3146cd6..63321fb 100644
--- a/airflow/operators/bash_operator.py
+++ b/airflow/operators/bash_operator.py
@@ -16,7 +16,6 @@
 from builtins import bytes
 import os
 import signal
-import logging
 from subprocess import Popen, STDOUT, PIPE
 from tempfile import gettempdir, NamedTemporaryFile
 
@@ -68,7 +67,7 @@ class BashOperator(BaseOperator):
         which will be cleaned afterwards
         """
         bash_command = self.bash_command
-        logging.info("tmp dir root location: \n" + gettempdir())
+        self.logger.info("Tmp dir root location: \n %s", gettempdir())
         with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
             with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
 
@@ -76,9 +75,11 @@ class BashOperator(BaseOperator):
                 f.flush()
                 fname = f.name
                 script_location = tmp_dir + "/" + fname
-                logging.info("Temporary script "
-                             "location :{0}".format(script_location))
-                logging.info("Running command: " + bash_command)
+                self.logger.info(
+                    "Temporary script location: %s",
+                    script_location
+                )
+                self.logger.info("Running command: %s", bash_command)
                 sp = Popen(
                     ['bash', fname],
                     stdout=PIPE, stderr=STDOUT,
@@ -87,14 +88,16 @@ class BashOperator(BaseOperator):
 
                 self.sp = sp
 
-                logging.info("Output:")
+                self.logger.info("Output:")
                 line = ''
                 for line in iter(sp.stdout.readline, b''):
                     line = line.decode(self.output_encoding).strip()
-                    logging.info(line)
+                    self.logger.info(line)
                 sp.wait()
-                logging.info("Command exited with "
-                             "return code {0}".format(sp.returncode))
+                self.logger.info(
+                    "Command exited with return code %s",
+                    sp.returncode
+                )
 
                 if sp.returncode:
                     raise AirflowException("Bash command failed")
@@ -103,6 +106,6 @@ class BashOperator(BaseOperator):
             return line
 
     def on_kill(self):
-        logging.info('Sending SIGTERM signal to bash process group')
+        self.logger.info('Sending SIGTERM signal to bash process group')
         os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM)
 

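The bash_operator.py hunk above also swaps eager string building (concatenation and str.format at the call site) for logging's lazy %-style arguments. A small illustrative comparison, using made-up names, of why the deferred form is preferred:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("airflow.example")   # hypothetical logger name

    bash_command = "echo hello"

    # Eager: the message string is built before logging decides whether to emit it.
    logger.debug("Running command: " + bash_command)

    # Lazy: '%s' interpolation is deferred until a handler actually emits the
    # record, so a suppressed DEBUG line costs almost nothing.
    logger.debug("Running command: %s", bash_command)

With the lazy form, interpolation only runs if the record clears the level and handler checks, and the unformatted template stays on the log record for downstream tooling.
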
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/check_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py
index 1cf50da..f263a2c 100644
--- a/airflow/operators/check_operator.py
+++ b/airflow/operators/check_operator.py
@@ -14,7 +14,6 @@
 
 from builtins import zip
 from builtins import str
-import logging
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
@@ -72,15 +71,15 @@ class CheckOperator(BaseOperator):
         self.sql = sql
 
     def execute(self, context=None):
-        logging.info('Executing SQL check: ' + self.sql)
+        self.logger.info('Executing SQL check: %s', self.sql)
         records = self.get_db_hook().get_first(self.sql)
-        logging.info("Record: " + str(records))
+        self.logger.info('Record: %s', records)
         if not records:
             raise AirflowException("The query returned None")
         elif not all([bool(r) for r in records]):
             exceptstr = "Test failed.\nQuery:\n{q}\nResults:\n{r!s}"
             raise AirflowException(exceptstr.format(q=self.sql, r=records))
-        logging.info("Success.")
+        self.logger.info("Success.")
 
     def get_db_hook(self):
         return BaseHook.get_hook(conn_id=self.conn_id)
@@ -135,7 +134,7 @@ class ValueCheckOperator(BaseOperator):
         self.has_tolerance = self.tol is not None
 
     def execute(self, context=None):
-        logging.info('Executing SQL check: ' + self.sql)
+        self.logger.info('Executing SQL check: %s', self.sql)
         records = self.get_db_hook().get_first(self.sql)
         if not records:
             raise AirflowException("The query returned None")
@@ -209,9 +208,9 @@ class IntervalCheckOperator(BaseOperator):
 
     def execute(self, context=None):
         hook = self.get_db_hook()
-        logging.info('Executing SQL check: ' + self.sql2)
+        self.logger.info('Executing SQL check: %s', self.sql2)
         row2 = hook.get_first(self.sql2)
-        logging.info('Executing SQL check: ' + self.sql1)
+        self.logger.info('Executing SQL check: %s', self.sql1)
         row1 = hook.get_first(self.sql1)
         if not row2:
             raise AirflowException("The query {q} returned None".format(q=self.sql2))
@@ -231,19 +230,20 @@ class IntervalCheckOperator(BaseOperator):
             else:
                 ratio = float(max(current[m], reference[m])) / \
                     min(current[m], reference[m])
-            logging.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
+            self.logger.info(rlog.format(m, ratio, self.metrics_thresholds[m]))
             ratios[m] = ratio
             test_results[m] = ratio < self.metrics_thresholds[m]
         if not all(test_results.values()):
             failed_tests = [it[0] for it in test_results.items() if not it[1]]
             j = len(failed_tests)
             n = len(self.metrics_sorted)
-            logging.warning(countstr.format(**locals()))
+            self.logger.warning(countstr.format(**locals()))
             for k in failed_tests:
-                logging.warning(fstr.format(k=k, r=ratios[k],
-                                tr=self.metrics_thresholds[k]))
+                self.logger.warning(
+                    fstr.format(k=k, r=ratios[k], tr=self.metrics_thresholds[k])
+                )
             raise AirflowException(estr.format(", ".join(failed_tests)))
-        logging.info("All tests have passed")
+        self.logger.info("All tests have passed")
 
     def get_db_hook(self):
         return BaseHook.get_hook(conn_id=self.conn_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index c3ffa1a..bd2862b 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -13,13 +13,11 @@
 # limitations under the License.
 
 from datetime import datetime
-import logging
 
 from airflow.models import BaseOperator, DagBag
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.state import State
 from airflow import settings
-from airflow import configuration as conf
 
 
 class DagRunOrder(object):
@@ -71,9 +69,9 @@ class TriggerDagRunOperator(BaseOperator):
                 state=State.RUNNING,
                 conf=dro.payload,
                 external_trigger=True)
-            logging.info("Creating DagRun {}".format(dr))
+            self.logger.info("Creating DagRun %s", dr)
             session.add(dr)
             session.commit()
             session.close()
         else:
-            logging.info("Criteria not met, moving on")
+            self.logger.info("Criteria not met, moving on")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/docker_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index ddcc97b..8a333d6 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import json
-import logging
+
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -134,7 +134,7 @@ class DockerOperator(BaseOperator):
         self.container = None
 
     def execute(self, context):
-        logging.info('Starting docker container from image ' + self.image)
+        self.logger.info('Starting docker container from image %s', self.image)
 
         tls_config = None
         if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
@@ -155,10 +155,10 @@ class DockerOperator(BaseOperator):
             image = self.image
 
         if self.force_pull or len(self.cli.images(name=image)) == 0:
-            logging.info('Pulling docker image ' + image)
+            self.logger.info('Pulling docker image %s', image)
             for l in self.cli.pull(image, stream=True):
                 output = json.loads(l.decode('utf-8'))
-                logging.info("{}".format(output['status']))
+                self.logger.info("%s", output['status'])
 
         cpu_shares = int(round(self.cpus * 1024))
 
@@ -184,7 +184,7 @@ class DockerOperator(BaseOperator):
                 line = line.strip()
                 if hasattr(line, 'decode'):
                     line = line.decode('utf-8')
-                logging.info(line)
+                self.logger.info(line)
 
             exit_code = self.cli.wait(self.container['Id'])
             if exit_code != 0:
@@ -202,5 +202,5 @@ class DockerOperator(BaseOperator):
 
     def on_kill(self):
         if self.cli is not None:
-            logging.info('Stopping docker container')
+            self.logger.info('Stopping docker container')
             self.cli.stop(self.container['Id'])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/generic_transfer.py
----------------------------------------------------------------------
diff --git a/airflow/operators/generic_transfer.py b/airflow/operators/generic_transfer.py
index de3bf73..790749a 100644
--- a/airflow/operators/generic_transfer.py
+++ b/airflow/operators/generic_transfer.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.hooks.base_hook import BaseHook
@@ -64,15 +61,15 @@ class GenericTransfer(BaseOperator):
     def execute(self, context):
         source_hook = BaseHook.get_hook(self.source_conn_id)
 
-        logging.info("Extracting data from {}".format(self.source_conn_id))
-        logging.info("Executing: \n" + self.sql)
+        self.logger.info("Extracting data from %s", self.source_conn_id)
+        self.logger.info("Executing: \n %s", self.sql)
         results = source_hook.get_records(self.sql)
 
         destination_hook = BaseHook.get_hook(self.destination_conn_id)
         if self.preoperator:
-            logging.info("Running preoperator")
-            logging.info(self.preoperator)
+            self.logger.info("Running preoperator")
+            self.logger.info(self.preoperator)
             destination_hook.run(self.preoperator)
 
-        logging.info("Inserting rows into {}".format(self.destination_conn_id))
+        self.logger.info("Inserting rows into %s", self.destination_conn_id)
         destination_hook.insert_rows(table=self.destination_table, rows=results)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 06a83e3..983069b 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -11,8 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
 import re
 
 from airflow.hooks.hive_hooks import HiveCliHook
@@ -95,7 +93,7 @@ class HiveOperator(BaseOperator):
             self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
 
     def execute(self, context):
-        logging.info('Executing: ' + self.hql)
+        self.logger.info('Executing: %s', self.hql)
         self.hook = self.get_hook()
         self.hook.run_cli(hql=self.hql, schema=self.schema,
                           hive_conf=context_to_airflow_vars(context))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_stats_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_stats_operator.py b/airflow/operators/hive_stats_operator.py
index b31c6b5..025e427 100644
--- a/airflow/operators/hive_stats_operator.py
+++ b/airflow/operators/hive_stats_operator.py
@@ -12,11 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from builtins import str
 from builtins import zip
 from collections import OrderedDict
 import json
-import logging
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.mysql_hook import MySqlHook
@@ -141,15 +139,15 @@ class HiveStatsCollectionOperator(BaseOperator):
         """.format(**locals())
 
         hook = PrestoHook(presto_conn_id=self.presto_conn_id)
-        logging.info('Executing SQL check: ' + sql)
+        self.logger.info('Executing SQL check: %s', sql)
         row = hook.get_first(hql=sql)
-        logging.info("Record: " + str(row))
+        self.logger.info("Record: %s", row)
         if not row:
             raise AirflowException("The query returned None")
 
         part_json = json.dumps(self.partition, sort_keys=True)
 
-        logging.info("Deleting rows from previous runs if they exist")
+        self.logger.info("Deleting rows from previous runs if they exist")
         mysql = MySqlHook(self.mysql_conn_id)
         sql = """
         SELECT 1 FROM hive_stats
@@ -169,7 +167,7 @@ class HiveStatsCollectionOperator(BaseOperator):
             """.format(**locals())
             mysql.run(sql)
 
-        logging.info("Pivoting and loading cells into the Airflow db")
+        self.logger.info("Pivoting and loading cells into the Airflow db")
         rows = [
             (self.ds, self.dttm, self.table, part_json) +
             (r[0][0], r[0][1], r[1])

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 70d7825..7ac0b02 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.hooks.hive_hooks import HiveCliHook, HiveMetastoreHook
 from airflow.hooks.druid_hook import DruidHook
 from airflow.models import BaseOperator
@@ -90,7 +87,7 @@ class HiveToDruidTransfer(BaseOperator):
 
     def execute(self, context):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
-        logging.info("Extracting data from Hive")
+        self.logger.info("Extracting data from Hive")
         hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
         sql = self.sql.strip().strip(';')
         hql = """\
@@ -104,7 +101,7 @@ class HiveToDruidTransfer(BaseOperator):
         AS
         {sql}
         """.format(**locals())
-        logging.info("Running command:\n {}".format(hql))
+        self.logger.info("Running command:\n %s", hql)
         hive.run_cli(hql)
 
         m = HiveMetastoreHook(self.metastore_conn_id)
@@ -128,15 +125,16 @@ class HiveToDruidTransfer(BaseOperator):
                 columns=columns,
             )
 
-            logging.info("Inserting rows into Druid, hdfs path: {}".format(static_path))
+            self.logger.info("Inserting rows into Druid, hdfs path: %s", static_path)
 
             druid.submit_indexing_job(index_spec)
 
-            logging.info("Load seems to have succeeded!")
+            self.logger.info("Load seems to have succeeded!")
         finally:
-            logging.info(
-                "Cleaning up by dropping the temp "
-                "Hive table {}".format(hive_table))
+            self.logger.info(
+                "Cleaning up by dropping the temp Hive table %s",
+                hive_table
+            )
             hql = "DROP TABLE IF EXISTS {}".format(hive_table)
             hive.run_cli(hql)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py
index 4a64749..e82a099 100644
--- a/airflow/operators/hive_to_mysql.py
+++ b/airflow/operators/hive_to_mysql.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.hooks.hive_hooks import HiveServer2Hook
 from airflow.hooks.mysql_hook import MySqlHook
 from airflow.models import BaseOperator
@@ -80,8 +77,7 @@ class HiveToMySqlTransfer(BaseOperator):
 
     def execute(self, context):
         hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
-        logging.info("Extracting data from Hive")
-        logging.info(self.sql)
+        self.logger.info("Extracting data from Hive: %s", self.sql)
 
         if self.bulk_load:
             tmpfile = NamedTemporaryFile()
@@ -92,10 +88,10 @@ class HiveToMySqlTransfer(BaseOperator):
 
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
         if self.mysql_preoperator:
-            logging.info("Running MySQL preoperator")
+            self.logger.info("Running MySQL preoperator")
             mysql.run(self.mysql_preoperator)
 
-        logging.info("Inserting rows into MySQL")
+        self.logger.info("Inserting rows into MySQL")
 
         if self.bulk_load:
             mysql.bulk_load(table=self.mysql_table, tmp_file=tmpfile.name)
@@ -104,7 +100,7 @@ class HiveToMySqlTransfer(BaseOperator):
             mysql.insert_rows(table=self.mysql_table, rows=results)
 
         if self.mysql_postoperator:
-            logging.info("Running MySQL postoperator")
+            self.logger.info("Running MySQL postoperator")
             mysql.run(self.mysql_postoperator)
 
-        logging.info("Done.")
+        self.logger.info("Done.")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/hive_to_samba_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_samba_operator.py b/airflow/operators/hive_to_samba_operator.py
index 8f18dd9..d6e6dec 100644
--- a/airflow/operators/hive_to_samba_operator.py
+++ b/airflow/operators/hive_to_samba_operator.py
@@ -11,8 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
 import tempfile
 
 from airflow.hooks.hive_hooks import HiveServer2Hook
@@ -55,7 +53,7 @@ class Hive2SambaOperator(BaseOperator):
         samba = SambaHook(samba_conn_id=self.samba_conn_id)
         hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
         tmpfile = tempfile.NamedTemporaryFile()
-        logging.info("Fetching file from Hive")
+        self.logger.info("Fetching file from Hive")
         hive.to_csv(hql=self.hql, csv_filepath=tmpfile.name)
-        logging.info("Pushing to samba")
+        self.logger.info("Pushing to samba")
         samba.push_from_local(self.destination_filepath, tmpfile.name)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/http_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py
index 9884566..d92c931 100644
--- a/airflow/operators/http_operator.py
+++ b/airflow/operators/http_operator.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
-
 from airflow.exceptions import AirflowException
 from airflow.hooks.http_hook import HttpHook
 from airflow.models import BaseOperator
@@ -45,7 +43,7 @@ class SimpleHttpOperator(BaseOperator):
         depends on the option that's being modified.
     """
 
-    template_fields = ('endpoint','data',)
+    template_fields = ('endpoint', 'data',)
     template_ext = ()
     ui_color = '#f4a460'
 
@@ -75,7 +73,9 @@ class SimpleHttpOperator(BaseOperator):
 
     def execute(self, context):
         http = HttpHook(self.method, http_conn_id=self.http_conn_id)
-        logging.info("Calling HTTP method")
+
+        self.logger.info("Calling HTTP method")
+
         response = http.run(self.endpoint,
                             self.data,
                             self.headers,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/jdbc_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/jdbc_operator.py b/airflow/operators/jdbc_operator.py
index 28977db..942e312 100644
--- a/airflow/operators/jdbc_operator.py
+++ b/airflow/operators/jdbc_operator.py
@@ -11,11 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-__author__ = 'janomar'
-
-import logging
-
 from airflow.hooks.jdbc_hook import JdbcHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -60,6 +55,6 @@ class JdbcOperator(BaseOperator):
         self.autocommit = autocommit
 
     def execute(self, context):
-        logging.info('Executing: ' + str(self.sql))
+        self.logger.info('Executing: %s', self.sql)
         self.hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
         self.hook.run(self.sql, self.autocommit, parameters=self.parameters)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index 909a211..58f7e67 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import datetime
-import logging
 
 from airflow.models import BaseOperator, SkipMixin
 
@@ -33,29 +32,29 @@ class LatestOnlyOperator(BaseOperator, SkipMixin):
         # If the DAG Run is externally triggered, then return without
         # skipping downstream tasks
         if context['dag_run'] and context['dag_run'].external_trigger:
-            logging.info("""Externally triggered DAG_Run:
-                         allowing execution to proceed.""")
+            self.logger.info("Externally triggered DAG_Run: allowing execution to proceed.")
             return
 
         now = datetime.datetime.now()
         left_window = context['dag'].following_schedule(
             context['execution_date'])
         right_window = context['dag'].following_schedule(left_window)
-        logging.info(
-            'Checking latest only with left_window: %s right_window: %s '
-            'now: %s', left_window, right_window, now)
+        self.logger.info(
+            'Checking latest only with left_window: %s right_window: %s now: %s',
+            left_window, right_window, now
+        )
 
         if not left_window < now <= right_window:
-            logging.info('Not latest execution, skipping downstream.')
+            self.logger.info('Not latest execution, skipping downstream.')
 
             downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-            logging.debug("Downstream task_ids {}".format(downstream_tasks))
+            self.logger.debug("Downstream task_ids %s", downstream_tasks)
 
             if downstream_tasks:
                 self.skip(context['dag_run'],
                           context['ti'].execution_date,
                           downstream_tasks)
 
-            logging.info('Done.')
+            self.logger.info('Done.')
         else:
-            logging.info('Latest, allowing execution to proceed.')
+            self.logger.info('Latest, allowing execution to proceed.')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/mssql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_operator.py b/airflow/operators/mssql_operator.py
index 9ae2fff..bc0822f 100644
--- a/airflow/operators/mssql_operator.py
+++ b/airflow/operators/mssql_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.hooks.mssql_hook import MsSqlHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -47,7 +44,7 @@ class MsSqlOperator(BaseOperator):
         self.database = database
 
     def execute(self, context):
-        logging.info('Executing: ' + str(self.sql))
+        self.logger.info('Executing: %s', self.sql)
         hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id,
                          schema=self.database)
         hook.run(self.sql, autocommit=self.autocommit,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/mssql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py
index a0a2e10..719ddd2 100644
--- a/airflow/operators/mssql_to_hive.py
+++ b/airflow/operators/mssql_to_hive.py
@@ -15,11 +15,9 @@
 from builtins import chr
 from collections import OrderedDict
 import unicodecsv as csv
-import logging
 from tempfile import NamedTemporaryFile
 import pymssql
 
-
 from airflow.hooks.hive_hooks import HiveCliHook
 from airflow.hooks.mssql_hook import MsSqlHook
 from airflow.models import BaseOperator
@@ -104,7 +102,7 @@ class MsSqlToHiveTransfer(BaseOperator):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         mssql = MsSqlHook(mssql_conn_id=self.mssql_conn_id)
 
-        logging.info("Dumping Microsoft SQL Server query results to local file")
+        self.logger.info("Dumping Microsoft SQL Server query results to local file")
         conn = mssql.get_conn()
         cursor = conn.cursor()
         cursor.execute(self.sql)
@@ -120,7 +118,7 @@ class MsSqlToHiveTransfer(BaseOperator):
             f.flush()
             cursor.close()
             conn.close()
-            logging.info("Loading file into Hive")
+            self.logger.info("Loading file into Hive")
             hive.load_file(
                 f.name,
                 self.hive_table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/mysql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_operator.py b/airflow/operators/mysql_operator.py
index 156ada8..923eaf8 100644
--- a/airflow/operators/mysql_operator.py
+++ b/airflow/operators/mysql_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.hooks.mysql_hook import MySqlHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -49,7 +46,7 @@ class MySqlOperator(BaseOperator):
         self.database = database
 
     def execute(self, context):
-        logging.info('Executing: ' + str(self.sql))
+        self.logger.info('Executing: %s', self.sql)
         hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                          schema=self.database)
         hook.run(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index ad3ecae..fde92b5 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -15,7 +15,6 @@
 from builtins import chr
 from collections import OrderedDict
 import unicodecsv as csv
-import logging
 from tempfile import NamedTemporaryFile
 import MySQLdb
 
@@ -111,7 +110,7 @@ class MySqlToHiveTransfer(BaseOperator):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
 
-        logging.info("Dumping MySQL query results to local file")
+        self.logger.info("Dumping MySQL query results to local file")
         conn = mysql.get_conn()
         cursor = conn.cursor()
         cursor.execute(self.sql)
@@ -124,7 +123,7 @@ class MySqlToHiveTransfer(BaseOperator):
             f.flush()
             cursor.close()
             conn.close()
-            logging.info("Loading file into Hive")
+            self.logger.info("Loading file into Hive")
             hive.load_file(
                 f.name,
                 self.hive_table,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/oracle_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/oracle_operator.py b/airflow/operators/oracle_operator.py
index ab7bdb2..f87bbf9 100644
--- a/airflow/operators/oracle_operator.py
+++ b/airflow/operators/oracle_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.hooks.oracle_hook import OracleHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -45,7 +42,7 @@ class OracleOperator(BaseOperator):
         self.parameters = parameters
 
     def execute(self, context):
-        logging.info('Executing: ' + str(self.sql))
+        self.logger.info('Executing: %s', self.sql)
         hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
         hook.run(
             self.sql,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/pig_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/pig_operator.py b/airflow/operators/pig_operator.py
index 4a21ecc..cdce48a 100644
--- a/airflow/operators/pig_operator.py
+++ b/airflow/operators/pig_operator.py
@@ -11,8 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
 import re
 
 from airflow.hooks.pig_hook import PigCliHook
@@ -61,7 +59,7 @@ class PigOperator(BaseOperator):
                 "(\$([a-zA-Z_][a-zA-Z0-9_]*))", "{{ \g<2> }}", self.pig)
 
     def execute(self, context):
-        logging.info('Executing: ' + self.pig)
+        self.logger.info('Executing: %s', self.pig)
         self.hook = self.get_hook()
         self.hook.run_cli(pig=self.pig)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/postgres_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/postgres_operator.py b/airflow/operators/postgres_operator.py
index 0de5aa5..55c1573 100644
--- a/airflow/operators/postgres_operator.py
+++ b/airflow/operators/postgres_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.hooks.postgres_hook import PostgresHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -52,7 +49,7 @@ class PostgresOperator(BaseOperator):
         self.database = database
 
     def execute(self, context):
-        logging.info('Executing: ' + str(self.sql))
+        self.logger.info('Executing: %s', self.sql)
         self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                  schema=self.database)
         self.hook.run(self.sql, self.autocommit, parameters=self.parameters)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/presto_to_mysql.py
----------------------------------------------------------------------
diff --git a/airflow/operators/presto_to_mysql.py b/airflow/operators/presto_to_mysql.py
index 7ff2ad6..48158ca 100644
--- a/airflow/operators/presto_to_mysql.py
+++ b/airflow/operators/presto_to_mysql.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.hooks.presto_hook import PrestoHook
 from airflow.hooks.mysql_hook import MySqlHook
 from airflow.models import BaseOperator
@@ -64,15 +61,14 @@ class PrestoToMySqlTransfer(BaseOperator):
 
     def execute(self, context):
         presto = PrestoHook(presto_conn_id=self.presto_conn_id)
-        logging.info("Extracting data from Presto")
-        logging.info(self.sql)
+        self.logger.info("Extracting data from Presto: %s", self.sql)
         results = presto.get_records(self.sql)
 
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
         if self.mysql_preoperator:
-            logging.info("Running MySQL preoperator")
-            logging.info(self.mysql_preoperator)
+            self.logger.info("Running MySQL preoperator")
+            self.logger.info(self.mysql_preoperator)
             mysql.run(self.mysql_preoperator)
 
-        logging.info("Inserting rows into MySQL")
+        self.logger.info("Inserting rows into MySQL")
         mysql.insert_rows(table=self.mysql_table, rows=results)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/python_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index bef9bb0..552996f 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -11,10 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from builtins import str
-import logging
-
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, SkipMixin
 from airflow.utils.decorators import apply_defaults
@@ -78,7 +74,7 @@ class PythonOperator(BaseOperator):
             self.op_kwargs = context
 
         return_value = self.python_callable(*self.op_args, **self.op_kwargs)
-        logging.info("Done. Returned value was: " + str(return_value))
+        self.logger.info("Done. Returned value was: %s", return_value)
         return return_value
 
 
@@ -103,17 +99,17 @@ class BranchPythonOperator(PythonOperator, SkipMixin):
     """
     def execute(self, context):
         branch = super(BranchPythonOperator, self).execute(context)
-        logging.info("Following branch {}".format(branch))
-        logging.info("Marking other directly downstream tasks as skipped")
+        self.logger.info("Following branch %s", branch)
+        self.logger.info("Marking other directly downstream tasks as skipped")
 
         downstream_tasks = context['task'].downstream_list
-        logging.debug("Downstream task_ids {}".format(downstream_tasks))
+        self.logger.debug("Downstream task_ids %s", downstream_tasks)
 
         skip_tasks = [t for t in downstream_tasks if t.task_id != branch]
         if downstream_tasks:
             self.skip(context['dag_run'], context['ti'].execution_date, skip_tasks)
 
-        logging.info("Done.")
+        self.logger.info("Done.")
 
 
 class ShortCircuitOperator(PythonOperator, SkipMixin):
@@ -130,18 +126,18 @@ class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     def execute(self, context):
         condition = super(ShortCircuitOperator, self).execute(context)
-        logging.info("Condition result is {}".format(condition))
+        self.logger.info("Condition result is %s", condition)
 
         if condition:
-            logging.info('Proceeding with downstream tasks...')
+            self.logger.info('Proceeding with downstream tasks...')
             return
 
-        logging.info('Skipping downstream tasks...')
+        self.logger.info('Skipping downstream tasks...')
 
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-        logging.debug("Downstream task_ids {}".format(downstream_tasks))
+        self.logger.debug("Downstream task_ids %s", downstream_tasks)
 
         if downstream_tasks:
             self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
 
-        logging.info("Done.")
+        self.logger.info("Done.")

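The self.logger attribute used above comes from a LoggingMixin-style base that this commit wires into the model classes (see the models.py hunk). A rough sketch of the mixin idea, not a copy of airflow/utils/log/LoggingMixin.py:

    import logging


    class LoggingMixinSketch(object):
        """Illustrative stand-in for Airflow's LoggingMixin."""

        @property
        def logger(self):
            # Name the logger after the concrete class so every operator
            # gets its own entry in the logging hierarchy.
            name = "{}.{}".format(self.__class__.__module__,
                                  self.__class__.__name__)
            return logging.getLogger(name)


    class ExampleOperator(LoggingMixinSketch):
        def execute(self):
            self.logger.info("Done. Returned value was: %s", 42)


    if __name__ == "__main__":
        logging.basicConfig(level=logging.INFO)
        ExampleOperator().execute()

Because the logger name encodes the class, per-operator log levels can be tuned through standard logging configuration without touching operator code.
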
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/redshift_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py
index fda88d9..e25d613 100644
--- a/airflow/operators/redshift_to_s3_operator.py
+++ b/airflow/operators/redshift_to_s3_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.hooks.postgres_hook import PostgresHook
 from airflow.hooks.S3_hook import S3Hook
 from airflow.models import BaseOperator
@@ -71,9 +68,9 @@ class RedshiftToS3Transfer(BaseOperator):
         self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
         self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
         a_key, s_key = self.s3.get_credentials()
-        unload_options = ('\n\t\t\t').join(self.unload_options)
+        unload_options = '\n\t\t\t'.join(self.unload_options)
 
-        logging.info("Retrieving headers from %s.%s..." % (self.schema, self.table))
+        self.logger.info("Retrieving headers from %s.%s...", self.schema, self.table)
 
         columns_query = """SELECT column_name
                             FROM information_schema.columns
@@ -86,9 +83,9 @@ class RedshiftToS3Transfer(BaseOperator):
         cursor.execute(columns_query)
         rows = cursor.fetchall()
         columns = map(lambda row: row[0], rows)
-        column_names = (', ').join(map(lambda c: "\\'{0}\\'".format(c), columns))
-        column_castings = (', ').join(map(lambda c: "CAST({0} AS text) AS {0}".format(c),
-                                            columns))
+        column_names = ', '.join(map(lambda c: "\\'{0}\\'".format(c), columns))
+        column_castings = ', '.join(map(lambda c: "CAST({0} AS text) AS {0}".format(c),
+                                        columns))
 
         unload_query = """
                         UNLOAD ('SELECT {0}
@@ -102,6 +99,6 @@ class RedshiftToS3Transfer(BaseOperator):
                         """.format(column_names, column_castings, self.schema, self.table,
                                 self.s3_bucket, self.s3_key, a_key, s_key, unload_options)
 
-        logging.info('Executing UNLOAD command...')
+        self.logger.info('Executing UNLOAD command...')
         self.hook.run(unload_query, self.autocommit)
-        logging.info("UNLOAD command complete...")
+        self.logger.info("UNLOAD command complete...")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/s3_file_transform_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_file_transform_operator.py b/airflow/operators/s3_file_transform_operator.py
index 1cdd0e5..5de5127 100644
--- a/airflow/operators/s3_file_transform_operator.py
+++ b/airflow/operators/s3_file_transform_operator.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
 from tempfile import NamedTemporaryFile
 import subprocess
 
@@ -75,15 +74,15 @@ class S3FileTransformOperator(BaseOperator):
     def execute(self, context):
         source_s3 = S3Hook(s3_conn_id=self.source_s3_conn_id)
         dest_s3 = S3Hook(s3_conn_id=self.dest_s3_conn_id)
-        logging.info("Downloading source S3 file {0}"
-                     "".format(self.source_s3_key))
+        self.logger.info("Downloading source S3 file %s", self.source_s3_key)
         if not source_s3.check_for_key(self.source_s3_key):
-            raise AirflowException("The source key {0} does not exist"
-                            "".format(self.source_s3_key))
+            raise AirflowException("The source key {0} does not exist".format(self.source_s3_key))
         source_s3_key_object = source_s3.get_key(self.source_s3_key)
         with NamedTemporaryFile("w") as f_source, NamedTemporaryFile("w") as f_dest:
-            logging.info("Dumping S3 file {0} contents to local file {1}"
-                         "".format(self.source_s3_key, f_source.name))
+            self.logger.info(
+                "Dumping S3 file %s contents to local file %s",
+                self.source_s3_key, f_source.name
+            )
             source_s3_key_object.get_contents_to_file(f_source)
             f_source.flush()
             source_s3.connection.close()
@@ -91,21 +90,20 @@ class S3FileTransformOperator(BaseOperator):
                 [self.transform_script, f_source.name, f_dest.name],
                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             (transform_script_stdoutdata, transform_script_stderrdata) = transform_script_process.communicate()
-            logging.info("Transform script stdout "
-                         "" + transform_script_stdoutdata)
+            self.logger.info("Transform script stdout %s", transform_script_stdoutdata)
             if transform_script_process.returncode > 0:
-                raise AirflowException("Transform script failed "
-                                "" + transform_script_stderrdata)
+                raise AirflowException("Transform script failed: {}".format(transform_script_stderrdata))
             else:
-                logging.info("Transform script successful."
-                             "Output temporarily located at {0}"
-                             "".format(f_dest.name))
-            logging.info("Uploading transformed file to S3")
+                self.logger.info(
+                    "Transform script successful. Output temporarily located at %s",
+                    f_dest.name
+                )
+            self.logger.info("Uploading transformed file to S3")
             f_dest.flush()
             dest_s3.load_file(
                 filename=f_dest.name,
                 key=self.dest_s3_key,
                 replace=self.replace
             )
-            logging.info("Upload successful")
+            self.logger.info("Upload successful")
             dest_s3.connection.close()

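One caveat visible in this hunk: the logger accepts deferred %s arguments, but exception constructors do not, so text handed to AirflowException has to be formatted up front. A standalone sketch of the distinction, with RuntimeError standing in for AirflowException and a hypothetical stderr string:

    stderr_text = "transform.sh: line 3: command not found"  # hypothetical

    try:
        # Format eagerly; RuntimeError("failed %s", arg) would just store
        # the unformatted tuple in exc.args.
        raise RuntimeError("Transform script failed: {}".format(stderr_text))
    except RuntimeError as exc:
        print(exc)  # -> Transform script failed: transform.sh: line 3: ...
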
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 7ae0616..68fe903 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -14,7 +14,6 @@
 
 from builtins import next
 from builtins import zip
-import logging
 from tempfile import NamedTemporaryFile
 from airflow.utils.file import TemporaryDirectory
 import gzip
@@ -29,6 +28,7 @@ from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.compression import uncompress_file
 
+
 class S3ToHiveTransfer(BaseOperator):
     """
     Moves data from S3 to Hive. The operator downloads a file from S3,
@@ -129,7 +129,7 @@ class S3ToHiveTransfer(BaseOperator):
         # Downloading file from S3
         self.s3 = S3Hook(s3_conn_id=self.s3_conn_id)
         self.hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
-        logging.info("Downloading S3 file")
+        self.logger.info("Downloading S3 file")
 
         if self.wildcard_match:
             if not self.s3.check_for_wildcard_key(self.s3_key):
@@ -146,13 +146,13 @@ class S3ToHiveTransfer(BaseOperator):
                 NamedTemporaryFile(mode="w",
                                    dir=tmp_dir,
                                    suffix=file_ext) as f:
-            logging.info("Dumping S3 key {0} contents to local"
-                         " file {1}".format(s3_key_object.key, f.name))
+            self.logger.info("Dumping S3 key %s contents to local file %s",
+                             s3_key_object.key, f.name)
             s3_key_object.get_contents_to_file(f)
             f.flush()
             self.s3.connection.close()
             if not self.headers:
-                logging.info("Loading file {0} into Hive".format(f.name))
+                self.logger.info("Loading file %s into Hive", f.name)
                 self.hive.load_file(
                     f.name,
                     self.hive_table,
@@ -165,11 +165,11 @@ class S3ToHiveTransfer(BaseOperator):
             else:
                 # Decompressing file
                 if self.input_compressed:
-                    logging.info("Uncompressing file {0}".format(f.name))
+                    self.logger.info("Uncompressing file %s", f.name)
                     fn_uncompressed = uncompress_file(f.name,
                                                       file_ext,
                                                       tmp_dir)
-                    logging.info("Uncompressed to {0}".format(fn_uncompressed))
+                    self.logger.info("Uncompressed to %s", fn_uncompressed)
                     # uncompressed file available now so deleting
                     # compressed file to save disk space
                     f.close()
@@ -178,20 +178,19 @@ class S3ToHiveTransfer(BaseOperator):
 
                 # Testing if header matches field_dict
                 if self.check_headers:
-                    logging.info("Matching file header against field_dict")
+                    self.logger.info("Matching file header against field_dict")
                     header_list = self._get_top_row_as_list(fn_uncompressed)
                     if not self._match_headers(header_list):
                         raise AirflowException("Header check failed")
 
                 # Deleting top header row
-                logging.info("Removing header from file {0}".
-                             format(fn_uncompressed))
+                self.logger.info("Removing header from file %s", fn_uncompressed)
                 headless_file = (
                     self._delete_top_row_and_compress(fn_uncompressed,
                                                       file_ext,
                                                       tmp_dir))
-                logging.info("Headless file {0}".format(headless_file))
-                logging.info("Loading file {0} into Hive".format(headless_file))
+                self.logger.info("Headless file %s", headless_file)
+                self.logger.info("Loading file %s into Hive", headless_file)
                 self.hive.load_file(headless_file,
                                     self.hive_table,
                                     field_dict=self.field_dict,
@@ -212,18 +211,18 @@ class S3ToHiveTransfer(BaseOperator):
             raise AirflowException("Unable to retrieve header row from file")
         field_names = self.field_dict.keys()
         if len(field_names) != len(header_list):
-            logging.warning("Headers count mismatch"
-                            "File headers:\n {header_list}\n"
-                            "Field names: \n {field_names}\n"
-                            "".format(**locals()))
+            self.logger.warning("Headers count mismatch\n"
+                                "File headers:\n {header_list}\n"
+                                "Field names: \n {field_names}\n"
+                                "".format(**locals()))
             return False
         test_field_match = [h1.lower() == h2.lower()
                             for h1, h2 in zip(header_list, field_names)]
         if not all(test_field_match):
-            logging.warning("Headers do not match field names"
-                            "File headers:\n {header_list}\n"
-                            "Field names: \n {field_names}\n"
-                            "".format(**locals()))
+            self.logger.warning("Headers do not match field names\n"
+                                "File headers:\n {header_list}\n"
+                                "Field names: \n {field_names}\n"
+                                "".format(**locals()))
             return False
         else:
             return True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 409c18d..ea301dc 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -14,12 +14,14 @@
 
 from __future__ import print_function
 from future import standard_library
+
+from airflow.utils.log.LoggingMixin import LoggingMixin
+
 standard_library.install_aliases()
 from builtins import str
 from past.builtins import basestring
 
 from datetime import datetime
-import logging
 from urllib.parse import urlparse
 from time import sleep
 import re
@@ -80,7 +82,7 @@ class BaseSensorOperator(BaseOperator):
                 else:
                     raise AirflowSensorTimeout('Snap. Time is OUT.')
             sleep(self.poke_interval)
-        logging.info("Success criteria met. Exiting.")
+        self.logger.info("Success criteria met. Exiting.")
 
 
 class SqlSensor(BaseSensorOperator):
@@ -106,7 +108,7 @@ class SqlSensor(BaseSensorOperator):
     def poke(self, context):
         hook = BaseHook.get_connection(self.conn_id).get_hook()
 
-        logging.info('Poking: ' + self.sql)
+        self.logger.info('Poking: %s', self.sql)
         records = hook.get_records(self.sql)
         if not records:
             return False
@@ -235,7 +237,7 @@ class ExternalTaskSensor(BaseSensorOperator):
         serialized_dttm_filter = ','.join(
             [datetime.isoformat() for datetime in dttm_filter])
 
-        logging.info(
+        self.logger.info(
             'Poking for '
             '{self.external_dag_id}.'
             '{self.external_task_id} on '
@@ -311,7 +313,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
 
             schema, table, partition = self.parse_partition_name(partition)
 
-            logging.info(
+            self.logger.info(
                 'Poking for {schema}.{table}/{partition}'.format(**locals())
             )
             return self.hook.check_for_named_partition(
@@ -369,7 +371,7 @@ class HivePartitionSensor(BaseSensorOperator):
     def poke(self, context):
         if '.' in self.table:
             self.schema, self.table = self.table.split('.')
-        logging.info(
+        self.logger.info(
             'Poking for table {self.schema}.{self.table}, '
             'partition {self.partition}'.format(**locals()))
         if not hasattr(self, 'hook'):
@@ -415,10 +417,11 @@ class HdfsSensor(BaseSensorOperator):
         :return: (bool) depending on the matching criteria
         """
         if size:
-            logging.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
+            log = LoggingMixin().logger
+            log.debug('Filtering for file size >= %s in files: %s', size, map(lambda x: x['path'], result))
             size *= settings.MEGABYTE
             result = [x for x in result if x['length'] >= size]
-            logging.debug('HdfsSensor.poke: after size filter result is %s', result)
+            log.debug('HdfsSensor.poke: after size filter result is %s', result)
         return result
 
     @staticmethod
@@ -432,31 +435,33 @@ class HdfsSensor(BaseSensorOperator):
         :return: (list) of dicts which were not removed
         """
         if ignore_copying:
+            log = LoggingMixin().logger
             regex_builder = "^.*\.(%s$)$" % '$|'.join(ignored_ext)
             ignored_extentions_regex = re.compile(regex_builder)
-            logging.debug('Filtering result for ignored extentions: %s in files %s', ignored_extentions_regex.pattern,
-                          map(lambda x: x['path'], result))
+            log.debug(
+                'Filtering result for ignored extensions: %s in files %s',
+                ignored_extentions_regex.pattern, map(lambda x: x['path'], result)
+            )
             result = [x for x in result if not ignored_extentions_regex.match(x['path'])]
-            logging.debug('HdfsSensor.poke: after ext filter result is %s', result)
+            log.debug('HdfsSensor.poke: after ext filter result is %s', result)
         return result
 
     def poke(self, context):
         sb = self.hook(self.hdfs_conn_id).get_conn()
-        logging.getLogger("snakebite").setLevel(logging.WARNING)
-        logging.info('Poking for file {self.filepath} '.format(**locals()))
+        self.logger.info('Poking for file {self.filepath}'.format(**locals()))
         try:
             # IMOO it's not right here, as there no raise of any kind.
             # if the filepath is let's say '/data/mydirectory', it's correct but if it is '/data/mydirectory/*',
             # it's not correct as the directory exists and sb does not raise any error
             # here is a quick fix
             result = [f for f in sb.ls([self.filepath], include_toplevel=False)]
-            logging.debug('HdfsSensor.poke: result is %s', result)
+            self.logger.debug('HdfsSensor.poke: result is %s', result)
             result = self.filter_for_ignored_ext(result, self.ignored_ext, self.ignore_copying)
             result = self.filter_for_filesize(result, self.file_size)
             return bool(result)
         except:
             e = sys.exc_info()
-            logging.debug("Caught an exception !: %s", str(e))
+            self.logger.debug("Caught an exception !: %s", str(e))
             return False
 
 
@@ -479,8 +484,7 @@ class WebHdfsSensor(BaseSensorOperator):
     def poke(self, context):
         from airflow.hooks.webhdfs_hook import WebHDFSHook
         c = WebHDFSHook(self.webhdfs_conn_id)
-        logging.info(
-            'Poking for file {self.filepath} '.format(**locals()))
+        self.logger.info('Poking for file {self.filepath}'.format(**locals()))
         return c.check_for_path(hdfs_path=self.filepath)
 
 
@@ -531,7 +535,7 @@ class S3KeySensor(BaseSensorOperator):
         from airflow.hooks.S3_hook import S3Hook
         hook = S3Hook(s3_conn_id=self.s3_conn_id)
         full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
-        logging.info('Poking for key : {full_url}'.format(**locals()))
+        self.logger.info('Poking for key : {full_url}'.format(**locals()))
         if self.wildcard_match:
             return hook.check_for_wildcard_key(self.bucket_key,
                                                self.bucket_name)
@@ -573,7 +577,7 @@ class S3PrefixSensor(BaseSensorOperator):
         self.s3_conn_id = s3_conn_id
 
     def poke(self, context):
-        logging.info('Poking for prefix : {self.prefix}\n'
+        self.logger.info('Poking for prefix : {self.prefix}\n'
                      'in bucket s3://{self.bucket_name}'.format(**locals()))
         from airflow.hooks.S3_hook import S3Hook
         hook = S3Hook(s3_conn_id=self.s3_conn_id)
@@ -598,8 +602,7 @@ class TimeSensor(BaseSensorOperator):
         self.target_time = target_time
 
     def poke(self, context):
-        logging.info(
-            'Checking if the time ({0}) has come'.format(self.target_time))
+        self.logger.info('Checking if the time (%s) has come', self.target_time)
         return datetime.now().time() > self.target_time
 
 
@@ -624,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator):
         dag = context['dag']
         target_dttm = dag.following_schedule(context['execution_date'])
         target_dttm += self.delta
-        logging.info('Checking if the time ({0}) has come'.format(target_dttm))
+        self.logger.info('Checking if the time (%s) has come', target_dttm)
         return datetime.now() > target_dttm
 
 
@@ -676,7 +679,7 @@ class HttpSensor(BaseSensorOperator):
             http_conn_id=http_conn_id)
 
     def poke(self, context):
-        logging.info('Poking: ' + self.endpoint)
+        self.logger.info('Poking: %s', self.endpoint)
         try:
             response = self.hook.run(self.endpoint,
                                      data=self.request_params,

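Static helpers such as HdfsSensor.filter_for_filesize and filter_for_ignored_ext have no self, so the diff instantiates the mixin to borrow a logger (LoggingMixin().logger). A self-contained sketch of that pattern, with an illustrative mixin and sensor class in place of Airflow's:

    import logging


    class LoggingMixinSketch(object):
        @property
        def logger(self):
            return logging.getLogger(self.__class__.__module__)


    class HdfsSensorSketch(object):
        MEGABYTE = 1024 * 1024  # mirrors settings.MEGABYTE

        @staticmethod
        def filter_for_filesize(result, size=None):
            if size:
                # No self here, so build a throwaway mixin to get a logger.
                log = LoggingMixinSketch().logger
                log.debug("Filtering for file size >= %s MB in files: %s",
                          size, [x["path"] for x in result])
                result = [x for x in result
                          if x["length"] >= size * HdfsSensorSketch.MEGABYTE]
            return result


    if __name__ == "__main__":
        logging.basicConfig(level=logging.DEBUG)
        files = [{"path": "/data/a", "length": 5 * 1024 * 1024},
                 {"path": "/data/b", "length": 10}]
        print(HdfsSensorSketch.filter_for_filesize(files, size=1))
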
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/slack_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/slack_operator.py b/airflow/operators/slack_operator.py
index 86659d9..4f2d7bc 100644
--- a/airflow/operators/slack_operator.py
+++ b/airflow/operators/slack_operator.py
@@ -12,12 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
+
 from slackclient import SlackClient
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.exceptions import AirflowException
-import json
-import logging
 
 
 class SlackAPIOperator(BaseOperator):
@@ -66,8 +66,9 @@ class SlackAPIOperator(BaseOperator):
         sc = SlackClient(self.token)
         rc = sc.api_call(self.method, **self.api_params)
         if not rc['ok']:
-            logging.error("Slack API call failed ({})".format(rc['error']))
-            raise AirflowException("Slack API call failed: ({})".format(rc['error']))
+            msg = "Slack API call failed ({})".format(rc['error'])
+            self.logger.error(msg)
+            raise AirflowException(msg)
 
 
 class SlackAPIPostOperator(SlackAPIOperator):

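The slack_operator hunk builds the failure message once and reuses it for both the error log and the raised exception, which keeps the two in sync. A minimal sketch of that flow (stdlib logger and RuntimeError stand in for Airflow's classes, and the API response dict is hypothetical):

    import logging

    logging.basicConfig(level=logging.ERROR)
    logger = logging.getLogger("slack_sketch")

    rc = {"ok": False, "error": "invalid_auth"}  # hypothetical Slack response

    if not rc["ok"]:
        msg = "Slack API call failed ({})".format(rc["error"])
        logger.error(msg)
        raise RuntimeError(msg)  # the sketch ends here, as the operator would
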
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a7a51890/airflow/operators/sqlite_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py
index 0ff4d05..7c85847 100644
--- a/airflow/operators/sqlite_operator.py
+++ b/airflow/operators/sqlite_operator.py
@@ -11,9 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import logging
-
 from airflow.hooks.sqlite_hook import SqliteHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -44,6 +41,6 @@ class SqliteOperator(BaseOperator):
         self.parameters = parameters or []
 
     def execute(self, context):
-        logging.info('Executing: ' + self.sql)
+        self.logger.info('Executing: %s', self.sql)
         hook = SqliteHook(sqlite_conn_id=self.sqlite_conn_id)
         hook.run(self.sql, parameters=self.parameters)


