From: davydov@apache.org
To: commits@airflow.incubator.apache.org
Subject: incubator-airflow git commit: [AIRFLOW-243] Create NamedHivePartitionSensor
Date: Wed, 29 Jun 2016 22:43:50 +0000 (UTC)

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4a84a578a -> bf28de4e6


[AIRFLOW-243] Create NamedHivePartitionSensor

Closes #1593 from zodiac/create-NamedHivePartitionSensor

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bf28de4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bf28de4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bf28de4e

Branch: refs/heads/master
Commit: bf28de4e601c165020669fd593964187b6246131
Parents: 4a84a57
Author: Li Xuanji
Authored: Wed Jun 29 15:40:50 2016 -0700
Committer: Dan Davydov
Committed: Wed Jun 29 15:40:50 2016 -0700

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py      | 42 +++++++++++++++++-
 airflow/operators/__init__.py    |  1 +
 airflow/operators/sensors.py     | 81 +++++++++++++++++++++++++++++++++--
 tests/operators/hive_operator.py | 31 ++++++++++++++
 4 files changed, 151 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bf28de4e/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index e24cf86..e8e10b4 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -22,6 +22,7 @@ import logging
 import re
 import subprocess
 from tempfile import NamedTemporaryFile
+import hive_metastore
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
@@ -321,7 +322,17 @@ class HiveMetastoreHook(BaseHook):
         return self.metastore
 
     def check_for_partition(self, schema, table, partition):
-        """Checks whether a partition exists
+        """
+        Checks whether a partition exists
+
+        :param schema: Name of hive schema (database) @table belongs to
+        :type schema: string
+        :param table: Name of hive table @partition belongs to
+        :type table: string
+        :param partition: Expression that matches the partitions to check for
+            (e.g. `a = 'b' AND c = 'd'`)
+        :type partition: string
+        :rtype: boolean
 
         >>> hh = HiveMetastoreHook()
         >>> t = 'static_babynames_partitioned'
@@ -337,6 +348,35 @@ class HiveMetastoreHook(BaseHook):
         else:
             return False
 
+    def check_for_named_partition(self, schema, table, partition_name):
+        """
+        Checks whether a partition with a given name exists
+
+        :param schema: Name of hive schema (database) @table belongs to
+        :type schema: string
+        :param table: Name of hive table @partition belongs to
+        :type table: string
+        :param partition_name: Name of the partition to check for
+            (e.g. `a=b/c=d`)
+        :type partition_name: string
+        :rtype: boolean
+
+        >>> hh = HiveMetastoreHook()
+        >>> t = 'static_babynames_partitioned'
+        >>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
+        True
+        >>> hh.check_for_named_partition('airflow', t, "ds=xxx")
+        False
+        """
+        self.metastore._oprot.trans.open()
+        try:
+            self.metastore.get_partition_by_name(
+                schema, table, partition_name)
+            return True
+        except hive_metastore.ttypes.NoSuchObjectException:
+            return False
+        finally:
+            self.metastore._oprot.trans.close()
+
     def get_table(self, table_name, db='default'):
         """Get a metastore table object
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bf28de4e/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 5e92b13..e392b15 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -68,6 +68,7 @@ _operators = {
         'HivePartitionSensor',
         'HttpSensor',
         'MetastorePartitionSensor',
+        'NamedHivePartitionSensor',
         'S3KeySensor',
         'S3PrefixSensor',
         'SqlSensor',
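(Not part of the commit: a minimal sketch of calling the new hook method
directly. It assumes an Airflow metastore connection named
'metastore_default' is configured and that the partitioned table from the
doctests exists.)

    from airflow.hooks.hive_hooks import HiveMetastoreHook

    # Hook wired to the Thrift metastore connection configured in Airflow
    hook = HiveMetastoreHook(metastore_conn_id='metastore_default')

    # check_for_named_partition takes an exact partition name
    # (key=value pairs joined by '/'), not a filter expression
    if hook.check_for_named_partition(
            'airflow', 'static_babynames_partitioned', 'ds=2015-01-01'):
        print('partition is present')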
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bf28de4e/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 90a4d14..e9b8885 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -222,15 +222,91 @@ class ExternalTaskSensor(BaseSensorOperator):
         return count
 
 
+class NamedHivePartitionSensor(BaseSensorOperator):
+    """
+    Waits for a set of partitions to show up in Hive.
+
+    :param partition_names: List of fully qualified names of the
+        partitions to wait for. A fully qualified name is of the
+        form schema.table/pk1=pv1/pk2=pv2, for example,
+        default.users/ds=2016-01-01. This is passed as-is to the metastore
+        Thrift client "get_partitions_by_name" method. Note that
+        you cannot use logical operators as in HivePartitionSensor.
+    :type partition_names: list of strings
+    :param metastore_conn_id: reference to the metastore thrift service
+        connection id
+    :type metastore_conn_id: str
+    """
+
+    template_fields = ('partition_names', )
+
+    @apply_defaults
+    def __init__(
+            self,
+            partition_names,
+            metastore_conn_id='metastore_default',
+            poke_interval=60*3,
+            *args,
+            **kwargs):
+        super(NamedHivePartitionSensor, self).__init__(
+            poke_interval=poke_interval, *args, **kwargs)
+
+        if isinstance(partition_names, basestring):
+            raise TypeError('partition_names must be an array of strings')
+
+        for partition_name in partition_names:
+            self.parse_partition_name(partition_name)
+
+        self.metastore_conn_id = metastore_conn_id
+        self.partition_names = partition_names
+        self.next_poke_idx = 0
+
+    def parse_partition_name(self, partition):
+        try:
+            schema, table_partition = partition.split('.')
+            table, partition = table_partition.split('/', 1)
+            return schema, table, partition
+        except ValueError:
+            raise ValueError('Could not parse ' + partition)
+
+    def poke(self, context):
+
+        if not hasattr(self, 'hook'):
+            self.hook = airflow.hooks.hive_hooks.HiveMetastoreHook(
+                metastore_conn_id=self.metastore_conn_id)
+
+        def poke_partition(partition):
+
+            schema, table, partition = self.parse_partition_name(partition)
+
+            logging.info(
+                'Poking for {schema}.{table}/{partition}'.format(**locals())
+            )
+            return self.hook.check_for_named_partition(
+                schema, table, partition)
+
+        while self.next_poke_idx < len(self.partition_names):
+            if poke_partition(self.partition_names[self.next_poke_idx]):
+                self.next_poke_idx += 1
+            else:
+                return False
+
+        return True
+
+
 class HivePartitionSensor(BaseSensorOperator):
     """
-    Waits for a partition to show up in Hive
+    Waits for a partition to show up in Hive.
+
+    Note: Because @partition supports general logical operators, it
+    can be inefficient. Consider using NamedHivePartitionSensor instead if
+    you don't need the full flexibility of HivePartitionSensor.
 
     :param table: The name of the table to wait for, supports the dot
         notation (my_database.my_table)
     :type table: string
     :param partition: The partition clause to wait for. This is passed as
-        is to the Metastore Thrift client "get_partitions_by_filter" method,
+        is to the metastore Thrift client "get_partitions_by_filter" method,
         and apparently supports SQL like notation as in `ds='2015-01-01'
         AND type='value'` and > < signs as in "ds>=2015-01-01"
     :type partition: string
@@ -264,7 +340,6 @@ class HivePartitionSensor(BaseSensorOperator):
             'Poking for table {self.schema}.{self.table}, '
             'partition {self.partition}'.format(**locals()))
         if not hasattr(self, 'hook'):
-            import airflow.hooks.hive_hooks
             self.hook = airflow.hooks.hive_hooks.HiveMetastoreHook(
                 metastore_conn_id=self.metastore_conn_id)
         return self.hook.check_for_partition(
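(Not part of the commit: a minimal usage sketch of the new sensor in a DAG.
The DAG name, tables, and partition names below are hypothetical.)

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.sensors import NamedHivePartitionSensor

    dag = DAG('named_partition_example', start_date=datetime(2016, 1, 1))

    # Fully qualified names (schema.table/pk1=pv1/...) are passed verbatim
    # to the metastore; filter expressions are not supported here
    wait_for_partitions = NamedHivePartitionSensor(
        task_id='wait_for_partitions',
        partition_names=[
            'default.users/ds={{ ds }}',
            'default.events/ds={{ ds }}',
        ],
        metastore_conn_id='metastore_default',
        dag=dag)

Because poke() advances next_poke_idx past partitions it has already found,
each partition is re-checked only until it first appears.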
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bf28de4e/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 202adcf..f59bbf1 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -24,6 +24,7 @@ configuration.test_mode()
 
 import os
 import unittest
+import nose
 
 DEFAULT_DATE = datetime.datetime(2015, 1, 1)
@@ -163,6 +164,36 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
+        def test_named_hive_partition_sensor(self):
+            t = operators.sensors.NamedHivePartitionSensor(
+                task_id='hive_partition_check',
+                partition_names=["airflow.static_babynames_partitioned/ds={{ds}}"],
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_named_hive_partition_sensor_succeeds_on_multiple_partitions(self):
+            t = operators.sensors.NamedHivePartitionSensor(
+                task_id='hive_partition_check',
+                partition_names=[
+                    "airflow.static_babynames_partitioned/ds={{ds}}",
+                    "airflow.static_babynames_partitioned/ds={{ds}}"
+                ],
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        @nose.tools.raises(airflow.exceptions.AirflowSensorTimeout)
+        def test_named_hive_partition_sensor_times_out_on_nonexistent_partition(self):
+            t = operators.sensors.NamedHivePartitionSensor(
+                task_id='hive_partition_check',
+                partition_names=[
+                    "airflow.static_babynames_partitioned/ds={{ds}}",
+                    "airflow.static_babynames_partitioned/ds=nonexistent"
+                ],
+                poke_interval=0.1,
+                timeout=1,
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
         def test_hive_partition_sensor(self):
             t = operators.sensors.HivePartitionSensor(
                 task_id='hive_partition_check',