airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [airflow] dimberman commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution.
Date Thu, 21 Nov 2019 19:36:57 GMT
dimberman commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate
to speed up task execution.
URL: https://github.com/apache/airflow/pull/6627#discussion_r349279807
 
 

 ##########
 File path: airflow/task/task_runner/standard_task_runner.py
 ##########
 @@ -17,28 +17,89 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import os
+
 import psutil
+from setproctitle import setproctitle
 
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
 from airflow.utils.helpers import reap_process_group
 
+CAN_FORK = hasattr(os, 'fork')
+
 
 class StandardTaskRunner(BaseTaskRunner):
     """
     Runs the raw Airflow task by invoking through the Bash shell.
     """
     def __init__(self, local_task_job):
         super().__init__(local_task_job)
+        self._rc = None
 
     def start(self):
-        self.process = self.run_command()
+        if CAN_FORK and not self.run_as_user:
+            self.process = self._start_by_fork()
+        else:
+            self.process = self._start_by_exec()
+
+    def _start_by_exec(self):
+        subprocess = self.run_command()
+        return psutil.Process(subprocess.pid)
+
+    def _start_by_fork(self):
+        pid = os.fork()
+        if pid:
+            self.log.info("Started process %d to run task", pid)
+            return psutil.Process(pid)
+        else:
+            from airflow.bin.cli import CLIFactory
+            import signal
+            import airflow.settings as settings
+
+            signal.signal(signal.SIGINT, signal.SIG_DFL)
+            signal.signal(signal.SIGTERM, signal.SIG_DFL)
+            # Start a new process group
+            os.setpgid(0, 0)
+
+            # Force a new SQLAlchemy session. We can't share open DB handles between processes.
+            settings.engine.pool.dispose()
+            settings.engine.pool.recreate()
+
+            parser = CLIFactory.get_parser()
+            args = parser.parse_args(self._command[1:])
+            setproctitle(
+                "airflow task runner: {0.dag_id} {0.task_id} {0.execution_date} {0.job_id}".format(args)
+            )
+            try:
+                args.func(args)
+                os._exit(0)
+            except Exception:
+                os._exit(1)
+
+    def return_code(self, timeout=0):
+        # We call this multiple times, but we can only wait on the process once
+        if self._rc is not None or not self.process:
+            return self._rc
+
+        try:
+            self._rc = self.process.wait(timeout=timeout)
+            self.process = None
+        except psutil.TimeoutExpired:
+            pass
 
-    def return_code(self):
-        return self.process.poll()
+        return self._rc
 
     def terminate(self):
-        if self.process and psutil.pid_exists(self.process.pid):
-            reap_process_group(self.process.pid, self.log)
+        if self.process:
+            if self.process.is_running():
 
 Review comment:
   Can you break this into a function?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message