airflow-dev mailing list archives

From David Szakallas <dszakal...@whitepages.com>
Subject Re: Creating dynamic pool from task
Date Fri, 21 Sep 2018 00:19:00 GMT
What I am doing is very similar. However, I am including the DagRun's id in the pool name to
make it unique, since I need to make sure every run gets its own pool. I am getting that id
from the context object, which is only available within execute methods or templates. How do
you make sure each run has its own pool?
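
For reference, my pool-creating task does roughly this (a minimal sketch:
the callable name and pool name prefix are illustrative, and it assumes the
task runs with provide_context=True):

--

from airflow.models import Pool
from airflow.utils.db import provide_session

@provide_session
def create_run_pool(slots, session=None, **context):
    # run_id is unique per DagRun, so the derived pool name is too
    pool_name = f"cluster_pool_{context['dag_run'].run_id}"
    session.add(Pool(pool=pool_name, slots=slots))
    session.commit()
    return pool_name  # pushed to XCom, so downstream templates can read it

--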


Thanks,

Dávid Szakállas
Software Engineer | Whitepages Data Services

________________________________
From: Taylor Edmiston <tedmiston@gmail.com>
Sent: Thursday, September 20, 2018 6:17:05 PM
To: dev@airflow.incubator.apache.org
Subject: Re: Creating dynamic pool from task

I've done something similar.  I have a task at the front of the DAG that
ensures the Airflow pool exists and creates the pool if it doesn't.
I've pasted my code below.  This runs in a for loop that creates one DAG
per iteration, each with its own pool (a sketch of that loop follows the
code).  Then I pass the pool name into the sensors.

Does this work for your use case?

--

from airflow.operators.python_operator import PythonOperator

# Runs at the front of each generated DAG so the pool is guaranteed
# to exist before any task that references it is scheduled.
redshift_pool = PythonOperator(
    task_id='redshift_pool',
    dag=dag,
    python_callable=ensure_redshift_pool,
    op_kwargs={
        'name': workflow.pool,
        'slots': REDSHIFT_POOL_SLOTS,
    },
    ...
)

import logging

from airflow.models import Pool
from airflow.utils.db import provide_session

logger = logging.getLogger(__name__)


@provide_session
def ensure_redshift_pool(name, slots, session=None):
    """Create the named pool if it doesn't already exist (idempotent)."""
    pool_query = (
        session.query(Pool)
        .filter(Pool.pool == name)
    )
    if pool_query.one_or_none() is None:
        logger.info(f'redshift pool "{name}" does not exist - creating it')
        session.add(Pool(pool=name, slots=slots))
        session.commit()
        logger.info(f'created redshift pool "{name}"')
    else:
        logger.info(f'redshift pool "{name}" already exists')

--
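
For context, the surrounding loop looks roughly like this (a sketch only:
`workflows`, `make_dag`, and the S3KeySensor arguments are illustrative
stand-ins, not the real code):

--

from airflow.sensors.s3_key_sensor import S3KeySensor

for workflow in workflows:
    dag = make_dag(workflow)
    ensure_pool = PythonOperator(
        task_id='redshift_pool',
        dag=dag,
        python_callable=ensure_redshift_pool,
        op_kwargs={'name': workflow.pool, 'slots': REDSHIFT_POOL_SLOTS},
    )
    # The sensor takes the per-DAG pool name, so its runs are throttled
    # by that pool's slot count.
    sensor = S3KeySensor(
        task_id='wait_for_data',
        bucket_key=workflow.s3_key,
        pool=workflow.pool,
        dag=dag,
    )
    ensure_pool >> sensor
    # A module-level reference is what makes the scheduler pick up
    # each generated DAG.
    globals()[dag.dag_id] = dag

--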

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
<https://stackoverflow.com/story/taylor>



On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakallas@whitepages.com>
wrote:

> Hi all,
>
> I have a DAG that creates a cluster, starts computation tasks, and after
> they complete, tears down the cluster. I want to limit the concurrency of
> the computation tasks running on this cluster to a fixed number. So
> logically, I need a pool that is exclusive to the cluster created by a task.
> I don't want interference with other DAGs or with different runs of the same DAG.
>
> I thought I could solve this problem by creating a pool dynamically from a
> task after the cluster is created and deleting it once the computation tasks
> are finished. I thought I could template the pool parameter of the
> computation tasks to make them use this dynamically created pool.
>
> However, this way the computation tasks are never triggered. So I think
> the pool parameter is saved in the task instance before being templated. I
> would like to hear your thoughts on how to achieve the desired behavior.
>
> Thanks,
>
> Dávid Szakállas
> Software Engineer | Whitepages Data Services
