airflow-dev mailing list archives

From Andrew Maguire <andrewm4...@gmail.com>
Subject Re: Submitting 1000+ tasks to airflow programmatically
Date Thu, 22 Mar 2018 07:43:29 GMT
I've had similar issues with large dags being slow to render in the UI and
crashing Chrome.

I got around it by changing the default number of dag runs shown in the tree
view from 25 to just 5.

It involves a couple of changes to source files though; it would be great if
some of the UI defaults could go into airflow.cfg.

https://stackoverflow.com/a/48665734/1919374
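For reference, the gist of the change (in airflow/www/views.py on 1.9; the
exact lines vary by Airflow version, so treat this as a sketch):

# tree view handler in airflow/www/views.py (approximate location)
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else 5  # default was 25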

On Thu, 22 Mar 2018, 01:26 Chris Fei, <cfei18@gmail.com> wrote:

> @Kyle, I do something similar and have run into the problems you've
> mentioned. In my case, I access data from S3 and then generate separate
> DAGs (of different structures) based on the data that's pulled. I've
> also found that the UI for accessing a single large DAG is slow so I
> prefer to keep many separate DAGs. What I'd try is to define a DAG
> that's responsible for accessing your API and caching the client IDs
> somewhere locally, maybe just to a file on disk or as an Airflow
> Variable. You can run this DAG on whatever schedule is appropriate for
> you. From there, build a function that creates a DAG and then for each
> client ID, register a DAG built by that function to the global context.
> Like this:
> from datetime import datetime
> from airflow import DAG
> from airflow.models import Variable
>
> def create_client_dag(client_id):
>     # build dag here
>     dag = DAG(dag_id='client_%s' % client_id,
>               start_date=datetime(2018, 1, 1),
>               schedule_interval='@daily')
>     # ... add this client's tasks to the dag ...
>     return dag
>
> def get_client_ids_locally():
>     # access the data that was pulled from the API
>     return Variable.get('client_ids', deserialize_json=True)
>
> client_ids = get_client_ids_locally()
> for client in client_ids:
>     dag = create_client_dag(client)
>     globals()[dag.dag_id] = dag
>
> This approach also handles removing client IDs somewhat gracefully. DAGs
> for removed clients will still appear in the UI (you can build a
> maintenance DAG to clean that up), but they'll be disabled and their
> tasks won't be scheduled.
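>
> For completeness, the caching DAG itself might look roughly like this
> (a sketch; fetch_client_ids_from_api() is a placeholder for however you
> call your API):
>
> from datetime import datetime
> from airflow import DAG
> from airflow.models import Variable
> from airflow.operators.python_operator import PythonOperator
>
> def cache_client_ids():
>     # placeholder for your API call
>     client_ids = fetch_client_ids_from_api()
>     # store as a JSON-serialized Airflow Variable
>     Variable.set('client_ids', client_ids, serialize_json=True)
>
> caching_dag = DAG(dag_id='cache_client_ids',
>                   start_date=datetime(2018, 1, 1),
>                   schedule_interval='@hourly')
>
> PythonOperator(task_id='refresh_client_ids',
>                python_callable=cache_client_ids,
>                dag=caching_dag)
>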
> On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > Thanks for all the responses let me try to address the main themes.
> >
> > @Ace @Nicholas @Taylor
> > I originally started with a loop over my list of client ids and
> > created a SparkSubmitOperator for each client. The pseudo code would
> > look something like this:
> >
> > dag = DAG(...)
> >
> > client_ids = get_client_ids()
> > for client_id in client_ids:
> >     SparkSubmitOperator(
> >         task_id='fit_model_%s' % client_id,  # one task per client
> >         ...
> >         dag=dag
> >     )
> >
> > I found this approach kind of clunky for a few reasons.
> > First, the get_client_ids() function was hitting our API every time
> > the dag was read by the scheduler, which seemed excessive (every 30
> > seconds or so?). Second, it seemed like a single task failure marked
> > the whole dag as a failure, but I guess retrying till the task worked
> > could solve this? Third, the UI gets really clunky and slow, basically
> > unusable when it tries to render the graph view for that many tasks.
> > Finally, Airflow doesn't seem very happy when client_ids are removed,
> > i.e. when get_client_ids() no longer returns a specific client_id; it
> > really seems to want a static dag.
> >
> > Do I really have to poll an API or database every 30 seconds for this
> > dynamic client_id data?
> >
> > @Ace
> > I have been limiting concurrency so as to not blast the cluster
> >
> > @Nicholas
> > Thank you for the noise suggestion, I will definitely implement that
> > if I continue with the same methodology.
> >
> > @Taylor
> > Are you using a SubDagOperator? Or is your process similar to the
> > pseudo code I wrote above?
> >
> >
> > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > <tedmiston@gmail.com> wrote:
> >> We also use a similar approach to generate dynamic DAGs based on a
> >> common template DAG file.  We pull in the list of config objects, one
> >> per DAG, from an internal API lightly wrapping the database, then we
> >> cache that response in an Airflow Variable that gets updated once a
> >> minute.  The dynamic DAGs are generated from that variable.
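> >>
> >> In rough terms, the generation side looks like this (a sketch; the
> >> variable name and factory function are illustrative):
> >>
> >> from airflow.models import Variable
> >>
> >> # refreshed once a minute by a separate job
> >> configs = Variable.get('dag_configs', deserialize_json=True)
> >> for config in configs:
> >>     dag = create_dag_from_config(config)  # the template DAG factory
> >>     globals()[dag.dag_id] = dag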
> >>
> >> *Taylor Edmiston*
> >> TEdmiston.com <https://www.tedmiston.com/> | Blog <http://blog.tedmiston.com>
> >> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> >> <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor>
> >>
> >>
> >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> >> nicolas.kijak@weightwatchers.com> wrote:
> >>
> >>> Kyle,
> >>>
> >>> We have a similar approach but on a much, much smaller scale. We
> >>> now have <100 “things to process” but expect it to grow to under
> >>> ~200.  Each “thing to process” has the same workflow so we have a
> >>> single DAG definition that does about 20 tasks per, then we loop
> >>> over the list of items and produce a dag object for each one,
> >>> adding it to the global definition.
> >>>
> >>> One of the things we quickly ran into was crushing the scheduler as
> >>> everything was running with the same start time.  To get around this
> >>> we add noise to the start time minute and seconds. Simply index % 60.
> >>> This spreads out the load so that the scheduler isn’t trying to run
> >>> everything at the exact same moment.  I would suggest, if you do go
> >>> this route, to also stagger your hours if you can because of how many
> >>> you plan to run.  Perhaps your DAGs are smaller and aren’t as CPU
> >>> intensive as ours.
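> >>>
> >>> Concretely, the noise trick is just something like this (a sketch;
> >>> items and create_dag() are stand-ins for your own list and factory):
> >>>
> >>> from datetime import datetime
> >>>
> >>> for i, item in enumerate(items):
> >>>     # index % 60 spreads start minutes/seconds so the scheduler
> >>>     # isn't launching everything at the same instant
> >>>     start = datetime(2018, 1, 1, 0, i % 60, i % 60)
> >>>     dag = create_dag(item, start_date=start)
> >>>     globals()[dag.dag_id] = dag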
> >>>
> >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <hamlin.kn@gmail.com> wrote:
> >>>
> >>>   Hello,
> >>>
> >>>   I'm currently using Airflow for some ETL tasks where I submit a
> >>>   spark job to a cluster and poll till it is complete. This workflow
> >>>   is nice because it is typically a single Dag. I'm now starting to
> >>>   do more machine learning tasks and need to build a model per
> >>>   client, which is 1000+ clients. My spark cluster is capable of
> >>>   handling this workload, however, it doesn't seem scalable to write
> >>>   1000+ dags to fit models for each client. I want each client to
> >>>   have its own task instance so it can be retried if it fails
> >>>   without having to run all 1000+ tasks over again. How do I handle
> >>>   this type of workflow in Airflow?
> >>>
> >>>
> >>>
> >>
>
>
