airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xua...@apache.org
Subject [1/3] incubator-airflow git commit: [AIRFLOW-1005] Improve Airflow startup time
Date Sun, 19 Mar 2017 15:20:44 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7d11444a5 -> 23a16f7ad


[AIRFLOW-1005] Improve Airflow startup time

Airflow’s startup time can be reduced by 50% by
deferring the imports of Cryptography and Alembic
(and, relatedly, by not generating Fernet keys
unless a new config file has to be created).


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/996dd309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/996dd309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/996dd309

Branch: refs/heads/master
Commit: 996dd309331b010b15e34b99222430283ad7d8a4
Parents: a8027a3
Author: Jeremiah Lowin <jlowin@apache.org>
Authored: Fri Mar 17 18:25:24 2017 -0400
Committer: Jeremiah Lowin <jlowin@apache.org>
Committed: Sun Mar 19 10:06:33 2017 -0400

----------------------------------------------------------------------
 airflow/configuration.py | 30 ++++++++++++++++------------
 airflow/models.py        | 46 ++++++++++++++++++++++++++++---------------
 airflow/utils/db.py      | 11 ++++++-----
 3 files changed, 53 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/996dd309/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 895c08d..9c7a03e 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -43,18 +43,16 @@ warnings.filterwarnings(
     action='default', category=PendingDeprecationWarning, module='airflow')
 
 
-try:
-    from cryptography.fernet import Fernet
-except ImportError:
-    pass
-
-
 def generate_fernet_key():
     try:
-        FERNET_KEY = Fernet.generate_key().decode()
+        from cryptography.fernet import Fernet
+    except ImportError:
+        pass
+    try:
+        key = Fernet.generate_key().decode()
     except NameError:
-        FERNET_KEY = "cryptography_not_found_storing_passwords_in_plain_text"
-    return FERNET_KEY
+        key = "cryptography_not_found_storing_passwords_in_plain_text"
+    return key
 
 
 def expand_env_var(env_var):
