airflow-commits mailing list archives

From "Ash Berlin-Taylor (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-2778) Bad Import in collect_dag in DagBag
Date Sat, 21 Jul 2018 17:20:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-2778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551782#comment-16551782 ]

Ash Berlin-Taylor commented on AIRFLOW-2778:
--------------------------------------------

I think this works fine in Airflow itself because of a side effect: importing airflow.jobs
also imports dag_processing. So this only affects use cases such as this one, which sit
outside Airflow's normal, full import.
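
The side effect described above is ordinary Python import behaviour: a submodule only becomes an attribute of its parent package once something in the process has imported it. A minimal sketch, assuming a throwaway package written to a temporary directory (the names demo_pkg and sub are hypothetical stand-ins for airflow.utils and dag_processing, not Airflow code):

```python
import importlib
import os
import sys
import tempfile

# Build a throwaway package on disk: demo_pkg/__init__.py and demo_pkg/sub.py.
# Both names are hypothetical; they only mimic the package/submodule layout.
root = tempfile.mkdtemp()
pkg_dir = os.path.join(root, "demo_pkg")
os.makedirs(pkg_dir)
with open(os.path.join(pkg_dir, "__init__.py"), "w") as f:
    f.write("")  # the package does not import its submodule
with open(os.path.join(pkg_dir, "sub.py"), "w") as f:
    f.write("VALUE = 1\n")

sys.path.insert(0, root)
importlib.invalidate_caches()

demo_pkg = importlib.import_module("demo_pkg")

# Importing the package alone does not load or bind the submodule.
before = hasattr(demo_pkg, "sub")

# Once anything imports the submodule, it is bound on the parent package.
importlib.import_module("demo_pkg.sub")
after = hasattr(demo_pkg, "sub")

print(before, after)  # False True
```

This is why code that reaches for utils.dag_processing without importing it can work in one entry point and fail in another: it depends on whether some other module has already triggered the import.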

We should definitely fix this simply because it is wrong, even though it doesn't affect Airflow's
normal operation. A workaround for you (if you haven't worked it out already) is to import
either {{airflow.jobs}} or {{airflow.utils.dag_processing}}; your test should then start
passing again.
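
A minimal sketch of that workaround, assuming Airflow 1.10 is installed and that DagBag is imported from airflow.models as in the linked source. The helper name build_dagbag is hypothetical, and the imports are kept inside the function so the snippet can be loaded without Airflow present:

```python
def build_dagbag(dag_folder):
    """Return a DagBag for dag_folder, applying the import workaround first.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    # Workaround: import the submodule explicitly so that
    # airflow.utils.dag_processing is bound before DagBag collects DAGs.
    import airflow.utils.dag_processing  # noqa: F401
    from airflow.models import DagBag

    return DagBag(dag_folder=dag_folder, include_examples=False)
```

In the test from the issue, _get_dagbag could call a helper like this instead of constructing DagBag directly. Importing airflow.jobs in place of airflow.utils.dag_processing should have the same effect, per the comment above.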

> Bad Import in collect_dag in DagBag
> -----------------------------------
>
>                 Key: AIRFLOW-2778
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2778
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG
>    Affects Versions: 1.10
>            Reporter: Kevin Yuen
>            Assignee: Kevin Yuen
>            Priority: Minor
>
> I run the following test in our CI to make sure there are no import errors. After upgrading
> to 1.10-stable from 1.9.0, the test started failing:
> {code:python}
> import os
>
> from airflow.models import DagBag
>
>
> class TestAirflowDag:
>     def test_dagbag_import(self):
>         """Verify that Airflow can import all DAGs in the repo."""
>         dagbag = self._get_dagbag()
>         assert len(dagbag.import_errors) == 0
>
>     def _get_dagbag(self):
>         # No default here, so the assertion below fails when AIRFLOW_DAGS is unset
>         dag_folder = os.getenv('AIRFLOW_DAGS')
>         assert dag_folder is not None
>         return DagBag(dag_folder=dag_folder, include_examples=False)
> {code}
> The following error was raised: 
> {code:java}
> # Used to store stats around DagBag processing 
> stats = [] 
> FileLoadStat = namedtuple( 
>     'FileLoadStat', 
>     "file duration dag_num task_num dags"
> )  
> for filepath in utils.dag_processing.list_py_file_paths(dag_folder): 
> E AttributeError: module 'airflow.utils' has no attribute 'dag_processing'
> {code}
> The issue is likely because `dag_processing` was not imported. 
> [https://github.com/apache/incubator-airflow/blob/7f2bc0ddf74c9bf9113401c4ecb1355e6c2fab7f/airflow/models.py#L68-L94]
> This was further validated by stopping execution with pdb and running the following just
> before utils.dag_processing.list_py_file_paths(dag_folder) is called:
> {code:java}
> >> from airflow.utils.dag_processing import *{code}
> The test then passed, since the module had been loaded.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
