airflow-commits mailing list archives

From "ASF subversion and git services (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded
Date Wed, 26 Apr 2017 18:40:04 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985336#comment-15985336 ]

ASF subversion and git services commented on AIRFLOW-492:
---------------------------------------------------------

Commit c2472ffa124ffc65b8762ea583554494624dbb6a in incubator-airflow's branch refs/heads/master
from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=c2472ff ]

[AIRFLOW-492] Make sure stat updates cannot fail a task

Previously, a failed commit of the statistics to the
database could also fail a task. Secondly, the UI
could display out-of-date statistics.

This patch reworks DagStat so that a failure to
update the statistics does not propagate. In
addition, it makes sure the UI always displays
the latest statistics.

Closes #2254 from bolkedebruin/AIRFLOW-492
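
The intent of the patch, a statistics update whose failure cannot propagate into the task's own state, can be sketched as follows. `StatsSession` and `update_dag_stats` are illustrative stand-ins here, not the actual Airflow API:

```python
import logging

log = logging.getLogger(__name__)

class StatsSession:
    """Minimal stand-in for a DB session whose commit() may raise,
    e.g. an IntegrityError from a duplicate primary key."""
    def __init__(self, fail=False):
        self.fail = fail
        self.rolled_back = False

    def commit(self):
        if self.fail:
            raise RuntimeError("Duplicate entry for key 'PRIMARY'")

    def rollback(self):
        self.rolled_back = True

def update_dag_stats(session, dag_id, state, count):
    """Record a (dag_id, state) count; a failed commit is logged and
    rolled back instead of escaping to the caller, so the task's own
    success/failure state stands."""
    try:
        session.commit()
    except Exception:
        log.warning("Could not update stats for %s/%s", dag_id, state,
                    exc_info=True)
        session.rollback()

# A failing stats commit no longer propagates to the task:
bad = StatsSession(fail=True)
update_dag_stats(bad, "hanging_subdags_n16_sqe.level_2_14", "success", 3)
assert bad.rolled_back
```

The key design point is that the `try/except` sits around the statistics commit only, so any database error there is contained while real task errors still surface normally.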


> Insert into dag_stats table results into failed task while task itself succeeded
> --------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-492
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-492
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Bolke de Bruin
>            Assignee: Siddharth Anand
>            Priority: Critical
>         Attachments: subdag_test.py
>
>
> On some occasions a duplicate key seems to be inserted into dag_stats, which results in a task/DAG run being marked failed while the task itself has succeeded.
> [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run <DagRun hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 1409, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", line 88, in execute
>     executor=self.executor)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3244, in run
>     job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 189, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 1855, in _execute
>     models.DagStat.clean_dirty([run.dag_id], session=session)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3695, in clean_dirty
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 801, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2019, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2137, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2101, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 767, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 450, in do_execute
>     cursor.execute(statement, parameters)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/cursors.py", line 226, in execute
>     self.errorhandler(self, exc, value)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
>     raise errorvalue
> IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> [2016-09-07 18:44:17,787] {models.py:1473} INFO - Marking task as FAILED.
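
The trace shows the failure mode: a plain INSERT for a (dag_id, state) pair that already exists in dag_stats violates the composite primary key (MySQL error 1062). One way to avoid this, sketched below with SQLite purely for illustration (the reported environment was MySQL), is to refresh the row as a delete-then-insert inside a single transaction rather than issuing a bare INSERT:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dag_stats (
        dag_id TEXT NOT NULL,
        state  TEXT NOT NULL,
        count  INTEGER NOT NULL,
        dirty  INTEGER NOT NULL,
        PRIMARY KEY (dag_id, state)
    )
""")
row = ("hanging_subdags_n16_sqe.level_2_14", "success", 3, 0)
conn.execute("INSERT INTO dag_stats VALUES (?, ?, ?, ?)", row)

# A second plain INSERT for the same key reproduces the failure
# seen in the traceback (a primary-key violation):
try:
    conn.execute("INSERT INTO dag_stats VALUES (?, ?, ?, ?)", row)
except sqlite3.IntegrityError:
    pass  # duplicate rejected, as in MySQL error 1062

# Refreshing the stat as delete-then-insert in one transaction
# succeeds whether or not the row already exists:
with conn:
    conn.execute("DELETE FROM dag_stats WHERE dag_id = ? AND state = ?",
                 row[:2])
    conn.execute("INSERT INTO dag_stats VALUES (?, ?, ?, ?)", row)
```

Database-specific upserts (MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`, for instance) would also work, but the delete-then-insert form stays portable across the backends Airflow supports.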



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
