airflow-commits mailing list archives

From "Siddharth Anand (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded
Date Thu, 08 Sep 2016 06:49:21 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15472992#comment-15472992 ]

Siddharth Anand edited comment on AIRFLOW-492 at 9/8/16 6:48 AM:
-----------------------------------------------------------------

[~bolke]
Can you provide the sample code that exercised this code path? In addition to the tests that
the author added, I also ran all of the test dags for one hour with the LocalExecutor against
Postgres to look for errors. I believe there is a subdag operator in the test dag corpus.


was (Author: sanand):
[~bolke]
Can you provide the sample code that exercised this code path? In addition to the tests that
the author added, I also ran all of the test dags for 1 hour against the LocalExecutor against
Postgres in order to look for errors. I believe there is a subdag operator in the test dags.
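
For reference, a minimal sketch of the kind of subdag test dag referred to above (hypothetical
dag ids, task ids, and schedule, written against the Airflow 1.7.x API; not taken from the
actual test corpus):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'airflow', 'start_date': datetime(2016, 4, 21)}


def make_subdag(parent_dag_id, child_id, default_args):
    """Build a trivial child dag; its dag_id must be '<parent>.<task_id>'."""
    subdag = DAG(dag_id='%s.%s' % (parent_dag_id, child_id),
                 default_args=default_args,
                 schedule_interval='@daily')
    DummyOperator(task_id='noop', dag=subdag)
    return subdag


dag = DAG(dag_id='hanging_subdags_example',   # hypothetical dag_id
          default_args=DEFAULT_ARGS,
          schedule_interval='@daily')

# A few SubDagOperators are enough to exercise the dag_stats write path:
# per the traceback below, the backfill job run by each SubDagOperator
# refreshes dag_stats (DagStat.clean_dirty) when it finishes.
for i in range(3):
    child = 'level_2_%d' % i
    SubDagOperator(task_id=child,
                   subdag=make_subdag(dag.dag_id, child, DEFAULT_ARGS),
                   dag=dag)

Running this as a backfill with the LocalExecutor should exercise the same SubDagOperator to
backfill-job path that appears in the traceback below.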

> Insert into dag_stats table results into failed task while task itself succeeded
> --------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-492
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-492
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Bolke de Bruin
>            Assignee: siddharth anand
>            Priority: Critical
>
> On some occasions a duplicate key seems to be inserted into dag_stats, which results in a task/dag run being marked failed while the task itself has succeeded.
> [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run <DagRun hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 1409, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", line 88, in execute
>     executor=self.executor)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3244, in run
>     job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 189, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 1855, in _execute
>     models.DagStat.clean_dirty([run.dag_id], session=session)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3695, in clean_dirty
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 801, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2019, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2137, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2101, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 767, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
>     context)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/default.py", line 450, in do_execute
>     cursor.execute(statement, parameters)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/cursors.py", line 226, in execute
>     self.errorhandler(self, exc, value)
>   File "/usr/local/lib/python2.7/dist-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
>     raise errorvalue
> IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> [2016-09-07 18:44:17,787] {models.py:1473} INFO - Marking task as FAILED.
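
Reading the trace: the dag_stats primary key is composite on dag_id and state (the duplicate
entry is 'hanging_subdags_n16_sqe.level_2_14-success'), and DagStat.clean_dirty is called from
the backfill job that the SubDagOperator starts, so two jobs refreshing stats for the same dag
at roughly the same time can race, with the slower one inserting a (dag_id, state) row that the
other has already written. A minimal sketch of that write pattern and of an upsert variant that
tolerates the race (raw SQLAlchemy, column names taken from the SQL in the error; the connection
URL and function names are hypothetical, and this is an illustration rather than the actual
AIRFLOW-492 patch):

from sqlalchemy import create_engine, text

# Hypothetical connection string; the report above used MySQL via MySQLdb.
engine = create_engine('mysql://airflow:airflow@localhost/airflow')


def refresh_stats_naive(conn, dag_id, state, count):
    # Delete-then-insert: two concurrent writers can both get past the
    # DELETE, and the slower INSERT then raises MySQL error 1062, the
    # IntegrityError shown in the traceback above.
    conn.execute(text("DELETE FROM dag_stats WHERE dag_id = :d AND state = :s"),
                 {"d": dag_id, "s": state})
    conn.execute(text("INSERT INTO dag_stats (dag_id, state, count, dirty) "
                      "VALUES (:d, :s, :c, 0)"),
                 {"d": dag_id, "s": state, "c": count})


def refresh_stats_upsert(conn, dag_id, state, count):
    # Idempotent variant (MySQL-specific syntax): a concurrent writer
    # overwrites the row instead of violating the primary key.
    conn.execute(text("INSERT INTO dag_stats (dag_id, state, count, dirty) "
                      "VALUES (:d, :s, :c, 0) "
                      "ON DUPLICATE KEY UPDATE count = VALUES(count), dirty = 0"),
                 {"d": dag_id, "s": state, "c": count})


if __name__ == '__main__':
    with engine.begin() as conn:
        refresh_stats_upsert(conn, 'hanging_subdags_n16_sqe.level_2_14', 'success', 3)

Locking the rows being recomputed (SELECT ... FOR UPDATE) would be another way to serialize the
two writers; either way the stats refresh needs to be safe to run from both the scheduler and a
nested backfill.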



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
