airflow-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] ashb commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
Date Fri, 21 Sep 2018 18:46:53 GMT
ashb commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r219592770
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -308,6 +369,249 @@ def file_path(self):
         raise NotImplementedError()
 
 
+class DagParsingStat(object):
+    def __init__(self,
+                 file_paths,
+                 all_pids,
+                 done,
+                 all_files_processed,
+                 result_count):
+        self.file_paths = file_paths
+        self.all_pids = all_pids
+        self.done = done
+        self.all_files_processed = all_files_processed
+        self.result_count = result_count
+
+
+class DagParsingSignal(object):
+    AGENT_HEARTBEAT = "agent_heartbeat"
+    MANAGER_DONE = "manager_done"
+    TERMINATE_MANAGER = "terminate_manager"
+    END_MANAGER = "end_manager"
+
+
+class DagFileProcessorAgent(LoggingMixin):
+    """
+    Agent for DAG file processors. It is responsible for all DAG parsing
+    related jobs in scheduler process. Mainly it will collect DAG parsing
+    result from DAG file processor manager and communicate signal/DAG parsing
+    stat with DAG file processor manager.
+    """
+
+    def __init__(self,
+                 dag_directory,
+                 file_paths,
+                 max_runs,
+                 processor_factory,
+                 async_mode):
+        """
+        :param dag_directory: Directory where DAG definitions are kept. All
+        files in file_paths should be under this directory
+        :type dag_directory: unicode
+        :param file_paths: list of file paths that contain DAG definitions
+        :type file_paths: list[unicode]
+        :param max_runs: The number of times to parse and schedule each file. -1
+        for unlimited.
+        :type max_runs: int
+        :param processor_factory: function that creates processors for DAG
+        definition files. Arguments are (dag_definition_path, log_file_path)
+        :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessor)
+        :param async_mode: Whether to start agent in async mode
+        :type async_mode: bool
+        """
+        self._file_paths = file_paths
+        self._file_path_queue = []
+        self._dag_directory = dag_directory
+        self._max_runs = max_runs
+        self._processor_factory = processor_factory
+        self._async_mode = async_mode
+        # Map from file path to the processor
+        self._processors = {}
+        # Map from file path to the last runtime
+        self._last_runtime = {}
+        # Map from file path to the last finish time
+        self._last_finish_time = {}
+        # Map from file path to the number of runs
+        self._run_count = defaultdict(int)
+        # PIDs of the DAG parsing processes
+        self._all_pids = []
+        # Pipe for communicating signals
+        self._parent_signal_conn, self._child_signal_conn = multiprocessing.Pipe()
+        # Pipe for communicating DagParsingStat
+        self._stat_queue = multiprocessing.Queue()
+        self._result_queue = multiprocessing.Queue()
+        self._process = None
+        self._done = False
+        # Initialized as True so we do not deactivate DAGs without any
+        # actual DAG parsing.
+        self._all_files_processed = True
+        self._result_count = 0
+
+    def start(self):
+        """
+        Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
+        """
+        self._process = self._launch_process(self._dag_directory,
+                                             self._file_paths,
+                                             self._max_runs,
+                                             self._processor_factory,
+                                             self._child_signal_conn,
+                                             self._stat_queue,
+                                             self._result_queue,
+                                             self._async_mode)
+        self.log.info("Launched DagFileProcessorManager with pid: {}"
+                      .format(self._process.pid))
+
+    def heartbeat(self):
+        """
+        Should only be used when launched DAG file processor manager in sync mode.
+        Send agent heartbeat signal to the manager.
+        """
+        self._parent_signal_conn.send(DagParsingSignal.AGENT_HEARTBEAT)
+
+    def wait_until_finished(self):
+        """
+        Should only be used when launched DAG file processor manager in sync mode.
+        Wait for done signal from the manager.
+        """
+        while True:
+            if self._parent_signal_conn.poll() \
+                    and self._parent_signal_conn.recv() == DagParsingSignal.MANAGER_DONE:
+                break
+            time.sleep(0.1)
+
+    @staticmethod
+    def _launch_process(dag_directory,
+                        file_paths,
+                        max_runs,
+                        processor_factory,
+                        signal_conn,
+                        _stat_queue,
+                        result_queue,
+                        async_mode):
+        def helper():
+            # Reload configurations and settings to avoid collision with the
+            # parent process. This process may need custom configurations
+            # (e.g. a RotatingFileHandler) that cannot be shared, and reusing
+            # the parent's SQLAlchemy connection pool can corrupt connections.
+            os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
+            reload(airflow.config_templates.airflow_local_settings)
+            reload(airflow.settings)
+            del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
+            processor_manager = DagFileProcessorManager(dag_directory,
+                                                        file_paths,
+                                                        max_runs,
+                                                        processor_factory,
+                                                        signal_conn,
+                                                        _stat_queue,
+                                                        result_queue,
+                                                        async_mode)
+
+            processor_manager.start()
+
+        p = multiprocessing.Process(target=helper,
+                                    args=(),
+                                    name="DagFileProcessorManager")
+        p.start()
+        return p
+
+    def harvest_simple_dags(self):
+        """
+        Harvest DAG parsing results from result queue and sync metadata from stat queue.
+        :return: List of parsing result in SimpleDag format.
+        """
+        # Metadata and results to be harvested can be inconsistent, but it
+        # should not be a big problem.
+        self._sync_metadata()
+        # Heartbeat after syncing metadata so we do not restart the manager
+        # if it has processed all files max_runs times and exited normally.
+        self._heartbeat_manager()
+        simple_dags = []
+        # multiprocessing.Queue().qsize is not implemented on macOS (it
+        # relies on sem_getvalue), so fall back to our own counter there.
+        if sys.platform == "darwin":
+            qsize = self._result_count
+        else:
+            qsize = self._result_queue.qsize()
+        for _ in range(qsize):
+            simple_dags.append(self._result_queue.get())
+
+        self._result_count = 0
+
+        return simple_dags
+
+    def _heartbeat_manager(self):
+        """
+        Heartbeat DAG file processor and start it if it is not alive.
+        :return:
+        """
+        if self._process and not self._process.is_alive() and not self.done:
+            self.start()
+
+    def _sync_metadata(self):
+        """
+        Sync metadata from stat queue and only keep the latest stat.
+        :return:
+        """
+        while not self._stat_queue.empty():
+            stat = self._stat_queue.get()
+            self._file_paths = stat.file_paths
+            self._all_pids = stat.all_pids
+            self._done = stat.done
+            self._all_files_processed = stat.all_files_processed
+            self._result_count += stat.result_count
+
+    @property
+    def file_paths(self):
+        return self._file_paths
+
+    @property
+    def done(self):
+        return self._done
+
+    @property
+    def all_files_processed(self):
+        return self._all_files_processed
+
+    def terminate(self):
+        """
+        Send termination signal to DAG parsing processor manager
+        and expect it to terminate all DAG file processors.
+        """
+        self.log.info("Sending termination signal to manager.")
+        self._parent_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER)
+
+    def end(self):
+        """
+        Terminate (and then kill) the manager process launched.
+        :return:
+        """
+        if not self._process or not self._process.is_alive():
+            self.log.warning('Ending without manager process.')
+            return
+        this_process = psutil.Process(os.getpid())
+        manager_process = psutil.Process(self._process.pid)
+        # First try SIGTERM
+        if manager_process.is_running() \
+                and manager_process.pid in [x.pid for x in this_process.children()]:
+            self.log.info(
+                "Terminating manager process: %s", manager_process.pid)
+            manager_process.terminate()
+            timeout = 5
 
 Review comment:
   Does this timeout need to be configurable?
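
   For illustration, one shape a configurable timeout could take. This is a
   sketch only: the option name dag_processor_manager_kill_timeout and the
   helper _end_manager are invented here, and a real change would also add a
   default to the config templates.

       import psutil

       from airflow import configuration as conf

       def _end_manager(manager_process, log):
           # manager_process is a psutil.Process for the manager.
           # Hypothetical option; the key name is illustrative only.
           timeout = conf.getint('scheduler',
                                 'dag_processor_manager_kill_timeout')
           manager_process.terminate()
           try:
               # Wait for a graceful shutdown, then escalate to SIGKILL.
               manager_process.wait(timeout=timeout)
           except psutil.TimeoutExpired:
               log.warning(
                   "Manager process %s did not exit in %s seconds; killing it.",
                   manager_process.pid, timeout)
               manager_process.kill()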

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
