airflow-commits mailing list archives

From jlo...@apache.org
Subject [1/3] incubator-airflow git commit: Handle queued tasks from multiple jobs/executors
Date Mon, 09 May 2016 23:21:13 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 385add2bf -> 40b3fffa0


Handle queued tasks from multiple jobs/executors

When the Scheduler is run with `--num-runs`, there can be multiple
Schedulers and Executors all trying to run tasks. For queued tasks, the
Scheduler was previously only trying to run tasks that it itself had
queued, but that doesn't work if the Scheduler is restarting. This PR
reverts that behavior and adds two kinds of "best effort" safeguards:
before running a TI, executors check whether it is already running, and
before ending, executors call sync() one last time.
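
A minimal standalone sketch of the first safeguard, assuming the
executor exposes `running` and `execute_async` as in the diff below
(`dispatch_if_not_running` is an illustrative helper, not part of this
commit):

    from airflow.utils.state import State

    def dispatch_if_not_running(executor, ti, key, command, queue=None):
        # Re-read the task instance from the DB so state changes made
        # by other jobs/executors become visible before dispatching.
        ti.refresh_from_db()
        if ti.state != State.RUNNING:
            executor.running[key] = command
            executor.execute_async(key, command=command, queue=queue)
            return True
        # Another job already started this task; skip it.
        return False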


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/43bdd7a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/43bdd7a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/43bdd7a4

Branch: refs/heads/master
Commit: 43bdd7a4c876f9ac2d4c357d9c30c0956a1b0d76
Parents: aff5d8c
Author: jlowin <jlowin@users.noreply.github.com>
Authored: Wed Apr 13 19:18:39 2016 -0400
Committer: jlowin <jlowin@users.noreply.github.com>
Committed: Mon May 9 17:18:58 2016 -0400

----------------------------------------------------------------------
 airflow/executors/base_executor.py   | 33 ++++++++++++++++------
 airflow/executors/celery_executor.py |  1 +
 airflow/executors/local_executor.py  |  2 +-
 airflow/jobs.py                      | 46 ++++++-------------------------
 tests/dags/test_issue_1225.py        | 16 ++++++++++-
 tests/jobs.py                        | 20 +++++++++++---
 6 files changed, 66 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bdd7a4/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index 0307583..2e88fa9 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -30,10 +30,11 @@ class BaseExecutor(LoggingMixin):
         """
         pass
 
-    def queue_command(self, key, command, priority=1, queue=None):
+    def queue_command(self, task_instance, command, priority=1, queue=None):
+        key = task_instance.key
         if key not in self.queued_tasks and key not in self.running:
             self.logger.info("Adding to queue: {}".format(command))
-            self.queued_tasks[key] = (command, priority, queue)
+            self.queued_tasks[key] = (command, priority, queue, task_instance)
 
     def queue_task_instance(
             self,
@@ -54,7 +55,7 @@ class BaseExecutor(LoggingMixin):
             pool=pool,
             pickle_id=pickle_id)
         self.queue_command(
-            task_instance.key,
+            task_instance,
             command,
             priority=task_instance.task.priority_weight_total,
             queue=task_instance.task.queue)
@@ -67,9 +68,6 @@ class BaseExecutor(LoggingMixin):
         pass
 
     def heartbeat(self):
-        # Calling child class sync method
-        self.logger.debug("Calling the {} sync method".format(self.__class__))
-        self.sync()
 
         # Triggering new jobs
         if not self.parallelism:
@@ -86,10 +84,27 @@ class BaseExecutor(LoggingMixin):
             key=lambda x: x[1][1],
             reverse=True)
         for i in range(min((open_slots, len(self.queued_tasks)))):
-            key, (command, priority, queue) = sorted_queue.pop(0)
-            self.running[key] = command
+            key, (command, _, queue, ti) = sorted_queue.pop(0)
+            # TODO(jlowin) without a way to know what Job ran which tasks,
+            # there is a danger that another Job started running a task
+            # that was also queued to this executor. This is the last chance
+            # to check if that happened. The most probable way is that a
+            # Scheduler tried to run a task that was originally queued by a
+            # Backfill. This fix reduces the probability of a collision but
+            # does NOT eliminate it.
             self.queued_tasks.pop(key)
-            self.execute_async(key, command=command, queue=queue)
+            ti.refresh_from_db()
+            if ti.state != State.RUNNING:
+                self.running[key] = command
+                self.execute_async(key, command=command, queue=queue)
+            else:
+                self.logger.debug(
+                    'Task is already running, not sending to '
+                    'executor: {}'.format(key))
+
+        # Calling child class sync method
+        self.logger.debug("Calling the {} sync method".format(self.__class__))
+        self.sync()
 
     def change_state(self, key, state):
         self.running.pop(key)
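
Note the interface change above: queue_command() now receives the
TaskInstance itself rather than just its key, so heartbeat() can call
refresh_from_db() on it at dispatch time. An illustrative call site
(the `ti` and `command` values stand in for real ones):

    # Pass the TaskInstance, not ti.key, so the executor can re-check
    # its state right before executing the command.
    executor.queue_command(
        ti,
        command,
        priority=ti.task.priority_weight_total,
        queue=ti.task.queue)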

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bdd7a4/airflow/executors/celery_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 95f3daa..de56baf 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -95,3 +95,4 @@ class CeleryExecutor(BaseExecutor):
                     async.state not in celery_states.READY_STATES
                     for async in self.tasks.values()]):
                 time.sleep(5)
+        self.sync()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bdd7a4/airflow/executors/local_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index f13ee6d..24ef6c6 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -73,4 +73,4 @@ class LocalExecutor(BaseExecutor):
         [self.queue.put((None, None)) for w in self.workers]
         # Wait for commands to finish
         self.queue.join()
-
+        self.sync()
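
Both executors now follow the same shutdown pattern: wait for in-flight
work, then call sync() one final time so terminal task states reach the
event buffer before the executor goes away. As a sketch (the wait step
stands in for each executor's real wait logic):

    def end(self):
        # CeleryExecutor: poll until all async results reach
        # READY_STATES; LocalExecutor: join the work queue.
        self._wait_for_workers()  # illustrative stand-in
        # Final sync so results recorded during the wait are not lost.
        self.sync()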

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bdd7a4/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 0b00644..8ea6673 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -238,7 +238,6 @@ class SchedulerJob(BaseJob):
 
         self.refresh_dags_every = refresh_dags_every
         self.do_pickle = do_pickle
-        self.queued_tis = set()
         super(SchedulerJob, self).__init__(*args, **kwargs)
 
         self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
@@ -567,47 +566,22 @@ class SchedulerJob(BaseJob):
 
         session.close()
 
-    def process_events(self, executor, dagbag):
-        """
-        Respond to executor events.
-
-        Used to identify queued tasks and schedule them for further processing.
-        """
-        for key, executor_state in list(executor.get_event_buffer().items()):
-            dag_id, task_id, execution_date = key
-            if dag_id not in dagbag.dags:
-                self.logger.error(
-                    'Executor reported a dag_id that was not found in the '
-                    'DagBag: {}'.format(dag_id))
-                continue
-            elif not dagbag.dags[dag_id].has_task(task_id):
-                self.logger.error(
-                    'Executor reported a task_id that was not found in the '
-                    'dag: {} in dag {}'.format(task_id, dag_id))
-                continue
-            task = dagbag.dags[dag_id].get_task(task_id)
-            ti = models.TaskInstance(task, execution_date)
-            ti.refresh_from_db()
-
-            if executor_state == State.SUCCESS:
-            # collect queued tasks for prioritization
-                if ti.state == State.QUEUED:
-                    self.queued_tis.add(ti)
-            else:
-                # special instructions for failed executions could go here
-                pass
-
     @provide_session
     def prioritize_queued(self, session, executor, dagbag):
         # Prioritizing queued task instances
 
         pools = {p.pool: p for p in session.query(models.Pool).all()}
-
+        TI = models.TaskInstance
+        queued_tis = (
+            session.query(TI)
+            .filter(TI.state == State.QUEUED)
+            .all()
+        )
         self.logger.info(
-            "Prioritizing {} queued jobs".format(len(self.queued_tis)))
+            "Prioritizing {} queued jobs".format(len(queued_tis)))
         session.expunge_all()
         d = defaultdict(list)
-        for ti in self.queued_tis:
+        for ti in queued_tis:
             if ti.dag_id not in dagbag.dags:
                 self.logger.info(
                     "DAG no longer in dagbag, deleting {}".format(ti))
@@ -621,8 +595,6 @@ class SchedulerJob(BaseJob):
             else:
                 d[ti.pool].append(ti)
 
-        self.queued_tis.clear()
-
         dag_blacklist = set(dagbag.paused_dags())
         for pool, tis in list(d.items()):
             if not pool:
@@ -676,6 +648,7 @@ class SchedulerJob(BaseJob):
                     open_slots -= 1
                 else:
                     session.delete(ti)
+                    session.commit()
                     continue
                 ti.task = task
 
@@ -721,7 +694,6 @@ class SchedulerJob(BaseJob):
             try:
                 loop_start_dttm = datetime.now()
                 try:
-                    self.process_events(executor=executor, dagbag=dagbag)
                     self.prioritize_queued(executor=executor, dagbag=dagbag)
                 except Exception as e:
                     self.logger.exception(e)
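
The net effect in jobs.py is that queued-task handling is now
stateless: instead of each Scheduler remembering the TIs it queued in
self.queued_tis, prioritize_queued() asks the database for every QUEUED
task instance, so a restarted Scheduler (e.g. under --num-runs) also
picks up tasks queued by a previous run or by a Backfill. The lookup,
as a standalone sketch:

    from airflow import models, settings
    from airflow.utils.state import State

    session = settings.Session()
    TI = models.TaskInstance
    # Any Scheduler run sees all queued work, regardless of who queued it.
    queued_tis = session.query(TI).filter(TI.state == State.QUEUED).all()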

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bdd7a4/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 898cc04..ecfa646 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -23,6 +23,8 @@ from datetime import datetime
 from airflow.models import DAG
 from airflow.operators import DummyOperator, PythonOperator, SubDagOperator
 from airflow.utils.trigger_rule import TriggerRule
+import time
+
 DEFAULT_DATE = datetime(2016, 1, 1)
 default_args = dict(
     start_date=DEFAULT_DATE,
@@ -31,6 +33,16 @@ default_args = dict(
 def fail():
     raise ValueError('Expected failure.')
 
+def delayed_fail():
+    """
+    Delayed failure to make sure that processes are running before the error
+    is raised.
+    
+    TODO handle more directly (without sleeping)
+    """
+    time.sleep(5)
+    raise ValueError('Expected failure.')
+
 # DAG tests backfill with pooled tasks
 # Previously backfill would queue the task but never run it
 dag1 = DAG(dag_id='test_backfill_pooled_task_dag', default_args=default_args)
@@ -123,7 +135,9 @@ dag8 = DAG(
     end_date=DEFAULT_DATE,
     default_args=default_args)
 dag8_task1 = PythonOperator(
-    python_callable=fail,
+    # use delayed_fail because otherwise LocalExecutor will have a chance to
+    # complete the task
+    python_callable=delayed_fail,
     task_id='test_queued_task',
     dag=dag8,
     pool='test_queued_pool')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/43bdd7a4/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index bc815e8..6802aae 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -23,11 +23,13 @@ import unittest
 
 from airflow import AirflowException, settings
 from airflow.bin import cli
+from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
-from airflow.models import DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.models import DAG, DagBag, DagRun, Pool, TaskInstance as TI
+from airflow.operators import DummyOperator
+from airflow.utils.db import provide_session
 from airflow.utils.state import State
 from airflow.utils.timeout import timeout
-from airflow.utils.db import provide_session
 
 from airflow import configuration
 configuration.test_mode()
@@ -283,15 +285,25 @@ class SchedulerJobTest(unittest.TestCase):
         dag = self.dagbag.get_dag(dag_id)
         dag.clear()
 
-        scheduler = SchedulerJob(dag_id, num_runs=10)
+        scheduler = SchedulerJob(dag_id, num_runs=1)
         scheduler.run()
 
         task_1 = dag.tasks[0]
         logging.info("Trying to find task {}".format(task_1))
         ti = TI(task_1, dag.start_date)
         ti.refresh_from_db()
-        self.assertEqual(ti.state, State.FAILED)
+        self.assertEqual(ti.state, State.QUEUED)
 
+        # now we use a DIFFERENT scheduler and executor
+        # to simulate the num-runs CLI arg
+        scheduler2 = SchedulerJob(
+            dag_id,
+            num_runs=5,
+            executor=DEFAULT_EXECUTOR.__class__())
+        scheduler2.run()
+
+        ti.refresh_from_db()
+        self.assertEqual(ti.state, State.FAILED)
         dag.clear()
 
     def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):

