airflow-commits mailing list archives

From GitBox <>
Subject [GitHub] reubenvanammers commented on a change in pull request #4156: [AIRFLOW-3314] Changed auto inlets feature to work as described
Date Mon, 26 Nov 2018 03:16:40 GMT
reubenvanammers commented on a change in pull request #4156: [AIRFLOW-3314] Changed auto inlets
feature to work as described

 File path: airflow/lineage/
 @@ -110,26 +114,31 @@ def wrapper(self, context, *args, **kwargs):
                       for i in inlets]
-        if self._inlets['auto']:
-            # dont append twice
-            task_ids = set(self._inlets['task_ids']).symmetric_difference(
-                self.upstream_task_ids
-            )
-            inlets = self.xcom_pull(context,
-                                    task_ids=task_ids,
-                                    dag_id=self.dag_id,
-                                    key=PIPELINE_OUTLETS)
-            inlets = [item for sublist in inlets if sublist for item in sublist]
-            inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
-                      for i in inlets]
-            self.inlets.extend(inlets)
-        if len(self._inlets['datasets']) > 0:
-            self.inlets.extend(self._inlets['datasets'])
+        if self._inlets["auto"]:
+            visited_task_ids = set(self._inlets["task_ids"])  # prevent double counting of
+            stack = {self.task_id}
+            while stack:
+                task_id = stack.pop()
+                task = self._dag.task_dict[task_id]
+                visited_task_ids.add(task_id)
+                inlets = self.xcom_pull(
 Review comment:
   Hi @bolkedebruin, can you elaborate on your objection? Once the code reaches a task in
the DAG that has outlets, it stops searching further upstream on that branch, because that
task's upstream tasks are never pushed onto the stack. If a task has multiple upstream tasks
(say, two), then xcom_pull will be called twice, where the previous code called it once.
However, since xcom_pull fires off a separate database call (using get_one) for each
task_id in the task_ids parameter, I don't think this should make an appreciable difference
in speed.
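
   A minimal sketch of the stopping rule described above, not the PR's code. It assumes the
DAG is given as a plain dict of upstream task ids and that outlets are already known per task.
The names collect_auto_inlets, upstream, and outlets are hypothetical, and the lookups counter
only stands in for the number of xcom_pull calls. Unlike the diff, it starts from the direct
upstream tasks rather than from the task itself.

```python
def collect_auto_inlets(task_id, upstream, outlets, skip=()):
    """Walk upstream from task_id; on each branch, stop at the first task with outlets.

    upstream: dict mapping task id -> list of direct upstream task ids (hypothetical shape)
    outlets:  dict mapping task id -> list of outlets that task produced (hypothetical shape)
    skip:     task ids whose outlets are already listed explicitly as inlets
    """
    visited = set(skip)
    visited.add(task_id)
    stack = list(upstream.get(task_id, ()))
    found = []
    lookups = 0  # stands in for the number of xcom_pull calls

    while stack:
        current = stack.pop()
        if current in visited:
            continue
        visited.add(current)
        lookups += 1
        task_outlets = outlets.get(current, [])
        if task_outlets:
            # Outlets found: this branch is done, so its upstream tasks are not pushed.
            found.extend(task_outlets)
            continue
        stack.extend(upstream.get(current, ()))

    return found, lookups


# d has two upstream tasks: b (with outlets) and c (without); c's upstream e has outlets.
upstream = {"d": ["b", "c"], "b": ["a"], "c": ["e"]}
outlets = {"a": ["A"], "b": ["B"], "e": ["E"]}
found, lookups = collect_auto_inlets("d", upstream, outlets)
```

   In this example, task a is never looked up, because the walk stops at b on that branch.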
   Regarding using something like a topological sort, I feel that would lose too much information:
what I regard as the 'expected' behaviour requires distinguishing between upstream and
sibling nodes, because you want to stop as soon as outlets are found on a branch. Again, since
that approach would also do individual database queries, I'm not sure that it would be more efficient.
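
   To illustrate the point about lost information, here is a small hedged example with two
made-up DAGs that admit the same linear order but should yield different inlets. The helper
names is_topological and nearest_outlets and the dict shapes are assumptions for illustration
only.

```python
def is_topological(order, upstream):
    """True if every task appears after all of its upstream tasks."""
    position = {task: i for i, task in enumerate(order)}
    return all(position[up] < position[task]
               for task, ups in upstream.items() for up in ups)


def nearest_outlets(task_id, upstream, outlets):
    """Outlets of the closest upstream task with outlets on each branch."""
    found, seen = set(), {task_id}
    stack = list(upstream.get(task_id, ()))
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        if current in outlets:
            found.add(outlets[current])  # stop here: do not look further upstream
        else:
            stack.extend(upstream.get(current, ()))
    return found


chain = {"b": ["a"], "c": ["b"]}   # a -> b -> c
siblings = {"c": ["a", "b"]}       # a -> c and b -> c
outlets = {"a": "A", "b": "B"}
order = ["a", "b", "c"]

same_order = is_topological(order, chain) and is_topological(order, siblings)
chain_inlets = nearest_outlets("c", chain, outlets)
sibling_inlets = nearest_outlets("c", siblings, outlets)
```

   Both graphs accept the order a, b, c, yet the chain should give c only b's outlets while
the sibling layout should give it both, so the order alone is not enough to decide.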
   I could change it so that it doesn't do a database query on the task, but that's a relatively
small change.
   Thanks for looking over the PR, and tell me if I have misunderstood something.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services
