airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-247] Add EMR hook, operators and sensors. Add AWS base hook
Date Thu, 30 Jun 2016 22:50:35 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 096564848 -> 9f49f1285


[AIRFLOW-247] Add EMR hook, operators and sensors. Add AWS base hook

Closes #1630 from rfroetscher/emr


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9f49f128
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9f49f128
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9f49f128

Branch: refs/heads/master
Commit: 9f49f12853d83dd051f0f1ed58b5df20bfcfe087
Parents: 0965648
Author: Rob Froetscher <rfroetscher@lumoslabs.com>
Authored: Thu Jun 30 15:50:27 2016 -0700
Committer: Chris Riccomini <chrisr@wepay.com>
Committed: Thu Jun 30 15:50:27 2016 -0700

----------------------------------------------------------------------
 .../example_emr_job_flow_automatic_steps.py     |  72 +++++++++++
 .../example_emr_job_flow_manual_steps.py        |  92 ++++++++++++++
 airflow/contrib/hooks/aws_hook.py               |  35 ++++++
 airflow/contrib/hooks/emr_hook.py               |  62 +++++++++
 .../contrib/operators/emr_add_steps_operator.py |  60 +++++++++
 .../operators/emr_create_job_flow_operator.py   |  63 ++++++++++
 .../emr_terminate_job_flow_operator.py          |  55 ++++++++
 airflow/contrib/sensors/__init__.py             |  13 ++
 airflow/contrib/sensors/emr_base_sensor.py      |  53 ++++++++
 airflow/contrib/sensors/emr_job_flow_sensor.py  |  52 ++++++++
 airflow/contrib/sensors/emr_step_sensor.py      |  55 ++++++++
 airflow/utils/db.py                             |  51 ++++++++
 setup.py                                        |   4 +-
 tests/contrib/hooks/__init__.py                 |  13 ++
 tests/contrib/hooks/aws_hook.py                 |  47 +++++++
 tests/contrib/hooks/emr_hook.py                 |  53 ++++++++
 .../contrib/operators/emr_add_steps_operator.py |  53 ++++++++
 .../operators/emr_create_job_flow_operator.py   |  53 ++++++++
 .../emr_terminate_job_flow_operator.py          |  52 ++++++++
 tests/contrib/sensors/__init__.py               |  13 ++
 tests/contrib/sensors/emr_base_sensor.py        | 126 +++++++++++++++++++
 tests/contrib/sensors/emr_job_flow_sensor.py    | 123 ++++++++++++++++++
 tests/contrib/sensors/emr_step_sensor.py        | 119 ++++++++++++++++++
 23 files changed, 1318 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
new file mode 100644
index 0000000..18399c7
--- /dev/null
+++ b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import timedelta, datetime
+
+from airflow import DAG
+from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
+from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
+
# Default arguments applied to every task in this example DAG.
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 13),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

# A single boto3-style EMR step that runs the SparkPi example bundled
# with EMR's Spark distribution via command-runner.jar.
SPARK_TEST_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                '/usr/lib/spark/bin/run-example',
                'SparkPi',
                '10'
            ]
        }
    }
]

# Overrides merged on top of the 'emr_default' connection's extra JSON.
# Because steps are baked in here, the cluster runs them and terminates
# automatically (no KeepJobFlowAliveWhenNoSteps).
JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc',
    'Steps': SPARK_TEST_STEPS
}

dag = DAG(
    'emr_job_flow_automatic_steps_dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *'
)

# Creates the job flow and pushes the JobFlowId to XCom (return_value).
job_flow_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

# Polls the job flow (id pulled from the creator's XCom) until it reaches
# a terminal state; fails the task if the cluster terminated with errors.
job_sensor = EmrJobFlowSensor(
    task_id='check_job_flow',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

job_flow_creator.set_downstream(job_sensor)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
new file mode 100644
index 0000000..b498d50
--- /dev/null
+++ b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import timedelta, datetime
+
+from airflow import DAG
+from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
+from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
+from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
+from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
+
# Default arguments applied to every task in this example DAG.
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 13),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

# A single boto3-style EMR step that runs the SparkPi example bundled
# with EMR's Spark distribution via command-runner.jar.
SPARK_TEST_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                '/usr/lib/spark/bin/run-example',
                'SparkPi',
                '10'
            ]
        }
    }
]

# Keep the cluster alive after creation so steps can be added manually;
# a separate terminate task removes it at the end of the DAG.
JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc',
    'KeepJobFlowAliveWhenNoSteps': True
}

dag = DAG(
    'emr_job_flow_manual_steps_dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *'
)

# Creates the job flow and pushes the JobFlowId to XCom (return_value).
cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

# Adds the steps to the running cluster; pushes the list of StepIds to XCom.
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag
)

