From: GitBox
To: commits@airflow.apache.org
Subject: [GitHub] bolkedebruin commented on a change in pull request #4156: [AIRFLOW-3314] Changed auto inlets feature to work as described
Date: Sun, 25 Nov 2018 14:16:25 -0000

URL: https://github.com/apache/incubator-airflow/pull/4156#discussion_r236074966

##########
File path: airflow/lineage/__init__.py
##########
@@ -110,26 +114,31 @@ def wrapper(self, context, *args, **kwargs):
                       for i in inlets]
             self.inlets.extend(inlets)
 
-        if self._inlets['auto']:
-            # dont append twice
-            task_ids = set(self._inlets['task_ids']).symmetric_difference(
-                self.upstream_task_ids
-            )
-            inlets = self.xcom_pull(context,
-                                    task_ids=task_ids,
-                                    dag_id=self.dag_id,
-                                    key=PIPELINE_OUTLETS)
-            inlets = [item for sublist in inlets if sublist for item in sublist]
-            inlets = [DataSet.map_type(i['typeName'])(data=i['attributes'])
-                      for i in inlets]
-            self.inlets.extend(inlets)
-
-        if len(self._inlets['datasets']) > 0:
-            self.inlets.extend(self._inlets['datasets'])
+        if self._inlets["auto"]:
+            visited_task_ids = set(self._inlets["task_ids"])  # prevent double counting of outlets
+            stack = {self.task_id}
+            while stack:
+                task_id = stack.pop()
+                task = self._dag.task_dict[task_id]
+                visited_task_ids.add(task_id)
+                inlets = self.xcom_pull(

Review comment:
I suggest getting all upstream tasks and using a topological sort to have the right order.
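
The suggestion in the review comment (collect every upstream task, then order them topologically) could be sketched roughly as follows. This is only an illustration and not the code from the pull request or any Airflow API: the DAG is assumed to be a plain dict mapping each task id to the set of its direct upstream task ids, and the names `collect_upstream` and `upstream_in_topological_order` are hypothetical.

```python
from collections import deque


def collect_upstream(upstream, task_id):
    """Return every transitive upstream task id of task_id, excluding itself.

    `upstream` is an assumed stand-in for the DAG: a dict mapping a task id
    to the ids of its direct upstream tasks.
    """
    seen = set()
    stack = [task_id]
    while stack:
        current = stack.pop()
        for parent in upstream.get(current, ()):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen


def upstream_in_topological_order(upstream, task_id):
    """List transitive upstream tasks so each one follows its own upstreams."""
    nodes = collect_upstream(upstream, task_id)

    # For each node, count how many of its direct upstreams are still pending.
    pending = {n: len(set(upstream.get(n, ())) & nodes) for n in nodes}

    # Invert the edges so finishing a node can unlock its downstream nodes.
    downstream = {n: [] for n in nodes}
    for n in nodes:
        for parent in set(upstream.get(n, ())) & nodes:
            downstream[parent].append(n)

    # Kahn's algorithm; sorting keeps the result deterministic.
    ready = deque(sorted(n for n, count in pending.items() if count == 0))
    ordered = []
    while ready:
        current = ready.popleft()
        ordered.append(current)
        for child in sorted(downstream[current]):
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)

    if len(ordered) != len(nodes):
        raise ValueError("cycle detected among upstream tasks")
    return ordered


if __name__ == "__main__":
    # Hypothetical diamond-shaped DAG: a -> b, a -> c, b -> d, c -> d.
    dag = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
    print(upstream_in_topological_order(dag, "d"))
```

With an ordering like this, outlets could be pulled from upstream tasks in a stable sequence (sources first), and the single pass over `nodes` already avoids visiting a task twice. Whether that matches the intended lineage semantics is for the PR discussion to settle.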