airflow-dev mailing list archives

From Maxime Beauchemin <maximebeauche...@gmail.com>
Subject Re: Submitting 1000+ tasks to airflow programatically
Date Fri, 23 Mar 2018 00:04:33 GMT
On the open PR I described how DagFetcher might imply a new DAG manifest
(replacing the current DAG_FOLDER auto-parsing & discovery) that describes
a list of dag_ids and related DAG URIs.

That DAG manifest could be a static list or something dynamic if you pass
it a callable. To enable the dynamic DAGs pattern, perhaps that manifest
supports not only URIs but also DAG factories as functions that return one
or many DAG objects. This might be a tad too dynamic, though; it can be
good to keep the balance and have something more predictable.
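
For concreteness, a manifest along those lines could look roughly like the
sketch below. This is purely hypothetical: DagFetcher and the DAG manifest
don't exist yet, and the DAG_MANIFEST name, the URI scheme, and the factory
signature are all assumptions for illustration.

# Hypothetical sketch only; not an existing Airflow API.
from datetime import datetime
from airflow import DAG

def client_dag_factory():
    """Hypothetical factory that returns (yields) one or many DAG objects."""
    for client_id in ('client_a', 'client_b'):
        yield DAG(dag_id='process_{}'.format(client_id),
                  start_date=datetime(2018, 1, 1),
                  schedule_interval='@daily')

# The manifest could map dag_ids to static URIs, or defer to factories for
# the dynamic cases.
DAG_MANIFEST = {
    'static_reporting_dag': 's3://my-bucket/dags/reporting.py',
    'dynamic_client_dags': client_dag_factory,
}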

Max

On Thu, Mar 22, 2018 at 1:24 PM, James Meickle <jmeickle@quantopian.com> wrote:

> I'm very excited about the possibility of implementing a DAGFetcher (per
> prior thread about this) that is aware of dynamic data sources, and can
> handle abstracting/caching/deploying them itself, rather than having each
> Airflow process run the query for each DAG refresh.
>
> On Thu, Mar 22, 2018 at 2:12 PM, Taylor Edmiston <tedmiston@gmail.com> wrote:
>
> > I'm interested in hearing further discussion too, and if others have tried
> > something similar to our approach.  Several companies on this list have
> > mentioned various approaches to dynamic DAGs, and I think everyone needs
> > them eventually.  Maybe it's an opportunity for additional docs regarding
> > use cases like this and to document best practices from the community.
> >
> > *Taylor Edmiston*
> > TEdmiston.com <https://www.tedmiston.com/> | Blog
> > <http://blog.tedmiston.com>
> > Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor>
> >
> >
> > On Thu, Mar 22, 2018 at 12:43 PM, Kyle Hamlin <hamlin.kn@gmail.com> wrote:
> >
> > > @Chris @Taylor
> > > Thank you guys very much for your explanations! Your strategy makes a lot
> > > of sense to me. Generating a DAG for each client, I'm going to have a ton
> > > of DAGs on the front page, but at least that's searchable, haha. I'm going
> > > to give this implementation a shot and I'll try to report back with the
> > > outcome.
> > >
> > > Can anyone comment on future work to support data science workflows like
> > > these, or is Airflow fundamentally the wrong tool?
> > >
> > > On Thu, Mar 22, 2018 at 12:07 PM Taylor Edmiston <tedmiston@gmail.com> wrote:
> > >
> > > > We're not using SubDagOperator.  Our approach uses one DAG file to
> > > > generate a separate DAG class instance for each similar config, which
> > > > gets hoisted into the global namespace.  In simplified pseudo-Python, it
> > > > looks like:
> > > >
> > > > # sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
> > > > cache = Variable.get('sources', default_var={}, deserialize_json=True)
> > > > sources = fetch_configs() if is_empty(cache) or is_expired(cache) else cache['configs']
> > > > for source in sources:
> > > >     dag = DAG(...)
> > > >     globals()[source._id] = dag
> > > >     # ...create tasks and set dependencies for each DAG (some config
> > > >     # pulled from the source object for each)...
> > > >
> > > > We added the cache part for the same reason you pointed out: the DAG
> > > > processing loop was hitting the API a lot.  Btw, you can also turn down
> > > > how often the processing loop runs with scheduler_heartbeat_sec under the
> > > > scheduler group in config.
> > > >
> > > > We also considered the route Chris mentioned of updating the cache via a
> > > > separate DAG, but weren't crazy about having a DAG scheduled once per
> > > > minute.
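> > > >
> > > > A slightly fuller sketch of that pattern might look like the following
> > > > (the helper logic, the 'sources' Variable key, and the 60-second expiry
> > > > are assumptions carried over from the pseudo-code above, not an
> > > > established API):
> > > >
> > > > import time
> > > > from datetime import datetime
> > > > from airflow import DAG
> > > > from airflow.models import Variable
> > > > from airflow.operators.dummy_operator import DummyOperator
> > > >
> > > > def fetch_configs():
> > > >     # Placeholder for the internal API call that returns config dicts.
> > > >     return [{'_id': 'client_a'}, {'_id': 'client_b'}]
> > > >
> > > > def is_expired(cache):
> > > >     return float(cache.get('expire', 0)) < time.time()
> > > >
> > > > cache = Variable.get('sources', default_var={}, deserialize_json=True)
> > > > if not cache or is_expired(cache):
> > > >     sources = fetch_configs()
> > > >     # Refresh the cached response so later parse loops skip the API call.
> > > >     Variable.set('sources',
> > > >                  {'configs': sources, 'expire': time.time() + 60},
> > > >                  serialize_json=True)
> > > > else:
> > > >     sources = cache['configs']
> > > >
> > > > for source in sources:
> > > >     dag = DAG(dag_id=source['_id'],
> > > >               start_date=datetime(2018, 1, 1),
> > > >               schedule_interval='@daily')
> > > >     DummyOperator(task_id='placeholder', dag=dag)
> > > >     globals()[dag.dag_id] = dag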
> > > >
> > > > *Taylor Edmiston*
> > > > TEdmiston.com <https://www.tedmiston.com/> | Blog <http://blog.tedmiston.com>
> > > > Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> > > > <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor>
> > > >
> > > > On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <dcapwell@gmail.com> wrote:
> > > >
> > > > > For us, we compile down to Python rather than doing the logic in
> > > > > Python; that makes it so the load doesn't do real work.
> > > > >
> > > > > We have our own DSL that is just a simplified compiler: parse, analyze,
> > > > > optimize, code gen.  In code gen we just generate the Python code.  Our
> > > > > build then packages it up and has Airflow fetch it (a very hacky fetch
> > > > > right now).
> > > > >
> > > > > This does make loading simple and fast, but it means you can't use the
> > > > > Python API directly.
> > > > >
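> > > > > For illustration only (the template, config format, and output path
> > > > > below are assumptions, not the actual DSL), "compile down to Python"
> > > > > could be as simple as rendering one static DAG file per config at build
> > > > > time:
> > > > >
> > > > > # Hypothetical code-gen step: write a plain, static .py file per config
> > > > > # so the scheduler's parse loop does no API calls or dynamic work.
> > > > > DAG_TEMPLATE = '''\
> > > > > from airflow import DAG
> > > > > from datetime import datetime
> > > > >
> > > > > dag = DAG(dag_id="{dag_id}",
> > > > >           start_date=datetime(2018, 1, 1),
> > > > >           schedule_interval="{schedule}")
> > > > > '''
> > > > >
> > > > > def generate_dag_files(configs, output_dir):
> > > > >     for cfg in configs:
> > > > >         path = "{}/{}.py".format(output_dir, cfg["dag_id"])
> > > > >         with open(path, "w") as f:
> > > > >             f.write(DAG_TEMPLATE.format(**cfg))
> > > > >
> > > > > generate_dag_files([{"dag_id": "client_a_etl", "schedule": "@daily"}], "dags")
> > > > >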
> > > > > On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <andrewm4894@gmail.com> wrote:
> > > > >
> > > > > > I've had similar issues with large DAGs being slow to render in the
> > > > > > UI and crashing Chrome.
> > > > > >
> > > > > > I got around it by changing the default tree view from 25 to just 5.
> > > > > >
> > > > > > It involves a couple of changes to source files though; it would be
> > > > > > great if some of the UI defaults could go into airflow.cfg.
> > > > > >
> > > > > > https://stackoverflow.com/a/48665734/1919374
> > > > > >
> > > > > > On Thu, 22 Mar 2018, 01:26 Chris Fei, <cfei18@gmail.com> wrote:
> > > > > >
> > > > > > > @Kyle, I do something similar and have run into the problems you've
> > > > > > > mentioned. In my case, I access data from S3 and then generate
> > > > > > > separate DAGs (of different structures) based on the data that's
> > > > > > > pulled. I've also found that the UI for accessing a single large DAG
> > > > > > > is slow, so I prefer to keep many separate DAGs. What I'd try is to
> > > > > > > define a DAG that's responsible for accessing your API and caching
> > > > > > > the client IDs somewhere locally, maybe just to a file on disk or as
> > > > > > > an Airflow Variable. You can run this DAG on whatever schedule is
> > > > > > > appropriate for you. From there, build a function that creates a
> > > > > > > DAG, and then for each client ID, register a DAG built by that
> > > > > > > function to the global context. Like this:
> > > > > > > def create_client_dag(client_id):
> > > > > > >     # build dag here
> > > > > > >
> > > > > > > def get_client_ids_locally():
> > > > > > >     # access the data that was pulled from the API
> > > > > > >
> > > > > > > client_ids = get_client_ids_locally()
> > > > > > > for client in client_ids:
> > > > > > >     dag = create_client_dag(client)
> > > > > > >     globals()[dag.dag_id] = dag
> > > > > > >
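> > > > > > > (A purely hypothetical fleshing-out of that skeleton; the operator
> > > > > > > choice, schedule, and cache file path below are illustrative
> > > > > > > assumptions.)
> > > > > > >
> > > > > > > import json
> > > > > > > from datetime import datetime
> > > > > > > from airflow import DAG
> > > > > > > from airflow.operators.bash_operator import BashOperator
> > > > > > >
> > > > > > > def create_client_dag(client_id):
> > > > > > >     dag = DAG(dag_id='client_{}'.format(client_id),
> > > > > > >               start_date=datetime(2018, 1, 1),
> > > > > > >               schedule_interval='@daily')
> > > > > > >     BashOperator(task_id='process_client',
> > > > > > >                  bash_command='echo processing {}'.format(client_id),
> > > > > > >                  dag=dag)
> > > > > > >     return dag
> > > > > > >
> > > > > > > def get_client_ids_locally():
> > > > > > >     # Read the IDs the caching DAG wrote to disk (path is assumed).
> > > > > > >     with open('/tmp/client_ids.json') as f:
> > > > > > >         return json.load(f)
> > > > > > >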
> > > > > > > This approach also handles removing client IDs somewhat gracefully.
> > > > > > > DAGs for removed clients will still appear in the UI (you can build
> > > > > > > a maintenance DAG to clean that up), but they'll be disabled and
> > > > > > > their tasks won't be scheduled.
> > > > > > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > > > > > Thanks for all the responses; let me try to address the main
> > > > > > > > themes.
> > > > > > > >
> > > > > > > > @Ace @Nicholas @Taylor
> > > > > > > > I originally started with a loop over my list of client ids and
> > > > > > > > created a SparkSubmitOperator for each client. The pseudo code
> > > > > > > > would look something like this:
> > > > > > > >
> > > > > > > > dag = DAG(...)
> > > > > > > >
> > > > > > > > client_ids = get_client_ids()
> > > > > > > > for client_id in client_ids:
> > > > > > > >     SparkSubmitOperator(
> > > > > > > >         ...
> > > > > > > >         dag=dag
> > > > > > > >     )
> > > > > > > >
> > > > > > > > I found this approach kind of clunky for a few reasons. First,
> > > > > > > > the get_client_ids() function was hitting our API every time the
> > > > > > > > dag was read by the scheduler, which seemed excessive (every 30
> > > > > > > > seconds or so?). Second, it seemed like a single task failure
> > > > > > > > marked the whole dag as a failure, but I guess retrying till the
> > > > > > > > task worked could solve this? Third, the UI gets really clunky and
> > > > > > > > slow, basically unusable when it tries to render the graph view
> > > > > > > > for that many tasks. Finally, Airflow doesn't seem very happy when
> > > > > > > > client_ids are removed, i.e. get_client_ids() no longer returns a
> > > > > > > > specific client_id; it really seems to want a static dag.
> > > > > > > >
> > > > > > > > Do I really have to poll an API or database every 30 seconds for
> > > > > > > > this dynamic client_id data?
> > > > > > > >
> > > > > > > > @Ace
> > > > > > > > I have been limiting concurrency so as to not blast the cluster.
> > > > > > > >
> > > > > > > > @Nicholas
> > > > > > > > Thank you for the noise suggestion; I will definitely implement
> > > > > > > > that if I continue with the same methodology.
> > > > > > > >
> > > > > > > > @Taylor
> > > > > > > > Are you using a SubDagOperator? Or is your process similar to the
> > > > > > > > pseudo code I wrote above?
> > > > > > > >
> > > > > > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston <tedmiston@gmail.com> wrote:
> > > > > > > >> We also use a similar approach to generate dynamic DAGs based on
> > > > > > > >> a common template DAG file.  We pull in the list of config
> > > > > > > >> objects, one per DAG, from an internal API lightly wrapping the
> > > > > > > >> database, then we cache that response in an Airflow Variable that
> > > > > > > >> gets updated once a minute.  The dynamic DAGs are generated from
> > > > > > > >> that variable.
> > > > > > > >>
> > > > > > > >> *Taylor Edmiston*
> > > > > > > >> TEdmiston.com <https://www.tedmiston.com/> | Blog <http://blog.tedmiston.com>
> > > > > > > >> Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
> > > > > > > >> <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor>
> > > > > > > >>
> > > > > > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <nicolas.kijak@weightwatchers.com> wrote:
> > > > > > > >>
> > > > > > > >>> Kyle,
> > > > > > > >>>
> > > > > > > >>> We have a similar approach but on a much, much smaller scale.
> > > > > > > >>> We now have <100 “things to process” but expect it to grow to
> > > > > > > >>> under ~200.  Each “thing to process” has the same workflow, so
> > > > > > > >>> we have a single DAG definition that does about 20 tasks per
> > > > > > > >>> item, then we loop over the list of items and produce a DAG
> > > > > > > >>> object for each one, adding it to the global definition.
> > > > > > > >>>
> > > > > > > >>> One of the things we quickly ran into was crushing the
> > > > > > > >>> scheduler, as everything was running with the same start time.
> > > > > > > >>> To get around this we add noise to the start time minute and
> > > > > > > >>> seconds, simply index % 60.  This spreads out the load so that
> > > > > > > >>> the scheduler isn’t trying to run everything at the exact same
> > > > > > > >>> moment.  If you do go this route, I would suggest also
> > > > > > > >>> staggering your hours if you can, because of how many you plan
> > > > > > > >>> to run.  Perhaps your DAGs are smaller and aren’t as CPU
> > > > > > > >>> intensive as ours.
> > > > > > > >>>
> > > > > > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <hamlin.kn@gmail.com> wrote:
> > > > > > > >>>
> > > > > > > >>>   Hello,
> > > > > > > >>>
> > > > > > > >>>   I'm currently using Airflow for some ETL tasks where I submit
> > > > > > > >>>   a Spark job to a cluster and poll till it is complete.  This
> > > > > > > >>>   workflow is nice because it is typically a single DAG.  I'm
> > > > > > > >>>   now starting to do more machine learning tasks and need to
> > > > > > > >>>   build a model per client, which is 1000+ clients.  My Spark
> > > > > > > >>>   cluster is capable of handling this workload; however, it
> > > > > > > >>>   doesn't seem scalable to write 1000+ dags to fit models for
> > > > > > > >>>   each client.  I want each client to have its own task instance
> > > > > > > >>>   so it can be retried if it fails without having to run all
> > > > > > > >>>   1000+ tasks over again.  How do I handle this type of workflow
> > > > > > > >>>   in Airflow?
