airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fo...@apache.org
Subject [1/6] incubator-airflow git commit: [AIRFLOW-2203] Store task ids as sets not lists
Date Wed, 14 Mar 2018 08:15:40 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4cf2fba19 -> c3730650c


[AIRFLOW-2203] Store task ids as sets not lists

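Massively improve performance by using sets instead of lists to
represent a task's upstream and downstream task ids, so that the
duplicate-dependency check becomes a set membership test rather than a
scan of the whole list. This change also removes the
detect_downstream_cycle() call that ran at the end of every
_set_relatives() call.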


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/781c5bf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/781c5bf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/781c5bf6

Branch: refs/heads/master
Commit: 781c5bf6967473701e4812d72ace4f675f5b5c94
Parents: 4cf2fba
Author: wongwill86 <wongwill86@gmail.com>
Authored: Mon Mar 12 17:33:33 2018 -0400
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Wed Mar 14 09:11:32 2018 +0100

----------------------------------------------------------------------
 airflow/models.py                           | 29 +++++++++++-------------
 tests/ti_deps/deps/test_trigger_rule_dep.py |  2 +-
 2 files changed, 14 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/781c5bf6/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index cf31b07..74a54bb 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2286,8 +2286,8 @@ class BaseOperator(LoggingMixin):
         self.task_concurrency = task_concurrency
 
         # Private attributes
-        self._upstream_task_ids = []
-        self._downstream_task_ids = []
+        self._upstream_task_ids = set()
+        self._downstream_task_ids = set()
 
         if not dag and _CONTEXT_MANAGER_DAG:
             dag = _CONTEXT_MANAGER_DAG
@@ -2771,13 +2771,13 @@ class BaseOperator(LoggingMixin):
     def task_type(self):
         return self.__class__.__name__
 
-    def append_only_new(self, l, item):
-        if any([item is t for t in l]):
+    def add_only_new(self, item_set, item):
+        if item in item_set:
             raise AirflowException(
                 'Dependency {self}, {item} already registered'
                 ''.format(**locals()))
         else:
-            l.append(item)
+            item_set.add(item)
 
     def _set_relatives(self, task_or_task_list, upstream=False):
         try:
@@ -2793,7 +2793,7 @@ class BaseOperator(LoggingMixin):
 
         # relationships can only be set if the tasks share a single DAG. Tasks
         # without a DAG are assigned to that DAG.
-        dags = {t._dag.dag_id: t.dag for t in [self] + task_list if t.has_dag()}
+        dags = {t._dag.dag_id: t._dag for t in [self] + task_list if t.has_dag()}
 
         if len(dags) > 1:
             raise AirflowException(
@@ -2814,13 +2814,11 @@ class BaseOperator(LoggingMixin):
             if dag and not task.has_dag():
                 task.dag = dag
             if upstream:
-                task.append_only_new(task._downstream_task_ids, self.task_id)
-                self.append_only_new(self._upstream_task_ids, task.task_id)
+                task.add_only_new(task._downstream_task_ids, self.task_id)
+                self.add_only_new(self._upstream_task_ids, task.task_id)
             else:
-                self.append_only_new(self._downstream_task_ids, task.task_id)
-                task.append_only_new(task._upstream_task_ids, self.task_id)
-
-        self.detect_downstream_cycle()
+                self.add_only_new(self._downstream_task_ids, task.task_id)
+                task.add_only_new(task._upstream_task_ids, self.task_id)
 
     def set_downstream(self, task_or_task_list):
         """
@@ -3729,10 +3727,9 @@ class DAG(BaseDag, LoggingMixin):
         for t in dag.tasks:
             # Removing upstream/downstream references to tasks that did not
             # made the cut
-            t._upstream_task_ids = [
-                tid for tid in t._upstream_task_ids if tid in dag.task_ids]
-            t._downstream_task_ids = [
-                tid for tid in t._downstream_task_ids if tid in dag.task_ids]
+            t._upstream_task_ids = t._upstream_task_ids.intersection(dag.task_dict.keys())
+            t._downstream_task_ids = t._downstream_task_ids.intersection(
+                dag.task_dict.keys())
 
         if len(dag.tasks) < len(self.tasks):
             dag.partial = True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/781c5bf6/tests/ti_deps/deps/test_trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index a61ff0d..18595bb 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -28,7 +28,7 @@ class TriggerRuleDepTest(unittest.TestCase):
         task = BaseOperator(task_id='test_task', trigger_rule=trigger_rule,
                             start_date=datetime(2015, 1, 1))
         if upstream_task_ids:
-            task._upstream_task_ids.extend(upstream_task_ids)
+            task._upstream_task_ids.update(upstream_task_ids)
         return TaskInstance(task=task, state=state, execution_date=None)
 
     def test_no_upstream_tasks(self):


Mime
View raw message