airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Matus Valo (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-1039) Airflow is raising IntegrityError when during parallel DAG trigger
Date Fri, 24 Mar 2017 20:23:41 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941074#comment-15941074
] 

Matus Valo commented on AIRFLOW-1039:
-------------------------------------

PR created on github: https://github.com/apache/incubator-airflow/pull/2186

> Airflow is raising IntegrityError when during parallel DAG trigger
> ------------------------------------------------------------------
>
>                 Key: AIRFLOW-1039
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1039
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun
>    Affects Versions: Airflow 1.8
>            Reporter: Matus Valo
>            Priority: Minor
>
> When Two concurrent processes are trying to trigger the same dag with the same execution
date at the same time, the IntegrityError is thrown by SQLAlchemy:
> uwsgi[15887]: [2017-03-24 12:51:38,074] {app.py:1587} ERROR - Exception on / [POST]
> uwsgi[15887]: Traceback (most recent call last):
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1988, in wsgi_app
> uwsgi[15887]: response = self.full_dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1641, in full_dispatch_request
> uwsgi[15887]: rv = self.handle_user_exception(e)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1544, in handle_user_exception
> uwsgi[15887]: reraise(exc_type, exc_value, tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1639, in full_dispatch_request
> uwsgi[15887]: rv = self.dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1625, in dispatch_request
> uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args)
> uwsgi[15887]: File "./ws.py", line 21, in hello
> uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files':
input_files}), execution_date=datetime.now())
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py",
line 56, in trigger_dag
> uwsgi[15887]: external_trigger=True
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/utils/db.py",
line 53, in wrapper
> uwsgi[15887]: result = func(*args, **kwargs)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py",
line 3377, in create_dagrun
> uwsgi[15887]: session.commit()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 874, in commit
> uwsgi[15887]: self.transaction.commit()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 461, in commit
> uwsgi[15887]: self._prepare_impl()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 441, in _prepare_impl
> uwsgi[15887]: self.session.flush()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2139, in flush
> uwsgi[15887]: self._flush(objects)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2259, in _flush
> uwsgi[15887]: transaction.rollback(_capture_exception=True)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py",
line 60, in __exit__
> uwsgi[15887]: compat.reraise(exc_type, exc_value, exc_tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2223, in _flush
> uwsgi[15887]: flush_context.execute()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 389, in execute
> uwsgi[15887]: rec.execute(self)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 548, in execute
> uwsgi[15887]: uow
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 181, in save_obj
> uwsgi[15887]: mapper, table, insert)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 835, in _emit_insert_statements
> uwsgi[15887]: execute(statement, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 945, in execute
> uwsgi[15887]: return meth(self, multiparams, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",
line 263, in _execute_on_connection
> uwsgi[15887]: return connection._execute_clauseelement(self, multiparams, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1053, in _execute_clauseelement
> uwsgi[15887]: compiled_sql, distilled_params
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1189, in _execute_context
> uwsgi[15887]: context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2139, in flush
> uwsgi[15887]: self._flush(objects)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2259, in _flush
> uwsgi[15887]: transaction.rollback(_capture_exception=True)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py",
line 60, in __exit__
> uwsgi[15887]: compat.reraise(exc_type, exc_value, exc_tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2223, in _flush
> uwsgi[15887]: flush_context.execute()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 389, in execute
> uwsgi[15887]: rec.execute(self)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 548, in execute
> uwsgi[15887]: uow
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 181, in save_obj
> uwsgi[15887]: mapper, table, insert)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 835, in _emit_insert_statements
> uwsgi[15887]: execute(statement, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 945, in execute
> uwsgi[15887]: return meth(self, multiparams, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",
line 263, in _execute_on_connection
> uwsgi[15887]: return connection._execute_clauseelement(self, multiparams, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1053, in _execute_clauseelement
> uwsgi[15887]: compiled_sql, distilled_params
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1189, in _execute_context
> uwsgi[15887]: context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1393, in _handle_dbapi_exception
> uwsgi[15887]: exc_info
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/compat.py",
line 203, in raise_from_cause
> uwsgi[15887]: reraise(type(exception), exception, tb=exc_tb, cause=cause)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1182, in _execute_context
> uwsgi[15887]: context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/default.py",
line 470, in do_execute
> uwsgi[15887]: cursor.execute(statement, parameters)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/cursors.py",
line 205, in execute
> uwsgi[15887]: self.errorhandler(self, exc, value)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/connections.py",
line 36, in defaulterrorhandler
> uwsgi[15887]: raise errorclass, errorvalue
> uwsgi[15887]: IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry
'poc_dag2-2017-03-24 12:51:37.000000' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id,
execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s,
now(), %s, %s, %s, %s, %s)'] [parameters: ('poc_dag2', datetime.datetime(2017, 3, 24, 12,
51, 37), None, u'running', '4ac49276-10cb-11e7-8197-005056bc55dd', 1, '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/matus/dev/airflowtest/input2/data20:51:30.789572200.gzq\x03as.')]
> This is not consistent with AirflowException returned by trigger_dag() function. Moreover,
the session is not rolled back, hence also another exception is occurring:
> uwsgi[15887]: [2017-03-24 12:55:54,105] ERROR in app: Exception on / [POST]
> uwsgi[15887]: Traceback (most recent call last):
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1988, in wsgi_app
> uwsgi[15887]: response = self.full_dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1641, in full_dispatch_request
> uwsgi[15887]: rv = self.handle_user_exception(e)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1544, in handle_user_exception
> uwsgi[15887]: reraise(exc_type, exc_value, tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1639, in full_dispatch_request
> uwsgi[15887]: rv = self.dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1625, in dispatch_request
> uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args)
> uwsgi[15887]: File "./ws.py", line 21, in hello
> uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files':
input_files}), execution_date=datetime.now())
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py",
line 29, in trigger_dag
> uwsgi[15887]: dag = dagbag.get_dag(dag_id)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py",
line 200, in get_dag
> uwsgi[15887]: orm_dag = DagModel.get_current(root_dag_id)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py",
line 2549, in get_current
> uwsgi[15887]: obj = session.query(cls).filter(cls.dag_id == dag_id).first()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2731, in first
> uwsgi[15887]: ret = list(self[0:1])
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2523, in __getitem__
> uwsgi[15887]: return list(res)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2831, in __iter__
> uwsgi[15887]: [2017-03-24 12:55:54,105] {app.py:1587} ERROR - Exception on / [POST]
> uwsgi[15887]: Traceback (most recent call last):
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1988, in wsgi_app
> uwsgi[15887]: response = self.full_dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1641, in full_dispatch_request
> uwsgi[15887]: rv = self.handle_user_exception(e)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1544, in handle_user_exception
> uwsgi[15887]: reraise(exc_type, exc_value, tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1639, in full_dispatch_request
> uwsgi[15887]: rv = self.dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1625, in dispatch_request
> uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args)
> uwsgi[15887]: File "./ws.py", line 21, in hello
> uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files':
input_files}), execution_date=datetime.now())
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py",
line 29, in trigger_dag
> uwsgi[15887]: dag = dagbag.get_dag(dag_id)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py",
line 200, in get_dag
> uwsgi[15887]: orm_dag = DagModel.get_current(root_dag_id)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py",
line 2549, in get_current
> uwsgi[15887]: obj = session.query(cls).filter(cls.dag_id == dag_id).first()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2731, in first
> uwsgi[15887]: ret = list(self[0:1])
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2523, in __getitem__
> uwsgi[15887]: return list(res)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2831, in __iter__
> uwsgi[15887]: return self._execute_and_instances(context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2852, in _execute_and_instances
> uwsgi[15887]: close_with_result=True)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2861, in _get_bind_args
> uwsgi[15887]: **kw
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2843, in _connection_from_session
> uwsgi[15887]: conn = self.session.connection(**kw)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 966, in connection
> uwsgi[15887]: execution_options=execution_options)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 971, in _connection_for_bind
> uwsgi[15887]: engine, execution_options)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 382, in _connection_for_bind
> uwsgi[15887]: self._assert_active()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 276, in _assert_active
> uwsgi[15887]: % self._rollback_exception
> uwsgi[15887]: InvalidRequestError: This Session's transaction has been rolled back due
to a previous exception during flush. To begin a new transaction with this Session, first
issue Session.rollback(). Original exception was: (_mysql_exceptions.IntegrityError) (1062,
"Duplicate entry 'poc_dag2-2017-03-24 12:55:51.000000' for key 'dag_id'") [SQL: u'INSERT INTO
dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf)
VALUES (%s, %s, now(), %s, %s, %s, %s, %s)'] [parameters: ('poc_dag2', datetime.datetime(2017,
3, 24, 12, 55, 51), None, u'running', 'e1c78296-10cb-11e7-9e34-005056bc55dd', 1, '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/home/matus/dev/airflowtest/input2/data20:55:49.589767900.gzq\x03as.')]
> As example, here is the simple example web service causing exceptions when multiple parallel
clients tries to process file:
> from uuid import uuid1
> import json
> from os.path import join
> from datetime import datetime
> from flask import Flask
> from flask import request
> app = Flask(__name__)
> @app.route("/", methods=['POST'])
> def hello():
>     input_files = list()
>     for f in request.files.values():
>         fname = join('/home/matus/dev/airflowtest/input', f.filename)
>         f.save(fname)
>         input_files.append(fname)
>     from airflow.api.common.experimental.trigger_dag import trigger_dag
>     trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files': input_files}),
execution_date=datetime.now())
>     return '{"status": "OK"}'
> if __name__ == "__main__":
>     app.run()



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message