airflow-dev mailing list archives

From Bolke de Bruin <bdbr...@gmail.com>
Subject Re: dag file processing times
Date Tue, 25 Apr 2017 07:01:56 GMT
We could of course write a module loader that takes care of the caching and maybe even the
manifest. This would help with versioning and could look a bit like the Java class loader
(by separating the imported modules or making sure we always reload the modules when loading
DAGs). I haven't thought through the repercussions, so there might be severe cons. Please note
that I don't think the multiprocess processor solves the sys.modules issue entirely: cached
modules in the parent will still be there, so any dependencies the Airflow scheduler itself
brings in will be present in the processor. It is probably enough in 99% of the circumstances
though.
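
A minimal sketch of what such a loader could do (the manifest-driven owned_modules
list is hypothetical; nothing like it exists in Airflow today):

    import importlib.util
    import sys

    def reload_dag_module(file_path, module_name, owned_modules=()):
        # Hypothetical loader: evict cached modules so the DAG file and
        # the helper modules it owns are re-interpreted from disk.
        # owned_modules would come from a per-DAG manifest.
        for name in (module_name, *owned_modules):
            sys.modules.pop(name, None)  # force a fresh import below
        spec = importlib.util.spec_from_file_location(module_name, file_path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)  # re-executes the DAG definition
        return module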

On the config issue I don't entirely agree. If you have a config that is available outside
your DAG, it will still be loaded if you do not use serialisation. That strengthens my case
for just sending DAGs (and, if needed, their dependencies) around rather than using
pickling/serialization (by the way, the on-the-wire format of marshmallow is JSON).
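
For reference, a minimal marshmallow sketch of that wire format (the
DagDescriptorSchema and its fields are made up for illustration; Airflow
defines no such schema):

    from marshmallow import Schema, fields

    class DagDescriptorSchema(Schema):
        dag_id = fields.Str(required=True)
        fileloc = fields.Str(required=True)
        task_ids = fields.List(fields.Str())

    schema = DagDescriptorSchema()
    # With marshmallow 3, dumps() returns a JSON string, i.e. the
    # on-the-wire format is plain JSON.
    wire = schema.dumps({"dag_id": "example",
                         "fileloc": "/dags/example.py",
                         "task_ids": ["start", "end"]})
    print(wire)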

Bolke.


> On 25 Apr 2017, at 01:09, Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> 
> With configuration as code, you can't really know whether the DAG
> definition has changed based on whether the module was altered. The Python
> module could be importing other modules that have changed, could have
> read a config file somewhere on the drive that might have changed, or could
> have read from a DB that is constantly getting mutated.
> 
> There are also issues around the fact that Python caches modules in
> `sys.modules`, so even though the crawler is re-interpreting modules,
> imported modules wouldn't get re-interpreted [as our DAG authors expected]
> 
> For these reasons [and others I won't get into here], we decided that the
> scheduler would use a subprocess pool and re-interpret the DAGs from
> scratch at every cycle, insulating the different DAGs and guaranteeing no
> interpreter caching.
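> 
> As a rough illustration of that insulation (a sketch, not the actual
> scheduler code; the "spawn" start method gives each worker a fresh
> interpreter, so nothing cached in the parent's sys.modules leaks in):
> 
>     import multiprocessing
>     import runpy
> 
>     def parse_dag_file(file_path):
>         # Executes the file in this (fresh) interpreter; every import
>         # in the DAG file is resolved from scratch.
>         namespace = runpy.run_path(file_path)
>         return [name for name, value in namespace.items()
>                 if type(value).__name__ == "DAG"]
> 
>     if __name__ == "__main__":
>         ctx = multiprocessing.get_context("spawn")
>         with ctx.Pool(processes=4) as pool:
>             # "/dags/example.py" is a placeholder path
>             print(pool.map(parse_dag_file, ["/dags/example.py"]))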
> 
> Side note: yaml parsing is much more expensive than parsing other markup
> languages, and I would recommend working around it for storing DAG
> configuration. Our longest-to-parse DAGs at Airbnb were reading yaml to
> build a DAG, and I believe someone wrote custom logic to avoid reparsing
> the yaml at every cycle. Parsing equivalent json or hocon was an order of
> magnitude faster.
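> 
> A quick way to see the gap on your own config (illustrative timing
> sketch; requires PyYAML):
> 
>     import json
>     import timeit
>     import yaml
> 
>     doc = {"tasks": [{"id": "t%d" % i, "retries": 3} for i in range(200)]}
>     as_json = json.dumps(doc)
>     as_yaml = yaml.safe_dump(doc)
> 
>     # json.loads is typically an order of magnitude faster than
>     # yaml.safe_load on an equivalent document.
>     print(timeit.timeit(lambda: json.loads(as_json), number=100))
>     print(timeit.timeit(lambda: yaml.safe_load(as_yaml), number=100))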
> 
> Max
> 
> On Mon, Apr 24, 2017 at 2:55 PM, Bolke de Bruin <bdbruin@gmail.com> wrote:
> 
>> Inotify can work without a daemon. Just fire a call to the API when a file
>> changes. It takes just a few lines of bash.
>> 
>> If you bundle your dependencies in a zip, you should be fine with the above.
>> Or if we start using manifests that list the files that are needed in a
>> dag...
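>> 
>> Something along these lines (a sketch: it assumes inotify-tools is
>> installed, and the reprocessing endpoint is hypothetical; nothing like
>> it exists in the API today):
>> 
>>     #!/usr/bin/env bash
>>     # Watch the DAGs folder and report each changed file to a
>>     # (hypothetical) endpoint that reprocesses just that file.
>>     inotifywait -m -r -e close_write,moved_to --format '%w%f' ~/airflow/dags |
>>     while read -r changed_file; do
>>         curl -X POST "http://localhost:8080/api/experimental/reprocess" \
>>              --data-urlencode "file_path=${changed_file}"
>>     done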
>> 
>> 
>> Sent from my iPhone
>> 
>>> On 24 Apr 2017, at 22:46, Dan Davydov <dan.davydov@airbnb.com.INVALID> wrote:
>>> 
>>> One idea to solve this is to use a daemon that uses inotify to watch for
>>> changes in files and then reprocesses just those files. The hard part is
>>> that, without any kind of dependency/build system for DAGs, it can be
>>> hard to tell which DAGs depend on which files.
>>> 
>>> On Mon, Apr 24, 2017 at 1:21 PM, Gerard Toonstra <gtoonstra@gmail.com>
>>> wrote:
>>> 
>>>> Hey,
>>>> 
>>>> I've seen some people complain about DAG file processing times. An issue
>>>> was raised about this today:
>>>> 
>>>> https://issues.apache.org/jira/browse/AIRFLOW-1139
>>>> 
>>>> I attempted to provide a good explanation what's going on. Feel free to
>>>> validate and comment.
>>>> 
>>>> 
>>>> I'm noticing that the file processor is a bit naive in the way it
>>>> reprocesses DAGs. It doesn't look at the DAG interval, for example, so it
>>>> looks like it reprocesses all files continuously in one big batch, even
>>>> if we can determine that the next "schedule" for all of a file's dags is
>>>> in the future.
>>>> 
>>>> 
>>>> Wondering if a change in the DagFileProcessingManager could optimize
>>>> things a bit here.
>>>> 
>>>> In the part where it gets the simple_dags from a file it's currently
>>>> processing:
>>>> 
>>>>               for simple_dag in processor.result:
>>>>                   simple_dags.append(simple_dag)
>>>> 
>>>> the file_path is in the context and the simple_dags should be able to
>>>> provide the next interval date for each dag in the file.
>>>> 
>>>> The idea is to add files to a sorted deque by "next_schedule_datetime"
>>>> (the minimum next interval date), so that when we build the list
>>>> "files_paths_to_queue", we can remove files whose dags we know won't
>>>> have a new dagrun for a while.
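>>>> 
>>>> As a sketch of that idea (next_schedule_datetime and record_file are
>>>> names from this proposal, not existing code; a heap gives the sorted
>>>> behaviour cheaply):
>>>> 
>>>>     import heapq
>>>>     from datetime import datetime
>>>> 
>>>>     # Min-heap of (next_schedule_datetime, file_path): the earliest
>>>>     # upcoming schedule across a file's dags decides its priority.
>>>>     schedule_heap = []
>>>> 
>>>>     def record_file(file_path, next_schedule_datetime):
>>>>         heapq.heappush(schedule_heap, (next_schedule_datetime, file_path))
>>>> 
>>>>     def files_paths_to_queue(now=None):
>>>>         # Only re-queue files whose next dagrun is due; the rest stay
>>>>         # in the heap for a later cycle.
>>>>         now = now or datetime.utcnow()
>>>>         due = []
>>>>         while schedule_heap and schedule_heap[0][0] <= now:
>>>>             due.append(heapq.heappop(schedule_heap)[1])
>>>>         return due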
>>>> 
>>>> One gotcha to resolve after that is dealing with files getting updated
>>>> with new dags, changed dag definitions, renames, and different interval
>>>> schedules.
>>>> 
>>>> Worth a PR to glance over?
>>>> 
>>>> Rgds,
>>>> 
>>>> Gerard
>>>> 
>> 

