airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Maneesh Sharma (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AIRFLOW-1370) Scheduler is crashing because of IntegrityError
Date Tue, 04 Jul 2017 08:52:00 GMT
Maneesh Sharma created AIRFLOW-1370:
---------------------------------------

             Summary: Scheduler is crashing because of IntegrityError
                 Key: AIRFLOW-1370
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1370
             Project: Apache Airflow
          Issue Type: Bug
          Components: celery, scheduler
    Affects Versions: Airflow 1.8
            Reporter: Maneesh Sharma


Scheduler is crashing with multiple task running on Celery Executor. It is throwing `{color:red}IntegrityError:
(psycopg2.IntegrityError) duplicate key value violates unique constraint "task_instance_pkey"{color}`.
Below is the complete stack trace of error --

Process DagFileProcessor490-Process:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", line 348, in helper
    pickle_dags)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in
wrapper
    result = func(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", line 1587, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", line 1176, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/jobs.py", line 880, in _process_task_instances
    run.verify_integrity(session=session)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in
wrapper
    result = func(*args, **kwargs)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/airflow/models.py", line 4117, in
verify_integrity
    session.commit()
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 906,
in commit
    self.transaction.commit()
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 461,
in commit
    self._prepare_impl()
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 441,
in _prepare_impl
    self.session.flush()
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2171,
in flush
    self._flush(objects)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2291,
in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line
66, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2255,
in _flush
    flush_context.execute()
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line
389, in execute
    rec.execute(self)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line
548, in execute
    uow
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line
181, in save_obj
    mapper, table, insert)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line
799, in _emit_insert_statements
    execute(statement, multiparams)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 945,
in execute
    return meth(self, multiparams, params)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line
263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053,
in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189,
in _execute_context
    context)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1402,
in _handle_dbapi_exception
    exc_info
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 203,
in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1159,
in _execute_context
    context)
  File "/home/ubuntu/.local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line
467, in do_executemany
    cursor.executemany(statement, parameters)
IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (task_id, dag_id, execution_date)=(Hello_World_task-21, Hello_World_Tasks, 2017-07-04
06:59:40) already exists.
 [SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date,
duration, state, try_number, hostname, unixname, job_id, pool, queue, priority_weight, operator,
queued_dttm, pid) VALUES (%(task_id)s, %(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s,
%(duration)s, %(state)s, %(try_number)s, %(hostname)s, %(unixname)s, %(job_id)s, %(pool)s,
%(queue)s, %(priority_weight)s, %(operator)s, %(queued_dttm)s, %(pid)s)'] [parameters: ({'task_id':
'Hello_World_task-21', 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None,
'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id':
'Hello_World_task-20', 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None,
'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id':
'Hello_World_task-23', 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None,
'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id':
'Hello_World_task-22', 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None,
'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id':
'Hello_World_task-25', 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None,
'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id':
'Hello_World_task-24', 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None,
'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id':
'Hello_World_task-27', 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None,
'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id':
'Hello_World_task-26', 'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None,
'queued_dttm': None, 'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None,
'try_number': 0, 'queue': 'default', 'duration': None, 'state': None, 'start_date': None,
'operator': None, 'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}  ...
displaying 10 of 2002 total bound parameter sets ...  {'task_id': 'Hello_World_task-1783',
'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None, 'try_number': 0,
'queue': 'default', 'duration': None, 'state': None, 'start_date': None, 'operator': None,
'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'}, {'task_id': 'Hello_World_task-1782',
'unixname': 'ubuntu', 'job_id': None, 'end_date': None, 'pool': None, 'queued_dttm': None,
'execution_date': datetime.datetime(2017, 7, 4, 6, 59, 40), 'pid': None, 'try_number': 0,
'queue': 'default', 'duration': None, 'state': None, 'start_date': None, 'operator': None,
'priority_weight': 2, 'hostname': u'', 'dag_id': 'Hello_World_Tasks'})]
 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message