airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/4] incubator-airflow git commit: Revert "[AIRFLOW-719] Prevent DAGs from ending prematurely"
Date Tue, 04 Apr 2017 15:04:27 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master f2dae7d15 -> 4a6bef69d


Revert "[AIRFLOW-719] Prevent DAGs from ending prematurely"

This reverts commit 1fdcf2480555f06cce3fc9bba97fbf3d64f074d3.

This reinstates the pre-1.8.0 logic, under which the ALL_SUCCESS trigger
rule requires every upstream task to have succeeded, rather than also
counting SKIPPED upstream tasks as successful.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92965e82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92965e82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92965e82

Branch: refs/heads/master
Commit: 92965e8275c6f2ec2282ad46c09950bab10c1cb2
Parents: 4c09050
Author: Bolke de Bruin <bolke@xs4all.nl>
Authored: Mon Mar 27 20:12:29 2017 -0700
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Tue Mar 28 17:42:48 2017 -0700

----------------------------------------------------------------------
 airflow/ti_deps/deps/trigger_rule_dep.py      |  6 +-
 tests/dags/test_dagrun_short_circuit_false.py | 38 ----------
 tests/models.py                               | 83 +++++++++++-----------
 3 files changed, 46 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/airflow/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 3a77b00..cf06c0b 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -135,7 +135,7 @@ class TriggerRuleDep(BaseTIDep):
             if tr == TR.ALL_SUCCESS:
                 if upstream_failed or failed:
                     ti.set_state(State.UPSTREAM_FAILED, session)
-                elif skipped == upstream:
+                elif skipped:
                     ti.set_state(State.SKIPPED, session)
             elif tr == TR.ALL_FAILED:
                 if successes or skipped:
@@ -148,7 +148,7 @@ class TriggerRuleDep(BaseTIDep):
                     ti.set_state(State.SKIPPED, session)
 
         if tr == TR.ONE_SUCCESS:
-            if successes <= 0 and skipped <= 0:
+            if successes <= 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires one upstream "
                     "task success, but none were found. "
@@ -162,7 +162,7 @@ class TriggerRuleDep(BaseTIDep):
                     "upstream_tasks_state={1}, upstream_task_ids={2}"
                     .format(tr, upstream_tasks_state, task.upstream_task_ids))
         elif tr == TR.ALL_SUCCESS:
