airflow-dev mailing list archives

From Taylor Edmiston <tedmis...@gmail.com>
Subject Re: Creating dynamic pool from task
Date Thu, 27 Sep 2018 14:11:35 GMT
I also believe Chris is correct that it's not quite possible to be that
dynamic today.

A possible workaround is only running one DAG run for each DAG at a time
and reusing the pool. Or perhaps it might work to create the pool based on
the DagRun's execution date instead of its id?
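
For example, something along these lines (an untested sketch; the pool name
and the create_pool task are hypothetical, in the spirit of the snippet
quoted further down this thread):

--

from airflow.models import Pool
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

@provide_session
def ensure_pool_for_run(slots, session=None, **context):
    # Key the pool on the run's execution date, which is known up front,
    # rather than on the DagRun id.
    name = 'cluster_pool_{}'.format(context['ds_nodash'])
    if not session.query(Pool).filter(Pool.pool == name).one_or_none():
        session.add(Pool(pool=name, slots=slots))
        session.commit()

create_pool = PythonOperator(
    task_id='create_pool',
    python_callable=ensure_pool_for_run,
    op_kwargs={'slots': 6},
    provide_context=True,  # pass the context (incl. ds_nodash) to the callable
    dag=dag,  # assumes a dag object defined elsewhere
)

--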

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
<https://stackoverflow.com/story/taylor>


On Tue, Sep 25, 2018 at 10:24 AM, Chris Palmer <crispy16@gmail.com> wrote:

> David,
>
> I was playing around with this over the weekend, and mostly found that it
> doesn't seem to be possible. I was able to get an operator to template out
> the pool attribute when it renders its templates. However, this doesn't
> normally happen until execution, and so the un-templated pool attribute
> gets used when the scheduler sends the task to the executor.
>
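> For concreteness, a minimal sketch of the idea (BashOperator is just an
> arbitrary example base):
>
> --
>
> from airflow.operators.bash_operator import BashOperator
>
> class PoolTemplatedBashOperator(BashOperator):
>     # 'pool' does get rendered as a template this way, but only at
>     # execute time; the scheduler has already read the raw, un-rendered
>     # string when it checks pool slots and queues the task.
>     template_fields = BashOperator.template_fields + ('pool',)
>
> --
>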
> Chris
>
> On Fri, Sep 21, 2018 at 6:12 PM Chris Palmer <crispy16@gmail.com> wrote:
>
> > I see, so for a given DagRun you want to limit the compute tasks that are
> > running. But I'm guessing you want multiple DagRuns to be able to run
> > concurrently to operate on their own clusters independently.
> >
> > From what I could tell in the code, the pool gets checked before
> > execution (which is when templates are rendered), which makes dynamic
> > pools difficult to do.
> >
> > It's probably possible to find a solution, but I think it's likely
> > going to involve some ugly code/inspection of the Python stack.
> >
> > Chris
> >
> > On Sep 21, 2018 4:47 PM, "David Szakallas" <dszakallas@whitepages.com>
> > wrote:
> >
> > Chris, the tasks are independent of each other, so they can run
> > concurrently. I have to limit the concurrency though, so they don't
> > starve. As the cluster is created dynamically with a task, a pool
> > shared with other DAGs or with other runs of the same DAG is not
> > preferable.
> >
> > I imagined something like this:
> >
> >                                       .——> [compute_1] ——.
> >                                      / ——> [compute_2] —— \
> >                                     /          .           \
> > [create_cluster] —> [create_pool_x6]           .            [delete_pool] —> [delete_cluster]
> >                                     \          .           /
> >                                      \ ——> [compute_19] —— /
> >                                       .——> [compute_20] ——.
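> >
> > In wiring terms, roughly (a sketch; compute_task stands in for however
> > the twenty compute operators are built):
> >
> > --
> >
> > compute_tasks = [compute_task(i) for i in range(1, 21)]
> >
> > create_cluster >> create_pool_x6
> > for t in compute_tasks:
> >     create_pool_x6 >> t >> delete_pool
> > delete_pool >> delete_cluster
> >
> > --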
> > Thanks,
> > David
> >
> >
> > > On Sep 21, 2018, at 7:23 PM, Chris Palmer <chris@crpalmer.com> wrote:
> > >
> > > What would cause multiple computation tasks to run on the cluster at
> > > the same time? Are you worried about concurrent DagRuns? Does setting
> > > dag concurrency and/or task concurrency appropriately solve your
> > > problem?
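> > >
> > > That is, something like this (illustrative values, not David's actual
> > > DAG):
> > >
> > > --
> > >
> > > from datetime import datetime
> > > from airflow import DAG
> > >
> > > dag = DAG(
> > >     dag_id='cluster_dag',
> > >     start_date=datetime(2018, 9, 1),
> > >     concurrency=6,       # cap on this DAG's simultaneously running tasks
> > >     max_active_runs=1,   # cap on concurrent DagRuns
> > > )
> > >
> > > # (per task, there is also the task_concurrency argument on operators)
> > >
> > > --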
> > >
> > > Chris
> > >
> > > On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <dszakallas@whitepages.com> wrote:
> > >
> > >> What I am doing is very similar. However, I am including the DagRun's
> > >> id in the pool name to make it unique, as I need to make sure every
> > >> run gets its own pool. I am getting that from the context object,
> > >> which is only available within execute methods or templates. How do
> > >> you make sure each run gets its own pool?
> > >>
> > >>
> > >> Thanks,
> > >>
> > >> Dávid Szakállas
> > >> Software Engineer | Whitepages Data Services
> > >>
> > >> ________________________________
> > >> From: Taylor Edmiston <tedmiston@gmail.com>
> > >> Sent: Thursday, September 20, 2018 6:17:05 PM
> > >> To: dev@airflow.incubator.apache.org
> > >> Subject: Re: Creating dynamic pool from task
> > >>
> > >> I've done something similar.  I have a task at the front of the DAG
> > >> that ensures the connection pool exists and creates the pool if it
> > >> doesn't.  I've pasted my code below.  This runs in a for loop that
> > >> creates one DAG per iteration, each with its own pool.  Then I pass
> > >> the pool name into the sensors.
> > >>
> > >> Does this work for your use case?
> > >>
> > >> --
> > >>
> > >> import logging
> > >>
> > >> from airflow.models import Pool
> > >> from airflow.operators.python_operator import PythonOperator
> > >> from airflow.utils.db import provide_session
> > >>
> > >> logger = logging.getLogger(__name__)
> > >>
> > >> @provide_session
> > >> def ensure_redshift_pool(name, slots, session=None):
> > >>     # Create the named pool if it doesn't already exist.
> > >>     pool = Pool(pool=name, slots=slots)
> > >>     pool_query = (
> > >>         session.query(Pool)
> > >>         .filter(Pool.pool == name)
> > >>     )
> > >>     pool_query_result = pool_query.one_or_none()
> > >>     if not pool_query_result:
> > >>         logger.info(f'redshift pool "{name}" does not exist - creating it')
> > >>         session.add(pool)
> > >>         session.commit()
> > >>         logger.info(f'created redshift pool "{name}"')
> > >>     else:
> > >>         logger.info(f'redshift pool "{name}" already exists')
> > >>
> > >> redshift_pool = PythonOperator(
> > >>     task_id='redshift_pool',
> > >>     dag=dag,
> > >>     python_callable=ensure_redshift_pool,
> > >>     op_kwargs={
> > >>         'name': workflow.pool,
> > >>         'slots': REDSHIFT_POOL_SLOTS,
> > >>     },
> > >>     ...  # other operator args elided
> > >> )
> > >>
> > >> --
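> > >>
> > >> A matching teardown task at the end of the DAG can drop the pool
> > >> again (untested sketch along the same lines):
> > >>
> > >> --
> > >>
> > >> @provide_session
> > >> def delete_redshift_pool(name, session=None):
> > >>     # Remove the per-DAG pool once the computation tasks are done.
> > >>     session.query(Pool).filter(Pool.pool == name).delete()
> > >>     session.commit()
> > >>
> > >> --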
> > >>
> > >> *Taylor Edmiston*
> > >> Blog <https://blog.tedmiston.com/> | LinkedIn
> > >> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
> > >> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer
> > >> Story <https://stackoverflow.com/story/taylor>
> > >>
> > >>
> > >>
> > >> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakallas@whitepages.com> wrote:
> > >>
> > >>> Hi all,
> > >>>
> > >>> I have a DAG that creates a cluster, starts computation tasks, and,
> > >>> after they complete, tears down the cluster. I want to limit the
> > >>> concurrency of the computation tasks carried out on this cluster to
> > >>> a fixed number. So logically, I need a pool that is exclusive to the
> > >>> cluster created by a task. I don't want interference with other DAGs
> > >>> or with different runs of the same DAG.
> > >>>
> > >>> I thought I could solve this problem by creating a pool dynamically
> > >>> from a task after the cluster is created, and deleting it once the
> > >>> computation tasks are finished. I thought I could template the pool
> > >>> parameter of the computation tasks to make them use this dynamically
> > >>> created pool.
> > >>>
> > >>> However, this way the computation tasks are never triggered, so I
> > >>> think the pool parameter is saved in the task instance before being
> > >>> templated. I would like to hear your thoughts on how to achieve the
> > >>> desired behavior.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Dávid Szakállas
> > >>> Software Engineer | Whitepages Data Services
