airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [airflow] feluelle commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class to avoid cyclic imports
Date Wed, 27 Nov 2019 12:44:50 GMT
feluelle commented on a change in pull request #6596: [AIRFLOW-6004] Untangle Executors class
to avoid cyclic imports
URL: https://github.com/apache/airflow/pull/6596#discussion_r351262731
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -63,204 +62,109 @@ class SimpleDag(BaseDag):
     :type pickle_id: unicode
     """
 
-    def __init__(self, dag, pickle_id=None):
-        self._dag_id = dag.dag_id
-        self._task_ids = [task.task_id for task in dag.tasks]
-        self._full_filepath = dag.full_filepath
-        self._is_paused = dag.is_paused
-        self._concurrency = dag.concurrency
-        self._pickle_id = pickle_id
-        self._task_special_args = {}
+    def __init__(self, dag, pickle_id: Optional[str] = None):
+        self._dag_id: str = dag.dag_id
+        self._task_ids: List[str] = [task.task_id for task in dag.tasks]
+        self._full_filepath: str = dag.full_filepath
+        self._is_paused: bool = dag.is_paused
+        self._concurrency: int = dag.concurrency
+        self._pickle_id: Optional[str] = pickle_id
+        self._task_special_args: Dict[str, Any] = {}
         for task in dag.tasks:
             special_args = {}
             if task.task_concurrency is not None:
                 special_args['task_concurrency'] = task.task_concurrency
-            if len(special_args) > 0:
+            if special_args:
                 self._task_special_args[task.task_id] = special_args
 
     @property
-    def dag_id(self):
+    def dag_id(self) -> str:
         """
         :return: the DAG ID
         :rtype: unicode
         """
         return self._dag_id
 
     @property
-    def task_ids(self):
+    def task_ids(self) -> List[str]:
         """
         :return: A list of task IDs that are in this DAG
         :rtype: list[unicode]
         """
         return self._task_ids
 
     @property
-    def full_filepath(self):
+    def full_filepath(self) -> str:
         """
         :return: The absolute path to the file that contains this DAG's definition
         :rtype: unicode
         """
         return self._full_filepath
 
     @property
-    def concurrency(self):
+    def concurrency(self) -> int:
         """
         :return: maximum number of tasks that can run simultaneously from this DAG
         :rtype: int
         """
         return self._concurrency
 
     @property
-    def is_paused(self):
+    def is_paused(self) -> bool:
         """
         :return: whether this DAG is paused or not
         :rtype: bool
         """
         return self._is_paused
 
     @property
-    def pickle_id(self):
+    def pickle_id(self) -> Optional[str]:
         """
         :return: The pickle ID for this DAG, if it has one. Otherwise None.
         :rtype: unicode
         """
         return self._pickle_id
 
     @property
-    def task_special_args(self):
+    def task_special_args(self) -> Dict[str, Any]:
+        """Special arguments of the task."""
         return self._task_special_args
 
-    def get_task_special_arg(self, task_id, special_arg_name):
+    def get_task_special_arg(self, task_id: str, special_arg_name: str):
+        """Retrieve special arguments of the task."""
         if task_id in self._task_special_args and special_arg_name in self._task_special_args[task_id]:
             return self._task_special_args[task_id][special_arg_name]
         else:
             return None
 
 
-class SimpleTaskInstance:
-    def __init__(self, ti):
-        self._dag_id = ti.dag_id
-        self._task_id = ti.task_id
-        self._execution_date = ti.execution_date
-        self._start_date = ti.start_date
-        self._end_date = ti.end_date
-        self._try_number = ti.try_number
-        self._state = ti.state
-        self._executor_config = ti.executor_config
-        if hasattr(ti, 'run_as_user'):
-            self._run_as_user = ti.run_as_user
-        else:
-            self._run_as_user = None
-        if hasattr(ti, 'pool'):
-            self._pool = ti.pool
-        else:
-            self._pool = None
-        if hasattr(ti, 'priority_weight'):
-            self._priority_weight = ti.priority_weight
-        else:
-            self._priority_weight = None
-        self._queue = ti.queue
-        self._key = ti.key
-
-    @property
-    def dag_id(self):
-        return self._dag_id
-
-    @property
-    def task_id(self):
-        return self._task_id
-
-    @property
-    def execution_date(self):
-        return self._execution_date
-
-    @property
-    def start_date(self):
-        return self._start_date
-
-    @property
-    def end_date(self):
-        return self._end_date
-
-    @property
-    def try_number(self):
-        return self._try_number
-
-    @property
-    def state(self):
-        return self._state
-
-    @property
-    def pool(self):
-        return self._pool
-
-    @property
-    def priority_weight(self):
-        return self._priority_weight
-
-    @property
-    def queue(self):
-        return self._queue
-
-    @property
-    def key(self):
-        return self._key
-
-    @property
-    def executor_config(self):
-        return self._executor_config
-
-    @provide_session
-    def construct_task_instance(self, session=None, lock_for_update=False):
-        """
-        Construct a TaskInstance from the database based on the primary key
-
-        :param session: DB session.
-        :param lock_for_update: if True, indicates that the database should
-            lock the TaskInstance (issuing a FOR UPDATE clause) until the
-            session is committed.
-        """
-        TI = airflow.models.TaskInstance
-
-        qry = session.query(TI).filter(
-            TI.dag_id == self._dag_id,
-            TI.task_id == self._task_id,
-            TI.execution_date == self._execution_date)
-
-        if lock_for_update:
-            ti = qry.with_for_update().first()
-        else:
-            ti = qry.first()
-        return ti
-
-
 class SimpleDagBag(BaseDagBag):
     """
     A collection of SimpleDag objects with some convenience methods.
     """
 
-    def __init__(self, simple_dags):
+    def __init__(self, simple_dags: List[SimpleDag]):
         """
         Constructor.
 
         :param simple_dags: SimpleDag objects that should be in this
-        :type list(airflow.utils.dag_processing.SimpleDagBag)
+        :type List(airflow.utils.dag_processing.SimpleDag)
         """
         self.simple_dags = simple_dags
-        self.dag_id_to_simple_dag = {}
+        self.dag_id_to_simple_dag: Dict[str, SimpleDag] = {}
 
         for simple_dag in simple_dags:
             self.dag_id_to_simple_dag[simple_dag.dag_id] = simple_dag
 
     @property
-    def dag_ids(self):
+    def dag_ids(self) -> KeysView[str]:
         """
         :return: IDs of all the DAGs in this
         :rtype: list[unicode]
         """
         return self.dag_id_to_simple_dag.keys()
 
-    def get_dag(self, dag_id):
+    def get_dag(self, dag_id: str) -> SimpleDag:
 
 Review comment:
   Same here. `:rtype: airflow.utils.dag_processing.SimpleDag` can be removed, right?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message