airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1606][Airflow-1606][AIRFLOW-1605][AIRFLOW-160] DAG.sync_to_db is now a normal method
Date Wed, 13 Sep 2017 11:26:44 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 956699fe6 -> 028b3b88f


[AIRFLOW-1606][Airflow-1606][AIRFLOW-1605][AIRFLOW-160] DAG.sync_to_db is now a normal method

Previously it was a static method that took as
its first argument a
DAG, which really meant it wasn't truly a static
method.

To avoid reversing the parameter order I have
given sensible defaults
from the one and only use in the rest of the code
base.

Also remove the documented "sync_to_db" parameter on
DagBag that no longer
exists -- this docstring refers to a parameter
that was removed in
[AIRFLOW-160].

Closes #2605 from ashb/AIRFLOW-1606-db-sync_to_db-not-static


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/028b3b88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/028b3b88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/028b3b88

Branch: refs/heads/master
Commit: 028b3b88ff4f191c78bf1d9c41bf43a792f640ff
Parents: 956699f
Author: Ash Berlin-Taylor <ash_github@firemirror.com>
Authored: Wed Sep 13 13:26:39 2017 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Wed Sep 13 13:26:39 2017 +0200

----------------------------------------------------------------------
 airflow/models.py   | 26 +++++++++++++-------------
 airflow/utils/db.py |  4 +---
 2 files changed, 14 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/028b3b88/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7dc51d1..20d750e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -173,10 +173,6 @@ class DagBag(BaseDagBag, LoggingMixin):
     :param include_examples: whether to include the examples that ship
         with airflow or not
     :type include_examples: bool
-    :param sync_to_db: whether to sync the properties of the DAGs to
-        the metadata DB while finding them, typically should be done
-        by the scheduler job only
-    :type sync_to_db: bool
     """
 
     def __init__(
@@ -3736,9 +3732,8 @@ class DAG(BaseDag, LoggingMixin):
 
         return run
 
-    @staticmethod
     @provide_session
-    def sync_to_db(dag, owner, sync_time, session=None):
+    def sync_to_db(self, owner=None, sync_time=None, session=None):
         """
         Save attributes about this DAG to the DB. Note that this method
         can be called for both DAGs and SubDAGs. A SubDag is actually a
@@ -3751,21 +3746,26 @@ class DAG(BaseDag, LoggingMixin):
         :return: None
         """
 
+        if owner is None:
+            owner = self.owner
+        if sync_time is None:
+            sync_time = datetime.utcnow()
+
         orm_dag = session.query(
-            DagModel).filter(DagModel.dag_id == dag.dag_id).first()
+            DagModel).filter(DagModel.dag_id == self.dag_id).first()
         if not orm_dag:
-            orm_dag = DagModel(dag_id=dag.dag_id)
-            dag.logger.info("Creating ORM DAG for %s", dag.dag_id)
-        orm_dag.fileloc = dag.fileloc
-        orm_dag.is_subdag = dag.is_subdag
+            orm_dag = DagModel(dag_id=self.dag_id)
+            self.logger.info("Creating ORM DAG for %s", self.dag_id)
+        orm_dag.fileloc = self.fileloc
+        orm_dag.is_subdag = self.is_subdag
         orm_dag.owners = owner
         orm_dag.is_active = True
         orm_dag.last_scheduler_run = sync_time
         session.merge(orm_dag)
         session.commit()
 
-        for subdag in dag.subdags:
-            DAG.sync_to_db(subdag, owner, sync_time, session=session)
+        for subdag in self.subdags:
+            subdag.sync_to_db(owner=owner, sync_time=sync_time, session=session)
 
     @staticmethod
     @provide_session

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/028b3b88/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index b3c8a4d..c7e58e7 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -17,7 +17,6 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-from datetime import datetime
 from functools import wraps
 
 import os
@@ -281,9 +280,8 @@ def initdb():
 
     dagbag = models.DagBag()
     # Save individual DAGs in the ORM
-    now = datetime.utcnow()
     for dag in dagbag.dags.values():
-        models.DAG.sync_to_db(dag, dag.owner, now)
+        dag.sync_to_db()
     # Deactivate the unknown ones
     models.DAG.deactivate_unknown_dags(dagbag.dags.keys())
 


Mime
View raw message