airflow-commits mailing list archives

From bo...@apache.org
Subject [1/3] incubator-airflow git commit: [AIRFLOW-1094] Run unit tests under contrib in Travis
Date Mon, 17 Apr 2017 08:05:08 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 74c1ce254 -> 219c50641
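
This commit renames the test modules under tests/contrib to a test_ prefix (e.g. datadog_sensor.py becomes test_datadog_sensor.py) so that the runner's default test_*.py discovery pattern picks them up on Travis. A minimal sketch of that discovery behaviour, assuming stock unittest discovery rather than the project's actual runner invocation:

    import unittest

    # Only modules matching the pattern are collected; before the rename,
    # files like datadog_sensor.py were silently ignored.
    suite = unittest.TestLoader().discover(start_dir='tests/contrib',
                                           pattern='test_*.py')
    unittest.TextTestRunner().run(suite)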


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/operators/test_sqoop_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sqoop_operator.py b/tests/contrib/operators/test_sqoop_operator.py
new file mode 100644
index 0000000..a46dc93
--- /dev/null
+++ b/tests/contrib/operators/test_sqoop_operator.py
@@ -0,0 +1,93 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.sqoop_operator import SqoopOperator
+
+
+class TestSqoopOperator(unittest.TestCase):
+    _config = {
+        'cmd_type': 'export',
+        'table': 'target_table',
+        'query': 'SELECT * FROM schema.table',
+        'target_dir': '/path/on/hdfs/to/import',
+        'append': True,
+        'file_type': 'avro',
+        'columns': 'a,b,c',
+        'num_mappers': 22,
+        'split_by': 'id',
+        'export_dir': '/path/on/hdfs/to/export',
+        'input_null_string': '\n',
+        'input_null_non_string': '\t',
+        'staging_table': 'target_table_staging',
+        'clear_staging_table': True,
+        'enclosed_by': '"',
+        'escaped_by': '\\',
+        'input_fields_terminated_by': '|',
+        'input_lines_terminated_by': '\n',
+        'input_optionally_enclosed_by': '"',
+        'batch': True,
+        'relaxed_isolation': True,
+        'direct': True,
+        'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver',
+        'properties': {
+            'mapred.map.max.attempts': '1'
+        }
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_execute(self, conn_id='sqoop_default'):
+        operator = SqoopOperator(
+            task_id='sqoop_job',
+            dag=self.dag,
+            **self._config
+        )
+
+        self.assertEqual(conn_id, operator.conn_id)
+
+        self.assertEqual(self._config['cmd_type'], operator.cmd_type)
+        self.assertEqual(self._config['table'], operator.table)
+        self.assertEqual(self._config['target_dir'], operator.target_dir)
+        self.assertEqual(self._config['append'], operator.append)
+        self.assertEqual(self._config['file_type'], operator.file_type)
+        self.assertEqual(self._config['num_mappers'], operator.num_mappers)
+        self.assertEqual(self._config['split_by'], operator.split_by)
+        self.assertEqual(self._config['input_null_string'],
+                         operator.input_null_string)
+        self.assertEqual(self._config['input_null_non_string'],
+                         operator.input_null_non_string)
+        self.assertEqual(self._config['staging_table'], operator.staging_table)
+        self.assertEqual(self._config['clear_staging_table'],
+                         operator.clear_staging_table)
+        self.assertEqual(self._config['batch'], operator.batch)
+        self.assertEqual(self._config['relaxed_isolation'],
+                         operator.relaxed_isolation)
+        self.assertEqual(self._config['direct'], operator.direct)
+        self.assertEqual(self._config['driver'], operator.driver)
+        self.assertEqual(self._config['properties'], operator.properties)
+
+
+if __name__ == '__main__':
+    unittest.main()
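
The test above constructs the operator by unpacking _config with ** and then verifies, field by field, that each keyword argument landed on an identically named attribute. A hypothetical condensed form of those assertions, written as a drop-in for the body of test_execute (it skips 'query' and the other keys the original does not assert):

    checked = ['cmd_type', 'table', 'target_dir', 'append', 'file_type',
               'num_mappers', 'split_by', 'input_null_string',
               'input_null_non_string', 'staging_table',
               'clear_staging_table', 'batch', 'relaxed_isolation',
               'direct', 'driver', 'properties']
    for key in checked:
        # each config key is assumed to map to a same-named operator
        # attribute, exactly as the one-per-line assertions above assume
        self.assertEqual(self._config[key], getattr(operator, key))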

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/operators/test_ssh_execute_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_ssh_execute_operator.py b/tests/contrib/operators/test_ssh_execute_operator.py
new file mode 100644
index 0000000..0c2b9f2
--- /dev/null
+++ b/tests/contrib/operators/test_ssh_execute_operator.py
@@ -0,0 +1,95 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import os
+import sys
+from datetime import datetime
+from io import StringIO
+
+import mock
+
+from airflow import configuration
+from airflow.settings import Session
+from airflow import models, DAG
+from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
+
+
+TEST_DAG_ID = 'unit_tests'
+DEFAULT_DATE = datetime(2015, 1, 1)
+configuration.load_test_config()
+
+
+def reset(dag_id=TEST_DAG_ID):
+    session = Session()
+    tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+    tis.delete()
+    session.commit()
+    session.close()
+
+reset()
+
+
+class SSHExecuteOperatorTest(unittest.TestCase):
+
+    def setUp(self):
+
+        if sys.version_info[0] == 3:
+            raise unittest.SkipTest('SSHExecuteOperatorTest won\'t work with '
+                                    'python3. No need to test anything here')
+
+        configuration.load_test_config()
+        from airflow.contrib.hooks.ssh_hook import SSHHook
+        hook = mock.MagicMock(spec=SSHHook)
+        hook.no_host_key_check = True
+        hook.Popen.return_value.stdout = StringIO(u'stdout')
+        hook.Popen.return_value.returncode = False
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE,
+            'provide_context': True
+        }
+        dag = DAG(TEST_DAG_ID+'test_schedule_dag_once', default_args=args)
+        dag.schedule_interval = '@once'
+        self.hook = hook
+        self.dag = dag
+
+    @mock.patch('airflow.contrib.operators.ssh_execute_operator.SSHTempFileContent')
+    def test_simple(self, temp_file):
+        temp_file.return_value.__enter__ = lambda x: 'filepath'
+        task = SSHExecuteOperator(
+            task_id="test",
+            bash_command="echo airflow",
+            ssh_hook=self.hook,
+            dag=self.dag,
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    @mock.patch('airflow.contrib.operators.ssh_execute_operator.SSHTempFileContent')
+    def test_with_env(self, temp_file):
+        temp_file.return_value.__enter__ = lambda x: 'filepath'
+        test_env = os.environ.copy()
+        test_env['AIRFLOW_test'] = "test"
+        task = SSHExecuteOperator(
+            task_id="test",
+            bash_command="echo $AIRFLOW_HOME",
+            ssh_hook=self.hook,
+            env=test_env,
+            dag=self.dag,
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+
+if __name__ == '__main__':
+    unittest.main()
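
A standalone sketch of the spec-mock idiom from setUp above: passing spec restricts the mock to the real class's attribute surface, so a mistyped attribute raises AttributeError instead of silently minting a new child mock (the class and names below are illustrative, not Airflow APIs):

    import mock

    class FakeHook(object):
        def Popen(self, cmd):
            pass

    hook = mock.MagicMock(spec=FakeHook)
    hook.Popen(['echo'])        # allowed: FakeHook defines Popen
    try:
        hook.popen(['echo'])    # AttributeError: not part of the spec
    except AttributeError:
        pass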

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/datadog_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/datadog_sensor.py b/tests/contrib/sensors/datadog_sensor.py
deleted file mode 100644
index 4d601e1..0000000
--- a/tests/contrib/sensors/datadog_sensor.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from mock import patch
-
-from airflow.contrib.sensors.datadog_sensor import DatadogSensor
-
-
-at_least_one_event = [{'alert_type': 'info',
-                       'comments': [],
-                       'date_happened': 1419436860,
-                       'device_name': None,
-                       'host': None,
-                       'id': 2603387619536318140,
-                       'is_aggregate': False,
-                       'priority': 'normal',
-                       'resource': '/api/v1/events/2603387619536318140',
-                       'source': 'My Apps',
-                       'tags': ['application:web', 'version:1'],
-                       'text': 'And let me tell you all about it here!',
-                       'title': 'Something big happened!',
-                       'url': '/event/jump_to?event_id=2603387619536318140'},
-                      {'alert_type': 'info',
-                       'comments': [],
-                       'date_happened': 1419436865,
-                       'device_name': None,
-                       'host': None,
-                       'id': 2603387619536318141,
-                       'is_aggregate': False,
-                       'priority': 'normal',
-                       'resource': '/api/v1/events/2603387619536318141',
-                       'source': 'My Apps',
-                       'tags': ['application:web', 'version:1'],
-                       'text': 'And let me tell you all about it here!',
-                       'title': 'Something big happened!',
-                       'url': '/event/jump_to?event_id=2603387619536318141'}]
-
-zero_events = []
-
-
-class TestDatadogSensor(unittest.TestCase):
-    @patch('airflow.contrib.hooks.datadog_hook.api.Event.query')
-    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
-    def test_sensor_ok(self, api1, api2):
-        api1.return_value = at_least_one_event
-        api2.return_value = at_least_one_event
-
-        sensor = DatadogSensor(
-            task_id='test_datadog',
-            datadog_conn_id='datadog_default',
-            from_seconds_ago=3600,
-            up_to_seconds_from_now=0,
-            priority=None,
-            sources=None,
-            tags=None,
-            response_check=None)
-
-        self.assertTrue(sensor.poke({}))
-
-    @patch('airflow.contrib.hooks.datadog_hook.api.Event.query')
-    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
-    def test_sensor_fail(self, api1, api2):
-        api1.return_value = zero_events
-        api2.return_value = zero_events
-
-        sensor = DatadogSensor(
-            task_id='test_datadog',
-            datadog_conn_id='datadog_default',
-            from_seconds_ago=0,
-            up_to_seconds_from_now=0,
-            priority=None,
-            sources=None,
-            tags=None,
-            response_check=None)
-
-        self.assertFalse(sensor.poke({}))
-
-if __name__ == '__main__':
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/emr_base_sensor.py b/tests/contrib/sensors/emr_base_sensor.py
deleted file mode 100644
index 0b8ad2f..0000000
--- a/tests/contrib/sensors/emr_base_sensor.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from airflow import configuration
-from airflow.exceptions import AirflowException
-from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
-
-
-class TestEmrBaseSensor(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-
-    def test_subclasses_that_implement_required_methods_and_constants_succeed_when_response_is_good(self):
-        class EmrBaseSensorSubclass(EmrBaseSensor):
-            NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
-            FAILED_STATE = 'FAILED'
-
-            def get_emr_response(self):
-                return {
-                    'SomeKey': {'State': 'COMPLETED'},
-                    'ResponseMetadata': {'HTTPStatusCode': 200}
-                }
-
-            def state_from_response(self, response):
-                return response['SomeKey']['State']
-
-        operator = EmrBaseSensorSubclass(
-            task_id='test_task',
-            poke_interval=2,
-            job_flow_id='j-8989898989',
-            aws_conn_id='aws_test'
-        )
-
-        operator.execute(None)
-
-    def test_poke_returns_false_when_state_is_a_non_terminal_state(self):
-        class EmrBaseSensorSubclass(EmrBaseSensor):
-            NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
-            FAILED_STATE = 'FAILED'
-
-            def get_emr_response(self):
-                return {
-                    'SomeKey': {'State': 'PENDING'},
-                    'ResponseMetadata': {'HTTPStatusCode': 200}
-                }
-
-            def state_from_response(self, response):
-                return response['SomeKey']['State']
-
-        operator = EmrBaseSensorSubclass(
-            task_id='test_task',
-            poke_interval=2,
-            job_flow_id='j-8989898989',
-            aws_conn_id='aws_test'
-        )
-
-        self.assertEqual(operator.poke(None), False)
-
-    def test_poke_returns_false_when_http_response_is_bad(self):
-        class EmrBaseSensorSubclass(EmrBaseSensor):
-            NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
-            FAILED_STATE = 'FAILED'
-
-            def get_emr_response(self):
-                return {
-                    'SomeKey': {'State': 'COMPLETED'},
-                    'ResponseMetadata': {'HTTPStatusCode': 400}
-                }
-
-            def state_from_response(self, response):
-                return response['SomeKey']['State']
-
-        operator = EmrBaseSensorSubclass(
-            task_id='test_task',
-            poke_interval=2,
-            job_flow_id='j-8989898989',
-            aws_conn_id='aws_test'
-        )
-
-        self.assertEqual(operator.poke(None), False)
-
-
-    def test_poke_raises_error_when_job_has_failed(self):
-        class EmrBaseSensorSubclass(EmrBaseSensor):
-            NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
-            FAILED_STATE = 'FAILED'
-
-            def get_emr_response(self):
-                return {
-                    'SomeKey': {'State': 'FAILED'},
-                    'ResponseMetadata': {'HTTPStatusCode': 200}
-                }
-
-            def state_from_response(self, response):
-                return response['SomeKey']['State']
-
-        operator = EmrBaseSensorSubclass(
-            task_id='test_task',
-            poke_interval=2,
-            job_flow_id='j-8989898989',
-            aws_conn_id='aws_test'
-        )
-
-        with self.assertRaises(AirflowException) as context:
-
-            operator.poke(None)
-
-
-        self.assertTrue('EMR job failed' in context.exception)
-
-
-if __name__ == '__main__':
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/emr_job_flow_sensor.py b/tests/contrib/sensors/emr_job_flow_sensor.py
deleted file mode 100644
index f993786..0000000
--- a/tests/contrib/sensors/emr_job_flow_sensor.py
+++ /dev/null
@@ -1,123 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-import datetime
-from dateutil.tz import tzlocal
-from mock import MagicMock, patch
-
-from airflow import configuration
-from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
-
-DESCRIBE_CLUSTER_RUNNING_RETURN = {
-    'Cluster': {
-        'Applications': [
-            {'Name': 'Spark', 'Version': '1.6.1'}
-        ],
-        'AutoTerminate': True,
-        'Configurations': [],
-        'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'},
-        'Id': 'j-27ZY9GBEEU2GU',
-        'LogUri': 's3n://some-location/',
-        'Name': 'PiCalc',
-        'NormalizedInstanceHours': 0,
-        'ReleaseLabel': 'emr-4.6.0',
-        'ServiceRole': 'EMR_DefaultRole',
-        'Status': {
-            'State': 'STARTING',
-            'StateChangeReason': {},
-            'Timeline': {'CreationDateTime': datetime.datetime(2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())}
-        },
-        'Tags': [
-            {'Key': 'app', 'Value': 'analytics'},
-            {'Key': 'environment', 'Value': 'development'}
-        ],
-        'TerminationProtected': False,
-        'VisibleToAllUsers': True
-    },
-    'ResponseMetadata': {
-        'HTTPStatusCode': 200,
-        'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e'
-    }
-}
-
-DESCRIBE_CLUSTER_TERMINATED_RETURN = {
-    'Cluster': {
-        'Applications': [
-            {'Name': 'Spark', 'Version': '1.6.1'}
-        ],
-        'AutoTerminate': True,
-        'Configurations': [],
-        'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'},
-        'Id': 'j-27ZY9GBEEU2GU',
-        'LogUri': 's3n://some-location/',
-        'Name': 'PiCalc',
-        'NormalizedInstanceHours': 0,
-        'ReleaseLabel': 'emr-4.6.0',
-        'ServiceRole': 'EMR_DefaultRole',
-        'Status': {
-            'State': 'TERMINATED',
-            'StateChangeReason': {},
-            'Timeline': {'CreationDateTime': datetime.datetime(2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())}
-        },
-        'Tags': [
-            {'Key': 'app', 'Value': 'analytics'},
-            {'Key': 'environment', 'Value': 'development'}
-        ],
-        'TerminationProtected': False,
-        'VisibleToAllUsers': True
-    },
-    'ResponseMetadata': {
-        'HTTPStatusCode': 200,
-        'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e'
-    }
-}
-
-
-class TestEmrJobFlowSensor(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-
-        # Mock out the emr_client (moto has incorrect response)
-        self.mock_emr_client = MagicMock()
-        self.mock_emr_client.describe_cluster.side_effect = [
-            DESCRIBE_CLUSTER_RUNNING_RETURN,
-            DESCRIBE_CLUSTER_TERMINATED_RETURN
-        ]
-
-        # Mock out the emr_client creator
-        self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client)
-
-
-    def test_execute_calls_with_the_job_flow_id_until_it_reaches_a_terminal_state(self):
-        with patch('boto3.client', self.boto3_client_mock):
-
-            operator = EmrJobFlowSensor(
-                task_id='test_task',
-                poke_interval=2,
-                job_flow_id='j-8989898989',
-                aws_conn_id='aws_default'
-            )
-
-            operator.execute(None)
-
-            # make sure we called twice
-            self.assertEqual(self.mock_emr_client.describe_cluster.call_count, 2)
-
-            # make sure it was called with the job_flow_id
-            self.mock_emr_client.describe_cluster.assert_called_with(ClusterId='j-8989898989')
-
-
-if __name__ == '__main__':
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/emr_step_sensor.py b/tests/contrib/sensors/emr_step_sensor.py
deleted file mode 100644
index 58ee461..0000000
--- a/tests/contrib/sensors/emr_step_sensor.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-import datetime
-from dateutil.tz import tzlocal
-from mock import MagicMock, patch
-import boto3
-
-from airflow import configuration
-from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
-
-DESCRIBE_JOB_STEP_RUNNING_RETURN = {
-    'ResponseMetadata': {
-        'HTTPStatusCode': 200,
-        'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
-    },
-    'Step': {
-        'ActionOnFailure': 'CONTINUE',
-        'Config': {
-            'Args': [
-                '/usr/lib/spark/bin/run-example',
-                'SparkPi',
-                '10'
-            ],
-            'Jar': 'command-runner.jar',
-            'Properties': {}
-        },
-        'Id': 's-VK57YR1Z9Z5N',
-        'Name': 'calculate_pi',
-        'Status': {
-            'State': 'RUNNING',
-            'StateChangeReason': {},
-            'Timeline': {
-                'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()),
-                'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal())
-            }
-        }
-    }
-}
-
-DESCRIBE_JOB_STEP_COMPLETED_RETURN = {
-    'ResponseMetadata': {
-        'HTTPStatusCode': 200,
-        'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
-    },
-    'Step': {
-        'ActionOnFailure': 'CONTINUE',
-        'Config': {
-            'Args': [
-                '/usr/lib/spark/bin/run-example',
-                'SparkPi',
-                '10'
-            ],
-            'Jar': 'command-runner.jar',
-            'Properties': {}
-        },
-        'Id': 's-VK57YR1Z9Z5N',
-        'Name': 'calculate_pi',
-        'Status': {
-            'State': 'COMPLETED',
-            'StateChangeReason': {},
-            'Timeline': {
-                'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()),
-                'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal())
-            }
-        }
-    }
-}
-
-
-class TestEmrStepSensor(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-
-        # Mock out the emr_client (moto has incorrect response)
-        self.mock_emr_client = MagicMock()
-        self.mock_emr_client.describe_step.side_effect = [
-            DESCRIBE_JOB_STEP_RUNNING_RETURN,
-            DESCRIBE_JOB_STEP_COMPLETED_RETURN
-        ]
-
-        # Mock out the emr_client creator
-        self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client)
-
-
-    def test_execute_calls_with_the_job_flow_id_and_step_id_until_it_reaches_a_terminal_state(self):
-        with patch('boto3.client', self.boto3_client_mock):
-
-            operator = EmrStepSensor(
-                task_id='test_task',
-                poke_interval=1,
-                job_flow_id='j-8989898989',
-                step_id='s-VK57YR1Z9Z5N',
-                aws_conn_id='aws_default',
-            )
-
-            operator.execute(None)
-
-            # make sure we called twice
-            self.assertEqual(self.mock_emr_client.describe_step.call_count, 2)
-
-            # make sure it was called with the job_flow_id and step_id
-            self.mock_emr_client.describe_step.assert_called_with(ClusterId='j-8989898989', StepId='s-VK57YR1Z9Z5N')
-
-
-if __name__ == '__main__':
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/ftp_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/ftp_sensor.py b/tests/contrib/sensors/ftp_sensor.py
deleted file mode 100644
index 50f8b8b..0000000
--- a/tests/contrib/sensors/ftp_sensor.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-from ftplib import error_perm
-
-from mock import MagicMock
-
-from airflow.contrib.hooks.ftp_hook import FTPHook
-from airflow.contrib.sensors.ftp_sensor import FTPSensor
-
-
-class TestFTPSensor(unittest.TestCase):
-    def setUp(self):
-        super(TestFTPSensor, self).setUp()
-        self._create_hook_orig = FTPSensor._create_hook
-        self.hook_mock = MagicMock(spec=FTPHook)
-
-        def _create_hook_mock(sensor):
-            mock = MagicMock()
-            mock.__enter__ = lambda x: self.hook_mock
-
-            return mock
-
-        FTPSensor._create_hook = _create_hook_mock
-
-    def tearDown(self):
-        FTPSensor._create_hook = self._create_hook_orig
-        super(TestFTPSensor, self).tearDown()
-
-    def test_poke(self):
-        op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
-                       task_id="test_task")
-
-        self.hook_mock.get_mod_time.side_effect = \
-            [error_perm("550: Can't check for file existence"), None]
-
-        self.assertFalse(op.poke(None))
-        self.assertTrue(op.poke(None))
-
-    def test_poke_fails_due_error(self):
-        op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
-                       task_id="test_task")
-
-        self.hook_mock.get_mod_time.side_effect = \
-            error_perm("530: Login authentication failed")
-
-        with self.assertRaises(error_perm) as context:
-            op.execute(None)
-
-        self.assertTrue("530" in str(context.exception))
-
-
-if __name__ == '__main__':
-    unittest.main()
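
The FTP test above swaps FTPSensor._create_hook by hand in setUp and restores it in tearDown. A hedged aside: patch.object achieves the same swap and guarantees restoration even when a test raises mid-way (the class below is illustrative):

    from mock import patch

    class Thing(object):
        def value(self):
            return 'real'

    with patch.object(Thing, 'value', return_value='stubbed'):
        assert Thing().value() == 'stubbed'
    assert Thing().value() == 'real'    # restored on exit, error or not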

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/hdfs_sensors.py b/tests/contrib/sensors/hdfs_sensors.py
deleted file mode 100644
index 0e2ed0c..0000000
--- a/tests/contrib/sensors/hdfs_sensors.py
+++ /dev/null
@@ -1,251 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import logging
-import sys
-import unittest
-import re
-from datetime import timedelta
-from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex
-from airflow.exceptions import AirflowSensorTimeout
-
-
-class HdfsSensorFolderTests(unittest.TestCase):
-    def setUp(self):
-        if sys.version_info[0] == 3:
-            raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
-        from tests.core import FakeHDFSHook
-        self.hook = FakeHDFSHook
-        self.logger = logging.getLogger()
-        self.logger.setLevel(logging.DEBUG)
-
-    def test_should_be_empty_directory(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_empty_directory',
-                                filepath='/datadirectory/empty_directory',
-                                be_empty=True,
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_be_empty_directory_fail(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
-                                filepath='/datadirectory/not_empty_directory',
-                                be_empty=True,
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_should_be_a_non_empty_directory(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
-                                filepath='/datadirectory/not_empty_directory',
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_be_non_empty_directory_fail(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
-        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
-                                filepath='/datadirectory/empty_directory',
-                                timeout=1,
-                                retry_delay=timedelta(seconds=1),
-                                poke_interval=1,
-                                hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-
-class HdfsSensorRegexTests(unittest.TestCase):
-    def setUp(self):
-        if sys.version_info[0] == 3:
-            raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here')
-        from tests.core import FakeHDFSHook
-        self.hook = FakeHDFSHook
-        self.logger = logging.getLogger()
-        self.logger.setLevel(logging.DEBUG)
-
-    def test_should_match_regex(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
-        compiled_regex = re.compile("test[1-2]file")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_not_match_regex(self):
-        """
-        test the empty directory behaviour
-        :return:
-        """
-        # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
-        compiled_regex = re.compile("^IDoNotExist")
-        task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_should_match_regex_and_filesize(self):
-        """
-        test the file size behaviour with regex
-        :return:
-        """
-        # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
-        compiled_regex = re.compile("test[1-2]file")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               ignore_copying=True,
-                               ignored_ext=['_COPYING_', 'sftp'],
-                               file_size=10,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        task.execute(None)
-
-        # Then
-        # Nothing happens, nothing is raised exec is ok
-
-    def test_should_match_regex_but_filesize(self):
-        """
-        test the file size behaviour with regex
-        :return:
-        """
-        # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
-        compiled_regex = re.compile("test[1-2]file")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               file_size=20,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
-
-    def test_should_match_regex_but_copyingext(self):
-        """
-        test the file size behaviour with regex
-        :return:
-        """
-        # Given
-        self.logger.debug('#' * 10)
-        self.logger.debug('Running %s', self._testMethodName)
-        self.logger.debug('#' * 10)
-        compiled_regex = re.compile("copying_file_\d+.txt")
-        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
-                               filepath='/datadirectory/regex_dir',
-                               regex=compiled_regex,
-                               ignored_ext=['_COPYING_', 'sftp'],
-                               file_size=20,
-                               timeout=1,
-                               retry_delay=timedelta(seconds=1),
-                               poke_interval=1,
-                               hook=self.hook)
-
-        # When
-        # Then
-        with self.assertRaises(AirflowSensorTimeout):
-            task.execute(None)
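
Both HDFS test classes above gate on Python 3 by raising SkipTest inside setUp. A hedged aside: the same gate can be declared at class level, which marks every test as skipped up front:

    import sys
    import unittest

    @unittest.skipIf(sys.version_info[0] == 3,
                     "HdfsSensor won't work with python3")
    class ExampleHdfsTests(unittest.TestCase):
        def test_noop(self):
            self.assertTrue(True)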

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/jira_sensor_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/jira_sensor_test.py b/tests/contrib/sensors/jira_sensor_test.py
deleted file mode 100644
index 77ca97f..0000000
--- a/tests/contrib/sensors/jira_sensor_test.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import unittest
-import datetime
-from mock import Mock
-from mock import patch
-
-from airflow import DAG, configuration
-from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
-from airflow import models
-from airflow.utils import db
-
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
-jira_client_mock = Mock(
-        name="jira_client_for_test"
-)
-
-minimal_test_ticket = {
-    "id": "911539",
-    "self": "https://sandbox.localhost/jira/rest/api/2/issue/911539",
-    "key": "TEST-1226",
-    "fields": {
-        "labels": [
-            "test-label-1",
-            "test-label-2"
-        ],
-        "description": "this is a test description",
-    }
-}
-
-
-class TestJiraSensor(unittest.TestCase):
-    def setUp(self):
-        configuration.load_test_config()
-        args = {
-            'owner': 'airflow',
-            'start_date': DEFAULT_DATE
-        }
-        dag = DAG('test_dag_id', default_args=args)
-        self.dag = dag
-        db.merge_conn(
-                models.Connection(
-                        conn_id='jira_default', conn_type='jira',
-                        host='https://localhost/jira/', port=443,
-                        extra='{"verify": "False", "project": "AIRFLOW"}'))
-
-    @patch("airflow.contrib.hooks.jira_hook.JIRA",
-           autospec=True, return_value=jira_client_mock)
-    def test_issue_label_set(self, jira_mock):
-        jira_mock.return_value.issue.return_value = minimal_test_ticket
-
-        ticket_label_sensor = JiraTicketSensor(task_id='search-ticket-test',
-                                               ticket_id='TEST-1226',
-                                               field_checker_func=
-                                               TestJiraSensor.field_checker_func,
-                                               timeout=518400,
-                                               poke_interval=10,
-                                               dag=self.dag)
-
-        ticket_label_sensor.run(start_date=DEFAULT_DATE,
-                                end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-        self.assertTrue(jira_mock.called)
-        self.assertTrue(jira_mock.return_value.issue.called)
-
-    @staticmethod
-    def field_checker_func(context, issue):
-        return "test-label-1" in issue['fields']['labels']
-
-
-if __name__ == '__main__':
-    unittest.main()
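
The JIRA test above combines autospec=True with return_value when patching the client class. A standalone sketch of that combination on an illustrative class (note that autospec on an instance method passes self through to the mock):

    from mock import patch

    class Client(object):
        def issue(self, ticket_id):
            raise RuntimeError('no network in tests')

    with patch.object(Client, 'issue', autospec=True,
                      return_value={'key': 'TEST-1226'}) as issue_mock:
        client = Client()
        assert client.issue('TEST-1226') == {'key': 'TEST-1226'}
        # autospec'd instance methods record self as the first argument
        issue_mock.assert_called_once_with(client, 'TEST-1226')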

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/redis_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/redis_sensor.py b/tests/contrib/sensors/redis_sensor.py
deleted file mode 100644
index 8022a92..0000000
--- a/tests/contrib/sensors/redis_sensor.py
+++ /dev/null
@@ -1,64 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import unittest
-import datetime
-
-from mock import patch
-
-from airflow import DAG
-from airflow import configuration
-from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor
-
-DEFAULT_DATE = datetime.datetime(2017, 1, 1)
-
-
-class TestRedisSensor(unittest.TestCase):
-
-    def setUp(self):
-        configuration.load_test_config()
-        args = {
-            'owner': 'airflow',
-            'start_date': DEFAULT_DATE
-        }
-
-        self.dag = DAG('test_dag_id', default_args=args)
-        self.sensor = RedisKeySensor(
-            task_id='test_task',
-            redis_conn_id='redis_default',
-            dag=self.dag,
-            key='test_key'
-        )
-
-    @patch("airflow.contrib.hooks.redis_hook.RedisHook.key_exists")
-    def test_poke(self, key_exists):
-        key_exists.return_value = True
-        self.assertTrue(self.sensor.poke(None))
-
-        key_exists.return_value = False
-        self.assertFalse(self.sensor.poke(None))
-
-    @patch("airflow.contrib.hooks.redis_hook.StrictRedis.exists")
-    def test_existing_key_called(self, redis_client_exists):
-        self.sensor.run(
-            start_date=DEFAULT_DATE,
-            end_date=DEFAULT_DATE, ignore_ti_state=True
-        )
-
-        self.assertTrue(redis_client_exists.called_with('test_key'))
-
-
-if __name__ == '__main__':
-    unittest.main()
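
One caution about test_existing_key_called above: Mock has no called_with method, so redis_client_exists.called_with('test_key') merely auto-creates a truthy child mock and the assertTrue can never fail (mock 2.x era behaviour). The checking spelling is assert_called_with:

    from mock import MagicMock

    m = MagicMock()
    m('wrong_key')
    assert m.called_with('test_key')       # passes vacuously -- new mock
    try:
        m.assert_called_with('test_key')   # fails, as a real check should
    except AssertionError:
        pass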

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_datadog_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_datadog_sensor.py b/tests/contrib/sensors/test_datadog_sensor.py
new file mode 100644
index 0000000..d845c54
--- /dev/null
+++ b/tests/contrib/sensors/test_datadog_sensor.py
@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import unittest
+from mock import patch
+
+from airflow import configuration
+from airflow.utils import db
+from airflow import models
+from airflow.contrib.sensors.datadog_sensor import DatadogSensor
+
+
+at_least_one_event = [{'alert_type': 'info',
+                       'comments': [],
+                       'date_happened': 1419436860,
+                       'device_name': None,
+                       'host': None,
+                       'id': 2603387619536318140,
+                       'is_aggregate': False,
+                       'priority': 'normal',
+                       'resource': '/api/v1/events/2603387619536318140',
+                       'source': 'My Apps',
+                       'tags': ['application:web', 'version:1'],
+                       'text': 'And let me tell you all about it here!',
+                       'title': 'Something big happened!',
+                       'url': '/event/jump_to?event_id=2603387619536318140'},
+                      {'alert_type': 'info',
+                       'comments': [],
+                       'date_happened': 1419436865,
+                       'device_name': None,
+                       'host': None,
+                       'id': 2603387619536318141,
+                       'is_aggregate': False,
+                       'priority': 'normal',
+                       'resource': '/api/v1/events/2603387619536318141',
+                       'source': 'My Apps',
+                       'tags': ['application:web', 'version:1'],
+                       'text': 'And let me tell you all about it here!',
+                       'title': 'Something big happened!',
+                       'url': '/event/jump_to?event_id=2603387619536318141'}]
+
+zero_events = []
+
+
+class TestDatadogSensor(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        db.merge_conn(
+            models.Connection(
+                conn_id='datadog_default', conn_type='datadog',
+                login='login', password='password',
+                extra=json.dumps({'api_key': 'api_key', 'app_key': 'app_key'})
+            )
+        )
+
+    @patch('airflow.contrib.hooks.datadog_hook.api.Event.query')
+    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
+    def test_sensor_ok(self, api1, api2):
+        api1.return_value = at_least_one_event
+        api2.return_value = at_least_one_event
+
+        sensor = DatadogSensor(
+            task_id='test_datadog',
+            datadog_conn_id='datadog_default',
+            from_seconds_ago=3600,
+            up_to_seconds_from_now=0,
+            priority=None,
+            sources=None,
+            tags=None,
+            response_check=None)
+
+        self.assertTrue(sensor.poke({}))
+
+    @patch('airflow.contrib.hooks.datadog_hook.api.Event.query')
+    @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query')
+    def test_sensor_fail(self, api1, api2):
+        api1.return_value = zero_events
+        api2.return_value = zero_events
+
+        sensor = DatadogSensor(
+            task_id='test_datadog',
+            datadog_conn_id='datadog_default',
+            from_seconds_ago=0,
+            up_to_seconds_from_now=0,
+            priority=None,
+            sources=None,
+            tags=None,
+            response_check=None)
+
+        self.assertFalse(sensor.poke({}))
+
+if __name__ == '__main__':
+    unittest.main()
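
setUp above seeds the metadata database with db.merge_conn so the hook can resolve 'datadog_default' without a live deployment. A minimal sketch of the same seeding pattern with illustrative values (the conn_id and extra contents here are hypothetical):

    import json

    from airflow import models
    from airflow.utils import db

    db.merge_conn(
        models.Connection(
            conn_id='my_service_default', conn_type='http',
            host='localhost',
            extra=json.dumps({'token': 'not-a-real-token'})))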

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_emr_base_sensor.py b/tests/contrib/sensors/test_emr_base_sensor.py
new file mode 100644
index 0000000..9c39abb
--- /dev/null
+++ b/tests/contrib/sensors/test_emr_base_sensor.py
@@ -0,0 +1,126 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow import configuration
+from airflow.exceptions import AirflowException
+from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
+
+
+class TestEmrBaseSensor(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+
+    def test_subclasses_that_implement_required_methods_and_constants_succeed_when_response_is_good(self):
+        class EmrBaseSensorSubclass(EmrBaseSensor):
+            NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
+            FAILED_STATE = 'FAILED'
+
+            def get_emr_response(self):
+                return {
+                    'SomeKey': {'State': 'COMPLETED'},
+                    'ResponseMetadata': {'HTTPStatusCode': 200}
+                }
+
+            def state_from_response(self, response):
+                return response['SomeKey']['State']
+
+        operator = EmrBaseSensorSubclass(
+            task_id='test_task',
+            poke_interval=2,
+            job_flow_id='j-8989898989',
+            aws_conn_id='aws_test'
+        )
+
+        operator.execute(None)
+
+    def test_poke_returns_false_when_state_is_a_non_terminal_state(self):
+        class EmrBaseSensorSubclass(EmrBaseSensor):
+            NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
+            FAILED_STATE = 'FAILED'
+
+            def get_emr_response(self):
+                return {
+                    'SomeKey': {'State': 'PENDING'},
+                    'ResponseMetadata': {'HTTPStatusCode': 200}
+                }
+
+            def state_from_response(self, response):
+                return response['SomeKey']['State']
+
+        operator = EmrBaseSensorSubclass(
+            task_id='test_task',
+            poke_interval=2,
+            job_flow_id='j-8989898989',
+            aws_conn_id='aws_test'
+        )
+
+        self.assertEqual(operator.poke(None), False)
+
+    def test_poke_returns_false_when_http_response_is_bad(self):
+        class EmrBaseSensorSubclass(EmrBaseSensor):
+            NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
+            FAILED_STATE = 'FAILED'
+
+            def get_emr_response(self):
+                return {
+                    'SomeKey': {'State': 'COMPLETED'},
+                    'ResponseMetadata': {'HTTPStatusCode': 400}
+                }
+
+            def state_from_response(self, response):
+                return response['SomeKey']['State']
+
+        operator = EmrBaseSensorSubclass(
+            task_id='test_task',
+            poke_interval=2,
+            job_flow_id='j-8989898989',
+            aws_conn_id='aws_test'
+        )
+
+        self.assertEqual(operator.poke(None), False)
+
+
+    def test_poke_raises_error_when_job_has_failed(self):
+        class EmrBaseSensorSubclass(EmrBaseSensor):
+            NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
+            FAILED_STATE = 'FAILED'
+
+            def get_emr_response(self):
+                return {
+                    'SomeKey': {'State': 'FAILED'},
+                    'ResponseMetadata': {'HTTPStatusCode': 200}
+                }
+
+            def state_from_response(self, response):
+                return response['SomeKey']['State']
+
+        operator = EmrBaseSensorSubclass(
+            task_id='test_task',
+            poke_interval=2,
+            job_flow_id='j-8989898989',
+            aws_conn_id='aws_test'
+        )
+
+        with self.assertRaises(AirflowException) as context:
+
+            operator.poke(None)
+
+
+        self.assertIn('EMR job failed', str(context.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()
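
The rename above also tightens the failure assertion: the old module tested 'EMR job failed' in context.exception, probing the exception object itself, while the new one stringifies it first. A standalone sketch of the context-manager idiom:

    import unittest

    class Demo(unittest.TestCase):
        def test_message(self):
            with self.assertRaises(ValueError) as context:
                raise ValueError('EMR job failed')
            # context.exception is the instance; stringify before matching
            self.assertIn('EMR job failed', str(context.exception))

    if __name__ == '__main__':
        unittest.main()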

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_emr_job_flow_sensor.py b/tests/contrib/sensors/test_emr_job_flow_sensor.py
new file mode 100644
index 0000000..f993786
--- /dev/null
+++ b/tests/contrib/sensors/test_emr_job_flow_sensor.py
@@ -0,0 +1,123 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import datetime
+from dateutil.tz import tzlocal
+from mock import MagicMock, patch
+
+from airflow import configuration
+from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
+
+DESCRIBE_CLUSTER_RUNNING_RETURN = {
+    'Cluster': {
+        'Applications': [
+            {'Name': 'Spark', 'Version': '1.6.1'}
+        ],
+        'AutoTerminate': True,
+        'Configurations': [],
+        'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'},
+        'Id': 'j-27ZY9GBEEU2GU',
+        'LogUri': 's3n://some-location/',
+        'Name': 'PiCalc',
+        'NormalizedInstanceHours': 0,
+        'ReleaseLabel': 'emr-4.6.0',
+        'ServiceRole': 'EMR_DefaultRole',
+        'Status': {
+            'State': 'STARTING',
+            'StateChangeReason': {},
+            'Timeline': {'CreationDateTime': datetime.datetime(2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())}
+        },
+        'Tags': [
+            {'Key': 'app', 'Value': 'analytics'},
+            {'Key': 'environment', 'Value': 'development'}
+        ],
+        'TerminationProtected': False,
+        'VisibleToAllUsers': True
+    },
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+        'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e'
+    }
+}
+
+DESCRIBE_CLUSTER_TERMINATED_RETURN = {
+    'Cluster': {
+        'Applications': [
+            {'Name': 'Spark', 'Version': '1.6.1'}
+        ],
+        'AutoTerminate': True,
+        'Configurations': [],
+        'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'},
+        'Id': 'j-27ZY9GBEEU2GU',
+        'LogUri': 's3n://some-location/',
+        'Name': 'PiCalc',
+        'NormalizedInstanceHours': 0,
+        'ReleaseLabel': 'emr-4.6.0',
+        'ServiceRole': 'EMR_DefaultRole',
+        'Status': {
+            'State': 'TERMINATED',
+            'StateChangeReason': {},
+            'Timeline': {'CreationDateTime': datetime.datetime(2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())}
+        },
+        'Tags': [
+            {'Key': 'app', 'Value': 'analytics'},
+            {'Key': 'environment', 'Value': 'development'}
+        ],
+        'TerminationProtected': False,
+        'VisibleToAllUsers': True
+    },
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+        'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e'
+    }
+}
+
+
+class TestEmrJobFlowSensor(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+
+        # Mock out the emr_client (moto has incorrect response)
+        self.mock_emr_client = MagicMock()
+        self.mock_emr_client.describe_cluster.side_effect = [
+            DESCRIBE_CLUSTER_RUNNING_RETURN,
+            DESCRIBE_CLUSTER_TERMINATED_RETURN
+        ]
+
+        # Mock out the emr_client creator
+        self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client)
+
+
+    def test_execute_calls_with_the_job_flow_id_until_it_reaches_a_terminal_state(self):
+        with patch('boto3.client', self.boto3_client_mock):
+
+            operator = EmrJobFlowSensor(
+                task_id='test_task',
+                poke_interval=2,
+                job_flow_id='j-8989898989',
+                aws_conn_id='aws_default'
+            )
+
+            operator.execute(None)
+
+            # make sure describe_cluster was called twice
+            self.assertEqual(self.mock_emr_client.describe_cluster.call_count, 2)
+
+            # make sure it was called with the job_flow_id
+            self.mock_emr_client.describe_cluster.assert_called_with(ClusterId='j-8989898989')
+
+
+if __name__ == '__main__':
+    unittest.main()
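
For context, a minimal sketch of how EmrJobFlowSensor might be wired into a
real DAG, using the same parameters the test exercises; the dag_id, schedule
and job_flow_id below are illustrative placeholders, not part of this commit:

    # Sketch only: dag_id, schedule_interval and job_flow_id are
    # hypothetical; the sensor parameters mirror the test above.
    import datetime

    from airflow import DAG
    from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor

    dag = DAG('emr_example', start_date=datetime.datetime(2017, 1, 1),
              schedule_interval='@once')

    watch_cluster = EmrJobFlowSensor(
        task_id='watch_cluster',
        job_flow_id='j-8989898989',  # id of a previously launched job flow
        aws_conn_id='aws_default',
        poke_interval=60,
        dag=dag)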

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_emr_step_sensor.py b/tests/contrib/sensors/test_emr_step_sensor.py
new file mode 100644
index 0000000..58ee461
--- /dev/null
+++ b/tests/contrib/sensors/test_emr_step_sensor.py
@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import datetime
+from dateutil.tz import tzlocal
+from mock import MagicMock, patch
+
+from airflow import configuration
+from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
+
+DESCRIBE_JOB_STEP_RUNNING_RETURN = {
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+        'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
+    },
+    'Step': {
+        'ActionOnFailure': 'CONTINUE',
+        'Config': {
+            'Args': [
+                '/usr/lib/spark/bin/run-example',
+                'SparkPi',
+                '10'
+            ],
+            'Jar': 'command-runner.jar',
+            'Properties': {}
+        },
+        'Id': 's-VK57YR1Z9Z5N',
+        'Name': 'calculate_pi',
+        'Status': {
+            'State': 'RUNNING',
+            'StateChangeReason': {},
+            'Timeline': {
+                'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()),
+                'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal())
+            }
+        }
+    }
+}
+
+DESCRIBE_JOB_STEP_COMPLETED_RETURN = {
+    'ResponseMetadata': {
+        'HTTPStatusCode': 200,
+        'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
+    },
+    'Step': {
+        'ActionOnFailure': 'CONTINUE',
+        'Config': {
+            'Args': [
+                '/usr/lib/spark/bin/run-example',
+                'SparkPi',
+                '10'
+            ],
+            'Jar': 'command-runner.jar',
+            'Properties': {}
+        },
+        'Id': 's-VK57YR1Z9Z5N',
+        'Name': 'calculate_pi',
+        'Status': {
+            'State': 'COMPLETED',
+            'StateChangeReason': {},
+            'Timeline': {
+                'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()),
+                'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal())
+            }
+        }
+    }
+}
+
+
+class TestEmrStepSensor(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+
+        # Mock out the emr_client (moto has incorrect response)
+        self.mock_emr_client = MagicMock()
+        self.mock_emr_client.describe_step.side_effect = [
+            DESCRIBE_JOB_STEP_RUNNING_RETURN,
+            DESCRIBE_JOB_STEP_COMPLETED_RETURN
+        ]
+
+        # Mock out the emr_client creator
+        self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client)
+
+
+    def test_execute_calls_with_the_job_flow_id_and_step_id_until_it_reaches_a_terminal_state(self):
+        with patch('boto3.client', self.boto3_client_mock):
+
+            operator = EmrStepSensor(
+                task_id='test_task',
+                poke_interval=1,
+                job_flow_id='j-8989898989',
+                step_id='s-VK57YR1Z9Z5N',
+                aws_conn_id='aws_default',
+            )
+
+            operator.execute(None)
+
+            # make sure describe_step was called twice
+            self.assertEqual(self.mock_emr_client.describe_step.call_count, 2)
+
+            # make sure it was called with the job_flow_id and step_id
+            self.mock_emr_client.describe_step.assert_called_with(ClusterId='j-8989898989', StepId='s-VK57YR1Z9Z5N')
+
+
+if __name__ == '__main__':
+    unittest.main()
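
The side_effect lists in setUp are what let these sensor tests terminate: each
poke consumes the next canned response, moving the step from RUNNING to
COMPLETED. A standalone sketch of that mock pattern, with hypothetical states:

    # Sketch: a side_effect list models successive poll responses.
    from mock import MagicMock

    client = MagicMock()
    client.describe_step.side_effect = [
        {'Step': {'Status': {'State': 'RUNNING'}}},
        {'Step': {'Status': {'State': 'COMPLETED'}}},
    ]

    states = [client.describe_step()['Step']['Status']['State']
              for _ in range(2)]
    assert states == ['RUNNING', 'COMPLETED']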

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_ftp_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_ftp_sensor.py b/tests/contrib/sensors/test_ftp_sensor.py
new file mode 100644
index 0000000..50f8b8b
--- /dev/null
+++ b/tests/contrib/sensors/test_ftp_sensor.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from ftplib import error_perm
+
+from mock import MagicMock
+
+from airflow.contrib.hooks.ftp_hook import FTPHook
+from airflow.contrib.sensors.ftp_sensor import FTPSensor
+
+
+class TestFTPSensor(unittest.TestCase):
+    def setUp(self):
+        super(TestFTPSensor, self).setUp()
+        self._create_hook_orig = FTPSensor._create_hook
+        self.hook_mock = MagicMock(spec=FTPHook)
+
+        def _create_hook_mock(sensor):
+            mock = MagicMock()
+            mock.__enter__ = lambda x: self.hook_mock
+
+            return mock
+
+        FTPSensor._create_hook = _create_hook_mock
+
+    def tearDown(self):
+        FTPSensor._create_hook = self._create_hook_orig
+        super(TestFTPSensor, self).tearDown()
+
+    def test_poke(self):
+        op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
+                       task_id="test_task")
+
+        self.hook_mock.get_mod_time.side_effect = \
+            [error_perm("550: Can't check for file existence"), None]
+
+        self.assertFalse(op.poke(None))
+        self.assertTrue(op.poke(None))
+
+    def test_poke_fails_due_to_error(self):
+        op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp",
+                       task_id="test_task")
+
+        self.hook_mock.get_mod_time.side_effect = \
+            error_perm("530: Login authentication failed")
+
+        with self.assertRaises(error_perm) as context:
+            op.execute(None)
+
+        self.assertIn("530", str(context.exception))
+
+
+if __name__ == '__main__':
+    unittest.main()
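
The setUp above replaces FTPSensor._create_hook with a factory whose return
value overrides __enter__, which suggests the sensor uses the hook as a
context manager inside poke. A minimal sketch of that mocking pattern, under
the same assumption:

    # Sketch: mocking an object consumed via "with", assuming code under
    # test does: with self._create_hook() as hook: hook.get_mod_time(path)
    from mock import MagicMock

    hook = MagicMock()
    hook.get_mod_time.return_value = '20170101000000'  # hypothetical mtime

    cm = MagicMock()
    cm.__enter__ = lambda _self: hook  # what "with ... as" will yield

    with cm as h:
        assert h.get_mod_time('foobar.json') == '20170101000000'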

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_hdfs_sensors.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_hdfs_sensors.py b/tests/contrib/sensors/test_hdfs_sensors.py
new file mode 100644
index 0000000..0e2ed0c
--- /dev/null
+++ b/tests/contrib/sensors/test_hdfs_sensors.py
@@ -0,0 +1,251 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import sys
+import unittest
+import re
+from datetime import timedelta
+from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex
+from airflow.exceptions import AirflowSensorTimeout
+
+
+class HdfsSensorFolderTests(unittest.TestCase):
+    def setUp(self):
+        if sys.version_info[0] == 3:
+            raise unittest.SkipTest("HdfsSensor won't work with Python 3; no need to test anything here")
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+
+    def test_should_be_empty_directory(self):
+        """
+        test the empty directory behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_empty_directory',
+                                filepath='/datadirectory/empty_directory',
+                                be_empty=True,
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing is raised; execution is considered successful
+
+    def test_should_be_empty_directory_fail(self):
+        """
+        test that a non-empty directory fails the be_empty check
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail',
+                                filepath='/datadirectory/not_empty_directory',
+                                be_empty=True,
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_be_a_non_empty_directory(self):
+        """
+        test the non-empty directory behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory',
+                                filepath='/datadirectory/not_empty_directory',
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing is raised; execution is considered successful
+
+    def test_should_be_non_empty_directory_fail(self):
+        """
+        test that an empty directory fails the non-empty check
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        task = HdfsSensorFolder(task_id='Should_be_non_empty_directory_fail',
+                                filepath='/datadirectory/empty_directory',
+                                timeout=1,
+                                retry_delay=timedelta(seconds=1),
+                                poke_interval=1,
+                                hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+
+class HdfsSensorRegexTests(unittest.TestCase):
+    def setUp(self):
+        if sys.version_info[0] == 3:
+            raise unittest.SkipTest("HdfsSensor won't work with Python 3; no need to test anything here")
+        from tests.core import FakeHDFSHook
+        self.hook = FakeHDFSHook
+        self.logger = logging.getLogger()
+        self.logger.setLevel(logging.DEBUG)
+
+    def test_should_match_regex(self):
+        """
+        test the regex matching behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing is raised; execution is considered successful
+
+    def test_should_not_match_regex(self):
+        """
+        test the regex non-matching behaviour
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("^IDoNotExist")
+        task = HdfsSensorRegex(task_id='Should_not_match_the_regex',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_match_regex_and_filesize(self):
+        """
+        test the file size behaviour with regex
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               ignore_copying=True,
+                               ignored_ext=['_COPYING_', 'sftp'],
+                               file_size=10,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        task.execute(None)
+
+        # Then
+        # Nothing is raised; execution is considered successful
+
+    def test_should_match_regex_but_filesize(self):
+        """
+        test the file size behaviour with regex
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile("test[1-2]file")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               file_size=20,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
+
+    def test_should_match_regex_but_copyingext(self):
+        """
+        test the ignored copying-extension behaviour with regex
+        :return:
+        """
+        # Given
+        self.logger.debug('#' * 10)
+        self.logger.debug('Running %s', self._testMethodName)
+        self.logger.debug('#' * 10)
+        compiled_regex = re.compile(r"copying_file_\d+\.txt")
+        task = HdfsSensorRegex(task_id='Should_match_the_regex_but_copyingext',
+                               filepath='/datadirectory/regex_dir',
+                               regex=compiled_regex,
+                               ignored_ext=['_COPYING_', 'sftp'],
+                               file_size=20,
+                               timeout=1,
+                               retry_delay=timedelta(seconds=1),
+                               poke_interval=1,
+                               hook=self.hook)
+
+        # When
+        # Then
+        with self.assertRaises(AirflowSensorTimeout):
+            task.execute(None)
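
For context, a sketch of how HdfsSensorRegex might appear in a DAG (Python 2
only, per the skip in setUp above); the dag_id, filepath and regex are
illustrative placeholders, and the hook argument is left at its default:

    # Sketch only: names below are hypothetical placeholders.
    import datetime
    import re

    from airflow import DAG
    from airflow.contrib.sensors.hdfs_sensors import HdfsSensorRegex

    dag = DAG('hdfs_regex_example',
              start_date=datetime.datetime(2017, 1, 1))

    wait_for_files = HdfsSensorRegex(
        task_id='wait_for_part_files',
        filepath='/datadirectory/regex_dir',
        regex=re.compile(r'part-\d+'),
        poke_interval=60,
        dag=dag)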

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_jira_sensor_test.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_jira_sensor_test.py b/tests/contrib/sensors/test_jira_sensor_test.py
new file mode 100644
index 0000000..77ca97f
--- /dev/null
+++ b/tests/contrib/sensors/test_jira_sensor_test.py
@@ -0,0 +1,85 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import datetime
+from mock import Mock
+from mock import patch
+
+from airflow import DAG, configuration
+from airflow.contrib.sensors.jira_sensor import JiraTicketSensor
+from airflow import models
+from airflow.utils import db
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+jira_client_mock = Mock(
+        name="jira_client_for_test"
+)
+
+minimal_test_ticket = {
+    "id": "911539",
+    "self": "https://sandbox.localhost/jira/rest/api/2/issue/911539",
+    "key": "TEST-1226",
+    "fields": {
+        "labels": [
+            "test-label-1",
+            "test-label-2"
+        ],
+        "description": "this is a test description",
+    }
+}
+
+
+class TestJiraSensor(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        dag = DAG('test_dag_id', default_args=args)
+        self.dag = dag
+        db.merge_conn(
+                models.Connection(
+                        conn_id='jira_default', conn_type='jira',
+                        host='https://localhost/jira/', port=443,
+                        extra='{"verify": "False", "project": "AIRFLOW"}'))
+
+    @patch("airflow.contrib.hooks.jira_hook.JIRA",
+           autospec=True, return_value=jira_client_mock)
+    def test_issue_label_set(self, jira_mock):
+        jira_mock.return_value.issue.return_value = minimal_test_ticket
+
+        ticket_label_sensor = JiraTicketSensor(
+            task_id='search-ticket-test',
+            ticket_id='TEST-1226',
+            field_checker_func=TestJiraSensor.field_checker_func,
+            timeout=518400,
+            poke_interval=10,
+            dag=self.dag)
+
+        ticket_label_sensor.run(start_date=DEFAULT_DATE,
+                                end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+        self.assertTrue(jira_mock.called)
+        self.assertTrue(jira_mock.return_value.issue.called)
+
+    @staticmethod
+    def field_checker_func(context, issue):
+        return "test-label-1" in issue['fields']['labels']
+
+
+if __name__ == '__main__':
+    unittest.main()
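
JiraTicketSensor delegates its success condition to field_checker_func, which
is invoked with (context, issue) as the staticmethod above implies. A sketch
of an alternative checker, assuming the same signature and the issue dict
shape of minimal_test_ticket:

    # Sketch: a hypothetical checker matching on the description field.
    def description_mentions_test(context, issue):
        description = issue['fields'].get('description') or ''
        return 'test' in description.lower()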

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_redis_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/test_redis_sensor.py b/tests/contrib/sensors/test_redis_sensor.py
new file mode 100644
index 0000000..8022a92
--- /dev/null
+++ b/tests/contrib/sensors/test_redis_sensor.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+import datetime
+
+from mock import patch
+
+from airflow import DAG
+from airflow import configuration
+from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+
+
+class TestRedisSensor(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+
+        self.dag = DAG('test_dag_id', default_args=args)
+        self.sensor = RedisKeySensor(
+            task_id='test_task',
+            redis_conn_id='redis_default',
+            dag=self.dag,
+            key='test_key'
+        )
+
+    @patch("airflow.contrib.hooks.redis_hook.RedisHook.key_exists")
+    def test_poke(self, key_exists):
+        key_exists.return_value = True
+        self.assertTrue(self.sensor.poke(None))
+
+        key_exists.return_value = False
+        self.assertFalse(self.sensor.poke(None))
+
+    @patch("airflow.contrib.hooks.redis_hook.StrictRedis.exists")
+    def test_existing_key_called(self, redis_client_exists):
+        self.sensor.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE, ignore_ti_state=True
+        )
+
+        redis_client_exists.assert_called_with('test_key')
+
+
+if __name__ == '__main__':
+    unittest.main()
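
Both patches above target the same path: RedisKeySensor.poke presumably
reduces to RedisHook.key_exists, which in turn calls StrictRedis.exists. A
minimal wiring sketch, with an illustrative dag_id and key:

    # Sketch only: dag_id and key are hypothetical placeholders.
    import datetime

    from airflow import DAG
    from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor

    dag = DAG('redis_example', start_date=datetime.datetime(2017, 1, 1))

    wait_for_key = RedisKeySensor(
        task_id='wait_for_key',
        redis_conn_id='redis_default',
        key='events:ready',
        poke_interval=30,
        dag=dag)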