# Watches the first (and only) added step until it reaches a terminal state.
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

# Tears the cluster down once the step has finished.
cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/hooks/aws_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/aws_hook.py b/airflow/contrib/hooks/aws_hook.py
new file mode 100644
index 0000000..37a02ee
--- /dev/null
+++ b/airflow/contrib/hooks/aws_hook.py
@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import boto3
+from airflow.hooks.base_hook import BaseHook
+
+
class AwsHook(BaseHook):
    """
    Interact with AWS.

    This class is a thin wrapper around the boto3 python library.

    :param aws_conn_id: airflow connection to read the credentials from;
        login/password are used as the access key id / secret access key
    :type aws_conn_id: str
    """
    def __init__(self, aws_conn_id='aws_default'):
        self.aws_conn_id = aws_conn_id

    def get_client_type(self, client_type, region_name=None):
        """
        Return an authenticated boto3 client of the requested service type.

        :param client_type: boto3 service name, e.g. ``'emr'`` or ``'s3'``
        :type client_type: str
        :param region_name: AWS region to use; when omitted, falls back to
            the ``region_name`` key of the connection's extra JSON
        :type region_name: str
        """
        connection_object = self.get_connection(self.aws_conn_id)
        if region_name is None:
            region_name = connection_object.extra_dejson.get('region_name')
        return boto3.client(
            client_type,
            region_name=region_name,
            aws_access_key_id=connection_object.login,
            aws_secret_access_key=connection_object.password,
        )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/hooks/emr_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/emr_hook.py b/airflow/contrib/hooks/emr_hook.py
new file mode 100644
index 0000000..cee2398
--- /dev/null
+++ b/airflow/contrib/hooks/emr_hook.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
class EmrHook(AwsHook):
    """
    Interact with AWS EMR. emr_conn_id is only necessary for using the
    create_job_flow method.

    :param emr_conn_id: airflow connection whose extra JSON holds default
        boto3 ``run_job_flow`` arguments; may be None if create_job_flow
        is never called
    :type emr_conn_id: str
    """

    def __init__(self, emr_conn_id=None, *args, **kwargs):
        self.emr_conn_id = emr_conn_id
        super(EmrHook, self).__init__(*args, **kwargs)

    def get_conn(self):
        # Cache the boto3 EMR client on the hook instance.
        self.conn = self.get_client_type('emr')
        return self.conn

    def create_job_flow(self, job_flow_overrides):
        """
        Creates a job flow using the config from the EMR connection.
        Keys of the json extra hash may have the arguments of the boto3
        run_job_flow method. Overrides for this config may be passed as
        the job_flow_overrides.

        :param job_flow_overrides: boto3 style arguments merged on top of
            the connection's extra JSON
        :type job_flow_overrides: dict
        :return: the boto3 ``run_job_flow`` response dict
        :raises AirflowException: if the hook was built without emr_conn_id
        """

        if not self.emr_conn_id:
            raise AirflowException('emr_conn_id must be present to use create_job_flow')

        emr_conn = self.get_connection(self.emr_conn_id)

        config = emr_conn.extra_dejson.copy()
        config.update(job_flow_overrides)

        # Pass the merged config through as keyword arguments so that only
        # keys actually present are sent; forwarding explicit None values
        # for absent keys would fail boto3's parameter validation.
        response = self.get_conn().run_job_flow(**config)

        return response

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
new file mode 100644
index 0000000..342df5b
--- /dev/null
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.models import BaseOperator
+from airflow.utils import apply_defaults
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.emr_hook import EmrHook
+
+
class EmrAddStepsOperator(BaseOperator):
    """
    An operator that adds steps to an existing EMR job_flow.

    :param job_flow_id: id of the JobFlow to add steps to
    :type job_flow_id: str
    :param aws_conn_id: aws connection to use
    :type aws_conn_id: str
    :param steps: boto3 style steps to be added to the jobflow
    :type steps: list
    """
    template_fields = ['job_flow_id']
    template_ext = ()
    ui_color = '#f9c915'

    @apply_defaults
    def __init__(
            self,
            job_flow_id,
            aws_conn_id='s3_default',
            steps=None,
            *args, **kwargs):
        super(EmrAddStepsOperator, self).__init__(*args, **kwargs)
        self.job_flow_id = job_flow_id
        self.aws_conn_id = aws_conn_id
        # Use a None sentinel instead of a mutable default argument, which
        # would be shared across every instance of the operator.
        self.steps = steps if steps is not None else []

    def execute(self, context):
        """Add the configured steps and return their StepIds (pushed to XCom)."""
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

        logging.info('Adding steps to %s', self.job_flow_id)
        response = emr.add_job_flow_steps(JobFlowId=self.job_flow_id, Steps=self.steps)

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException('Adding steps failed: %s' % response)
        else:
            logging.info('Steps %s added to JobFlow', response['StepIds'])
            return response['StepIds']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py
