airflow-commits mailing list archives

From "James Meickle (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-2229) Scheduler cannot retry abrupt task failures within factory-generated DAGs
Date Tue, 20 Mar 2018 15:25:00 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-2229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Meickle updated AIRFLOW-2229:
-----------------------------------
    Description: 
We had an issue where one of our tasks failed without the worker updating its state (unclear why, but let's assume it was an OOM), resulting in this series of error messages:

{{Mar 20 14:27:05 airflow-core-i-0fc1f995414837b8b.stg.int.dynoquant.com airflow_scheduler-stdout.log: [2018-03-20 14:27:04,993] models.py:1595 ERROR - Executor reports task instance %s finished (%s) although the task says its %s. Was the task killed externally?}}
{{Mar 20 14:27:05 airflow-core-i-0fc1f995414837b8b.stg.int.dynoquant.com airflow_scheduler-stdout.log: NoneType}}
{{Mar 20 14:27:05 airflow-core-i-0fc1f995414837b8b.stg.int.dynoquant.com airflow_scheduler-stdout.log: [2018-03-20 14:27:04,994] jobs.py:1435 ERROR - Cannot load the dag bag to handle failure for <TaskInstance: nightly_dataload.dummy_operator 2018-03-19 00:00:00 [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resources?}}

Mysterious failures are not unexpected; we are in the cloud, after all. The concern is the last line: the scheduler skips callbacks and retries and implies the cause is a lack of resources, even though the machine was almost entirely idle at the time.

I dug into the code a bit more and, as far as I can tell, the error is raised in this code path: [https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/jobs.py#L1427]

{code:python}
self.log.error(msg)
try:
    simple_dag = simple_dag_bag.get_dag(dag_id)
    dagbag = models.DagBag(simple_dag.full_filepath)
    dag = dagbag.get_dag(dag_id)
    ti.task = dag.get_task(task_id)
    ti.handle_failure(msg)
except Exception:
    self.log.error("Cannot load the dag bag to handle failure for %s"
                   ". Setting task to FAILED without callbacks or "
                   "retries. Do you have enough resources?", ti)
    ti.state = State.FAILED
    session.merge(ti)
    session.commit()
{code}
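
For what it's worth, the failure inside that try block looks easy to reproduce in isolation. The sketch below is hypothetical (the path and IDs are illustrative placeholders, not values from our deployment) and assumes my reading of the 1.9.0 code is right, namely that {{get_dag()}} quietly returns {{None}} when a file yields no DAGs, and the subsequent attribute access is what actually raises:

{code:python}
# Hypothetical repro against Airflow 1.9.0. The path and IDs are illustrative
# placeholders, not values from our deployment.
from airflow import models

# Point a fresh DagBag at the library file that merely *defines* the DAG code,
# i.e. the fileloc recorded for the factory-generated DAG.
dagbag = models.DagBag("/srv/airflow/dags/nightly_defs.py")

print(dagbag.dags)                        # the factory-generated dag_id is not in here
dag = dagbag.get_dag("nightly_dataload")  # returns None when the DAG was not found
dag.get_task("dummy_operator")            # AttributeError: 'NoneType' object has no attribute 'get_task'
{code}

If that is what happens inside the scheduler's try block, it might also explain the bare {{NoneType}} line in the log above.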

I am not very familiar with this code, nor do I have time to attach a debugger at the moment, but I think what is happening is:
 * I have a factory Python file which imports DAG-building code from other files and instantiates the DAGs (a minimal sketch of this layout follows the list).
 * The scheduler loads the DAGs by parsing the factory file on the filesystem, but the fileloc recorded in the DB is not the factory file; it is the file the DAG code was loaded from.
 * The scheduler builds a SimpleDagBag from the instantiated DAGs.
 * The code path above uses the SimpleDag, which carries the original DAG object's fileloc, to create a new DagBag.
 * That DagBag looks for the original DAG in the fileloc, i.e. the file containing the DAG's _code_, which is not importable by Airflow on its own.
 * The lookup finds nothing, and an exception is raised when the scheduler tries to use the missing DAG.
 * Handling of the task failure therefore never occurs.
 * The over-broad except Exception clause swallows all of the above.
 * All the operator is left with is a generic error message that does not point at the real problem.
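
To make the layout concrete, here is a minimal sketch of the kind of factory arrangement I mean. The file names, dag_id and operator are made up for illustration; the point is only that the {{DAG()}} constructor runs in one file while the scheduler only ever parses the other.

{code:python}
# nightly_defs.py -- hypothetical library module. It builds DAGs on request but
# exposes no DAG objects at module level, so parsing it directly yields nothing.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def build_dag(dag_id):
    dag = DAG(dag_id, start_date=datetime(2018, 3, 1), schedule_interval="@daily")
    DummyOperator(task_id="dummy_operator", dag=dag)
    return dag
{code}

{code:python}
# factory.py -- hypothetical factory file, the one the scheduler actually parses.
from nightly_defs import build_dag

# Expose each generated DAG at module level so DagBag picks it up from factory.py.
for dag_id in ["nightly_dataload"]:
    globals()[dag_id] = build_dag(dag_id)
{code}

If the fileloc is taken from the frame in which {{DAG()}} is constructed, it ends up pointing at {{nightly_defs.py}} rather than {{factory.py}}, which would match what we see in the DB.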

If this is the case, then at minimum the try/except should be rewritten to fail more gracefully and to produce a better error message (a sketch of what I mean follows). But I also question whether this level of DagBag abstraction/indirection isn't making the failure case worse than it needs to be: under normal conditions the scheduler has no trouble finding the factory-generated DAGs and executing tasks within them, even with the fileloc set to the code path rather than the import path.
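
As a starting point, something along these lines would at least surface the real exception instead of the "resources" hint. This is only a sketch of what I mean against the 1.9.0 code above, not a tested patch (it assumes {{AirflowException}} is already available in jobs.py):

{code:python}
self.log.error(msg)
simple_dag = simple_dag_bag.get_dag(dag_id)
try:
    dagbag = models.DagBag(simple_dag.full_filepath)
    dag = dagbag.get_dag(dag_id)
    if dag is None:
        raise AirflowException("DAG %s was not found in %s" %
                               (dag_id, simple_dag.full_filepath))
    ti.task = dag.get_task(task_id)
    ti.handle_failure(msg)
except Exception:
    # log.exception() keeps the traceback, so the operator can see the real cause
    self.log.exception(
        "Could not re-load DAG %s from %s to handle the failure of %s; "
        "setting the task to FAILED without callbacks or retries.",
        dag_id, simple_dag.full_filepath, ti)
    ti.state = State.FAILED
    session.merge(ti)
    session.commit()
{code}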

 

> Scheduler cannot retry abrupt task failures within factory-generated DAGs
> -------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2229
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2229
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.9.0
>            Reporter: James Meickle
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
