airflow-dev mailing list archives

From David Szakallas <>
Subject Re: Creating dynamic pool from task
Date Fri, 21 Sep 2018 00:19:00 GMT
What I am doing is very similar. However, I am including the DagRun's id in the pool name to
make it unique, as I need to make sure every run gets its own pool. I am getting that from
the context object, which is only available within execute methods or templates. How do you
make sure each run has its own pool?
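A per-run pool name of the kind described here could be built from the DAG id and the run id. This is a minimal sketch, not Airflow API: the helper `run_pool_name`, the `cluster` prefix and the character filtering are hypothetical choices. Inside an operator's execute method the two inputs would come from `context['dag'].dag_id` and `context['dag_run'].run_id`.

```python
import re


def run_pool_name(dag_id: str, run_id: str, prefix: str = "cluster") -> str:
    """Build a pool name that is unique per DAG run (hypothetical naming scheme)."""
    # Run ids such as "scheduled__2018-09-20T00:00:00+00:00" contain ':' and '+';
    # this sketch replaces anything outside [A-Za-z0-9_.-] with '_' to keep names plain.
    safe_run_id = re.sub(r"[^A-Za-z0-9_.-]", "_", run_id)
    return f"{prefix}__{dag_id}__{safe_run_id}"


print(run_pool_name("compute_on_cluster", "scheduled__2018-09-20T00:00:00+00:00"))
```

Because the run id is part of the name, two runs of the same DAG never share a pool, which is the isolation asked about above.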


Dávid Szakállas
Software Engineer | Whitepages Data Services

From: Taylor Edmiston <>
Sent: Thursday, September 20, 2018 6:17:05 PM
Subject: Re: Creating dynamic pool from task

I've done something similar. I have a task at the front of the DAG that
ensures the connection pool exists and creates the pool if it doesn't.
I've pasted my code below. This runs in a for loop that creates one DAG
per iteration, each with its own pool. Then I pass the pool name into the

Does this work for your use case?


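import logging
from airflow.models import Pool  # Airflow 1.10 import paths
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
log = logging.getLogger(__name__)

@provide_session  # supplies `session` when the caller does not pass one
def ensure_redshift_pool(name, slots, session=None):
    pool_query = session.query(Pool).filter(Pool.pool == name)
    if pool_query.one_or_none() is None:
        log.info(f'redshift pool "{name}" does not exist - creating it')
        session.add(Pool(pool=name, slots=slots))
        session.commit()
        log.info(f'created redshift pool "{name}"')
    else:
        log.info(f'redshift pool "{name}" already exists')

# Runs inside the per-DAG for loop, which provides `dag`, `workflow` and REDSHIFT_POOL_SLOTS.
redshift_pool = PythonOperator(
    task_id='ensure_redshift_pool',  # hypothetical task_id
    python_callable=ensure_redshift_pool,
    op_kwargs={'name': workflow.pool, 'slots': REDSHIFT_POOL_SLOTS},
    dag=dag,
)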
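The check-then-create logic in `ensure_redshift_pool` is what makes the task safe to rerun. As a rough, Airflow-free sketch of the same logic, the snippet below uses an in-memory SQLite table as a simplified stand-in for the metadata database's pool table (named `slot_pool` here, trimmed to two columns). The helpers `ensure_pool` and `delete_pool` and the pool name `redshift_etl` are hypothetical; `delete_pool` corresponds to the teardown step proposed in the original question.

```python
import sqlite3


def ensure_pool(conn: sqlite3.Connection, name: str, slots: int) -> bool:
    """Create pool `name` unless it exists; return True if it was created."""
    row = conn.execute("SELECT 1 FROM slot_pool WHERE pool = ?", (name,)).fetchone()
    if row is not None:
        return False  # already exists: leave the existing slot count untouched
    conn.execute("INSERT INTO slot_pool (pool, slots) VALUES (?, ?)", (name, slots))
    conn.commit()
    return True


def delete_pool(conn: sqlite3.Connection, name: str) -> None:
    """Remove pool `name` once the tasks that use it have finished."""
    conn.execute("DELETE FROM slot_pool WHERE pool = ?", (name,))
    conn.commit()


# Hypothetical stand-in schema; the real pool table has more columns.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE slot_pool (pool TEXT UNIQUE NOT NULL, slots INTEGER NOT NULL)")

print(ensure_pool(conn, "redshift_etl", 4))  # True: created
print(ensure_pool(conn, "redshift_etl", 4))  # False: already present
delete_pool(conn, "redshift_etl")
```

The UNIQUE constraint covers the case where two runs race past the SELECT: the second INSERT then raises `sqlite3.IntegrityError` instead of creating a duplicate row.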


*Taylor Edmiston*

On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <> wrote:

> Hi all,
> I have a DAG that creates a cluster, starts computation tasks, and, after
> they complete, tears down the cluster. I want to limit concurrency for the
> computation tasks carried out on this cluster to a fixed number. So,
> logically, I need a pool that is exclusive to the cluster created by a
> task. I don't want interference with other DAGs or different runs of the
> same DAG.
> I thought I could solve this problem by creating a pool dynamically from a
> task after the cluster is created and deleting it once the computation
> tasks are finished. I thought I could template the pool parameter of the
> computation tasks to make them use this dynamically created pool.
> However, this way the computation tasks are never triggered, so I think
> the pool parameter is saved in the task instance before being templated. I
> would like to hear your thoughts on how to achieve the desired behavior.
> Thanks,
> Dávid Szakállas
> Software Engineer | Whitepages Data Services
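Conceptually, a pool with N slots caps how many task instances assigned to it run at once, much like a counting semaphore. The stdlib sketch below shows that analogy with threads; `SLOTS`, `computation_task` and the timings are hypothetical, and this is only an analogy, not how Airflow implements pools.

```python
import threading
import time

SLOTS = 3  # hypothetical: the fixed concurrency limit for one cluster
pool = threading.BoundedSemaphore(SLOTS)
lock = threading.Lock()
running = 0
peak = 0


def computation_task() -> None:
    """Hold one pool slot for the duration of the (simulated) work."""
    global running, peak
    with pool:  # blocks until a slot is free
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # stand-in for work submitted to the cluster
        with lock:
            running -= 1


threads = [threading.Thread(target=computation_task) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak)  # never exceeds SLOTS
```

Ten tasks are submitted, but at most `SLOTS` of them hold a slot at any moment; giving each cluster its own semaphore (or pool) keeps one cluster's tasks from consuming another's slots.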