new file mode 100644
index 0000000..37d885d
--- /dev/null
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.emr_hook import EmrHook
+from airflow.models import BaseOperator
+from airflow.utils import apply_defaults
+from airflow.exceptions import AirflowException
+
+
class EmrCreateJobFlowOperator(BaseOperator):
    """
    Creates an EMR JobFlow, reading the config from the EMR connection.
    A dictionary of JobFlow overrides can be passed that override the
    config from the connection.

    :param aws_conn_id: aws connection to use
    :type aws_conn_id: str
    :param emr_conn_id: emr connection to use
    :type emr_conn_id: str
    :param job_flow_overrides: boto3 style arguments to override emr_connection extra
    :type job_flow_overrides: dict
    """
    template_fields = []
    template_ext = ()
    ui_color = '#f9c915'

    @apply_defaults
    def __init__(
            self,
            aws_conn_id='s3_default',
            emr_conn_id='emr_default',
            job_flow_overrides=None,
            *args, **kwargs):
        super(EmrCreateJobFlowOperator, self).__init__(*args, **kwargs)
        self.aws_conn_id = aws_conn_id
        self.emr_conn_id = emr_conn_id
        # Use a None sentinel to avoid sharing one mutable default dict
        # across operator instances.
        if job_flow_overrides is None:
            job_flow_overrides = {}
        self.job_flow_overrides = job_flow_overrides

    def execute(self, context):
        """Create the job flow and return its JobFlowId (pushed to XCom)."""
        emr = EmrHook(aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id)

        logging.info('Creating JobFlow')
        response = emr.create_job_flow(self.job_flow_overrides)

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException('JobFlow creation failed: %s' % response)
        else:
            logging.info('JobFlow with id %s created', response['JobFlowId'])
            return response['JobFlowId']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_terminate_job_flow_operator.py b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
new file mode 100644
index 0000000..1b57276
--- /dev/null
+++ b/airflow/contrib/operators/emr_terminate_job_flow_operator.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.models import BaseOperator
+from airflow.utils import apply_defaults
+from airflow.exceptions import AirflowException
+from airflow.contrib.hooks.emr_hook import EmrHook
+
+
class EmrTerminateJobFlowOperator(BaseOperator):
    """
    Operator to terminate EMR JobFlows.

    :param job_flow_id: id of the JobFlow to terminate
    :type job_flow_id: str
    :param aws_conn_id: aws connection to use
    :type aws_conn_id: str
    """
    template_fields = ['job_flow_id']
    template_ext = ()
    ui_color = '#f9c915'

    @apply_defaults
    def __init__(
            self,
            job_flow_id,
            aws_conn_id='s3_default',
            *args, **kwargs):
        super(EmrTerminateJobFlowOperator, self).__init__(*args, **kwargs)
        self.job_flow_id = job_flow_id
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        """Request termination of the job flow; raises if the API call fails."""
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

        logging.info('Terminating JobFlow %s', self.job_flow_id)
        response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException('JobFlow termination failed: %s' % response)
        else:
            logging.info('JobFlow with id %s terminated', self.job_flow_id)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/sensors/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/__init__.py b/airflow/contrib/sensors/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/airflow/contrib/sensors/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_base_sensor.py b/airflow/contrib/sensors/emr_base_sensor.py
