airflow-commits mailing list archives

From: davy...@apache.org
Subject: incubator-airflow git commit: [AIRFLOW-857] Use library assert statements instead of conditionals
Date: Fri, 10 Feb 2017 19:54:39 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master b56e64224 -> 3ceb3abf1


[AIRFLOW-857] Use library assert statements instead of conditionals

Testing Done:
- Travis

Closes #2062 from saguziel/aguziel-fix-asserts


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3ceb3abf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3ceb3abf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3ceb3abf

Branch: refs/heads/master
Commit: 3ceb3abf166b77ea8baeb9015d33223143cdfb8a
Parents: b56e642
Author: Alex Guziel <alex.guziel@airbnb.com>
Authored: Fri Feb 10 11:53:34 2017 -0800
Committer: Dan Davydov <dan.davydov@airbnb.com>
Committed: Fri Feb 10 11:53:37 2017 -0800

----------------------------------------------------------------------
 tests/contrib/hooks/test_jira_hook.py         |   2 +-
 tests/contrib/operators/dataflow_operator.py  |   2 +-
 tests/contrib/operators/jira_operator_test.py |   8 +-
 tests/contrib/sensors/hdfs_sensors.py         |   2 +-
 tests/contrib/sensors/jira_sensor_test.py     |   4 +-
 tests/core.py                                 | 434 ++++++++++-----------
 tests/dags/no_dags.py                         |   2 +-
 tests/jobs.py                                 |   2 +-
 tests/models.py                               |  26 +-
 tests/operators/hive_operator.py              |  16 +-
 tests/operators/latest_only_operator.py       |  12 +-
 tests/operators/operators.py                  |   2 +-
 tests/operators/sensors.py                    |   2 +-
 tests/plugins_manager.py                      |  14 +-
 tests/www/api/experimental/test_endpoints.py  |   8 +-
 15 files changed, 268 insertions(+), 268 deletions(-)
----------------------------------------------------------------------
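
The diff below rewrites the test suite's bare assert statements as unittest assertion methods. Bare asserts are removed entirely when Python runs with -O, and a failure raises an AssertionError that says nothing about the values involved; the library methods always execute and report both operands. A minimal sketch of the pattern (a hypothetical test, not part of this commit):

import unittest


class AssertStyleExample(unittest.TestCase):
    """Hypothetical test illustrating the conversion applied throughout this commit."""

    def test_bare_assert_style(self):
        state = "running"
        # Old style: stripped under python -O, and a failure raises a bare
        # AssertionError with no detail about the compared values.
        assert state == "running"

    def test_library_assert_style(self):
        state = "running"
        # New style: always executes, and on failure unittest reports both
        # operands, e.g. AssertionError: 'success' != 'running'
        self.assertEqual("running", state)


if __name__ == '__main__':
    unittest.main()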


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/hooks/test_jira_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_jira_hook.py b/tests/contrib/hooks/test_jira_hook.py
index 1a3d735..977944e 100644
--- a/tests/contrib/hooks/test_jira_hook.py
+++ b/tests/contrib/hooks/test_jira_hook.py
@@ -42,7 +42,7 @@ class TestJiraHook(unittest.TestCase):
     def test_jira_client_connection(self, jira_mock):
         jira_hook = JiraHook()
 
-        assert jira_mock.called
+        self.assertTrue(jira_mock.called)
         self.assertIsInstance(jira_hook.client, Mock)
         self.assertEqual(jira_hook.client.name, jira_mock.return_value.name)
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/operators/dataflow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/dataflow_operator.py b/tests/contrib/operators/dataflow_operator.py
index 5329723..7455a45 100644
--- a/tests/contrib/operators/dataflow_operator.py
+++ b/tests/contrib/operators/dataflow_operator.py
@@ -69,7 +69,7 @@ class DataFlowPythonOperatorTest(unittest.TestCase):
         start_python_hook = dataflow_mock.return_value.start_python_dataflow
         gcs_download_hook = gcs_hook.return_value.download
         self.dataflow.execute(None)
-        assert dataflow_mock.called
+        self.assertTrue(dataflow_mock.called)
         expected_options = {
             'project': 'test',
             'staging_location': 'gs://test/staging',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/operators/jira_operator_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/jira_operator_test.py b/tests/contrib/operators/jira_operator_test.py
index 0188c0b..6d615df 100644
--- a/tests/contrib/operators/jira_operator_test.py
+++ b/tests/contrib/operators/jira_operator_test.py
@@ -74,8 +74,8 @@ class TestJiraOperator(unittest.TestCase):
         jira_ticket_search_operator.run(start_date=DEFAULT_DATE,
                                         end_date=DEFAULT_DATE, ignore_ti_state=True)
 
-        assert jira_mock.called
-        assert jira_mock.return_value.search_issues.called
+        self.assertTrue(jira_mock.called)
+        self.assertTrue(jira_mock.return_value.search_issues.called)
 
     @patch("airflow.contrib.hooks.jira_hook.JIRA",
            autospec=True, return_value=jira_client_mock)
@@ -93,8 +93,8 @@ class TestJiraOperator(unittest.TestCase):
         add_comment_operator.run(start_date=DEFAULT_DATE,
                                  end_date=DEFAULT_DATE, ignore_ti_state=True)
 
-        assert jira_mock.called
-        assert jira_mock.return_value.add_comment.called
+        self.assertTrue(jira_mock.called)
+        self.assertTrue(jira_mock.return_value.add_comment.called)
 
 
 if __name__ == '__main__':
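
Several hunks in this commit replace assert some_mock.called with self.assertTrue(some_mock.called). A minimal sketch of that pattern in isolation, assuming Python 3's unittest.mock (the standalone mock package on older test environments); the patched target and test below are hypothetical:

import os
import unittest
from unittest import mock


class MockCallExample(unittest.TestCase):
    """Hypothetical test showing the mock-call assertion pattern."""

    @mock.patch('os.path.islink', return_value=True)
    def test_mock_was_called(self, islink_mock):
        os.path.islink('/tmp/latest')
        # Old form: assert islink_mock.called
        # New form: an explicit unittest assertion with a clearer failure message.
        self.assertTrue(islink_mock.called)


if __name__ == '__main__':
    unittest.main()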

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/hdfs_sensors.py b/tests/contrib/sensors/hdfs_sensors.py
index cabe349..0e2ed0c 100644
--- a/tests/contrib/sensors/hdfs_sensors.py
+++ b/tests/contrib/sensors/hdfs_sensors.py
@@ -248,4 +248,4 @@ class HdfsSensorRegexTests(unittest.TestCase):
         # When
         # Then
         with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
\ No newline at end of file
+            task.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/contrib/sensors/jira_sensor_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/jira_sensor_test.py b/tests/contrib/sensors/jira_sensor_test.py
index 5ca58e4..77ca97f 100644
--- a/tests/contrib/sensors/jira_sensor_test.py
+++ b/tests/contrib/sensors/jira_sensor_test.py
@@ -73,8 +73,8 @@ class TestJiraSensor(unittest.TestCase):
         ticket_label_sensor.run(start_date=DEFAULT_DATE,
                                 end_date=DEFAULT_DATE, ignore_ti_state=True)
 
-        assert jira_mock.called
-        assert jira_mock.return_value.issue.called
+        self.assertTrue(jira_mock.called)
+        self.assertTrue(jira_mock.return_value.issue.called)
 
     @staticmethod
     def field_checker_func(context, issue):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 0bf4052..fba05f7 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -134,15 +134,15 @@ class CoreTest(unittest.TestCase):
             start_date=datetime(2015, 1, 2, 0, 0)))
 
         dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
-        assert dag_run is not None
-        assert dag_run.dag_id == dag.dag_id
-        assert dag_run.run_id is not None
-        assert dag_run.run_id != ''
-        assert dag_run.execution_date == datetime(2015, 1, 2, 0, 0), (
+        self.assertIsNotNone(dag_run)
+        self.assertEqual(dag.dag_id, dag_run.dag_id)
+        self.assertIsNotNone(dag_run.run_id)
+        self.assertNotEqual('', dag_run.run_id)
+        self.assertEqual(datetime(2015, 1, 2, 0, 0), dag_run.execution_date, msg=
             'dag_run.execution_date did not match expectation: {0}'
                 .format(dag_run.execution_date))
-        assert dag_run.state == State.RUNNING
-        assert dag_run.external_trigger == False
+        self.assertEqual(State.RUNNING, dag_run.state)
+        self.assertFalse(dag_run.external_trigger)
         dag.clear()
 
     def test_schedule_dag_fake_scheduled_previous(self):
@@ -165,15 +165,15 @@ class CoreTest(unittest.TestCase):
                           state=State.SUCCESS,
                           external_trigger=True)
         dag_run = scheduler.create_dag_run(dag)
-        assert dag_run is not None
-        assert dag_run.dag_id == dag.dag_id
-        assert dag_run.run_id is not None
-        assert dag_run.run_id != ''
-        assert dag_run.execution_date == DEFAULT_DATE + delta, (
+        self.assertIsNotNone(dag_run)
+        self.assertEqual(dag.dag_id, dag_run.dag_id)
+        self.assertIsNotNone(dag_run.run_id)
+        self.assertNotEqual('', dag_run.run_id)
+        self.assertEqual(DEFAULT_DATE + delta, dag_run.execution_date, msg=
             'dag_run.execution_date did not match expectation: {0}'
                 .format(dag_run.execution_date))
-        assert dag_run.state == State.RUNNING
-        assert dag_run.external_trigger == False
+        self.assertEqual(State.RUNNING, dag_run.state)
+        self.assertFalse(dag_run.external_trigger)
 
     def test_schedule_dag_once(self):
         """
@@ -189,8 +189,8 @@ class CoreTest(unittest.TestCase):
         dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
         dag_run2 = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
 
-        assert dag_run is not None
-        assert dag_run2 is None
+        self.assertIsNotNone(dag_run)
+        self.assertIsNone(dag_run2)
         dag.clear()
 
     def test_fractional_seconds(self):
@@ -246,9 +246,9 @@ class CoreTest(unittest.TestCase):
         additional_dag_run = scheduler.create_dag_run(dag)
 
         for dag_run in dag_runs:
-            assert dag_run is not None
+            self.assertIsNotNone(dag_run)
 
-        assert additional_dag_run is None
+        self.assertIsNone(additional_dag_run)
 
     @mock.patch('airflow.jobs.datetime', FakeDatetime)
     def test_schedule_dag_no_end_date_up_to_today_only(self):
@@ -288,16 +288,16 @@ class CoreTest(unittest.TestCase):
         additional_dag_run = scheduler.create_dag_run(dag)
 
         for dag_run in dag_runs:
-            assert dag_run is not None
+            self.assertIsNotNone(dag_run)
 
-        assert additional_dag_run is None
+        self.assertIsNone(additional_dag_run)
 
     def test_confirm_unittest_mod(self):
-        assert configuration.get('core', 'unit_test_mode')
+        self.assertTrue(configuration.get('core', 'unit_test_mode'))
 
     def test_pickling(self):
         dp = self.dag.pickle()
-        assert self.dag.dag_id == dp.pickle.dag_id
+        self.assertEqual(dp.pickle.dag_id, self.dag.dag_id)
 
     def test_rich_comparison_ops(self):
 
@@ -317,32 +317,32 @@ class CoreTest(unittest.TestCase):
             d.last_loaded = self.dag.last_loaded
 
         # test identity equality
-        assert self.dag == self.dag
+        self.assertEqual(self.dag, self.dag)
 
         # test dag (in)equality based on _comps
-        assert self.dag == dag_eq
-        assert self.dag != dag_diff_name
-        assert self.dag != dag_diff_load_time
+        self.assertEqual(dag_eq, self.dag)
+        self.assertNotEqual(dag_diff_name, self.dag)
+        self.assertNotEqual(dag_diff_load_time, self.dag)
 
         # test dag inequality based on type even if _comps happen to match
-        assert self.dag != dag_subclass
+        self.assertNotEqual(dag_subclass, self.dag)
 
         # a dag should equal an unpickled version of itself
-        assert self.dag == pickle.loads(pickle.dumps(self.dag))
+        self.assertEqual(pickle.loads(pickle.dumps(self.dag)), self.dag)
 
         # dags are ordered based on dag_id no matter what the type is
-        assert self.dag < dag_diff_name
-        assert not self.dag < dag_diff_load_time
-        assert self.dag < dag_subclass_diff_name
+        self.assertLess(self.dag, dag_diff_name)
+        self.assertGreater(self.dag, dag_diff_load_time)
+        self.assertLess(self.dag, dag_subclass_diff_name)
 
         # greater than should have been created automatically by functools
-        assert dag_diff_name > self.dag
+        self.assertGreater(dag_diff_name, self.dag)
 
         # hashes are non-random and match equality
-        assert hash(self.dag) == hash(self.dag)
-        assert hash(self.dag) == hash(dag_eq)
-        assert hash(self.dag) != hash(dag_diff_name)
-        assert hash(self.dag) != hash(dag_subclass)
+        self.assertEqual(hash(self.dag), hash(self.dag))
+        self.assertEqual(hash(dag_eq), hash(self.dag))
+        self.assertNotEqual(hash(dag_diff_name), hash(self.dag))
+        self.assertNotEqual(hash(dag_subclass), hash(self.dag))
 
     def test_time_sensor(self):
         t = sensors.TimeSensor(
@@ -588,7 +588,7 @@ class CoreTest(unittest.TestCase):
             on_success_callback=verify_templated_field,
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-        assert val['success']
+        self.assertTrue(val['success'])
 
     def test_template_with_json_variable(self):
         """
@@ -611,7 +611,7 @@ class CoreTest(unittest.TestCase):
             on_success_callback=verify_templated_field,
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-        assert val['success']
+        self.assertTrue(val['success'])
 
     def test_template_with_json_variable_as_value(self):
         """
@@ -635,7 +635,7 @@ class CoreTest(unittest.TestCase):
             on_success_callback=verify_templated_field,
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-        assert val['success']
+        self.assertTrue(val['success'])
 
     def test_template_non_bool(self):
         """
@@ -673,7 +673,7 @@ class CoreTest(unittest.TestCase):
             "child_process_log_directory")
         latest_log_directory_path = os.path.join(log_base_directory, "latest")
         # verify that the symlink to the latest logs exists
-        assert os.path.islink(latest_log_directory_path)
+        self.assertTrue(os.path.islink(latest_log_directory_path))
 
         # verify that the symlink points to the correct log directory
         log_directory = os.path.join(log_base_directory, "2016-01-01")
@@ -695,53 +695,53 @@ class CoreTest(unittest.TestCase):
 
     def test_variable_set_get_round_trip(self):
         Variable.set("tested_var_set_id", "Monday morning breakfast")
-        assert "Monday morning breakfast" == Variable.get("tested_var_set_id")
+        self.assertEqual("Monday morning breakfast", Variable.get("tested_var_set_id"))
 
     def test_variable_set_get_round_trip_json(self):
         value = {"a": 17, "b": 47}
         Variable.set("tested_var_set_id", value, serialize_json=True)
-        assert value == Variable.get("tested_var_set_id", deserialize_json=True)
+        self.assertEqual(value, Variable.get("tested_var_set_id", deserialize_json=True))
 
     def test_get_non_existing_var_should_return_default(self):
         default_value = "some default val"
-        assert default_value == Variable.get("thisIdDoesNotExist",
-                                             default_var=default_value)
+        self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
+                                             default_var=default_value))
 
     def test_get_non_existing_var_should_not_deserialize_json_default(self):
         default_value = "}{ this is a non JSON default }{"
-        assert default_value == Variable.get("thisIdDoesNotExist",
+        self.assertEqual(default_value, Variable.get("thisIdDoesNotExist",
                                              default_var=default_value,
-                                             deserialize_json=True)
+                                             deserialize_json=True))
 
     def test_variable_setdefault_round_trip(self):
         key = "tested_var_setdefault_1_id"
         value = "Monday morning breakfast in Paris"
         Variable.setdefault(key, value)
-        assert value == Variable.get(key)
+        self.assertEqual(value, Variable.get(key))
 
     def test_variable_setdefault_round_trip_json(self):
         key = "tested_var_setdefault_2_id"
         value = {"city": 'Paris', "Hapiness": True}
         Variable.setdefault(key, value, deserialize_json=True)
-        assert value == Variable.get(key, deserialize_json=True)
+        self.assertEqual(value, Variable.get(key, deserialize_json=True))
 
     def test_parameterized_config_gen(self):
 
         cfg = configuration.parameterized_config(configuration.DEFAULT_CONFIG)
 
         # making sure some basic building blocks are present:
-        assert "[core]" in cfg
-        assert "dags_folder" in cfg
-        assert "sql_alchemy_conn" in cfg
-        assert "fernet_key" in cfg
+        self.assertIn("[core]", cfg)
+        self.assertIn("dags_folder", cfg)
+        self.assertIn("sql_alchemy_conn", cfg)
+        self.assertIn("fernet_key", cfg)
 
         # making sure replacement actually happened
-        assert "{AIRFLOW_HOME}" not in cfg
-        assert "{FERNET_KEY}" not in cfg
+        self.assertNotIn("{AIRFLOW_HOME}", cfg)
+        self.assertNotIn("{FERNET_KEY}", cfg)
 
     def test_config_use_original_when_original_and_fallback_are_present(self):
-        assert configuration.has_option("core", "FERNET_KEY")
-        assert not configuration.has_option("core", "FERNET_KEY_CMD")
+        self.assertTrue(configuration.has_option("core", "FERNET_KEY"))
+        self.assertFalse(configuration.has_option("core", "FERNET_KEY_CMD"))
 
         FERNET_KEY = configuration.get('core', 'FERNET_KEY')
 
@@ -752,14 +752,14 @@ class CoreTest(unittest.TestCase):
             "FERNET_KEY"
         )
 
-        assert FALLBACK_FERNET_KEY == FERNET_KEY
+        self.assertEqual(FERNET_KEY, FALLBACK_FERNET_KEY)
 
         # restore the conf back to the original state
         configuration.remove_option("core", "FERNET_KEY_CMD")
 
     def test_config_throw_error_when_original_and_fallback_is_absent(self):
-        assert configuration.has_option("core", "FERNET_KEY")
-        assert not configuration.has_option("core", "FERNET_KEY_CMD")
+        self.assertTrue(configuration.has_option("core", "FERNET_KEY"))
+        self.assertFalse(configuration.has_option("core", "FERNET_KEY_CMD"))
 
         FERNET_KEY = configuration.get("core", "FERNET_KEY")
         configuration.remove_option("core", "FERNET_KEY")
@@ -769,20 +769,20 @@ class CoreTest(unittest.TestCase):
 
         exception = str(cm.exception)
         message = "section/key [core/fernet_key] not found in config"
-        assert exception == message
+        self.assertEqual(message, exception)
 
         # restore the conf back to the original state
         configuration.set("core", "FERNET_KEY", FERNET_KEY)
-        assert configuration.has_option("core", "FERNET_KEY")
+        self.assertTrue(configuration.has_option("core", "FERNET_KEY"))
 
     def test_config_override_original_when_non_empty_envvar_is_provided(self):
         key = "AIRFLOW__CORE__FERNET_KEY"
         value = "some value"
-        assert key not in os.environ
+        self.assertNotIn(key, os.environ)
 
         os.environ[key] = value
         FERNET_KEY = configuration.get('core', 'FERNET_KEY')
-        assert FERNET_KEY == value
+        self.assertEqual(value, FERNET_KEY)
 
         # restore the envvar back to the original state
         del os.environ[key]
@@ -790,11 +790,11 @@ class CoreTest(unittest.TestCase):
     def test_config_override_original_when_empty_envvar_is_provided(self):
         key = "AIRFLOW__CORE__FERNET_KEY"
         value = ""
-        assert key not in os.environ
+        self.assertNotIn(key, os.environ)
 
         os.environ[key] = value
         FERNET_KEY = configuration.get('core', 'FERNET_KEY')
-        assert FERNET_KEY == value
+        self.assertEqual(value, FERNET_KEY)
 
         # restore the envvar back to the original state
         del os.environ[key]
@@ -806,43 +806,43 @@ class CoreTest(unittest.TestCase):
         class Blah(LoggingMixin):
             pass
 
-        assert Blah().logger.name == "tests.core.Blah"
-        assert SequentialExecutor().logger.name == "airflow.executors.sequential_executor.SequentialExecutor"
-        assert LocalExecutor().logger.name == "airflow.executors.local_executor.LocalExecutor"
+        self.assertEqual("tests.core.Blah", Blah().logger.name)
+        self.assertEqual("airflow.executors.sequential_executor.SequentialExecutor", SequentialExecutor().logger.name)
+        self.assertEqual("airflow.executors.local_executor.LocalExecutor", LocalExecutor().logger.name)
 
     def test_round_time(self):
 
         rt1 = round_time(datetime(2015, 1, 1, 6), timedelta(days=1))
-        assert rt1 == datetime(2015, 1, 1, 0, 0)
+        self.assertEqual(datetime(2015, 1, 1, 0, 0), rt1)
 
         rt2 = round_time(datetime(2015, 1, 2), relativedelta(months=1))
-        assert rt2 == datetime(2015, 1, 1, 0, 0)
+        self.assertEqual(datetime(2015, 1, 1, 0, 0), rt2)
 
         rt3 = round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(
             2015, 9, 14, 0, 0))
-        assert rt3 == datetime(2015, 9, 16, 0, 0)
+        self.assertEqual(datetime(2015, 9, 16, 0, 0), rt3)
 
         rt4 = round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(
             2015, 9, 14, 0, 0))
-        assert rt4 == datetime(2015, 9, 15, 0, 0)
+        self.assertEqual(datetime(2015, 9, 15, 0, 0), rt4)
 
         rt5 = round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(
             2015, 9, 14, 0, 0))
-        assert rt5 == datetime(2015, 9, 14, 0, 0)
+        self.assertEqual(datetime(2015, 9, 14, 0, 0), rt5)
 
         rt6 = round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(
             2015, 9, 14, 0, 0))
-        assert rt6 == datetime(2015, 9, 14, 0, 0)
+        self.assertEqual(datetime(2015, 9, 14, 0, 0), rt6)
 
     def test_infer_time_unit(self):
 
-        assert infer_time_unit([130, 5400, 10]) == 'minutes'
+        self.assertEqual('minutes', infer_time_unit([130, 5400, 10]))
 
-        assert infer_time_unit([110, 50, 10, 100]) == 'seconds'
+        self.assertEqual('seconds', infer_time_unit([110, 50, 10, 100]))
 
-        assert infer_time_unit([100000, 50000, 10000, 20000]) == 'hours'
+        self.assertEqual('hours', infer_time_unit([100000, 50000, 10000, 20000]))
 
-        assert infer_time_unit([200000, 100000]) == 'days'
+        self.assertEqual('days', infer_time_unit([200000, 100000]))
 
     def test_scale_time_units(self):
 
@@ -913,7 +913,7 @@ class CoreTest(unittest.TestCase):
         session = settings.Session()
         ti.refresh_from_db(session=session)
         # making sure it's actually running
-        assert State.RUNNING == ti.state
+        self.assertEqual(State.RUNNING, ti.state)
         ti = (
             session.query(TI)
             .filter_by(
@@ -930,7 +930,7 @@ class CoreTest(unittest.TestCase):
 
         # making sure that the task ended up as failed
         ti.refresh_from_db(session=session)
-        assert State.FAILED == ti.state
+        self.assertEqual(State.FAILED, ti.state)
         session.close()
 
     def test_task_fail_duration(self):
@@ -964,10 +964,10 @@ class CoreTest(unittest.TestCase):
             dag_id=self.dag.dag_id,
             execution_date=DEFAULT_DATE).all()
         print(f_fails)
-        assert len(p_fails) == 0
-        assert len(f_fails) == 1
+        self.assertEqual(0, len(p_fails))
+        self.assertEqual(1, len(f_fails))
         # C
-        assert sum([f.duration for f in f_fails]) >= 3
+        self.assertGreaterEqual(sum([f.duration for f in f_fails]), 3)
 
     def test_dag_stats(self):
         """Correctly sets/dirties/cleans rows of DagStat table"""
@@ -987,11 +987,11 @@ class CoreTest(unittest.TestCase):
 
         qry = session.query(models.DagStat).all()
 
-        assert len(qry) == 1
-        assert qry[0].dag_id == self.dag_bash.dag_id and\
-                qry[0].state == State.RUNNING and\
-                qry[0].count == 1 and\
-                qry[0].dirty == False
+        self.assertEqual(1, len(qry))
+        self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id)
+        self.assertEqual(State.RUNNING, qry[0].state)
+        self.assertEqual(1, qry[0].count)
+        self.assertFalse(qry[0].dirty)
 
         run2 = self.dag_bash.create_dagrun(
             run_id="run2",
@@ -1002,11 +1002,11 @@ class CoreTest(unittest.TestCase):
 
         qry = session.query(models.DagStat).all()
 
-        assert len(qry) == 1
-        assert qry[0].dag_id == self.dag_bash.dag_id and\
-                qry[0].state == State.RUNNING and\
-                qry[0].count == 2 and\
-                qry[0].dirty == False
+        self.assertEqual(1, len(qry))
+        self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id)
+        self.assertEqual(State.RUNNING, qry[0].state)
+        self.assertEqual(2, qry[0].count)
+        self.assertFalse(qry[0].dirty)
 
         session.query(models.DagRun).first().state = State.SUCCESS
         session.commit()
@@ -1014,18 +1014,18 @@ class CoreTest(unittest.TestCase):
         models.DagStat.clean_dirty([self.dag_bash.dag_id], session=session)
 
         qry = session.query(models.DagStat).filter(models.DagStat.state == State.SUCCESS).all()
-        assert len(qry) == 1
-        assert qry[0].dag_id == self.dag_bash.dag_id and\
-                qry[0].state == State.SUCCESS and\
-                qry[0].count == 1 and\
-                qry[0].dirty == False
+        self.assertEqual(1, len(qry))
+        self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id)
+        self.assertEqual(State.SUCCESS, qry[0].state)
+        self.assertEqual(1, qry[0].count)
+        self.assertFalse(qry[0].dirty)
 
         qry = session.query(models.DagStat).filter(models.DagStat.state == State.RUNNING).all()
-        assert len(qry) == 1
-        assert qry[0].dag_id == self.dag_bash.dag_id and\
-                qry[0].state == State.RUNNING and\
-                qry[0].count == 1 and\
-                qry[0].dirty == False
+        self.assertEqual(1, len(qry))
+        self.assertEqual(self.dag_bash.dag_id, qry[0].dag_id)
+        self.assertEqual(State.RUNNING, qry[0].state)
+        self.assertEqual(1, qry[0].count)
+        self.assertFalse(qry[0].dirty)
 
         session.query(models.DagRun).delete()
         session.query(models.DagStat).delete()
@@ -1281,12 +1281,12 @@ class CliTests(unittest.TestCase):
         args = self.parser.parse_args([
             'pause', 'example_bash_operator'])
         cli.pause(args)
-        assert self.dagbag.dags['example_bash_operator'].is_paused in [True, 1]
+        self.assertIn(self.dagbag.dags['example_bash_operator'].is_paused, [True, 1])
 
         args = self.parser.parse_args([
             'unpause', 'example_bash_operator'])
         cli.unpause(args)
-        assert self.dagbag.dags['example_bash_operator'].is_paused in [False, 0]
+        self.assertIn(self.dagbag.dags['example_bash_operator'].is_paused, [False, 0])
 
     def test_subdag_clear(self):
         args = self.parser.parse_args([
@@ -1314,7 +1314,7 @@ class CliTests(unittest.TestCase):
             '-s', DEFAULT_DATE.isoformat()]))
 
     def test_process_subdir_path_with_placeholder(self):
-        assert cli.process_subdir('DAGS_FOLDER/abc') == os.path.join(settings.DAGS_FOLDER, 'abc')
+        self.assertEqual(os.path.join(settings.DAGS_FOLDER, 'abc'), cli.process_subdir('DAGS_FOLDER/abc'))
 
     def test_trigger_dag(self):
         cli.trigger_dag(self.parser.parse_args([
@@ -1373,22 +1373,22 @@ class CliTests(unittest.TestCase):
         cli.variables(self.parser.parse_args([
             'variables', '-i', 'variables1.json']))
 
-        assert models.Variable.get('bar') == 'original'
-        assert models.Variable.get('foo') == '{"foo": "bar"}'
+        self.assertEqual('original', models.Variable.get('bar'))
+        self.assertEqual('{"foo": "bar"}', models.Variable.get('foo'))
         # Second export
         cli.variables(self.parser.parse_args([
             'variables', '-e', 'variables2.json']))
 
         second_exp = open('variables2.json', 'r')
-        assert second_exp.read() == first_exp.read()
+        self.assertEqual(first_exp.read(), second_exp.read())
         second_exp.close()
         first_exp.close()
         # Second import
         cli.variables(self.parser.parse_args([
             'variables', '-i', 'variables2.json']))
 
-        assert models.Variable.get('bar') == 'original'
-        assert models.Variable.get('foo') == '{"foo": "bar"}'
+        self.assertEqual('original', models.Variable.get('bar'))
+        self.assertEqual('{"foo": "bar"}', models.Variable.get('foo'))
 
         session = settings.Session()
         session.query(Variable).delete()
@@ -1413,91 +1413,91 @@ class WebUiTests(unittest.TestCase):
 
     def test_index(self):
         response = self.app.get('/', follow_redirects=True)
-        assert "DAGs" in response.data.decode('utf-8')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+        self.assertIn("DAGs", response.data.decode('utf-8'))
+        self.assertIn("example_bash_operator", response.data.decode('utf-8'))
 
     def test_query(self):
         response = self.app.get('/admin/queryview/')
-        assert "Ad Hoc Query" in response.data.decode('utf-8')
+        self.assertIn("Ad Hoc Query", response.data.decode('utf-8'))
         response = self.app.get(
             "/admin/queryview/?"
             "conn_id=airflow_db&"
             "sql=SELECT+COUNT%281%29+as+TEST+FROM+task_instance")
-        assert "TEST" in response.data.decode('utf-8')
+        self.assertIn("TEST", response.data.decode('utf-8'))
 
     def test_health(self):
         response = self.app.get('/health')
-        assert 'The server is healthy!' in response.data.decode('utf-8')
+        self.assertIn('The server is healthy!', response.data.decode('utf-8'))
 
     def test_headers(self):
         response = self.app.get('/admin/airflow/headers')
-        assert '"headers":' in response.data.decode('utf-8')
+        self.assertIn('"headers":', response.data.decode('utf-8'))
 
     def test_noaccess(self):
         response = self.app.get('/admin/airflow/noaccess')
-        assert "You don't seem to have access." in response.data.decode('utf-8')
+        self.assertIn("You don't seem to have access.", response.data.decode('utf-8'))
 
     def test_pickle_info(self):
         response = self.app.get('/admin/airflow/pickle_info')
-        assert '{' in response.data.decode('utf-8')
+        self.assertIn('{', response.data.decode('utf-8'))
 
     def test_dag_views(self):
         response = self.app.get(
             '/admin/airflow/graph?dag_id=example_bash_operator')
-        assert "runme_0" in response.data.decode('utf-8')
+        self.assertIn("runme_0", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/tree?num_runs=25&dag_id=example_bash_operator')
-        assert "runme_0" in response.data.decode('utf-8')
+        self.assertIn("runme_0", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/duration?days=30&dag_id=example_bash_operator')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+        self.assertIn("example_bash_operator", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/tries?days=30&dag_id=example_bash_operator')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+        self.assertIn("example_bash_operator", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/landing_times?'
             'days=30&dag_id=example_bash_operator')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+        self.assertIn("example_bash_operator", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/gantt?dag_id=example_bash_operator')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+        self.assertIn("example_bash_operator", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/code?dag_id=example_bash_operator')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+        self.assertIn("example_bash_operator", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/blocked')
         response = self.app.get(
             '/admin/configurationview/')
-        assert "Airflow Configuration" in response.data.decode('utf-8')
-        assert "Running Configuration" in response.data.decode('utf-8')
+        self.assertIn("Airflow Configuration", response.data.decode('utf-8'))
+        self.assertIn("Running Configuration", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/rendered?'
             'task_id=runme_1&dag_id=example_bash_operator&'
             'execution_date={}'.format(DEFAULT_DATE_ISO))
-        assert "example_bash_operator" in response.data.decode('utf-8')
+        self.assertIn("example_bash_operator", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/log?task_id=run_this_last&'
             'dag_id=example_bash_operator&execution_date={}'
             ''.format(DEFAULT_DATE_ISO))
-        assert "run_this_last" in response.data.decode('utf-8')
+        self.assertIn("run_this_last", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/task?'
             'task_id=runme_0&dag_id=example_bash_operator&'
             'execution_date={}'.format(DEFAULT_DATE_DS))
-        assert "Attributes" in response.data.decode('utf-8')
+        self.assertIn("Attributes", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/dag_stats')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+        self.assertIn("example_bash_operator", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/task_stats')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+        self.assertIn("example_bash_operator", response.data.decode('utf-8'))
         url = (
             "/admin/airflow/success?task_id=run_this_last&"
             "dag_id=example_bash_operator&upstream=false&downstream=false&"
             "future=false&past=false&execution_date={}&"
             "origin=/admin".format(DEFAULT_DATE_DS))
         response = self.app.get(url)
-        assert "Wait a minute" in response.data.decode('utf-8')
+        self.assertIn("Wait a minute", response.data.decode('utf-8'))
         response = self.app.get(url + "&confirmed=true")
         response = self.app.get(
             '/admin/airflow/clear?task_id=run_this_last&'
@@ -1505,19 +1505,19 @@ class WebUiTests(unittest.TestCase):
             'upstream=true&downstream=false&'
             'execution_date={}&'
             'origin=/admin'.format(DEFAULT_DATE_DS))
-        assert "Wait a minute" in response.data.decode('utf-8')
+        self.assertIn("Wait a minute", response.data.decode('utf-8'))
         url = (
             "/admin/airflow/success?task_id=section-1&"
             "dag_id=example_subdag_operator&upstream=true&downstream=true&"
             "recursive=true&future=false&past=false&execution_date={}&"
             "origin=/admin".format(DEFAULT_DATE_DS))
         response = self.app.get(url)
-        assert "Wait a minute" in response.data.decode('utf-8')
-        assert "section-1-task-1" in response.data.decode('utf-8')
-        assert "section-1-task-2" in response.data.decode('utf-8')
-        assert "section-1-task-3" in response.data.decode('utf-8')
-        assert "section-1-task-4" in response.data.decode('utf-8')
-        assert "section-1-task-5" in response.data.decode('utf-8')
+        self.assertIn("Wait a minute", response.data.decode('utf-8'))
+        self.assertIn("section-1-task-1", response.data.decode('utf-8'))
+        self.assertIn("section-1-task-2", response.data.decode('utf-8'))
+        self.assertIn("section-1-task-3", response.data.decode('utf-8'))
+        self.assertIn("section-1-task-4", response.data.decode('utf-8'))
+        self.assertIn("section-1-task-5", response.data.decode('utf-8'))
         response = self.app.get(url + "&confirmed=true")
         url = (
             "/admin/airflow/clear?task_id=runme_1&"
@@ -1526,7 +1526,7 @@ class WebUiTests(unittest.TestCase):
             "execution_date={}&"
             "origin=/admin".format(DEFAULT_DATE_DS))
         response = self.app.get(url)
-        assert "Wait a minute" in response.data.decode('utf-8')
+        self.assertIn("Wait a minute", response.data.decode('utf-8'))
         response = self.app.get(url + "&confirmed=true")
         url = (
             "/admin/airflow/run?task_id=runme_0&"
@@ -1541,7 +1541,7 @@ class WebUiTests(unittest.TestCase):
             "/admin/airflow/paused?"
             "dag_id=example_python_operator&is_paused=false")
         response = self.app.get("/admin/xcom", follow_redirects=True)
-        assert "Xcoms" in response.data.decode('utf-8')
+        self.assertIn("Xcoms", response.data.decode('utf-8'))
 
     def test_charts(self):
         session = Session()
@@ -1553,14 +1553,14 @@ class WebUiTests(unittest.TestCase):
         response = self.app.get(
             '/admin/airflow/chart'
             '?chart_id={}&iteration_no=1'.format(chart_id))
-        assert "Airflow task instance by type" in response.data.decode('utf-8')
+        self.assertIn("Airflow task instance by type", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/chart_data'
             '?chart_id={}&iteration_no=1'.format(chart_id))
-        assert "example" in response.data.decode('utf-8')
+        self.assertIn("example", response.data.decode('utf-8'))
         response = self.app.get(
             '/admin/airflow/dag_details?dag_id=example_branch_operator')
-        assert "run_this_first" in response.data.decode('utf-8')
+        self.assertIn("run_this_first", response.data.decode('utf-8'))
 
     def test_fetch_task_instance(self):
         url = (
@@ -1568,7 +1568,7 @@ class WebUiTests(unittest.TestCase):
             "dag_id=example_bash_operator&"
             "execution_date={}".format(DEFAULT_DATE_DS))
         response = self.app.get(url)
-        assert "{}" in response.data.decode('utf-8')
+        self.assertIn("{}", response.data.decode('utf-8'))
 
         TI = models.TaskInstance
         ti = TI(
@@ -1577,7 +1577,7 @@ class WebUiTests(unittest.TestCase):
         job.run()
 
         response = self.app.get(url)
-        assert "runme_0" in response.data.decode('utf-8')
+        self.assertIn("runme_0", response.data.decode('utf-8'))
 
     def tearDown(self):
         configuration.conf.set("webserver", "expose_config", "False")
@@ -1624,19 +1624,19 @@ class WebPasswordAuthTest(unittest.TestCase):
         return self.app.get('/admin/airflow/logout', follow_redirects=True)
 
     def test_login_logout_password_auth(self):
-        assert configuration.getboolean('webserver', 'authenticate') is True
+        self.assertTrue(configuration.getboolean('webserver', 'authenticate'))
 
         response = self.login('user1', 'whatever')
-        assert 'Incorrect login details' in response.data.decode('utf-8')
+        self.assertIn('Incorrect login details', response.data.decode('utf-8'))
 
         response = self.login('airflow_passwordauth', 'wrongpassword')
-        assert 'Incorrect login details' in response.data.decode('utf-8')
+        self.assertIn('Incorrect login details', response.data.decode('utf-8'))
 
         response = self.login('airflow_passwordauth', 'password')
-        assert 'Data Profiling' in response.data.decode('utf-8')
+        self.assertIn('Data Profiling', response.data.decode('utf-8'))
 
         response = self.logout()
-        assert 'form-signin' in response.data.decode('utf-8')
+        self.assertIn('form-signin', response.data.decode('utf-8'))
 
     def test_unauthorized_password_auth(self):
         response = self.app.get("/admin/airflow/landing_times")
@@ -1691,19 +1691,19 @@ class WebLdapAuthTest(unittest.TestCase):
         return self.app.get('/admin/airflow/logout', follow_redirects=True)
 
     def test_login_logout_ldap(self):
-        assert configuration.getboolean('webserver', 'authenticate') is True
+        self.assertTrue(configuration.getboolean('webserver', 'authenticate'))
 
         response = self.login('user1', 'userx')
-        assert 'Incorrect login details' in response.data.decode('utf-8')
+        self.assertIn('Incorrect login details', response.data.decode('utf-8'))
 
         response = self.login('userz', 'user1')
-        assert 'Incorrect login details' in response.data.decode('utf-8')
+        self.assertIn('Incorrect login details', response.data.decode('utf-8'))
 
         response = self.login('user1', 'user1')
-        assert 'Data Profiling' in response.data.decode('utf-8')
+        self.assertIn('Data Profiling', response.data.decode('utf-8'))
 
         response = self.logout()
-        assert 'form-signin' in response.data.decode('utf-8')
+        self.assertIn('form-signin', response.data.decode('utf-8'))
 
     def test_unauthorized(self):
         response = self.app.get("/admin/airflow/landing_times")
@@ -1711,8 +1711,8 @@ class WebLdapAuthTest(unittest.TestCase):
 
     def test_no_filter(self):
         response = self.login('user1', 'user1')
-        assert 'Data Profiling' in response.data.decode('utf-8')
-        assert 'Connections' in response.data.decode('utf-8')
+        self.assertIn('Data Profiling', response.data.decode('utf-8'))
+        self.assertIn('Connections', response.data.decode('utf-8'))
 
     def test_with_filters(self):
         configuration.conf.set('ldap', 'superuser_filter',
@@ -1721,10 +1721,10 @@ class WebLdapAuthTest(unittest.TestCase):
                                'description=dataprofiler')
 
         response = self.login('dataprofiler', 'dataprofiler')
-        assert 'Data Profiling' in response.data.decode('utf-8')
+        self.assertIn('Data Profiling', response.data.decode('utf-8'))
 
         response = self.login('superuser', 'superuser')
-        assert 'Connections' in response.data.decode('utf-8')
+        self.assertIn('Connections', response.data.decode('utf-8'))
 
     def tearDown(self):
         configuration.load_test_config()
@@ -1760,7 +1760,7 @@ class LdapGroupTest(unittest.TestCase):
             mu = models.User(username=user,
                              is_superuser=False)
             auth = LdapUser(mu)
-            assert set(auth.ldap_groups) == set(users[user])
+            self.assertEqual(set(users[user]), set(auth.ldap_groups))
 
     def tearDown(self):
         configuration.load_test_config()
@@ -1924,58 +1924,58 @@ class ConnectionTest(unittest.TestCase):
 
     def test_using_env_var(self):
         c = SqliteHook.get_connection(conn_id='test_uri')
-        assert c.host == 'ec2.compute.com'
-        assert c.schema == 'the_database'
-        assert c.login == 'username'
-        assert c.password == 'password'
-        assert c.port == 5432
+        self.assertEqual('ec2.compute.com', c.host)
+        self.assertEqual('the_database', c.schema)
+        self.assertEqual('username', c.login)
+        self.assertEqual('password', c.password)
+        self.assertEqual(5432, c.port)
 
     def test_using_unix_socket_env_var(self):
         c = SqliteHook.get_connection(conn_id='test_uri_no_creds')
-        assert c.host == 'ec2.compute.com'
-        assert c.schema == 'the_database'
-        assert c.login is None
-        assert c.password is None
-        assert c.port is None
+        self.assertEqual('ec2.compute.com', c.host)
+        self.assertEqual('the_database', c.schema)
+        self.assertIsNone(c.login)
+        self.assertIsNone(c.password)
+        self.assertIsNone(c.port)
 
     def test_param_setup(self):
         c = models.Connection(conn_id='local_mysql', conn_type='mysql',
                               host='localhost', login='airflow',
                               password='airflow', schema='airflow')
-        assert c.host == 'localhost'
-        assert c.schema == 'airflow'
-        assert c.login == 'airflow'
-        assert c.password == 'airflow'
-        assert c.port is None
+        self.assertEqual('localhost', c.host)
+        self.assertEqual('airflow', c.schema)
+        self.assertEqual('airflow', c.login)
+        self.assertEqual('airflow', c.password)
+        self.assertIsNone(c.port)
 
     def test_env_var_priority(self):
         c = SqliteHook.get_connection(conn_id='airflow_db')
-        assert c.host != 'ec2.compute.com'
+        self.assertNotEqual('ec2.compute.com', c.host)
 
         os.environ['AIRFLOW_CONN_AIRFLOW_DB'] = \
             'postgres://username:password@ec2.compute.com:5432/the_database'
         c = SqliteHook.get_connection(conn_id='airflow_db')
-        assert c.host == 'ec2.compute.com'
-        assert c.schema == 'the_database'
-        assert c.login == 'username'
-        assert c.password == 'password'
-        assert c.port == 5432
+        self.assertEqual('ec2.compute.com', c.host)
+        self.assertEqual('the_database', c.schema)
+        self.assertEqual('username', c.login)
+        self.assertEqual('password', c.password)
+        self.assertEqual(5432, c.port)
         del os.environ['AIRFLOW_CONN_AIRFLOW_DB']
 
     def test_dbapi_get_uri(self):
         conn = BaseHook.get_connection(conn_id='test_uri')
         hook = conn.get_hook()
-        assert hook.get_uri() == 'postgres://username:password@ec2.compute.com:5432/the_database'
+        self.assertEqual('postgres://username:password@ec2.compute.com:5432/the_database', hook.get_uri())
         conn2 = BaseHook.get_connection(conn_id='test_uri_no_creds')
         hook2 = conn2.get_hook()
-        assert hook2.get_uri() == 'postgres://ec2.compute.com/the_database'
+        self.assertEqual('postgres://ec2.compute.com/the_database', hook2.get_uri())
 
     def test_dbapi_get_sqlalchemy_engine(self):
         conn = BaseHook.get_connection(conn_id='test_uri')
         hook = conn.get_hook()
         engine = hook.get_sqlalchemy_engine()
-        assert isinstance(engine, sqlalchemy.engine.Engine)
-        assert str(engine.url) == 'postgres://username:password@ec2.compute.com:5432/the_database'
+        self.assertIsInstance(engine, sqlalchemy.engine.Engine)
+        self.assertEqual('postgres://username:password@ec2.compute.com:5432/the_database', str(engine.url))
 
 
 class WebHDFSHookTest(unittest.TestCase):
@@ -1985,12 +1985,12 @@ class WebHDFSHookTest(unittest.TestCase):
     def test_simple_init(self):
         from airflow.hooks.webhdfs_hook import WebHDFSHook
         c = WebHDFSHook()
-        assert c.proxy_user == None
+        self.assertIsNone(c.proxy_user)
 
     def test_init_proxy_user(self):
         from airflow.hooks.webhdfs_hook import WebHDFSHook
         c = WebHDFSHook(proxy_user='someone')
-        assert c.proxy_user == 'someone'
+        self.assertEqual('someone', c.proxy_user)
 
 
 try:
@@ -2076,14 +2076,14 @@ class EmailTest(unittest.TestCase):
     def test_default_backend(self, mock_send_email):
         res = utils.email.send_email('to', 'subject', 'content')
         mock_send_email.assert_called_with('to', 'subject', 'content')
-        assert res == mock_send_email.return_value
+        self.assertEqual(mock_send_email.return_value, res)
 
     @mock.patch('airflow.utils.email.send_email_smtp')
     def test_custom_backend(self, mock_send_email):
         configuration.set('email', 'EMAIL_BACKEND', 'tests.core.send_email_test')
         utils.email.send_email('to', 'subject', 'content')
         send_email_test.assert_called_with('to', 'subject', 'content', files=None, dryrun=False, cc=None, bcc=None, mime_subtype='mixed')
-        assert not mock_send_email.called
+        self.assertFalse(mock_send_email.called)
 
 
 class EmailSmtpTest(unittest.TestCase):
@@ -2096,18 +2096,18 @@ class EmailSmtpTest(unittest.TestCase):
         attachment.write(b'attachment')
         attachment.seek(0)
         utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name])
-        assert mock_send_mime.called
+        self.assertTrue(mock_send_mime.called)
         call_args = mock_send_mime.call_args[0]
-        assert call_args[0] == configuration.get('smtp', 'SMTP_MAIL_FROM')
-        assert call_args[1] == ['to']
+        self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
+        self.assertEqual(['to'], call_args[1])
         msg = call_args[2]
-        assert msg['Subject'] == 'subject'
-        assert msg['From'] == configuration.get('smtp', 'SMTP_MAIL_FROM')
-        assert len(msg.get_payload()) == 2
-        assert msg.get_payload()[-1].get(u'Content-Disposition') == \
-               u'attachment; filename="' + os.path.basename(attachment.name) + '"'
+        self.assertEqual('subject', msg['Subject'])
+        self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
+        self.assertEqual(2, len(msg.get_payload()))
+        self.assertEqual(u'attachment; filename="' + os.path.basename(attachment.name) + '"',
+            msg.get_payload()[-1].get(u'Content-Disposition'))
         mimeapp = MIMEApplication('attachment')
-        assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload()
+        self.assertEqual(mimeapp.get_payload(), msg.get_payload()[-1].get_payload())
 
     @mock.patch('airflow.utils.email.send_MIME_email')
     def test_send_bcc_smtp(self, mock_send_mime):
@@ -2115,18 +2115,18 @@ class EmailSmtpTest(unittest.TestCase):
         attachment.write(b'attachment')
         attachment.seek(0)
         utils.email.send_email_smtp('to', 'subject', 'content', files=[attachment.name], cc='cc', bcc='bcc')
-        assert mock_send_mime.called
+        self.assertTrue(mock_send_mime.called)
         call_args = mock_send_mime.call_args[0]
-        assert call_args[0] == configuration.get('smtp', 'SMTP_MAIL_FROM')
-        assert call_args[1] == ['to', 'cc', 'bcc']
+        self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), call_args[0])
+        self.assertEqual(['to', 'cc', 'bcc'], call_args[1])
         msg = call_args[2]
-        assert msg['Subject'] == 'subject'
-        assert msg['From'] == configuration.get('smtp', 'SMTP_MAIL_FROM')
-        assert len(msg.get_payload()) == 2
-        assert msg.get_payload()[-1].get(u'Content-Disposition') == \
-               u'attachment; filename="' + os.path.basename(attachment.name) + '"'
+        self.assertEqual('subject', msg['Subject'])
+        self.assertEqual(configuration.get('smtp', 'SMTP_MAIL_FROM'), msg['From'])
+        self.assertEqual(2, len(msg.get_payload()))
+        self.assertEqual(u'attachment; filename="' + os.path.basename(attachment.name) + '"',
+            msg.get_payload()[-1].get(u'Content-Disposition'))
         mimeapp = MIMEApplication('attachment')
-        assert msg.get_payload()[-1].get_payload() == mimeapp.get_payload()
+        self.assertEqual(mimeapp.get_payload(), msg.get_payload()[-1].get_payload())
 
 
     @mock.patch('smtplib.SMTP_SSL')
@@ -2140,13 +2140,13 @@ class EmailSmtpTest(unittest.TestCase):
             configuration.get('smtp', 'SMTP_HOST'),
             configuration.getint('smtp', 'SMTP_PORT'),
         )
-        assert mock_smtp.return_value.starttls.called
+        self.assertTrue(mock_smtp.return_value.starttls.called)
         mock_smtp.return_value.login.assert_called_with(
             configuration.get('smtp', 'SMTP_USER'),
             configuration.get('smtp', 'SMTP_PASSWORD'),
         )
         mock_smtp.return_value.sendmail.assert_called_with('from', 'to', msg.as_string())
-        assert mock_smtp.return_value.quit.called
+        self.assertTrue(mock_smtp.return_value.quit.called)
 
     @mock.patch('smtplib.SMTP_SSL')
     @mock.patch('smtplib.SMTP')
@@ -2155,7 +2155,7 @@ class EmailSmtpTest(unittest.TestCase):
         mock_smtp.return_value = mock.Mock()
         mock_smtp_ssl.return_value = mock.Mock()
         utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False)
-        assert not mock_smtp.called
+        self.assertFalse(mock_smtp.called)
         mock_smtp_ssl.assert_called_with(
             configuration.get('smtp', 'SMTP_HOST'),
             configuration.getint('smtp', 'SMTP_PORT'),
@@ -2169,19 +2169,19 @@ class EmailSmtpTest(unittest.TestCase):
         mock_smtp.return_value = mock.Mock()
         mock_smtp_ssl.return_value = mock.Mock()
         utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=False)
-        assert not mock_smtp_ssl.called
+        self.assertFalse(mock_smtp_ssl.called)
         mock_smtp.assert_called_with(
             configuration.get('smtp', 'SMTP_HOST'),
             configuration.getint('smtp', 'SMTP_PORT'),
         )
-        assert not mock_smtp.login.called
+        self.assertFalse(mock_smtp.login.called)
 
     @mock.patch('smtplib.SMTP_SSL')
     @mock.patch('smtplib.SMTP')
     def test_send_mime_dryrun(self, mock_smtp, mock_smtp_ssl):
         utils.email.send_MIME_email('from', 'to', MIMEMultipart(), dryrun=True)
-        assert not mock_smtp.called
-        assert not mock_smtp_ssl.called
+        self.assertFalse(mock_smtp.called)
+        self.assertFalse(mock_smtp_ssl.called)
 
 if __name__ == '__main__':
     unittest.main()
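
Where an old assert carried an explanatory message, as in the dag_run.execution_date checks in core.py above, the conversion keeps it through the msg= keyword, and the rewritten calls consistently pass the expected value before the actual one. A minimal hypothetical sketch:

import unittest
from datetime import datetime


class MessageExample(unittest.TestCase):
    """Hypothetical test showing how custom failure messages survive the conversion."""

    def test_execution_date_matches(self):
        execution_date = datetime(2015, 1, 2, 0, 0)
        # Expected value first, actual value second, custom message kept via msg=.
        self.assertEqual(
            datetime(2015, 1, 2, 0, 0), execution_date,
            msg='execution_date did not match expectation: {0}'.format(execution_date))


if __name__ == '__main__':
    unittest.main()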

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/dags/no_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags/no_dags.py b/tests/dags/no_dags.py
index a84b6da..759b563 100644
--- a/tests/dags/no_dags.py
+++ b/tests/dags/no_dags.py
@@ -11,4 +11,4 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
\ No newline at end of file
+#

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index e520b44..7f3c285 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -1056,7 +1056,7 @@ class SchedulerJobTest(unittest.TestCase):
         logging.info("Test ran in %.2fs, expected %.2fs",
                      run_duration,
                      expected_run_duration)
-        assert run_duration - expected_run_duration < 5.0
+        self.assertLess(run_duration - expected_run_duration, 5.0)
 
     def test_dag_with_system_exit(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 37f109d..346f47c 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -49,8 +49,8 @@ class DagTest(unittest.TestCase):
         """
         dag = models.DAG('test-dag')
 
-        assert type(dag.params) == dict
-        assert len(dag.params) == 0
+        self.assertEqual(dict, type(dag.params))
+        self.assertEqual(0, len(dag.params))
 
     def test_params_passed_and_params_in_default_args_no_override(self):
         """
@@ -68,7 +68,7 @@ class DagTest(unittest.TestCase):
 
         params_combined = params1.copy()
         params_combined.update(params2)
-        assert dag.params == params_combined
+        self.assertEqual(params_combined, dag.params)
 
     def test_dag_as_context_manager(self):
         """
@@ -121,7 +121,7 @@ class DagRunTest(unittest.TestCase):
     def test_id_for_date(self):
         run_id = models.DagRun.id_for_date(
             datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
-        assert run_id == 'scheduled__2015-01-02T03:04:05', (
+        self.assertEqual('scheduled__2015-01-02T03:04:05', run_id, msg=
             'Generated run_id did not match expectations: {0}'.format(run_id))
 
 
@@ -139,10 +139,10 @@ class DagBagTest(unittest.TestCase):
         for dag_id in some_expected_dag_ids:
             dag = dagbag.get_dag(dag_id)
 
-            assert dag is not None
-            assert dag.dag_id == dag_id
+            self.assertIsNotNone(dag)
+            self.assertEqual(dag_id, dag.dag_id)
 
-        assert dagbag.size() >= 7
+        self.assertGreaterEqual(dagbag.size(), 7)
 
     def test_get_non_existing_dag(self):
         """
@@ -151,7 +151,7 @@ class DagBagTest(unittest.TestCase):
         dagbag = models.DagBag(include_examples=True)
 
         non_existing_dag_id = "non_existing_dag_id"
-        assert dagbag.get_dag(non_existing_dag_id) is None
+        self.assertIsNone(dagbag.get_dag(non_existing_dag_id))
 
     def test_process_file_that_contains_multi_bytes_char(self):
         """
@@ -163,7 +163,7 @@ class DagBagTest(unittest.TestCase):
         f.flush()
 
         dagbag = models.DagBag(include_examples=True)
-        assert dagbag.process_file(f.name) == []
+        self.assertEqual([], dagbag.process_file(f.name))
 
     def test_zip(self):
         """
@@ -171,7 +171,7 @@ class DagBagTest(unittest.TestCase):
         """
         dagbag = models.DagBag()
         dagbag.process_file(os.path.join(TEST_DAGS_FOLDER, "test_zip.zip"))
-        assert dagbag.get_dag("test_zip_dag")
+        self.assertTrue(dagbag.get_dag("test_zip_dag"))
 
     @patch.object(DagModel,'get_current')
     def test_get_dag_without_refresh(self, mock_dagmodel):
@@ -196,9 +196,9 @@ class DagBagTest(unittest.TestCase):
         processed_files = dagbag.process_file_calls
 
         # Should not call process_file again, since it's already loaded during init.
-        assert dagbag.process_file_calls == 1
-        assert dagbag.get_dag(dag_id) != None
-        assert dagbag.process_file_calls == 1
+        self.assertEqual(1, dagbag.process_file_calls)
+        self.assertIsNotNone(dagbag.get_dag(dag_id))
+        self.assertEqual(1, dagbag.process_file_calls)
 
     def test_get_dag_fileloc(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index fec5e69..69166fd 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -83,9 +83,9 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook.get_conn(self.nondefault_schema)
 
             # Verify
-            assert connect_mock.called
+            self.assertTrue(connect_mock.called)
             (args, kwargs) = connect_mock.call_args_list[0]
-            assert kwargs['database'] == self.nondefault_schema
+            self.assertEqual(self.nondefault_schema, kwargs['database'])
 
         def test_get_results_with_schema(self):
             from airflow.hooks.hive_hooks import HiveServer2Hook
@@ -126,10 +126,10 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook.get_records(sql, self.nondefault_schema)
 
             # Verify
-            assert self.connect_mock.called
+            self.assertTrue(self.connect_mock.called)
             (args, kwargs) = self.connect_mock.call_args_list[0]
-            assert args[0] == sql
-            assert kwargs['schema'] == self.nondefault_schema
+            self.assertEqual(sql, args[0])
+            self.assertEqual(self.nondefault_schema, kwargs['schema'])
 
         @mock.patch('HiveServer2Hook.get_results', return_value={'data': []})
         def test_get_pandas_df_with_schema(self, get_results_mock):
@@ -143,10 +143,10 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             hook.get_pandas_df(sql, self.nondefault_schema)
 
             # Verify
-            assert self.connect_mock.called
+            self.assertTrue(self.connect_mock.called)
             (args, kwargs) = self.connect_mock.call_args_list[0]
-            assert args[0] == sql
-            assert kwargs['schema'] == self.nondefault_schema
+            self.assertEqual(sql, args[0])
+            self.assertEqual(self.nondefault_schema, kwargs['schema'])
 
     class HivePrestoTest(unittest.TestCase):
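
The Hive hook tests keep the existing (args, kwargs) unpacking of call_args_list and only swap the comparisons into assertEqual. The same inspection on a bare Mock, as a sketch:

    import unittest

    try:
        from unittest import mock
    except ImportError:  # Python 2: third-party mock package
        import mock


    class CallArgsSketch(unittest.TestCase):

        def test_query_is_forwarded(self):
            run_query = mock.Mock()
            run_query("SELECT 1", schema="my_schema")

            self.assertTrue(run_query.called)
            # Each entry in call_args_list is an (args, kwargs) pair.
            (args, kwargs) = run_query.call_args_list[0]
            self.assertEqual("SELECT 1", args[0])
            self.assertEqual("my_schema", kwargs['schema'])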
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/operators/latest_only_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/latest_only_operator.py b/tests/operators/latest_only_operator.py
index 37aec38..3ac5fac 100644
--- a/tests/operators/latest_only_operator.py
+++ b/tests/operators/latest_only_operator.py
@@ -77,17 +77,17 @@ class LatestOnlyOperatorTest(unittest.TestCase):
         latest_instances = get_task_instances('latest')
         exec_date_to_latest_state = {
             ti.execution_date: ti.state for ti in latest_instances}
-        assert exec_date_to_latest_state == {
+        self.assertEqual({
             datetime.datetime(2016, 1, 1): 'success',
             datetime.datetime(2016, 1, 1, 12): 'success',
-            datetime.datetime(2016, 1, 2): 'success',
-        }
+            datetime.datetime(2016, 1, 2): 'success',
+        }, exec_date_to_latest_state)
 
         downstream_instances = get_task_instances('downstream')
         exec_date_to_downstream_state = {
             ti.execution_date: ti.state for ti in downstream_instances}
-        assert exec_date_to_downstream_state == {
+        self.assertEqual({
             datetime.datetime(2016, 1, 1): 'skipped',
             datetime.datetime(2016, 1, 1, 12): 'skipped',
-            datetime.datetime(2016, 1, 2): 'success',
-        }
+            datetime.datetime(2016, 1, 2): 'success',
+        }, exec_date_to_downstream_state)
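
When both arguments are dicts, assertEqual dispatches to assertDictEqual, so a mismatch in these execution-date maps prints a per-key diff instead of an anonymous failure. A condensed sketch:

    import datetime
    import unittest


    class StateMapSketch(unittest.TestCase):

        def test_latest_states(self):
            exec_date_to_state = {
                datetime.datetime(2016, 1, 1): 'success',
                datetime.datetime(2016, 1, 2): 'success',
            }
            # Dict-vs-dict comparisons show the differing keys and values
            # on failure.
            self.assertEqual({
                datetime.datetime(2016, 1, 1): 'success',
                datetime.datetime(2016, 1, 2): 'success',
            }, exec_date_to_state)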

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 7aaf12e..6fc2449 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -87,7 +87,7 @@ class MySqlTest(unittest.TestCase):
                 h.bulk_load("test_airflow", t.name)
                 c.execute("SELECT dummy FROM test_airflow")
                 results = tuple(result[0] for result in c.fetchall())
-                assert sorted(records) == sorted(results)
+                self.assertEqual(sorted(results), sorted(records))
 
     def test_mysql_to_mysql(self):
         sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index e8b272b..e77216b 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -180,4 +180,4 @@ class HdfsSensorTests(unittest.TestCase):
         # When
         # Then
         with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
\ No newline at end of file
+            task.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/plugins_manager.py
----------------------------------------------------------------------
diff --git a/tests/plugins_manager.py b/tests/plugins_manager.py
index 0012cdf..520f822 100644
--- a/tests/plugins_manager.py
+++ b/tests/plugins_manager.py
@@ -35,19 +35,19 @@ class PluginsTest(unittest.TestCase):
 
     def test_operators(self):
         from airflow.operators.test_plugin import PluginOperator
-        assert issubclass(PluginOperator, BaseOperator)
+        self.assertTrue(issubclass(PluginOperator, BaseOperator))
 
     def test_hooks(self):
         from airflow.hooks.test_plugin import PluginHook
-        assert issubclass(PluginHook, BaseHook)
+        self.assertTrue(issubclass(PluginHook, BaseHook))
 
     def test_executors(self):
         from airflow.executors.test_plugin import PluginExecutor
-        assert issubclass(PluginExecutor, BaseExecutor)
+        self.assertTrue(issubclass(PluginExecutor, BaseExecutor))
 
     def test_macros(self):
         from airflow.macros.test_plugin import plugin_macro
-        assert callable(plugin_macro)
+        self.assertTrue(callable(plugin_macro))
 
     def test_admin_views(self):
         app = cached_app()
@@ -55,11 +55,11 @@ class PluginsTest(unittest.TestCase):
         category = admin._menu_categories['Test Plugin']
         [admin_view] = [v for v in category.get_children()
                         if isinstance(v, MenuView)]
-        assert admin_view.name == 'Test View'
+        self.assertEqual('Test View', admin_view.name)
 
     def test_flask_blueprints(self):
         app = cached_app()
-        assert isinstance(app.blueprints['test_plugin'], Blueprint)
+        self.assertIsInstance(app.blueprints['test_plugin'], Blueprint)
 
     def test_menu_links(self):
         app = cached_app()
@@ -67,4 +67,4 @@ class PluginsTest(unittest.TestCase):
         category = admin._menu_categories['Test Plugin']
         [menu_link] = [ml for ml in category.get_children()
                        if isinstance(ml, MenuLink)]
-        assert menu_link.name == 'Test Menu Link'
+        self.assertEqual('Test Menu Link', menu_link.name)
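
unittest has no assertIsSubclass or assertCallable helpers, which is why the plugin checks above wrap issubclass() and callable() in assertTrue, while instance checks get the dedicated assertIsInstance. In miniature:

    import unittest


    class PluginChecksSketch(unittest.TestCase):

        def test_subclass_and_callable(self):
            # No dedicated helpers exist for these checks, so assertTrue wraps
            # the boolean result (bool is a subclass of int; len is callable).
            self.assertTrue(issubclass(bool, int))
            self.assertTrue(callable(len))

        def test_instance(self):
            self.assertIsInstance(42, int)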

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ceb3abf/tests/www/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py
index 2134a44..8218360 100644
--- a/tests/www/api/experimental/test_endpoints.py
+++ b/tests/www/api/experimental/test_endpoints.py
@@ -31,16 +31,16 @@ class ApiExperimentalTests(unittest.TestCase):
         url_template = '/api/experimental/dags/{}/tasks/{}'
 
         response = self.app.get(url_template.format('example_bash_operator', 'runme_0'))
-        assert '"email"' in response.data.decode('utf-8')
-        assert 'error' not in response.data.decode('utf-8')
+        self.assertIn('"email"', response.data.decode('utf-8'))
+        self.assertNotIn('error', response.data.decode('utf-8'))
         self.assertEqual(200, response.status_code)
 
         response = self.app.get(url_template.format('example_bash_operator', 'DNE'))
-        assert 'error' in response.data.decode('utf-8')
+        self.assertIn('error', response.data.decode('utf-8'))
         self.assertEqual(404, response.status_code)
 
         response = self.app.get(url_template.format('DNE', 'DNE'))
-        assert 'error' in response.data.decode('utf-8')
+        self.assertIn('error', response.data.decode('utf-8'))
         self.assertEqual(404, response.status_code)
 
     def test_trigger_dag(self):
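
For the experimental API tests, assertIn/assertNotIn report both the substring and the body it was searched in, which is considerably more useful than a bare `assert 'error' in ...` when a response is wrong. A standalone sketch of the same checks:

    import unittest


    class ResponseBodySketch(unittest.TestCase):

        def test_body_markers(self):
            body = '{"email": "airflow@example.com", "task_id": "runme_0"}'
            # On failure these print the missing or unexpected substring along
            # with the container it was searched in.
            self.assertIn('"email"', body)
            self.assertNotIn('error', body)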

