airflow-commits mailing list archives

From "James Meickle (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AIRFLOW-3418) Task stuck in running state, unable to clear
Date Thu, 29 Nov 2018 14:33:00 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Meickle updated AIRFLOW-3418:
-----------------------------------
    Description: 
One of our tasks (a custom operator that sleep-waits until NYSE market close) got stuck in
a "running" state in the metadata db without making any progress. This is what it looked like
in the logs:

{code:java}
[2018-11-29 00:01:14,064] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close [2018-11-29 00:01:14,063] {{cli.py:484}} INFO - Running <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 [running]> on host airflow-core-i-0a53cac37067d957d.dlg.fnd.dynoquant.com
[2018-11-29 06:03:57,643] {{models.py:1355}} INFO - Dependencies not met for <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2018-11-29 06:03:57,644] {{models.py:1355}} INFO - Dependencies not met for <TaskInstance: reconciliation_filemover.after_close 2018-11-28T00:00:00+00:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2018-11-29 00:01:10.876344+00:00.
[2018-11-29 06:03:57,646] {{logging_mixin.py:95}} INFO - [2018-11-29 06:03:57,646] {{jobs.py:2614}} INFO - Task is not able to be run
{code}
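
To make the symptom concrete: the stuck row is visible directly in the metadata db through Airflow's own ORM. Here is a rough sketch of the kind of query involved (Airflow 1.10.x; an illustrative sketch, not something we actually ran — the dag_id/task_id are taken from the log above):

{code:python}
# Illustrative sketch: inspect the "running" TaskInstance row via Airflow's ORM.
# Assumes this runs on a host whose airflow.cfg points at the same metadata db.
from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

session = settings.Session()
stuck = (
    session.query(TaskInstance)
    .filter(TaskInstance.dag_id == 'reconciliation_filemover',
            TaskInstance.task_id == 'after_close',
            TaskInstance.state == State.RUNNING)
    .all()
)
for ti in stuck:
    # start_date and hostname show how long the row has been "running" and where
    print(ti.execution_date, ti.state, ti.start_date, ti.hostname)
session.close()
{code}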


Seeing this state, we attempted to "clear" it in the web UI. This yielded a complex backtrace:

{code:java}
Traceback (most recent call last):
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line
1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line
1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line
1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/_compat.py", line
33, in reraise
    raise value
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line
1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask/app.py", line
1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/flask_appbuilder/security/decorators.py",
line 26, in wraps
    return f(self, *args, **kwargs)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/decorators.py",
line 55, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/www_rbac/views.py",
line 837, in clear
    include_upstream=upstream)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
line 4011, in sub_dag
    dag = copy.deepcopy(self)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
line 3996, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
line 2740, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
    state = deepcopy(state, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
    state = deepcopy(state, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
    state = deepcopy(state, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
    state = deepcopy(state, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 218, in _deepcopy_list
    y.append(deepcopy(a, memo))
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 297, in _reconstruct
    state = deepcopy(state, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 243, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/copy.py", line 174, in deepcopy
    rv = reductor(4)
TypeError: cannot serialize '_io.TextIOWrapper' object
{code}
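
For what it's worth, the tail of that traceback (copy.py falling back to {{rv = reductor(4)}}) is the generic failure mode for deep-copying an object graph that contains an open file handle. A minimal, Airflow-free sketch reproduces the same TypeError — the class and its open file are hypothetical stand-ins for whatever non-copyable attribute our operator is carrying:

{code:python}
# Minimal reproduction of the failure mode above: deepcopy of any object whose
# attributes include an open text-mode file (_io.TextIOWrapper) falls back to
# __reduce_ex__ and raises TypeError (message wording varies by Python version).
import copy
import os
import tempfile

class MarketCloseSensorLike:
    """Hypothetical stand-in for a custom operator that keeps an open file as state."""
    def __init__(self, path):
        self.handle = open(path, 'w')  # an _io.TextIOWrapper attribute

fd, path = tempfile.mkstemp()
os.close(fd)
obj = MarketCloseSensorLike(path)
try:
    copy.deepcopy(obj)  # same fallback path as the traceback: rv = reductor(4)
except TypeError as exc:
    print(exc)  # e.g. "cannot serialize '_io.TextIOWrapper' object" on Python 3.5
finally:
    obj.handle.close()
    os.remove(path)
{code}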

After browsing through Airflow's code, I suspected that this was simply the "clear" code in the UI failing to handle some property on one of our operators. I instead used the Browse feature to edit the metadata state db directly. This did change the status: the task was set to "up_for_retry", and the same logfile gained additional contents:

{code:java}
[2018-11-29 14:18:11,390] {{logging_mixin.py:95}} INFO - [2018-11-29 14:18:11,390] {{jobs.py:2695}} WARNING - State of this instance has been externally set to failed. Taking the poison pill.
[2018-11-29 14:18:11,399] {{helpers.py:240}} INFO - Sending Signals.SIGTERM to GPID 5287
[2018-11-29 14:18:11,399] {{models.py:1636}} ERROR - Received SIGTERM. Terminating subprocesses.
[2018-11-29 14:18:11,418] {{models.py:1760}} ERROR - Task received SIGTERM signal
Traceback (most recent call last):
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1654, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py", line 78, in execute
    sleep(self.poke_interval)
  File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1638, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2018-11-29 14:18:11,420] {{models.py:1783}} INFO - Marking task as UP_FOR_RETRY
[2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close Traceback (most recent call last):
[2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/bin/airflow", line 32, in <module>
[2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     args.func(args)
[2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     return f(*args, **kwargs)
[2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py", line 490, in run
[2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     _run(args, dag, ti)
[2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/bin/cli.py", line 406, in _run
[2018-11-29 14:18:11,445] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     pool=args.pool,
[2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/utils/db.py", line 74, in wrapper
[2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     return func(*args, **kwargs)
[2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1654, in _run_raw_task
[2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     result = task_copy.execute(context=context)
[2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/sensors/base_sensor_operator.py", line 78, in execute
[2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     sleep(self.poke_interval)
[2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close   File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1638, in signal_handler
[2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close     raise AirflowException("Task received SIGTERM signal")
[2018-11-29 14:18:11,446] {{base_task_runner.py:101}} INFO - Job 38275: Subtask after_close airflow.exceptions.AirflowException: Task received SIGTERM signal
[2018-11-29 14:18:11,693] {{helpers.py:230}} INFO - Process psutil.Process(pid=5287 (terminated)) (5287) terminated with exit code 1
[2018-11-29 14:18:11,694] {{logging_mixin.py:95}} INFO - [2018-11-29 14:18:11,693] {{jobs.py:2627}} INFO - Task exited with return code 0
{code}
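
The second traceback in that log looks like the expected shape for this code path, as far as I can tell: a SIGTERM handler is installed around the task, and raising from it is what interrupts a sensor blocked in {{sleep(self.poke_interval)}}. A rough, simplified sketch of that mechanism (not Airflow's actual implementation, just the pattern the traceback implies):

{code:python}
# Simplified sketch of the mechanism implied by the traceback above: a SIGTERM
# handler that raises converts the external "poison pill" into an exception
# inside the sleeping sensor, which then surfaces as a failure / retry.
import signal
import time

class TaskTerminated(Exception):
    pass

def _signal_handler(signum, frame):
    raise TaskTerminated("Task received SIGTERM signal")

def run_sensor(poke_condition, poke_interval=60):
    signal.signal(signal.SIGTERM, _signal_handler)
    try:
        while not poke_condition():
            time.sleep(poke_interval)  # a SIGTERM arriving here raises TaskTerminated
    except TaskTerminated:
        # the surrounding runner would log this and mark the task up_for_retry
        raise
{code}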

The log line about "not able to be run" comes from jobs.py, and it's unclear to me why that code path would be hit in this case (two workers grabbing the same message...?) or why the task would just hang in a "running" state: https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/jobs.py#L2614
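
If it helps anyone looking into the "two workers" theory: the job table in the metadata db should show whether more than one LocalTaskJob was alive and heartbeating around the time the task got stuck. A rough sketch of the kind of query that could confirm or rule that out (illustrative; written against what I believe are the 1.10.x BaseJob columns, so worth double-checking against the actual schema):

{code:python}
# Illustrative sketch: list "running" LocalTaskJob rows and inspect their
# hostname / heartbeat times to see whether two workers were running the
# same task. Assumes Airflow 1.10.x; verify column names against your schema.
from airflow import settings
from airflow.jobs import BaseJob

session = settings.Session()
running_jobs = (
    session.query(BaseJob)
    .filter(BaseJob.job_type == 'LocalTaskJob', BaseJob.state == 'running')
    .all()
)
for job in running_jobs:
    print(job.id, job.hostname, job.start_date, job.latest_heartbeat)
session.close()
{code}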

We had not previously observed any of this behavior. We had just upgraded to 1.10.1 earlier this week.


> Task stuck in running state, unable to clear
> --------------------------------------------
>
>                 Key: AIRFLOW-3418
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3418
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: worker
>    Affects Versions: 1.10.1
>            Reporter: James Meickle
>            Priority: Critical
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

