airflow-dev mailing list archives

From Chris Riccomini <criccom...@apache.org>
Subject Re: 1.7.1 release blocker: [AIRFLOW-56] Airflow's scheduler can "lose" queued tasks
Date Fri, 06 May 2016 21:02:25 GMT
Same :)

On Fri, May 6, 2016 at 1:57 PM, Arthur Wiedmer <arthur.wiedmer@gmail.com>
wrote:

> Good, because that's the version we use :)
>
> On Fri, May 6, 2016 at 1:55 PM, Bolke de Bruin <bdbruin@gmail.com> wrote:
>
> > It seems it does: https://dev.mysql.com/doc/refman/5.6/en/select.html
> >
> > Bolke
> >
> > Sent from my iPhone
> >
> > On 6 May 2016, at 22:52, Chris Riccomini <criccomini@apache.org> wrote:
> >
> > >> Postgres, Mysql >= 5.7)
> > >
> > > Curious--does this work in MySQL 5.6?
> > >
> > >> On Fri, May 6, 2016 at 1:36 PM, Bolke de Bruin <bdbruin@gmail.com> wrote:
> > >>
> > >> Dear All,
> > >>
> > >> One of the remaining issues to resolve is AIRFLOW-56: the fact that
> > >> Airflow’s scheduler can “lose” queued tasks. One of the recent changes
> > >> was to hold the queue status in the executor instead of using the
> > >> database. While this goes against the “database knows the state”
> > >> principle, it was done (afaik) to prevent a race condition in which a
> > >> scheduler might set the task for execution twice, as the task might
> > >> not have updated its state in the database yet. However, if the
> > >> executor loses its queue for some reason, the tasks that have a state
> > >> of queued in the database will never get picked up. AIRFLOW-56
> > >> contains a DAG that will exhibit this behavior on a clean install of
> > >> 1.7.1rc3 with MySQL and the local executor.
> > >>
> > >> Jeremiah has worked on
> > >> https://github.com/apache/incubator-airflow/pull/1378, which basically
> > >> reverts the behavior to using the database (pre 1.7.0), but contains
> > >> some additional fixes. In addition, I have asked him to include the
> > >> following:
> > >>
> > >> @provide_session
> > >> def refresh_from_db(self, session=None, lock_for_update=False):
> > >>    """
> > >>    Refreshes the task instance from the database based on the primary key
> > >>    """
> > >>    TI = TaskInstance
> > >>
> > >>    if lock_for_update:
> > >>        ti = session.query(TI).filter(
> > >>            TI.dag_id == self.dag_id,
> > >>            TI.task_id == self.task_id,
> > >>            TI.execution_date == self.execution_date,
> > >>        ).with_for_update().first()
> > >>    else:
> > >>        ti = session.query(TI).filter(
> > >>            TI.dag_id == self.dag_id,
> > >>            TI.task_id == self.task_id,
> > >>            TI.execution_date == self.execution_date,
> > >>        ).first()
> > >>
> > >> And to use “lock_for_update=True” in TaskInstance.run. This locks the
> > >> record for update (with a FOR UPDATE) in the database (Postgres,
> > >> Mysql >= 5.7) and ensures a “select” needs to wait if it includes this
> > >> record in its results. As soon as a “session.commit()” occurs the lock
> > >> is gone. The window for the race condition to occur is thus much
> > >> shorter now (the time between a new scheduler run and the time it
> > >> takes for the python interpreter to start “airflow run”), though not
> > >> entirely closed.
> > >>
> > >> Due to the lock there *might* be a small performance hit, but it
> > >> should be very small: only queries that include the aforementioned
> > >> locked record will be delayed for a few milliseconds. However, we
> > >> might overlook something, so I kindly request that you review this PR,
> > >> so we can make it part of 1.7.1.
> > >>
> > >> Thanks
> > >> Bolke
> > >>
> > >>
> > >>
> >
>
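
The locking behavior described in the quoted message (take a lock before reading the task state, release it on commit) can be illustrated with a small standalone sketch. This is not Airflow code and not the code from the PR. It uses Python's built-in sqlite3 module as a stand-in, and because SQLite has no row-level SELECT ... FOR UPDATE, it substitutes BEGIN IMMEDIATE, which takes a write lock on the whole database. The table layout, the helper names (open_conn, claim_task, demo) and the 'example_dag'/'example_task' values are all hypothetical. One further difference: on Postgres or MySQL a competing locking read would wait for the commit, whereas here timeout=0 makes the second connection fail immediately so the result is deterministic.

```python
import os
import sqlite3
import tempfile


def open_conn(path):
    # isolation_level=None: autocommit mode, so BEGIN/COMMIT are explicit.
    # timeout=0: a lock conflict raises at once instead of waiting.
    return sqlite3.connect(path, timeout=0, isolation_level=None)


def claim_task(conn, dag_id, task_id):
    """Try to move a task from 'queued' to 'running' under a write lock.

    Returns "claimed", "not_queued", or "locked" (hypothetical labels).
    """
    try:
        # Stand-in for SELECT ... FOR UPDATE: lock before reading the state.
        conn.execute("BEGIN IMMEDIATE")
    except sqlite3.OperationalError:
        # Sketch-level handling: treat any failure to begin as a held lock.
        return "locked"
    row = conn.execute(
        "SELECT state FROM task_instance WHERE dag_id = ? AND task_id = ?",
        (dag_id, task_id),
    ).fetchone()
    if row is None or row[0] != "queued":
        conn.execute("ROLLBACK")
        return "not_queued"
    conn.execute(
        "UPDATE task_instance SET state = 'running' "
        "WHERE dag_id = ? AND task_id = ?",
        (dag_id, task_id),
    )
    conn.execute("COMMIT")  # the lock is released here
    return "claimed"


def demo():
    path = os.path.join(tempfile.mkdtemp(), "lock_sketch.db")
    setup = open_conn(path)
    setup.execute(
        "CREATE TABLE task_instance ("
        "dag_id TEXT, task_id TEXT, state TEXT, "
        "PRIMARY KEY (dag_id, task_id))"
    )
    setup.execute(
        "INSERT INTO task_instance VALUES "
        "('example_dag', 'example_task', 'queued')"
    )
    setup.close()

    first = open_conn(path)
    second = open_conn(path)
    results = []

    # The first worker takes the lock and has not committed yet.
    first.execute("BEGIN IMMEDIATE")
    results.append(claim_task(second, "example_dag", "example_task"))

    # The first worker records the new state and commits.
    first.execute(
        "UPDATE task_instance SET state = 'running' "
        "WHERE dag_id = 'example_dag' AND task_id = 'example_task'"
    )
    first.execute("COMMIT")

    # With the lock gone, the second worker sees the committed state.
    results.append(claim_task(second, "example_dag", "example_task"))

    first.close()
    second.close()
    return results


if __name__ == "__main__":
    print(demo())
```

While the first connection holds the lock, the second cannot read-and-claim the task. Once the first commits, the second sees the committed state and declines to run the task a second time. As the quoted message notes, this narrows the race window but does not cover the time before the lock is taken.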
