airflow-dev mailing list archives

From Chris Palmer <crisp...@gmail.com>
Subject Re: Creating dynamic pool from task
Date Tue, 25 Sep 2018 14:24:38 GMT
David,

I was playing around with this over the weekend, and mostly found that it
doesn't seem to be possible. I was able to get an operator to template out
the pool attribute when it renders its templates. However, this doesn't
normally get done until execution, so the un-templated pool attribute
gets used when the scheduler sends the task to the executor.

Chris
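
A minimal toy model of the ordering described in this message, assuming a scheduler that compares the raw `pool` string against existing pool names before any rendering happens. `ToyTask`, `schedule`, and the `{{ run_id }}` substitution are illustrative stand-ins, not Airflow's classes or its Jinja rendering.

```python
from dataclasses import dataclass


@dataclass
class ToyTask:
    """Stand-in for an operator whose ``pool`` holds a template string."""
    task_id: str
    pool: str  # e.g. "cluster_pool_{{ run_id }}"

    def render_templates(self, context):
        # Toy rendering: substitute "{{ key }}" placeholders from the context.
        for key, value in context.items():
            self.pool = self.pool.replace("{{ " + key + " }}", str(value))


def schedule(task, existing_pools):
    """Toy scheduler step: looks up the pool as currently stored on the task."""
    return task.pool in existing_pools


existing_pools = {"cluster_pool_run_42"}
task = ToyTask(task_id="compute_1", pool="cluster_pool_{{ run_id }}")

# Scheduling time: the literal template string is compared against pool names.
queued_before_render = schedule(task, existing_pools)

# Execution time (too late for the scheduler): the template is rendered.
task.render_templates({"run_id": "run_42"})
queued_after_render = schedule(task, existing_pools)
```

In this model the task is not queued at scheduling time, because no pool is literally named `cluster_pool_{{ run_id }}`. That is consistent with the symptom reported in the thread, where the computation tasks are never triggered.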

On Fri, Sep 21, 2018 at 6:12 PM Chris Palmer <crispy16@gmail.com> wrote:

> I see, so for a given DagRun you want to limit the compute tasks that are
> running. But I'm guessing you want multiple DagRuns to be able to run
> concurrently to operate on their own clusters independently.
>
> From what I could tell in the code, the pool gets checked before execution
> (which is when templates are rendered), which makes dynamic pools difficult
> to do.
>
> It's probably possible to find a solution but I think it's likely going to
> involve some ugly code/inspection of the python stack.
>
> Chris
>
> On Sep 21, 2018 4:47 PM, "David Szakallas" <dszakallas@whitepages.com>
> wrote:
>
> Chris, the tasks are independent of each other so they can run
> concurrently. I have to limit the concurrency though, so they don’t starve.
> As the cluster is created dynamically with a task, a shared pool with other
> DAGs or other runs of the same DAG is not preferable.
>
> I imagined something like this:
>
>                                        .--> [compute_1]  --.
>                                       /---> [compute_2]  ---\
>                                      /           .           \
> [create_cluster] -> [create_pool_x6]             .             [delete_pool] -> [delete cluster]
>                                      \           .           /
>                                       \---> [compute_19] ---/
>                                        '--> [compute_20] --'
> Thanks,
> David
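
To make the intent of the diagram concrete, here is a small standard-library sketch of what a 6-slot pool does to 20 independent tasks. It uses a `threading.Semaphore` in place of an Airflow pool, so `SLOTS`, `NUM_TASKS`, `compute`, and `state` are assumptions made for illustration only.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

SLOTS = 6       # mirrors create_pool_x6 in the diagram
NUM_TASKS = 20  # compute_1 .. compute_20

slots = threading.Semaphore(SLOTS)
lock = threading.Lock()
state = {"running": 0, "peak": 0}


def compute(i):
    """Placeholder compute task that holds one slot while it runs."""
    with slots:
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.01)  # stands in for real work on the cluster
        with lock:
            state["running"] -= 1
    return i


# All 20 tasks are eligible at once; the semaphore caps how many run together.
with ThreadPoolExecutor(max_workers=NUM_TASKS) as executor:
    results = sorted(executor.map(compute, range(1, NUM_TASKS + 1)))
```

Every task completes, but `state["peak"]` never exceeds `SLOTS`. A pool dedicated to one run would give the same cap without sharing slots with other DAGs or other runs.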
>
>
> > On Sep 21, 2018, at 7:23 PM, Chris Palmer <chris@crpalmer.com> wrote:
> >
> > What would cause multiple computation tasks to run on the cluster at the
> > same time? Are you worried about concurrent DagRuns? Does setting dag
> > concurrency and/or task concurrency appropriately solve your problem?
> >
> > Chris
> >
> > On Thu, Sep 20, 2018 at 8:31 PM David Szakallas <dszakallas@whitepages.com> wrote:
> >
> >> What I am doing is very similar. However, I am including the DagRun's id in
> >> the pool name to make it unique, as I need to make sure every run gets its
> >> own pool. I am getting that from the context object, which is only
> >> available within execute methods or templates. How do you make sure each
> >> run has its own pool?
> >>
> >>
> >> Thanks,
> >>
> >> Dávid Szakállas
> >> Software Engineer | Whitepages Data Services
> >>
> >> ________________________________
> >> From: Taylor Edmiston <tedmiston@gmail.com>
> >> Sent: Thursday, September 20, 2018 6:17:05 PM
> >> To: dev@airflow.incubator.apache.org
> >> Subject: Re: Creating dynamic pool from task
> >>
> >> I've done something similar.  I have a task at the front of the DAG that
> >> ensures the connection pool exists and creates the pool if it doesn't.
> >> I've pasted my code below.  This runs in a for loop that creates one DAG
> >> per iteration each with its own pool.  Then I pass the pool name into the
> >> sensors.
> >>
> >> Does this work for your use case?
> >>
> >> --
> >>
> >> redshift_pool = PythonOperator(
> >>    task_id='redshift_pool',
> >>    dag=dag,
> >>    python_callable=ensure_redshift_pool,
> >>    op_kwargs={
> >>        'name': workflow.pool,
> >>        'slots': REDSHIFT_POOL_SLOTS,
> >>    },
> >>    ...
> >> )
> >>
> >> @provide_session
> >> def ensure_redshift_pool(name, slots, session=None):
> >>    pool = Pool(pool=name, slots=slots)
> >>    pool_query = (
> >>        session.query(Pool)
> >>        .filter(Pool.pool == name)
> >>    )
> >>    pool_query_result = pool_query.one_or_none()
> >>    if not pool_query_result:
> >>        logger.info(f'redshift pool "{name}" does not exist - creating it')
> >>        session.add(pool)
> >>        session.commit()
> >>        logger.info(f'created redshift pool "{name}"')
> >>    else:
> >>        logger.info(f'redshift pool "{name}" already exists')
> >>
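
The get-or-create pattern in the snippet above, plus the matching teardown that a per-run pool would need, can be sketched without an Airflow installation. The following assumes a toy `slot_pool` table in an in-memory SQLite database. `ensure_pool` and `delete_pool` are hypothetical helpers, not Airflow APIs, and the pool name is made up.

```python
import sqlite3


def ensure_pool(conn, name, slots):
    """Create the pool row if it is missing; return True if a row was created."""
    row = conn.execute(
        "SELECT slots FROM slot_pool WHERE pool = ?", (name,)
    ).fetchone()
    if row is not None:
        return False  # already exists, nothing to do
    conn.execute(
        "INSERT INTO slot_pool (pool, slots) VALUES (?, ?)", (name, slots)
    )
    conn.commit()
    return True


def delete_pool(conn, name):
    """Remove the pool row; return True if a row was actually deleted."""
    cursor = conn.execute("DELETE FROM slot_pool WHERE pool = ?", (name,))
    conn.commit()
    return cursor.rowcount == 1


# Toy schema standing in for the metadata database (an assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE slot_pool (pool TEXT PRIMARY KEY, slots INTEGER NOT NULL)"
)

created_first = ensure_pool(conn, "cluster_pool_run_42", 6)
created_again = ensure_pool(conn, "cluster_pool_run_42", 6)  # idempotent
removed = delete_pool(conn, "cluster_pool_run_42")
```

Calling `ensure_pool` twice is harmless, which matters when a task is retried. The delete step is the counterpart a `delete_pool` task would run once the compute tasks finish.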
> >> --
> >>
> >> *Taylor Edmiston*
> >> Blog <https://blog.tedmiston.com/> | LinkedIn
> >> <https://www.linkedin.com/in/tedmiston/> | Stack Overflow
> >> <https://stackoverflow.com/users/149428/taylor-edmiston> | Developer Story
> >> <https://stackoverflow.com/story/taylor>
> >>
> >>
> >>
> >> On Thu, Sep 20, 2018 at 10:08 AM David Szakallas <dszakallas@whitepages.com> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I have a DAG that creates a cluster, starts computation tasks, and after
> >>> they complete, tears down the cluster. I want to limit concurrency for the
> >>> computation tasks carried out on this cluster to a fixed number. So logically,
> >>> I need a pool that is exclusive to the cluster created by a task. I don't
> >>> want interference with other DAGs or different runs of the same DAG.
> >>>
> >>> I thought I could solve this problem by creating a pool dynamically from a
> >>> task after the cluster is created and deleting it once the computation tasks
> >>> are finished. I thought I could template the pool parameter of the
> >>> computation tasks to make them use this dynamically created pool.
> >>>
> >>> However, this way the computation tasks will never be triggered. So I think
> >>> the pool parameter is saved in the task instance before being templated. I
> >>> would like to hear your thoughts on how to achieve the desired behavior.
> >>>
> >>> Thanks,
> >>>
> >>> Dávid Szakállas
> >>> Software Engineer | Whitepages Data Services
