airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From san...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-333][AIRFLOW-258] Fix non-module plugin components
Date Sun, 02 Oct 2016 06:43:26 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master c37740f53 -> eb5982d4a


[AIRFLOW-333][AIRFLOW-258] Fix non-module plugin components

* Distinguish between module and non-module plugin components
* Fix handling of non-module plugin components

  * admin views, flask blueprints, and menu links need to not be
    wrapped in modules

* Fix improper use of zope.deprecation.deprecated

  * zope.deprecation.deprecated does NOT support classes as
    first parameter
  * deprecating classes must be handled by calling the deprecate
    function on the class name

* Added tests for plugin loading
* Updated plugin documentation to match test plugin
* Updated executors to always load plugins
* More logging

Closes #1738 from gwax/plugin_module_fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb5982d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb5982d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb5982d4

Branch: refs/heads/master
Commit: eb5982d4aac135051666d2977bbf6adfc8b9e2a7
Parents: c37740f
Author: George Leslie-Waksman <george@cloverhealth.com>
Authored: Sat Oct 1 23:43:20 2016 -0700
Committer: Siddharth Anand <siddharthanand@yahoo.com>
Committed: Sat Oct 1 23:43:20 2016 -0700

----------------------------------------------------------------------
 airflow/__init__.py           |  1 +
 airflow/configuration.py      | 11 ++++++
 airflow/executors/__init__.py | 13 +++++--
 airflow/hooks/__init__.py     | 19 +++++-----
 airflow/macros/__init__.py    | 18 +++++-----
 airflow/operators/__init__.py | 18 +++++-----
 airflow/plugins_manager.py    | 37 ++++++++++++--------
 airflow/www/app.py            |  4 +++
 docs/plugins.rst              | 17 +++++----
 tests/plugins/test_plugin.py  | 72 ++++++++++++++++++++++++++++++++++++++
 tests/plugins_manager.py      | 70 ++++++++++++++++++++++++++++++++++++
 11 files changed, 232 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/__init__.py b/airflow/__init__.py
index a8744da..1e40fe9 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -82,4 +82,5 @@ from airflow import contrib
 
 operators._integrate_plugins()
 hooks._integrate_plugins()
+executors._integrate_plugins()
 macros._integrate_plugins()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 09b1b61..c21ae73 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -402,6 +402,7 @@ TEST_CONFIG = """\
 unit_test_mode = True
 airflow_home = {AIRFLOW_HOME}
 dags_folder = {TEST_DAGS_FOLDER}
+plugins_folder = {TEST_PLUGINS_FOLDER}
 base_log_folder = {AIRFLOW_HOME}/logs
 executor = SequentialExecutor
 sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
@@ -683,6 +684,16 @@ if os.path.exists(_TEST_DAGS_FOLDER):
 else:
     TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags')
 
+# Set up plugins folder for unit tests
+_TEST_PLUGINS_FOLDER = os.path.join(
+    os.path.dirname(os.path.dirname(os.path.realpath(__file__))),
+    'tests',
+    'plugins')
+if os.path.exists(_TEST_PLUGINS_FOLDER):
+    TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER
+else:
+    TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, 'plugins')
+
 
 def parameterized_config(template):
     """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index 9396198..77f139e 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
+import sys
 
 from airflow import configuration
 from airflow.executors.base_executor import BaseExecutor
@@ -26,6 +27,14 @@ except:
 
 from airflow.exceptions import AirflowException
 
+
+def _integrate_plugins():
+    """Integrate plugins to the context."""
+    from airflow.plugins_manager import executors_modules
+    for executors_module in executors_modules:
+        sys.modules[executors_module.__name__] = executors_module
+        globals()[executors_module._name] = executors_module
+
 _EXECUTOR = configuration.get('core', 'EXECUTOR')
 
 if _EXECUTOR == 'LocalExecutor':
@@ -39,9 +48,7 @@ elif _EXECUTOR == 'MesosExecutor':
     DEFAULT_EXECUTOR = MesosExecutor()
 else:
     # Loading plugins
-    from airflow.plugins_manager import executors as _executors
-    for _executor in _executors:
-        globals()[_executor.__name__] = _executor
+    _integrate_plugins()
     if _EXECUTOR in globals():
         DEFAULT_EXECUTOR = globals()[_EXECUTOR]()
     else:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py
index 8942bff..cc09f5a 100644
--- a/airflow/hooks/__init__.py
+++ b/airflow/hooks/__init__.py
@@ -64,21 +64,24 @@ if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
 
 def _integrate_plugins():
     """Integrate plugins to the context"""
