airflow-commits mailing list archives

From: davy...@apache.org
Subject: incubator-airflow git commit: [AIRFLOW-243] Create NamedHivePartitionSensor
Date: Wed, 29 Jun 2016 22:43:50 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4a84a578a -> bf28de4e6


[AIRFLOW-243] Create NamedHivePartitionSensor

Closes #1593 from zodiac/create-NamedHivePartitionSensor


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bf28de4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bf28de4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bf28de4e

Branch: refs/heads/master
Commit: bf28de4e601c165020669fd593964187b6246131
Parents: 4a84a57
Author: Li Xuanji <xuanji@gmail.com>
Authored: Wed Jun 29 15:40:50 2016 -0700
Committer: Dan Davydov <dan.davydov@airbnb.com>
Committed: Wed Jun 29 15:40:50 2016 -0700

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py      | 42 +++++++++++++++++-
 airflow/operators/__init__.py    |  1 +
 airflow/operators/sensors.py     | 81 +++++++++++++++++++++++++++++++++--
 tests/operators/hive_operator.py | 31 ++++++++++++++
 4 files changed, 151 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bf28de4e/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index e24cf86..e8e10b4 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -22,6 +22,7 @@ import logging
 import re
 import subprocess
 from tempfile import NamedTemporaryFile
+import hive_metastore
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
@@ -321,7 +322,17 @@ class HiveMetastoreHook(BaseHook):
         return self.metastore
 
     def check_for_partition(self, schema, table, partition):
-        """Checks whether a partition exists
+        """
+        Checks whether a partition exists
+
+        :param schema: Name of the hive schema (database) @table belongs to
+        :type schema: string
+        :param table: Name of the hive table @partition belongs to
+        :type table: string
+        :param partition: Expression that matches the partitions to check for
+            (e.g. `a = 'b' AND c = 'd'`)
+        :type partition: string
+        :rtype: boolean
 
         >>> hh = HiveMetastoreHook()
         >>> t = 'static_babynames_partitioned'
@@ -337,6 +348,35 @@ class HiveMetastoreHook(BaseHook):
         else:
             return False
 
+    def check_for_named_partition(self, schema, table, partition_name):
+        """
+        Checks whether a partition with a given name exists
+
+        :param schema: Name of the hive schema (database) @table belongs to
+        :type schema: string
+        :param table: Name of the hive table @partition_name belongs to
+        :type table: string
+        :param partition_name: Name of the partition to check for (e.g. `a=b/c=d`)
+        :type partition_name: string
+        :rtype: boolean
+
+        >>> hh = HiveMetastoreHook()
+        >>> t = 'static_babynames_partitioned'
+        >>> hh.check_for_named_partition('airflow', t, "ds=2015-01-01")
+        True
+        >>> hh.check_for_named_partition('airflow', t, "ds=xxx")
+        False
+        """
+        self.metastore._oprot.trans.open()
+        try:
+            self.metastore.get_partition_by_name(
+                schema, table, partition_name)
+            return True
+        except hive_metastore.ttypes.NoSuchObjectException:
+            return False
+        finally:
+            self.metastore._oprot.trans.close()
+
     def get_table(self, table_name, db='default'):
         """Get a metastore table object
 

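For context, a minimal sketch of exercising the new hook method directly,
mirroring the doctests above (the 'metastore_default' connection id and the
'airflow' schema are taken from this commit's examples; any working metastore
connection would do):

    from airflow.hooks.hive_hooks import HiveMetastoreHook

    hh = HiveMetastoreHook(metastore_conn_id='metastore_default')
    # Name-based lookup: one exact partition name, no filter parsing.
    hh.check_for_named_partition(
        'airflow', 'static_babynames_partitioned', 'ds=2015-01-01')
    # Filter-based lookup: accepts logical operators, but is heavier since
    # the metastore must evaluate the expression.
    hh.check_for_partition(
        'airflow', 'static_babynames_partitioned', "ds='2015-01-01'")

The name-based call maps to the metastore's get_partition_by_name and turns
NoSuchObjectException into False, so callers get a plain boolean either way.
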
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bf28de4e/airflow/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py
index 5e92b13..e392b15 100644
--- a/airflow/operators/__init__.py
+++ b/airflow/operators/__init__.py
@@ -68,6 +68,7 @@ _operators = {
         'HivePartitionSensor',
         'HttpSensor',
         'MetastorePartitionSensor',
+        'NamedHivePartitionSensor',
         'S3KeySensor',
         'S3PrefixSensor',
         'SqlSensor',

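This entry registers the sensor in the operators package, so DAG files should
be able to import it from the usual namespace (the tests below go through
operators.sensors instead):

    from airflow.operators import NamedHivePartitionSensor
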
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bf28de4e/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 90a4d14..e9b8885 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -222,15 +222,91 @@ class ExternalTaskSensor(BaseSensorOperator):
         return count
 
 
+class NamedHivePartitionSensor(BaseSensorOperator):
+    """
+    Waits for a set of partitions to show up in Hive.
+
+    :param partition_names: List of fully qualified names of the
+        partitions to wait for. A fully qualified name is of the
+        form schema.table/pk1=pv1/pk2=pv2, for example,
+        default.users/ds=2016-01-01. Each name is passed as-is to the
+        metastore Thrift client "get_partition_by_name" method. Note that
+        you cannot use logical operators as in HivePartitionSensor.
+    :type partition_names: list of strings
+    :param metastore_conn_id: reference to the metastore thrift service
+        connection id
+    :type metastore_conn_id: str
+    """
+
+    template_fields = ('partition_names', )
+
+    @apply_defaults
+    def __init__(
+            self,
+            partition_names,
+            metastore_conn_id='metastore_default',
+            poke_interval=60*3,
+            *args,
+            **kwargs):
+        super(NamedHivePartitionSensor, self).__init__(
+            poke_interval=poke_interval, *args, **kwargs)
+
+        if isinstance(partition_names, basestring):
+            raise TypeError('partition_names must be an array of strings')
+
+        for partition_name in partition_names:
+            self.parse_partition_name(partition_name)
+
+        self.metastore_conn_id = metastore_conn_id
+        self.partition_names = partition_names
+        self.next_poke_idx = 0
+
+    def parse_partition_name(self, partition):
+        try:
+            schema, table_partition = partition.split('.')
+            table, partition = table_partition.split('/', 1)
+            return schema, table, partition
+        except ValueError:
+            raise ValueError('Could not parse ' + partition)
+
+    def poke(self, context):
+
+        if not hasattr(self, 'hook'):
+            self.hook = airflow.hooks.hive_hooks.HiveMetastoreHook(
+                metastore_conn_id=self.metastore_conn_id)
+
+        def poke_partition(partition):
+
+            schema, table, partition = self.parse_partition_name(partition)
+
+            logging.info(
+                'Poking for {schema}.{table}/{partition}'.format(**locals())
+            )
+            return self.hook.check_for_named_partition(
+                schema, table, partition)
+
+        while self.next_poke_idx < len(self.partition_names):
+            if poke_partition(self.partition_names[self.next_poke_idx]):
+                self.next_poke_idx += 1
+            else:
+                return False
+
+        return True
+
+
 class HivePartitionSensor(BaseSensorOperator):
     """
-    Waits for a partition to show up in Hive
+    Waits for a partition to show up in Hive.
+
+    Note: Because @partition supports general logical operators, it
+    can be inefficient. Consider using NamedHivePartitionSensor instead if
+    you don't need the full flexibility of HivePartitionSensor.
 
     :param table: The name of the table to wait for, supports the dot
         notation (my_database.my_table)
     :type table: string
     :param partition: The partition clause to wait for. This is passed as
-        is to the Metastore Thrift client "get_partitions_by_filter" method,
+        is to the metastore Thrift client "get_partitions_by_filter" method,
         and apparently supports SQL-like notation as in `ds='2015-01-01'
         AND type='value'` and > < signs as in "ds>=2015-01-01"
     :type partition: string
@@ -264,7 +340,6 @@ class HivePartitionSensor(BaseSensorOperator):
             'Poking for table {self.schema}.{self.table}, '
             'partition {self.partition}'.format(**locals()))
         if not hasattr(self, 'hook'):
-            import airflow.hooks.hive_hooks
             self.hook = airflow.hooks.hive_hooks.HiveMetastoreHook(
                 metastore_conn_id=self.metastore_conn_id)
         return self.hook.check_for_partition(

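A usage sketch for the new sensor; the DAG name, schedule, and partition
names below are illustrative assumptions, not part of this commit:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators import NamedHivePartitionSensor

    dag = DAG(
        'wait_for_daily_partitions',
        start_date=datetime(2016, 1, 1),
        schedule_interval='@daily')

    # partition_names is a template field, so {{ ds }} renders to the
    # execution date; each entry must look like schema.table/pk1=pv1[/...].
    wait = NamedHivePartitionSensor(
        task_id='wait_for_partitions',
        partition_names=[
            'default.users/ds={{ ds }}',
            'default.events/ds={{ ds }}/type=click',
        ],
        metastore_conn_id='metastore_default',
        dag=dag)

Note the design choice in poke(): next_poke_idx remembers how many names have
already been confirmed, so found partitions are not re-checked on later pokes
and the sensor returns True only once every name has been seen.
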
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bf28de4e/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 202adcf..f59bbf1 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -24,6 +24,7 @@ configuration.test_mode()
 
 import os
 import unittest
+import nose
 
 
 DEFAULT_DATE = datetime.datetime(2015, 1, 1)
@@ -163,6 +164,36 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                 dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
+        def test_named_hive_partition_sensor(self):
+            t = operators.sensors.NamedHivePartitionSensor(
+                task_id='hive_partition_check',
+                partition_names=["airflow.static_babynames_partitioned/ds={{ds}}"],
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_named_hive_partition_sensor_succeeds_on_multiple_partitions(self):
+            t = operators.sensors.NamedHivePartitionSensor(
+                task_id='hive_partition_check',
+                partition_names=[
+                    "airflow.static_babynames_partitioned/ds={{ds}}",
+                    "airflow.static_babynames_partitioned/ds={{ds}}"
+                ],
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        @nose.tools.raises(airflow.exceptions.AirflowSensorTimeout)
+        def test_named_hive_partition_sensor_times_out_on_nonexistent_partition(self):
+            t = operators.sensors.NamedHivePartitionSensor(
+                task_id='hive_partition_check',
+                partition_names=[
+                    "airflow.static_babynames_partitioned/ds={{ds}}",
+                    "airflow.static_babynames_partitioned/ds=nonexistent"
+                ],
+                poke_interval=0.1,
+                timeout=1,
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
         def test_hive_partition_sensor(self):
             t = operators.sensors.HivePartitionSensor(
                 task_id='hive_partition_check',

