airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1554] Fix wrong DagFileProcessor termination method call
Date Tue, 05 Dec 2017 18:39:32 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master c0c71cac6 -> ff0d75f06


[AIRFLOW-1554] Fix wrong DagFileProcessor termination method call

Closes #2821 from
pdambrauskas/fix/wrong_termination_call


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ff0d75f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ff0d75f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ff0d75f0

Branch: refs/heads/master
Commit: ff0d75f062a34c267f0902c1d4c7b148ba4b490a
Parents: c0c71ca
Author: Paulius <paulius@exacaster.com>
Authored: Tue Dec 5 19:39:25 2017 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Tue Dec 5 19:39:25 2017 +0100

----------------------------------------------------------------------
 airflow/utils/dag_processing.py    |  2 +-
 tests/utils/test_dag_processing.py | 51 +++++++++++++++++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ff0d75f0/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index 965e88b..dc0c7a6 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -430,7 +430,7 @@ class DagFileProcessorManager(LoggingMixin):
                 filtered_processors[file_path] = processor
             else:
                 self.log.warning("Stopping processor for %s", file_path)
-                processor.stop()
+                processor.terminate()
         self._processors = filtered_processors
 
     def processing_count(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ff0d75f0/tests/utils/test_dag_processing.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_dag_processing.py b/tests/utils/test_dag_processing.py
new file mode 100644
index 0000000..2b60cd0
--- /dev/null
+++ b/tests/utils/test_dag_processing.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from mock import MagicMock
+
+from airflow.utils.dag_processing import DagFileProcessorManager
+
+
+class TestDagFileProcessorManager(unittest.TestCase):
+    def test_set_file_paths_when_processor_file_path_not_in_new_file_paths(self):
+        manager = DagFileProcessorManager(dag_directory='directory', file_paths=['abc.txt'],
+                                          parallelism=1, process_file_interval=1,
+                                          max_runs=1, processor_factory=MagicMock().return_value)
+
+        mock_processor = MagicMock()
+        mock_processor.stop.side_effect = AttributeError(
+            'DagFileProcessor object has no attribute stop')
+        mock_processor.terminate.side_effect = None
+
+        manager._processors['missing_file.txt'] = mock_processor
+
+        manager.set_file_paths(['abc.txt'])
+        self.assertDictEqual(manager._processors, {})
+
+    def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
+        manager = DagFileProcessorManager(dag_directory='directory', file_paths=['abc.txt'],
+                                          parallelism=1, process_file_interval=1,
+                                          max_runs=1, processor_factory=MagicMock().return_value)
+
+        mock_processor = MagicMock()
+        mock_processor.stop.side_effect = AttributeError(
+            'DagFileProcessor object has no attribute stop')
+        mock_processor.terminate.side_effect = None
+
+        manager._processors['abc.txt'] = mock_processor
+
+        manager.set_file_paths(['abc.txt'])
+        self.assertDictEqual(manager._processors, {'abc.txt': mock_processor})


Mime
View raw message