new file mode 100644
index 0000000..c5dd1ca
--- /dev/null
+++ b/airflow/contrib/sensors/emr_base_sensor.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.operators.sensors import BaseSensorOperator
+from airflow.utils import apply_defaults
+from airflow.exceptions import AirflowException
+
+
class EmrBaseSensor(BaseSensorOperator):
    """
    Contains general sensor behavior for EMR.
    Subclasses should implement get_emr_response() and state_from_response() methods.
    Subclasses should also implement NON_TERMINAL_STATES and FAILED_STATE constants.

    :param aws_conn_id: aws connection to use
    :type aws_conn_id: str
    """

    @apply_defaults
    def __init__(
            self,
            aws_conn_id='aws_default',
            *args, **kwargs):
        super(EmrBaseSensor, self).__init__(*args, **kwargs)
        self.aws_conn_id = aws_conn_id

    def poke(self, context):
        """Return True when the watched entity reached a successful terminal state."""
        response = self.get_emr_response()

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            # Treat a bad HTTP response as transient: keep poking rather
            # than failing the task outright.
            logging.info('Bad HTTP response: %s', response)
            return False

        state = self.state_from_response(response)
        logging.info('Job flow currently %s', state)

        if state in self.NON_TERMINAL_STATES:
            return False

        if state == self.FAILED_STATE:
            raise AirflowException('EMR job failed')

        return True

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_job_flow_sensor.py b/airflow/contrib/sensors/emr_job_flow_sensor.py
new file mode 100644
index 0000000..662b3b8
--- /dev/null
+++ b/airflow/contrib/sensors/emr_job_flow_sensor.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+
+from airflow.contrib.hooks.emr_hook import EmrHook
+from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
+from airflow.utils import apply_defaults
+
+
class EmrJobFlowSensor(EmrBaseSensor):
    """
    Asks for the state of the JobFlow until it reaches a terminal state.
    If it fails the sensor errors, failing the task.

    :param job_flow_id: job_flow_id to check the state of
    :type job_flow_id: string
    """

    NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING', 'TERMINATING']
    FAILED_STATE = 'TERMINATED_WITH_ERRORS'
    template_fields = ['job_flow_id']
    template_ext = ()

    @apply_defaults
    def __init__(
            self,
            job_flow_id,
            *args, **kwargs):
        super(EmrJobFlowSensor, self).__init__(*args, **kwargs)
        self.job_flow_id = job_flow_id

    def get_emr_response(self):
        """Fetch the describe_cluster response for the watched job flow."""
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

        # Lazy %-style args so the message is only built when emitted.
        logging.info('Poking cluster %s', self.job_flow_id)
        return emr.describe_cluster(ClusterId=self.job_flow_id)

    def state_from_response(self, response):
        """Extract the cluster state string from a describe_cluster response."""
        return response['Cluster']['Status']['State']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/sensors/emr_step_sensor.py b/airflow/contrib/sensors/emr_step_sensor.py
new file mode 100644
index 0000000..4cc6bc4
--- /dev/null
+++ b/airflow/contrib/sensors/emr_step_sensor.py
@@ -0,0 +1,55 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.emr_hook import EmrHook
+from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
+from airflow.utils import apply_defaults
+
+
class EmrStepSensor(EmrBaseSensor):
    """
    Asks for the state of the step until it reaches a terminal state.
    If it fails the sensor errors, failing the task.

    :param job_flow_id: job_flow_id which contains the step to check the state of
    :type job_flow_id: string
    :param step_id: step to check the state of
    :type step_id: string
    """

    NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
    FAILED_STATE = 'FAILED'
    template_fields = ['job_flow_id', 'step_id']
    template_ext = ()

    @apply_defaults
    def __init__(
            self,
            job_flow_id,
            step_id,
            *args, **kwargs):
        super(EmrStepSensor, self).__init__(*args, **kwargs)
        self.job_flow_id = job_flow_id
        self.step_id = step_id

    def get_emr_response(self):
        """Fetch the describe_step response for the watched step."""
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

        # Lazy %-style args so the message is only built when emitted.
        logging.info('Poking step %s on cluster %s', self.step_id, self.job_flow_id)
        return emr.describe_step(ClusterId=self.job_flow_id, StepId=self.step_id)

    def state_from_response(self, response):
        """Extract the step state string from a describe_step response."""
        return response['Step']['Status']['State']

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index e165e2e..849cb39 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -186,6 +186,57 @@ def initdb():
         models.Connection(
             conn_id='fs_default', conn_type='fs',
             extra='{"path": "/"}'))
+    merge_conn(
+        models.Connection(
+            conn_id='aws_default', conn_type='aws',
+            extra='{"region_name": "us-east-1"}'))
+    merge_conn(
+        models.Connection(
+            conn_id='emr_default', conn_type='emr',
+            extra='''
+                {   "Name": "default_job_flow_name",
+                    "LogUri": "s3://my-emr-log-bucket/default_job_flow_location",
+                    "ReleaseLabel": "emr-4.6.0",
+                    "Instances": {
+                        "InstanceGroups": [
+                            {
+                                "Name": "Master nodes",
+                                "Market": "ON_DEMAND",
+                                "InstanceRole": "MASTER",
+                                "InstanceType": "r3.2xlarge",
+                                "InstanceCount": 1
+                            },
+                            {
+                                "Name": "Slave nodes",
+                                "Market": "ON_DEMAND",
+                                "InstanceRole": "CORE",
+                                "InstanceType": "r3.2xlarge",
+                                "InstanceCount": 1
+                            }
+                        ]
+                    },
+                    "Ec2KeyName": "mykey",
+                    "KeepJobFlowAliveWhenNoSteps": false,
+                    "TerminationProtected": false,
+                    "Ec2SubnetId": "somesubnet",
+                    "Applications":[
+                        { "Name": "Spark" }
+                    ],
+                    "VisibleToAllUsers": true,
+                    "JobFlowRole": "EMR_EC2_DefaultRole",
+                    "ServiceRole": "EMR_DefaultRole",
+                    "Tags": [
+                        {
+                            "Key": "app",
+                            "Value": "analytics"
+                        },
+                        {
+                            "Key": "environment",
+                            "Value": "development"
+                        }
+                    ]
+                }
+            '''))
 
     # Known event types
     KET = models.KnownEventType

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index e64ed36..cf7f48e 100644
--- a/setup.py
+++ b/setup.py
@@ -115,6 +115,7 @@ doc = [
 ]
 docker = ['docker-py>=1.6.0']
 druid = ['pydruid>=0.2.1']
