airflow-commits mailing list archives

From davy...@apache.org
Subject [2/3] incubator-airflow git commit: [AIRFLOW-149] Task Dependency Engine + Why Isn't My Task Running View
Date Fri, 26 Aug 2016 22:08:03 GMT
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/airflow/ti_deps/deps/runnable_exec_date_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow/ti_deps/deps/runnable_exec_date_dep.py
new file mode 100644
index 0000000..8cbdabf
--- /dev/null
+++ b/airflow/ti_deps/deps/runnable_exec_date_dep.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import datetime
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils.db import provide_session
+
+
+class RunnableExecDateDep(BaseTIDep):
+    NAME = "Execution Date"
+    IGNOREABLE = True
+
+    @provide_session
+    def _get_dep_statuses(self, ti, session, dep_context):
+        cur_date = datetime.now()
+
+        if ti.execution_date > cur_date:
+            yield self._failing_status(
+                reason="Execution date {0} is in the future (the current "
+                       "date is {1}).".format(ti.execution_date.isoformat(),
+                                              cur_date.isoformat()))
+
+        if ti.task.end_date and ti.execution_date > ti.task.end_date:
+            yield self._failing_status(
+                reason="The execution date is {0} but this is after the task's end date "
+                "{1}.".format(
+                    ti.execution_date.isoformat(),
+                    ti.task.end_date.isoformat()))
+
+        if (ti.task.dag and
+                ti.task.dag.end_date and
+                ti.execution_date > ti.task.dag.end_date):
+            yield self._failing_status(
+                reason="The execution date is {0} but this is after the task's DAG's "
+                "end date {1}.".format(
+                    ti.execution_date.isoformat(),
+                    ti.task.dag.end_date.isoformat()))

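The new dependency classes all follow the same BaseTIDep pattern: override
_get_dep_statuses and yield failing or passing statuses with a human-readable
reason. As a hedged illustration only (the dep below is hypothetical and not
part of this commit; it assumes the _passing_status/_failing_status helpers and
the @provide_session decorator shown above), a custom dep could look like:

    from datetime import datetime

    from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
    from airflow.utils.db import provide_session


    class WeekdayOnlyDep(BaseTIDep):
        # Hypothetical example dep, not part of this commit.
        NAME = "Weekday Only"
        IGNOREABLE = True

        @provide_session
        def _get_dep_statuses(self, ti, session, dep_context):
            # Mirror the yield-based pattern of RunnableExecDateDep above:
            # yield a failing status when the condition is not met,
            # otherwise yield a passing status.
            if datetime.now().weekday() >= 5:
                yield self._failing_status(
                    reason="Tasks are only allowed to run on weekdays.")
            else:
                yield self._passing_status(reason="Today is a weekday.")
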
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/airflow/ti_deps/deps/trigger_rule_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
new file mode 100644
index 0000000..9a2e78d
--- /dev/null
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from sqlalchemy import case, func
+
+import airflow
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils.db import provide_session
+from airflow.utils.state import State
+
+
+class TriggerRuleDep(BaseTIDep):
+    """
+    Determines if a task's upstream tasks are in a state that allows a given task instance
+    to run.
+    """
+    NAME = "Trigger Rule"
+    IGNOREABLE = True
+    IS_TASK_DEP = True
+
+    @provide_session
+    def _get_dep_statuses(self, ti, session, dep_context):
+        TI = airflow.models.TaskInstance
+        TR = airflow.models.TriggerRule
+
+        # Checking that all upstream dependencies have succeeded
+        if not ti.task.upstream_list:
+            yield self._passing_status(
+                reason="The task instance did not have any upstream tasks.")
+            raise StopIteration
+
+        if ti.task.trigger_rule == TR.DUMMY:
+            yield self._passing_status(reason="The task had a dummy trigger rule set.")
+            raise StopIteration
+
+        # TODO(unknown): this query becomes quite expensive with dags that have many
+        # tasks. It should be refactored to let the task report to the dag run and get the
+        # aggregates from there.
+        qry = (
+            session
+            .query(
+                func.coalesce(func.sum(
+                    case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
+                func.coalesce(func.sum(
+                    case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
+                func.coalesce(func.sum(
+                    case([(TI.state == State.FAILED, 1)], else_=0)), 0),
+                func.coalesce(func.sum(
+                    case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
+                func.count(TI.task_id),
+            )
+            .filter(
+                TI.dag_id == ti.dag_id,
+                TI.task_id.in_(ti.task.upstream_task_ids),
+                TI.execution_date == ti.execution_date,
+                TI.state.in_([
+                    State.SUCCESS, State.FAILED,
+                    State.UPSTREAM_FAILED, State.SKIPPED]),
+            )
+        )
+
+        successes, skipped, failed, upstream_failed, done = qry.first()
+        for dep_status in self._evaluate_trigger_rule(
+                ti=ti,
+                successes=successes,
+                skipped=skipped,
+                failed=failed,
+                upstream_failed=upstream_failed,
+                done=done,
+                flag_upstream_failed=dep_context.flag_upstream_failed,
+                session=session):
+            yield dep_status
+
+    @provide_session
+    def _evaluate_trigger_rule(
+            self,
+            ti,
+            successes,
+            skipped,
+            failed,
+            upstream_failed,
+            done,
+            flag_upstream_failed,
+            session):
+        """
+        Yields a dependency status that indicates whether the given task instance's
+        trigger rule was met.
+
+        :param ti: the task instance to evaluate the trigger rule of
+        :type ti: TaskInstance
+        :param successes: Number of successful upstream tasks
+        :type successes: int
+        :param skipped: Number of skipped upstream tasks
+        :type skipped: int
+        :param failed: Number of failed upstream tasks
+        :type failed: int
+        :param upstream_failed: Number of upstream_failed upstream tasks
+        :type upstream_failed: int
+        :param done: Number of completed upstream tasks
+        :type done: int
+        :param flag_upstream_failed: This is a hack to trigger the creation of
+            the upstream_failed state while checking whether the task instance
+            is runnable. It was the shortest path to add the feature.
+        :type flag_upstream_failed: boolean
+        :param session: database session
+        :type session: Session
+        """
+
+        TR = airflow.models.TriggerRule
+
+        task = ti.task
+        upstream = len(task.upstream_task_ids)
+        tr = task.trigger_rule
+        upstream_done = done >= upstream
+
+        # TODO(aoen): Ideally each individual trigger rule would be its own class, but
+        # this isn't very feasible at the moment since the database queries need to be
+        # bundled together for efficiency.
+        # Handle instant state assignment based on trigger rules.
+        if flag_upstream_failed:
+            if tr == TR.ALL_SUCCESS:
+                if upstream_failed or failed:
+                    ti.set_state(State.UPSTREAM_FAILED, session)
+                elif skipped:
+                    ti.set_state(State.SKIPPED, session)
+            elif tr == TR.ALL_FAILED:
+                if successes or skipped:
+                    ti.set_state(State.SKIPPED, session)
+            elif tr == TR.ONE_SUCCESS:
+                if upstream_done and not successes:
+                    ti.set_state(State.SKIPPED, session)
+            elif tr == TR.ONE_FAILED:
+                if upstream_done and not (failed or upstream_failed):
+                    ti.set_state(State.SKIPPED, session)
+
+        if tr == TR.ONE_SUCCESS:
+            if successes <= 0:
+                yield self._failing_status(
+                    reason="Task's trigger rule '{0}' requires one upstream task "
+                           "success, but none were found.".format(tr))
+        elif tr == TR.ONE_FAILED:
+            if not failed and not upstream_failed:
+                yield self._failing_status(
+                    reason="Task's trigger rule '{0}' requires one upstream task failure "
+                           ", but none were found.".format(tr))
+        elif tr == TR.ALL_SUCCESS:
+            num_failures = upstream - successes
+            if num_failures > 0:
+                yield self._failing_status(
+                    reason="Task's trigger rule '{0}' requires all upstream tasks to "
+                           "have succeeded, but found {1} non-success(es)."
+                           .format(tr, num_failures))
+        elif tr == TR.ALL_FAILED:
+            num_successes = upstream - failed - upstream_failed
+            if num_successes > 0:
+                yield self._failing_status(
+                    reason="Task's trigger rule '{0}' requires all upstream tasks to "
+                           "have failed, but found {1} non-faliure(s)."
+                           .format(tr, num_successes))
+        elif tr == TR.ALL_DONE:
+            if not upstream_done:
+                yield self._failing_status(
+                    reason="Task's trigger rule '{0}' requires all upstream tasks to "
+                           "have completed, but found '{1}' task(s) that weren't done."
+                           .format(tr, upstream - done))
+        else:
+            yield self._failing_status(
+                reason="No strategy to evaluate trigger rule '{0}'.".format(tr))

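The aggregate query above reduces the relevant upstream task instances to five
counts (successes, skipped, failed, upstream_failed, done), and
_evaluate_trigger_rule then decides per rule from those counts alone. A
self-contained sketch of two of those decisions on plain integers (illustration
only, not Airflow API):

    # Standalone illustration of the counting logic used by TriggerRuleDep.
    def all_success_met(upstream, successes):
        # ALL_SUCCESS fails as soon as any upstream task did not succeed.
        return (upstream - successes) == 0

    def one_success_met(successes):
        # ONE_SUCCESS needs at least one successful upstream task.
        return successes > 0

    # e.g. 3 upstream tasks, 2 succeeded, 1 skipped:
    assert not all_success_met(upstream=3, successes=2)
    assert one_success_met(successes=2)
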
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/airflow/ti_deps/deps/valid_state_dep.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/deps/valid_state_dep.py b/airflow/ti_deps/deps/valid_state_dep.py
new file mode 100644
index 0000000..c08c1d9
--- /dev/null
+++ b/airflow/ti_deps/deps/valid_state_dep.py
@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from airflow.exceptions import AirflowException
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils.db import provide_session
+
+
+class ValidStateDep(BaseTIDep):
+    """
+    Ensures that the task instance's state is in a given set of valid states.
+
+    :param valid_states: A list of valid states that a task instance can have to meet
+        this dependency.
+    :type valid_states: set(str)
+    :return: whether or not the task instance's state is valid
+    """
+    NAME = "Task Instance State"
+    IGNOREABLE = True
+
+    def __init__(self, valid_states):
+        super(ValidStateDep, self).__init__()
+
+        if not valid_states:
+            raise AirflowException(
+                'ValidStateDep received an empty set of valid states.')
+        self._valid_states = valid_states
+
+    def __eq__(self, other):
+        return type(self) == type(other) and self._valid_states == other._valid_states
+
+    def __hash__(self):
+        return hash((type(self), tuple(self._valid_states)))
+
+    @provide_session
+    def _get_dep_statuses(self, ti, session, dep_context):
+        if dep_context.ignore_ti_state:
+            yield self._passing_status(
+                reason="Context specified that state should be ignored.")
+            raise StopIteration
+
+        if ti.state in self._valid_states:
+            yield self._passing_status(reason="Task state {} was valid.".format(ti.state))
+            raise StopIteration
+
+        yield self._failing_status(
+            reason="Task is in the '{0}' state which is not a valid state for "
+                   "execution. The task must be cleared in order to be run.".format(
+                       ti.state))

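ValidStateDep is parameterized with the set of states from which a task
instance is allowed to proceed. A hedged usage sketch (the state set below is
illustrative; the actual sets used by this commit, e.g. QUEUE_DEPS, live in
airflow/ti_deps/dep_context.py):

    from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
    from airflow.utils.state import State

    # Only let task instances proceed when they are queued or have no state yet
    # (illustrative choice of states, not the commit's actual configuration).
    state_dep = ValidStateDep({State.NONE, State.QUEUED})
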
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/airflow/utils/state.py
----------------------------------------------------------------------
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index b650964..4a1dfb6 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -85,18 +85,6 @@ class State(object):
             return 'black'
 
     @classmethod
-    def runnable(cls):
-        return [
-            cls.NONE,
-            cls.FAILED,
-            cls.UP_FOR_RETRY,
-            cls.UPSTREAM_FAILED,
-            cls.SKIPPED,
-            cls.QUEUED,
-            cls.SCHEDULED
-        ]
-
-    @classmethod
     def finished(cls):
         """
         A list of states indicating that a task started and completed a

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/airflow/www/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/www/__init__.py b/airflow/www/__init__.py
index 759b563..c82f579 100644
--- a/airflow/www/__init__.py
+++ b/airflow/www/__init__.py
@@ -11,4 +11,4 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/airflow/www/templates/airflow/dag.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 79d2606..d7bc0e0 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -125,7 +125,7 @@
               <hr/>
           </div>
           <button id="btn_task" type="button" class="btn btn-primary">
-            Task Details
+            Task Instance Details
           </button>
           <button id="btn_rendered" type="button" class="btn btn-primary">
             Rendered
@@ -142,14 +142,21 @@
             Run
           </button>
           <span class="btn-group">
-            <button id="btn_force"
+            <button id="btn_ignore_all_deps"
               type="button" class="btn" data-toggle="button"
-              title="Ignore previous success, re-run regardless"
-              >Force</button>
-            <button id="btn_deps"
+              title="Ignores all non-critical dependencies, including task state and task_deps"
+              >Ignore All Deps</button>
+          </span>
+          <span class="btn-group">
+            <button id="btn_ignore_ti_state"
               type="button" class="btn" data-toggle="button"
-              tittle="Disregard the status of upstream dependencies and depends_on_past"
-              >Ignore Dependencies</button>
+              title="Ignore previous success/failure"
+              >Ignore Task State</button>
+          </span>
+          <button id="btn_ignore_task_deps"
+            type="button" class="btn" data-toggle="button"
+            title="Disregard the task-specific dependencies, e.g. status of upstream task instances and depends_on_past"
+            >Ignore Task Deps</button>
           </span>
           <hr/>
           <button id="btn_clear" type="button" class="btn btn-primary"
@@ -300,8 +307,9 @@ function updateQueryStringParameter(uri, key, value) {
       url = "{{ url_for('airflow.run') }}" +
         "?task_id=" + encodeURIComponent(task_id) +
         "&dag_id=" + encodeURIComponent(dag_id) +
-        "&force=" + $('#btn_force').hasClass('active') +
-        "&deps=" + $('#btn_deps').hasClass('active') +
+        "&ignore_all_deps=" + $('#btn_ignore_all_deps').hasClass('active') +
+        "&ignore_task_deps=" + $('#btn_ignore_task_deps').hasClass('active') +
+        "&ignore_ti_state=" + $('#btn_ignore_ti_state').hasClass('active') +
         "&execution_date=" + execution_date +
         "&origin=" + encodeURIComponent(window.location);
       window.location = url;

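With the three new buttons, the "Run" link now carries explicit boolean flags
instead of the old force/deps pair. An assumed rendering of the resulting URL
(values depend on which buttons are toggled; compare the updated WebUiTests
further below):

    /admin/airflow/run?task_id=runme_0&dag_id=example_bash_operator
        &ignore_all_deps=false&ignore_task_deps=true&ignore_ti_state=true
        &execution_date=2016-01-01&origin=/admin
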
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/airflow/www/templates/airflow/task.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/task.html b/airflow/www/templates/airflow/task.html
index 653ad0b..88af001 100644
--- a/airflow/www/templates/airflow/task.html
+++ b/airflow/www/templates/airflow/task.html
@@ -22,6 +22,24 @@
     {{ super() }}
     <h4>{{ title }}</h4>
     <div>
+        <h5>Dependencies Blocking Task From Getting Scheduled</h5>
+        <table class="table table-striped table-bordered">
+            <tr>
+                <th>Dependency</th>
+                <th>Reason</th>
+            </tr>
+            {% for dependency, reason in failed_dep_reasons %}
+                <tr>
+                    <td>{{ dependency }}</td>
+                    {% autoescape false %}
+                    <td class='code'>{{ reason }}</td>
+                    {% endautoescape %}
+                </tr>
+            {% endfor %}
+        </table>
+        {{ html_code|safe }}
+    </div>
+    <div>
         {% for attr, value in special_attrs_rendered.items() %}
             <h5>Attribute: {{ attr }}</h5>
             {{ value|safe }}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/airflow/www/templates/airflow/task_instance.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/task_instance.html b/airflow/www/templates/airflow/task_instance.html
index db8cd3e..59a2f73 100644
--- a/airflow/www/templates/airflow/task_instance.html
+++ b/airflow/www/templates/airflow/task_instance.html
@@ -39,7 +39,7 @@
   <ul class="nav nav-pills">
     <li><a href="{{ url_for("airflow.task", dag_id=dag.dag_id, task_id=task_id, execution_date=execution_date) }}">
         <span class="glyphicon glyphicon-certificate" aria-hidden="true"></span>
-      Task Details</a></li>
+      Task Instance Details</a></li>
     <li><a href="{{ url_for("airflow.rendered", dag_id=dag.dag_id, task_id=task_id, execution_date=execution_date) }}">
         <span class="glyphicon glyphicon-certificate" aria-hidden="true"></span>
       Rendered Template</a></li>

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 28d17ea..1370a06 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -29,6 +29,7 @@ import json
 from lxml import html
 
 import inspect
+from textwrap import dedent
 import traceback
 
 import sqlalchemy as sqla
@@ -61,6 +62,7 @@ from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.settings import Session
 from airflow.models import XCom
+from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 
 from airflow.models import BaseOperator
 from airflow.operators.subdag_operator import SubDagOperator
@@ -799,6 +801,8 @@ class Airflow(BaseView):
     @login_required
     @wwwutils.action_logging
     def task(self):
+        TI = models.TaskInstance
+
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
         # Carrying execution_date through, even though it's irrelevant for
@@ -807,15 +811,17 @@ class Airflow(BaseView):
         dttm = dateutil.parser.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
+
         if not dag or task_id not in dag.task_ids:
             flash(
                 "Task [{}.{}] doesn't seem to exist"
                 " at the moment".format(dag_id, task_id),
                 "error")
             return redirect('/admin/')
-        task = dag.get_task(task_id)
-        task = copy.copy(task)
+        task = copy.copy(dag.get_task(task_id))
         task.resolve_template_files()
+        ti = TI(task=task, execution_date=dttm)
+        ti.refresh_from_db()
 
         attributes = []
         for attr_name in dir(task):
@@ -825,7 +831,6 @@ class Airflow(BaseView):
                                 attr_name not in attr_renderer:
                     attributes.append((attr_name, str(attr)))
 
-        title = "Task Details"
         # Color coding the special attributes that are code
         special_attrs_rendered = {}
         for attr_name in attr_renderer:
@@ -833,9 +838,29 @@ class Airflow(BaseView):
                 source = getattr(task, attr_name)
                 special_attrs_rendered[attr_name] = attr_renderer[attr_name](source)
 
+        no_failed_deps_result = [(
+            "Unknown",
+            dedent("""\
+            All dependencies are met but the task instance is not running. In most cases this just means that the task will probably be scheduled soon unless:<br/>
+            - The scheduler is down or under heavy load<br/>
+            {}
+            <br/>
+            If this task instance does not start soon please contact your Airflow administrator for assistance."""
+                   .format(
+                       "- This task instance already ran and had it's state changed manually (e.g. cleared in the UI)<br/>"
+                       if ti.state == State.NONE else "")))]
+
+        # Use the scheduler's context to figure out which dependencies are not met
+        dep_context = DepContext(SCHEDULER_DEPS)
+        failed_dep_reasons = [(dep.dep_name, dep.reason) for dep in
+                              ti.get_failed_dep_statuses(
+                                  dep_context=dep_context)]
+
+        title = "Task Instance Details"
         return self.render(
             'airflow/task.html',
             attributes=attributes,
+            failed_dep_reasons=failed_dep_reasons or no_failed_deps_result,
             task_id=task_id,
             execution_date=execution_date,
             special_attrs_rendered=special_attrs_rendered,
@@ -893,8 +918,9 @@ class Airflow(BaseView):
 
         execution_date = request.args.get('execution_date')
         execution_date = dateutil.parser.parse(execution_date)
-        force = request.args.get('force') == "true"
-        deps = request.args.get('deps') == "true"
+        ignore_all_deps = request.args.get('ignore_all_deps') == "true"
+        ignore_task_deps = request.args.get('ignore_task_deps') == "true"
+        ignore_ti_state = request.args.get('ignore_ti_state') == "true"
 
         try:
             from airflow.executors import DEFAULT_EXECUTOR as executor
@@ -908,9 +934,29 @@ class Airflow(BaseView):
             return redirect(origin)
 
         ti = models.TaskInstance(task=task, execution_date=execution_date)
+        ti.refresh_from_db()
+
+        # Make sure the task instance can be queued
+        dep_context = DepContext(
+            deps=QUEUE_DEPS,
+            ignore_all_deps=ignore_all_deps,
+            ignore_task_deps=ignore_task_deps,
+            ignore_ti_state=ignore_ti_state)
+        failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
+        if failed_deps:
+            failed_deps_str = ", ".join(
+                ["{}: {}".format(dep.dep_name, dep.reason) for dep in failed_deps])
+            flash("Could not queue task instance for execution, dependencies not met: "
+                  "{}".format(failed_deps_str),
+                  "error")
+            return redirect(origin)
+
         executor.start()
         executor.queue_task_instance(
-            ti, force=force, ignore_dependencies=deps)
+            ti,
+            ignore_all_deps=ignore_all_deps,
+            ignore_task_deps=ignore_task_deps,
+            ignore_ti_state=ignore_ti_state)
         executor.heartbeat()
         flash(
             "Sent {} to the message queue, "
@@ -926,7 +972,6 @@ class Airflow(BaseView):
         task_id = request.args.get('task_id')
         origin = request.args.get('origin')
         dag = dagbag.get_dag(dag_id)
-        task = dag.get_task(task_id)
 
         execution_date = request.args.get('execution_date')
         execution_date = dateutil.parser.parse(execution_date)

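The run endpoint now builds a DepContext over QUEUE_DEPS with the three ignore
flags and refuses to queue the task instance if any dependency fails. A minimal
sketch of that check, mirroring the view code above (it assumes the DepContext
constructor and TaskInstance.get_failed_dep_statuses introduced by this commit;
the helper name can_queue is hypothetical):

    from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS

    def can_queue(ti, ignore_all_deps=False, ignore_task_deps=False,
                  ignore_ti_state=False):
        # Returns (ok, reasons); ok is True when no queueing dependency fails.
        dep_context = DepContext(
            deps=QUEUE_DEPS,
            ignore_all_deps=ignore_all_deps,
            ignore_task_deps=ignore_task_deps,
            ignore_ti_state=ignore_ti_state)
        failed = list(ti.get_failed_dep_statuses(dep_context=dep_context))
        reasons = ["{}: {}".format(dep.dep_name, dep.reason) for dep in failed]
        return not failed, reasons
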
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/docs/faq.rst
----------------------------------------------------------------------
diff --git a/docs/faq.rst b/docs/faq.rst
index 68b3189..d304b06 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -29,7 +29,7 @@ Here are some of the common causes:
   needs to have succeeded (except if it is the first run for that task).
   Also, if ``wait_for_downstream=True``, make sure you understand
   what it means.
-  You can view how these properties are set from the ``Task Details``
+  You can view how these properties are set from the ``Task Instance Details``
   page for your task.
 
 - Are the DagRuns you need created and active? A DagRun represents a specific

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/contrib/operators/fs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/fs_operator.py b/tests/contrib/operators/fs_operator.py
index 2e79f86..f990157 100644
--- a/tests/contrib/operators/fs_operator.py
+++ b/tests/contrib/operators/fs_operator.py
@@ -58,7 +58,7 @@ class FileSensorTest(unittest.TestCase):
             _hook=self.hook,
             dag=self.dag,
         )
-        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
 if __name__ == '__main__':
     unittest.main()

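Across the test suite the removed force keyword is replaced by ignore_ti_state
on run()/LocalTaskJob. A hedged sketch of the renamed call on a toy DAG (the
DAG and operator below are illustrative, not part of this commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    DEFAULT_DATE = datetime(2016, 1, 1)
    dag = DAG('example_ignore_ti_state', start_date=DEFAULT_DATE)
    task = DummyOperator(task_id='dummy', dag=dag, owner='airflow')

    # Re-run regardless of any prior task instance state (previously force=True).
    task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
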
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/contrib/operators/ssh_execute_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/ssh_execute_operator.py b/tests/contrib/operators/ssh_execute_operator.py
index 355f163..2975726 100644
--- a/tests/contrib/operators/ssh_execute_operator.py
+++ b/tests/contrib/operators/ssh_execute_operator.py
@@ -59,7 +59,7 @@ class SSHExecuteOperatorTest(unittest.TestCase):
             ssh_hook=self.hook,
             dag=self.dag,
         )
-        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_with_env(self):
         task = SSHExecuteOperator(
@@ -69,7 +69,7 @@ class SSHExecuteOperatorTest(unittest.TestCase):
             env={"AIRFLOW_test": "test"},
             dag=self.dag,
         )
-        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 40d0fc6..e443a03 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -34,6 +34,7 @@ from dateutil.relativedelta import relativedelta
 from airflow import configuration
 from airflow.executors import SequentialExecutor, LocalExecutor
 from airflow.models import Variable
+from tests.test_utils.fake_datetime import FakeDatetime
 
 configuration.load_test_config()
 from airflow import jobs, models, DAG, utils, macros, settings, exceptions
@@ -76,13 +77,6 @@ except ImportError:
     import pickle
 
 
-class FakeDatetime(datetime):
-    "A fake replacement for datetime that can be mocked for testing."
-
-    def __new__(cls, *args, **kwargs):
-        return date.__new__(datetime, *args, **kwargs)
-
-
 def reset(dag_id=TEST_DAG_ID):
     session = Session()
     tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
@@ -325,7 +319,7 @@ class CoreTest(unittest.TestCase):
             task_id='time_sensor_check',
             target_time=time(0),
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_check_operators(self):
 
@@ -340,7 +334,7 @@ class CoreTest(unittest.TestCase):
             sql="select count(*) from operator_test_table",
             conn_id=conn_id,
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         t = ValueCheckOperator(
             task_id='value_check',
@@ -349,7 +343,7 @@ class CoreTest(unittest.TestCase):
             conn_id=conn_id,
             sql="SELECT 100",
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         captainHook.run("drop table operator_test_table")
 
@@ -382,7 +376,7 @@ class CoreTest(unittest.TestCase):
             task_id='time_sensor_check',
             bash_command="echo success",
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_bash_operator_multi_byte_output(self):
         t = BashOperator(
@@ -390,7 +384,7 @@ class CoreTest(unittest.TestCase):
             bash_command=u"echo \u2600",
             dag=self.dag,
             output_encoding='utf-8')
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_trigger_dagrun(self):
         def trigga(context, obj):
@@ -402,7 +396,7 @@ class CoreTest(unittest.TestCase):
             trigger_dag_id='example_bash_operator',
             python_callable=trigga,
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_dryrun(self):
         t = BashOperator(
@@ -417,14 +411,14 @@ class CoreTest(unittest.TestCase):
             task_id='time_sqlite',
             sql="CREATE TABLE IF NOT EXISTS unitest (dummy VARCHAR(20))",
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_timedelta_sensor(self):
         t = sensors.TimeDeltaSensor(
             task_id='timedelta_sensor_check',
             delta=timedelta(seconds=2),
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_external_task_sensor(self):
         t = sensors.ExternalTaskSensor(
@@ -432,7 +426,7 @@ class CoreTest(unittest.TestCase):
             external_dag_id=TEST_DAG_ID,
             external_task_id='time_sensor_check',
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_external_task_sensor_delta(self):
         t = sensors.ExternalTaskSensor(
@@ -442,7 +436,7 @@ class CoreTest(unittest.TestCase):
             execution_delta=timedelta(0),
             allowed_states=['success'],
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_external_task_sensor_fn(self):
         self.test_time_sensor()
@@ -454,7 +448,7 @@ class CoreTest(unittest.TestCase):
             execution_date_fn=lambda dt: dt + timedelta(0),
             allowed_states=['success'],
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         # double check that the execution is being called by failing the test
         t2 = sensors.ExternalTaskSensor(
@@ -467,7 +461,7 @@ class CoreTest(unittest.TestCase):
             poke_interval=1,
             dag=self.dag)
         with self.assertRaises(exceptions.AirflowSensorTimeout):
-            t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_external_task_sensor_error_delta_and_fn(self):
         """
@@ -492,7 +486,7 @@ class CoreTest(unittest.TestCase):
         self.assertRaises(
             exceptions.AirflowTaskTimeout,
             t.run,
-            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_python_op(self):
         def test_py_op(templates_dict, ds, **kwargs):
@@ -505,7 +499,7 @@ class CoreTest(unittest.TestCase):
             python_callable=test_py_op,
             templates_dict={'ds': "{{ ds }}"},
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_complex_template(self):
         def verify_templated_field(context):
@@ -519,7 +513,7 @@ class CoreTest(unittest.TestCase):
             },
             on_success_callback=verify_templated_field,
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_template_with_variable(self):
         """
@@ -541,7 +535,7 @@ class CoreTest(unittest.TestCase):
             some_templated_field='{{ var.value.a_variable }}',
             on_success_callback=verify_templated_field,
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         assert val['success']
 
     def test_template_with_json_variable(self):
@@ -564,7 +558,7 @@ class CoreTest(unittest.TestCase):
             some_templated_field='{{ var.json.a_variable.obj.v2 }}',
             on_success_callback=verify_templated_field,
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         assert val['success']
 
     def test_template_with_json_variable_as_value(self):
@@ -588,7 +582,7 @@ class CoreTest(unittest.TestCase):
             some_templated_field='{{ var.value.a_variable }}',
             on_success_callback=verify_templated_field,
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         assert val['success']
 
     def test_import_examples(self):
@@ -598,7 +592,7 @@ class CoreTest(unittest.TestCase):
         TI = models.TaskInstance
         ti = TI(
             task=self.runme_0, execution_date=DEFAULT_DATE)
-        job = jobs.LocalTaskJob(task_instance=ti, force=True)
+        job = jobs.LocalTaskJob(task_instance=ti, ignore_ti_state=True)
         job.run()
 
     def test_scheduler_job(self):
@@ -611,7 +605,7 @@ class CoreTest(unittest.TestCase):
         ti = TI(
             task=self.runme_0, execution_date=DEFAULT_DATE)
         ti.dag = self.dag_bash
-        ti.run(force=True)
+        ti.run(ignore_ti_state=True)
 
     def test_doctests(self):
         modules = [utils, macros]
@@ -767,7 +761,7 @@ class CoreTest(unittest.TestCase):
 
         ti = TI(task=task, execution_date=DEFAULT_DATE)
         job = jobs.LocalTaskJob(
-            task_instance=ti, force=True, executor=SequentialExecutor())
+            task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
 
         # Running task instance asynchronously
         p = multiprocessing.Process(target=job.run)
@@ -812,11 +806,11 @@ class CoreTest(unittest.TestCase):
             dag=self.dag)
         session = settings.Session()
         try:
-            p.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            p.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         except:
             pass
         try:
-            f.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            f.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
         except:
             pass
         p_fails = session.query(models.TaskFail).filter_by(
@@ -1192,8 +1186,9 @@ class WebUiTests(unittest.TestCase):
         response = self.app.get(url + "&confirmed=true")
         url = (
             "/admin/airflow/run?task_id=runme_0&"
-            "dag_id=example_bash_operator&force=true&deps=true&"
-            "execution_date={}&origin=/admin".format(DEFAULT_DATE_DS))
+            "dag_id=example_bash_operator&ignore_all_deps=false&ignore_ti_state=true&"
+            "ignore_task_deps=true&execution_date={}&"
+            "origin=/admin".format(DEFAULT_DATE_DS))
         response = self.app.get(url)
         response = self.app.get(
             "/admin/airflow/refresh?dag_id=example_bash_operator")
@@ -1232,7 +1227,7 @@ class WebUiTests(unittest.TestCase):
         TI = models.TaskInstance
         ti = TI(
             task=self.runme_0, execution_date=DEFAULT_DATE)
-        job = jobs.LocalTaskJob(task_instance=ti, force=True)
+        job = jobs.LocalTaskJob(task_instance=ti, ignore_ti_state=True)
         job.run()
 
         response = self.app.get(url)
@@ -1454,7 +1449,7 @@ class HttpOpSensorTest(unittest.TestCase):
             data={"client": "ubuntu", "q": "airflow"},
             headers={},
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     @mock.patch('requests.Session', FakeSession)
     def test_get_response_check(self):
@@ -1466,7 +1461,7 @@ class HttpOpSensorTest(unittest.TestCase):
             response_check=lambda response: ("airbnb/airflow" in response.text),
             headers={},
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     @mock.patch('requests.Session', FakeSession)
     def test_sensor(self):
@@ -1480,7 +1475,7 @@ class HttpOpSensorTest(unittest.TestCase):
             poke_interval=5,
             timeout=15,
             dag=self.dag)
-        sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
 class FakeWebHDFSHook(object):
     def __init__(self, conn_id):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/dags/test_issue_1225.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py
index 8f43b08..021561f 100644
--- a/tests/dags/test_issue_1225.py
+++ b/tests/dags/test_issue_1225.py
@@ -39,7 +39,7 @@ def delayed_fail():
     """
     Delayed failure to make sure that processes are running before the error
     is raised.
-    
+
     TODO handle more directly (without sleeping)
     """
     time.sleep(5)
@@ -129,17 +129,3 @@ dag7_subdag1 = SubDagOperator(
     subdag=subdag7)
 subdag7_task1.set_downstream(subdag7_task2)
 subdag7_task2.set_downstream(subdag7_task3)
-
-# DAG tests that queued tasks are run
-dag8 = DAG(
-    dag_id='test_scheduled_queued_tasks',
-    start_date=DEFAULT_DATE,
-    end_date=DEFAULT_DATE,
-    default_args=default_args)
-dag8_task1 = PythonOperator(
-    # use delayed_fail because otherwise LocalExecutor will have a chance to
-    # complete the task
-    python_callable=delayed_fail,
-    task_id='test_queued_task',
-    dag=dag8,
-    pool='test_queued_pool')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/dags/test_scheduler_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py
index 224e7c5..fad0568 100644
--- a/tests/dags/test_scheduler_dags.py
+++ b/tests/dags/test_scheduler_dags.py
@@ -22,7 +22,7 @@ DEFAULT_DATE = datetime(2100, 1, 1)
 # Previously backfill would queue the task but never run it
 dag1 = DAG(
     dag_id='test_start_date_scheduling',
-    start_date=DEFAULT_DATE)
+    start_date=datetime(2100, 1, 1))
 dag1_task1 = DummyOperator(
     task_id='dummy',
     dag=dag1,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index f7a0a26..ae94d98 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -325,51 +325,6 @@ class SchedulerJobTest(unittest.TestCase):
             },
             dagrun_state=State.FAILED)
 
-    def test_scheduler_pooled_tasks(self):
-        """
-        Test that the scheduler handles queued tasks correctly
-        See issue #1299
-        """
-        session = settings.Session()
-        if not (
-                session.query(Pool)
-                .filter(Pool.pool == 'test_queued_pool')
-                .first()):
-            pool = Pool(pool='test_queued_pool', slots=5)
-            session.merge(pool)
-            session.commit()
-        session.close()
-
-        dag_id = 'test_scheduled_queued_tasks'
-        dag = self.dagbag.get_dag(dag_id)
-        dag.clear()
-
-        scheduler = SchedulerJob(dag_id,
-                                 num_runs=1,
-                                 executor=TestExecutor(),
-                                 **self.default_scheduler_args)
-        scheduler.run()
-
-        task_1 = dag.tasks[0]
-        logging.info("Trying to find task {}".format(task_1))
-        ti = TI(task_1, dag.start_date)
-        ti.refresh_from_db()
-        logging.error("TI is: {}".format(ti))
-        self.assertEqual(ti.state, State.QUEUED)
-
-        # now we use a DIFFERENT scheduler and executor
-        # to simulate the num-runs CLI arg
-        scheduler2 = SchedulerJob(
-            dag_id,
-            num_runs=5,
-            executor=DEFAULT_EXECUTOR.__class__(),
-            **self.default_scheduler_args)
-        scheduler2.run()
-
-        ti.refresh_from_db()
-        self.assertEqual(ti.state, State.FAILED)
-        dag.clear()
-
     def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
         """
         DagRun is marked a success if ignore_first_depends_on_past=True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 49b33d1..6354e71 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -22,7 +22,7 @@ import os
 import unittest
 import time
 
-from airflow import models, AirflowException
+from airflow import models, settings, AirflowException
 from airflow.exceptions import AirflowSkipException
 from airflow.models import DAG, TaskInstance as TI
 from airflow.models import State as ST
@@ -30,6 +30,7 @@ from airflow.models import DagModel
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
+from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils.state import State
 from mock import patch
 from nose_parameterized import parameterized
@@ -288,11 +289,15 @@ class TaskInstanceTest(unittest.TestCase):
         dag >> op5
         self.assertIs(op5.dag, dag)
 
-    def test_run_pooling_task(self):
+    @patch.object(TI, 'pool_full')
+    def test_run_pooling_task(self, mock_pool_full):
         """
         test that running task with mark_success param update task state as
         SUCCESS without running task.
         """
+        # Mock the pool out with a full pool because the pool doesn't actually exist
+        mock_pool_full.return_value = True
+
         dag = models.DAG(dag_id='test_run_pooling_task')
         task = DummyOperator(task_id='test_run_pooling_task_op', dag=dag,
                              pool='test_run_pooling_task_pool', owner='airflow',
@@ -302,11 +307,15 @@ class TaskInstanceTest(unittest.TestCase):
         ti.run()
         self.assertEqual(ti.state, models.State.QUEUED)
 
-    def test_run_pooling_task_with_mark_success(self):
+    @patch.object(TI, 'pool_full')
+    def test_run_pooling_task_with_mark_success(self, mock_pool_full):
         """
         test that running task with mark_success param update task state as SUCCESS
         without running task.
         """
+        # Mock the pool out with a full pool because the pool doesn't actually exist
+        mock_pool_full.return_value = True
+
         dag = models.DAG(dag_id='test_run_pooling_task_with_mark_success')
         task = DummyOperator(
             task_id='test_run_pooling_task_with_mark_success_op',
@@ -340,7 +349,6 @@ class TaskInstanceTest(unittest.TestCase):
         ti.run()
         self.assertTrue(ti.state == models.State.SKIPPED)
 
-
     def test_retry_delay(self):
         """
         Test that retry delays are respected
@@ -378,10 +386,14 @@ class TaskInstanceTest(unittest.TestCase):
         run_with_error(ti)
         self.assertEqual(ti.state, State.FAILED)
 
-    def test_retry_handling(self):
+    @patch.object(TI, 'pool_full')
+    def test_retry_handling(self, mock_pool_full):
         """
         Test that task retries are handled properly
         """
+        # Mock the pool as having open slots since the pool doesn't actually exist
+        mock_pool_full.return_value = False
+
         dag = models.DAG(dag_id='test_retry_handling')
         task = BashOperator(
             task_id='test_retry_handling_op',
@@ -411,6 +423,10 @@ class TaskInstanceTest(unittest.TestCase):
         self.assertEqual(ti.state, State.FAILED)
         self.assertEqual(ti.try_number, 2)
 
+        # Clear the TI state since you can't run a task with a FAILED state without
+        # clearing it first
+        ti.set_state(None, settings.Session())
+
         # third run -- up for retry
         run_with_error(ti)
         self.assertEqual(ti.state, State.UP_FOR_RETRY)
@@ -534,10 +550,15 @@ class TaskInstanceTest(unittest.TestCase):
         run_date = task.start_date + datetime.timedelta(days=5)
 
         ti = TI(downstream, run_date)
-        completed = ti.evaluate_trigger_rule(
-            successes=successes, skipped=skipped, failed=failed,
-            upstream_failed=upstream_failed, done=done,
+        dep_results = TriggerRuleDep()._evaluate_trigger_rule(
+            ti=ti,
+            successes=successes,
+            skipped=skipped,
+            failed=failed,
+            upstream_failed=upstream_failed,
+            done=done,
             flag_upstream_failed=flag_upstream_failed)
+        completed = all([dep.passed for dep in dep_results])
 
         self.assertEqual(completed, expect_completed)
         self.assertEqual(ti.state, expect_state)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index e52ca54..fd5e096 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -94,7 +94,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='basic_hql', hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_hive_queues(self):
             import airflow.operators.hive_operator
@@ -103,7 +103,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 mapred_queue='default', mapred_queue_priority='HIGH',
                 mapred_job_name='airflow.test_hive_queues',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
 
         def test_hive_dryrun(self):
@@ -117,7 +117,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             t = operators.hive_operator.HiveOperator(
                 task_id='beeline_hql', hive_cli_conn_id='beeline_default',
                 hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_presto(self):
             sql = """
@@ -126,7 +126,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             import airflow.operators.presto_check_operator
             t = operators.presto_check_operator.PrestoCheckOperator(
                 task_id='presto_check', sql=sql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_presto_to_mysql(self):
             import airflow.operators.presto_to_mysql
@@ -140,14 +140,14 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 mysql_table='test_static_babynames',
                 mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_hdfs_sensor(self):
             t = operators.sensors.HdfsSensor(
                 task_id='hdfs_sensor_check',
                 filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_webhdfs_sensor(self):
             t = operators.sensors.WebHdfsSensor(
@@ -155,7 +155,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
                 timeout=120,
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_sql_sensor(self):
             t = operators.sensors.SqlSensor(
@@ -163,7 +163,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 conn_id='presto_default',
                 sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_hive_stats(self):
             import airflow.operators.hive_stats_operator
@@ -172,14 +172,14 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 table="airflow.static_babynames_partitioned",
                 partition={'ds': DEFAULT_DATE_DS},
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_named_hive_partition_sensor(self):
             t = operators.sensors.NamedHivePartitionSensor(
                 task_id='hive_partition_check',
                 partition_names=["airflow.static_babynames_partitioned/ds={{ds}}"],
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_named_hive_partition_sensor_succeeds_on_multiple_partitions(self):
             t = operators.sensors.NamedHivePartitionSensor(
@@ -189,7 +189,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                     "airflow.static_babynames_partitioned/ds={{ds}}"
                 ],
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         @nose.tools.raises(airflow.exceptions.AirflowSensorTimeout)
         def test_named_hive_partition_sensor_times_out_on_nonexistent_partition(self):
@@ -202,14 +202,14 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 poke_interval=0.1,
                 timeout=1,
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_hive_partition_sensor(self):
             t = operators.sensors.HivePartitionSensor(
                 task_id='hive_partition_check',
                 table='airflow.static_babynames_partitioned',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_hive_metastore_sql_sensor(self):
             t = operators.sensors.MetastorePartitionSensor(
@@ -217,7 +217,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 table='airflow.static_babynames_partitioned',
                 partition_name='ds={}'.format(DEFAULT_DATE_DS),
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_hive2samba(self):
             import airflow.operators.hive_to_samba_operator
@@ -227,7 +227,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
                 destination_filepath='test_airflow.csv',
                 dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         def test_hive_to_mysql(self):
             import airflow.operators.hive_to_mysql
@@ -247,4 +247,4 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 ],
                 dag=self.dag)
             t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 9ecde52..60e7df4 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -57,7 +57,7 @@ class MySqlTest(unittest.TestCase):
             sql=sql,
             mysql_conn_id='airflow_db',
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def mysql_operator_test_multi(self):
         sql = [
@@ -69,7 +69,7 @@ class MySqlTest(unittest.TestCase):
             task_id='mysql_operator_test_multi',
             mysql_conn_id='airflow_db',
             sql=sql, dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_mysql_to_mysql(self):
         sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
@@ -86,7 +86,7 @@ class MySqlTest(unittest.TestCase):
             destination_table="test_mysql_to_mysql",
             sql=sql,
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_sql_sensor(self):
         t = operators.sensors.SqlSensor(
@@ -94,7 +94,7 @@ class MySqlTest(unittest.TestCase):
             conn_id='mysql_default',
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
 @skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
 class PostgresTest(unittest.TestCase):
@@ -113,7 +113,7 @@ class PostgresTest(unittest.TestCase):
         import airflow.operators.postgres_operator
         t = operators.postgres_operator.PostgresOperator(
             task_id='basic_postgres', sql=sql, dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
         autocommitTask = operators.postgres_operator.PostgresOperator(
             task_id='basic_postgres_with_autocommit',
@@ -123,7 +123,7 @@ class PostgresTest(unittest.TestCase):
         autocommitTask.run(
             start_date=DEFAULT_DATE,
             end_date=DEFAULT_DATE,
-            force=True)
+            ignore_ti_state=True)
 
     def postgres_operator_test_multi(self):
         sql = [
@@ -133,7 +133,7 @@ class PostgresTest(unittest.TestCase):
         import airflow.operators.postgres_operator
         t = operators.postgres_operator.PostgresOperator(
             task_id='postgres_operator_test_multi', sql=sql, dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_postgres_to_postgres(self):
         sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
@@ -150,7 +150,7 @@ class PostgresTest(unittest.TestCase):
             destination_table="test_postgres_to_postgres",
             sql=sql,
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_sql_sensor(self):
         t = operators.sensors.SqlSensor(
@@ -158,7 +158,7 @@ class PostgresTest(unittest.TestCase):
             conn_id='postgres_default',
             sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
 @skipUnlessImported('airflow.operators.hive_operator', 'HiveOperator')
 @skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
@@ -189,7 +189,7 @@ class TransferTests(unittest.TestCase):
             recreate=True,
             delimiter=",",
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
     def test_mysql_to_hive_partition(self):
         from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
@@ -205,4 +205,4 @@ class TransferTests(unittest.TestCase):
             create=True,
             delimiter=",",
             dag=self.dag)
-        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index aa65cfe..ebce1dc 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -89,7 +89,7 @@ class SensorTimeoutTest(unittest.TestCase):
         self.assertRaises(
             AirflowSensorTimeout,
             t.run,
-            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+            start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
 
 
 class HttpSensorTests(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/test_utils/README.md
----------------------------------------------------------------------
diff --git a/tests/test_utils/README.md b/tests/test_utils/README.md
new file mode 100644
index 0000000..8a5c90d
--- /dev/null
+++ b/tests/test_utils/README.md
@@ -0,0 +1 @@
+Utilities for use in tests.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/test_utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/test_utils/__init__.py b/tests/test_utils/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/test_utils/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/test_utils/fake_datetime.py
----------------------------------------------------------------------
diff --git a/tests/test_utils/fake_datetime.py b/tests/test_utils/fake_datetime.py
new file mode 100644
index 0000000..9b8102f
--- /dev/null
+++ b/tests/test_utils/fake_datetime.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+
+class FakeDatetime(datetime):
+    """
+    A fake replacement for datetime that can be mocked for testing.
+    """
+
+    def __new__(cls, *args, **kwargs):
+        return datetime.__new__(datetime, *args, **kwargs)

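Not part of the commit itself -- a minimal sketch of how a datetime stand-in like the
FakeDatetime above can be exercised in a test, assuming the tests package is importable
from the repository root; the test class name and frozen date below are illustrative only.

    # Minimal sketch (not part of the commit): freezing "now" on a Python-level
    # datetime subclass. The built-in datetime type is implemented in C and does
    # not allow attribute assignment, but a subclass such as FakeDatetime does,
    # so now() can be pinned to a known value inside a test.
    import unittest
    from datetime import datetime

    from tests.test_utils.fake_datetime import FakeDatetime  # path as added in this commit


    class FakeDatetimeUsageSketch(unittest.TestCase):

        def test_now_can_be_frozen(self):
            frozen = datetime(2016, 8, 26)
            FakeDatetime.now = classmethod(lambda cls, tz=None: frozen)

            self.assertEqual(FakeDatetime.now(), frozen)
            # Instances are plain datetimes, so they compare equal to real ones.
            self.assertTrue(isinstance(FakeDatetime(2016, 8, 26), datetime))

In practice a fake like this is usually combined with mock.patch so that only the module
under test sees the frozen datetime.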
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/__init__.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/__init__.py b/tests/ti_deps/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/ti_deps/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/contexts/__init__.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/contexts/__init__.py b/tests/ti_deps/contexts/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/ti_deps/contexts/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/__init__.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/__init__.py b/tests/ti_deps/deps/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/ti_deps/deps/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/dag_ti_slots_available_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/dag_ti_slots_available_dep.py b/tests/ti_deps/deps/dag_ti_slots_available_dep.py
new file mode 100644
index 0000000..6077d96
--- /dev/null
+++ b/tests/ti_deps/deps/dag_ti_slots_available_dep.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
+from fake_models import FakeDag, FakeTask, FakeTI
+
+
+class DagTISlotsAvailableDepTest(unittest.TestCase):
+
+    def test_concurrency_reached(self):
+        """
+        Test concurrency reached should fail dep
+        """
+        dag = FakeDag(concurrency=1, concurrency_reached=True)
+        task = FakeTask(dag=dag)
+        ti = FakeTI(task=task, dag_id="fake_dag")
+
+        self.assertFalse(DagTISlotsAvailableDep().is_met(ti=ti, dep_context=None))
+
+    def test_all_conditions_met(self):
+        """
+        Test all conditions met should pass dep
+        """
+        dag = FakeDag(concurrency=1, concurrency_reached=False)
+        task = FakeTask(dag=dag)
+        ti = FakeTI(task=task, dag_id="fake_dag")
+
+        self.assertTrue(DagTISlotsAvailableDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/dag_unpaused_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/dag_unpaused_dep.py b/tests/ti_deps/deps/dag_unpaused_dep.py
new file mode 100644
index 0000000..8721a51
--- /dev/null
+++ b/tests/ti_deps/deps/dag_unpaused_dep.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
+from fake_models import FakeDag, FakeTask, FakeTI
+
+
+class DagUnpausedDepTest(unittest.TestCase):
+
+    def test_dag_paused(self):
+        """
+        Test paused DAG should fail dependency
+        """
+        dag = FakeDag(is_paused=True)
+        task = FakeTask(dag=dag)
+        ti = FakeTI(task=task, dag_id="fake_dag")
+
+        self.assertFalse(DagUnpausedDep().is_met(ti=ti, dep_context=None))
+
+    def test_all_conditions_met(self):
+        """
+        Test all conditions met should pass dep
+        """
+        dag = FakeDag(is_paused=False)
+        task = FakeTask(dag=dag)
+        ti = FakeTI(task=task, dag_id="fake_dag")
+
+        self.assertTrue(DagUnpausedDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/dagrun_exists_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/dagrun_exists_dep.py b/tests/ti_deps/deps/dagrun_exists_dep.py
new file mode 100644
index 0000000..1141647
--- /dev/null
+++ b/tests/ti_deps/deps/dagrun_exists_dep.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
+from fake_models import FakeDag, FakeTask, FakeTI
+
+
+class DagrunRunningDepTest(unittest.TestCase):
+
+    def test_dagrun_doesnt_exist(self):
+        """
+        Task instances without dagruns should fail this dep
+        """
+        dag = FakeDag(running_dagruns=[], max_active_runs=1)
+        task = FakeTask(dag=dag)
+        ti = FakeTI(dagrun=None, task=task, dag_id="fake_dag")
+
+        self.assertFalse(DagrunRunningDep().is_met(ti=ti, dep_context=None))
+
+    def test_dagrun_exists(self):
+        """
+        Task instances with a dagrun should pass this dep
+        """
+        dag = FakeDag(running_dagruns=[], max_active_runs=1)
+        task = FakeTask(dag=dag)
+        ti = FakeTI(dagrun="Fake Dagrun", task=task, dag_id="fake_dag")
+
+        self.assertTrue(DagrunRunningDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/fake_models.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/fake_models.py b/tests/ti_deps/deps/fake_models.py
new file mode 100644
index 0000000..777b7f2
--- /dev/null
+++ b/tests/ti_deps/deps/fake_models.py
@@ -0,0 +1,57 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# A collection of fake models used for unit testing
+
+
+class FakeTI(object):
+
+    def __init__(self, **kwds):
+        self.__dict__.update(kwds)
+
+    def pool_full(self):
+        # Allow users of this fake to set pool_filled in the constructor to make
+        # this return True.
+        try:
+            return self.pool_filled
+        except AttributeError:
+            # If pool_filled was not set, default to False.
+            return False
+
+    def get_dagrun(self, _):
+        return self.dagrun
+
+    def are_dependents_done(self, session):
+        return self.dependents_done
+
+
+class FakeTask(object):
+
+    def __init__(self, **kwds):
+        self.__dict__.update(kwds)
+
+
+class FakeDag(object):
+
+    def __init__(self, **kwds):
+        self.__dict__.update(kwds)
+
+    def get_running_dagruns(self, _):
+        return self.running_dagruns
+
+
+class FakeContext(object):
+
+    def __init__(self, **kwds):
+        self.__dict__.update(kwds)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/not_in_retry_period_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/not_in_retry_period_dep.py b/tests/ti_deps/deps/not_in_retry_period_dep.py
new file mode 100644
index 0000000..a6657ba
--- /dev/null
+++ b/tests/ti_deps/deps/not_in_retry_period_dep.py
@@ -0,0 +1,61 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime, timedelta
+
+from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
+from airflow.utils.state import State
+from fake_models import FakeDag, FakeTask, FakeTI
+
+
+class NotInRetryPeriodDepTest(unittest.TestCase):
+
+    def test_still_in_retry_period(self):
+        """
+        Task instances that are in their retry period should fail this dep
+        """
+        dag = FakeDag()
+        task = FakeTask(dag=dag, retry_delay=timedelta(minutes=1))
+        ti = FakeTI(
+            task=task,
+            state=State.UP_FOR_RETRY,
+            end_date=datetime(2016, 1, 1),
+            is_premature=True)
+
+        self.assertFalse(NotInRetryPeriodDep().is_met(ti=ti, dep_context=None))
+
+    def test_retry_period_finished(self):
+        """
+        Task instances whose retry period has elapsed should pass this dep
+        """
+        dag = FakeDag()
+        task = FakeTask(dag=dag, retry_delay=timedelta(minutes=1))
+        ti = FakeTI(
+            task=task,
+            state=State.UP_FOR_RETRY,
+            end_date=datetime(2016, 1, 1),
+            is_premature=False)
+
+        self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti, dep_context=None))
+
+    def test_not_in_retry_period(self):
+        """
+        Task instances that are not up for retry cannot be in their retry period
+        """
+        dag = FakeDag()
+        task = FakeTask(dag=dag)
+        ti = FakeTI(task=task, state=State.SUCCESS)
+
+        self.assertTrue(NotInRetryPeriodDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/not_running_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/not_running_dep.py b/tests/ti_deps/deps/not_running_dep.py
new file mode 100644
index 0000000..159d923
--- /dev/null
+++ b/tests/ti_deps/deps/not_running_dep.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+
+from airflow.ti_deps.deps.not_running_dep import NotRunningDep
+from airflow.utils.state import State
+from fake_models import FakeTI
+
+
+class NotRunningDepTest(unittest.TestCase):
+
+    def test_ti_running(self):
+        """
+        Running task instances should fail this dep
+        """
+        ti = FakeTI(state=State.RUNNING, start_date=datetime(2016, 1, 1))
+
+        self.assertFalse(NotRunningDep().is_met(ti=ti, dep_context=None))
+
+    def test_ti_not_running(self):
+        """
+        Non-running task instances should pass this dep
+        """
+        ti = FakeTI(state=State.NONE, start_date=datetime(2016, 1, 1))
+
+        self.assertTrue(NotRunningDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/not_skipped_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/not_skipped_dep.py b/tests/ti_deps/deps/not_skipped_dep.py
new file mode 100644
index 0000000..6d7ef55
--- /dev/null
+++ b/tests/ti_deps/deps/not_skipped_dep.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.ti_deps.deps.not_skipped_dep import NotSkippedDep
+from airflow.utils.state import State
+from fake_models import FakeTI
+
+
+class NotSkippedDepTest(unittest.TestCase):
+
+    def test_skipped(self):
+        """
+        Skipped task instances should fail this dep
+        """
+        ti = FakeTI(state=State.SKIPPED)
+
+        self.assertFalse(NotSkippedDep().is_met(ti=ti, dep_context=None))
+
+    def test_not_skipped(self):
+        """
+        Non-skipped task instances should pass this dep
+        """
+        ti = FakeTI(state=State.RUNNING)
+
+        self.assertTrue(NotSkippedDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/pool_has_space_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/pool_has_space_dep.py b/tests/ti_deps/deps/pool_has_space_dep.py
new file mode 100644
index 0000000..411547a
--- /dev/null
+++ b/tests/ti_deps/deps/pool_has_space_dep.py
@@ -0,0 +1,37 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow.ti_deps.deps.pool_has_space_dep import PoolHasSpaceDep
+from fake_models import FakeTI
+
+
+class PoolHasSpaceDepTest(unittest.TestCase):
+
+    def test_pool_full(self):
+        """
+        Full pools should fail this dep
+        """
+        ti = FakeTI(pool="fake_pool", pool_filled=True)
+
+        self.assertFalse(PoolHasSpaceDep().is_met(ti=ti, dep_context=None))
+
+    def test_pool_has_space(self):
+        """
+        Pools with room should pass this dep
+        """
+        ti = FakeTI(pool="fake_pool", pool_filled=False)
+
+        self.assertTrue(PoolHasSpaceDep().is_met(ti=ti, dep_context=None))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f3604147/tests/ti_deps/deps/prev_dagrun_dep.py
----------------------------------------------------------------------
diff --git a/tests/ti_deps/deps/prev_dagrun_dep.py b/tests/ti_deps/deps/prev_dagrun_dep.py
new file mode 100644
index 0000000..4873467
--- /dev/null
+++ b/tests/ti_deps/deps/prev_dagrun_dep.py
@@ -0,0 +1,143 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+
+from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
+from airflow.utils.state import State
+from fake_models import FakeContext, FakeTask, FakeTI
+
+
+class PrevDagrunDepTest(unittest.TestCase):
+
+    def test_not_depends_on_past(self):
+        """
+        If depends_on_past isn't set on the task then the state of the previous dagrun
+        should be ignored and this dep should pass
+        """
+        task = FakeTask(
+            depends_on_past=False,
+            start_date=datetime(2016, 1, 1),
+            wait_for_downstream=False)
+        prev_ti = FakeTI(
+            task=task,
+            execution_date=datetime(2016, 1, 2),
+            state=State.SUCCESS,
+            dependents_done=True)
+        ti = FakeTI(
+            task=task,
+            previous_ti=prev_ti,
+            execution_date=datetime(2016, 1, 3))
+        dep_context = FakeContext(ignore_depends_on_past=False)
+
+        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_context_ignore_depends_on_past(self):
+        """
+        If the dep context overrides depends_on_past then the previous dagrun check
+        should be skipped and this dep should pass
+        """
+        task = FakeTask(
+            depends_on_past=True,
+            start_date=datetime(2016, 1, 1),
+            wait_for_downstream=False)
+        prev_ti = FakeTI(
+            task=task,
+            execution_date=datetime(2016, 1, 2),
+            state=State.SUCCESS,
+            dependents_done=True)
+        ti = FakeTI(
+            task=task,
+            previous_ti=prev_ti,
+            execution_date=datetime(2016, 1, 3))
+        dep_context = FakeContext(ignore_depends_on_past=True)
+
+        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_first_task_run(self):
+        """
+        The first task run for a TI should pass since it has no previous dagrun.
+        """
+        task = FakeTask(
+            depends_on_past=True,
+            start_date=datetime(2016, 1, 1),
+            wait_for_downstream=False)
+        prev_ti = None
+        ti = FakeTI(
+            task=task,
+            previous_ti=prev_ti,
+            execution_date=datetime(2016, 1, 1))
+        dep_context = FakeContext(ignore_depends_on_past=False)
+
+        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_prev_ti_bad_state(self):
+        """
+        If the previous TI did not complete execution this dep should fail.
+        """
+        task = FakeTask(
+            depends_on_past=True,
+            start_date=datetime(2016, 1, 1),
+            wait_for_downstream=False)
+        prev_ti = FakeTI(
+            state=State.NONE,
+            dependents_done=True)
+        ti = FakeTI(
+            task=task,
+            previous_ti=prev_ti,
+            execution_date=datetime(2016, 1, 2))
+        dep_context = FakeContext(ignore_depends_on_past=False)
+
+        self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_failed_wait_for_downstream(self):
+        """
+        If the task is set to wait for the downstream tasks of the previous dagrun
+        then this dep should fail if the downstream TIs of the previous TI are not
+        done.
+        """
+        task = FakeTask(
+            depends_on_past=True,
+            start_date=datetime(2016, 1, 1),
+            wait_for_downstream=True)
+        prev_ti = FakeTI(
+            state=State.SUCCESS,
+            dependents_done=False)
+        ti = FakeTI(
+            task=task,
+            previous_ti=prev_ti,
+            execution_date=datetime(2016, 1, 2))
+        dep_context = FakeContext(ignore_depends_on_past=False)
+
+        self.assertFalse(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))
+
+    def test_all_met(self):
+        """
+        Test to make sure all of the conditions for the dep are met
+        """
+        task = FakeTask(
+            depends_on_past=True,
+            start_date=datetime(2016, 1, 1),
+            wait_for_downstream=True)
+        prev_ti = FakeTI(
+            state=State.SUCCESS,
+            dependents_done=True)
+        ti = FakeTI(
+            task=task,
+            previous_ti=prev_ti,
+            execution_date=datetime(2016, 1, 2))
+        dep_context = FakeContext(ignore_depends_on_past=False)
+
+        self.assertTrue(PrevDagrunDep().is_met(ti=ti, dep_context=dep_context))


