airflow-dev mailing list archives

From Aaron Polhamus <aa...@credijusto.com>
Subject How to make DAGs dynamic based on task output
Date Tue, 25 Jul 2017 22:58:01 GMT
This question was posted on StackOverflow here:
https://stackoverflow.com/questions/45314174/how-to-dynamically-iterate-over-the-output-of-an-upstream-task-to-create-paralle

Consider the following example of a DAG where the first task, get_id_creds,
extracts a list of credentials from a database. This operation tells me
which users in my database I can run further data preprocessing on, and
it writes those IDs to the file /tmp/ids.txt. I then read those IDs into
my DAG and use them to generate a list of upload_transactions tasks that
can be run in parallel.
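The hand-off described above amounts to an implicit file contract between the task and the DAG file. A minimal sketch of both sides, assuming the IDs are plain strings written one per line; write_ids and read_ids are hypothetical helpers, not functions from dash_workers or Airflow:

```python
from pathlib import Path
from typing import Iterable, List

# Path taken from the question; producer and consumer must agree on it.
IDS_PATH = Path('/tmp/ids.txt')


def write_ids(ids: Iterable[str], path: Path = IDS_PATH) -> None:
    """Producer side (hypothetical): write one ID per line, which is
    what get_id_creds is described as doing when it runs."""
    path.write_text(''.join(f'{uid}\n' for uid in ids))


def read_ids(path: Path = IDS_PATH) -> List[str]:
    """Consumer side (hypothetical): the same splitlines() read that the
    DAG file performs each time it is parsed."""
    return path.read_text().splitlines()
```

Nothing but the path and the one-ID-per-line format links the two sides, so a change to either one silently breaks the DAG.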

My question is: is there a more idiomatically correct, dynamic way to do
this using Airflow? What I have here feels clumsy and brittle. How can I
directly pass a list of valid IDs from one process to the one that defines
the subsequent downstream processes?

from datetime import datetime, timedelta
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

default_args = {
    'start_date': datetime.now()}

# schedule_interval is a DAG argument rather than a task default,
# so it is passed to DAG() instead of living in default_args.
dag = DAG(
    dag_id='dash_preproc',
    default_args=default_args,
    schedule_interval=None)

get_id_creds = PythonOperator(
    task_id='get_id_creds',
    python_callable=dash_workers.get_id_creds,  # writes the IDs to /tmp/ids.txt
    provide_context=True,
    dag=dag)

# This read runs whenever the DAG file is parsed, not after get_id_creds
# finishes, so it sees whatever a previous run left in /tmp/ids.txt.
with open('/tmp/ids.txt', 'r') as infile:
    uids = infile.read().splitlines()

for uid in uids:
    upload_transactions = PythonOperator(
        task_id=uid,
        python_callable=dash_workers.upload_transactions,
        op_args=[uid],
        dag=dag)
    # Each upload task runs after get_id_creds.
    upload_transactions.set_upstream(get_id_creds)
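Part of the brittleness is that the module-level open() makes the whole DAG fail to import if /tmp/ids.txt does not exist yet, and that raw IDs are used directly as task_ids. A minimal sketch of a more defensive parse-time loader, assuming task_ids should be limited to ASCII letters, digits, underscores, dashes and dots (a conservative choice, since Airflow validates task_id characters); load_upload_specs and the upload_ prefix are hypothetical, not Airflow API:

```python
import re
from pathlib import Path
from typing import List, Tuple

# Assumption: keep task_ids to a conservative ASCII character set.
_INVALID_TASK_ID_CHARS = re.compile(r'[^A-Za-z0-9_.-]')


def load_upload_specs(path: str = '/tmp/ids.txt') -> List[Tuple[str, str]]:
    """Return (task_id, uid) pairs for the upload tasks (hypothetical helper).

    A missing file yields no upload tasks instead of a DAG import error,
    blank lines and repeated IDs are skipped, and two distinct IDs that
    would map to the same task_id raise ValueError.
    """
    try:
        lines = Path(path).read_text().splitlines()
    except FileNotFoundError:
        # e.g. the very first parse, before get_id_creds has ever run
        return []

    specs = []
    seen_uids = set()
    seen_task_ids = set()
    for line in lines:
        uid = line.strip()
        if not uid or uid in seen_uids:
            continue  # skip blank lines and repeated IDs
        # Prefix so an ID can never collide with 'get_id_creds'.
        task_id = 'upload_' + _INVALID_TASK_ID_CHARS.sub('_', uid)
        if task_id in seen_task_ids:
            raise ValueError(f'distinct IDs map to the same task_id: {task_id}')
        seen_uids.add(uid)
        seen_task_ids.add(task_id)
        specs.append((task_id, uid))
    return specs
```

The loop in the DAG file would then become `for task_id, uid in load_upload_specs():`, passing `task_id=task_id` and `op_args=[uid]`. This only hardens the read; it does not change when the read happens.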

Thanks in advance!
-- 


*Aaron Polhamus*
*Director of Data Science*



