airflow-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
Date Fri, 21 Sep 2018 19:39:39 GMT
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r219606806
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -308,6 +369,249 @@ def file_path(self):
         raise NotImplementedError()
 
 
+class DagParsingStat(object):
+    def __init__(self,
+                 file_paths,
+                 all_pids,
+                 done,
+                 all_files_processed,
+                 result_count):
+        self.file_paths = file_paths
+        self.all_pids = all_pids
+        self.done = done
+        self.all_files_processed = all_files_processed
+        self.result_count = result_count
+
+
+class DagParsingSignal(object):
+    AGENT_HEARTBEAT = "agent_heartbeat"
+    MANAGER_DONE = "manager_done"
+    TERMINATE_MANAGER = "terminate_manager"
+    END_MANAGER = "end_manager"
+
+
+class DagFileProcessorAgent(LoggingMixin):
+    """
+    Agent for DAG file processors. It is responsible for all DAG-parsing
+    related jobs in the scheduler process. Mainly it collects DAG parsing
+    results from the DAG file processor manager and exchanges signals and
+    DAG parsing stats with it.
+    """
+
+    def __init__(self,
+                 dag_directory,
+                 file_paths,
+                 max_runs,
+                 processor_factory,
+                 async_mode):
+        """
+        :param dag_directory: Directory where DAG definitions are kept. All
+            files in file_paths should be under this directory
+        :type dag_directory: unicode
+        :param file_paths: list of file paths that contain DAG definitions
+        :type file_paths: list[unicode]
+        :param max_runs: The number of times to parse and schedule each file.
+            -1 for unlimited.
+        :type max_runs: int
+        :param processor_factory: function that creates processors for DAG
+            definition files. Arguments are (dag_definition_path, log_file_path)
+        :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessor)
+        :param async_mode: Whether to start agent in async mode
+        :type async_mode: bool
+        """
+        self._file_paths = file_paths
+        self._file_path_queue = []
+        self._dag_directory = dag_directory
+        self._max_runs = max_runs
+        self._processor_factory = processor_factory
+        self._async_mode = async_mode
+        # Map from file path to the processor
+        self._processors = {}
+        # Map from file path to the last runtime
+        self._last_runtime = {}
+        # Map from file path to the last finish time
+        self._last_finish_time = {}
+        # Map from file path to the number of runs
+        self._run_count = defaultdict(int)
+        # PIDs of the DAG parsing processes
+        self._all_pids = []
+        # Pipe for communicating signals
+        self._parent_signal_conn, self._child_signal_conn = multiprocessing.Pipe()
+        # Pipe for communicating DagParsingStat
+        self._stat_queue = multiprocessing.Queue()
+        self._result_queue = multiprocessing.Queue()
+        self._process = None
+        self._done = False
+        # Initialized as True so we do not deactivate DAGs before any
+        # actual DAG parsing has happened.
+        self._all_files_processed = True
+        self._result_count = 0
+
+    def start(self):
+        """
+        Launch the DagFileProcessorManager process and start the DAG
+        parsing loop in the manager.
+        """
+        self._process = self._launch_process(self._dag_directory,
+                                             self._file_paths,
+                                             self._max_runs,
+                                             self._processor_factory,
+                                             self._child_signal_conn,
+                                             self._stat_queue,
+                                             self._result_queue,
+                                             self._async_mode)
+        self.log.info("Launched DagFileProcessorManager with pid: {}"
+                      .format(self._process.pid))
+
+    def heartbeat(self):
+        """
+        Send the agent heartbeat signal to the manager. Should only be
+        used when the DAG file processor manager was launched in sync mode.
+        """
+        self._parent_signal_conn.send(DagParsingSignal.AGENT_HEARTBEAT)
+
+    def wait_until_finished(self):
+        """
+        Wait for the done signal from the manager. Should only be used
+        when the DAG file processor manager was launched in sync mode.
+        """
+        while True:
+            if self._parent_signal_conn.poll() \
+                    and self._parent_signal_conn.recv() == DagParsingSignal.MANAGER_DONE:
+                break
+            time.sleep(0.1)
+
+    @staticmethod
+    def _launch_process(dag_directory,
+                        file_paths,
+                        max_runs,
+                        processor_factory,
+                        signal_conn,
+                        _stat_queue,
+                        result_queue,
+                        async_mode):
+        def helper():
+            # Reload configurations and settings to avoid collisions with
+            # the parent process. This process may need custom configurations
+            # that cannot be shared, e.g. a RotatingFileHandler, and reusing
+            # the parent's SQLAlchemy connection pool can corrupt connections.
+            os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER'] = 'True'
+            reload(airflow.config_templates.airflow_local_settings)
+            reload(airflow.settings)
+            del os.environ['CONFIG_PROCESSOR_MANAGER_LOGGER']
+            processor_manager = DagFileProcessorManager(dag_directory,
+                                                        file_paths,
+                                                        max_runs,
+                                                        processor_factory,
+                                                        signal_conn,
+                                                        _stat_queue,
+                                                        result_queue,
+                                                        async_mode)
+
+            processor_manager.start()
+
+        p = multiprocessing.Process(target=helper,
+                                    args=(),
+                                    name="DagFileProcessorManager")
+        p.start()
+        return p
+
+    def harvest_simple_dags(self):
+        """
+        Harvest DAG parsing results from the result queue and sync
+        metadata from the stat queue.
+
+        :return: List of parsing results in SimpleDag format.
+        """
+        # The metadata and the results to be harvested can be inconsistent,
+        # but that should not be a big problem.
+        self._sync_metadata()
+        # Heartbeat after syncing metadata so we do not restart the manager
+        # if it processed all files max_runs times and exited normally.
+        self._heartbeat_manager()
+        simple_dags = []
+        # multiprocessing.Queue.qsize() does not work on macOS.
+        if sys.platform == "darwin":
+            qsize = self._result_count
+        else:
+            qsize = self._result_queue.qsize()
+        for _ in xrange(qsize):
 
 Review comment:
   👍 
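As an aside for readers following the sync-mode handshake above (heartbeats
over a Pipe, results over a Queue, a done signal back), the pattern can be
boiled down to a small self-contained sketch. The names below are
illustrative only and are not part of the patch or the Airflow API:

    import multiprocessing

    AGENT_HEARTBEAT = "agent_heartbeat"
    MANAGER_DONE = "manager_done"

    def manager_loop(signal_conn, result_queue):
        # Child process: block until the parent heartbeats, do one unit
        # of "parsing" work, publish the result, and signal completion.
        for run in range(3):
            while signal_conn.recv() != AGENT_HEARTBEAT:
                pass
            result_queue.put("result-%d" % run)
            signal_conn.send(MANAGER_DONE)

    if __name__ == "__main__":
        parent_conn, child_conn = multiprocessing.Pipe()
        result_queue = multiprocessing.Queue()
        manager = multiprocessing.Process(target=manager_loop,
                                          args=(child_conn, result_queue))
        manager.start()
        for _ in range(3):
            parent_conn.send(AGENT_HEARTBEAT)   # cf. heartbeat()
            while parent_conn.recv() != MANAGER_DONE:
                pass                            # cf. wait_until_finished()
            print(result_queue.get())           # cf. harvest_simple_dags()
        manager.join()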

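The Darwin special case in harvest_simple_dags is also easy to reproduce in
isolation. On macOS, sem_getvalue() is not implemented, so
multiprocessing.Queue.qsize() raises NotImplementedError, which is why the
patch keeps a separate result_count. A minimal demonstration (not code from
the patch):

    import multiprocessing

    if __name__ == "__main__":
        queue = multiprocessing.Queue()
        queue.put("x")
        try:
            # Works on Linux; raises NotImplementedError on macOS because
            # the underlying sem_getvalue() call is unavailable there.
            print(queue.qsize())
        except NotImplementedError:
            print("qsize() unsupported here; track a counter instead")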
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