-    from airflow.plugins_manager import hooks as _hooks
-    for _hook_module in _hooks:
-        sys.modules[_hook_module.__name__] = _hook_module
-        globals()[_hook_module._name] = _hook_module
+    from airflow.plugins_manager import hooks_modules
+    for hooks_module in hooks_modules:
+        sys.modules[hooks_module.__name__] = hooks_module
+        globals()[hooks_module._name] = hooks_module
 
         ##########################################################
         # TODO FIXME Remove in Airflow 2.0
 
         if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
             from zope.deprecation import deprecated as _deprecated
-            for _hook in _hook_module._objects:
-                globals()[_hook.__name__] = _deprecated(
-                    _hook,
+            for _hook in hooks_module._objects:
+                hook_name = _hook.__name__
+                globals()[hook_name] = _hook
+                _deprecated(
+                    hook_name,
                     "Importing plugin hook '{i}' directly from "
                     "'airflow.hooks' has been deprecated. Please "
                     "import from 'airflow.hooks.[plugin_module]' "
                     "instead. Support for direct imports will be dropped "
-                    "entirely in Airflow 2.0.".format(i=_hook))
+                    "entirely in Airflow 2.0.".format(i=hook_name))
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/macros/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/macros/__init__.py b/airflow/macros/__init__.py
index 0e1ba70..59b9a25 100644
--- a/airflow/macros/__init__.py
+++ b/airflow/macros/__init__.py
@@ -65,10 +65,10 @@ def ds_format(ds, input_format, output_format):
 def _integrate_plugins():
     """Integrate plugins to the context"""
     import sys
-    from airflow.plugins_manager import macros as _macros
-    for _macro_module in _macros:
-        sys.modules[_macro_module.__name__] = _macro_module
-        globals()[_macro_module._name] = _macro_module
+    from airflow.plugins_manager import macros_modules
+    for macros_module in macros_modules:
+        sys.modules[macros_module.__name__] = macros_module
+        globals()[macros_module._name] = macros_module
 
         ##########################################################
         # TODO FIXME Remove in Airflow 2.0
@@ -76,11 +76,13 @@ def _integrate_plugins():
         import os as _os
         if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
             from zope.deprecation import deprecated as _deprecated
-            for _macro in _macro_module._objects:
-                globals()[_macro.__name__] = _deprecated(
-                    _macro,
+            for _macro in macros_module._objects:
+                macro_name = _macro.__name__
+                globals()[macro_name] = _macro
+                _deprecated(
+                    macro_name,
                     "Importing plugin macro '{i}' directly from "
                     "'airflow.macros' has been deprecated. Please "
                     "import from 'airflow.macros.[plugin_module]' "
                     "instead. Support for direct imports will be dropped "
-                    "entirely in Airflow 2.0.".format(i=_macro))
+                    "entirely in Airflow 2.0.".format(i=macro_name))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 4cfac7b..50b05ff 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -101,21 +101,23 @@ if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
 
 def _integrate_plugins():
     """Integrate plugins to the context"""
-    from airflow.plugins_manager import operators as _operators
-    for _operator_module in _operators:
-        sys.modules[_operator_module.__name__] = _operator_module
-        globals()[_operator_module._name] = _operator_module
+    from airflow.plugins_manager import operators_modules
+    for operators_module in operators_modules:
+        sys.modules[operators_module.__name__] = operators_module
+        globals()[operators_module._name] = operators_module
 
         ##########################################################
         # TODO FIXME Remove in Airflow 2.0
 
         if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
             from zope.deprecation import deprecated as _deprecated
-            for _operator in _operator_module._objects:
-                globals()[_operator.__name__] = _deprecated(
-                    _operator,
+            for _operator in operators_module._objects:
+                operator_name = _operator.__name__
+                globals()[operator_name] = _operator
+                _deprecated(
+                    operator_name,
                     "Importing plugin operator '{i}' directly from "
                     "'airflow.operators' has been deprecated. Please "
                     "import from 'airflow.operators.[plugin_module]' "
                     "instead. Support for direct imports will be dropped "
-                    "entirely in Airflow 2.0.".format(i=_operator))
+                    "entirely in Airflow 2.0.".format(i=operator_name))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/plugins_manager.py
----------------------------------------------------------------------
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 940aa87..e0af20c 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -24,8 +24,6 @@ import logging
 import os
 import re
 import sys