+emr = ['boto3>=1.0.0']
 gcp_api = [
     'httplib2',
     'google-api-python-client>=1.5.0, <1.6.0',
@@ -156,7 +157,7 @@ qds = ['qds-sdk>=1.9.0']
 cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
 
 all_dbs = postgres + mysql + hive + mssql + hdfs + vertica + cloudant
-devel = ['lxml>=3.3.4', 'nose', 'nose-parameterized', 'mock', 'click', 'jira']
+devel = ['lxml>=3.3.4', 'nose', 'nose-parameterized', 'mock', 'click', 'jira', 'moto']
 devel_minreq = devel + mysql + doc + password + s3
 devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
 devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker
@@ -228,6 +229,7 @@ def do_setup():
             'qds': qds,
             'rabbitmq': rabbitmq,
             's3': s3,
+            'emr': emr,
             'samba': samba,
             'slack': slack,
             'statsd': statsd,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/hooks/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/__init__.py b/tests/contrib/hooks/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/hooks/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/hooks/aws_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/aws_hook.py b/tests/contrib/hooks/aws_hook.py
new file mode 100644
index 0000000..2a40127
--- /dev/null
+++ b/tests/contrib/hooks/aws_hook.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import boto3
+
+from airflow import configuration
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
try:
    from moto import mock_emr
    _HAS_MOTO = True
except ImportError:
    _HAS_MOTO = False

    # moto is an optional test dependency. The original code left
    # mock_emr = None here, which made `@mock_emr` below raise
    # "TypeError: 'NoneType' object is not callable" at class-definition
    # time, so the skipIf guard never ran. A pass-through stand-in keeps
    # the module importable; skipIf then skips the tests at run time.
    def mock_emr(func):
        """No-op replacement for moto's mock_emr when moto is absent."""
        return func


class TestAwsHook(unittest.TestCase):
    """Tests for AwsHook.get_client_type against a moto-mocked EMR backend."""

    def setUp(self):
        # Use Airflow's test configuration (provides the aws_default
        # connection). No @mock_emr here: a moto decorator on setUp only
        # mocks for the duration of setUp itself, so it had no effect.
        configuration.test_mode()

    @unittest.skipIf(not _HAS_MOTO, 'mock_emr package not present')
    @mock_emr
    def test_get_client_type_returns_a_boto3_client_of_the_requested_type(self):
        # Sanity check: the mocked account must start with zero clusters,
        # otherwise we would be talking to real AWS.
        client = boto3.client('emr', region_name='us-east-1')
        if len(client.list_clusters()['Clusters']):
            raise ValueError('AWS not properly mocked')

        hook = AwsHook(aws_conn_id='aws_default')
        client_from_hook = hook.get_client_type('emr')

        self.assertEqual(client_from_hook.list_clusters()['Clusters'], [])

if __name__ == '__main__':
    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/hooks/emr_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/emr_hook.py b/tests/contrib/hooks/emr_hook.py
new file mode 100644
index 0000000..fbe91e8
--- /dev/null
+++ b/tests/contrib/hooks/emr_hook.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+import boto3
+
+from airflow import configuration
+from airflow.contrib.hooks.emr_hook import EmrHook
+
+
try:
    from moto import mock_emr
    _HAS_MOTO = True
except ImportError:
    _HAS_MOTO = False

    # moto is an optional test dependency. The original code left
    # mock_emr = None here, which made `@mock_emr` below raise
    # "TypeError: 'NoneType' object is not callable" at class-definition
    # time, so the skipIf guard never ran. A pass-through stand-in keeps
    # the module importable; skipIf then skips the tests at run time.
    def mock_emr(func):
        """No-op replacement for moto's mock_emr when moto is absent."""
        return func


class TestEmrHook(unittest.TestCase):
    """Tests for EmrHook against a moto-mocked EMR backend."""

    def setUp(self):
        # Use Airflow's test configuration (provides the aws_default and
        # emr_default connections). No @mock_emr here: a moto decorator on
        # setUp only mocks for the duration of setUp itself.
        configuration.test_mode()

    @unittest.skipIf(not _HAS_MOTO, 'mock_emr package not present')
    @mock_emr
    def test_get_conn_returns_a_boto3_connection(self):
        hook = EmrHook(aws_conn_id='aws_default')
        self.assertIsNotNone(hook.get_conn().list_clusters())

    @unittest.skipIf(not _HAS_MOTO, 'mock_emr package not present')
    @mock_emr
    def test_create_job_flow_uses_the_emr_config_to_create_a_cluster(self):
        # Sanity check: the mocked account must start with zero clusters,
        # otherwise we would be talking to real AWS.
        client = boto3.client('emr', region_name='us-east-1')
        if len(client.list_clusters()['Clusters']):
            raise ValueError('AWS not properly mocked')

        hook = EmrHook(aws_conn_id='aws_default', emr_conn_id='emr_default')
        cluster = hook.create_job_flow({'Name': 'test_cluster'})

        # The cluster visible through the raw client must be the one the
        # hook just created.
        self.assertEqual(client.list_clusters()['Clusters'][0]['Id'], cluster['JobFlowId'])

if __name__ == '__main__':
    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/operators/emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/emr_add_steps_operator.py b/tests/contrib/operators/emr_add_steps_operator.py
new file mode 100644
index 0000000..4335e5e
--- /dev/null
+++ b/tests/contrib/operators/emr_add_steps_operator.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import MagicMock, patch
+
+from airflow import configuration
+from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
+
# Canned successful response from EMR's add_job_flow_steps API.
ADD_STEPS_SUCCESS_RETURN = {
    'ResponseMetadata': {
        'HTTPStatusCode': 200
    },
    'StepIds': ['s-2LH3R5GW3A53T']
}


class TestEmrAddStepsOperator(unittest.TestCase):
    """EmrAddStepsOperator.execute should return the step ids EMR reports."""

    def setUp(self):
        configuration.test_mode()

        # boto3's EMR client is replaced by a mock because moto's
        # add_job_flow_steps response is incorrect.
        emr_client = MagicMock()
        emr_client.add_job_flow_steps.return_value = ADD_STEPS_SUCCESS_RETURN
        self.boto3_client_mock = MagicMock(return_value=emr_client)

    def test_execute_adds_steps_to_the_job_flow_and_returns_step_ids(self):
        with patch('boto3.client', self.boto3_client_mock):
            operator = EmrAddStepsOperator(
                task_id='test_task',
                job_flow_id='j-8989898989',
                aws_conn_id='aws_default'
            )

            # The operator surfaces the StepIds from the API response.
            self.assertEqual(operator.execute(None), ['s-2LH3R5GW3A53T'])

if __name__ == '__main__':
    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/emr_create_job_flow_operator.py b/tests/contrib/operators/emr_create_job_flow_operator.py
new file mode 100644
index 0000000..ac32125
--- /dev/null
+++ b/tests/contrib/operators/emr_create_job_flow_operator.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from mock import MagicMock, patch
+
+from airflow import configuration
+from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
+
# Canned successful response from EMR's run_job_flow API.
RUN_JOB_FLOW_SUCCESS_RETURN = {
    'ResponseMetadata': {
        'HTTPStatusCode': 200
    },
    'JobFlowId': 'j-8989898989'
}


class TestEmrCreateJobFlowOperator(unittest.TestCase):
    """EmrCreateJobFlowOperator.execute should return the new job flow id."""

    def setUp(self):
        configuration.test_mode()

        # boto3's EMR client is replaced by a mock because moto's
        # run_job_flow response is incorrect.
        emr_client = MagicMock()
        emr_client.run_job_flow.return_value = RUN_JOB_FLOW_SUCCESS_RETURN
        self.boto3_client_mock = MagicMock(return_value=emr_client)

    def test_execute_uses_the_emr_config_to_create_a_cluster_and_returns_job_id(self):
        with patch('boto3.client', self.boto3_client_mock):
            operator = EmrCreateJobFlowOperator(
                task_id='test_task',
                aws_conn_id='aws_default',
                emr_conn_id='emr_default'
            )

            # The operator surfaces the JobFlowId from the API response.
            self.assertEqual(operator.execute(None), 'j-8989898989')

if __name__ == '__main__':
    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/operators/emr_terminate_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/emr_terminate_job_flow_operator.py b/tests/contrib/operators/emr_terminate_job_flow_operator.py
new file mode 100644
index 0000000..38f20e2
--- /dev/null
+++ b/tests/contrib/operators/emr_terminate_job_flow_operator.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from mock import MagicMock, patch
+
+from airflow import configuration
+from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
+
# Canned successful response from EMR's terminate_job_flows API.
TERMINATE_SUCCESS_RETURN = {
    'ResponseMetadata': {
        'HTTPStatusCode': 200
    }
}


class TestEmrTerminateJobFlowOperator(unittest.TestCase):
    """EmrTerminateJobFlowOperator.execute should succeed on an HTTP 200."""

    def setUp(self):
        configuration.test_mode()

        # boto3's EMR client is replaced by a mock because moto's
        # terminate_job_flows response is incorrect.
        emr_client = MagicMock()
        emr_client.terminate_job_flows.return_value = TERMINATE_SUCCESS_RETURN
        self.boto3_client_mock = MagicMock(return_value=emr_client)

    def test_execute_terminates_the_job_flow_and_does_not_error(self):
        with patch('boto3.client', self.boto3_client_mock):
            operator = EmrTerminateJobFlowOperator(
                task_id='test_task',
                job_flow_id='j-8989898989',
                aws_conn_id='aws_default'
            )

            # A successful termination simply returns without raising.
            operator.execute(None)

if __name__ == '__main__':
    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/sensors/__init__.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/__init__.py b/tests/contrib/sensors/__init__.py
new file mode 100644
index 0000000..9d7677a
--- /dev/null
+++ b/tests/contrib/sensors/__init__.py
@@ -0,0 +1,13 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/sensors/emr_base_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/emr_base_sensor.py b/tests/contrib/sensors/emr_base_sensor.py
new file mode 100644
index 0000000..ca86a23
--- /dev/null
+++ b/tests/contrib/sensors/emr_base_sensor.py
@@ -0,0 +1,126 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+
+from airflow import configuration
+from airflow.exceptions import AirflowException
+from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
+
+
class TestEmrBaseSensor(unittest.TestCase):
    """Exercise EmrBaseSensor through a minimal concrete subclass.

    The four original tests each re-declared an identical subclass inline;
    a private factory now builds it, varying only the reported State and
    HTTP status code.
    """

    def setUp(self):
        configuration.test_mode()

    @staticmethod
    def _build_sensor(state, http_status=200):
        """Return a sensor whose EMR response reports *state* with the
        given HTTPStatusCode."""
        class EmrBaseSensorSubclass(EmrBaseSensor):
            NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE']
            FAILED_STATE = 'FAILED'

            def get_emr_response(self):
                return {
                    'SomeKey': {'State': state},
                    'ResponseMetadata': {'HTTPStatusCode': http_status}
                }

            def state_from_response(self, response):
                return response['SomeKey']['State']

        return EmrBaseSensorSubclass(
            task_id='test_task',
            poke_interval=2,
            job_flow_id='j-8989898989',
            aws_conn_id='aws_test'
        )

    # Fixed typo in original name: "implment" -> "implement".
    def test_subclasses_that_implement_required_methods_and_constants_succeed_when_response_is_good(self):
        operator = self._build_sensor('COMPLETED')

        # A terminal, non-failed state completes without raising.
        operator.execute(None)

    def test_poke_returns_false_when_state_is_a_non_terminal_state(self):
        operator = self._build_sensor('PENDING')

        self.assertEqual(operator.poke(None), False)

    def test_poke_returns_false_when_http_response_is_bad(self):
        operator = self._build_sensor('COMPLETED', http_status=400)

        self.assertEqual(operator.poke(None), False)

    def test_poke_raises_error_when_job_has_failed(self):
        operator = self._build_sensor('FAILED')

        with self.assertRaises(AirflowException) as context:
            operator.poke(None)

        # Bug fix: the original checked `'EMR job failed' in
        # context.exception`, which tests membership on the exception
        # object itself rather than substring-matching its message.
        self.assertTrue('EMR job failed' in str(context.exception))


if __name__ == '__main__':
    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/sensors/emr_job_flow_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/emr_job_flow_sensor.py b/tests/contrib/sensors/emr_job_flow_sensor.py
new file mode 100644
index 0000000..068f554
--- /dev/null
+++ b/tests/contrib/sensors/emr_job_flow_sensor.py
@@ -0,0 +1,123 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import datetime
+from dateutil.tz import tzlocal
+from mock import MagicMock, patch
+
+from airflow import configuration
+from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
+
def _describe_cluster_response(state):
    """Build a canned EMR describe_cluster response in the given State.

    The two module-level fixtures differ only in Status.State, so they are
    generated from one template instead of duplicated.
    """
    return {
        'Cluster': {
            'Applications': [
                {'Name': 'Spark', 'Version': '1.6.1'}
            ],
            'AutoTerminate': True,
            'Configurations': [],
            'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'},
            'Id': 'j-27ZY9GBEEU2GU',
            'LogUri': 's3n://some-location/',
            'Name': 'PiCalc',
            'NormalizedInstanceHours': 0,
            'ReleaseLabel': 'emr-4.6.0',
            'ServiceRole': 'EMR_DefaultRole',
            'Status': {
                'State': state,
                'StateChangeReason': {},
                'Timeline': {'CreationDateTime': datetime.datetime(2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())}
            },
            'Tags': [
                {'Key': 'app', 'Value': 'analytics'},
                {'Key': 'environment', 'Value': 'development'}
            ],
            'TerminationProtected': False,
            'VisibleToAllUsers': True
        },
        'ResponseMetadata': {
            'HTTPStatusCode': 200,
            'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e'
        }
    }


