airflow-commits mailing list archives

From "James Meickle (JIRA)" <>
Subject [jira] [Created] (AIRFLOW-2229) Scheduler cannot retry abrupt task failures within factory-generated DAGs
Date Tue, 20 Mar 2018 15:25:00 GMT
James Meickle created AIRFLOW-2229:

             Summary: Scheduler cannot retry abrupt task failures within factory-generated DAGs
                 Key: AIRFLOW-2229
             Project: Apache Airflow
          Issue Type: Bug
          Components: scheduler
    Affects Versions: 1.9.0
            Reporter: James Meickle

We had an issue where one of our tasks failed without the worker updating state (unclear
why, but let's assume it was an OOM), resulting in this series of error messages:

{{Mar 20 14:27:05 airflow_scheduler-stdout.log: [2018-03-20 14:27:04,993] ERROR - Executor reports task instance %s finished (%s) although the task says its %s. Was the task killed externally?}}
{{Mar 20 14:27:05 airflow_scheduler-stdout.log: [2018-03-20 14:27:04,994] ERROR - Cannot load the dag bag to handle failure for <TaskInstance: nightly_dataload.dummy_operator 2018-03-19 00:00:00 [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resources?}}

Mysterious failures are not unexpected; we are in the cloud, after all. The concern is the
last line: callbacks and retries are skipped, and the message implies a lack of resources.
However, the machine was almost completely idle at the time.

I dug into this code a bit more, and as far as I can tell the error is raised in this code
path:

{code}
try:
    simple_dag = simple_dag_bag.get_dag(dag_id)
    dagbag = models.DagBag(simple_dag.full_filepath)
    dag = dagbag.get_dag(dag_id)
    ti.task = dag.get_task(task_id)
    ti.handle_failure(msg)
except Exception:
    self.log.error("Cannot load the dag bag to handle failure for %s"
                   ". Setting task to FAILED without callbacks or "
                   "retries. Do you have enough resources?", ti)
    ti.state = State.FAILED
    session.merge(ti)
    session.commit()
{code}

I am not very familiar with this code, nor do I have time to attach a debugger at the moment,
but I think what is happening here is:
 * I have a factory Python file, which imports and instantiates DAG code from other files.
 * The scheduler loads the DAGs from the factory file on the filesystem. It gets a fileloc
(as represented in the DB) not of the factory file, but of the file it loaded code from.
 * The scheduler makes a simple DAGBag from the instantiated DAGs.
 * This line of code uses the simple DAG, which references the original DAG object's fileloc,
to create a new DAGBag object.
 * This DAGBag looks for the original DAG in the fileloc, which is the file containing that
DAG's _code_, but is not actually importable by Airflow.
 * An exception is raised trying to load the DAG from the DAGBag, which found nothing.
 * Handling of the task failure never occurs.
 * The overly broad {{except Exception}} swallows everything above.
 * There's just a generic error message that is not helpful to a system operator.
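The chain of steps above can be sketched without Airflow at all. The classes, file paths, and DAG/task names below are hypothetical stand-ins (not Airflow's real implementation), chosen only to show how a fileloc pointing at the code file, rather than the importable factory file, turns into a swallowed exception:

```python
# Hypothetical sketch of the failure chain (NOT Airflow's actual code):
# a bag that only "finds" DAGs defined at a file's top level, so looking
# up a factory-generated DAG by its code file's path finds nothing.

class FakeDagBag:
    """Stand-in for models.DagBag: maps fileloc -> {dag_id: dag}."""
    def __init__(self, fileloc, top_level_dags_by_file):
        # A real DagBag imports `fileloc` and collects module-level DAGs.
        self.dags = top_level_dags_by_file.get(fileloc, {})

    def get_dag(self, dag_id):
        return self.dags.get(dag_id)  # None when the file exposes no DAGs

# Only the factory file instantiates DAGs at the top level; the code
# file merely defines things for the factory to import (paths invented).
top_level = {
    "dags/factory.py": {"nightly_dataload": object()},
    "dags/nightly_code.py": {},  # importable, but exposes no DAG objects
}

# The DAG's fileloc points at the *code* file, not the factory file:
fileloc = "dags/nightly_code.py"

try:
    dag = FakeDagBag(fileloc, top_level).get_dag("nightly_dataload")
    task = dag.get_task("dummy_operator")  # AttributeError: dag is None
except Exception as exc:
    # The scheduler's broad except lands here and logs the generic
    # "Do you have enough resources?" message instead of this cause.
    swallowed = exc

print(type(swallowed).__name__)  # AttributeError
```

Under these assumptions the real cause (a lookup that returned nothing) never reaches the logs.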

If this is the case then, at minimum, the try/except should be rewritten to fail more
gracefully and to produce a more accurate error message. But I also question whether this
level of DagBag abstraction/indirection isn't making this failure case worse than it needs
to be; under normal conditions the scheduler has no trouble finding factory-generated DAGs
and executing tasks within them, even with the fileloc set to the code path rather than the
import path.
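As a rough illustration of the "more graceful" handling suggested above, here is a minimal sketch using only the standard library. It is an assumption about what a fix might look like, not a patch: the point is simply that {{Logger.exception}} records the underlying traceback, so the operator sees the real cause instead of a guess about resources. The function and loader names are invented for the example:

```python
import logging

logging.basicConfig(level=logging.ERROR)
log = logging.getLogger("scheduler")

def handle_failure_fallback(load_dag):
    """Hypothetical fallback: still mark the task FAILED, but log why."""
    try:
        load_dag()
    except Exception:
        # log.exception attaches the full traceback to the record, so the
        # real error (e.g. a DAG missing from its fileloc) is visible
        # instead of a misleading "Do you have enough resources?" guess.
        log.exception("Could not load DAG to run failure callbacks; "
                      "marking task FAILED without retries")
        return "FAILED"
    return "HANDLED"

def broken_loader():
    # Simulates the lookup failing for a factory-generated DAG.
    raise LookupError("dag_id not found in fileloc")

print(handle_failure_fallback(broken_loader))  # FAILED
```

Even keeping the FAILED fallback, surfacing the traceback would have made this issue diagnosable from the scheduler logs alone.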


This message was sent by Atlassian JIRA
