Reply-To: dev@airflow.incubator.apache.org
From: GitBox
To: commits@airflow.apache.org
Subject: [GitHub] yrqls21 commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
Message-ID: <153610910102.23480.11526793789150919453.gitbox@gitbox.apache.org>
Date: Wed, 05 Sep 2018 00:58:21 -0000

yrqls21 commented on a change in pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830#discussion_r215109405

##########
File path: airflow/executors/celery_executor.py
##########
@@ -85,30 +139,67 @@ def execute_async(self, key, command,
             args=[command], queue=queue)
         self.last_state[key] = celery_states.PENDING
 
+    def _num_tasks_per_process(self):
+        """
+        How many Celery tasks should be sent to each worker process.
+        :return: Number of tasks that should be used per process
+        :rtype: int
+        """
+        return max(1,
+                   int(math.ceil(1.0 * len(self.tasks) / self._sync_parallelism)))
+
     def sync(self):
-        self.log.debug("Inquiring about %s celery task(s)", len(self.tasks))
-        for key, task in list(self.tasks.items()):
-            try:
-                state = task.state
-                if self.last_state[key] != state:
-                    if state == celery_states.SUCCESS:
-                        self.success(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.FAILURE:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    elif state == celery_states.REVOKED:
-                        self.fail(key)
-                        del self.tasks[key]
-                        del self.last_state[key]
-                    else:
-                        self.log.info("Unexpected state: %s", state)
-                        self.last_state[key] = state
-            except Exception as e:
-                self.log.error("Error syncing the celery executor, ignoring it:")
-                self.log.exception(e)
+        num_processes = min(len(self.tasks), self._sync_parallelism)
+        if num_processes == 0:
+            self.log.debug("No task to query celery, skipping sync")
+            return
+
+        self.log.debug("Inquiring about %s celery task(s) using %s processes",
+                       len(self.tasks), num_processes)
+
+        # Recreate the process pool each sync in case processes in the pool die
+        self._sync_pool = Pool(processes=num_processes)
+
+        # Use chunking instead of a work queue to reduce context switching since tasks are
+        # roughly uniform in size
+        chunksize = self._num_tasks_per_process()
+
+        self.log.debug("Waiting for inquiries to complete...")

Review comment:
My bad here, we are still on 1.8. Will update.
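
For anyone following the thread, the chunk-size arithmetic from `_num_tasks_per_process` in the diff can be tried in isolation. This is only a rough sketch, not the PR's code: the free function `num_tasks_per_process`, the helper `split_into_chunks`, and the sample numbers (10 tasks, parallelism of 4) are all made up for illustration, and no Celery or process pool is involved.

```python
import math


def num_tasks_per_process(num_tasks, sync_parallelism):
    """Ceiling-divide the tasks across processes, never going below 1.

    Mirrors the expression in the diff; the standalone signature is an
    assumption made for this sketch.
    """
    return max(1, int(math.ceil(1.0 * num_tasks / sync_parallelism)))


def split_into_chunks(items, chunksize):
    """Hypothetical helper: consecutive chunks of at most `chunksize` items."""
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]


# Hypothetical workload: 10 pending tasks, sync parallelism of 4.
tasks = ["task_%d" % i for i in range(10)]
sync_parallelism = 4

# Same guard as in the diff: never use more processes than there are tasks.
num_processes = min(len(tasks), sync_parallelism)
chunksize = num_tasks_per_process(len(tasks), sync_parallelism)
chunks = split_into_chunks(tasks, chunksize)

print(num_processes, chunksize, [len(c) for c in chunks])
```

The `max(1, ...)` guard matters when there are no tasks at all: ceiling division would give 0, which is not a usable chunk size.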