airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-931] Do not set QUEUED in TaskInstances
Date Thu, 09 Mar 2017 16:33:22 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-stable 50f9ed8c7 -> 07d40d7cd


[AIRFLOW-931] Do not set QUEUED in TaskInstances

The contract of TaskInstances stipulates that end
states for Tasks
can only be UP_FOR_RETRY, SUCCESS, FAILED,
UPSTREAM_FAILED or
SKIPPED. If concurrency was reached task instances
were set to
QUEUED by the task instance themselves. This would
prevent the
scheduler to pick them up again.

We set the state to NONE now, to ensure integrity.

Closes #2127 from bolkedebruin/AIRFLOW-931

(cherry picked from commit e42398100a3248eddb6b511ade73f6a239e58090)
Signed-off-by: Bolke de Bruin <bolke@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/07d40d7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/07d40d7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/07d40d7c

Branch: refs/heads/v1-8-stable
Commit: 07d40d7cdae0a1601c4cc396bb737e44f19cb398
Parents: 50f9ed8
Author: Bolke de Bruin <bolke@xs4all.nl>
Authored: Thu Mar 9 08:32:46 2017 -0800
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Thu Mar 9 08:33:17 2017 -0800

----------------------------------------------------------------------
 airflow/models.py | 27 ++++++++++++++-------------
 tests/models.py   | 13 +++++++++++++
 2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07d40d7c/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index ba8d051..62457f0 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1291,19 +1291,20 @@ class TaskInstance(Base):
             verbose=True)
 
         if not runnable and not mark_success:
-            if self.state != State.QUEUED:
-                # If a task's dependencies are met but it can't be run yet then queue it
-                # instead
-                self.state = State.QUEUED
-                msg = "Queuing attempt {attempt} of {total}".format(
-                    attempt=self.try_number % (task.retries + 1) + 1,
-                    total=task.retries + 1)
-                logging.info(hr + msg + hr)
-
-                self.queued_dttm = datetime.now()
-                msg = "Queuing into pool {}".format(self.pool)
-                logging.info(msg)
-                session.merge(self)
+            # FIXME: we might have hit concurrency limits, which means we probably
+            # have been running prematurely. This should be handled in the
+            # scheduling mechanism.
+            self.state = State.NONE
+            msg = ("FIXME: Rescheduling due to concurrency limits reached at task "
+                   "runtime. Attempt {attempt} of {total}. State set to NONE.").format(
+                attempt=self.try_number % (task.retries + 1) + 1,
+                total=task.retries + 1)
+            logging.warning(hr + msg + hr)
+
+            self.queued_dttm = datetime.now()
+            msg = "Queuing into pool {}".format(self.pool)
+            logging.info(msg)
+            session.merge(self)
             session.commit()
             return
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07d40d7c/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 868ea36..867e293 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -289,6 +289,19 @@ class TaskInstanceTest(unittest.TestCase):
         dag >> op5
         self.assertIs(op5.dag, dag)
 
+    @patch.object(DAG, 'concurrency_reached')
+    def test_requeue_over_concurrency(self, mock_concurrency_reached):
+        mock_concurrency_reached.return_value = True
+
+        dag = DAG(dag_id='test_requeue_over_concurrency', start_date=DEFAULT_DATE,
+                  max_active_runs=1, concurrency=2)
+        task = DummyOperator(task_id='test_requeue_over_concurrency_op', dag=dag)
+
+        ti = TI(task=task, execution_date=datetime.datetime.now())
+        ti.run()
+        self.assertEqual(ti.state, models.State.NONE)
+
+
     @patch.object(TI, 'pool_full')
     def test_run_pooling_task(self, mock_pool_full):
         """


Mime
View raw message