-from itertools import chain
-merge = chain.from_iterable
 
 from airflow import configuration
 
@@ -74,6 +72,7 @@ for root, dirs, files in os.walk(plugins_folder, followlinks=True):
             if file_ext != '.py':
                 continue
 
+            logging.info('Importing plugin module ' + filepath)
             # normalize root path as namespace
             namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name])
 
@@ -93,6 +92,7 @@ for root, dirs, files in os.walk(plugins_folder, followlinks=True):
 
 
 def make_module(name, objects):
+    logging.info('Creating module ' + name)
     name = name.lower()
     module = imp.new_module(name)
     module._name = name.split('.')[-1]
@@ -100,18 +100,25 @@ def make_module(name, objects):
     module.__dict__.update((o.__name__, o) for o in objects)
     return module
 
-operators, hooks, executors, macros, admin_views = [], [], [], [], []
-flask_blueprints, menu_links = [], []
+# Plugin components to integrate as modules
+operators_modules = []
+hooks_modules = []
+executors_modules = []
+macros_modules = []
+
+# Plugin components to integrate directly
+admin_views = []
+flask_blueprints = []
+menu_links = []
 
 for p in plugins:
-    operators.append(make_module('airflow.operators.' + p.name, p.operators))
-    hooks.append(make_module('airflow.hooks.' + p.name, p.hooks))
-    executors.append(make_module('airflow.executors.' + p.name, p.executors))
-    macros.append(make_module('airflow.macros.' + p.name, p.macros))
-    admin_views.append(
-        make_module('airflow.www.admin_views' + p.name, p.admin_views))
-    flask_blueprints.append(
-        make_module(
-            'airflow.www.flask_blueprints' + p.name, p.flask_blueprints))
-    menu_links.append(
-        make_module('airflow.www.menu_links' + p.name, p.menu_links))
+    operators_modules.append(
+        make_module('airflow.operators.' + p.name, p.operators))
+    hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks))
+    executors_modules.append(
+        make_module('airflow.executors.' + p.name, p.executors))
+    macros_modules.append(make_module('airflow.macros.' + p.name, p.macros))
+
+    admin_views.extend(p.admin_views)
+    flask_blueprints.extend(p.flask_blueprints)
+    menu_links.extend(p.menu_links)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/www/app.py
----------------------------------------------------------------------
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 10d8420..c4dff6f 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import logging
 import socket
 
 from flask import Flask
@@ -109,10 +110,13 @@ def create_app(config=None):
             from airflow.plugins_manager import (
                 admin_views, flask_blueprints, menu_links)
             for v in admin_views:
+                logging.info('Adding view ' + v.name)
                 admin.add_view(v)
             for bp in flask_blueprints:
+                logging.info('Adding blueprint ' + bp.name)
                 app.register_blueprint(bp)
             for ml in sorted(menu_links, key=lambda x: x.name):
+                logging.info('Adding menu link ' + ml.name)
                 admin.add_link(ml)
 
         integrate_plugins()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/docs/plugins.rst
----------------------------------------------------------------------
diff --git a/docs/plugins.rst b/docs/plugins.rst
index 5dde383..8d2078f 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -96,18 +96,22 @@ definitions in Airflow.
     from airflow.models import  BaseOperator
     from airflow.executors.base_executor import BaseExecutor
 
-    # Will show up under airflow.hooks.PluginHook
+    # Will show up under airflow.hooks.test_plugin.PluginHook
     class PluginHook(BaseHook):
         pass
 
-    # Will show up under airflow.operators.PluginOperator
+    # Will show up under airflow.operators.test_plugin.PluginOperator
     class PluginOperator(BaseOperator):
         pass
 
-    # Will show up under airflow.executors.PluginExecutor
+    # Will show up under airflow.executors.test_plugin.PluginExecutor
     class PluginExecutor(BaseExecutor):
         pass
 
+    # Will show up under airflow.macros.test_plugin.plugin_macro
+    def plugin_macro():
+        pass
+
     # Creating a flask admin BaseView
     class TestView(BaseView):
         @expose('/')
@@ -119,10 +123,10 @@ definitions in Airflow.
     # Creating a flask blueprint to intergrate the templates and static folder
     bp = Blueprint(
         "test_plugin", __name__,
-        template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder 
+        template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
         static_folder='static',
         static_url_path='/static/test_plugin')
