Date: Fri, 16 Nov 2018 13:09:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: commits@airflow.incubator.apache.org
Subject: [jira] [Commented] (AIRFLOW-3266) AWS Athena Operator in Airflow

[ https://issues.apache.org/jira/browse/AIRFLOW-3266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689398#comment-16689398 ]

ASF GitHub Bot commented on AIRFLOW-3266:
-----------------------------------------

ashb closed pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/aws_athena_hook.py b/airflow/contrib/hooks/aws_athena_hook.py
new file mode 100644
index 0000000000..f11ff23c51
--- /dev/null
+++ b/airflow/contrib/hooks/aws_athena_hook.py
@@ -0,0 +1,150 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
+class AWSAthenaHook(AwsHook):
+    """
+    Interact with AWS Athena to run, poll queries and return query results
+
+    :param aws_conn_id: aws connection to use.
+    :type aws_conn_id: str
+    :param sleep_time: Time to wait between two consecutive call to check query status on athena
+    :type sleep_time: int
+    """
+
+    INTERMEDIATE_STATES = ('QUEUED', 'RUNNING',)
+    FAILURE_STATES = ('FAILED', 'CANCELLED',)
+    SUCCESS_STATES = ('SUCCEEDED',)
+
+    def __init__(self, aws_conn_id='aws_default', sleep_time=30, *args, **kwargs):
+        super(AWSAthenaHook, self).__init__(aws_conn_id, **kwargs)
+        self.sleep_time = sleep_time
+        self.conn = None
+
+    def get_conn(self):
+        """
+        check if aws conn exists already or create one and return it
+
+        :return: boto3 session
+        """
+        if not self.conn:
+            self.conn = self.get_client_type('athena')
+        return self.conn
+
+    def run_query(self, query, query_context, result_configuration, client_request_token=None):
+        """
+        Run Presto query on athena with provided config and return submitted query_execution_id
+
+        :param query: Presto query to run
+        :type query: str
+        :param query_context: Context in which query need to be run
+        :type query_context: dict
+        :param result_configuration: Dict with path to store results in and config related to encryption
+        :type result_configuration: dict
+        :param client_request_token: Unique token created by user to avoid multiple executions of same query
+        :type client_request_token: str
+        :return: str
+        """
+        response = self.conn.start_query_execution(QueryString=query,
+                                                   ClientRequestToken=client_request_token,
+                                                   QueryExecutionContext=query_context,
+                                                   ResultConfiguration=result_configuration)
+        query_execution_id = response['QueryExecutionId']
+        return query_execution_id
+
+    def check_query_status(self, query_execution_id):
+        """
+        Fetch the status of submitted athena query. Returns None or one of valid query states.
+
+        :param query_execution_id: Id of submitted athena query
+        :type query_execution_id: str
+        :return: str
+        """
+        response = self.conn.get_query_execution(QueryExecutionId=query_execution_id)
+        state = None
+        try:
+            state = response['QueryExecution']['Status']['State']
+        except Exception as ex:
+            self.log.error('Exception while getting query state', ex)
+        finally:
+            return state
+
+    def get_query_results(self, query_execution_id):
+        """
+        Fetch submitted athena query results. returns none if query is in intermediate state or
+        failed/cancelled state else dict of query output
+
+        :param query_execution_id: Id of submitted athena query
+        :type query_execution_id: str
+        :return: dict
+        """
+        query_state = self.check_query_status(query_execution_id)
+        if query_state is None:
+            self.log.error('Invalid Query state')
+            return None
+        elif query_state in self.INTERMEDIATE_STATES or query_state in self.FAILURE_STATES:
+            self.log.error('Query is in {state} state. Cannot fetch results'.format(state=query_state))
+            return None
+        return self.conn.get_query_results(QueryExecutionId=query_execution_id)
+
+    def poll_query_status(self, query_execution_id, max_tries=None):
+        """
+        Poll the status of submitted athena query until query state reaches final state.
+        Returns one of the final states
+
+        :param query_execution_id: Id of submitted athena query
+        :type query_execution_id: str
+        :param max_tries: Number of times to poll for query state before function exits
+        :type max_tries: int
+        :return: str
+        """
+        try_number = 1
+        final_query_state = None  # Query state when query reaches final state or max_tries reached
+        while True:
+            query_state = self.check_query_status(query_execution_id)
+            if query_state is None:
+                self.log.info('Trial {try_number}: Invalid query state. Retrying again'.format(
+                    try_number=try_number))
+            elif query_state in self.INTERMEDIATE_STATES:
+                self.log.info('Trial {try_number}: Query is still in an intermediate state - {state}'
+                              .format(try_number=try_number, state=query_state))
+            else:
+                self.log.info('Trial {try_number}: Query execution completed. Final state is {state}'
+                              .format(try_number=try_number, state=query_state))
+                final_query_state = query_state
+                break
+            if max_tries and try_number >= max_tries:  # Break loop if max_tries reached
+                final_query_state = query_state
+                break
+            try_number += 1
+            sleep(self.sleep_time)
+        return final_query_state
+
+    def stop_query(self, query_execution_id):
+        """
+        Cancel the submitted athena query
+
+        :param query_execution_id: Id of submitted athena query
+        :type query_execution_id: str
+        :return: dict
+        """
+        return self.conn.stop_query_execution(QueryExecutionId=query_execution_id)
diff --git a/airflow/contrib/operators/aws_athena_operator.py b/airflow/contrib/operators/aws_athena_operator.py
new file mode 100644
index 0000000000..432410e311
--- /dev/null
+++ b/airflow/contrib/operators/aws_athena_operator.py
@@ -0,0 +1,98 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from uuid import uuid4
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
+
+
+class AWSAthenaOperator(BaseOperator):
+    """
+    An operator that submit presto query to athena.
+
+    :param query: Presto to be run on athena. (templated)
+    :type query: str
+    :param database: Database to select. (templated)
+    :type database: str
+    :param output_location: s3 path to write the query results into. (templated)
+    :type output_location: str
+    :param aws_conn_id: aws connection to use
+    :type aws_conn_id: str
+    :param sleep_time: Time to wait between two consecutive call to check query status on athena
+    :type sleep_time: int
+    """
+
+    ui_color = '#44b5e2'
+    template_fields = ('query', 'database', 'output_location')
+
+    @apply_defaults
+    def __init__(self, query, database, output_location, aws_conn_id='aws_default', client_request_token=None,
+                 query_execution_context=None, result_configuration=None, sleep_time=30, *args, **kwargs):
+        super(AWSAthenaOperator, self).__init__(*args, **kwargs)
+        self.query = query
+        self.database = database
+        self.output_location = output_location
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = client_request_token or str(uuid4())
+        self.query_execution_context = query_execution_context or {}
+        self.result_configuration = result_configuration or {}
+        self.sleep_time = sleep_time
+        self.query_execution_id = None
+        self.hook = None
+
+    def get_hook(self):
+        return AWSAthenaHook(self.aws_conn_id, self.sleep_time)
+
+    def execute(self, context):
+        """
+        Run Presto Query on Athena
+        """
+        self.hook = self.get_hook()
+        self.hook.get_conn()
+
+        self.query_execution_context['Database'] = self.database
+        self.result_configuration['OutputLocation'] = self.output_location
+        self.query_execution_id = self.hook.run_query(self.query, self.query_execution_context,
+                                                      self.result_configuration, self.client_request_token)
+        self.hook.poll_query_status(self.query_execution_id)
+
+    def on_kill(self):
+        """
+        Cancel the submitted athena query
+        """
+        if self.query_execution_id:
+            self.log.info('⚰️⚰️⚰️ Received a kill Signal. Time to Die')
+            self.log.info('Stopping Query with executionId - {queryId}'.format(
+                queryId=self.query_execution_id))
+            response = self.hook.stop_query(self.query_execution_id)
+            http_status_code = None
+            try:
+                http_status_code = response['ResponseMetadata']['HTTPStatusCode']
+            except Exception as ex:
+                self.log.error('Exception while cancelling query', ex)
+            finally:
+                if http_status_code is None or http_status_code != 200:
+                    self.log.error('Unable to request query cancel on athena. Exiting')
+                else:
+                    self.log.info('Polling Athena for query with id {queryId} to reach final state'.format(
+                        queryId=self.query_execution_id))
+                    self.hook.poll_query_status(self.query_execution_id)
diff --git a/docs/code.rst b/docs/code.rst
index 817c5046ea..5c74b0ce3f 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -113,6 +113,7 @@ Operators
 .. Alphabetize this list
 
 .. autoclass:: airflow.contrib.operators.adls_list_operator.AzureDataLakeStorageListOperator
+.. autoclass:: airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator
 .. autoclass:: airflow.contrib.operators.awsbatch_operator.AWSBatchOperator
 .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator
 .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator
@@ -384,6 +385,7 @@ interface when possible and acting as building blocks for operators.
 Community contributed hooks
 '''''''''''''''''''''''''''
 .. Alphabetize this list
+.. autoclass:: airflow.contrib.hooks.aws_athena_hook.AWSAthenaHook
 .. autoclass:: airflow.contrib.hooks.aws_dynamodb_hook.AwsDynamoDBHook
 .. autoclass:: airflow.contrib.hooks.aws_hook.AwsHook
 .. autoclass:: airflow.contrib.hooks.aws_lambda_hook.AwsLambdaHook
diff --git a/tests/contrib/operators/test_aws_athena_operator.py b/tests/contrib/operators/test_aws_athena_operator.py
new file mode 100644
index 0000000000..ecfb0d2890
--- /dev/null
+++ b/tests/contrib/operators/test_aws_athena_operator.py
@@ -0,0 +1,107 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# 'License'); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
+from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
+from airflow import configuration
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+MOCK_DATA = {
+    'task_id': 'test_aws_athena_operator',
+    'query': 'SELECT * FROM TEST_TABLE',
+    'database': 'TEST_DATABASE',
+    'outputLocation': 's3://test_s3_bucket/',
+    'client_request_token': 'eac427d0-1c6d-4dfb-96aa-2835d3ac6595'
+}
+
+query_context = {
+    'Database': MOCK_DATA['database']
+}
+result_configuration = {
+    'OutputLocation': MOCK_DATA['outputLocation']
+}
+
+
+class TestAWSAthenaOperator(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+
+        self.athena = AWSAthenaOperator(task_id='test_aws_athena_operator', query='SELECT * FROM TEST_TABLE',
+                                        database='TEST_DATABASE', output_location='s3://test_s3_bucket/',
+                                        client_request_token='eac427d0-1c6d-4dfb-96aa-2835d3ac6595',
+                                        sleep_time=1)
+
+    def test_init(self):
+        self.assertEqual(self.athena.task_id, MOCK_DATA['task_id'])
+        self.assertEqual(self.athena.query, MOCK_DATA['query'])
+        self.assertEqual(self.athena.database, MOCK_DATA['database'])
+        self.assertEqual(self.athena.aws_conn_id, 'aws_default')
+        self.assertEqual(self.athena.client_request_token, MOCK_DATA['client_request_token'])
+        self.assertEqual(self.athena.sleep_time, 1)
+
+    @mock.patch.object(AWSAthenaHook, 'check_query_status', side_effect=("SUCCESS",))
+    @mock.patch.object(AWSAthenaHook, 'run_query', return_value='1234')
+    @mock.patch.object(AWSAthenaHook, 'get_conn')
+    def test_hook_run_small_success_query(self, mock_conn, mock_run_query, mock_check_query_status):
+        self.athena.execute(None)
+        mock_run_query.assert_called_once_with(MOCK_DATA['query'], query_context, result_configuration,
+                                               MOCK_DATA['client_request_token'])
+        self.assertEqual(mock_check_query_status.call_count, 1)
+
+    @mock.patch.object(AWSAthenaHook, 'check_query_status', side_effect=("RUNNING", "RUNNING", "SUCCESS",))
+    @mock.patch.object(AWSAthenaHook, 'run_query', return_value='1234')
+    @mock.patch.object(AWSAthenaHook, 'get_conn')
+    def test_hook_run_big_success_query(self, mock_conn, mock_run_query, mock_check_query_status):
+        self.athena.execute(None)
+        mock_run_query.assert_called_once_with(MOCK_DATA['query'], query_context, result_configuration,
+                                               MOCK_DATA['client_request_token'])
+        self.assertEqual(mock_check_query_status.call_count, 3)
+
+    @mock.patch.object(AWSAthenaHook, 'check_query_status', side_effect=("RUNNING", "FAILED",))
+    @mock.patch.object(AWSAthenaHook, 'run_query', return_value='1234')
+    @mock.patch.object(AWSAthenaHook, 'get_conn')
+    def test_hook_run_failure_query(self, mock_conn, mock_run_query, mock_check_query_status):
+        self.athena.execute(None)
+        mock_run_query.assert_called_once_with(MOCK_DATA['query'], query_context, result_configuration,
+                                               MOCK_DATA['client_request_token'])
+        self.assertEqual(mock_check_query_status.call_count, 2)
+
+    @mock.patch.object(AWSAthenaHook, 'check_query_status', side_effect=("RUNNING", "RUNNING", "CANCELLED",))
+    @mock.patch.object(AWSAthenaHook, 'run_query', return_value='1234')
+    @mock.patch.object(AWSAthenaHook, 'get_conn')
+    def test_hook_run_cancelled_query(self, mock_conn, mock_run_query, mock_check_query_status):
+        self.athena.execute(None)
+        mock_run_query.assert_called_once_with(MOCK_DATA['query'], query_context, result_configuration,
+                                               MOCK_DATA['client_request_token'])
+        self.assertEqual(mock_check_query_status.call_count, 3)
+
+
+if __name__ == '__main__':
+    unittest.main()
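Two behaviours of the hook above are easy to miss. First, `self.log.error('Exception while getting query state', ex)` passes the exception without a `%s` placeholder, so the standard `logging` module typically fails to format the record and reports a logging error instead of the intended message; the `return` inside `finally` would also swallow any exception not caught by `except Exception`. Second, `poll_query_status` returns the first terminal state it sees, or the last observed state (possibly still intermediate, or `None`) once `max_tries` checks have been made. The sketch below mirrors both without Airflow or boto3, assuming response dicts shaped like boto3's `get_query_execution` output; `extract_query_state` and `poll_state` are hypothetical names, not part of the hook, and `sleep_time` defaults to 0 here only for illustration.

```python
import logging
import time

log = logging.getLogger(__name__)

INTERMEDIATE_STATES = ('QUEUED', 'RUNNING')


def extract_query_state(response):
    """Return the state from a get_query_execution-style dict, or None."""
    try:
        return response['QueryExecution']['Status']['State']
    except (KeyError, TypeError) as ex:
        # The %s placeholder lets logging interpolate the exception.
        log.error('Exception while getting query state: %s', ex)
        return None


def poll_state(fetch_response, max_tries=None, sleep_time=0, sleep=time.sleep):
    """Poll until a terminal state is seen or max_tries checks were made.

    Like the hook's loop, None and intermediate states trigger another
    check, and the last observed state is returned when max_tries runs out.
    """
    try_number = 1
    while True:
        state = extract_query_state(fetch_response())
        if state is not None and state not in INTERMEDIATE_STATES:
            return state  # terminal, e.g. SUCCEEDED, FAILED or CANCELLED
        if max_tries and try_number >= max_tries:
            return state  # may still be intermediate or None
        try_number += 1
        sleep(sleep_time)


if __name__ == '__main__':
    responses = iter([{'QueryExecution': {'Status': {'State': s}}}
                      for s in ('RUNNING', 'RUNNING', 'SUCCEEDED')])
    print(poll_state(lambda: next(responses)))  # SUCCEEDED, after three checks
```

The three-check sequence matches the call count asserted in `test_hook_run_big_success_query`; injecting `sleep` keeps such checks fast without patching `time.sleep`.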
> AWS Athena Operator in Airflow
> ------------------------------
>
>                 Key: AIRFLOW-3266
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3266
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: aws
>    Affects Versions: 1.10.0
>            Reporter: Sai Phanindhra
>            Assignee: Sai Phanindhra
>            Priority: Minor
>             Fix For: 2.0.0
>
>
> There is no official Athena operator in Airflow as of now. One has to either use boto3 in a PythonOperator or the AWS CLI in a BashOperator, and neither approach takes care of the full life cycle of a query. Create an Athena operator and hook to submit a Presto query and update the task based on the state of the submitted query.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
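The issue asks for an operator that manages the whole life cycle of a query and updates the task from its final state. As merged, `execute` discards the value returned by `poll_query_status`, so a query that ends in `FAILED` or `CANCELLED` still leaves the task successful. The following is a minimal sketch of the full life cycle with boto3-style calls, assuming `client` is an Athena client such as `boto3.client('athena')` or any object exposing the same `start_query_execution`, `get_query_execution` and `get_query_results` methods. `run_athena_query` and `FakeAthenaClient` are hypothetical names for illustration, and `RuntimeError` stands in for whatever failure signal a real task would raise (for example `airflow.exceptions.AirflowException`).

```python
import time
import uuid

FAILURE_STATES = ('FAILED', 'CANCELLED')


def run_athena_query(client, query, database, output_location,
                     sleep_time=30, sleep=time.sleep):
    """Submit a query, wait for a terminal state, fail loudly, return results."""
    response = client.start_query_execution(
        QueryString=query,
        # Fresh idempotency token, as the operator does when none is given.
        ClientRequestToken=str(uuid.uuid4()),
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_location},
    )
    execution_id = response['QueryExecutionId']
    while True:
        status = client.get_query_execution(QueryExecutionId=execution_id)
        state = status['QueryExecution']['Status']['State']
        if state not in ('QUEUED', 'RUNNING'):
            break
        sleep(sleep_time)
    if state in FAILURE_STATES:
        # Surfacing the terminal state makes the task itself fail.
        raise RuntimeError('Athena query %s ended in state %s' % (execution_id, state))
    return client.get_query_results(QueryExecutionId=execution_id)


class FakeAthenaClient:
    """Hypothetical in-memory stand-in for boto3's Athena client."""

    def __init__(self, states):
        self.states = list(states)
        self.started_with = None

    def start_query_execution(self, **kwargs):
        self.started_with = kwargs
        return {'QueryExecutionId': 'query-1'}

    def get_query_execution(self, QueryExecutionId):
        return {'QueryExecution': {'Status': {'State': self.states.pop(0)}}}

    def get_query_results(self, QueryExecutionId):
        return {'ResultSet': {'Rows': []}}
```

With the fake client, `run_athena_query(FakeAthenaClient(['RUNNING', 'SUCCEEDED']), 'SELECT 1', 'TEST_DATABASE', 's3://test_s3_bucket/', sleep_time=0)` returns the empty result set, while a final `FAILED` state raises instead of passing silently.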