@@ -774,14 +772,19 @@ def parameterized_config(template):
     current scope
     :param template: a config content templated with {{variables}}
     """
-    FERNET_KEY = generate_fernet_key()
     all_vars = {k: v for d in [globals(), locals()] for k, v in d.items()}
+    if 'FERNET_KEY' not in all_vars:
+        all_vars['FERNET_KEY'] = ''
     return template.format(**all_vars)
 
 TEST_CONFIG_FILE = AIRFLOW_HOME + '/unittests.cfg'
+# only generate a Fernet key if we need to create a new config file
+if not os.path.isfile(TEST_CONFIG_FILE) or not os.path.isfile(AIRFLOW_CONFIG):
+    FERNET_KEY = generate_fernet_key()
 if not os.path.isfile(TEST_CONFIG_FILE):
-    logging.info("Creating new airflow config file for unit tests in: " +
-                 TEST_CONFIG_FILE)
+    logging.info(
+        'Creating new Airflow config file for unit tests in: {}'.format(
+            TEST_CONFIG_FILE))
     with open(TEST_CONFIG_FILE, 'w') as f:
         f.write(parameterized_config(TEST_CONFIG))
 
@@ -789,7 +792,8 @@ if not os.path.isfile(AIRFLOW_CONFIG):
     # These configuration options are used to generate a default configuration
     # when it is missing. The right way to change your configuration is to
     # alter your configuration file, not this code.
-    logging.info("Creating new airflow config file in: " + AIRFLOW_CONFIG)
+    logging.info('Creating new Airflow config file in: {}'.format(
+        AIRFLOW_CONFIG))
     with open(AIRFLOW_CONFIG, 'w') as f:
         f.write(parameterized_config(DEFAULT_CONFIG))
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/996dd309/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 561b002..f2d955b 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
@@ -44,7 +45,6 @@ import textwrap
 import traceback
 import warnings
 import hashlib
-
 from urllib.parse import urlparse
 
 from sqlalchemy import (
@@ -66,6 +66,7 @@ from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
+
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
 from airflow.utils.db import provide_session
@@ -85,13 +86,17 @@ XCOM_RETURN_KEY = 'return_value'
 
 Stats = settings.Stats
 
-ENCRYPTION_ON = False
-try:
+
+def get_fernet():
+    """
+    Deferred load of Fernet key.
+
+    This function could fail either because Cryptography is not installed
+    or because the Fernet key is invalid.
+    """
     from cryptography.fernet import Fernet
-    FERNET = Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8'))
-    ENCRYPTION_ON = True
-except:
-    pass
+    return Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8'))
+
 
 if 'mysql' in settings.SQL_ALCHEMY_CONN:
     LongText = LONGTEXT
@@ -572,18 +577,21 @@ class Connection(Base):
 
     def get_password(self):
         if self._password and self.is_encrypted:
-            if not ENCRYPTION_ON:
+            try:
+                fernet = get_fernet()
+            except:
                 raise AirflowException(
                     "Can't decrypt encrypted password for login={}, \
                     FERNET_KEY configuration is missing".format(self.login))
-            return FERNET.decrypt(bytes(self._password, 'utf-8')).decode()
+            return fernet.decrypt(bytes(self._password, 'utf-8')).decode()
         else:
             return self._password
 
     def set_password(self, value):
         if value:
             try:
-                self._password = FERNET.encrypt(bytes(value, 'utf-8')).decode()
+                fernet = get_fernet()
+                self._password = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_encrypted = True
             except NameError:
                 self._password = value
@@ -596,18 +604,21 @@ class Connection(Base):
 
     def get_extra(self):
         if self._extra and self.is_extra_encrypted:
-            if not ENCRYPTION_ON:
+            try:
+                fernet = get_fernet()
+            except:
                 raise AirflowException(
                     "Can't decrypt `extra` params for login={},\
                     FERNET_KEY configuration is missing".format(self.login))
-            return FERNET.decrypt(bytes(self._extra, 'utf-8')).decode()
+            return fernet.decrypt(bytes(self._extra, 'utf-8')).decode()
         else:
             return self._extra
 
     def set_extra(self, value):
         if value:
             try:
-                self._extra = FERNET.encrypt(bytes(value, 'utf-8')).decode()
+                fernet = get_fernet()
+                self._extra = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_extra_encrypted = True
             except NameError:
                 self._extra = value
@@ -3556,18 +3567,21 @@ class Variable(Base):
 
     def get_val(self):
         if self._val and self.is_encrypted:
-            if not ENCRYPTION_ON:
+            try:
+                fernet = get_fernet()
+            except:
                 raise AirflowException(
                     "Can't decrypt _val for key={}, FERNET_KEY configuration \
                     missing".format(self.key))
-            return FERNET.decrypt(bytes(self._val, 'utf-8')).decode()
+            return fernet.decrypt(bytes(self._val, 'utf-8')).decode()
         else:
             return self._val
 
     def set_val(self, value):
         if value:
             try:
-                self._val = FERNET.encrypt(bytes(value, 'utf-8')).decode()
+                fernet = get_fernet()
+                self._val = fernet.encrypt(bytes(value, 'utf-8')).decode()
                 self.is_encrypted = True
             except NameError:
                 self._val = value

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/996dd309/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 977a949..49a8d62 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -22,16 +22,11 @@ from functools import wraps
 import logging
 import os
 
-from alembic.config import Config
-from alembic import command
-from alembic.migration import MigrationContext
-
 from sqlalchemy import event, exc
 from sqlalchemy.pool import Pool
 
 from airflow import settings
 
-
 def provide_session(func):
     """
     Function decorator that provides a session if it isn't provided.
@@ -284,6 +279,10 @@ def initdb():
 
 
 def upgradedb():
+    # alembic adds significant import time, so we import it lazily
+    from alembic import command
+    from alembic.config import Config
+
     logging.info("Creating tables")
     current_dir = os.path.dirname(os.path.abspath(__file__))
     package_dir = os.path.normpath(os.path.join(current_dir, '..'))
@@ -299,6 +298,8 @@ def resetdb():
     Clear out the database
     '''
     from airflow import models
+    # alembic adds significant import time, so we import it lazily
+    from alembic.migration import MigrationContext
 
     logging.info("Dropping tables that exist")
     models.Base.metadata.drop_all(settings.engine)


Mime
View raw message