airflow-commits mailing list archives

From "Len Frodgers (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AIRFLOW-703) Xcom data cleared too soon
Date Fri, 16 Dec 2016 15:20:58 GMT
Len Frodgers created AIRFLOW-703:
------------------------------------

             Summary: Xcom data cleared too soon
                 Key: AIRFLOW-703
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-703
             Project: Apache Airflow
          Issue Type: Bug
          Components: core, scheduler, xcom
    Affects Versions: Airflow 2.0, Airflow 1.7.1.3
         Environment: Tested using Dockerized Airflow setup with MySQL backend and Celery executor
            Reporter: Len Frodgers



XCom data is cleared at the start of the `run` method of the `TaskInstance`, regardless of
whether the TI is subsequently executed (e.g. a TI that has previously succeeded will not
execute again). This means that if a TI for a DagRun is run twice in close succession, the second
run will not execute (because the first TI has succeeded or is still running), but it WILL clear
any XComs set by the first run. As a result, any downstream tasks that depend on those XComs will fail.
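The ordering problem can be illustrated with a toy, in-memory model. This is a hypothetical sketch, not Airflow code: `XComStore`, `ToyTaskInstance` and `buggy_run` are illustrative names, and the real `TaskInstance.run` does much more (dependency checks, locking, state persistence). It only reproduces the sequence described above: XComs are cleared first, and the "already succeeded or running" check happens afterwards.

```python
# Hypothetical toy model of the reported ordering; not Airflow's real API.
SUCCESS = "success"
RUNNING = "running"


class XComStore:
    """In-memory stand-in for the XCom table, keyed by (task_id, key)."""

    def __init__(self):
        self._data = {}

    def push(self, task_id, key, value):
        self._data[(task_id, key)] = value

    def pull(self, task_id, key):
        # Returns None when nothing is stored for (task_id, key).
        return self._data.get((task_id, key))

    def clear(self, task_id):
        # Delete every XCom belonging to task_id.
        for k in [k for k in self._data if k[0] == task_id]:
            del self._data[k]


class ToyTaskInstance:
    def __init__(self, task_id, store, callable_):
        self.task_id = task_id
        self.store = store
        self.callable_ = callable_
        self.state = None

    def buggy_run(self):
        # Reported behaviour: XComs are cleared unconditionally,
        # before deciding whether this run will execute at all.
        self.store.clear(self.task_id)
        if self.state in (SUCCESS, RUNNING):
            return  # run is skipped, but the XComs are already gone
        self.state = RUNNING
        self.callable_(self)
        self.state = SUCCESS


def push_values(ti):
    ti.store.push(ti.task_id, "k1", "xcom_custom")


store = XComStore()
op1 = ToyTaskInstance("op1", store, push_values)
op1.buggy_run()                 # executes and pushes k1
print(store.pull("op1", "k1"))  # xcom_custom
op1.buggy_run()                 # skipped because op1 already succeeded...
print(store.pull("op1", "k1"))  # ...yet k1 was cleared: None
```

A downstream task pulling `k1` after the second call would see `None`, which is the failure the test DAG's assertion catches.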

I noticed this bug when I changed the scheduler's `num_runs` from None to 10. It did not happen
every time, but perhaps 50% of the time.

However, I can reproduce it reliably with the following test DAG:

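```
import logging
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def func1(ti, **kwargs):
    # Push one XCom under a custom key; the return value is pushed
    # automatically under the default key.
    ti.xcom_push("k1", "xcom_custom")
    return "xcom_default"


def func2(ti, **kwargs):
    # Wait long enough for op1 to be re-run from the UI.
    time.sleep(10)
    xcom_custom = ti.xcom_pull("op1", key="k1")
    xcom_default = ti.xcom_pull("op1")
    logging.info("Default: %s, Custom: %s", xcom_default, xcom_custom)
    # Fails if a re-run of op1 cleared its XComs.
    assert None not in (xcom_custom, xcom_default)


bug_dag = DAG(
    'xcom_bug',
    max_active_runs=1,
    schedule_interval=timedelta(minutes=3),
    start_date=datetime(2016, 12, 16, 14, 57)
)
op1 = PythonOperator(python_callable=func1, provide_context=True, task_id="op1", dag=bug_dag)
op2 = PythonOperator(python_callable=func2, provide_context=True, task_id="op2", dag=bug_dag)
op1.set_downstream(op2)
```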

To make op1 run twice, I use the UI to run it twice while op2 is in its `time.sleep`.

Logs from running this:
[will attach]

The fix seems straightforward: don't clear XComs unless the TI will actually execute. I'm
happy to create a PR.
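A minimal sketch of the proposed reordering, using a hypothetical toy model rather than Airflow's code: `ToyTaskInstance`, `fixed_run` and `clear_xcom` are illustrative names, XComs live in a plain dict, and a single `state` attribute stands in for Airflow's real dependency and state checks.

```python
# Hypothetical toy model of the proposed fix; not Airflow's real API.
SUCCESS = "success"
RUNNING = "running"


class ToyTaskInstance:
    def __init__(self, task_id, xcoms, callable_):
        self.task_id = task_id
        self.xcoms = xcoms  # dict mapping (task_id, key) -> value
        self.callable_ = callable_
        self.state = None

    def clear_xcom(self):
        # Delete every XCom belonging to this task.
        for k in [k for k in self.xcoms if k[0] == self.task_id]:
            del self.xcoms[k]

    def fixed_run(self):
        # Proposed order: decide whether the run will execute first...
        if self.state in (SUCCESS, RUNNING):
            return  # ...so a skipped run leaves earlier XComs intact
        # ...and clear stale XComs only when the task really executes.
        self.clear_xcom()
        self.state = RUNNING
        self.callable_(self)
        self.state = SUCCESS


def push_values(ti):
    ti.xcoms[(ti.task_id, "k1")] = "xcom_custom"


xcoms = {}
op1 = ToyTaskInstance("op1", xcoms, push_values)
op1.fixed_run()               # executes and pushes k1
op1.fixed_run()               # skipped; nothing is cleared
print(xcoms[("op1", "k1")])  # xcom_custom
```

A run that does execute (e.g. after `state` is reset) still clears stale XComs first, so genuine reruns start from a clean slate; only skipped runs stop touching them.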




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