DESCRIBE_CLUSTER_RUNNING_RETURN = _describe_cluster_response('STARTING')
DESCRIBE_CLUSTER_TERMINATED_RETURN = _describe_cluster_response('TERMINATED')


class TestEmrJobFlowSensor(unittest.TestCase):
    """EmrJobFlowSensor should poll describe_cluster until a terminal state."""

    def setUp(self):
        configuration.test_mode()

        # boto3's EMR client is replaced by a mock (moto's response is
        # incorrect). First poll reports STARTING, second TERMINATED.
        self.mock_emr_client = MagicMock()
        self.mock_emr_client.describe_cluster.side_effect = [
            DESCRIBE_CLUSTER_RUNNING_RETURN,
            DESCRIBE_CLUSTER_TERMINATED_RETURN
        ]
        self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client)

    def test_execute_calls_with_the_job_flow_id_until_it_reaches_a_terminal_state(self):
        with patch('boto3.client', self.boto3_client_mock):
            operator = EmrJobFlowSensor(
                task_id='test_task',
                poke_interval=2,
                job_flow_id='j-8989898989',
                aws_conn_id='aws_default'
            )

            operator.execute(None)

            # Polled exactly twice: once non-terminal, once terminal.
            self.assertEqual(self.mock_emr_client.describe_cluster.call_count, 2)

            # Every poll passed the configured job flow id.
            self.mock_emr_client.describe_cluster.assert_called_with(ClusterId='j-8989898989')


