airflow-dev mailing list archives

From Taylor Edmiston <tedmis...@gmail.com>
Subject Re: Submitting 1000+ tasks to airflow programmatically
Date Thu, 22 Mar 2018 18:12:46 GMT
I'm interested in hearing further discussion too, and in whether others
have tried something similar to our approach.  Several companies on this
list have mentioned various approaches to dynamic DAGs, and I think
everyone needs them eventually.  Maybe it's an opportunity for additional
docs covering use cases like this and documenting best practices from the
community.

*Taylor Edmiston*
TEdmiston.com <https://www.tedmiston.com/> | Blog
<http://blog.tedmiston.com>
Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor>


On Thu, Mar 22, 2018 at 12:43 PM, Kyle Hamlin <hamlin.kn@gmail.com> wrote:

> @Chris @Taylor
> Thank you guys very much for your explanations! Your strategy makes a lot
> of sense to me. Generating a dag for each client means I'm going to have a
> ton of dags on the front page, but at least that is searchable haha. I'm
> going to give this implementation a shot and I'll try to report back with
> the outcome.
>
> Can anyone comment on future work to support data science workflows like
> these, or is Airflow fundamentally the wrong tool?
>
> On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <tedmiston@gmail.com>
> wrote:
>
> > We're not using SubDagOperator.  Our approach is using 1 DAG file to
> > generate a separate DAG class instance for each similar config, which
> > gets hoisted into the global namespace.  In simplified pseudo-Python,
> > it looks like:
> >
> > # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
> > cache = Variable.get('sources', default_var={}, deserialize_json=True)
> > sources = fetch_configs() if is_empty(cache) or is_expired(cache) else cache['configs']
> > for source in sources:
> >     dag = DAG(...)
> >     globals()[source._id] = dag
> >     # ...create tasks and set dependencies for each DAG (some config
> >     # pulled from source object for each)...
> >
> > We added the cache part for the same reason you pointed out: the DAG
> > processing loop was hitting the API a lot.  Btw, you can also turn down
> > how often the processing loop runs with scheduler_heartbeat_sec under
> > the [scheduler] group in the config.
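A runnable sketch of that cache-expiry check, for readers who want to see the moving parts. The helper names (`is_empty`, `is_expired`, `resolve_sources`, `build_cache`) and the 300-second TTL are assumptions for illustration, not from the thread or from Airflow itself:

```python
import time

# Hypothetical cache layout, mirroring the pseudo-Python above:
#   {'configs': [...], 'expire': <unix timestamp>}
CACHE_TTL = 300  # seconds between API refreshes; an assumed value

def is_empty(cache):
    # No configs cached yet (e.g. first run, when default_var={} was returned).
    return not cache.get('configs')

def is_expired(cache):
    # The cache is stale once we pass its 'expire' timestamp.
    return time.time() >= cache.get('expire', 0)

def resolve_sources(cache, fetch_configs):
    """Return cached configs when fresh; otherwise hit the API."""
    if is_empty(cache) or is_expired(cache):
        return fetch_configs()
    return cache['configs']

def build_cache(configs):
    # The shape you'd write back with Variable.set(..., serialize_json=True).
    return {'configs': configs, 'expire': time.time() + CACHE_TTL}
```

The point of the `expire` field is that the scheduler re-parses the DAG file every heartbeat, so without it every parse would mean an API call.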
> >
> > We also considered the route Chris mentioned of updating the cache via
> > a separate DAG, but weren't crazy about having a DAG scheduled once per
> > minute.
> >
> >
> >
> > On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <dcapwell@gmail.com> wrote:
> >
> > > For us, we compile down to Python rather than do the logic in
> > > Python; that makes it so the load doesn't do real work.
> > >
> > > We have our own DSL that is just a simplified compiler: parse,
> > > analyze, optimize, code gen.  In code gen we just generate the
> > > Python code.  Our build then packages it up and has Airflow fetch it
> > > (very hacky fetch right now).
> > >
> > > This does make it so loading is simple and fast, but means you can't
> > > use the Python API directly.
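One minimal way to picture that code-gen stage. The template and config fields here are invented for illustration; the thread doesn't show the real DSL compiler:

```python
# Hypothetical final stage of the DSL compiler: emit the text of a DAG
# definition file, so the scheduler's load phase only executes trivial code.
TEMPLATE = '''\
from airflow import DAG

dag = DAG(dag_id={dag_id!r}, schedule_interval={schedule!r})
'''

def generate_dag_source(config):
    """Render one DAG-definition .py file from a compiled config dict."""
    return TEMPLATE.format(dag_id=config["dag_id"], schedule=config["schedule"])

# The build step would write this string to a .py file in the dags folder,
# which the (hacky) fetch then drops onto the Airflow host.
print(generate_dag_source({"dag_id": "client_etl", "schedule": "@daily"}))
```

Because the emitted file is plain static Python, parsing it costs nothing at schedule time; all the expensive analysis happened at build time.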
> > >
> > > On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <andrewm4894@gmail.com>
> > > wrote:
> > >
> > > > I've had similar issues with large dags being slow to render on
> > > > the UI and crashing Chrome.
> > > >
> > > > I got around it by changing the default tree view from 25 to just 5.
> > > >
> > > > It involves a couple of changes to source files though; it would be
> > > > great if some of the UI defaults could go into airflow.cfg.
> > > >
> > > > https://stackoverflow.com/a/48665734/1919374
> > > >
> > > > On Thu, 22 Mar 2018, 01:26 Chris Fei, <cfei18@gmail.com> wrote:
> > > >
> > > > > @Kyle, I do something similar and have run into the problems
> > > > > you've mentioned. In my case, I access data from S3 and then
> > > > > generate separate DAGs (of different structures) based on the data
> > > > > that's pulled. I've also found that the UI for accessing a single
> > > > > large DAG is slow, so I prefer to keep many separate DAGs. What
> > > > > I'd try is to define a DAG that's responsible for accessing your
> > > > > API and caching the client IDs somewhere locally, maybe just to a
> > > > > file on disk or as an Airflow Variable. You can run this DAG on
> > > > > whatever schedule is appropriate for you. From there, build a
> > > > > function that creates a DAG, and then for each client ID, register
> > > > > a DAG built by that function to the global context. Like this:
> > > > > def create_client_dag(client_id):
> > > > >     # build dag here
> > > > >
> > > > > def get_client_ids_locally():
> > > > >     # access the data that was pulled from the API
> > > > >
> > > > > client_ids = get_client_ids_locally()
> > > > > for client in client_ids:
> > > > >     dag = create_client_dag(client)
> > > > >     globals()[dag.dag_id] = dag
> > > > >
> > > > > This approach also handles removing client IDs somewhat
> > > > > gracefully. DAGs for removed clients will still appear in the UI
> > > > > (you can build a maintenance DAG to clean that up), but they'll
> > > > > be disabled and their tasks won't be scheduled.
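A self-contained version of that factory-and-register pattern. A trivial `Dag` stub stands in for `airflow.DAG` so the sketch runs anywhere; real code would build operators and dependencies inside `create_client_dag`:

```python
class Dag:
    """Stand-in for airflow.DAG, just enough to show the pattern."""
    def __init__(self, dag_id):
        self.dag_id = dag_id

def create_client_dag(client_id):
    # Real code: DAG(dag_id=..., schedule_interval=..., default_args=...),
    # plus the tasks and dependencies for this client.
    return Dag(dag_id="client_%s" % client_id)

def register_client_dags(client_ids, namespace):
    # The scheduler discovers DAGs by scanning a DAG file's module globals,
    # so each generated DAG must be bound to a top-level name.
    for client_id in client_ids:
        dag = create_client_dag(client_id)
        namespace[dag.dag_id] = dag

# In a real DAG file: register_client_dags(get_client_ids_locally(), globals())
register_client_dags(["acme", "globex"], globals())
```

Passing the namespace in explicitly (rather than touching `globals()` inside the loop) keeps the registration step testable.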
> > > > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > > > Thanks for all the responses; let me try to address the main
> > > > > > themes.
> > > > > >
> > > > > > @Ace @Nicholas @Taylor
> > > > > > I originally started with a loop over my list of client ids and
> > > > > > created a SparkSubmitOperator for each client. The pseudo code
> > > > > > would look something like this:
> > > > > >
> > > > > > dag = DAG(...)
> > > > > >
> > > > > > client_ids = get_client_ids()
> > > > > > for client_id in client_ids:
> > > > > >     SparkSubmitOperator(
> > > > > >         ...
> > > > > >         dag=dag
> > > > > >     )
> > > > > >
> > > > > > I found this approach kind of clunky for a few reasons. First,
> > > > > > the get_client_ids() function was hitting our API every time the
> > > > > > dag was read by the scheduler, which seemed excessive (every 30
> > > > > > seconds or so?). Second, it seemed like a single task failure
> > > > > > marked the whole dag as a failure, but I guess retrying till the
> > > > > > task worked could solve this? Third, the UI gets really clunky
> > > > > > and slow, basically unusable, when it tries to render the graph
> > > > > > view for that many tasks. Finally, Airflow doesn't seem very
> > > > > > happy when client_ids are removed, i.e. when get_client_ids() no
> > > > > > longer returns a specific client_id; it really seems to want a
> > > > > > static dag.
> > > > > >
> > > > > > Do I really have to poll an API or database every 30 seconds for
> > > > > > this dynamic client_id data?
> > > > > >
> > > > > > @Ace
> > > > > > I have been limiting concurrency so as to not blast the cluster.
> > > > > >
> > > > > > @Nicholas
> > > > > > Thank you for the noise suggestion; I will definitely implement
> > > > > > that if I continue with the same methodology.
> > > > > >
> > > > > > @Taylor
> > > > > > Are you using a SubDagOperator? Or is your process similar to the
> > > > > > pseudo code I wrote above?
> > > > > >
> > > > > >
> > > > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > > > > <tedmiston@gmail.com> wrote:
> > > > > >> We also use a similar approach to generate dynamic DAGs based
> > > > > >> on a common template DAG file.  We pull in the list of config
> > > > > >> objects, one per DAG, from an internal API lightly wrapping the
> > > > > >> database, then we cache that response in an Airflow Variable
> > > > > >> that gets updated once a minute.  The dynamic DAGs are
> > > > > >> generated from that variable.
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > > > > >> nicolas.kijak@weightwatchers.com> wrote:
> > > > > >>
> > > > > >>> Kyle,
> > > > > >>>
> > > > > >>> We have a similar approach but on a much, much smaller scale.
> > > > > >>> We now have <100 “things to process” but expect it to grow to
> > > > > >>> under ~200.  Each “thing to process” has the same workflow, so
> > > > > >>> we have a single DAG definition that does about 20 tasks per,
> > > > > >>> then we loop over the list of items and produce a dag object
> > > > > >>> for each one, adding it to the global definition.
> > > > > >>>
> > > > > >>> One of the things we quickly ran into was crushing the
> > > > > >>> scheduler, as everything was running with the same start time.
> > > > > >>> To get around this we add noise to the start time minute and
> > > > > >>> seconds. Simply index % 60.  This spreads out the load so that
> > > > > >>> the scheduler isn’t trying to run everything at the exact same
> > > > > >>> moment.  I would suggest, if you do go this route, also
> > > > > >>> staggering your hours if you can because of how many you plan
> > > > > >>> to run.  Perhaps your DAGs are smaller and aren’t as CPU
> > > > > >>> intensive as ours.
> > > > > >>>
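The index % 60 trick reads like this in code. The base date and the choice to jitter both minutes and seconds are illustrative assumptions, not details given in the thread:

```python
from datetime import datetime, timedelta

def staggered_start(base, index):
    """Offset a generated DAG's start time by index % 60 minutes and
    seconds, so the scheduler doesn't fire every DAG at the same instant."""
    return base + timedelta(minutes=index % 60, seconds=index % 60)

# Example: the 61st generated DAG (index 61) starts 1 minute 1 second
# after the base start_date.
base = datetime(2018, 3, 21)
offset_start = staggered_start(base, 61)
```

Each DAG keeps the same schedule interval; only its phase shifts, which spreads the scheduling load across the minute.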
> > > > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <hamlin.kn@gmail.com> wrote:
> > > > > >>>
> > > > > >>>   Hello,
> > > > > >>>
> > > > > >>>   I'm currently using Airflow for some ETL tasks where I
> > > > > >>>   submit a spark job to a cluster and poll till it is
> > > > > >>>   complete. This workflow is nice because it is typically a
> > > > > >>>   single Dag. I'm now starting to do more machine learning
> > > > > >>>   tasks and need to build a model per client, which is 1000+
> > > > > >>>   clients. My spark cluster is capable of handling this
> > > > > >>>   workload; however, it doesn't seem scalable to write 1000+
> > > > > >>>   dags to fit models for each client. I want each client to
> > > > > >>>   have its own task instance so it can be retried if it
> > > > > >>>   fails, without having to run all 1000+ tasks over again.
> > > > > >>>   How do I handle this type of workflow in Airflow?
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
