airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-860][AIRFLOW-935] Fix plugin executor import cycle and executor selection
Date Sat, 13 May 2017 10:57:05 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master aaf308a29 -> 08a784ede


[AIRFLOW-860][AIRFLOW-935] Fix plugin executor import cycle and executor selection

When a plugin is made with a custom Operator and
executor, an import cycle occurs when the executor
is chosen in airflow.cfg because the
executors/__init__.py starts loading plugins too
early.
Changed the DEFAULT_EXECUTOR usage to a function call
which returns the default executor. This lazy
approach fixes the import cycle.

Revision eb5982d (included in 1.8) breaks plugin
executors altogether. It makes a new module for
every plugin, so import statements need to be
adapted, but the executor selection is left
unchanged, so it ends up assigning the plugin
module as an executor.
Fixed executor selection to work with the new
plugin modules system introduced in 1.8. In
airflow.cfg an executor can now be specified as
{plugin_name}.{executor_name}

Fixes:
 -
https://issues.apache.org/jira/browse/AIRFLOW-860
 -
https://issues.apache.org/jira/browse/AIRFLOW-935

Closes #2120 from stverhae/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/08a784ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/08a784ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/08a784ed

Branch: refs/heads/master
Commit: 08a784ede6ffe1a2389255e1c6d597e9b2131080
Parents: aaf308a
Author: Stijn Verhaegen <Stijn_Verhaegen@applied-maths.com>
Authored: Sat May 13 12:55:45 2017 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Sat May 13 12:56:16 2017 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py                   |  4 +-
 airflow/executors/__init__.py        | 69 +++++++++++++++++++++----------
 airflow/jobs.py                      |  2 +-
 airflow/models.py                    |  9 ++--
 airflow/operators/subdag_operator.py |  4 +-
 airflow/www/views.py                 |  3 +-
 tests/plugins_manager.py             |  8 ++++
 7 files changed, 68 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a784ed/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8e92ea1..e43907e 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -45,7 +45,7 @@ from airflow import api
 from airflow import jobs, settings
 from airflow import configuration as conf
 from airflow.exceptions import AirflowException
-from airflow.executors import DEFAULT_EXECUTOR
+from airflow.executors import GetDefaultExecutor
 from airflow.models import (DagModel, DagBag, TaskInstance,
                             DagPickle, DagRun, Variable, DagStat,
                             Pool, Connection)
@@ -439,7 +439,7 @@ def run(args, dag=None):
                 print(e)
                 raise e
 
