airflow-dev mailing list archives

From Deng Xiaodong <xd.den...@gmail.com>
Subject Re: Multiple Schedulers - "scheduler_lock"
Date Sun, 03 Mar 2019 02:26:09 GMT
Thanks Max.

I have documented the discussion on this topic, along with the useful inputs, in AIP-15 (Support
Multiple-Schedulers for HA & Better Scheduling Performance):
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651

More input from folks is welcome.

Thanks.


XD

> On 3 Mar 2019, at 6:18 AM, Maxime Beauchemin <maximebeauchemin@gmail.com> wrote:
> 
> Personally I'd vote against the idea of having each scheduler handle a
> subset of the DAGs; that's just not HA.
> 
> Also if you are in an env where you have a small number of large DAGs, the
> odds of having wasted work and double-firing get pretty high.
> 
> With the lock in place, the scheduler loop just needs to select (in a db
> transaction) the DAG that has gone unprocessed for the longest time and is
> not locked. Flipping the lock flag to true should be part of the same db
> transaction. We probably need a btree index on lock and last processed
> time.
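
A minimal sketch of the claim step described above, not Airflow code. It assumes a hypothetical `dag` table with `dag_id`, `is_locked` and `last_processed` columns (these names are illustrative, not Airflow's schema) and uses SQLite only so the example is self-contained.

```python
import sqlite3


def claim_next_dag(conn):
    """Lock and return the unlocked DAG that was processed least recently.

    Returns the dag_id, or None if every DAG is currently locked.
    """
    # BEGIN IMMEDIATE takes SQLite's write lock up front, so the SELECT and
    # the UPDATE below run inside one transaction.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT dag_id FROM dag WHERE is_locked = 0 "
            "ORDER BY last_processed ASC LIMIT 1"
        ).fetchone()
        if row is not None:
            # Flip the lock flag as part of the same transaction.
            conn.execute("UPDATE dag SET is_locked = 1 WHERE dag_id = ?", (row[0],))
        conn.execute("COMMIT")
        return None if row is None else row[0]
    except Exception:
        conn.execute("ROLLBACK")
        raise


def make_demo_db():
    """Build an in-memory database with three hypothetical DAG rows."""
    # isolation_level=None: autocommit mode, transactions are managed by hand.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute(
        "CREATE TABLE dag (dag_id TEXT PRIMARY KEY, "
        "is_locked INTEGER NOT NULL DEFAULT 0, last_processed REAL NOT NULL)"
    )
    # Index on (lock, last processed time), as suggested in the message.
    conn.execute("CREATE INDEX idx_dag_lock_processed ON dag (is_locked, last_processed)")
    conn.executemany(
        "INSERT INTO dag VALUES (?, ?, ?)",
        [("a", 0, 30.0), ("b", 0, 10.0), ("c", 1, 5.0)],
    )
    return conn
```

On a server database the same idea would more likely rely on row-level locking (for example `SELECT ... FOR UPDATE`) rather than a database-wide write lock; that choice is outside what the thread specifies.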
> 
> This way adding scheduler processes increases the scheduling pace, and
> provides an HA solution. No leader / master / slave or election process,
> just equal workers that work together.
> 
> Max
> 
> On Sat, Mar 2, 2019 at 7:04 AM Deng Xiaodong <xd.deng.r@gmail.com> wrote:
> 
>> I get your point and agree. Your final suggestion, to randomly sort the
>> DAGs, is a great way to address it. Thanks!
>> 
>> XD
>> 
>>> On 2 Mar 2019, at 10:41 PM, Jarek Potiuk <Jarek.Potiuk@polidea.com>
>> wrote:
>>> 
>>> I think that the probability calculation holds only if there is no
>>> correlation between different schedulers. However, I think there might be
>>> an accidental correlation if you think about typical deployments.
>>> 
>>> Some details on why I think accidental correlation is possible and even
>>> likely. Assume that:
>>> 
>>>  - we have similar and similarly busy machines running schedulers (likely)
>>>  - time is synchronised between the machines (likely)
>>>  - the machines have the same DAG folders mounted (or copied) and the
>>>  same filesystem is used (this is exactly what multiple schedulers
>>>  deployment is all about)
>>>  - the schedulers start scanning at exactly the same time (crossing 0:00
>>>  second every full five minutes for example) - this I am not sure about,
>>>  but I imagine this might be "typical" behaviour.
>>>  - they process the list of DAGs in exactly the same sequence (it looks
>>>  like this is the case in dag_processing
>>>  <https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L300>
>>>  and models/__init__
>>>  <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L567>:
>>>  we use os.walk, which uses os.listdir, for which the sequence of
>>>  processing depends on the filesystem implementation
>>>  <https://stackoverflow.com/questions/31534583/is-os-listdir-deterministic>,
>>>  and then we append files to the list)
>>> 
>>> Then it's rather likely that the schedulers will be competing for the
>>> very same DAGs at the very beginning. Locking will change how quickly they
>>> process each DAG of course, but if the DAGs are of similar sizes it's also
>>> likely that the speed of scanning (DAGs/s) is similar for all schedulers.
>>> The schedulers will then catch up with each other and might pretty much
>>> continuously compete for the same DAGs almost all the time.
>>> 
>>> It can be mitigated super-easily by randomly sorting the DAGs folder list
>>> after it is prepared (the order is file-system dependent now, so we do not
>>> rely on any particular order). Then the probability numbers will hold
>>> perfectly I think :)
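
A small sketch of the mitigation suggested above: shuffle the collected file list before processing, so that schedulers sharing a filesystem do not walk the DAG files in the same order. The function name and the optional `rng` parameter are hypothetical and only there for illustration; this is not the code in dag_processing.

```python
import random


def shuffled_file_paths(file_paths, rng=None):
    """Return a copy of file_paths in random order, leaving the input untouched."""
    rng = rng or random.Random()
    paths = list(file_paths)
    # In-place Fisher-Yates shuffle on the copy; each scheduler process gets
    # its own ordering because each has its own random state.
    rng.shuffle(paths)
    return paths
```

Passing a seeded `random.Random` is only useful for tests; in a real scheduler each process would presumably use its own unseeded generator.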
>>> 
>>> J.
>>> 
>>> 
>>> On Sat, Mar 2, 2019 at 2:41 PM Deng Xiaodong <xd.deng.r@gmail.com>
>> wrote:
>>> 
>>>> I’m thinking of which architecture would be ideal.
>>>> 
>>>> 
>>>> # Option-1:
>>>> The master-slave architecture would be one option. But leader election
>>>> would be essential to consider, otherwise we have an issue in terms of HA
>>>> again.
>>>> 
>>>> 
>>>> # Option-2:
>>>> Another option we may consider is to simply start multiple scheduler
>>>> instances (just using the current implementation, after modifying &
>>>> validating the scheduler_lock on DagModel).
>>>> 
>>>> - In this case, given we handle everything properly using locking, we
>>>> don’t need to worry too much about double-scheduling/triggering.
>>>> 
>>>> - Another potential concern I had earlier is that different schedulers may
>>>> compete with each other and cause “waste” of scheduler resources.
>>>> After further thinking, I realise this is a typical Birthday Problem.
>>>> Given we have m DAGs and n schedulers, at any moment, the probability
>>>> that all schedulers are working on different DAGs is m!/((m-n)! * (m^n)),
>>>> and the probability that there are schedulers competing on the same DAG
>>>> will be 1-m!/((m-n)! * (m^n)).
>>>> 
>>>> Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the
>>>> probability that there are schedulers competing on the same DAG is only
>>>> 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only
>>>> 0.33%.
>>>> (This probability will be higher if m/n is low. But users should not start
>>>> too many schedulers if they don’t have that many DAGs).
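
The figures above can be checked with a short sketch. It assumes, as the formula does, that each scheduler picks one DAG uniformly and independently; the function name is illustrative.

```python
def collision_probability(m, n):
    """Probability that at least two of n schedulers pick the same one of m DAGs.

    Equals 1 - m! / ((m - n)! * m**n), computed as a running product to
    avoid large factorials.
    """
    if n > m:
        # More schedulers than DAGs: a collision is certain (pigeonhole).
        return 1.0
    p_all_distinct = 1.0
    for i in range(n):
        # The i-th scheduler must avoid the i DAGs already taken.
        p_all_distinct *= (m - i) / m
    return 1.0 - p_all_distinct
```

With `m=200, n=2` this gives 0.005 (0.5%), and with `m=300, n=2` it gives 1/300 (about 0.33%), matching the numbers in the message. As a further data point under the same assumptions, three schedulers against 200 DAGs come out at roughly 1.5%.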
>>>> 
>>>> Given the probability of schedulers competing is so low, my concern on
>>>> scheduler resource waste is not really valid.
>>>> 
>>>> 
>>>> 
>>>> Based on these calculations/assessment, I think we can go for option-2,
>>>> i.e. we don’t make big changes to the current implementation. Instead, we
>>>> ensure the scheduler_lock is working well and test intensively with
>>>> multiple schedulers running. Then we should be good to let users know that
>>>> it’s safe to run multiple schedulers.
>>>> 
>>>> Please share your thoughts on this and correct me if I’m wrong in any
>>>> point above. Thanks.
>>>> 
>>>> 
>>>> XD
>>>> 
>>>> 
>>>> Reference: https://en.wikipedia.org/wiki/Birthday_problem
>>>> 
>>>> 
>>>>> On 2 Mar 2019, at 3:39 PM, Tao Feng <fengtao04@gmail.com> wrote:
>>>>> 
>>>>> Does the proposal use a master-slave architecture (leader scheduler vs
>>>>> slave scheduler)?
>>>>> 
>>>>> On Fri, Mar 1, 2019 at 5:32 PM Kevin Yang <yrqls21@gmail.com> wrote:
>>>>> 
>>>>>> Preventing double-triggering by separating the DAG files that different
>>>>>> schedulers parse sounds easier and more intuitive. I actually removed one
>>>>>> piece of the double-triggering prevention logic here
>>>>>> <https://github.com/apache/airflow/pull/4234/files#diff-a7f584b9502a6dd19987db41a8834ff9L127>
>>>>>> (expensive) and was relying on this lock
>>>>>> <https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L1233>
>>>>>> to prevent double-firing and safeguard our non-idempotent tasks (btw the
>>>>>> insert can be insert overwrite to be idempotent).
>>>>>> 
>>>>>> Also, though at Airbnb we requeue tasks a lot, we haven't seen
>>>>>> double-firing recently.
>>>>>> 
>>>>>> Cheers,
>>>>>> Kevin Y
>>>>>> 
>>>>>> On Fri, Mar 1, 2019 at 2:08 PM Maxime Beauchemin <
>>>>>> maximebeauchemin@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Forgot to mention: the intention was to use the lock, but I never
>>>>>>> personally got to do the second phase, which would consist of skipping
>>>>>>> the DAG if the lock is on, and expiring the lock eventually based on a
>>>>>>> config setting.
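
One possible shape for the "second phase" described above, as a hedged sketch. It assumes the lock is stored as a timestamp (`locked_at`) rather than a boolean flag, and that the expiry comes from a hypothetical `lock_timeout` setting; neither is taken from Airflow.

```python
from datetime import datetime, timedelta


def should_skip_dag(locked_at, now, lock_timeout):
    """Return True if another scheduler holds a still-valid lock on the DAG.

    locked_at is None when the DAG is unlocked. A lock older than
    lock_timeout counts as expired and no longer blocks processing.
    """
    if locked_at is None:
        return False
    return now - locked_at < lock_timeout
```

Storing a timestamp instead of a flag is a design choice made here so that a crashed scheduler cannot hold a DAG forever; a boolean flag plus a separate "locked since" column would work equally well.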
>>>>>>> 
>>>>>>> Max
>>>>>>> 
>>>>>>> On Fri, Mar 1, 2019 at 1:57 PM Maxime Beauchemin <
>>>>>>> maximebeauchemin@gmail.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> My original intention with the lock was preventing "double-triggering"
>>>>>>>> of tasks (triggering refers to the scheduler putting the message in the
>>>>>>>> queue). Airflow now has good "double-firing-prevention" of tasks (firing
>>>>>>>> happens when the worker receives the message and starts the task): even
>>>>>>>> if the scheduler was to go rogue or restart and send multiple triggers
>>>>>>>> for a task instance, the worker(s) should only start one task instance.
>>>>>>>> That's done by running the database assertions behind the conditions
>>>>>>>> being met as a read database transaction (no task can alter the rows
>>>>>>>> that validate the assertion while it's getting asserted). In practice
>>>>>>>> it's a little tricky and we've seen rogue double-firing in the past (I
>>>>>>>> have no idea how often that happens).
>>>>>>>> 
>>>>>>>> If we do want to prevent double-triggering, we should make sure that 2
>>>>>>>> schedulers aren't processing the same DAG or DagRun at the same time.
>>>>>>>> That would mean having the scheduler not start processing locked DAGs,
>>>>>>>> and providing a mechanism to expire the locks after some time.
>>>>>>>> 
>>>>>>>> Has anyone experienced double firing lately? If that exists we should
>>>>>>>> fix it, but also be careful around multiple-scheduler double-triggering,
>>>>>>>> as it would make that problem potentially much worse.
>>>>>>>> 
>>>>>>>> Max
>>>>>>>> 
>>>>>>>> On Fri, Mar 1, 2019 at 8:19 AM Deng Xiaodong <xd.deng.r@gmail.com>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> It’s exactly what my team is doing & what I shared here earlier last
>>>>>>>>> year
>>>>>>>>> (https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E)
>>>>>>>>> 
>>>>>>>>> It’s a somewhat “hacky” solution (and HA is not addressed), and now I’m
>>>>>>>>> thinking about how we can make it more proper & robust.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> XD
>>>>>>>>> 
>>>>>>>>>> On 2 Mar 2019, at 12:04 AM, Mario Urquizo <mario.urquizo@gmail.com> wrote:
>>>>>>>>>> 
>>>>>>>>>> We have been running multiple schedulers for about 3 months.  We
>>>>>>>>>> created multiple services to run airflow schedulers.  The only
>>>>>>>>>> difference is that we have each of the schedulers pointed to a
>>>>>>>>>> directory one level deeper than the DAG home directory that the
>>>>>>>>>> workers and webapp use. We have seen much better scheduling
>>>>>>>>>> performance but this does not yet help with HA.
>>>>>>>>>> 
>>>>>>>>>> DAGS_HOME:
>>>>>>>>>> {airflow_home}/dags  (webapp & workers)
>>>>>>>>>> {airflow_home}/dags/group-a/ (scheduler1)
>>>>>>>>>> {airflow_home}/dags/group-b/ (scheduler2)
>>>>>>>>>> {airflow_home}/dags/group-etc/ (scheduler3)
>>>>>>>>>> 
>>>>>>>>>> Not sure if this helps, just sharing in case it does.
>>>>>>>>>> 
>>>>>>>>>> Thank you,
>>>>>>>>>> Mario
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Fri, Mar 1, 2019 at 9:44 AM Bolke de Bruin <bdbruin@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> I have done quite some work on making it possible to run multiple
>>>>>>>>>>> schedulers at the same time.  At the moment I don’t think there are
>>>>>>>>>>> real blockers to actually doing so. We just don’t actively test it.
>>>>>>>>>>> 
>>>>>>>>>>> Database locking is mostly in place (DagRuns and TaskInstances). And
>>>>>>>>>>> I think the worst that can happen is that a task is scheduled twice.
>>>>>>>>>>> The task will detect this most of the time and kill one off if they
>>>>>>>>>>> run concurrently; if they run sequentially rather than concurrently,
>>>>>>>>>>> then it will run again on some occasions. Everyone has idempotent
>>>>>>>>>>> tasks, right, so no harm done? ;-)
>>>>>>>>>>> 
>>>>>>>>>>> Have you encountered issues? Maybe work those out?
>>>>>>>>>>> 
>>>>>>>>>>> Cheers
>>>>>>>>>>> Bolke.
>>>>>>>>>>> 
>>>>>>>>>>> Sent from my iPad
>>>>>>>>>>> 
>>>>>>>>>>>> On 1 Mar 2019, at 16:25, Deng Xiaodong <xd.deng.r@gmail.com> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi Max,
>>>>>>>>>>>> 
>>>>>>>>>>>> Following
>>>>>>>>>>>> https://lists.apache.org/thread.html/0e21230e08f07ef6f8e3c59887e9005447d6932639d3ce16a103078f@%3Cdev.airflow.apache.org%3E,
>>>>>>>>>>>> I’m trying to prepare an AIP for supporting multiple schedulers in
>>>>>>>>>>>> Airflow (mainly for HA and higher scheduling performance).
>>>>>>>>>>>> 
>>>>>>>>>>>> While checking the code, I found that there is one attribute of
>>>>>>>>>>>> DagModel, “scheduler_lock”. It’s not used at all in the current
>>>>>>>>>>>> implementation, but it was introduced a long time back (2015) to
>>>>>>>>>>>> allow multiple schedulers to work together
>>>>>>>>>>>> (https://github.com/apache/airflow/commit/2070bfc50b5aa038301519ef7c630f2fcb569620).
>>>>>>>>>>>> 
>>>>>>>>>>>> Since you were the original author of it, it would be very helpful
>>>>>>>>>>>> if you could kindly share why the multiple-schedulers implementation
>>>>>>>>>>>> was removed eventually, and what challenges/complexity there were.
>>>>>>>>>>>> (You already shared a few valuable inputs in the earlier discussion
>>>>>>>>>>>> https://lists.apache.org/thread.html/d37befd6f04dbdbfd2a2d41722352603bc2e2f97fb47bdc5ba454d0c@%3Cdev.airflow.apache.org%3E,
>>>>>>>>>>>> mainly relating to hiccups around concurrency, cross-DAG
>>>>>>>>>>>> prioritisation & load on DB. Other than these, anything else you
>>>>>>>>>>>> would like to advise?)
>>>>>>>>>>>> 
>>>>>>>>>>>> I will also dive into the git history further to understand it
>>>>>>>>>>>> better.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> XD
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>>>> 
>>> 
>>> --
>>> 
>>> Jarek Potiuk
>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>> 
>>> M: +48 660 796 129 <+48660796129>
>>> E: jarek.potiuk@polidea.com
>> 
>> 

