From commits-return-14599-archive-asf-public=cust-asf.ponee.io@airflow.incubator.apache.org Fri May 4 09:02:07 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 039D21807A3 for ; Fri, 4 May 2018 09:02:05 +0200 (CEST) Received: (qmail 44305 invoked by uid 500); 4 May 2018 07:02:05 -0000 Mailing-List: contact commits-help@airflow.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@airflow.incubator.apache.org Delivered-To: mailing list commits@airflow.incubator.apache.org Received: (qmail 44170 invoked by uid 99); 4 May 2018 07:02:04 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 May 2018 07:02:04 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 7806F180778 for ; Fri, 4 May 2018 07:02:04 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -11.711 X-Spam-Level: X-Spam-Status: No, score=-11.711 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_DEF_SPF_WL=-7.5] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id mheiK-f0FUxr for ; Fri, 4 May 2018 07:02:00 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id D730E5FB55 for ; Fri, 4 May 2018 07:01:59 +0000 (UTC) Received: (qmail 40947 invoked by uid 99); 4 May 2018 07:01:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 May 2018 07:01:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3BBDDF6968; Fri, 4 May 2018 07:01:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: fokko@apache.org To: commits@airflow.incubator.apache.org Date: Fri, 04 May 2018 07:02:08 -0000 Message-Id: <60967e5dbdf6405d8e621a66e03acbcf@git.apache.org> In-Reply-To: <08f2d044bfa9455681d16227ced35728@git.apache.org> References: <08f2d044bfa9455681d16227ced35728@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/50] incubator-airflow git commit: [AIRFLOW-1575] Add AWS Kinesis Firehose Hook for inserting batch records [AIRFLOW-1575] Add AWS Kinesis Firehose Hook for inserting batch records Closes #3275 from sid88in/feature/kinesis_hookv2 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2d588e94 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2d588e94 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2d588e94 Branch: refs/heads/v1-10-test Commit: 2d588e9433cd9a1a1381cf939f579f7d7e53330f Parents: e691acc Author: sid.gupta Authored: Sat Apr 28 23:11:36 2018 -0700 Committer: r39132 Committed: Sat Apr 28 23:11:36 2018 -0700 ---------------------------------------------------------------------- airflow/contrib/hooks/aws_firehose_hook.py | 52 ++++++++++++++++ tests/contrib/hooks/test_aws_firehose_hook.py | 70 ++++++++++++++++++++++ 2 files changed, 122 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/airflow/contrib/hooks/aws_firehose_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/aws_firehose_hook.py b/airflow/contrib/hooks/aws_firehose_hook.py new file mode 100644 index 0000000..cf7b2fc --- /dev/null +++ b/airflow/contrib/hooks/aws_firehose_hook.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.contrib.hooks.aws_hook import AwsHook + + +class AwsFirehoseHook(AwsHook): + """ + Interact with AWS Kinesis Firehose. + :param delivery_stream: Name of the delivery stream + :type delivery_stream: str + :param region_name: AWS region name (example: us-east-1) + :type region_name: str + """ + + def __init__(self, delivery_stream, region_name=None, *args, **kwargs): + self.delivery_stream = delivery_stream + self.region_name = region_name + super(AwsFirehoseHook, self).__init__(*args, **kwargs) + + def get_conn(self): + """ + Returns AwsHook connection object. + """ + + self.conn = self.get_client_type('firehose', self.region_name) + return self.conn + + def put_records(self, records): + """ + Write batch records to Kinesis Firehose + """ + + firehose_conn = self.get_conn() + + response = firehose_conn.put_record_batch( + DeliveryStreamName=self.delivery_stream, + Records=records + ) + + return response http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2d588e94/tests/contrib/hooks/test_aws_firehose_hook.py ---------------------------------------------------------------------- diff --git a/tests/contrib/hooks/test_aws_firehose_hook.py b/tests/contrib/hooks/test_aws_firehose_hook.py new file mode 100644 index 0000000..0a2c809 --- /dev/null +++ b/tests/contrib/hooks/test_aws_firehose_hook.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest +import uuid + +from airflow.contrib.hooks.aws_firehose_hook import AwsFirehoseHook + +try: + from moto import mock_kinesis +except ImportError: + mock_kinesis = None + + +class TestAwsFirehoseHook(unittest.TestCase): + + @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present') + @mock_kinesis + def test_get_conn_returns_a_boto3_connection(self): + hook = AwsFirehoseHook(aws_conn_id='aws_default', + delivery_stream="test_airflow", region_name="us-east-1") + self.assertIsNotNone(hook.get_conn()) + + @unittest.skipIf(mock_kinesis is None, 'mock_kinesis package not present') + @mock_kinesis + def test_insert_batch_records_kinesis_firehose(self): + hook = AwsFirehoseHook(aws_conn_id='aws_default', + delivery_stream="test_airflow", region_name="us-east-1") + + response = hook.get_conn().create_delivery_stream( + DeliveryStreamName="test_airflow", + S3DestinationConfiguration={ + 'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role', + 'BucketARN': 'arn:aws:s3:::kinesis-test', + 'Prefix': 'airflow/', + 'BufferingHints': { + 'SizeInMBs': 123, + 'IntervalInSeconds': 124 + }, + 'CompressionFormat': 'UNCOMPRESSED', + } + ) + + stream_arn = response['DeliveryStreamARN'] + self.assertEquals( + stream_arn, "arn:aws:firehose:us-east-1:123456789012:deliverystream/test_airflow") + + records = [{"Data": str(uuid.uuid4())} + for _ in range(100)] + + response = hook.put_records(records) + + self.assertEquals(response['FailedPutCount'], 0) + self.assertEquals(response['ResponseMetadata']['HTTPStatusCode'], 200) + + +if __name__ == '__main__': + unittest.main()