airflow-dev mailing list archives

From Chris Palmer <ch...@crpalmer.com>
Subject Re: Creating dynamic pool from task
Date Fri, 21 Sep 2018 17:23:48 GMT
What would cause multiple computation tasks to run on the cluster at the
same time? Are you worried about concurrent DagRuns? Does setting dag
concurrency and/or task concurrency appropriately solve your problem?
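
For reference, a minimal sketch of those settings in Airflow 1.x (the DAG id, dates, and limit values are illustrative, not from the thread):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='cluster_computation',
    start_date=datetime(2018, 1, 1),
    schedule_interval='@daily',
    concurrency=4,       # max running task instances across active runs of this DAG
    max_active_runs=1,   # disallow concurrent DagRuns entirely
)

compute = PythonOperator(
    task_id='compute',
    python_callable=lambda: None,  # placeholder for the real computation
    task_concurrency=4,            # cap this one task across all runs
    dag=dag,
)
```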

Chris

On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <dszakallas@whitepages.com>
wrote:

> What I am doing is very similar. However, I am including the DagRun's id in
> the pool name to make it unique, as I need to make sure every run gets its
> own pool. I am getting that from the context object, which is only
> available within execute methods or templates. How do you make sure each
> run has its own pool?
>
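
A per-run pool name can be derived from the run id in the task context. A minimal sketch (`build_pool_name` is a hypothetical helper; the `provide_context=True` wiring follows Airflow 1.x conventions):

```python
# Hypothetical helper: run_id is unique per DagRun, so a pool name
# built from it is unique per run as well.
def build_pool_name(dag_id, run_id, prefix='pool'):
    return '{}_{}_{}'.format(prefix, dag_id, run_id)

# Inside a PythonOperator callable declared with provide_context=True
# (Airflow 1.x), the DagRun is reachable through the context:
#
# def create_pool(**context):
#     name = build_pool_name(context['dag'].dag_id,
#                            context['dag_run'].run_id)
#     ...
```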
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services
>
> ________________________________
> From: Taylor Edmiston <tedmiston@gmail.com>
> Sent: Thursday, September 20, 2018 6:17:05 PM
> To: dev@airflow.incubator.apache.org
> Subject: Re: Creating dynamic pool from task
>
> I've done something similar.  I have a task at the front of the DAG that
> ensures the connection pool exists and creates the pool if it doesn't.
> I've pasted my code below.  This runs in a for loop that creates one DAG
> per iteration each with its own pool.  Then I pass the pool name into the
> sensors.
>
> Does this work for your use case?
>
> --
>
> import logging
>
> from airflow.models import Pool
> from airflow.operators.python_operator import PythonOperator
> from airflow.utils.db import provide_session
>
> logger = logging.getLogger(__name__)
>
> redshift_pool = PythonOperator(
>     task_id='redshift_pool',
>     dag=dag,
>     python_callable=ensure_redshift_pool,
>     op_kwargs={
>         'name': workflow.pool,
>         'slots': REDSHIFT_POOL_SLOTS,
>     },
>     ...
> )
>
> @provide_session
> def ensure_redshift_pool(name, slots, session=None):
>     pool = Pool(pool=name, slots=slots)
>     pool_query = (
>         session.query(Pool)
>         .filter(Pool.pool == name)
>     )
>     pool_query_result = pool_query.one_or_none()
>     if not pool_query_result:
>         logger.info(f'redshift pool "{name}" does not exist - creating it')
>         session.add(pool)
>         session.commit()
>         logger.info(f'created redshift pool "{name}"')
>     else:
>         logger.info(f'redshift pool "{name}" already exists')
>
> --
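
The surrounding for-loop might look like the sketch below (`WORKFLOWS`, `make_dag`, and `SomeSensor` are hypothetical placeholders; only the one-pool-per-DAG pattern follows the description above):

```python
for workflow in WORKFLOWS:            # hypothetical iterable of workflow configs
    dag = make_dag(workflow)          # hypothetical DAG factory

    ensure_pool = PythonOperator(
        task_id='redshift_pool',
        python_callable=ensure_redshift_pool,
        op_kwargs={'name': workflow.pool, 'slots': REDSHIFT_POOL_SLOTS},
        dag=dag,
    )

    sensor = SomeSensor(              # hypothetical sensor
        task_id='wait_for_data',
        pool=workflow.pool,           # the per-DAG pool created above
        dag=dag,
    )
    ensure_pool >> sensor             # create the pool before any sensor uses it

    globals()[dag.dag_id] = dag       # expose each generated DAG to the scheduler
```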
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
> <https://stackoverflow.com/story/taylor>
>
>
>
> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakallas@whitepages.com>
> wrote:
>
> > Hi all,
> >
> > I have a DAG that creates a cluster, starts computation tasks, and after
> > they complete, tears down the cluster. I want to limit concurrency for
> > the computation tasks carried out on this cluster to a fixed number. So
> > logically, I need a pool that is exclusive to the cluster created by a
> > task. I don't want interference with other DAGs or different runs of the
> > same DAG.
> >
> > I thought I could solve this problem by creating a pool dynamically from
> > a task after the cluster is created and deleting it once the computation
> > tasks are finished. I thought I could template the pool parameter of the
> > computation tasks to make them use this dynamically created pool.
> >
> > However, this way the computation tasks will never be triggered. So I
> > think the pool parameter is saved in the task instance before being
> > templated. I would like to hear your thoughts on how to achieve the
> > desired behavior.
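
A matching teardown step could delete the per-run pool once the computation tasks finish, mirroring the creation helper quoted in this thread (a sketch using the same `Pool` model and `provide_session` decorator; it requires a configured Airflow metadata database):

```python
from airflow.models import Pool
from airflow.utils.db import provide_session

@provide_session
def delete_pool(name, session=None):
    # Remove the per-run pool from the metadata database.
    session.query(Pool).filter(Pool.pool == name).delete()
    session.commit()
```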
> >
> > Thanks,
> >
> > Dávid Szakállas
> > Software Engineer | Whitepages Data Services
> >
