airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] yrqls21 commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
Date Wed, 05 Sep 2018 00:58:21 GMT
yrqls21 commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830#discussion_r215109405
 
 

 ##########
 File path: airflow/executors/celery_executor.py
 ##########
 @@ -85,30 +139,67 @@ def execute_async(self, key, command,
             args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING
 
+    def _num_tasks_per_process(self):
+        """
+        How many Celery tasks should be sent to each worker process.
+        :return: Number of tasks that should be used per process
+        :rtype: int
+        """
+        return max(1,
+                   int(math.ceil(1.0 * len(self.tasks) / self._sync_parallelism)))
+
     def sync(self):
-        self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
-        for key, task in list(self.tasks.items()):
-            try:
-                state = task.state
-                if self.last_state[key] != state:
-                    if state == celery_states.SUCCESS:
-                        self.success(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.FAILURE:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.REVOKED:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    else:
-                        self.log.info("Unexpected state: %s", state)
-                        self.last_state[key] = state
-            except Exception as e:
-                self.log.error("Error syncing the celery executor, ignoring it:")
-                self.log.exception(e)
+        num_processes = min(len(self.tasks), self._sync_parallelism)
+        if num_processes == 0:
+            self.log.debug("No task to query celery, skipping sync")
+            return
+
+        self.log.debug("Inquiring about %s celery task(s) using %s processes",
+                       len(self.tasks), num_processes)
+
+        # Recreate the process pool each sync in case processes in the pool die
+        self._sync_pool = Pool(processes=num_processes)
+
+        # Use chunking instead of a work queue to reduce context switching since tasks are
+        # roughly uniform in size
+        chunksize = self._num_tasks_per_process()
+
+        self.log.debug("Waiting for inquiries to complete...")
 
 Review comment:
   My bad here, we are still on 1.8. Will update.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message