airflow-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] ashb commented on a change in pull request #3873: [AIRFLOW-2760] Decouple DAG parsing loop from scheduler loop
Date Fri, 28 Sep 2018 21:56:10 GMT
ashb commented on a change in pull request #3873: [AIRFLOW-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r221389771
 
 

 ##########
 File path: airflow/jobs.py
 ##########
 @@ -1551,163 +1473,111 @@ def _execute(self):
                 (executors.LocalExecutor, executors.SequentialExecutor):
             pickle_dags = True
 
-        # Use multiple processes to parse and generate tasks for the
-        # DAGs in parallel. By processing them in separate processes,
-        # we can get parallelism and isolation from potentially harmful
-        # user code.
-        self.log.info(
-            "Processing files using up to %s processes at a time",
-            self.max_threads)
         self.log.info("Running execute loop for %s seconds", self.run_duration)
         self.log.info("Processing each file at most %s times", self.num_runs)
-        self.log.info(
-            "Process each file at most once every %s seconds",
-            self.file_process_interval)
-        self.log.info(
-            "Checking for new files in %s every %s seconds",
-            self.subdir,
-            self.dag_dir_list_interval)
 
         # Build up a list of Python files that could contain DAGs
         self.log.info("Searching for files in %s", self.subdir)
         known_file_paths = list_py_file_paths(self.subdir)
         self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
 
-        def processor_factory(file_path):
+        def processor_factory(file_path, zombies):
             return DagFileProcessor(file_path,
                                     pickle_dags,
-                                    self.dag_ids)
+                                    self.dag_ids,
+                                    zombies)
+
+        # When using sqlite, we do not use async_mode
+        # so the scheduler job and DAG parser don't access the DB at the same time.
+        async_mode = not self.using_sqlite
 
-        processor_manager = DagFileProcessorManager(self.subdir,
-                                                    known_file_paths,
-                                                    self.max_threads,
-                                                    self.file_process_interval,
-                                                    self.num_runs,
-                                                    processor_factory)
+        self.processor_agent = DagFileProcessorAgent(self.subdir,
+                                                     known_file_paths,
+                                                     self.num_runs,
+                                                     processor_factory,
+                                                     async_mode)
 
         try:
-            self._execute_helper(processor_manager)
+            self._execute_helper()
         finally:
+            self.processor_agent.end()
             self.log.info("Exited execute loop")
 
-            # Kill all child processes on exit since we don't want to leave
-            # them as orphaned.
-            pids_to_kill = processor_manager.get_all_pids()
-            if len(pids_to_kill) > 0:
-                # First try SIGTERM
-                this_process = psutil.Process(os.getpid())
-                # Only check child processes to ensure that we don't have a case
-                # where we kill the wrong process because a child process died
-                # but the PID got reused.
-                child_processes = [x for x in this_process.children(recursive=True)
-                                   if x.is_running() and x.pid in pids_to_kill]
-                for child in child_processes:
-                    self.log.info("Terminating child PID: %s", child.pid)
-                    child.terminate()
-                # TODO: Remove magic number
-                timeout = 5
-                self.log.info(
-                    "Waiting up to %s seconds for processes to exit...", timeout)
-                try:
-                    psutil.wait_procs(
-                        child_processes, timeout=timeout,
-                        callback=lambda x: self.log.info('Terminated PID %s', x.pid))
-                except psutil.TimeoutExpired:
-                    self.log.debug("Ran out of time while waiting for processes to exit")
-
-                # Then SIGKILL
-                child_processes = [x for x in this_process.children(recursive=True)
-                                   if x.is_running() and x.pid in pids_to_kill]
-                if len(child_processes) > 0:
-                    self.log.info("SIGKILL processes that did not terminate gracefully")
-                    for child in child_processes:
-                        self.log.info("Killing child PID: %s", child.pid)
-                        child.kill()
-                        child.wait()
-
-    def _execute_helper(self, processor_manager):
-        """
-        :param processor_manager: manager to use
-        :type processor_manager: DagFileProcessorManager
+    def _execute_helper(self):
+        """
+        The actual scheduler loop. The main steps in the loop are:
+            #. Harvest DAG parsing results through DagFileProcessorAgent
+            #. Find and queue executable tasks
+                #. Change task instance state in DB
+                #. Queue tasks in executor
+            #. Heartbeat executor
+                #. Execute queued tasks in executor asynchronously
+                #. Sync on the states of running tasks
+
+        See https://bit.ly/2QfkcMV for a graphic representation of these steps.
 
Review comment:
   This needs to live in the repo (put it in the docs tree with the other images) - we can't rely on bit.ly being around.
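
The _execute_helper docstring in the hunk above enumerates the new control
flow. As a rough, self-contained sketch of that shape (the stub classes and
names below are hypothetical stand-ins, not the real jobs.py API):

    import time

    class StubAgent:
        """Stand-in for a DagFileProcessorAgent-like object (hypothetical)."""
        def harvest_simple_dags(self):
            return []  # the real agent hands back DAGs parsed by subprocesses

    class StubExecutor:
        """Stand-in for an Airflow executor (hypothetical)."""
        def __init__(self):
            self.queued = []

        def queue_task(self, task):
            self.queued.append(task)  # the real step also sets QUEUED in the DB

        def heartbeat(self):
            # A real executor launches queued tasks asynchronously here and
            # syncs the state of tasks that are already running.
            self.queued.clear()

    def scheduler_loop(agent, executor, run_duration, interval=1.0):
        start = time.monotonic()
        while time.monotonic() - start < run_duration:
            # 1. Harvest DAG parsing results through the agent.
            simple_dags = agent.harvest_simple_dags()
            # 2. Find and queue executable tasks.
            for dag in simple_dags:
                executor.queue_task(dag)
            # 3. Heartbeat the executor.
            executor.heartbeat()
            time.sleep(interval)

    scheduler_loop(StubAgent(), StubExecutor(), run_duration=3)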

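Similarly, the child-process cleanup this hunk removes (now delegated to
processor_agent.end()) is the classic terminate-wait-kill pattern. Here is a
condensed, standalone sketch of that pattern, assuming only psutil; the helper
name and default timeout are illustrative:

    import os

    import psutil

    def kill_child_processes(pids_to_kill, timeout=5):
        parent = psutil.Process(os.getpid())
        # Only look at our own children so that a recycled PID belonging to
        # an unrelated process is never signalled by mistake.
        children = [p for p in parent.children(recursive=True)
                    if p.is_running() and p.pid in pids_to_kill]
        for child in children:
            child.terminate()  # polite SIGTERM first
        # Give them `timeout` seconds; wait_procs returns any survivors.
        gone, alive = psutil.wait_procs(children, timeout=timeout)
        for child in alive:
            child.kill()  # SIGKILL whatever did not terminate gracefully
            child.wait()

psutil.wait_procs() returns the still-running processes in `alive` rather than
raising on timeout, so the survivors can be SIGKILLed directly.
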
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
