airflow-dev mailing list archives

From Paul Zaczkiewicz <paulz...@gmail.com>
Subject Re: SubdagOperator work with CeleryExecutor
Date Fri, 21 Apr 2017 14:50:05 GMT
There are major outstanding issues with SubDagOperator in 1.8.0, separate
from what you're experiencing here. Sometimes a SubDag run will just hang,
and you can't re-run portions of the SubDag without re-running the entire
SubDag. I'd recommend against using SubDags in 1.8.0.

I'm not sure what's causing your "Zoom into Sub DAG" issue, though.
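For what it's worth, the one hard requirement SubDagOperator enforces is
that the child DAG's dag_id is exactly "<parent dag_id>.<SubDagOperator
task_id>"; "Zoom into Sub DAG" looks the subdag up by that composite id. A
minimal sketch of the convention (the ids below are made up):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

PARENT_DAG_NAME = 'parent_name'  # hypothetical ids, for illustration only
CHILD_TASK_ID = 'child_name'

dag = DAG(PARENT_DAG_NAME,
          start_date=datetime(2017, 4, 1),
          schedule_interval='@hourly')

# The subdag's dag_id must be '<parent dag_id>.<SubDagOperator task_id>',
# otherwise SubDagOperator raises an exception at parse time
subdag = DAG('%s.%s' % (PARENT_DAG_NAME, CHILD_TASK_ID),
             start_date=dag.start_date,
             schedule_interval=dag.schedule_interval)

DummyOperator(task_id='noop', dag=subdag)

SubDagOperator(subdag=subdag,
               task_id=CHILD_TASK_ID,  # must match the dag_id suffix above
               dag=dag)

Your code below does seem to follow that convention, so I'd also check
that the webserver on production is parsing the same rendered file as the
scheduler and the Celery workers; since your start_date and e-mail list
are filled in by a template, a stale or differently rendered copy on one
box could plausibly produce a "dag not found" error.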

On Wed, Apr 19, 2017 at 6:44 AM, Devjyoti Patra <devjyotip@qubole.com>
wrote:

> I am not able to make SubDagOperator work with CeleryExecutor. The
> following code works well on my local setup (with LocalExecutor), but on
> production, clicking on "Zoom into Sub DAG" makes Airflow complain that
> the DAG with the name "parent_name.child_name" is not found. Please tell
> me what I missed in my implementation.
>
> Thanks,
> Devj
>
>
> # PARENT_DAG_NAME, SUBDAG_NAME_PREFIX and k_per_subdag are assumed to be
> # defined elsewhere in this file
> from datetime import datetime, timedelta
>
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.subdag_operator import SubDagOperator
> from airflow.contrib.operators.qubole_operator import QuboleOperator
>
> default_args = {
>     'owner': 'airflow',
>     'start_date': datetime.strptime('${date_str}', '%Y-%m-%d'),
>     'email': ['${email_list}'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=5),
>     'queue': 'default'}
>
> def sub_dag(child_dag_name, default_args, start_date, schedule_interval,
>             kas):
>     subdag = DAG(
>         '%s.%s' % (PARENT_DAG_NAME, child_dag_name),
>         default_args=default_args,
>         schedule_interval=schedule_interval,
>         start_date=start_date,
>     )
>
>     # One fork/join pair per subdag ('k' is only defined inside the loop
>     # below, so the fork/join task ids use child_dag_name instead)
>     fork = DummyOperator(task_id='discovery_fork_' + child_dag_name,
>                          dag=subdag)
>
>     # The JOIN task has to be changed for writing to RDS
>     join = BashOperator(
>         task_id='join_' + child_dag_name,
>         bash_command='echo "more wait for subdag..."',
>         default_args=default_args,
>         dag=subdag
>     )
>
>     # Each entry in kas becomes one Qubole task between fork and join
>     for k in kas:
>         task = QuboleOperator(
>             task_id='task_' + str(k),
>             command_type='sparkcmd',
>             sql="SOME QUERY",
>             qubole_conn_id='default',
>             provide_context=True,
>             dag=subdag)
>
>         task.set_upstream(fork)
>         task.set_downstream(join)
>
>     return subdag
>
> # The Airflow pipeline is created below
> dag = DAG(PARENT_DAG_NAME,
>         default_args=default_args,
>         schedule_interval='@hourly')
>
>
>
> start_node = DummyOperator(task_id='start',
>                            dag=dag)
>
>
> end_node = DummyOperator(task_id='end',
>                          dag=dag)
>
>
>
> setup_task = QuboleOperator(
>         task_id='setup_task',
>         command_type='sparkcmd',
>         sql="SOME QUERY",
>         qubole_conn_id='default',
>         provide_context=True,
>         dag=dag)
>
> setup_task.set_upstream(start_node)
>
>
> for k in k_per_subdag:
>     child_name = SUBDAG_NAME_PREFIX + str(k)
>
>     branch = SubDagOperator(
>         subdag=sub_dag(child_name, default_args, dag.start_date,
>                        dag.schedule_interval, k),
>         default_args=default_args,
>         task_id=child_name,
>         dag=dag
>     )
>
>     branch.set_upstream(setup_task)
>     branch.set_downstream(end_node)
>