-            num_failures = upstream - (successes + skipped)
+            num_failures = upstream - successes
             if num_failures > 0:
                 yield self._failing_status(
                     reason="Task's trigger rule '{0}' requires all upstream "

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/tests/dags/test_dagrun_short_circuit_false.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_dagrun_short_circuit_false.py b/tests/dags/test_dagrun_short_circuit_false.py
deleted file mode 100644
index 805ab67..0000000
--- a/tests/dags/test_dagrun_short_circuit_false.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from datetime import datetime
-
-from airflow.models import DAG
-from airflow.operators.python_operator import ShortCircuitOperator
-from airflow.operators.dummy_operator import DummyOperator
-
-
-# DAG that has its short circuit op fail and skip multiple downstream tasks
-dag = DAG(
-    dag_id='test_dagrun_short_circuit_false',
-    start_date=datetime(2017, 1, 1)
-)
-dag_task1 = ShortCircuitOperator(
-    task_id='test_short_circuit_false',
-    dag=dag,
-    python_callable=lambda: False)
-dag_task2 = DummyOperator(
-    task_id='test_state_skipped1',
-    dag=dag)
-dag_task3 = DummyOperator(
-    task_id='test_state_skipped2',
-    dag=dag)
-dag_task1.set_downstream(dag_task2)
-dag_task2.set_downstream(dag_task3)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index dcba354..3e77894 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -31,11 +31,12 @@ from airflow.models import DagModel
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
+from airflow.operators.python_operator import ShortCircuitOperator
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils.state import State
 from mock import patch
 from nose_parameterized import parameterized
-from tests.core import TEST_DAG_FOLDER
+
 
 DEFAULT_DATE = datetime.datetime(2016, 1, 1)
 TEST_DAGS_FOLDER = os.path.join(
@@ -197,17 +198,13 @@ class DagTest(unittest.TestCase):
 
 class DagRunTest(unittest.TestCase):
 
-    def setUp(self):
-        self.dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
-
-    def create_dag_run(self, dag_id, state=State.RUNNING, task_states=None):
+    def create_dag_run(self, dag, state=State.RUNNING, task_states=None):
         now = datetime.datetime.now()
-        dag = self.dagbag.get_dag(dag_id)
         dag_run = dag.create_dagrun(
             run_id='manual__' + now.isoformat(),
             execution_date=now,
             start_date=now,
-            state=State.RUNNING,
+            state=state,
             external_trigger=False,
         )
 
@@ -223,37 +220,38 @@ class DagRunTest(unittest.TestCase):
     def test_id_for_date(self):
         run_id = models.DagRun.id_for_date(
             datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
-        self.assertEqual(
-            'scheduled__2015-01-02T03:04:05', run_id,
-            'Generated run_id did not match expectations: {0}'.format(run_id))
-
-    def test_dagrun_running_when_upstream_skipped(self):
-        """
-        Tests that a DAG run is not failed when an upstream task is skipped
-        """
-        initial_task_states = {
-            'test_short_circuit_false': State.SUCCESS,
-            'test_state_skipped1': State.SKIPPED,
-            'test_state_skipped2': State.NONE,
-        }
-        # dags/test_dagrun_short_circuit_false.py
-        dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
-                                      state=State.RUNNING,
-                                      task_states=initial_task_states)
-        updated_dag_state = dag_run.update_state()
-        self.assertEqual(State.RUNNING, updated_dag_state)
+        self.assertEqual('scheduled__2015-01-02T03:04:05', run_id,
+                         msg='Generated run_id did not match expectations: {0}'
+                         .format(run_id))
 
     def test_dagrun_success_when_all_skipped(self):
         """
         Tests that a DAG run succeeds when all tasks are skipped
         """
+        dag = DAG(
+            dag_id='test_dagrun_success_when_all_skipped',
+            start_date=datetime.datetime(2017, 1, 1)
+        )
+        dag_task1 = ShortCircuitOperator(
+            task_id='test_short_circuit_false',
+            dag=dag,
+            python_callable=lambda: False)
+        dag_task2 = DummyOperator(
+            task_id='test_state_skipped1',
+            dag=dag)
+        dag_task3 = DummyOperator(
+            task_id='test_state_skipped2',
+            dag=dag)
+        dag_task1.set_downstream(dag_task2)
+        dag_task2.set_downstream(dag_task3)
+
         initial_task_states = {
             'test_short_circuit_false': State.SUCCESS,
             'test_state_skipped1': State.SKIPPED,
             'test_state_skipped2': State.SKIPPED,
         }
-        # dags/test_dagrun_short_circuit_false.py
-        dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
+
+        dag_run = self.create_dag_run(dag=dag,
                                       state=State.RUNNING,
                                       task_states=initial_task_states)
         updated_dag_state = dag_run.update_state()
@@ -314,10 +312,17 @@ class DagRunTest(unittest.TestCase):
         """
         Make sure that a proper value is returned when a dagrun has no task instances
         """
+        dag = DAG(
+            dag_id='test_get_task_instance_on_empty_dagrun',
+            start_date=datetime.datetime(2017, 1, 1)
+        )
+        dag_task1 = ShortCircuitOperator(
+            task_id='test_short_circuit_false',
+            dag=dag,
+            python_callable=lambda: False)
+
         session = settings.Session()
 
-        # Any dag will work for this
-        dag = self.dagbag.get_dag('test_dagrun_short_circuit_false')
         now = datetime.datetime.now()
 
         # Don't use create_dagrun since it will create the task instances too which we
@@ -713,7 +718,7 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertEqual(dt, ti.end_date+max_delay)
 
     def test_depends_on_past(self):
-        dagbag = models.DagBag(dag_folder=TEST_DAG_FOLDER)
+        dagbag = models.DagBag()
         dag = dagbag.get_dag('test_depends_on_past')
         dag.clear()
         task = dag.tasks[0]
@@ -742,11 +747,10 @@ class TaskInstanceTest(unittest.TestCase):
         #
         # Tests for all_success
         #
-        ['all_success', 5, 0, 0, 0, 5, True, None, True],
-        ['all_success', 2, 0, 0, 0, 2, True, None, False],
-        ['all_success', 2, 0, 1, 0, 3, True, ST.UPSTREAM_FAILED, False],
-        ['all_success', 2, 1, 0, 0, 3, True, None, False],
-        ['all_success', 0, 5, 0, 0, 5, True, ST.SKIPPED, True],
+        ['all_success', 5, 0, 0, 0, 0, True, None, True],
+        ['all_success', 2, 0, 0, 0, 0, True, None, False],
+        ['all_success', 2, 0, 1, 0, 0, True, ST.UPSTREAM_FAILED, False],
+        ['all_success', 2, 1, 0, 0, 0, True, ST.SKIPPED, False],
         #
         # Tests for one_success
         #
@@ -754,7 +758,6 @@ class TaskInstanceTest(unittest.TestCase):
         ['one_success', 2, 0, 0, 0, 2, True, None, True],
         ['one_success', 2, 0, 1, 0, 3, True, None, True],
         ['one_success', 2, 1, 0, 0, 3, True, None, True],
-        ['one_success', 0, 2, 0, 0, 2, True, None, True],
         #
         # Tests for all_failed
         #
@@ -766,9 +769,9 @@ class TaskInstanceTest(unittest.TestCase):
         #
         # Tests for one_failed
         #
-        ['one_failed', 5, 0, 0, 0, 5, True, ST.SKIPPED, False],
-        ['one_failed', 2, 0, 0, 0, 2, True, None, False],
-        ['one_failed', 2, 0, 1, 0, 2, True, None, True],
+        ['one_failed', 5, 0, 0, 0, 0, True, None, False],
+        ['one_failed', 2, 0, 0, 0, 0, True, None, False],
+        ['one_failed', 2, 0, 1, 0, 0, True, None, True],
         ['one_failed', 2, 1, 0, 0, 3, True, None, False],
         ['one_failed', 2, 3, 0, 0, 5, True, ST.SKIPPED, False],
         #


Mime
View raw message