airflow-dev mailing list archives

From: Aaron Polhamus <aa...@credijusto.com>
Subject: Airflow dependency error when instantiating multiple tasks via a 'for' loop
Date: Fri, 21 Jul 2017 15:34:48 GMT
Apologies if the dev list is not an appropriate place to ask this question,
which I've also posted to Stack Overflow:
https://stackoverflow.com/questions/45227118/airflow-dependency-error-when-instantiating-multiple-tasks-via-a-for-loop

Thanks in advance!
Aaron

I'm running this DAG. It imports functions from dash_workers.py (not
included yet -- would this be helpful?) and implements those functions as
tasks defined by PythonOperator:

from datetime import datetime, timedelta
import os
import sys

import airflow.models as af_models
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('CAPONE_DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define CAPONE_DASH_PREPROC_PATH value in environment variables')
    sys.exit(1)

default_args = {
  'start_date': datetime(2017, 7, 18),
  'schedule_interval': None
}

DAG = af_models.DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds,
    provide_context=True,
    dag=DAG)

with open('/tmp/ids.txt', 'r') as infile:
    ids = infile.read().splitlines()

for uid in ids:
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        task_id='upload_transactions',
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)

This results in:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 263, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 675, in _load
  File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 205, in _call_with_frames_removed
  File "/Users/aaronpolhamus/airflow/dags/dash_dag.py", line 47, in <module>
    upload_transactions.set_upstream(get_id_creds)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2478, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2458, in _set_relatives
    task.append_only_new(task._downstream_task_ids, self.task_id)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 2419, in append_only_new
    ''.format(**locals()))
airflow.exceptions.AirflowException: Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions already registered
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 573, in test
    dag = dag or get_dag(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 126, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: dash_preproc. Either the dag did not exist or it failed to parse.

The application here is that I am extracting a list of IDs from a SQL table
using the function get_id_creds and then generating detailed data profiles
on a per-ID basis. Both functions use MySqlHook internally and I've tested
each function/task on a standalone basis to make sure that they result in
the expected behavior in isolation (they do).
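
In case it helps while the full dash_workers.py isn't posted, the rough shape
of the first worker is something like the following (heavily simplified, with
a placeholder connection id and query, not the real code):

from airflow.hooks.mysql_hook import MySqlHook


def get_id_creds(**kwargs):
    # Simplified sketch only: pull the list of IDs out of MySQL and write
    # them to the temp file that the DAG file reads back at parse time.
    hook = MySqlHook(mysql_conn_id='dash_mysql')  # placeholder connection id
    rows = hook.get_records('SELECT id FROM id_creds')  # placeholder query
    with open('/tmp/ids.txt', 'w') as outfile:
        outfile.write('\n'.join(str(row[0]) for row in rows))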

The crux of the error seems to be the line "airflow.exceptions.AirflowException:
Dependency <Task(PythonOperator): get_rfc_creds>, upload_transactions
already registered". This seems to suggest that on the first pass through
the loop the task is "registered", and that on the second pass the parser
complains that it has already done that operation. This example script
<https://github.com/trbs/airflow-examples/blob/master/dags/example_python_operator.py>
makes it look easy to do what I'm doing here: just embed your downstream
task within a for loop. I have no idea why this is failing.
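
My best guess at a workaround -- and I'd appreciate confirmation that this is
actually the issue -- is that reusing the same task_id string on every pass
through the loop is what trips the "already registered" check, so each
generated task would need its own id. Untested sketch:

for uid in ids:
    print('Building transactions for {}'.format(uid))
    upload_transactions = PythonOperator(
        # Guess at the fix: give each iteration its own task_id so the loop
        # creates one distinct task per ID instead of re-registering the
        # same one. (Assumes uid only contains characters that are legal in
        # an Airflow task_id.)
        task_id='upload_transactions_{}'.format(uid),
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=DAG)
    upload_transactions.set_upstream(get_id_creds)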

I'm set up for local parallelism with the LocalExecutor. My understanding is
that if I can get this working, I can run multiple data-profile-generation
jobs in parallel on the same machine.

I have two questions here:

   1. Where is this error coming from, and how can I get this script working?
   2. Task 1 writes a list of IDs to hard disk, and those are then read back
   into the DAG to iterate over in the Task 2 execution loop. Is there a
   more efficient way to pass the results of Task 1 to Task 2 (XComs,
   perhaps -- see my rough attempt below), and can you provide a quick example?
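
For question 2, my rough guess at an XCom-based hand-off is below. It is
untested, it assumes get_id_creds can simply return the list of IDs so that
Airflow stores it as an XCom, and it collapses the per-ID uploads into a
single downstream task purely to keep the sketch short:

def upload_all_transactions(**context):
    # Pull the return value of get_id_creds (pushed by Airflow as an XCom)
    # instead of re-reading /tmp/ids.txt from disk at parse time.
    ids = context['ti'].xcom_pull(task_ids='get_id_creds')
    for uid in ids:
        dash_workers.upload_transactions(uid)

upload_all = PythonOperator(
    task_id='upload_all_transactions',
    python_callable=upload_all_transactions,
    provide_context=True,
    dag=DAG)

upload_all.set_upstream(get_id_creds)

If that's roughly the right direction, I'm happy to restructure around it.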

-- 


Aaron Polhamus
Director of Data Science

Cell (México): +52 (55) 1951-5612
Cell (USA): +1 (206) 380-3948
Tel: +52 (55) 1168 9757 - Ext. 181

-- 
Please refer to our web page
<https://www.credijusto.com/aviso-de-privacidad/> for more information
about our privacy policies.

