airflow-commits mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [airflow] kaxil commented on a change in pull request #6697: [AIRFLOW-6135] Extract DAG processing from SchedulerJob into separate class
Date Mon, 02 Dec 2019 23:10:52 GMT
kaxil commented on a change in pull request #6697: [AIRFLOW-6135] Extract DAG processing from SchedulerJob into separate class
URL: https://github.com/apache/airflow/pull/6697#discussion_r352907442
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -290,105 +292,28 @@ def start_time(self):
         return self._start_time
 
 
-class SchedulerJob(BaseJob):
-    """
-    This SchedulerJob runs for a specific time interval and schedules the jobs
-    that are ready to run. It figures out the latest runs for each
-    task and sees if the dependencies for the next schedules are met.
-    If so, it creates appropriate TaskInstances and sends run commands to the
-    executor. It does this for each task in each DAG and repeats.
-
-    :param dag_id: if specified, only schedule tasks with this DAG ID
-    :type dag_id: unicode
-    :param dag_ids: if specified, only schedule tasks with these DAG IDs
-    :type dag_ids: list[unicode]
-    :param subdir: directory containing Python files with Airflow DAG
-        definitions, or a specific path to a file
-    :type subdir: unicode
-    :param num_runs: The number of times to try to schedule each DAG file.
-        -1 for unlimited times.
-    :type num_runs: int
-    :param processor_poll_interval: The number of seconds to wait between
-        polls of running processors
-    :type processor_poll_interval: int
-    :param do_pickle: once a DAG object is obtained by executing the Python
-        file, whether to serialize the DAG object to the DB
-    :type do_pickle: bool
+class DagFileProcessor:
     """
+    Process a Python file containing Airflow DAGs.
 
-    __mapper_args__ = {
-        'polymorphic_identity': 'SchedulerJob'
-    }
-    heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
-
-    def __init__(
-            self,
-            dag_id=None,
-            dag_ids=None,
-            subdir=settings.DAGS_FOLDER,
-            num_runs=conf.getint('scheduler', 'num_runs'),
-            processor_poll_interval=conf.getfloat('scheduler', 'processor_poll_interval'),
-            do_pickle=False,
-            log=None,
-            *args, **kwargs):
-        # for BaseJob compatibility
-        self.dag_id = dag_id
-        self.dag_ids = [dag_id] if dag_id else []
-        if dag_ids:
-            self.dag_ids.extend(dag_ids)
-
-        self.subdir = subdir
-
-        self.num_runs = num_runs
-        self._processor_poll_interval = processor_poll_interval
-
-        self.do_pickle = do_pickle
-        super().__init__(*args, **kwargs)
-
-        self.max_threads = conf.getint('scheduler', 'max_threads')
-
-        if log:
-            self._log = log
-
-        self.using_sqlite = False
-        if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
-            self.using_sqlite = True
-
-        self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
-        self.processor_agent = None
-
-        signal.signal(signal.SIGINT, self._exit_gracefully)
-        signal.signal(signal.SIGTERM, self._exit_gracefully)
-
-    def _exit_gracefully(self, signum, frame):
-        """
-        Helper method to clean up processor_agent to avoid leaving orphan processes.
-        """
-        self.log.info("Exiting gracefully upon receiving signal %s", signum)
-        if self.processor_agent:
-            self.processor_agent.end()
-        sys.exit(os.EX_OK)
+    This includes:
 
-    def is_alive(self, grace_multiplier=None):
-        """
-        Is this SchedulerJob alive?
+    1. Execute the file and look for DAG objects in the namespace.
+    2. Pickle the DAG and save it to the DB (if necessary).
+    3. For each DAG, see what tasks should run and create appropriate task
+    instances in the DB.
+    4. Record any errors importing the file into ORM
+    5. Kill (in ORM) any task instances belonging to the DAGs that haven't
+    issued a heartbeat in a while.
 
-        We define alive as in a state of running and a heartbeat within the
-        threshold defined in the ``scheduler_health_check_threshold`` config
-        setting.
-
-        ``grace_multiplier`` is accepted for compatibility with the parent class.
-
-        :rtype: boolean
-        """
-        if grace_multiplier is not None:
-            # Accept the same behaviour as superclass
-            return super().is_alive(grace_multiplier=grace_multiplier)
-        scheduler_health_check_threshold = conf.getint('scheduler', 'scheduler_health_check_threshold')
-        return (
-            self.state == State.RUNNING and
-            (timezone.utcnow() - self.latest_heartbeat).seconds < scheduler_health_check_threshold
-        )
+    :param dag_ids: If specified, only look at these DAG ID's
+    :type dag_ids: list[unicode]
 
 Review comment:
   ```suggestion
       :type dag_ids: list[str]
   ```
   
   now that we no longer support Py2
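  
  For reference, a minimal sketch of how the `DagFileProcessor` docstring from the
  diff above would read with this suggestion applied — the surrounding lines are
  paraphrased from the diff, only the `:type` line changes:
  
  ```python
  class DagFileProcessor:
      """
      Process a Python file containing Airflow DAGs.

      :param dag_ids: If specified, only look at these DAG IDs
      :type dag_ids: list[str]
      """
  ```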
