airflow-commits mailing list archives

From "Will Wong (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AIRFLOW-2219) Race condition to DagRun.verify_integrity between Scheduler and Webserver
Date Fri, 16 Mar 2018 00:30:00 GMT
Will Wong created AIRFLOW-2219:
----------------------------------

             Summary: Race condition to DagRun.verify_integrity between Scheduler and Webserver
                 Key: AIRFLOW-2219
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2219
             Project: Apache Airflow
          Issue Type: Bug
          Components: DagRun, db, scheduler, webserver
    Affects Versions: 1.8.1, 1.9.0
            Reporter: Will Wong


Symptoms:
 * Triggering a DAG brings up the webserver error page with a message along the lines of {{psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey"}}, raised from {{DagRun.verify_integrity}}

Or
 * A similar error appears in the scheduler log for the DAG file when scheduling a DAG (see the example exception at the end of this description).

This occurs because {{Dag.create_dagrun}} commits the dag_run entry to the database and then immediately runs {{verify_integrity}} to add the task_instances. However, the scheduler can pick up the new dag run before all of its task_instances have been created, and it calls {{verify_integrity}} at the same time to create the same task_instances.
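Here is a minimal, hypothetical sketch of that check-then-insert pattern using plain SQLite; the table layout and key values are illustrative only, not Airflow's actual code. Both "writers" read the set of missing task_instances before either has inserted anything, so the second insert hits the composite primary key:

{code:python}
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE task_instance (
        task_id TEXT,
        dag_id TEXT,
        execution_date TEXT,
        PRIMARY KEY (task_id, dag_id, execution_date)
    )
""")

KEY = ("docker_task", "example_dag", "2018-03-15 23:46:57")

def missing_task_instances():
    # First half of the pattern: read which task_instances already exist.
    cur = conn.execute(
        "SELECT 1 FROM task_instance "
        "WHERE task_id = ? AND dag_id = ? AND execution_date = ?",
        KEY,
    )
    return [] if cur.fetchone() else [KEY]

# Webserver and scheduler both do the read before either has inserted anything ...
webserver_sees = missing_task_instances()  # -> [KEY]
scheduler_sees = missing_task_instances()  # -> [KEY]

# ... so both believe the row is missing and both try to create it. The second
# insert violates the composite primary key, analogous to task_instance_pkey.
for row in webserver_sees:
    conn.execute("INSERT INTO task_instance VALUES (?, ?, ?)", row)
conn.commit()

try:
    for row in scheduler_sees:
        conn.execute("INSERT INTO task_instance VALUES (?, ?, ?)", row)
    conn.commit()
except sqlite3.IntegrityError as exc:
    print("second writer fails:", exc)
{code}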

I don't _think_ this actually breaks anything in particular. The exception happens either
on the webpage or in the scheduler logs:
 * If it occurs in the UI, it just scares people into thinking something broke, but the task_instances will still be created by the scheduler.
 * If the error shows up in the scheduler, the task_instances have already been created by the webserver, and the scheduler continues processing the DAG on its next loop.

 
 I'm not sure whether {{DagRun.verify_integrity}} needs to be called from both {{SchedulerJob._process_task_instances}} and {{Dag.create_dagrun}}; perhaps we could stick to just one of them?
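Alternatively, if both call sites keep calling it, one possible band-aid would be to treat the duplicate-key failure as benign, since the losing writer was only trying to insert rows the other writer had already created. A rough sketch ({{verify_integrity_tolerant}} is a hypothetical helper, not existing Airflow code):

{code:python}
from sqlalchemy.exc import IntegrityError

def verify_integrity_tolerant(dag_run, session):
    # Hypothetical wrapper: if another process (webserver or scheduler) created
    # the task_instances first, the duplicate-key error is harmless, so roll
    # back this attempt and carry on with the rows that now exist.
    try:
        dag_run.verify_integrity(session=session)
    except IntegrityError:
        session.rollback()
{code}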

 
{noformat}
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in
_execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
line 683, in do_executemany
    cursor.executemany(statement, parameters)
psycopg2.IntegrityError: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler,
2018-03-15 23:46:57.116673) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 371, in helper
    pickle_dags)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1792, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 1391, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 915, in _process_task_instances
    run.verify_integrity(session=session)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 4786, in verify_integrity
    session.commit()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 943, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 467, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 447, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2254, in
flush
    self._flush(objects)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2380, in
_flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 66,
in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
    raise value
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2344, in
_flush
    flush_context.execute()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 391, in
execute
    rec.execute(self)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 556, in
execute
    uow
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 181,
in save_obj
    mapper, table, insert)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 830,
in _emit_insert_statements
    execute(statement, multiparams)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 948, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 269, in
_execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1060, in
_execute_clauseelement
    compiled_sql, distilled_params
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1200, in
_execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1413, in
_handle_dbapi_exception
    exc_info
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1170, in
_execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py",
line 683, in do_executemany
    cursor.executemany(statement, parameters)

DETAIL: Key (task_id, dag_id, execution_date)=(docker_task_10240_7680_0, chunkedgraph_edgetask_scheduler,
2018-03-15 23:46:57.116673) already exists.
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
