airflow-dev mailing list archives

From Chris Riccomini <criccom...@apache.org>
Subject Re: 1.7.1 release blocker: [AIRFLOW-56] Airflow's scheduler can "lose" queued tasks
Date Fri, 06 May 2016 20:52:12 GMT
> Postgres, Mysql >= 5.7)

Curious--does this work in MySQL 5.6?

On Fri, May 6, 2016 at 1:36 PM, Bolke de Bruin <bdbruin@gmail.com> wrote:

> Dear All,
>
> One of the remaining issues to resolve is Airflow-56: the fact that
> Airflow’s scheduler can “lose” queued tasks. One of the recent changes was
> to hold the queue status in the executor instead of in the database. While
> this goes against the “database knows the state” principle, it was done
> (afaik) to prevent a race condition in which the scheduler might send a task
> for execution twice, because the task might not have updated its state in
> the database yet. However, if the executor loses its queue for some reason,
> the tasks that have a state of queued in the database will never get picked
> up. Airflow-56 contains a DAG that exhibits this behavior on a clean install
> of 1.7.1rc3 with MySQL and the local executor.
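A minimal sketch of the failure mode, assuming a toy model in which the "database" is a plain dict of task states and the executor keeps queued tasks only in memory. `InMemoryExecutor`, `schedule` and `stranded_tasks` are hypothetical names, not Airflow APIs; the point is only that once the in-memory queue is gone, nothing ever looks again at tasks whose database state is already `queued`.

```python
from dataclasses import dataclass, field

# Hypothetical, simplified model: the "database" is a dict of task_id -> state,
# and the executor keeps its own in-memory queue. Not Airflow's real classes.


@dataclass
class InMemoryExecutor:
    queue: list = field(default_factory=list)

    def queue_task(self, task_id):
        self.queue.append(task_id)

    def restart(self):
        # A crash or restart discards everything held only in memory.
        self.queue.clear()


def schedule(db, executor):
    """Queue every task that is still 'scheduled' in the database."""
    for task_id, state in db.items():
        if state == "scheduled":
            db[task_id] = "queued"
            executor.queue_task(task_id)


def stranded_tasks(db, executor):
    """Tasks the DB calls 'queued' but the executor no longer knows about."""
    return sorted(t for t, s in db.items() if s == "queued" and t not in executor.queue)


db = {"extract": "scheduled", "load": "scheduled"}
executor = InMemoryExecutor()
schedule(db, executor)
executor.restart()
# The scheduler only looks for 'scheduled' tasks, so the 'queued' ones are never retried.
schedule(db, executor)
print(stranded_tasks(db, executor))  # ['extract', 'load']
```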
>
> Jeremiah has worked on
> https://github.com/apache/incubator-airflow/pull/1378, which basically
> reverts to the pre-1.7.0 behavior of using the database, but also contains
> some additional fixes. In addition, I have asked him to include the following:
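By contrast, a toy sketch of the database-as-source-of-truth approach the PR returns to, under the same kind of assumptions (a dict as the database, a list as the executor queue; `scheduler_loop` is a hypothetical name, not Airflow's scheduler). Each iteration rebuilds the executor's work from the stored states, so a lost queue is refilled on the next pass.

```python
# Hypothetical, simplified sketch of "the database knows the state":
# on every loop the scheduler derives the executor's work from the task
# states stored in the database, so nothing depends on executor memory.


def scheduler_loop(db, executor_queue):
    """One scheduler iteration; db maps task_id -> state."""
    for task_id, state in sorted(db.items()):
        if state == "scheduled":
            db[task_id] = "queued"
        # Re-send anything the DB says is queued but the executor does not hold.
        if db[task_id] == "queued" and task_id not in executor_queue:
            executor_queue.append(task_id)


db = {"extract": "scheduled", "load": "scheduled"}
executor_queue = []
scheduler_loop(db, executor_queue)
executor_queue.clear()  # the executor restarts and loses its queue
scheduler_loop(db, executor_queue)
print(executor_queue)  # ['extract', 'load']
```

The cost of this design is the race condition described earlier in the message: a task that a worker has already picked up but not yet marked as running still reads as `queued`, so the next loop can send it again.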
>
> @provide_session
> def refresh_from_db(self, session=None, lock_for_update=False):
>     """
>     Refreshes the task instance from the database based on the primary key.
>
>     :param lock_for_update: if True, issue SELECT ... FOR UPDATE so that the
>         row stays locked until the session is committed
>     """
>     TI = TaskInstance
>
>     # Look up this task instance by its primary key
>     qry = session.query(TI).filter(
>         TI.dag_id == self.dag_id,
>         TI.task_id == self.task_id,
>         TI.execution_date == self.execution_date,
>     )
>
>     if lock_for_update:
>         # The row lock is held until session.commit()
>         ti = qry.with_for_update().first()
>     else:
>         ti = qry.first()
>     # ... (remainder of the method as before)
>
> And to use “lock_for_update=True” in TaskInstance.run. This locks the
> record for update (with SELECT ... FOR UPDATE) in the database (Postgres,
> MySQL >= 5.7) and ensures that any other query that tries to lock or update
> this record (e.g. another SELECT ... FOR UPDATE) has to wait. As soon as
> “session.commit()” is called, the lock is released. The window for the race
> condition to occur is therefore much shorter now (the time between a new
> scheduler run and the time it takes for the Python interpreter to start
> “airflow run”), though not entirely closed.
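To illustrate the locking idea in isolation, here is a sketch that uses a per-row `threading.Lock` as a stand-in for the database row lock taken by `SELECT ... FOR UPDATE`, and the end of the `with` block as a stand-in for `session.commit()`. This is an analogy only, not how SQLAlchemy or the database implements locking; `Table` and `try_run` are hypothetical names. Eight workers race to start the same queued task, and doing the check-then-set under the lock lets exactly one of them win.

```python
import threading

# Hypothetical model: a per-row threading.Lock stands in for the database row
# lock taken by SELECT ... FOR UPDATE; releasing it stands in for commit().


class Table:
    def __init__(self, states):
        self.states = dict(states)
        self.row_locks = {key: threading.Lock() for key in states}


def try_run(table, task_id, runs):
    with table.row_locks[task_id]:  # "SELECT ... FOR UPDATE": wait for the row
        if table.states[task_id] != "queued":
            return  # another worker already started it
        table.states[task_id] = "running"
        runs.append(threading.current_thread().name)
    # leaving the with-block plays the role of session.commit(): lock released


table = Table({"load": "queued"})
runs = []
workers = [
    threading.Thread(target=try_run, args=(table, "load", runs), name=f"worker-{i}")
    for i in range(8)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(len(runs), table.states["load"])  # 1 running
```

This only serializes the check-and-set inside the run step. As noted above, a scheduler loop that fires before `airflow run` reaches that point can still queue the task again, which is why the window is narrowed rather than closed.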
>
> Due to the lock there *might* be a small performance hit, but it should be
> very small: only queries that need to lock or update the aforementioned
> record will be delayed, and only for a few milliseconds. However, we might
> have overlooked something, so I kindly request a review of this PR so we can
> make it part of 1.7.1.
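The performance argument rests on the lock being row-level. A toy check of that, again using one `threading.Lock` per task-instance row as a hypothetical stand-in for the database lock (`row_locks` and `can_lock_now` are illustrative names): while the `load` row is held, a request that only touches `extract` proceeds at once, and one that includes `load` would have to wait. One caveat: in PostgreSQL and MySQL/InnoDB at their default isolation levels, plain SELECTs read a consistent snapshot and are not blocked by a FOR UPDATE lock; only other locking reads and writes on that row wait.

```python
import threading

# Hypothetical model: one lock per task-instance row, standing in for the
# row-level lock held by SELECT ... FOR UPDATE until commit.
row_locks = {"extract": threading.Lock(), "load": threading.Lock()}


def can_lock_now(rows):
    """Try to lock every row in `rows` without waiting, then release them."""
    acquired = []
    try:
        for row in rows:
            if not row_locks[row].acquire(blocking=False):
                return False  # this query would have to wait
            acquired.append(row)
        return True
    finally:
        for row in acquired:
            row_locks[row].release()


row_locks["load"].acquire()  # TaskInstance.run holds the "load" row
print(can_lock_now(["extract"]))  # True: different row, no delay
print(can_lock_now(["extract", "load"]))  # False: includes the locked row
row_locks["load"].release()  # session.commit()
print(can_lock_now(["extract", "load"]))  # True
```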
>
> Thanks
> Bolke
