airflow-dev mailing list archives

From Devjyoti Patra <devjyo...@qubole.com>
Subject SubdagOperator work with CeleryExecutor
Date Wed, 19 Apr 2017 10:44:53 GMT
I am not able to make SubdagOperator work with CeleryExecutor. The
following code works well on my local setup (with LocalExecutor), but in
production, when I click "Zoom into Sub Dag", Airflow complains that the
DAG named "parent_name.child_name" is not found. Please tell me what I
missed in my implementation.
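
For reference, my understanding is that "Zoom into Sub Dag" resolves the
child as '<parent dag_id>.<SubDagOperator task_id>', so those two strings
must match exactly. A minimal sketch of that convention, with purely
illustrative names ('example_parent', 'section_1') that are not from my
real pipeline:

# Sketch of the naming convention SubDagOperator relies on
# (illustrative names only, Airflow 1.x module layout).
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

args = {'owner': 'airflow', 'start_date': datetime(2017, 4, 1)}

parent = DAG('example_parent', default_args=args,
             schedule_interval='@hourly')

# The child dag_id must be '<parent dag_id>.<task_id of the operator>'.
child = DAG('example_parent.section_1', default_args=args,
            schedule_interval=parent.schedule_interval)
DummyOperator(task_id='noop', dag=child)

# The task_id matches the suffix of the child dag_id above.
SubDagOperator(task_id='section_1', subdag=child, dag=parent)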

Thanks,
Devj


# Imports needed at the top of the file (Airflow 1.x module layout):
from datetime import datetime, timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.qubole_operator import QuboleOperator

# '${date_str}' and '${email_list}' are template placeholders that get
# substituted before this file is deployed.
default_args = {
    'owner': 'airflow',
    'start_date': datetime.strptime('${date_str}', '%Y-%m-%d'),
    'email': ['${email_list}'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'queue': 'default'}

# Defined elsewhere in the real file; placeholder values for readability:
PARENT_DAG_NAME = 'parent_name'
SUBDAG_NAME_PREFIX = 'child_'
k_per_subdag = {'0': [1, 2], '1': [3, 4]}  # suffix -> k values per subdag

def sub_dag(child_dag_name, default_args, start_date, schedule_interval, kas):
    # The child dag_id must follow the '<parent>.<child>' convention.
    subdag = DAG(
        '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
        default_args=default_args,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    # Task ids only need to be unique within this subdag; the original
    # suffixes referenced 'k' before the loop below defined it.
    fork = DummyOperator(task_id='discovery_fork', dag=subdag)

    # The JOIN task has to be changed for writing to RDS.
    join = BashOperator(
        task_id='join',
        bash_command='echo "more wait for subdag..."',
        default_args=default_args,
        dag=subdag,
    )

    # Fan out one task per k value between the fork and the join.
    for k in kas:
        task = QuboleOperator(
            task_id='task_' + str(k),
            command_type='sparkcmd',
            sql="SOME QUERY",
            qubole_conn_id='default',
            provide_context=True,
            dag=subdag)

        task.set_upstream(fork)
        task.set_downstream(join)

    return subdag

# The parent Airflow pipeline is created below.
dag = DAG(PARENT_DAG_NAME,
          default_args=default_args,
          schedule_interval='@hourly')



start_node = DummyOperator(task_id='start',
                           dag=dag)


end_node = DummyOperator(task_id='end',
                         dag=dag)



setup_task = QuboleOperator(
        task_id='setup_task',
        command_type='sparkcmd',
        sql="SOME QUERY",
        qubole_conn_id='default',
        provide_context=True,
        dag=dag)

setup_task.set_upstream(start_node)


# Placeholder structure assumed above: each entry of k_per_subdag maps
# a name suffix to the k values that one subdag should process.
for k, kas in k_per_subdag.items():
    child_name = SUBDAG_NAME_PREFIX + str(k)

    # task_id must equal child_name so that '<parent>.<task_id>' matches
    # the dag_id built inside sub_dag().
    branch = SubDagOperator(
        subdag=sub_dag(child_name, default_args, dag.start_date,
                       dag.schedule_interval, kas),
        default_args=default_args,
        task_id=child_name,
        dag=dag,
    )

    branch.set_upstream(setup_task)
    branch.set_downstream(end_node)
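
In case it helps narrow this down: the way I sanity-check that the
generated subdag ids are actually registered is to load a DagBag on the
affected host. Just a sketch; it assumes this file sits on the configured
DAGs path:

# Sketch: list every dag_id derived from the parent, as the webserver
# and workers would see them after loading the DAGs folder.
from airflow.models import DagBag

dagbag = DagBag()  # loads DAGs from the configured dags_folder
print(sorted(d for d in dagbag.dag_ids
             if d.startswith(PARENT_DAG_NAME)))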