if __name__ == '__main__':
    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f49f128/tests/contrib/sensors/emr_step_sensor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/sensors/emr_step_sensor.py b/tests/contrib/sensors/emr_step_sensor.py
new file mode 100644
index 0000000..4b0a059
--- /dev/null
+++ b/tests/contrib/sensors/emr_step_sensor.py
@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import datetime
+from dateutil.tz import tzlocal
+from mock import MagicMock, patch
+import boto3
+
+from airflow import configuration
+from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
+
def _describe_step_response(state):
    """Build a canned EMR describe_step response in the given State.

    The two module-level fixtures differ only in Status.State, so they are
    generated from one template instead of duplicated.
    """
    return {
        'ResponseMetadata': {
            'HTTPStatusCode': 200,
            'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6'
        },
        'Step': {
            'ActionOnFailure': 'CONTINUE',
            'Config': {
                'Args': ['/usr/lib/spark/bin/run-example', 'SparkPi', '10'],
                'Jar': 'command-runner.jar',
                'Properties': {}
            },
            'Id': 's-VK57YR1Z9Z5N',
            'Name': 'calculate_pi',
            'Status': {
                'State': state,
                'StateChangeReason': {},
                'Timeline': {
                    'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()),
                    'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal())
                }
            }
        }
    }


DESCRIBE_JOB_STEP_RUNNING_RETURN = _describe_step_response('RUNNING')
DESCRIBE_JOB_STEP_COMPLETED_RETURN = _describe_step_response('COMPLETED')


