airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From art...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-74] SubdagOperators can consume all celeryd worker processes
Date Tue, 24 Apr 2018 17:13:40 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master be886b986 -> 64d950166


[AIRFLOW-74] SubdagOperators can consume all celeryd worker processes

Closes #3251 from feng-tao/airflow-74


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/64d95016
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/64d95016
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/64d95016

Branch: refs/heads/master
Commit: 64d950166773749c0e4aa0d7032b080cadd56a53
Parents: be886b9
Author: Tao feng <tfeng@lyft.com>
Authored: Tue Apr 24 10:13:25 2018 -0700
Committer: Arthur Wiedmer <arthur@apache.org>
Committed: Tue Apr 24 10:13:25 2018 -0700

----------------------------------------------------------------------
 UPDATING.md                          |  2 ++
 airflow/operators/subdag_operator.py | 22 ++++++++++++++--------
 tests/operators/subdag_operator.py   | 19 +++++++++++++------
 3 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64d95016/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 881539f..609c8db 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -5,6 +5,8 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Default executor for SubDagOperator is changed to SequentialExecutor
+
 ### New Webserver UI with Role-Based Access Control
 
 The current webserver UI uses the Flask-Admin extension. The new webserver UI uses the [Flask-AppBuilder
(FAB)](https://github.com/dpgaspar/Flask-AppBuilder) extension. FAB has built-in authentication
support and Role-Based Access Control (RBAC), which provides configurable roles and permissions
for individual users.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64d95016/airflow/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py
index c3c7591..052095e 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,10 +18,10 @@
 # under the License.
 
 from airflow.exceptions import AirflowException
+from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models import BaseOperator, Pool
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.db import provide_session
-from airflow.executors import GetDefaultExecutor
 
 
 class SubDagOperator(BaseOperator):
@@ -35,16 +35,19 @@ class SubDagOperator(BaseOperator):
     def __init__(
             self,
             subdag,
-            executor=GetDefaultExecutor(),
+            executor=SequentialExecutor(),
             *args, **kwargs):
         """
-        Yo dawg. This runs a sub dag. By convention, a sub dag's dag_id
+        This runs a sub dag. By convention, a sub dag's dag_id
         should be prefixed by its parent and a dot. As in `parent.child`.
 
         :param subdag: the DAG object to run as a subdag of the current DAG.
-        :type subdag: airflow.DAG
-        :param dag: the parent DAG
-        :type subdag: airflow.DAG
+        :type subdag: airflow.DAG.
+        :param dag: the parent DAG for the subdag.
+        :type dag: airflow.DAG.
+        :param executor: the executor for this subdag. Defaults to SequentialExecutor.
+                         See AIRFLOW-74 for more details.
+        :type executor: airflow.executors.
         """
         import airflow.models
         dag = kwargs.get('dag') or airflow.models._CONTEXT_MANAGER_DAG
@@ -88,6 +91,9 @@ class SubDagOperator(BaseOperator):
                     )
 
         self.subdag = subdag
+        # Airflow pool is not honored by SubDagOperator.
+        # Hence resources could be consumed by SubDagOperators.
+        # Use another executor at your own risk.
         self.executor = executor
 
     def execute(self, context):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/64d95016/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py
index 5b51f1c..af47c5c 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,18 +17,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import os
 import unittest
 
 from mock import Mock
 
 import airflow
+from airflow.exceptions import AirflowException
+from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models import DAG, DagBag
-from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
-from airflow.jobs import BackfillJob
-from airflow.exceptions import AirflowException
 from airflow.utils.timezone import datetime
 
 DEFAULT_DATE = datetime(2016, 1, 1)
@@ -143,3 +141,12 @@ class SubDagOperatorTests(unittest.TestCase):
 
         # now make sure dag picks up the subdag error
         self.assertRaises(AirflowException, dag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+    def test_subdag_executor(self):
+        """
+        Test default subdag executor is SequentialExecutor
+        """
+        dag = DAG('parent', default_args=default_args)
+        subdag_good = DAG('parent.test', default_args=default_args)
+        subdag = SubDagOperator(task_id='test', dag=dag, subdag=subdag_good)
+        self.assertEqual(type(subdag.executor), SequentialExecutor)


Mime
View raw message