From: fokko@apache.org
To: commits@airflow.incubator.apache.org
Subject: incubator-airflow git commit: [AIRFLOW-2427] Add tests to named hive sensor
Date: Tue, 8 May 2018 10:42:16 +0000 (UTC)

Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test a1fdd8180 -> 1e6c0e96b


[AIRFLOW-2427] Add tests to named hive sensor

Closes #3323 from gglanzani/AIRFLOW-2427

(cherry picked from commit b18b437c216b0c4b3ffb41e4934f3c2dd966c14b)
Signed-off-by: Fokko Driesprong

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1e6c0e96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1e6c0e96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1e6c0e96

Branch: refs/heads/v1-10-test
Commit: 1e6c0e96bd15f1fa79cac21e96789d0182d5e9d4
Parents: a1fdd81
Author: Giovanni Lanzani
Authored: Tue May 8 12:41:51 2018 +0200
Committer: Fokko Driesprong
Committed: Tue May 8 12:42:10 2018 +0200
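For readers unfamiliar with this sensor, a minimal usage sketch follows; the
DAG id, task id, dates, and partition names are illustrative, not part of the
commit, and it assumes a working 'metastore_default' connection:

    from airflow import DAG
    from airflow.sensors.named_hive_partition_sensor import NamedHivePartitionSensor
    from airflow.utils.timezone import datetime

    # An illustrative DAG that blocks until the named Hive partition exists.
    dag = DAG('hive_partition_example',
              start_date=datetime(2018, 5, 8),
              schedule_interval='@daily')

    wait_for_partitions = NamedHivePartitionSensor(
        task_id='wait_for_partitions',
        # Names take the form 'schema.table/partition'; after this commit the
        # 'schema.' prefix may be omitted and falls back to 'default'.
        partition_names=['airflow.static_babynames_partitioned/ds={{ ds }}'],
        metastore_conn_id='metastore_default',
        poke_interval=60 * 3,
        dag=dag)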
----------------------------------------------------------------------
 airflow/sensors/named_hive_partition_sensor.py   |  68 +++++-----
 .../sensors/test_named_hive_partition_sensor.py  | 130 +++++++++++++++++++
 2 files changed, 169 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1e6c0e96/airflow/sensors/named_hive_partition_sensor.py
----------------------------------------------------------------------
diff --git a/airflow/sensors/named_hive_partition_sensor.py b/airflow/sensors/named_hive_partition_sensor.py
index a42a360..4a076a3 100644
--- a/airflow/sensors/named_hive_partition_sensor.py
+++ b/airflow/sensors/named_hive_partition_sensor.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
-# 
+#
 # http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,6 +48,7 @@ class NamedHivePartitionSensor(BaseSensorOperator):
                  partition_names,
                  metastore_conn_id='metastore_default',
                  poke_interval=60 * 3,
+                 hook=None,
                  *args,
                  **kwargs):
         super(NamedHivePartitionSensor, self).__init__(
@@ -58,37 +59,46 @@ class NamedHivePartitionSensor(BaseSensorOperator):
 
         self.metastore_conn_id = metastore_conn_id
         self.partition_names = partition_names
-        self.next_poke_idx = 0
-
-    @classmethod
-    def parse_partition_name(self, partition):
-        try:
-            schema, table_partition = partition.split('.', 1)
-            table, partition = table_partition.split('/', 1)
-            return schema, table, partition
-        except ValueError as e:
-            raise ValueError('Could not parse ' + partition)
-
-    def poke(self, context):
-        if not hasattr(self, 'hook'):
+        self.hook = hook
+        if self.hook and metastore_conn_id != 'metastore_default':
+            self.log.warning('A hook was passed but a non-default '
+                             'metastore_conn_id='
+                             '{} was used'.format(metastore_conn_id))
+
+    @staticmethod
+    def parse_partition_name(partition):
+        first_split = partition.split('.', 1)
+        if len(first_split) == 1:
+            schema = 'default'
+            table_partition = max(first_split)  # poor man's first()
+        else:
+            schema, table_partition = first_split
+        second_split = table_partition.split('/', 1)
+        if len(second_split) == 1:
+            raise ValueError('Could not parse ' + partition +
+                             ' into table, partition')
+        else:
+            table, partition = second_split
+        return schema, table, partition
+
+    def poke_partition(self, partition):
+        if not self.hook:
             from airflow.hooks.hive_hooks import HiveMetastoreHook
             self.hook = HiveMetastoreHook(
                 metastore_conn_id=self.metastore_conn_id)
 
-        def poke_partition(partition):
-
-            schema, table, partition = self.parse_partition_name(partition)
+        schema, table, partition = self.parse_partition_name(partition)
 
-            self.log.info(
-                'Poking for {schema}.{table}/{partition}'.format(**locals())
-            )
-            return self.hook.check_for_named_partition(
-                schema, table, partition)
+        self.log.info(
+            'Poking for {schema}.{table}/{partition}'.format(**locals())
+        )
+        return self.hook.check_for_named_partition(
+            schema, table, partition)
 
-        while self.next_poke_idx < len(self.partition_names):
-            if poke_partition(self.partition_names[self.next_poke_idx]):
-                self.next_poke_idx += 1
-            else:
-                return False
+    def poke(self, context):
 
-        return True
+        self.partition_names = [
+            partition_name for partition_name in self.partition_names
+            if not self.poke_partition(partition_name)
+        ]
+        return not self.partition_names
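To make the reworked parse_partition_name concrete, a quick illustration,
runnable against this branch (the table and partition values are made up):

    from airflow.sensors.named_hive_partition_sensor import NamedHivePartitionSensor

    # Fully qualified name: schema, table and partition are split apart.
    print(NamedHivePartitionSensor.parse_partition_name(
        'airflow.static_babynames_partitioned/ds=2015-01-01'))
    # ('airflow', 'static_babynames_partitioned', 'ds=2015-01-01')

    # No schema prefix: the schema now falls back to 'default'.
    print(NamedHivePartitionSensor.parse_partition_name(
        'static_babynames_partitioned/ds=2015-01-01'))
    # ('default', 'static_babynames_partitioned', 'ds=2015-01-01')

    # No '/' between table and partition: raises ValueError,
    # as the new tests below assert.
    NamedHivePartitionSensor.parse_partition_name('incorrect.name')

Note that poke() now also drops a partition from self.partition_names as soon
as it is found, so each name is only re-poked until it first appears, and the
sensor succeeds once the list is empty.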
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1e6c0e96/tests/sensors/test_named_hive_partition_sensor.py
----------------------------------------------------------------------
diff --git a/tests/sensors/test_named_hive_partition_sensor.py b/tests/sensors/test_named_hive_partition_sensor.py
new file mode 100644
index 0000000..4fef3e0
--- /dev/null
+++ b/tests/sensors/test_named_hive_partition_sensor.py
@@ -0,0 +1,130 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import random
+import unittest
+from datetime import timedelta
+
+from airflow import configuration, DAG, operators
+from airflow.sensors.named_hive_partition_sensor import NamedHivePartitionSensor
+from airflow.utils.timezone import datetime
+from airflow.hooks.hive_hooks import HiveMetastoreHook
+
+configuration.load_test_config()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+
+
+class NamedHivePartitionSensorTests(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        self.dag = DAG('test_dag_id', default_args=args)
+        self.next_day = (DEFAULT_DATE +
+                         timedelta(days=1)).isoformat()[:10]
+        self.database = 'airflow'
+        self.partition_by = 'ds'
+        self.table = 'static_babynames_partitioned'
+        self.hql = """
+            CREATE DATABASE IF NOT EXISTS {{ params.database }};
+            USE {{ params.database }};
+            DROP TABLE IF EXISTS {{ params.table }};
+            CREATE TABLE IF NOT EXISTS {{ params.table }} (
+                state string,
+                year string,
+                name string,
+                gender string,
+                num int)
+            PARTITIONED BY ({{ params.partition_by }} string);
+            ALTER TABLE {{ params.table }}
+            ADD PARTITION({{ params.partition_by }}='{{ ds }}');
+            """
+        self.hook = HiveMetastoreHook()
+        t = operators.hive_operator.HiveOperator(
+            task_id='HiveHook_' + str(random.randint(1, 10000)),
+            params={
+                'database': self.database,
+                'table': self.table,
+                'partition_by': self.partition_by
+            },
+            hive_cli_conn_id='beeline_default',
+            hql=self.hql, dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+              ignore_ti_state=True)
+
+    def tearDown(self):
+        hook = HiveMetastoreHook()
+        with hook.get_conn() as metastore:
+            metastore.drop_table(self.database, self.table, deleteData=True)
+
+    def test_parse_partition_name_correct(self):
+        schema = 'default'
+        table = 'users'
+        partition = 'ds=2016-01-01/state=IT'
+        name = '{schema}.{table}/{partition}'.format(schema=schema,
+                                                     table=table,
+                                                     partition=partition)
+        parsed_schema, parsed_table, parsed_partition = (
+            NamedHivePartitionSensor.parse_partition_name(name)
+        )
+        self.assertEqual(schema, parsed_schema)
+        self.assertEqual(table, parsed_table)
+        self.assertEqual(partition, parsed_partition)
+
+    def test_parse_partition_name_incorrect(self):
+        name = 'incorrect.name'
+        with self.assertRaises(ValueError):
+            NamedHivePartitionSensor.parse_partition_name(name)
+
+    def test_parse_partition_name_default(self):
+        table = 'users'
+        partition = 'ds=2016-01-01/state=IT'
+        name = '{table}/{partition}'.format(table=table,
+                                            partition=partition)
+        parsed_schema, parsed_table, parsed_partition = (
+            NamedHivePartitionSensor.parse_partition_name(name)
+        )
+        self.assertEqual('default', parsed_schema)
+        self.assertEqual(table, parsed_table)
+        self.assertEqual(partition, parsed_partition)
+
+    def test_poke_existing(self):
+        partitions = ["{}.{}/{}={}".format(self.database,
+                                           self.table,
+                                           self.partition_by,
+                                           DEFAULT_DATE_DS)]
+        sensor = NamedHivePartitionSensor(partition_names=partitions,
+                                          task_id='test_poke_existing',
+                                          poke_interval=1,
+                                          hook=self.hook,
+                                          dag=self.dag)
+        self.assertTrue(sensor.poke(None))
+
+    def test_poke_non_existing(self):
+        partitions = ["{}.{}/{}={}".format(self.database,
+                                           self.table,
+                                           self.partition_by,
+                                           self.next_day)]
+        sensor = NamedHivePartitionSensor(partition_names=partitions,
+                                          task_id='test_poke_non_existing',
+                                          poke_interval=1,
+                                          hook=self.hook,
+                                          dag=self.dag)
+        self.assertFalse(sensor.poke(None))
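The setUp/tearDown above require a live Hive metastore and a 'beeline_default'
connection, as in Airflow's integration test environment. Because the
constructor now accepts a hook, the sensor can also be exercised without Hive
by injecting a stub; a minimal sketch, assuming the mock library (bundled as
unittest.mock on Python 3), with illustrative names:

    try:
        from unittest import mock  # Python 3
    except ImportError:
        import mock  # Python 2 backport

    from airflow.sensors.named_hive_partition_sensor import NamedHivePartitionSensor

    # A stub hook that reports every partition as present.
    fake_hook = mock.Mock()
    fake_hook.check_for_named_partition.return_value = True

    sensor = NamedHivePartitionSensor(
        task_id='poke_with_stub_hook',
        partition_names=['default.users/ds=2016-01-01'],
        poke_interval=1,
        hook=fake_hook)

    print(sensor.poke(None))  # True: the stub says the partition exists
    fake_hook.check_for_named_partition.assert_called_once_with(
        'default', 'users', 'ds=2016-01-01')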