airflow-commits mailing list archives

From "Chris Riccomini (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (AIRFLOW-168) schedule_interval @once scheduling dag atleast twice
Date Thu, 26 May 2016 03:25:12 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15301461#comment-15301461 ]

Chris Riccomini edited comment on AIRFLOW-168 at 5/26/16 3:24 AM:
------------------------------------------------------------------

I noticed that the scheduler log shows the following (stack trace at the bottom):

{noformat}
[2016-05-25 20:22:37,925] {jobs.py:580} INFO - Prioritizing 0 queued jobs
[2016-05-25 20:22:37,933] {jobs.py:732} INFO - Starting 0 scheduler jobs
[2016-05-25 20:22:37,933] {jobs.py:747} INFO - Done queuing tasks, calling the executor's heartbeat
[2016-05-25 20:22:37,933] {jobs.py:750} INFO - Loop took: 0.011795 seconds
[2016-05-25 20:22:37,936] {models.py:308} INFO - Finding 'running' jobs without a recent heartbeat
[2016-05-25 20:22:37,937] {models.py:314} INFO - Failing jobs without heartbeat after 2016-05-25 20:20:22.937222
[2016-05-25 20:22:42,925] {jobs.py:580} INFO - Prioritizing 0 queued jobs
[2016-05-25 20:22:42,934] {jobs.py:732} INFO - Starting 1 scheduler jobs
[2016-05-25 20:22:42,977] {models.py:2703} INFO - Checking state for <DagRun example_xcom @ 2016-05-25 20:22:42.953808: scheduled__2016-05-25T20:22:42.953808, externally triggered: False>
[2016-05-25 20:22:42,983] {jobs.py:504} INFO - Getting list of tasks to skip for active runs.
[2016-05-25 20:22:42,986] {jobs.py:520} INFO - Checking dependencies on 3 tasks instances, minus 0 skippable ones
[2016-05-25 20:22:42,991] {base_executor.py:36} INFO - Adding to queue: airflow run example_xcom push 2016-05-25T20:22:42.953808 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:22:42,993] {base_executor.py:36} INFO - Adding to queue: airflow run example_xcom push_by_returning 2016-05-25T20:22:42.953808 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:22:43,011] {jobs.py:747} INFO - Done queuing tasks, calling the executor's heartbeat
[2016-05-25 20:22:43,012] {jobs.py:750} INFO - Loop took: 0.089461 seconds
[2016-05-25 20:22:43,018] {models.py:308} INFO - Finding 'running' jobs without a recent heartbeat
[2016-05-25 20:22:43,019] {models.py:314} INFO - Failing jobs without heartbeat after 2016-05-25 20:20:28.019143
[2016-05-25 20:22:43,028] {sequential_executor.py:26} INFO - Executing command: airflow run example_xcom push 2016-05-25T20:22:42.953808 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:22:43,453] {__init__.py:36} INFO - Using executor SequentialExecutor
Logging into: /Users/chrisr/airflow/logs/example_xcom/push/2016-05-25T20:22:42.953808
[2016-05-25 20:22:44,300] {__init__.py:36} INFO - Using executor SequentialExecutor
[2016-05-25 20:22:48,937] {sequential_executor.py:26} INFO - Executing command: airflow run example_xcom push_by_returning 2016-05-25T20:22:42.953808 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:22:49,366] {__init__.py:36} INFO - Using executor SequentialExecutor
Logging into: /Users/chrisr/airflow/logs/example_xcom/push_by_returning/2016-05-25T20:22:42.953808
[2016-05-25 20:22:50,210] {__init__.py:36} INFO - Using executor SequentialExecutor
[2016-05-25 20:22:54,844] {jobs.py:580} INFO - Prioritizing 0 queued jobs
[2016-05-25 20:22:54,853] {jobs.py:732} INFO - Starting 1 scheduler jobs
[2016-05-25 20:22:54,903] {models.py:2703} INFO - Checking state for <DagRun example_xcom @ 2015-01-01 00:00:00: scheduled__2015-01-01T00:00:00, externally triggered: False>
[2016-05-25 20:22:54,907] {models.py:2703} INFO - Checking state for <DagRun example_xcom @ 2016-05-25 20:22:42.953808: scheduled__2016-05-25T20:22:42.953808, externally triggered: False>
[2016-05-25 20:22:54,911] {jobs.py:504} INFO - Getting list of tasks to skip for active runs.
[2016-05-25 20:22:54,913] {jobs.py:520} INFO - Checking dependencies on 6 tasks instances, minus 2 skippable ones
[2016-05-25 20:22:54,920] {base_executor.py:36} INFO - Adding to queue: airflow run example_xcom push 2015-01-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:22:54,921] {base_executor.py:36} INFO - Adding to queue: airflow run example_xcom push_by_returning 2015-01-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:22:54,935] {base_executor.py:36} INFO - Adding to queue: airflow run example_xcom puller 2016-05-25T20:22:42.953808 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:22:54,954] {jobs.py:747} INFO - Done queuing tasks, calling the executor's heartbeat
[2016-05-25 20:22:54,954] {jobs.py:750} INFO - Loop took: 0.113319 seconds
[2016-05-25 20:22:54,960] {models.py:308} INFO - Finding 'running' jobs without a recent heartbeat
[2016-05-25 20:22:54,960] {models.py:314} INFO - Failing jobs without heartbeat after 2016-05-25 20:20:39.960629
[2016-05-25 20:22:54,978] {sequential_executor.py:26} INFO - Executing command: airflow run example_xcom push_by_returning 2015-01-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:22:55,410] {__init__.py:36} INFO - Using executor SequentialExecutor
Logging into: /Users/chrisr/airflow/logs/example_xcom/push_by_returning/2015-01-01T00:00:00
[2016-05-25 20:22:56,239] {__init__.py:36} INFO - Using executor SequentialExecutor
[2016-05-25 20:23:00,873] {sequential_executor.py:26} INFO - Executing command: airflow run example_xcom push 2015-01-01T00:00:00 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:23:01,477] {__init__.py:36} INFO - Using executor SequentialExecutor
Logging into: /Users/chrisr/airflow/logs/example_xcom/push/2015-01-01T00:00:00
[2016-05-25 20:23:02,639] {__init__.py:36} INFO - Using executor SequentialExecutor
[2016-05-25 20:23:07,150] {sequential_executor.py:26} INFO - Executing command: airflow run example_xcom puller 2016-05-25T20:22:42.953808 --local -sd DAGS_FOLDER/example_dags/example_xcom.py
[2016-05-25 20:23:07,620] {__init__.py:36} INFO - Using executor SequentialExecutor
Logging into: /Users/chrisr/airflow/logs/example_xcom/puller/2016-05-25T20:22:42.953808
[2016-05-25 20:23:08,498] {__init__.py:36} INFO - Using executor SequentialExecutor
[2016-05-25 20:23:13,128] {jobs.py:580} INFO - Prioritizing 0 queued jobs
[2016-05-25 20:23:13,129] {models.py:157} INFO - Filling up the DagBag from /Users/chrisr/airflow/dags
[2016-05-25 20:23:13,230] {jobs.py:732} INFO - Starting 1 scheduler jobs
[2016-05-25 20:23:13,257] {jobs.py:678} ERROR - (sqlite3.IntegrityError) UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: ('example_xcom', '2015-01-01 00:00:00.000000', '2016-05-25 20:23:13.254863', None, u'running', u'scheduled__2015-01-01T00:00:00', 0, None)]
Traceback (most recent call last):
  File "/Users/chrisr/Code/airflow/airflow/jobs.py", line 674, in _do_dags
    self.schedule_dag(dag)
  File "/Users/chrisr/Code/airflow/airflow/jobs.py", line 464, in schedule_dag
    external_trigger=False
  File "/Users/chrisr/Code/airflow/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/Users/chrisr/Code/airflow/airflow/models.py", line 3122, in create_dagrun
    session.commit()
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/session.py", line 801, in commit
    self.transaction.commit()
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/session.py", line 392, in commit
    self._prepare_impl()
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/session.py", line 372, in _prepare_impl
    self.session.flush()
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/session.py", line 2019, in flush
    self._flush(objects)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/session.py", line 2137, in _flush
    transaction.rollback(_capture_exception=True)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/util/langhelpers.py", line 60, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/session.py", line 2101, in _flush
    flush_context.execute()
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/unitofwork.py", line 373, in execute
    rec.execute(self)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/unitofwork.py", line 532, in execute
    uow
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/persistence.py", line 174, in save_obj
    mapper, table, insert)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/orm/persistence.py", line 800, in _emit_insert_statements
    execute(statement, params)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/engine/base.py", line 914, in execute
    return meth(self, multiparams, params)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/sql/elements.py", line 323, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/engine/base.py", line 1010, in _execute_clauseelement
    compiled_sql, distilled_params
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/engine/base.py", line 1146, in _execute_context
    context)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/engine/base.py", line 1341, in _handle_dbapi_exception
    exc_info
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/engine/base.py", line 1139, in _execute_context
    context)
  File "build/bdist.macosx-10.10-intel/egg/sqlalchemy/engine/default.py", line 450, in do_execute
    cursor.execute(statement, parameters)
IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: dag_run.dag_id, dag_run.run_id [SQL: u'INSERT INTO dag_run (dag_id, execution_date, start_date, end_date, state, run_id, external_trigger, conf) VALUES (?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: ('example_xcom', '2015-01-01 00:00:00.000000', '2016-05-25 20:23:13.254863', None, u'running', u'scheduled__2015-01-01T00:00:00', 0, None)]
[2016-05-25 20:23:13,268] {jobs.py:747} INFO - Done queuing tasks, calling the executor's heartbeat
[2016-05-25 20:23:13,269] {jobs.py:750} INFO - Loop took: 0.14371 seconds
[2016-05-25 20:23:13,273] {models.py:308} INFO - Finding 'running' jobs without a recent heartbeat
[2016-05-25 20:23:13,273] {models.py:314} INFO - Failing jobs without heartbeat after 2016-05-25 20:20:58.273414
{noformat}

This is with a clean Airflow install from master, with JUST example_xcom enabled.
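
For reference, the IntegrityError is the scheduler trying to re-create a dag_run row whose (dag_id, run_id) pair already exists: the @once DAG first gets a run stamped at the current time, then one at the 2015-01-01 start_date, and a later loop tries to insert the 2015-01-01 run again. A minimal sketch of the kind of dedup guard that would avoid the duplicate INSERT (a hypothetical helper, not the actual fix; the DagRun field names are taken from the INSERT statement in the traceback, and I'm assuming the model accepts them as keyword arguments):

{noformat}
# Sketch only: check for an existing (dag_id, run_id) pair before creating
# a scheduled DagRun, since that pair is UNIQUE in the dag_run table.
from airflow.models import DagRun

def create_run_if_absent(session, dag_id, run_id, execution_date):
    existing = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.run_id == run_id)
        .first()
    )
    if existing is not None:
        # A second INSERT for this pair is what raises sqlite3.IntegrityError.
        return existing
    run = DagRun(
        dag_id=dag_id,
        run_id=run_id,
        execution_date=execution_date,
        state='running',
        external_trigger=False,
    )
    session.add(run)
    session.commit()
    return run
{noformat}

The deeper question is why schedule_dag decides an @once DAG needs a second run at all; the guard above would only stop the crash, not the double scheduling.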


> schedule_interval @once scheduling dag atleast twice
> ----------------------------------------------------
>
>                 Key: AIRFLOW-168
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-168
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.7.1.2
>            Reporter: Sumit Maheshwari
>         Attachments: Screen Shot 2016-05-24 at 9.51.50 PM.png, screenshot-1.png
>
>
> I was looking at the example_xcom example and found that it got scheduled twice: once at the start_date and once at the current time. To be sure, I tried multiple times (by reloading the db) and it's the same.
> I am on airflow master, using the sequential executor with sqlite3.
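
For anyone reproducing outside of example_xcom, a minimal @once DAG should show the same double scheduling when its start_date is in the past (a sketch with a hypothetical dag_id and task; the operator import path varies across Airflow versions):

{noformat}
# Hypothetical minimal reproduction; not the shipped example_xcom DAG.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator  # path varies by version

# schedule_interval='@once' plus a past start_date: per this report, the
# scheduler creates one DagRun at start_date and another at the current time.
dag = DAG(
    dag_id='once_repro',
    start_date=datetime(2015, 1, 1),
    schedule_interval='@once',
)

noop = DummyOperator(task_id='noop', dag=dag)
{noformat}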



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
