airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davy...@apache.org
Subject incubator-airflow git commit: [Airflow-2202] Add filter support in HiveMetastoreHook().max_partition()
Date Sat, 17 Mar 2018 01:02:47 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 133e249e0 -> 9a315efc7


[Airflow-2202] Add filter support in HiveMetastoreHook().max_partition()

Adding back support for filter in max_partition()
which can be used in some valid use cases. It
will work for tables with multiple partitions,
which is the behavior before (though the doc stated
it only works for a single-partitioned table). This
change also kept the behavior when trying to get
max partition on a sub-partitioned table without
supplying filter--it will return the max partition
value of the partition key even if it is not unique.

Some extra checks are added to provide more
meaningful exception messages.

Closes #3117 from yrqls21/kevin_yang_add_filter


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9a315efc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9a315efc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9a315efc

Branch: refs/heads/master
Commit: 9a315efc797da3936ef0ee9065307b1e02cab43a
Parents: 133e249
Author: Kevin Yang <kevin.yang@airbnb.com>
Authored: Fri Mar 16 18:01:48 2018 -0700
Committer: Dan Davydov <dan.davydov@airbnb.com>
Committed: Fri Mar 16 18:01:54 2018 -0700

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py   | 109 ++++++++++++++++++++++++-------------
 airflow/macros/hive.py        |  21 ++++---
 tests/hooks/test_hive_hook.py |  57 +++++++++++++++----
 3 files changed, 129 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9a315efc/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 128be41..7b5ff10 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -614,62 +614,95 @@ class HiveMetastoreHook(BaseHook):
             return [dict(zip(pnames, p.values)) for p in parts]
 
     @staticmethod
-    def _get_max_partition_from_part_names(part_names, key_name):
-        """
-        Helper method to get max partition from part names. Works only
-        when partition format follows '{key}={value}' and key_name is name of
-        the only partition key.
-        :param part_names: list of partition names
-        :type part_names: list
-        :param key_name: partition key name
-        :type key_name: str
-        :return: Max partition or None if part_names is empty.
-        """
-        if not part_names:
+    def _get_max_partition_from_part_specs(part_specs, partition_key, filter_map):
+        """
+        Helper method to get max partition of partitions with partition_key
+        from part specs. key:value pair in filter_map will be used to
+        filter out partitions.
+
+        :param part_specs: list of partition specs.
+        :type part_specs: list
+        :param partition_key: partition key name.
+        :type partition_key: string
+        :param filter_map: partition_key:partition_value map used for partition filtering,
+                           e.g. {'key1': 'value1', 'key2': 'value2'}.
+                           Only partitions matching all partition_key:partition_value
+                           pairs will be considered as candidates of max partition.
+        :type filter_map: map
+        :return: Max partition or None if part_specs is empty.
+        """
+        if not part_specs:
             return None
 
-        prefix = key_name + '='
-        prefix_len = len(key_name) + 1
-        max_val = None
-        for part_name in part_names:
-            if part_name.startswith(prefix):
-                if max_val is None:
-                    max_val = part_name[prefix_len:]
-                else:
-                    max_val = max(max_val, part_name[prefix_len:])
-            else:
-                raise AirflowException(
-                    "Partition name mal-formatted: {}".format(part_name))
-        return max_val
+        # Assuming all specs have the same keys.
+        if partition_key not in part_specs[0].keys():
+            raise AirflowException("Provided partition_key {} "
+                                   "is not in part_specs.".format(partition_key))
 