-        
+
     ml = MenuLink(
         category='Test Plugin',
         name='Test Menu Link',
@@ -132,8 +136,9 @@ definitions in Airflow.
     class AirflowTestPlugin(AirflowPlugin):
         name = "test_plugin"
         operators = [PluginOperator]
-        flask_blueprints = [bp]
         hooks = [PluginHook]
         executors = [PluginExecutor]
+        macros = [plugin_macro]
         admin_views = [v]
+        flask_blueprints = [bp]
         menu_links = [ml]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/tests/plugins/test_plugin.py
----------------------------------------------------------------------
diff --git a/tests/plugins/test_plugin.py b/tests/plugins/test_plugin.py
new file mode 100644
index 0000000..e628919
--- /dev/null
+++ b/tests/plugins/test_plugin.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is the class you derive to create a plugin
+from airflow.plugins_manager import AirflowPlugin
+
+from flask import Blueprint
+from flask_admin import BaseView, expose
+from flask_admin.base import MenuLink
+
+# Importing base classes that we need to derive
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import  BaseOperator
+from airflow.executors.base_executor import BaseExecutor
+
+# Will show up under airflow.hooks.test_plugin.PluginHook
+class PluginHook(BaseHook):
+    pass
+
+# Will show up under airflow.operators.test_plugin.PluginOperator
+class PluginOperator(BaseOperator):
+    pass
+
+# Will show up under airflow.executors.test_plugin.PluginExecutor
+class PluginExecutor(BaseExecutor):
+    pass
+
+# Will show up under airflow.macros.test_plugin.plugin_macro
+def plugin_macro():
+    pass
+
+# Creating a flask admin BaseView
+class TestView(BaseView):
+    @expose('/')
+    def test(self):
+        # in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
+        return self.render("test_plugin/test.html", content="Hello galaxy!")
+v = TestView(category="Test Plugin", name="Test View")
+
+# Creating a flask blueprint to intergrate the templates and static folder
+bp = Blueprint(
+    "test_plugin", __name__,
+    template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
+    static_folder='static',
+    static_url_path='/static/test_plugin')
+
+ml = MenuLink(
+    category='Test Plugin',
+    name='Test Menu Link',
+    url='http://pythonhosted.org/airflow/')
+
+# Defining the plugin class
+class AirflowTestPlugin(AirflowPlugin):
+    name = "test_plugin"
+    operators = [PluginOperator]
+    hooks = [PluginHook]
+    executors = [PluginExecutor]
+    macros = [plugin_macro]
+    admin_views = [v]
+    flask_blueprints = [bp]
+    menu_links = [ml]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/tests/plugins_manager.py
----------------------------------------------------------------------
diff --git a/tests/plugins_manager.py b/tests/plugins_manager.py
new file mode 100644
index 0000000..0012cdf
--- /dev/null
+++ b/tests/plugins_manager.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import inspect
+import logging
+import unittest
+
+from flask.blueprints import Blueprint
+from flask_admin import BaseView
+from flask_admin.menu import MenuLink, MenuView
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import  BaseOperator
+from airflow.executors.base_executor import BaseExecutor
+from airflow.www.app import cached_app
+
+
+class PluginsTest(unittest.TestCase):
+
+    def test_operators(self):
+        from airflow.operators.test_plugin import PluginOperator
+        assert issubclass(PluginOperator, BaseOperator)
+
+    def test_hooks(self):
+        from airflow.hooks.test_plugin import PluginHook
+        assert issubclass(PluginHook, BaseHook)
+
+    def test_executors(self):
+        from airflow.executors.test_plugin import PluginExecutor
+        assert issubclass(PluginExecutor, BaseExecutor)
+
+    def test_macros(self):
+        from airflow.macros.test_plugin import plugin_macro
+        assert callable(plugin_macro)
+
+    def test_admin_views(self):
+        app = cached_app()
+        [admin] = app.extensions['admin']
+        category = admin._menu_categories['Test Plugin']
+        [admin_view] = [v for v in category.get_children()
+                        if isinstance(v, MenuView)]
+        assert admin_view.name == 'Test View'
+
+    def test_flask_blueprints(self):
+        app = cached_app()
+        assert isinstance(app.blueprints['test_plugin'], Blueprint)
+
+    def test_menu_links(self):
+        app = cached_app()
+        [admin] = app.extensions['admin']
+        category = admin._menu_categories['Test Plugin']
+        [menu_link] = [ml for ml in category.get_children()
+                       if isinstance(ml, MenuLink)]
+        assert menu_link.name == 'Test Menu Link'


Mime
View raw message