class TestEmrStepSensor(unittest.TestCase):
    """EmrStepSensor should poll describe_step until a terminal state."""

    def setUp(self):
        configuration.test_mode()

        # boto3's EMR client is replaced by a mock (moto's response is
        # incorrect). First poll reports RUNNING, second COMPLETED.
        self.mock_emr_client = MagicMock()
        self.mock_emr_client.describe_step.side_effect = [
            DESCRIBE_JOB_STEP_RUNNING_RETURN,
            DESCRIBE_JOB_STEP_COMPLETED_RETURN
        ]
        self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client)

    def test_execute_calls_with_the_job_flow_id_and_step_id_until_it_reaches_a_terminal_state(self):
        with patch('boto3.client', self.boto3_client_mock):
            operator = EmrStepSensor(
                task_id='test_task',
                poke_interval=1,
                job_flow_id='j-8989898989',
                step_id='s-VK57YR1Z9Z5N',
                aws_conn_id='aws_default',
            )

            operator.execute(None)

            # Polled exactly twice: once non-terminal, once terminal.
            self.assertEqual(self.mock_emr_client.describe_step.call_count, 2)

            # Every poll passed both the cluster and step identifiers.
            self.mock_emr_client.describe_step.assert_called_with(ClusterId='j-8989898989', StepId='s-VK57YR1Z9Z5N')


if __name__ == '__main__':
    unittest.main()


Mime
View raw message