airflow-commits mailing list archives

From "ASF subversion and git services (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-738) XCom: Deadlock found when trying to get lock; try restarting transaction
Date Tue, 10 Jan 2017 08:15:58 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15814291#comment-15814291 ]

ASF subversion and git services commented on AIRFLOW-738:
---------------------------------------------------------

Commit e18d67dec4774946a35f7c34953bdfd7138595bf in incubator-airflow's branch refs/heads/v1-8-test
from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e18d67d ]

[AIRFLOW-738] Commit deleted xcom items before insert

A delete-then-insert sequence within one transaction can lead
to a deadlocked transaction on MariaDB / MySQL.

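When the deletes affect no rows, each of them still takes a gap
lock on the end-of-table gap. Gap locks held by different
transactions do not conflict, so all threads hold one at the same
time. Once a thread executes its insert, its insert intention lock
has to wait until the other threads release their gap locks. As
soon as two threads wait on each other this way, MySQL detects a
deadlock and rolls one of the transactions back.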
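The lock interaction above can be pictured as a wait-for graph. The sketch below is a toy model, not InnoDB's lock manager: it assumes only the documented rule that gap locks held by different transactions are compatible, while an insert intention lock waits for a gap lock held by another transaction. `Txn`, `delete_no_rows`, `commit`, `wait_for_graph` and `find_cycle` are hypothetical names used for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set

END_OF_TABLE_GAP = "end-of-table gap"


@dataclass
class Txn:
    """Toy transaction: only tracks gap locks and a pending insert."""
    name: str
    gap_locks: Set[str] = field(default_factory=set)
    wants_insert_into: Optional[str] = None


def delete_no_rows(txn: Txn) -> None:
    # An empty DELETE still locks the gap it scanned. Gap locks are
    # compatible with each other, so this never blocks.
    txn.gap_locks.add(END_OF_TABLE_GAP)


def commit(txn: Txn) -> None:
    # Committing releases every lock the transaction holds.
    txn.gap_locks.clear()


def wait_for_graph(txns: List[Txn]) -> Dict[str, Set[str]]:
    # Edge A -> B means: A's insert intention lock waits for B's gap lock.
    graph: Dict[str, Set[str]] = {t.name: set() for t in txns}
    for t in txns:
        if t.wants_insert_into is None:
            continue
        for other in txns:
            if other is not t and t.wants_insert_into in other.gap_locks:
                graph[t.name].add(other.name)
    return graph


def find_cycle(graph: Dict[str, Set[str]]) -> Optional[List[str]]:
    # Depth-first search; a cycle in the wait-for graph is a deadlock.
    path: List[str] = []
    done: Set[str] = set()

    def visit(node: str) -> Optional[List[str]]:
        if node in path:
            return path[path.index(node):] + [node]
        if node in done:
            return None
        path.append(node)
        for nxt in sorted(graph[node]):
            cycle = visit(nxt)
            if cycle:
                return cycle
        path.pop()
        done.add(node)
        return None

    for start in sorted(graph):
        cycle = visit(start)
        if cycle:
            return cycle
    return None


# Original behaviour: delete and insert in the same transaction.
t1, t2 = Txn("T1"), Txn("T2")
for t in (t1, t2):
    delete_no_rows(t)
for t in (t1, t2):
    t.wants_insert_into = END_OF_TABLE_GAP
print(find_cycle(wait_for_graph([t1, t2])))  # ['T1', 'T2', 'T1']

# Fixed behaviour: commit the delete before inserting.
a, b = Txn("A"), Txn("B")
for t in (a, b):
    delete_no_rows(t)
    commit(t)
    t.wants_insert_into = END_OF_TABLE_GAP
print(find_cycle(wait_for_graph([a, b])))  # None
```

With both deletes still uncommitted, T1 and T2 wait on each other, which is the cycle the database reports as a deadlock. Once each delete is committed before its insert, no gap lock is left to wait for and the graph has no cycle.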

The solution is to not perform the following two steps within a
single transaction:

1. Delete the rows you want to insert, when those rows do not exist yet.
2. Insert the rows.

The risk of the delete and the insert no longer being atomic
is relatively low, as it was the user's intention to run the
task anyway. If the task fails between the two transactions,
it can be retried.
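A minimal sketch of the delete, commit, insert, commit pattern, using Python's built-in `sqlite3` module so it runs without a MySQL server. SQLite has no InnoDB-style gap locks, so this only shows where the transaction boundaries go, not the deadlock itself. The `xcom` table is a simplified version of the columns in the failing INSERT quoted below, and `create_xcom_table` and `set_xcom` are hypothetical helpers, not Airflow's own XCom code.

```python
import pickle
import sqlite3


def create_xcom_table(conn: sqlite3.Connection) -> None:
    # Simplified xcom table; "key" is quoted because KEY is an SQL keyword
    # (the MySQL statement in the log uses backticks for the same reason).
    conn.execute(
        """CREATE TABLE IF NOT EXISTS xcom (
               id INTEGER PRIMARY KEY AUTOINCREMENT,
               "key" TEXT, value BLOB, timestamp TEXT,
               execution_date TEXT, task_id TEXT, dag_id TEXT)"""
    )
    conn.commit()


def set_xcom(conn: sqlite3.Connection, key: str, value: object,
             execution_date: str, task_id: str, dag_id: str) -> None:
    # Transaction 1: drop any previous value and commit right away, so
    # whatever locks the (often empty) DELETE took are released.
    conn.execute(
        """DELETE FROM xcom WHERE "key" = ? AND execution_date = ?
           AND task_id = ? AND dag_id = ?""",
        (key, execution_date, task_id, dag_id),
    )
    conn.commit()
    # Transaction 2: insert the new value. If the process dies between
    # the two commits, the old value is gone and the task can be retried.
    conn.execute(
        """INSERT INTO xcom ("key", value, timestamp, execution_date, task_id, dag_id)
           VALUES (?, ?, datetime('now'), ?, ?, ?)""",
        (key, pickle.dumps(value), execution_date, task_id, dag_id),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
create_xcom_table(conn)
for status in (True, False):  # running the task twice replaces the value
    set_xcom(conn, "return_value", {"status": status},
             "2017-01-06 20:00:00", "poll_stream26", "xcom_test")
rows = conn.execute("SELECT value FROM xcom").fetchall()
print(len(rows), pickle.loads(rows[0][0]))  # 1 {'status': False}
```

The trade-off is the one described above: the two commits give up atomicity between "old value removed" and "new value stored" in exchange for never holding the DELETE's locks while inserting.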


> XCom: Deadlock found when trying to get lock; try restarting transaction
> ------------------------------------------------------------------------
>
>                 Key: AIRFLOW-738
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-738
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: Airflow 1.8
>            Reporter: Bolke de Bruin
>            Priority: Blocker
>
> When using the following dag:
> {code}
> from datetime import datetime, timedelta
> import logging
> import pprint
> import random
> # The DAG object; we'll need this to instantiate a DAG
> from airflow import DAG
> # Operators; we need this to operate!
> from airflow.operators.python_operator import PythonOperator
> start_time = datetime.now().replace(minute=0, second=0, microsecond=0)
> start_time += timedelta(hours=-1)  # timedelta(days=-2)
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': start_time,
>     'email': ['alex.papanic@gmail.com'],
>     'email_on_failure': True,
>     'email_on_retry': True,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=1)
>     # 'queue': 'bash_queue',
>     # 'pool': 'backfill',
>     # 'priority_weight': 10,
>     # 'end_date': datetime(2016, 1, 1),
> }
> dag = DAG(
>     'xcom_test',
>     default_args=default_args,
>     schedule_interval='@once')
> def upload_activity_status(pgconn_id, **context):
>     upstream_task_ids = context['task'].upstream_task_ids
>     logging.info(
>         "Getting status from upstream task {}".format(upstream_task_ids))
>     status = context['ti'].xcom_pull(task_ids=upstream_task_ids)
>     logging.info("Xcom pull results:\n{}".format(pprint.pformat(status)))
>     logging.info("Upload to DB here")
> upload_ativity_status = PythonOperator(
>     task_id='upload_activity_status',
>     python_callable=upload_activity_status,
>     op_kwargs={'pgconn_id': 'postgres_conn'},
>     provide_context=True,
>     dag=dag)
> def poll_data(params, execution_date, **context):
>     logging.info("Test polling function for {data_stream}".format(**params))
>     status = random.random() < 0.5
>     output = dict(
>         data_stream=params['data_stream'],
>         timeperiod=execution_date + timedelta(hours=-1),
>         status=status
>     )
>     return output
> def poll_data_factory(data_stream, dag):
>     return PythonOperator(
>         task_id='poll_{}'.format(data_stream),
>         python_callable=poll_data,
>         params={u'data_stream': data_stream},
>         provide_context=True,
>         dag=dag
>     )
> poll_streams = []
> streams = ['stream' + str(i) for i in range(30)]
> for data_stream in streams:
>     poll = poll_data_factory(data_stream, dag)
>     poll_streams.append(poll)
>     upload_ativity_status.set_upstream(poll)
> {code}
> The following error is thrown:
> {code}
> [2017-01-06 21:41:35,824] {jobs.py:1433} INFO - Heartbeating the scheduler
> Traceback (most recent call last):
>   File "/Users/bolke/Documents/dev/airflow_env/bin/airflow", line 4, in <module>
>     __import__('pkg_resources').run_script('airflow==1.7.2.dev0', 'airflow')
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 739, in run_script
>     self.require(requires)[0].run_script(script_name, ns)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1494, in run_script
>     exec(code, namespace, namespace)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/EGG-INFO/scripts/airflow", line 28, in <module>
>     args.func(args)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/bin/cli.py", line 380, in run
>     pool=args.pool,
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1334, in run
>     self.handle_failure(e, test_mode, context)
>   File "/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py", line 1407, in handle_failure
>     session.merge(self)
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1815, in merge
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 1861, in _merge
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 831, in get
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 864, in _get_impl
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/loading.py", line 223, in load_on_ident
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2756, in one
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2726, in one_or_none
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2797, in __iter__
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2818, in _execute_and_instances
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2827, in _get_bind_args
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/query.py", line 2809, in _connection_from_session
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 966, in connection
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 971, in _connection_for_bind
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 382, in _connection_for_bind
>   File "build/bdist.macosx-10.12-x86_64/egg/sqlalchemy/orm/session.py", line 276, in _assert_active
> sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction') [SQL: u'INSERT INTO xcom (`key`, value, timestamp, execution_date, task_id, dag_id) VALUES (%s, %s, now(), %s, %s, %s)'] [parameters: (u'return_value', '\x80\x02}q\x00(U\x06statusq\x01\x89U\ntimeperiodq\x02cdatetime\ndatetime\nq\x03U\n\x07\xe1\x01\x06\x13\x00\x00\x00\x00\x00q\x04\x85q\x05Rq\x06U\x0bdata_streamq\x07U\x08stream26q\x08u.', datetime.datetime(2017, 1, 6, 20, 0), 'poll_stream26', 'xcom_test')]
> {code}
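The `parameters` in the error identify the row that could not be inserted: the `return_value` XCom of `poll_stream26`, stored as a protocol-2 pickle written by Python 2. As a quick consistency check against the DAG above, the sketch below decodes that value with Python 3. The byte string is copied from the log; `encoding="latin1"` is the option the `pickle` documentation prescribes for loading Python 2 `datetime` pickles.

```python
import pickle
from datetime import datetime, timedelta

# XCom value copied from the [parameters: ...] part of the error above.
raw = (
    b"\x80\x02}q\x00(U\x06statusq\x01\x89U\ntimeperiodq\x02"
    b"cdatetime\ndatetime\nq\x03U\n\x07\xe1\x01\x06\x13\x00\x00\x00\x00\x00"
    b"q\x04\x85q\x05Rq\x06U\x0bdata_streamq\x07U\x08stream26q\x08u."
)

# Python 2 str objects (including datetime's packed state) are decoded as latin1.
value = pickle.loads(raw, encoding="latin1")

# poll_data() returns timeperiod = execution_date - 1 hour, and the
# execution_date of the failing row is also in the log.
execution_date = datetime(2017, 1, 6, 20, 0)
print(value["data_stream"], value["status"],
      value["timeperiod"] == execution_date - timedelta(hours=1))
# stream26 False True
```

So the deadlocked INSERT was an ordinary XCom write of one poll task's return value, one of the 30 such writes the DAG produces.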



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
