airflow-commits mailing list archives

From jlo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-360] Fix style warnings in models.py
Date Sun, 14 Aug 2016 14:34:48 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3767e69dd -> 2396b0c8d


[AIRFLOW-360] Fix style warnings in models.py

Closes #1681 from skudriashev/airflow-360


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2396b0c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2396b0c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2396b0c8

Branch: refs/heads/master
Commit: 2396b0c8d8893301e5f9a4598eda1985a3050f8e
Parents: 3767e69
Author: Stanislav Kudriashev <stas.kudriashev@gmail.com>
Authored: Sun Aug 14 10:34:04 2016 -0400
Committer: jlowin <jlowin@users.noreply.github.com>
Committed: Sun Aug 14 10:34:22 2016 -0400

----------------------------------------------------------------------
 .landscape.yml    |   6 +++
 airflow/models.py | 118 +++++++++++++++++++++++++------------------------
 2 files changed, 66 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
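
Most of the models.py changes below are mechanical style fixes: converting
'''-quoted docstrings to the triple double quotes recommended by PEP 257,
normalizing comment spacing, and dropping unused tuple-unpacking targets in
favor of a throwaway _. A before/after sketch of the docstring convention,
modeled on the first models.py hunk:

    # Before: triple single quotes, flagged by the style checker
    def clear_task_instances(tis, session, activate_dag_runs=True):
        '''
        Clears a set of task instances, but makes sure the running ones
        get killed.
        '''

    # After: PEP 257 triple double quotes
    def clear_task_instances(tis, session, activate_dag_runs=True):
        """
        Clears a set of task instances, but makes sure the running ones
        get killed.
        """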


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2396b0c8/.landscape.yml
----------------------------------------------------------------------
diff --git a/.landscape.yml b/.landscape.yml
index 56de135..a61441e 100644
--- a/.landscape.yml
+++ b/.landscape.yml
@@ -15,12 +15,18 @@ max-line-length: 90
 strictness: medium
 pep8:
   full: true
+  disable:
+    - E402  # module level import not at top of file
 uses:
   - flask
 pylint:
   disable:
+    - broad-except
     - cyclic-import
     - invalid-name
+    - locally-disabled
     - super-on-old-class
+    - wrong-import-order
+    - wrong-import-position
   options:
     docstring-min-length: 10

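The .landscape.yml change above silences pep8 E402 and a few pylint checks
rather than reworking the code that trips them. E402 fires whenever a
module-level import follows other statements, a pattern models.py depends on.
A toy illustration (not taken from the commit):

    # E402: module level import not at top of file
    SETTINGS_LOADED = True   # module-level statement executes first

    import json              # an import after code triggers pep8 E402
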
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2396b0c8/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 013e0c5..3d0f4ea 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -98,10 +98,10 @@ _CONTEXT_MANAGER_DAG = None
 
 
 def clear_task_instances(tis, session, activate_dag_runs=True):
-    '''
+    """
     Clears a set of task instances, but makes sure the running ones
     get killed.
-    '''
+    """
     job_ids = []
     for ti in tis:
         if ti.state == State.RUNNING:
@@ -109,9 +109,9 @@ def clear_task_instances(tis, session, activate_dag_runs=True):
                 ti.state = State.SHUTDOWN
                 job_ids.append(ti.job_id)
         # todo: this creates an issue with the webui tests
-        #elif ti.state != State.REMOVED:
-        #    ti.state = State.NONE
-        #    session.merge(ti)
+        # elif ti.state != State.REMOVED:
+        #     ti.state = State.NONE
+        #     session.merge(ti)
         else:
             session.delete(ti)
     if job_ids:
@@ -244,7 +244,7 @@ class DagBag(BaseDagBag, LoggingMixin):
                         return found_dags
 
             self.logger.debug("Importing {}".format(filepath))
-            org_mod_name, file_ext = os.path.splitext(os.path.split(filepath)[-1])
+            org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
             mod_name = 'unusual_prefix_' + org_mod_name
 
             if mod_name in sys.modules:
@@ -262,7 +262,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         else:
             zip_file = zipfile.ZipFile(filepath)
             for mod in zip_file.infolist():
-                head, tail = os.path.split(mod.filename)
+                head, _ = os.path.split(mod.filename)
                 mod_name, ext = os.path.splitext(mod.filename)
                 if not head and (ext == '.py' or ext == '.pyc'):
                     if mod_name == '__init__':
@@ -460,7 +460,7 @@ class DagBag(BaseDagBag, LoggingMixin):
     def paused_dags(self):
         session = settings.Session()
         dag_ids = [dp.dag_id for dp in session.query(DagModel).filter(
-            DagModel.is_paused == True)]
+            DagModel.is_paused.is_(True))]
         session.commit()
         session.close()
         return dag_ids
@@ -624,23 +624,22 @@ class Connection(Base):
                 from airflow.contrib.hooks.cloudant_hook import CloudantHook
                 return CloudantHook(cloudant_conn_id=self.conn_id)
         except:
-            return None
+            pass
 
     def __repr__(self):
         return self.conn_id
 
     @property
     def extra_dejson(self):
-        """Returns the extra property by deserializing json"""
+        """Returns the extra property by deserializing json."""
         obj = {}
         if self.extra:
             try:
                 obj = json.loads(self.extra)
             except Exception as e:
                 logging.exception(e)
-                logging.error(
-                    "Failed parsing the json for "
-                    "conn_id {}".format(self.conn_id))
+                logging.error("Failed parsing the json for conn_id %s", self.conn_id)
+
         return obj
 
 
@@ -932,7 +931,7 @@ class TaskInstance(Base):
         """
         Returns a tuple that identifies the task instance uniquely
         """
-        return (self.dag_id, self.task_id, self.execution_date)
+        return self.dag_id, self.task_id, self.execution_date
 
     def set_state(self, state, session):
         self.state = state
@@ -1019,7 +1018,7 @@ class TaskInstance(Base):
             include_queued=include_queued,
             ignore_depends_on_past=ignore_depends_on_past,
             flag_upstream_failed=flag_upstream_failed)
-        return queueable and not self.pool_full()
+        return queueable and not self.pool_full()  # pylint: disable=E1120
 
     @provide_session
     def are_dependents_done(self, session=None):
@@ -1094,11 +1093,11 @@ class TaskInstance(Base):
                     self.set_state(State.SKIPPED, session)
 
         return (
-             (tr == TR.ONE_SUCCESS and successes > 0) or
-             (tr == TR.ONE_FAILED and (failed or upstream_failed)) or
-             (tr == TR.ALL_SUCCESS and successes >= upstream) or
-             (tr == TR.ALL_FAILED and failed + upstream_failed >= upstream) or
-             (tr == TR.ALL_DONE and upstream_done)
+            (tr == TR.ONE_SUCCESS and successes > 0) or
+            (tr == TR.ONE_FAILED and (failed or upstream_failed)) or
+            (tr == TR.ALL_SUCCESS and successes >= upstream) or
+            (tr == TR.ALL_FAILED and failed + upstream_failed >= upstream) or
+            (tr == TR.ALL_DONE and upstream_done)
         )
 
     @provide_session
@@ -1137,8 +1136,8 @@ class TaskInstance(Base):
             previous_ti = session.query(TI).filter(
                 TI.dag_id == self.dag_id,
                 TI.task_id == task.task_id,
-                TI.execution_date ==
-                    self.task.dag.previous_schedule(self.execution_date),
+                TI.execution_date == self.task.dag.previous_schedule(
+                    self.execution_date),
                 TI.state.in_({State.SUCCESS, State.SKIPPED}),
             ).first()
             if not previous_ti:
@@ -1219,7 +1218,8 @@ class TaskInstance(Base):
         Checks on whether the task instance is in the right state and timeframe
         to be retried.
         """
-        return self.state == State.UP_FOR_RETRY and self.next_retry_datetime() < datetime.now()
+        return (self.state == State.UP_FOR_RETRY and
+                self.next_retry_datetime() < datetime.now())
 
     @provide_session
     def pool_full(self, session):
@@ -1317,7 +1317,7 @@ class TaskInstance(Base):
                     verbose=True)):
             logging.warning("Dependencies not met yet")
         elif (
-            # todo: move this to the scheduler
+                # TODO: move this to the scheduler
                 self.state == State.UP_FOR_RETRY and
                 not self.ready_for_retry()):
             next_run = self.next_retry_datetime().isoformat()
@@ -1385,7 +1385,7 @@ class TaskInstance(Base):
                     self.task = task_copy
 
                     def signal_handler(signum, frame):
-                        '''Setting kill signal handler'''
+                        """Setting kill signal handler"""
                         logging.error("Killing subprocess")
                         task_copy.on_kill()
                         raise AirflowException("Task received SIGTERM signal")
@@ -1394,12 +1394,11 @@ class TaskInstance(Base):
                     self.render_templates()
                     task_copy.pre_execute(context=context)
 
-                    # If a timout is specified for the task, make it fail
+                    # If a timeout is specified for the task, make it fail
                     # if it goes beyond
                     result = None
                     if task_copy.execution_timeout:
-                        with timeout(int(
-                                task_copy.execution_timeout.total_seconds())):
+                        with timeout(int(task_copy.execution_timeout.total_seconds())):
                             result = task_copy.execute(context=context)
 
                     else:
@@ -1551,7 +1550,7 @@ class TaskInstance(Base):
                 self.var = None
 
             def __getattr__(self, item):
-                self.var =  Variable.get(item, deserialize_json=True)
+                self.var = Variable.get(item, deserialize_json=True)
                 return self.var
 
             def __repr__(self):
@@ -1964,7 +1963,7 @@ class BaseOperator(object):
                 "The trigger_rule must be one of {all_triggers},"
                 "'{d}.{t}'; received '{tr}'."
                 .format(all_triggers=TriggerRule.all_triggers,
-                        d=dag.dag_id, t=task_id, tr = trigger_rule))
+                        d=dag.dag_id, t=task_id, tr=trigger_rule))
 
         self.trigger_rule = trigger_rule
         self.depends_on_past = depends_on_past
@@ -2186,12 +2185,12 @@ class BaseOperator(object):
         pass
 
     def on_kill(self):
-        '''
+        """
         Override this method to cleanup subprocesses when a task instance
         gets killed. Any use of the threading, subprocess or multiprocessing
         module within an operator needs to be cleaned up or it will leave
         ghost processes behind.
-        '''
+        """
         pass
 
     def __deepcopy__(self, memo):
@@ -2213,12 +2212,12 @@ class BaseOperator(object):
         return result
 
     def render_template_from_field(self, attr, content, context, jinja_env):
-        '''
+        """
         Renders a template from a field. If the field is a string, it will
         simply render the string and return the result. If it is a collection or
         nested set of collections, it will traverse the structure and render
         all strings in it.
-        '''
+        """
         rt = self.render_template
         if isinstance(content, six.string_types):
             result = jinja_env.from_string(content).render(**context)
@@ -2237,10 +2236,10 @@ class BaseOperator(object):
         return result
 
     def render_template(self, attr, content, context):
-        '''
+        """
         Renders a template either from a file or directly in a field, and returns
         the rendered result.
-        '''
+        """
         jinja_env = self.dag.get_template_env() \
             if hasattr(self, 'dag') \
             else jinja2.Environment(cache_size=0)
@@ -2254,12 +2253,12 @@ class BaseOperator(object):
             return self.render_template_from_field(attr, content, context, jinja_env)
 
     def prepare_template(self):
-        '''
+        """
         Hook that is triggered after the templated fields get replaced
         by their content. If you need your operator to alter the
         content of the file before the template is rendered,
         it should override this method to do so.
-        '''
+        """
         pass
 
     def resolve_template_files(self):
@@ -2524,7 +2523,8 @@ class DagModel(Base):
     dag_id = Column(String(ID_LEN), primary_key=True)
     # A DAG can be paused from the UI / DB
     # Set this default value of is_paused based on a configuration value!
-    is_paused_at_creation = configuration.getboolean('core', 'dags_are_paused_at_creation')
+    is_paused_at_creation = configuration.getboolean('core',
+                                                     'dags_are_paused_at_creation')
     is_paused = Column(Boolean, default=is_paused_at_creation)
     # Whether the DAG is a subdag
     is_subdag = Column(Boolean, default=False)
@@ -2938,10 +2938,10 @@ class DAG(BaseDag, LoggingMixin):
         raise NotImplementedError("")
 
     def get_template_env(self):
-        '''
+        """
         Returns a jinja2 Environment while taking into account the DAGs
         template_searchpath and user_defined_macros
-        '''
+        """
         searchpath = [self.folder]
         if self.template_searchpath:
             searchpath += self.template_searchpath
@@ -3157,19 +3157,19 @@ class DAG(BaseDag, LoggingMixin):
             get_downstream(t)
 
     def add_task(self, task):
-        '''
+        """
         Add a task to the DAG
 
         :param task: the task you want to add
         :type task: task
-        '''
+        """
         if not self.start_date and not task.start_date:
             raise AirflowException("Task is missing the start_date parameter")
         if not task.start_date:
             task.start_date = self.start_date
 
         if task.task_id in self.task_dict:
-            #TODO raise an error in Airflow 2.0
+            # TODO: raise an error in Airflow 2.0
             warnings.warn(
                 'The requested task could not be added to the DAG because a '
                 'task with task_id {} is already in the DAG. Starting in '
@@ -3184,12 +3184,12 @@ class DAG(BaseDag, LoggingMixin):
         self.task_count = len(self.tasks)
 
     def add_tasks(self, tasks):
-        '''
+        """
         Add a list of tasks to the DAG
 
         :param task: a lit of tasks you want to add
         :type task: list of tasks
-        '''
+        """
         for task in tasks:
             self.add_task(task)
 
@@ -3255,8 +3255,9 @@ class DAG(BaseDag, LoggingMixin):
                       conf=None,
                       session=None):
         """
-        Creates a dag run from this dag including the tasks associated with this dag. Returns the dag
-        run.
+        Creates a dag run from this dag including the tasks associated with this dag.
+        Returns the dag run.
+
         :param run_id: defines the the run id for this dag run
         :type run_id: string
         :param execution_date: the execution date of this dag run
@@ -3264,7 +3265,7 @@ class DAG(BaseDag, LoggingMixin):
         :param state: the state of the dag run
         :type state: State
         :param start_date: the date this dag run should be evaluated
-        :type state_date: datetime
+        :type start_date: datetime
         :param external_trigger: whether this dag run is externally triggered
         :type external_trigger: bool
         :param session: database session
@@ -3284,7 +3285,7 @@ class DAG(BaseDag, LoggingMixin):
 
         run.dag = self
 
-        # create the associated taskinstances
+        # create the associated task instances
         # state is None at the moment of creation
         run.verify_integrity(session=session)
 
@@ -3618,8 +3619,9 @@ class XCom(Base):
             xcoms = [xcoms]
         for xcom in xcoms:
             if not isinstance(xcom, XCom):
-                raise TypeError('Expected XCom; received {}'.format(
-                                xcom.__class__.__name__))
+                raise TypeError(
+                    'Expected XCom; received {}'.format(xcom.__class__.__name__)
+                )
             session.delete(xcom)
         session.commit()
 
@@ -3734,9 +3736,9 @@ class DagRun(Base):
             else:
                 # this is required to deal with NULL values
                 if None in state:
-                    tis = tis.filter(or_(
-                        TI.state.in_(state),
-                        TI.state.is_(None))
+                    tis = tis.filter(
+                        or_(TI.state.in_(state),
+                            TI.state.is_(None))
                     )
                 else:
                     tis = tis.filter(TI.state.in_(state))
@@ -3761,8 +3763,8 @@ class DagRun(Base):
 
     def get_dag(self):
         """
-        Returns the Dag associated with this DagRun
-        :param session: database session
+        Returns the Dag associated with this DagRun.
+
         :return: DAG
         """
         if not self.dag:
@@ -3817,7 +3819,7 @@ class DagRun(Base):
             root_ids = [t.task_id for t in dag.roots]
             roots = [t for t in tis if t.task_id in root_ids]
 
-            if any(r.state in (State.FAILED,  State.UPSTREAM_FAILED)
+            if any(r.state in (State.FAILED, State.UPSTREAM_FAILED)
                    for r in roots):
                 logging.info('Marking run {} failed'.format(self))
                 self.state = State.FAILED

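Two of the substantive idioms in the diff, demonstrated outside Airflow with
a toy SQLAlchemy model (the table and column names here are illustrative
assumptions, not Airflow's real schema):

    import logging

    from sqlalchemy import Boolean, Column, String, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class Dag(Base):
        __tablename__ = 'dag'
        dag_id = Column(String(250), primary_key=True)
        is_paused = Column(Boolean, default=False)

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    # As in DagBag.paused_dags: column.is_(True) renders as "IS true" and
    # avoids the pep8 E712 warning raised by "column == True" comparisons.
    paused_ids = [d.dag_id for d in
                  session.query(Dag).filter(Dag.is_paused.is_(True))]

    # As in Connection.extra_dejson: pass arguments to the logging call
    # instead of pre-formatting the string, so interpolation only happens
    # when the record is actually emitted.
    logging.error("Failed parsing the json for conn_id %s", 'my_conn_id')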
