airflow-commits mailing list archives

From "Clifton King (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-1258) TaskInstances within SubDagOperator are marked as failed after an hour
Date Fri, 13 Oct 2017 16:40:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-1258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16203817#comment-16203817 ]

Clifton King commented on AIRFLOW-1258:
---------------------------------------

+1, this is affecting me as well. Is there any fix for this?

> TaskInstances within SubDagOperator are marked as failed after an hour
> ----------------------------------------------------------------------
>
>                 Key: AIRFLOW-1258
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1258
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.8.1
>            Reporter: John Doe
>             Fix For: 1.9.0
>
>
> We have multiple SubDagOperators which we use to isolate individual units in our broader DAGs (we typically have tens of SubDagOperators in a given DAG). Whenever a TaskInstance inside a SubDag runs for more than an hour, the DAG fails right after the one-hour mark.
> This is completely unrelated to our codebase and can be recreated with a sleep BashOperator:
> {code}
> from datetime import datetime
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.subdag_operator import SubDagOperator
> DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 5, 30)}
> def define_sub(dag, step_name, sleeptime):
>     # Attach a single long-sleeping task to the given DAG
>     op = BashOperator(
>         task_id=step_name, bash_command='sleep %i' % sleeptime, queue="model", dag=dag
>     )
>     return dag
> def gen_sub_dag(parent_name, step_name, sleeptime):
>     # The SubDag id is built as '<parent dag_id>.<task_id>'
>     sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
>     define_sub(sub, step_name, sleeptime)
>     return sub
> long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)
> # 7500 seconds (just over two hours) comfortably exceeds the one-hour mark
> long_sub_dag = SubDagOperator(
>     subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub',
>     dag=long_runner_parent
> )
> {code}
> Under Airflow 1.7.1.3, we would see the following error in the SubDagOperator:
> {code}
> [2017-05-25 17:08:56,082] {jobs.py:965} ERROR - The airflow run command failed at reporting an error. This should not occur in normal circumstances. Task state is 'running',reported state is 'success'. TI is <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-24 16:00:00 [running]>
> {code}
> which we could then manually mark as 'success' in the airflow database. However, starting in 1.8.1 (we skipped 1.8.0 as AIRFLOW-1004 was a hard blocker) the SubDag now instead fails outright.
> Nothing in the logs indicates any reason for the failure. We've lowered the log level to DEBUG and still see nothing.
> airflow-scheduler.log:
> {code}
> 2017-05-31 19:44:10,260 INFO - Heartbeating the process manager
> 2017-05-31 19:44:10,263 INFO - Started a process (PID: 6462) to generate tasks for /efs/airflow/dags/long_running.py - logging into /opt/airflow/logs/scheduler/2017-05-31/long_running.py.log
> 2017-05-31 19:44:10,268 INFO - Heartbeating the executor
> 2017-05-31 19:44:10,271 INFO - Executor reports long_runner.long_runner_sub execution_date=2017-05-31 18:42:55.400517 as failed
> 2017-05-31 19:44:10,324 INFO - Heartbeating the scheduler
> {code}
>  
> In the SubDagOperator log, we see a second task queued immediately before the failure, despite the original task running unabated:
> {code}
> [2017-05-31 19:44:04,441] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run long_runner long_runner_sub 2017-05-31T18:42:55.400517 --job_id 108 --raw -sd DAGS_FOLDER/long_running.py']
> [2017-05-31 19:44:05,816] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,815] {models.py:1122} INFO - Dependencies not met for <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
> [2017-05-31 19:44:05,816] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,816] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Trigger Rule' PASSED: True, The task instance did not have any upstream tasks.
> [2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Task Instance Not Already Running' PASSED: False, Task is already running, it started on 2017-05-31 18:43:03.494829.
> [2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1122} INFO - Dependencies not met for <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2017-05-31 18:43:03.494829.
> [2017-05-31 19:44:05,817] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2017-05-31 19:44:05,818] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:05,817] {models.py:1148} DEBUG - <TaskInstance: long_runner.long_runner_sub 2017-05-31 18:42:55.400517 [running]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2017-05-31 19:44:10,467] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:10,466] {jobs.py:1722} DEBUG - Executor state: failed task <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-31 18:42:55.400517 [running]>
> [2017-05-31 19:44:10,467] {base_task_runner.py:95} INFO - Subtask: [2017-05-31 19:44:10,467] {jobs.py:1729} ERROR - Executor reports task instance <TaskInstance: long_runner.long_runner_sub.long_runner_sub 2017-05-31 18:42:55.400517 [running]> finished (failed) although the task says its running. Was the task killed externally?
> {code}
> It's unclear whether the second task is a response to the first being incorrectly marked as failed, or whether queuing the second task causes the failure state when it takes the poison pill.
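
For anyone comparing against their own logs, the timing in the quoted SubDagOperator log can be checked with a few lines of Python. This is only a rough sketch: the three timestamps are copied from the log lines above, while the variable names are made up for illustration and are not part of Airflow. It does not identify the cause; it only measures the gaps between the events.

```python
from datetime import datetime

# Timestamps copied from the quoted SubDagOperator log.
# The variable names are illustrative, not Airflow identifiers.
task_started = datetime.strptime(
    "2017-05-31 18:43:03.494829", "%Y-%m-%d %H:%M:%S.%f"
)  # "Task is already running, it started on ..."
second_run_launched = datetime.strptime(
    "2017-05-31 19:44:04,441", "%Y-%m-%d %H:%M:%S,%f"
)  # "Running: ['bash', '-c', u'airflow run ...']"
failure_reported = datetime.strptime(
    "2017-05-31 19:44:10,467", "%Y-%m-%d %H:%M:%S,%f"
)  # "Executor reports task instance ... finished (failed)"

# How long the original task had been running when the second run appeared
elapsed_before_second_run = second_run_launched - task_started
# How long after the second run the failure was reported
second_run_to_failure = failure_reported - second_run_launched

print("original task ran for:", elapsed_before_second_run)
print("second run to failure:", second_run_to_failure)
```

With the values from this report, the second `airflow run` shows up roughly one hour and one minute after the original task started, and the failure is reported about six seconds later.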



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
