airflow-commits mailing list archives

From "Ruslan Dautkhanov (JIRA)" <j...@apache.org>
Subject [jira] [Issue Comment Deleted] (AIRFLOW-980) IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key" on sample DAGs
Date Tue, 14 Mar 2017 21:00:43 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Ruslan Dautkhanov updated AIRFLOW-980:
--------------------------------------
    Comment: was deleted

(was: It seems the database schema changed from 1.7 to 1.8 ("column task_instance.pid does not exist").
Is there a documented way to upgrade the database to the new schema?)

> IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint
"dag_run_dag_id_key" on sample DAGs
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-980
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-980
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: Airflow 1.7.1.3
>         Environment: Local Executor
> postgresql+psycopg2 database backend
>            Reporter: Ruslan Dautkhanov
>
> Fresh Airflow install using pip.
> Only sample DAGs are installed.
> LocalExecutor (4 workers).
> Most of the parameters are at their defaults.
> Turned on all of the sample DAGs (14 of them).
> After running for a while (most DAGs had at least one successful execution), the error stack
below started appearing repeatedly in the scheduler log.
> {noformat}
> IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key"
>  [SQL: 'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id'] [parameters: {'end_date': None, 'run_id': u'scheduled__2015-01-01T00:00:00', 'execution_date': datetime.datetime(2015, 1, 1, 0, 0), 'external_trigger': False, 'state': u'running', 'conf': None, 'start_date': datetime.datetime(2017, 3, 14, 11, 12, 29, 646995), 'dag_id': 'example_xcom'}]
> Process Process-152:
> Traceback (most recent call last):
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
>     self.run()
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
>     self._target(*self._args, **self._kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 664, in _do_dags
>     dag = dagbag.get_dag(dag.dag_id)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/models.py", line 188, in get_dag
>     orm_dag = DagModel.get_current(root_dag_id)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/models.py", line 2320, in get_current
>     obj = session.query(cls).filter(cls.dag_id == dag_id).first()
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2634, in first
>     ret = list(self[0:1])
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
>     return list(res)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2736, in __iter__
>     return self._execute_and_instances(context)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances
>     close_with_result=True)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session
>     **kw)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 893, in connection
>     execution_options=execution_options)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 898, in _connection_for_bind
>     engine, execution_options)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 313, in _connection_for_bind
>     self._assert_active()
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 214, in _assert_active
>     % self._rollback_exception
> InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key"
>  [SQL: 'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id'] [parameters: {'end_date': None, 'run_id': u'scheduled__2015-01-01T00:00:00', 'execution_date': datetime.datetime(2015, 1, 1, 0, 0), 'external_trigger': False, 'state': u'running', 'conf': None, 'start_date': datetime.datetime(2017, 3, 14, 11, 12, 29, 646995), 'dag_id': 'example_xcom'}]
> [2017-03-14 11:12:29,757] {jobs.py:741} INFO - Done queuing tasks, calling the executor's heartbeat
> [2017-03-14 11:12:29,757] {jobs.py:744} INFO - Loop took: 29.335935 seconds
> {noformat}
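Editor's note, for illustration only (this is not Airflow's actual code): the traceback shows two stacked problems. First, two scheduler workers race to insert the same (dag_id, execution_date) dag_run row and the loser hits the unique constraint; second, the failed flush leaves the SQLAlchemy session unusable until Session.rollback() is called, which produces the follow-on InvalidRequestError. A minimal sketch of the failure mode and the rollback-and-skip handling, using stdlib sqlite3 in place of psycopg2, with a hypothetical table modeled on the log:

```python
# Sketch (assumed schema, not Airflow's): reproduce the duplicate-key race
# and show that rolling back on IntegrityError lets the connection be reused.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dag_run (
        dag_id         TEXT,
        execution_date TEXT,
        state          TEXT,
        UNIQUE (dag_id, execution_date)  -- analogous to dag_run_dag_id_key
    )
""")

def create_dag_run(conn, dag_id, execution_date):
    """Insert a dag_run; return False if another process already created it."""
    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute(
                "INSERT INTO dag_run (dag_id, execution_date, state) "
                "VALUES (?, ?, 'running')",
                (dag_id, execution_date),
            )
        return True
    except sqlite3.IntegrityError:
        # The duplicate-key error from the log; the rollback done by the
        # context manager is what the InvalidRequestError says was missing.
        return False

print(create_dag_run(conn, "example_xcom", "2015-01-01T00:00:00"))  # True
print(create_dag_run(conn, "example_xcom", "2015-01-01T00:00:00"))  # False
```

The key point is that the IntegrityError itself is survivable (the run already exists, so the insert can simply be skipped); it is only the missing rollback that escalates it into a session-wide failure.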



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
