airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
Date Fri, 21 Sep 2018 19:25:54 GMT
yrqls21 commented on a change in pull request #3873: [Airflow-2760] Decouple DAG parsing loop
from scheduler loop
URL: https://github.com/apache/incubator-airflow/pull/3873#discussion_r219603609
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -308,6 +369,249 @@ def file_path(self):
         raise NotImplementedError()
 
 
+class DagParsingStat(object):
+    def __init__(self,
+                 file_paths,
+                 all_pids,
+                 done,
+                 all_files_processed,
+                 result_count):
+        self.file_paths = file_paths
+        self.all_pids = all_pids
+        self.done = done
+        self.all_files_processed = all_files_processed
+        self.result_count = result_count
+
+
+class DagParsingSignal(object):
+    AGENT_HEARTBEAT = "agent_heartbeat"
+    MANAGER_DONE = "manager_done"
+    TERMINATE_MANAGER = "terminate_manager"
+    END_MANAGER = "end_manager"
+
+
+class DagFileProcessorAgent(LoggingMixin):
+    """
+    Agent for DAG file processors. It is responsible for all DAG parsing
+    related jobs in scheduler process. Mainly it will collect DAG parsing
+    result from DAG file processor manager and communicate signal/DAG parsing
+    stat with DAG file processor manager.
+    """
+
+    def __init__(self,
+                 dag_directory,
+                 file_paths,
+                 max_runs,
+                 processor_factory,
+                 async_mode):
+        """
+        :param dag_directory: Directory where DAG definitions are kept. All
+        files in file_paths should be under this directory
+        :type dag_directory: unicode
+        :param file_paths: list of file paths that contain DAG definitions
+        :type file_paths: list[unicode]
+        :param max_runs: The number of times to parse and schedule each file. -1
+        for unlimited.
+        :type max_runs: int
+        :param processor_factory: function that creates processors for DAG
+        definition files. Arguments are (dag_definition_path, log_file_path)
+        :type processor_factory: (unicode, unicode, list) -> (AbstractDagFileProcessor)
+        :param async_mode: Whether to start agent in async mode
+        :type async_mode: bool
+        """
+        self._file_paths = file_paths
+        self._file_path_queue = []
+        self._dag_directory = dag_directory
+        self._max_runs = max_runs
+        self._processor_factory = processor_factory
+        self._async_mode = async_mode
+        # Map from file path to the processor
+        self._processors = {}
+        # Map from file path to the last runtime
+        self._last_runtime = {}
+        # Map from file path to the last finish time
+        self._last_finish_time = {}
+        # Map from file path to the number of runs
+        self._run_count = defaultdict(int)
+        # Pids of DAG parse
+        self._all_pids = []
+        # Pipe for communicating signals
+        self._parent_signal_conn, self._child_signal_conn = multiprocessing.Pipe()
+        # Pipe for communicating DagParsingStat
+        self._stat_queue = multiprocessing.Queue()
+        self._result_queue = multiprocessing.Queue()
+        self._process = None
+        self._done = False
+        # Initialized as true so we do not deactivate w/o any actual DAG parsing.
+        self._all_files_processed = True
+        self._result_count = 0
+
+    def start(self):
+        """
+        Launch DagFileProcessorManager processor and start DAG parsing loop in manager.
+        """
+        self._process = self._launch_process(self._dag_directory,
+                                             self._file_paths,
+                                             self._max_runs,
+                                             self._processor_factory,
+                                             self._child_signal_conn,
+                                             self._stat_queue,
+                                             self._result_queue,
+                                             self._async_mode)
+        self.log.info("Launched DagFileProcessorManager with pid: {}"
+                      .format(self._process.pid))
+
+    def heartbeat(self):
+        """
+        Should only be used when launched DAG file processor manager in sync mode.
+        Send agent heartbeat signal to the manager.
+        """
+        self._parent_signal_conn.send(DagParsingSignal.AGENT_HEARTBEAT)
+
+    def wait_until_finished(self):
+        """
+        Should only be used when launched DAG file processor manager in sync mode.
+        Wait for done signal from the manager.
+        """
+        while True:
+            if self._parent_signal_conn.poll() \
+                    and self._parent_signal_conn.recv() == DagParsingSignal.MANAGER_DONE:
+                break
+            time.sleep(0.1)
 
 Review comment:
   You are perfectly correct. Totally forget about `recv()` is blocking. Will update to just
use `recv()`. Ty!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message