airflow-commits mailing list archives

From "Ruslan Dautkhanov (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-980) IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key" on sample DAGs
Date Tue, 14 Mar 2017 20:54:41 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15924978#comment-15924978 ]

Ruslan Dautkhanov commented on AIRFLOW-980:
-------------------------------------------

I've upgraded Airflow to 1.8.0rc5 as you suggested. The webserver started fine, but the scheduler fails and dies with the following exception:

{noformat}
[2017-03-14 14:52:13,474] {jobs.py:1311} INFO - Exited execute loop
Traceback (most recent call last):
  File "/opt/cloudera/parcels/Anaconda/bin/airflow", line 28, in <module>
    args.func(args)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/bin/cli.py", line 839, in scheduler
    job.run()
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1309, in _execute
    self._execute_helper(processor_manager)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1364, in _execute_helper
    self.reset_state_for_orphaned_tasks(dr, session=session)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 227, in reset_state_for_orphaned_tasks
    tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session))
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/models.py", line 3960, in get_task_instances
    return tis.all()
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2588, in all
    return list(self)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2736, in __iter__
    return self._execute_and_instances(context)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2751, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 914, in execute
    return meth(self, multiparams, params)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1146, in _execute_context
    context)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
    exc_info
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 200, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1139, in _execute_context
    context)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 450, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) column task_instance.pid does not exist
LINE 1: ...nstance.queued_dttm AS task_instance_queued_dttm, task_insta...
                                                             ^
 [SQL: 'SELECT task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid \nFROM task_instance \nWHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.execution_date = %(execution_date_1)s AND task_instance.state = %(state_1)s'] [parameters: {'state_1': u'scheduled', 'execution_date_1': datetime.datetime(2015, 1, 1, 0, 0), 'dag_id_1': u'example_xcom'}]

{noformat}

Should I just wipe the database and run initdb again? Since I don't have any real workflows yet, I'm fine with that route.
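For what it's worth, the missing task_instance.pid column suggests the 1.8.0 schema migrations haven't been applied yet, so wiping may not even be necessary. A sketch of the two options, assuming the 1.8.0 CLI as shipped in that release:

```shell
# Option 1: apply the pending schema migrations in place
# (task_instance.pid is added by one of the 1.8.0 migrations)
airflow upgradedb

# Option 2: drop and recreate the metadata DB from scratch
# (fine here since there are no real workflows yet)
airflow resetdb -y
```

Option 1 preserves existing run history; option 2 discards it along with any connections and variables stored in the metadata DB.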

> IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key" on sample DAGs
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-980
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-980
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: Airflow 1.7.1.3
>         Environment: Local Executor
> postgresql+psycopg2 database backend
>            Reporter: Ruslan Dautkhanov
>
> Fresh Airflow install via pip.
> Only the sample DAGs are installed.
> LocalExecutor (4 workers).
> Most parameters are at their defaults.
> Turned on all 14 of the sample DAGs.
> After running for a while (many of the DAGs had at least one successful execution),
> the error stack below started appearing repeatedly in the scheduler log.
> {noformat}
> IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key"
>  [SQL: 'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id'] [parameters: {'end_date': None, 'run_id': u'scheduled__2015-01-01T00:00:00', 'execution_date': datetime.datetime(2015, 1, 1, 0, 0), 'external_trigger': False, 'state': u'running', 'conf': None, 'start_date': datetime.datetime(2017, 3, 14, 11, 12, 29, 646995), 'dag_id': 'example_xcom'}]
> Process Process-152:
> Traceback (most recent call last):
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
>     self.run()
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/multiprocessing/process.py", line 114, in run
>     self._target(*self._args, **self._kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 664, in _do_dags
>     dag = dagbag.get_dag(dag.dag_id)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/models.py", line 188, in get_dag
>     orm_dag = DagModel.get_current(root_dag_id)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/models.py", line 2320, in get_current
>     obj = session.query(cls).filter(cls.dag_id == dag_id).first()
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2634, in first
>     ret = list(self[0:1])
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
>     return list(res)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2736, in __iter__
>     return self._execute_and_instances(context)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2749, in _execute_and_instances
>     close_with_result=True)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2740, in _connection_from_session
>     **kw)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 893, in connection
>     execution_options=execution_options)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 898, in _connection_for_bind
>     engine, execution_options)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 313, in _connection_for_bind
>     self._assert_active()
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 214, in _assert_active
>     % self._rollback_exception
> InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.IntegrityError) duplicate key value violates unique constraint "dag_run_dag_id_key"
>  [SQL: 'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (%(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(state)s, %(run_id)s, %(external_trigger)s, %(conf)s) RETURNING dag_run.id'] [parameters: {'end_date': None, 'run_id': u'scheduled__2015-01-01T00:00:00', 'execution_date': datetime.datetime(2015, 1, 1, 0, 0), 'external_trigger': False, 'state': u'running', 'conf': None, 'start_date': datetime.datetime(2017, 3, 14, 11, 12, 29, 646995), 'dag_id': 'example_xcom'}]
> [2017-03-14 11:12:29,757] {jobs.py:741} INFO - Done queuing tasks, calling the executor's heartbeat
> [2017-03-14 11:12:29,757] {jobs.py:744} INFO - Loop took: 29.335935 seconds
> {noformat}
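The InvalidRequestError at the bottom of the quoted trace is standard SQLAlchemy behavior, not a second bug: once a flush fails (here, the duplicate-key INSERT), the session refuses all further work until Session.rollback() is called. A minimal self-contained sketch of the underlying constraint failure, using stdlib sqlite3 in place of Postgres/psycopg2 and a simplified stand-in for the dag_run table (hypothetical schema, not Airflow's actual DDL):

```python
import sqlite3

# Simplified dag_run stand-in: at most one row per (dag_id, execution_date).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    " dag_id TEXT, execution_date TEXT, state TEXT,"
    " UNIQUE (dag_id, execution_date))"
)
conn.execute(
    "INSERT INTO dag_run VALUES ('example_xcom', '2015-01-01T00:00:00', 'running')"
)
conn.commit()

duplicate_rejected = False
try:
    # A second scheduler process creating the same run hits the unique constraint.
    conn.execute(
        "INSERT INTO dag_run VALUES ('example_xcom', '2015-01-01T00:00:00', 'running')"
    )
except sqlite3.IntegrityError:
    duplicate_rejected = True
    conn.rollback()  # analogous to Session.rollback() in SQLAlchemy

# The original row survives; the failed transaction was rolled back cleanly.
row_count = conn.execute("SELECT COUNT(*) FROM dag_run").fetchone()[0]
print(duplicate_rejected, row_count)
```

In the scheduler the race is between processes rather than statements, but the shape is the same: whichever INSERT loses must roll back before its session can be reused.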



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
