kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [1/4] kudu git commit: KUDU-1671 - [python] Enable predicate pushdown for additional types
Date Thu, 06 Oct 2016 02:57:16 GMT
Repository: kudu
Updated Branches:
  refs/heads/master bce1dd777 -> cae2f9899


KUDU-1671 - [python] Enable predicate pushdown for additional types

Currently, the python client does not support predicate pushdown for
boolean and unixtime_micros values. Additionally, as pointed out in
KUDU-1672, float predicates have a bug. This patch addresses both
of these issues.  Test cases have been added to validate this
functionality.  Two minor namespace issues were addressed as well
for float and boolean types.

Change-Id: If5766d24055dfba5fa371fc61c6dfd66adc54273
Reviewed-on: http://gerrit.cloudera.org:8080/4589
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dralves@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4d1b1e97
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4d1b1e97
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4d1b1e97

Branch: refs/heads/master
Commit: 4d1b1e973655b783f2e7b338b6574b2e2166e08f
Parents: bce1dd7
Author: Jordan Birdsell <jordantbirdsell@gmail.com>
Authored: Sun Oct 2 16:58:17 2016 -0400
Committer: David Ribeiro Alves <dralves@apache.org>
Committed: Thu Oct 6 00:55:43 2016 +0000

----------------------------------------------------------------------
 python/kudu/__init__.py             |  4 +-
 python/kudu/client.pyx              | 40 +++++++--------
 python/kudu/tests/test_scanner.py   | 37 +++++++++++++-
 python/kudu/tests/test_scantoken.py | 40 +++++++++++++--
 python/kudu/tests/util.py           | 84 +++++++++++++++++++++++++++++++-
 python/kudu/util.py                 |  5 ++
 6 files changed, 182 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/__init__.py
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 99ca0d9..771f99a 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -30,8 +30,8 @@ from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound,  #
noqa
                          KuduInvalidArgument)
 
 from kudu.schema import (int8, int16, int32, int64, string_ as string,  # noqa
-                         double_ as double, float_, binary,
-                         unixtime_micros,
+                         double_ as double, float_, float_ as float, binary,
+                         unixtime_micros, bool_ as bool,
                          KuduType,
                          SchemaBuilder, ColumnSpec, Schema, ColumnSchema,
                          COMPRESSION_DEFAULT,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 150997d..261fdbf 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -766,21 +766,28 @@ cdef class Column:
     cdef KuduValue* box_value(self, object obj) except NULL:
         cdef:
             KuduValue* val
-            Slice* slc
+            Slice slc
 
-        if isinstance(obj, unicode):
-            obj = obj.encode('utf8')
-
-        if isinstance(obj, bytes):
-            slc = new Slice(<char*> obj, len(obj))
-            val = KuduValue.CopyString(deref(slc))
-            del slc
-        elif isinstance(obj, int):
+        if (self.spec.type.name[:3] == 'int'):
             val = KuduValue.FromInt(obj)
-        elif isinstance(obj, float):
+        elif (self.spec.type.name == 'string'):
+            if isinstance(obj, unicode):
+                obj = obj.encode('utf8')
+
+            slc = Slice(<char*> obj, len(obj))
+            val = KuduValue.CopyString(slc)
+        elif (self.spec.type.name == 'bool'):
+            val = KuduValue.FromBool(obj)
+        elif (self.spec.type.name == 'float'):
+            val = KuduValue.FromFloat(obj)
+        elif (self.spec.type.name == 'double'):
             val = KuduValue.FromDouble(obj)
+        elif (self.spec.type.name == 'unixtime_micros'):
+            obj = to_unixtime_micros(obj)
+            val = KuduValue.FromInt(obj)
         else:
-            raise TypeError(obj)
+            raise TypeError("Cannot add predicate for kudu type <{0}>"
+                            .format(self.spec.type.name))
 
         return val
 
@@ -2006,15 +2013,8 @@ cdef class PartialRow:
             self.row.SetStringCopy(i, deref(slc))
             del slc
         elif t == KUDU_UNIXTIME_MICROS:
-            # String with custom format
-            #  eg: ("2016-01-01", "%Y-%m-%d")
-            if type(value) is tuple:
-                self.row.SetUnixTimeMicros(i, <int64_t>
-                    to_unixtime_micros(value[0], value[1]))
-                # datetime.datetime input or string with default format
-            else:
-                self.row.SetUnixTimeMicros(i, <int64_t>
-                    to_unixtime_micros(value))
+            self.row.SetUnixTimeMicros(i, <int64_t>
+                to_unixtime_micros(value))
         else:
             raise TypeError("Cannot set kudu type <{0}>.".format(_type_names[t]))
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
index 950f0dd..72a22a4 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -141,14 +141,14 @@ class TestScanner(TestScanBase):
         with self.assertRaises(TypeError):
             scanner.add_predicates([sv >= None])
 
-        with self.assertRaises(kudu.KuduInvalidArgument):
+        with self.assertRaises(TypeError):
             scanner.add_predicates([sv >= 1])
 
         with self.assertRaises(TypeError):
             scanner.add_predicates([sv.in_list(['testing',
                                                 datetime.datetime.utcnow()])])
 
-        with self.assertRaises(kudu.KuduInvalidArgument):
+        with self.assertRaises(TypeError):
             scanner.add_predicates([sv.in_list([
                 'hello_20',
                 120
@@ -214,3 +214,36 @@ class TestScanner(TestScanBase):
             check_tuples = sorted(scanner.read_all_tuples())
             # Avoid tight looping
             time.sleep(0.05)
+
+    def verify_pred_type_scans(self, preds, row_indexes, count_only=False):
+        # Using the incoming list of predicates, verify that the row returned
+        # matches the inserted tuple at the row indexes specified in a
+        # slice object
+        scanner = self.type_table.scanner()
+        scanner.set_fault_tolerant()
+        scanner.add_predicates(preds)
+        scanner.set_projected_column_names(self.projected_names_w_o_float)
+        tuples = scanner.open().read_all_tuples()
+
+        # verify rows
+        if count_only:
+            self.assertEqual(len(self.type_test_rows[row_indexes]), len(tuples))
+        else:
+            self.assertEqual(sorted(self.type_test_rows[row_indexes]), tuples)
+
+    def test_unixtime_micros_pred(self):
+        # Test unixtime_micros value predicate
+        self._test_unixtime_micros_pred()
+
+    def test_bool_pred(self):
+        # Test a boolean value predicate
+        self._test_bool_pred()
+
+    def test_double_pred(self):
+        # Test a double precision float predicate
+        self._test_double_pred()
+
+    def test_float_pred(self):
+        # Test a single precision float predicate
+        # Does a row check count only
+        self._test_float_pred()

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index 5a37486..da879d5 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -38,7 +38,7 @@ class TestScanToken(TestScanBase):
     def setUp(self):
         pass
 
-    def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples):
+    def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples, count_only=False):
         """
         Given the input serialized tokens, spawn new threads,
         execute them and validate the results
@@ -55,7 +55,10 @@ class TestScanToken(TestScanBase):
         for result in results:
             actual_tuples += result
 
-        self.assertEqual(sorted(expected_tuples), sorted(actual_tuples))
+        if count_only:
+            self.assertEqual(expected_tuples, actual_tuples)
+        else:
+            self.assertEqual(sorted(expected_tuples), sorted(actual_tuples))
 
     def test_scan_token_serde_threaded_with_named_projection(self):
         """
@@ -113,7 +116,7 @@ class TestScanToken(TestScanBase):
         with self.assertRaises(TypeError):
             builder.add_predicates([sv >= None])
 
-        with self.assertRaises(kudu.KuduInvalidArgument):
+        with self.assertRaises(TypeError):
             builder.add_predicates([sv >= 1])
 
     def test_scan_token_batch_by_batch_with_local_scanner(self):
@@ -209,3 +212,34 @@ class TestScanToken(TestScanBase):
             tuples.extend(scanner.read_all_tuples())
 
         self.assertEqual(sorted(self.tuples), sorted(tuples))
+
+    def verify_pred_type_scans(self, preds, row_indexes, count_only=False):
+        # Using the incoming list of predicates, verify that the row returned
+        # matches the inserted tuple at the row indexes specified in a
+        # slice object
+        builder = self.type_table.scan_token_builder()
+        builder.set_fault_tolerant()
+        builder.set_projected_column_names(self.projected_names_w_o_float)
+        builder.add_predicates(preds)
+
+        # Verify rows
+        self._subtest_serialize_thread_and_verify(builder.build(),
+                                                  self.type_test_rows[row_indexes],
+                                                  count_only)
+
+    def test_unixtime_micros_pred(self):
+        # Test unixtime_micros value predicate
+        self._test_unixtime_micros_pred()
+
+    def test_bool_pred(self):
+        # Test a boolean value predicate
+        self._test_bool_pred()
+
+    def test_double_pred(self):
+        # Test a double precision float predicate
+        self._test_double_pred()
+
+    def test_float_pred(self):
+        # Test a single precision float predicate
+        # Does a row check count only
+        self._test_float_pred()

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/tests/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py
index 6a72a63..9f64e39 100644
--- a/python/kudu/tests/util.py
+++ b/python/kudu/tests/util.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -17,6 +18,7 @@
 # under the License.
 
 from kudu.compat import unittest
+from kudu.client import Partitioning
 from kudu.tests.common import KuduTestBase
 import kudu
 import datetime
@@ -58,6 +60,50 @@ class TestScanBase(KuduTestBase, unittest.TestCase):
         self.table = table
         self.tuples = tuples
 
+        # Create table to test all types
+        # for various predicate tests
+        table_name = 'type-test'
+        # Create schema, partitioning and then table
+        builder = kudu.schema_builder()
+        builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
+        builder.add_column('unixtime_micros_val', type_=kudu.unixtime_micros, nullable=False)
+        builder.add_column('string_val', type_=kudu.string, compression=kudu.COMPRESSION_LZ4,
encoding='prefix')
+        builder.add_column('bool_val', type_=kudu.bool)
+        builder.add_column('double_val', type_=kudu.double)
+        builder.add_column('int8_val', type_=kudu.int8)
+        builder.add_column('float_val', type_=kudu.float)
+        schema = builder.build()
+
+        self.projected_names_w_o_float = [
+            col for col in schema.names if col != 'float_val'
+        ]
+
+        partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)
+
+        self.client.create_table(table_name, schema, partitioning)
+        self.type_table = self.client.table(table_name)
+
+        # Insert new rows
+        self.type_test_rows = [
+            (1, datetime.datetime(2016, 1, 1).replace(tzinfo=pytz.utc),
+             "Test One", True, 1.7976931348623157 * (10^308), 127, 3.402823 * (10^38)),
+            (2, datetime.datetime.utcnow().replace(tzinfo=pytz.utc),
+             "测试二", False, 200.1, -1, -150.2)
+        ]
+        session = self.client.new_session()
+        for row in self.type_test_rows:
+            op = self.type_table.new_insert()
+            for idx, val in enumerate(row):
+                op[idx] = val
+            session.apply(op)
+        session.flush()
+
+        # Remove the float values from the type_test_rows tuples so we can
+        # compare the other vals
+        self.type_test_rows = [
+            tuple[:-1] for tuple in self.type_test_rows
+        ]
+
     def setUp(self):
         pass
 
@@ -127,4 +173,40 @@ class TestScanBase(KuduTestBase, unittest.TestCase):
         for idx, val in enumerate(row):
             op[idx] = val
         session.apply(op)
-        session.flush()
\ No newline at end of file
+        session.flush()
+
+    def _test_unixtime_micros_pred(self):
+        # Test unixtime_micros value predicate
+        self.verify_pred_type_scans(
+            preds=[
+                self.type_table['unixtime_micros_val'] == ("2016-01-01", "%Y-%m-%d")
+            ],
+            row_indexes=slice(0,1)
+        )
+
+    def _test_bool_pred(self):
+        # Test a boolean value predicate
+        self.verify_pred_type_scans(
+            preds=[
+                self.type_table['bool_val'] == False
+            ],
+            row_indexes=slice(1,2)
+        )
+
+    def _test_double_pred(self):
+        # Test a double precision float predicate
+        self.verify_pred_type_scans(
+            preds=[
+                self.type_table['double_val'] < 200.11
+            ],
+            row_indexes=slice(1,2)
+        )
+
+    def _test_float_pred(self):
+        self.verify_pred_type_scans(
+            preds=[
+                self.type_table['float_val'] == 3.402823 * (10^38)
+            ],
+            row_indexes=slice(0, 1),
+            count_only=True
+        )

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d1b1e97/python/kudu/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/util.py b/python/kudu/util.py
index 8533b04..f290425 100644
--- a/python/kudu/util.py
+++ b/python/kudu/util.py
@@ -46,6 +46,9 @@ def to_unixtime_micros(timestamp, format = "%Y-%m-%dT%H:%M:%S.%f"):
     ---------
     timestamp : datetime.datetime or string
       If a string is provided, a format must be provided as well.
+      A tuple may be provided in place of the timestamp with a
+      string value and a format. This is useful for predicates
+      and setting values where this method is indirectly called.
       Timezones provided in the string are not supported at this
       time. UTC unless provided in a datetime object.
     format : Required if a string timestamp is provided
@@ -60,6 +63,8 @@ def to_unixtime_micros(timestamp, format = "%Y-%m-%dT%H:%M:%S.%f"):
         pass
     elif isinstance(timestamp, six.string_types):
         timestamp = datetime.datetime.strptime(timestamp, format)
+    elif isinstance(timestamp, tuple):
+        timestamp = datetime.datetime.strptime(timestamp[0], timestamp[1])
     else:
         raise ValueError("Invalid timestamp type. " +
                          "You must provide a datetime.datetime or a string.")


Mime
View raw message