-        executor = DEFAULT_EXECUTOR
+        executor = GetDefaultExecutor()
         executor.start()
         print("Sending to executor.")
         executor.queue_task_instance(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a784ed/airflow/executors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py
index cd78f69..a8cb087 100644
--- a/airflow/executors/__init__.py
+++ b/airflow/executors/__init__.py
@@ -22,6 +22,7 @@ from airflow.executors.sequential_executor import SequentialExecutor
 
 from airflow.exceptions import AirflowException
 
+DEFAULT_EXECUTOR = None
 
 def _integrate_plugins():
     """Integrate plugins to the context."""
@@ -30,27 +31,51 @@ def _integrate_plugins():
         sys.modules[executors_module.__name__] = executors_module
         globals()[executors_module._name] = executors_module
 
-_EXECUTOR = configuration.get('core', 'EXECUTOR')
-
-if _EXECUTOR == 'LocalExecutor':
-    DEFAULT_EXECUTOR = LocalExecutor()
-elif _EXECUTOR == 'SequentialExecutor':
-    DEFAULT_EXECUTOR = SequentialExecutor()
-elif _EXECUTOR == 'CeleryExecutor':
-    from airflow.executors.celery_executor import CeleryExecutor
-    DEFAULT_EXECUTOR = CeleryExecutor()
-elif _EXECUTOR == 'DaskExecutor':
-    from airflow.executors.dask_executor import DaskExecutor
-    DEFAULT_EXECUTOR = DaskExecutor()
-elif _EXECUTOR == 'MesosExecutor':
-    from airflow.contrib.executors.mesos_executor import MesosExecutor
-    DEFAULT_EXECUTOR = MesosExecutor()
-else:
-    # Loading plugins
-    _integrate_plugins()
-    if _EXECUTOR in globals():
-        DEFAULT_EXECUTOR = globals()[_EXECUTOR]()
+def GetDefaultExecutor():
+    """Creates a new instance of the configured executor if none exists and returns it"""
+    global DEFAULT_EXECUTOR
+
+    if DEFAULT_EXECUTOR is not None:
+        return DEFAULT_EXECUTOR
+
+    executor_name = configuration.get('core', 'EXECUTOR')
+
+    DEFAULT_EXECUTOR = _get_executor(executor_name)
+
+    logging.info("Using executor " + executor_name)
+
+    return DEFAULT_EXECUTOR
+
+
+def _get_executor(executor_name):
+    """
+    Creates a new instance of the named executor. In case the executor name is not know in
airflow, 
+    look for it in the plugins
+    """
+    if executor_name == 'LocalExecutor':
+        return LocalExecutor()
+    elif executor_name == 'SequentialExecutor':
+        return SequentialExecutor()
+    elif executor_name == 'CeleryExecutor':
+        from airflow.executors.celery_executor import CeleryExecutor
+        return CeleryExecutor()
+    elif executor_name == 'DaskExecutor':
+        from airflow.executors.dask_executor import DaskExecutor
+        return DaskExecutor()
+    elif executor_name == 'MesosExecutor':
+        from airflow.contrib.executors.mesos_executor import MesosExecutor
+        return MesosExecutor()
     else:
-        raise AirflowException("Executor {0} not supported.".format(_EXECUTOR))
+        # Loading plugins
+        _integrate_plugins()
+        executor_path = executor_name.split('.')
+        if len(executor_path) != 2:
+            raise AirflowException(
+                "Executor {0} not supported: please specify in format plugin_module.executor".format(executor_name))
+
+        if executor_path[0] in globals():
+            return globals()[executor_path[0]].__dict__[executor_path[1]]()
+        else:
+            raise AirflowException("Executor {0} not supported.".format(executor_name))
+
 
-logging.info("Using executor " + _EXECUTOR)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a784ed/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index e9b6094..0c724a0 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -95,7 +95,7 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(
             self,
-            executor=executors.DEFAULT_EXECUTOR,
+            executor=executors.GetDefaultExecutor(),
             heartrate=conf.getfloat('scheduler', 'JOB_HEARTBEAT_SEC'),
             *args, **kwargs):
         self.hostname = socket.getfqdn()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a784ed/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 53ca38d..753bffa 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -60,7 +60,7 @@ from croniter import croniter
 import six
 
 from airflow import settings, utils
-from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
+from airflow.executors import GetDefaultExecutor, LocalExecutor
 from airflow import configuration
 from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout
 from airflow.dag.base_dag import BaseDag, BaseDagBag
@@ -166,9 +166,12 @@ class DagBag(BaseDagBag, LoggingMixin):
     def __init__(
             self,
             dag_folder=None,
-            executor=DEFAULT_EXECUTOR,
+            executor=None,
             include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES')):
 
+        # do not use default arg in signature, to fix import cycle on plugin load
+        if executor is None:
+            executor = GetDefaultExecutor()
         dag_folder = dag_folder or settings.DAGS_FOLDER
         self.logger.info("Filling up the DagBag from {}".format(dag_folder))
         self.dag_folder = dag_folder
@@ -3377,7 +3380,7 @@ class DAG(BaseDag, LoggingMixin):
         if not executor and local:
             executor = LocalExecutor()
         elif not executor:
-            executor = DEFAULT_EXECUTOR
+            executor = GetDefaultExecutor()
         job = BackfillJob(
             self,
             start_date=start_date,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a784ed/airflow/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index 569d0a0..f4f008d 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -16,7 +16,7 @@ from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, Pool
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.db import provide_session
-from airflow.executors import DEFAULT_EXECUTOR
+from airflow.executors import GetDefaultExecutor
 
 
 class SubDagOperator(BaseOperator):
@@ -30,7 +30,7 @@ class SubDagOperator(BaseOperator):
     def __init__(
             self,
             subdag,
-            executor=DEFAULT_EXECUTOR,
+            executor=GetDefaultExecutor(),
             *args, **kwargs):
         """
         Yo dawg. This runs a sub dag. By convention, a sub dag's dag_id

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a784ed/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index c71c5f9..5d52d5e 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -938,8 +938,9 @@ class Airflow(BaseView):
         ignore_ti_state = request.args.get('ignore_ti_state') == "true"
 
         try:
-            from airflow.executors import DEFAULT_EXECUTOR as executor
+            from airflow.executors import GetDefaultExecutor
             from airflow.executors import CeleryExecutor
+            executor = GetDefaultExecutor()
             if not isinstance(executor, CeleryExecutor):
                 flash("Only works with the CeleryExecutor, sorry", "error")
                 return redirect(origin)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08a784ed/tests/plugins_manager.py
----------------------------------------------------------------------
diff --git a/tests/plugins_manager.py b/tests/plugins_manager.py
index 520f822..65eb12f 100644
--- a/tests/plugins_manager.py
+++ b/tests/plugins_manager.py
@@ -45,6 +45,14 @@ class PluginsTest(unittest.TestCase):
         from airflow.executors.test_plugin import PluginExecutor
         self.assertTrue(issubclass(PluginExecutor, BaseExecutor))
 
+        from airflow.executors import GetDefaultExecutor
+        self.assertTrue(issubclass(type(GetDefaultExecutor()), BaseExecutor))
+
+        # test plugin executor import based on a name string, (like defined in airflow.cfg)
+        # this is not identical to the first assertion!
+        from airflow.executors import _get_executor
+        self.assertTrue(issubclass(type(_get_executor('test_plugin.PluginExecutor')), BaseExecutor))
+
     def test_macros(self):
         from airflow.macros.test_plugin import plugin_macro
         self.assertTrue(callable(plugin_macro))


Mime
View raw message