-    def max_partition(self, schema, table_name, field=None):
+        if filter_map and not set(filter_map.keys()) < set(part_specs[0].keys()):
+            raise AirflowException("Keys in provided filter_map {} "
+                                   "are not subset of part_spec keys: {}"
+                                   .format(', '.join(filter_map.keys()),
+                                           ', '.join(part_specs[0].keys())))
+
+        candidates = [p_dict[partition_key] for p_dict in part_specs
+                      if filter_map is None or
+                      all(item in p_dict.items() for item in filter_map.items())]
+
+        if not candidates:
+            return None
+        else:
+            return max(candidates).encode('utf-8')
+
+    def max_partition(self, schema, table_name, field=None, filter_map=None):
         """
-        Returns the maximum value for all partitions in a table. Works only
-        for tables that have a single partition key. For subpartitioned
-        table, we recommend using signal tables.
+        Returns the maximum value for all partitions with given field in a table.
+        If only one partition key exist in the table, the key will be used as field.
+        filter_map should be a partition_key:partition_value map and will be used to
+        filter out partitions.
+
+        :param schema: schema name.
+        :type schema: string
+        :param table_name: table name.
+        :type table_name: string
+        :param field: partition key to get max partition from.
+        :type field: string
+        :param filter_map: partition_key:partition_value map used for partition filtering.
+        :type filter_map: map
 
         >>> hh = HiveMetastoreHook()
+        >>  filter_map = {'p_key': 'p_val'}
         >>> t = 'static_babynames_partitioned'
-        >>> hh.max_partition(schema='airflow', table_name=t)
+        >>> hh.max_partition(schema='airflow',\
+        ... table_name=t, field='ds', filter_map=filter_map)
         '2015-01-01'
         """
         self.metastore._oprot.trans.open()
         table = self.metastore.get_table(dbname=schema, tbl_name=table_name)
-        if len(table.partitionKeys) != 1:
-            raise AirflowException(
-                "The table isn't partitioned by a single partition key")
-
-        key_name = table.partitionKeys[0].name
-        if field is not None and key_name != field:
-            raise AirflowException("Provided field is not the partition key")
+        key_name_set = set(key.name for key in table.partitionKeys)
+        if len(table.partitionKeys) == 1:
+            field = table.partitionKeys[0].name
+        elif not field:
+            raise AirflowException("Please specify the field you want the max "
+                                   "value for.")
+        elif field not in key_name_set:
+            raise AirflowException("Provided field is not a partition key.")
+
+        if filter_map and not set(filter_map.keys()).issubset(key_name_set):
+            raise AirflowException("Provided filter_map contains keys "
+                                   "that are not partition key.")
 
         part_names = \
             self.metastore.get_partition_names(schema,
                                                table_name,
                                                max_parts=HiveMetastoreHook.MAX_PART_COUNT)
+        part_specs = [self.metastore.partition_name_to_spec(part_name)
+                      for part_name in part_names]
         self.metastore._oprot.trans.close()
 
-        return HiveMetastoreHook._get_max_partition_from_part_names(part_names, key_name)
+        return HiveMetastoreHook._get_max_partition_from_part_specs(part_specs,
+                                                                    field,
+                                                                    filter_map)
 
     def table_exists(self, table_name, db='default'):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9a315efc/airflow/macros/hive.py
----------------------------------------------------------------------
diff --git a/airflow/macros/hive.py b/airflow/macros/hive.py
index ef80fc6..86605a4 100644
--- a/airflow/macros/hive.py
+++ b/airflow/macros/hive.py
@@ -16,9 +16,9 @@ import datetime
 
 
 def max_partition(
-        table, schema="default", field=None, filter=None,
+        table, schema="default", field=None, filter_map=None,
         metastore_conn_id='metastore_default'):
