airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] XD-DENG commented on a change in pull request #4234: [AIRFLOW-2761] Parallelize enqueue in celery executor
Date Tue, 27 Nov 2018 14:45:29 GMT
XD-DENG commented on a change in pull request #4234: [AIRFLOW-2761] Parallelize enqueue in
celery executor
URL: https://github.com/apache/incubator-airflow/pull/4234#discussion_r236628518
 
 

 ##########
 File path: airflow/jobs.py
 ##########
 @@ -1374,24 +1352,65 @@ def _execute_task_instances(self,
         :type simple_dag_bag: SimpleDagBag
         :param states: Execute TaskInstances in these states
         :type states: Tuple[State]
-        :return: None
+        :return: Number of task instance with state changed.
         """
         executable_tis = self._find_executable_task_instances(simple_dag_bag, states,
                                                               session=session)
 
         def query(result, items):
-            tis_with_state_changed = self._change_state_for_executable_task_instances(
-                items,
-                states,
-                session=session)
+            simple_tis_with_state_changed = \
+                self._change_state_for_executable_task_instances(items,
+                                                                 states,
+                                                                 session=session)
             self._enqueue_task_instances_with_queued_state(
                 simple_dag_bag,
-                tis_with_state_changed)
+                simple_tis_with_state_changed)
             session.commit()
-            return result + len(tis_with_state_changed)
+            return result + len(simple_tis_with_state_changed)
 
         return helpers.reduce_in_chunks(query, executable_tis, 0, self.max_tis_per_query)
 
+    @provide_session
+    def _change_state_for_tasks_failed_to_execute(self, session):
+        """
+        If there are tasks left over in the executor,
+        we set them back to SCHEDULED to avoid creating hanging tasks.
+        :param session:
+        :return:
+        """
 
 Review comment:
   Minor: shall these two lines be removed if we don’t give details for them?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message