airflow-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-1039) Airflow is raising IntegrityError when during parallel DAG trigger
Date Thu, 01 Nov 2018 20:14:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-1039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672121#comment-16672121 ]

ASF GitHub Bot commented on AIRFLOW-1039:
-----------------------------------------

ashb closed pull request #2186: [AIRFLOW-1039] Raise AirflowError instead IntegrityError
URL: https://github.com/apache/incubator-airflow/pull/2186
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/airflow/models.py b/airflow/models.py
index 1244d6078a..a596444e97 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -54,6 +54,7 @@
 from sqlalchemy.ext.declarative import declarative_base, declared_attr
 from sqlalchemy.dialects.mysql import LONGTEXT
 from sqlalchemy.orm import reconstructor, relationship, synonym
+from sqlalchemy.exc import IntegrityError
 
 from croniter import croniter
 import six
@@ -3389,7 +3390,14 @@ def create_dagrun(self,
             state=state
         )
         session.add(run)
-        session.commit()
+        try:
+            session.commit()
+        except IntegrityError:
+            session.rollback()
+            raise AirflowException("Run id {} already exists for dag id {}".format(
+                run_id,
+                self.dag_id
+            ))
 
         run.dag = self
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Airflow is raising IntegrityError when during parallel DAG trigger
> ------------------------------------------------------------------
>
>                 Key: AIRFLOW-1039
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1039
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun
>    Affects Versions: 1.8.0
>            Reporter: Matus Valo
>            Priority: Minor
>
> When two concurrent processes try to trigger the same DAG with the same execution date at the same time, SQLAlchemy raises an IntegrityError:
> uwsgi[15887]: [2017-03-24 12:51:38,074] {app.py:1587} ERROR - Exception on / [POST]
> uwsgi[15887]: Traceback (most recent call last):
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1988, in wsgi_app
> uwsgi[15887]: response = self.full_dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1641, in full_dispatch_request
> uwsgi[15887]: rv = self.handle_user_exception(e)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1544, in handle_user_exception
> uwsgi[15887]: reraise(exc_type, exc_value, tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1639, in full_dispatch_request
> uwsgi[15887]: rv = self.dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1625, in dispatch_request
> uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args)
> uwsgi[15887]: File "./ws.py", line 21, in hello
> uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files':
input_files}), execution_date=datetime.now())
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py",
line 56, in trigger_dag
> uwsgi[15887]: external_trigger=True
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/utils/db.py",
line 53, in wrapper
> uwsgi[15887]: result = func(*args, **kwargs)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py",
line 3377, in create_dagrun
> uwsgi[15887]: session.commit()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 874, in commit
> uwsgi[15887]: self.transaction.commit()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 461, in commit
> uwsgi[15887]: self._prepare_impl()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 441, in _prepare_impl
> uwsgi[15887]: self.session.flush()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2139, in flush
> uwsgi[15887]: self._flush(objects)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2259, in _flush
> uwsgi[15887]: transaction.rollback(_capture_exception=True)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py",
line 60, in __exit__
> uwsgi[15887]: compat.reraise(exc_type, exc_value, exc_tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 2223, in _flush
> uwsgi[15887]: flush_context.execute()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 389, in execute
> uwsgi[15887]: rec.execute(self)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py",
line 548, in execute
> uwsgi[15887]: uow
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 181, in save_obj
> uwsgi[15887]: mapper, table, insert)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py",
line 835, in _emit_insert_statements
> uwsgi[15887]: execute(statement, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 945, in execute
> uwsgi[15887]: return meth(self, multiparams, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/sql/elements.py",
line 263, in _execute_on_connection
> uwsgi[15887]: return connection._execute_clauseelement(self, multiparams, params)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1053, in _execute_clauseelement
> uwsgi[15887]: compiled_sql, distilled_params
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1189, in _execute_context
> uwsgi[15887]: context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1393, in _handle_dbapi_exception
> uwsgi[15887]: exc_info
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/util/compat.py",
line 203, in raise_from_cause
> uwsgi[15887]: reraise(type(exception), exception, tb=exc_tb, cause=cause)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/base.py",
line 1182, in _execute_context
> uwsgi[15887]: context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/engine/default.py",
line 470, in do_execute
> uwsgi[15887]: cursor.execute(statement, parameters)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/cursors.py",
line 205, in execute
> uwsgi[15887]: self.errorhandler(self, exc, value)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/MySQLdb/connections.py",
line 36, in defaulterrorhandler
> uwsgi[15887]: raise errorclass, errorvalue
> uwsgi[15887]: IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry
'poc_dag2-2017-03-24 12:51:37.000000' for key 'dag_id'") [SQL: u'INSERT INTO dag_run (dag_id,
execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%s, %s,
now(), %s, %s, %s, %s, %s)'] [parameters: ('poc_dag2', datetime.datetime(2017, 3, 24, 12,
51, 37), None, u'running', '4ac49276-10cb-11e7-8197-005056bc55dd', 1, '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/matus/dev/airflowtest/input2/data20:51:30.789572200.gzq\x03as.')]
> This is inconsistent with the AirflowException that trigger_dag() raises in other error cases. Moreover, the session is not rolled back, so the next request that uses it fails with another exception:
> uwsgi[15887]: [2017-03-24 12:55:54,105] ERROR in app: Exception on / [POST]
> uwsgi[15887]: [2017-03-24 12:55:54,105] {app.py:1587} ERROR - Exception on / [POST]
> uwsgi[15887]: Traceback (most recent call last):
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1988, in wsgi_app
> uwsgi[15887]: response = self.full_dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1641, in full_dispatch_request
> uwsgi[15887]: rv = self.handle_user_exception(e)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1544, in handle_user_exception
> uwsgi[15887]: reraise(exc_type, exc_value, tb)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1639, in full_dispatch_request
> uwsgi[15887]: rv = self.dispatch_request()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/flask/app.py",
line 1625, in dispatch_request
> uwsgi[15887]: return self.view_functions[rule.endpoint](**req.view_args)
> uwsgi[15887]: File "./ws.py", line 21, in hello
> uwsgi[15887]: trigger_dag('poc_dag2', run_id=str(uuid1()), conf=json.dumps({'input_files':
input_files}), execution_date=datetime.now())
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/api/common/experimental/trigger_dag.py",
line 29, in trigger_dag
> uwsgi[15887]: dag = dagbag.get_dag(dag_id)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py",
line 200, in get_dag
> uwsgi[15887]: orm_dag = DagModel.get_current(root_dag_id)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/airflow/models.py",
line 2549, in get_current
> uwsgi[15887]: obj = session.query(cls).filter(cls.dag_id == dag_id).first()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2731, in first
> uwsgi[15887]: ret = list(self[0:1])
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2523, in __getitem__
> uwsgi[15887]: return list(res)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2831, in __iter__
> uwsgi[15887]: return self._execute_and_instances(context)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2852, in _execute_and_instances
> uwsgi[15887]: close_with_result=True)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2861, in _get_bind_args
> uwsgi[15887]: **kw
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py",
line 2843, in _connection_from_session
> uwsgi[15887]: conn = self.session.connection(**kw)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 966, in connection
> uwsgi[15887]: execution_options=execution_options)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 971, in _connection_for_bind
> uwsgi[15887]: engine, execution_options)
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 382, in _connection_for_bind
> uwsgi[15887]: self._assert_active()
> uwsgi[15887]: File "/home/matus/envs/airflow/lib/python2.7/site-packages/sqlalchemy/orm/session.py",
line 276, in _assert_active
> uwsgi[15887]: % self._rollback_exception
> uwsgi[15887]: InvalidRequestError: This Session's transaction has been rolled back due
to a previous exception during flush. To begin a new transaction with this Session, first
issue Session.rollback(). Original exception was: (_mysql_exceptions.IntegrityError) (1062,
"Duplicate entry 'poc_dag2-2017-03-24 12:55:51.000000' for key 'dag_id'") [SQL: u'INSERT INTO
dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf)
VALUES (%s, %s, now(), %s, %s, %s, %s, %s)'] [parameters: ('poc_dag2', datetime.datetime(2017,
3, 24, 12, 55, 51), None, u'running', 'e1c78296-10cb-11e7-9e34-005056bc55dd', 1, '\x80\x02}q\x01X\x0b\x00\x00\x00input_files]q\x02X>\x00\x00\x00/home/matus/dev/airflowtest/input2/data20:55:49.589767900.gzq\x03as.')]
> As an example, here is a simple web service that causes these exceptions when multiple parallel clients upload files for processing:
> from datetime import datetime
> import json
> from os.path import join
> from uuid import uuid1
>
> from flask import Flask
> from flask import request
>
> app = Flask(__name__)
>
>
> @app.route("/", methods=['POST'])
> def hello():
>     # Save every uploaded file and collect the saved paths.
>     input_files = list()
>     for f in request.files.values():
>         fname = join('/home/matus/dev/airflowtest/input', f.filename)
>         f.save(fname)
>         input_files.append(fname)
>     # Imported here, as in the original report, so Airflow loads on first request.
>     from airflow.api.common.experimental.trigger_dag import trigger_dag
>     # run_id is unique per request, but execution_date is just the current time,
>     # so parallel requests can end up with the same execution_date.
>     trigger_dag('poc_dag2', run_id=str(uuid1()),
>                 conf=json.dumps({'input_files': input_files}),
>                 execution_date=datetime.now())
>     return '{"status": "OK"}'
>
>
> if __name__ == "__main__":
>     app.run()
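
The pattern applied in the diff above (commit, and on a unique-key violation roll back and raise an application-level error) can be sketched without Airflow or MySQL. This is only an illustration under stated assumptions: it uses the standard-library sqlite3 module instead of SQLAlchemy, and the table layout, DuplicateRunError, create_run() and demo() are hypothetical names, not Airflow code. The unique key on (dag_id, execution_date) mirrors the "Duplicate entry ... for key 'dag_id'" message in the log.

```python
import sqlite3


class DuplicateRunError(Exception):
    """Hypothetical stand-in for AirflowException."""


def create_run(conn, dag_id, execution_date, run_id):
    """Insert a run row; turn a unique-key violation into DuplicateRunError."""
    try:
        conn.execute(
            "INSERT INTO dag_run (dag_id, execution_date, run_id) VALUES (?, ?, ?)",
            (dag_id, execution_date, run_id),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # Roll back so the connection stays usable for later requests.
        conn.rollback()
        raise DuplicateRunError(
            "Run for dag id {} at {} already exists".format(dag_id, execution_date)
        )


def demo():
    conn = sqlite3.connect(":memory:")
    # Assumed schema: one run per (dag_id, execution_date).
    conn.execute(
        "CREATE TABLE dag_run ("
        " dag_id TEXT, execution_date TEXT, run_id TEXT,"
        " UNIQUE (dag_id, execution_date))"
    )
    requests = [
        ("a", "2017-03-24 12:51:37"),
        ("b", "2017-03-24 12:51:37"),  # same execution_date as "a"
        ("c", "2017-03-24 12:51:38"),
    ]
    outcomes = []
    for run_id, execution_date in requests:
        try:
            create_run(conn, "poc_dag2", execution_date, run_id)
            outcomes.append("created")
        except DuplicateRunError:
            outcomes.append("duplicate")
    count = conn.execute("SELECT COUNT(*) FROM dag_run").fetchone()[0]
    conn.close()
    return outcomes, count


if __name__ == "__main__":
    print(demo())
```

In this sketch the third insert still succeeds after the duplicate, because the failed transaction was rolled back before the error was re-raised. That is the behaviour the reporter found missing when the session was left in a failed state.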