-    '''
+    """
     Gets the max partition for a table.
 
     :param schema: The hive schema the table lives in
@@ -27,24 +27,27 @@ def max_partition(
         notation as in "my_database.my_table", if a dot is found,
         the schema param is disregarded
     :type table: string
-    :param hive_conn_id: The hive connection you are interested in.
+    :param metastore_conn_id: The hive connection you are interested in.
         If your default is set you don't need to use this parameter.
-    :type hive_conn_id: string
-    :param filter: filter on a subset of partition as in
-        `sub_part='specific_value'`
-    :type filter: string
+    :type metastore_conn_id: string
+    :param filter_map: partition_key:partition_value map used for partition filtering,
+                       e.g. {'key1': 'value1', 'key2': 'value2'}.
+                       Only partitions matching all partition_key:partition_value
+                       pairs will be considered as candidates of max partition.
+    :type filter_map: map
     :param field: the field to get the max value from. If there's only
         one partition field, this will be inferred
+    :type field: str
 
     >>> max_partition('airflow.static_babynames_partitioned')
     '2015-01-01'
-    '''
+    """
     from airflow.hooks.hive_hooks import HiveMetastoreHook
     if '.' in table:
         schema, table = table.split('.')
     hh = HiveMetastoreHook(metastore_conn_id=metastore_conn_id)
     return hh.max_partition(
-        schema=schema, table_name=table, field=field)
+        schema=schema, table_name=table, field=field, filter_map=filter_map)
 
 
 def _closest_date(target_dt, date_list, before_target=None):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9a315efc/tests/hooks/test_hive_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_hive_hook.py b/tests/hooks/test_hive_hook.py
index c7da8e5..30dd46d 100644
--- a/tests/hooks/test_hive_hook.py
+++ b/tests/hooks/test_hive_hook.py
@@ -20,20 +20,55 @@ from airflow.hooks.hive_hooks import HiveMetastoreHook
 
 
 class TestHiveMetastoreHook(unittest.TestCase):
-    def test_get_max_partition_from_empty_part_names(self):
+    VALID_FILTER_MAP = {'key2': 'value2'}
+
+    def test_get_max_partition_from_empty_part_specs(self):
         max_partition = \
-            HiveMetastoreHook._get_max_partition_from_part_names([], 'some_key')
+            HiveMetastoreHook._get_max_partition_from_part_specs([],
+                                                                 'key1',
+                                                                 self.VALID_FILTER_MAP)
         self.assertIsNone(max_partition)
 
-    def test_get_max_partition_from_mal_formatted_part_names(self):
+    def test_get_max_partition_from_valid_part_specs_and_invalid_filter_map(self):
+        with self.assertRaises(AirflowException):
+            HiveMetastoreHook._get_max_partition_from_part_specs(
+                [{'key1': 'value1', 'key2': 'value2'},
+                 {'key1': 'value3', 'key2': 'value4'}],
+                'key1',
+                {'key3': 'value5'})
+
+    def test_get_max_partition_from_valid_part_specs_and_invalid_partition_key(self):
+        with self.assertRaises(AirflowException):
+            HiveMetastoreHook._get_max_partition_from_part_specs(
+                [{'key1': 'value1', 'key2': 'value2'},
+                 {'key1': 'value3', 'key2': 'value4'}],
+                'key3',
+                self.VALID_FILTER_MAP)
+
+    def test_get_max_partition_from_valid_part_specs_and_none_partition_key(self):
         with self.assertRaises(AirflowException):
-            HiveMetastoreHook._get_max_partition_from_part_names(
-                ['bad_partition_name'], 'some_key')
+            HiveMetastoreHook._get_max_partition_from_part_specs(
+                [{'key1': 'value1', 'key2': 'value2'},
+                 {'key1': 'value3', 'key2': 'value4'}],
+                None,
+                self.VALID_FILTER_MAP)
+
+    def test_get_max_partition_from_valid_part_specs_and_none_filter_map(self):
+        max_partition = \
+            HiveMetastoreHook._get_max_partition_from_part_specs(
+                [{'key1': 'value1', 'key2': 'value2'},
+                 {'key1': 'value3', 'key2': 'value4'}],
+                'key1',
+                None)
+
+        # No partition will be filtered out.
+        self.assertEqual(max_partition, b'value3')
 
-    def test_get_max_partition_from_mal_valid_part_names(self):
+    def test_get_max_partition_from_valid_part_specs(self):
         max_partition = \
-            HiveMetastoreHook._get_max_partition_from_part_names(['some_key=value1',
-                                                                  'some_key=value2',
-                                                                  'some_key=value3'],
-                                                                 'some_key')
-        self.assertEqual(max_partition, 'value3')
+            HiveMetastoreHook._get_max_partition_from_part_specs(
+                [{'key1': 'value1', 'key2': 'value2'},
+                 {'key1': 'value3', 'key2': 'value4'}],
+                'key1',
+                self.VALID_FILTER_MAP)
+        self.assertEqual(max_partition, b'value1')


Mime
View raw message