airflow-dev mailing list archives

From Taylor Edmiston <tedmis...@gmail.com>
Subject Re: Submitting 1000+ tasks to airflow programatically
Date Thu, 22 Mar 2018 16:06:26 GMT
We're not using SubDagOperator.  Our approach is using 1 DAG file to
generate a separate DAG class instance for each similar config, which gets
hoisted into the global namespace.  In simplified pseudo-Python, it looks like:

# sources --> {'configs': [{...}, {...}], 'expire': '<timestamp>'}
cache = Variable.get('sources', default_var={}, deserialize_json=True)
sources = fetch_configs() if is_empty(cache) or is_expired(cache) else cache['configs']
for source in sources:
    dag = DAG(...)
    globals()[source._id] = dag
    # ...create tasks and set dependencies for each DAG (some config
    # pulled from source object for each)...

We added the cache part for the same reason you pointed out: the DAG
processing loop was hitting the API a lot.  Btw, you can also turn down
how often the processing loop runs with scheduler_heartbeat_sec under the
scheduler group in config.

We also considered the route Chris mentioned of updating cache via a
separate DAG but weren't crazy about having a DAG scheduled once per minute.
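A minimal sketch of the expiry check behind that pseudo-code (the field
names `configs`/`expire` come from the comment above; in the DAG file the
`cache` dict would come from `Variable.get('sources', default_var={},
deserialize_json=True)`, and the one-minute TTL is an assumption):

```python
from datetime import datetime, timedelta

def is_empty(cache):
    # no configs cached yet (first run, or the Variable is missing)
    return not cache.get('configs')

def is_expired(cache, now=None):
    # treat a missing or past 'expire' timestamp as stale
    now = now or datetime.utcnow()
    expire = cache.get('expire')
    return expire is None or now >= datetime.fromisoformat(expire)

def make_cache(configs, ttl=timedelta(minutes=1), now=None):
    # what gets written back to the Variable after a fresh API fetch
    now = now or datetime.utcnow()
    return {'configs': configs, 'expire': (now + ttl).isoformat()}
```

With this shape, a parse of the DAG file only pays the API round trip when
the Variable is empty or its timestamp has lapsed.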

*Taylor Edmiston*
TEdmiston.com <https://www.tedmiston.com/> | Blog
<http://blog.tedmiston.com>
Stack Overflow CV <https://stackoverflow.com/story/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor>


On Thu, Mar 22, 2018 at 9:17 AM, David Capwell <dcapwell@gmail.com> wrote:

> For us we compile down to Python rather than do the logic in Python, that
> makes it so the load doesn't do real work.
>
> We have our own DSL that is just a simplified compiler; parse, analyze,
> optimize, code gen.  In code gen we just generate the Python code.  Our
> build then packages it up and have airflow fetch it (very hacky fetch right
> now)
>
> This does make it so loading is simple and fast, but means you can't use
> the Python api directly
>
> On Thu, Mar 22, 2018, 12:43 AM Andrew Maguire <andrewm4894@gmail.com>
> wrote:
>
> > I've had similar issues with large dags being slow to render on ui and
> > crashing chrome.
> >
> > I got around it by changing the default tree view from 25 to just 5.
> >
> > Involves a couple changes to source files though, would be great if some
> of
> > the ui defaults could go into airflow.cfg.
> >
> > https://stackoverflow.com/a/48665734/1919374
> >
> > On Thu, 22 Mar 2018, 01:26 Chris Fei, <cfei18@gmail.com> wrote:
> >
> > > @Kyle, I do something similar and have run into the problems you've
> > > mentioned. In my case, I access data from S3 and then generate separate
> > > DAGs (of different structures) based on the data that's pulled. I've
> > > also found that the UI for accessing a single large DAG is slow so I
> > > prefer to keep many separate DAGs. What I'd try is to define a DAG
> > > that's responsible for accessing your API and caching the client IDs
> > > somewhere locally, maybe just to a file on disk or as an Airflow
> > > Variable. You can run this DAG on whatever schedule is appropriate for
> > > you. From there, build a function that creates a DAG and then for each
> > > client ID, register a DAG built by that function to the global context.
> > > Like this:
> > > def create_client_dag(client_id):
> > >     # build dag here
> > >
> > > def get_client_ids_locally():
> > >     # access the data that was pulled from the API
> > >
> > > client_ids = get_client_ids_locally()
> > > for client in client_ids:
> > >     dag = create_client_dag(client)
> > >     globals()[dag.dag_id] = dag
> > >
> > > This approach also handles removing client IDs somewhat gracefully.
> DAGs
> > > for removed clients will still appear in the UI (you can build a
> > > maintenance DAG to clean that up), but they'll be disabled and their
> > > tasks won't be scheduled.
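The local cache Chris describes (client IDs written "to a file on disk" by
the fetching DAG, read back at parse time) could be sketched like this;
`CACHE_PATH` and the helper names are illustrative, not from the thread:

```python
import json
import os
import tempfile

CACHE_PATH = os.path.join(tempfile.gettempdir(), 'client_ids.json')

def write_client_ids(client_ids, path=CACHE_PATH):
    # called from the caching DAG's task after hitting the API;
    # write to a temp file then rename, so parsers never see a partial file
    tmp = path + '.tmp'
    with open(tmp, 'w') as f:
        json.dump(client_ids, f)
    os.replace(tmp, path)

def get_client_ids_locally(path=CACHE_PATH):
    # called at parse time by the DAG-generating file; no API access here
    if not os.path.exists(path):
        return []
    with open(path) as f:
        return json.load(f)
```

The point of the split is that the scheduler's frequent re-parsing of the
DAG file only ever costs a local file read.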
> > > On Wed, Mar 21, 2018, at 7:32 PM, Kyle Hamlin wrote:
> > > > Thanks for all the responses; let me try to address the main themes.
> > > >
> > > > @Ace @Nicholas @Taylor
> > > > I originally started with a loop over my list of client ids and
> > > > created a SparkSubmitOperator for each client. The pseudo code would
> > > > look something like this:
> > > >
> > > > dag = DAG(...)
> > > >
> > > > client_ids = get_client_ids()
> > > > for client_id in client_ids:
> > > >     SparkSubmitOperator(
> > > >         ...
> > > >         dag=dag
> > > >     )
> > > >
> > > > I found this approach kind of clunky for a few reasons. First, the
> > > > get_client_ids() function was hitting our API every time the dag was
> > > > read by the scheduler, which seemed excessive (every 30 seconds or
> > > > so?). Second, it seemed like a single task failure marked the whole
> > > > dag as a failure, but I guess retrying till the task worked could
> > > > solve this? Third, the UI gets really clunky and slow, basically
> > > > unusable when it tries to render the graph view for that many tasks.
> > > > Finally, Airflow doesn't seem very happy when client_ids are removed,
> > > > i.e. when get_client_ids() no longer returns a specific client_id; it
> > > > really seems to want a static dag.
> > > >
> > > > Do I really have to poll an API or database every 30 seconds for
> > > > this dynamic client_id data?
> > > >
> > > > @Ace
> > > > I have been limiting concurrency so as to not blast the cluster
> > > >
> > > > @Nicholas
> > > > Thank you for the noise suggestion; I will definitely implement that
> > > > if I continue with the same methodology
> > > >
> > > > @Taylor
> > > > Are you using a SubDagOperator? Or is your process similar to the
> > > > pseudo code I wrote above?
> > > >
> > > >
> > > > On Wed, Mar 21, 2018 at 2:49 PM Taylor Edmiston
> > > > <tedmiston@gmail.com> wrote:
> > > >
> > > >> We also use a similar approach to generate dynamic DAGs based on a
> > > >> common template DAG file.  We pull in the list of config objects,
> > > >> one per DAG, from an internal API lightly wrapping the database,
> > > >> then we cache that response in an Airflow Variable that gets
> > > >> updated once a minute.  The dynamic DAGs are generated from that
> > > >> variable.
> > > >>
> > > >>
> > > >> On Wed, Mar 21, 2018 at 1:54 PM, Nicolas Kijak <
> > > >> nicolas.kijak@weightwatchers.com> wrote:
> > > >>
> > > >>> Kyle,
> > > >>>
> > > >>> We have a similar approach but on a much, much smaller scale.  We
> > > >>> now have <100 “things to process” but expect it to grow to under
> > > >>> ~200.  Each “thing to process” has the same workflow so we have a
> > > >>> single DAG definition that does about 20 tasks per, then we loop
> > > >>> over the list of items and produce a dag object for each one,
> > > >>> adding it to the global definition.
> > > >>>
> > > >>> One of the things we quickly ran into was crushing the scheduler,
> > > >>> as everything was running with the same start time.  To get around
> > > >>> this we add noise to the start time minute and seconds.  Simply
> > > >>> index % 60.  This spreads out the load so that the scheduler isn’t
> > > >>> trying to run everything at the exact same moment.  I would
> > > >>> suggest, if you do go this route, to also stagger your hours if
> > > >>> you can because of how many you plan to run.  Perhaps your DAGs
> > > >>> are smaller and aren’t as CPU intensive as ours.
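The index % 60 jitter Nicolas describes might look like this (a sketch;
the base date and the choice to reuse the offset for both minute and
second are assumptions):

```python
from datetime import datetime

def jittered_start(index, base=datetime(2018, 1, 1)):
    # offset each generated DAG's start_date within the hour so that
    # schedules derived from it don't all fire at the same instant
    offset = index % 60
    return base.replace(minute=offset, second=offset)
```

Each DAG produced by the generation loop would pass `jittered_start(i)`
as its start_date; staggering hours as well would extend the same idea
with e.g. `hour=index % 24`.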
> > > >>>
> > > >>> On 3/21/18, 1:35 PM, "Kyle Hamlin" <hamlin.kn@gmail.com> wrote:
> > > >>>
> > > >>>   Hello,
> > > >>>
> > > >>>   I'm currently using Airflow for some ETL tasks where I submit a
> > > >>>   spark job to a cluster and poll till it is complete. This
> > > >>>   workflow is nice because it is typically a single Dag. I'm now
> > > >>>   starting to do more machine learning tasks and need to build a
> > > >>>   model per client, which is 1000+ clients. My spark cluster is
> > > >>>   capable of handling this workload, however, it doesn't seem
> > > >>>   scalable to write 1000+ dags to fit models for each client. I
> > > >>>   want each client to have its own task instance so it can be
> > > >>>   retried if it fails without having to run all 1000+ tasks over
> > > >>>   again. How do I handle this type of workflow in Airflow?
> > > >>>
> > > >>>
> > > >>
> > >
> > >
> >
>
