kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [3/8] kudu git commit: KUDU-1612 - [python] Enable setting of read mode for scanning
Date Thu, 06 Oct 2016 00:05:59 GMT
KUDU-1612 - [python] Enable setting of read mode for scanning

Currently the Python client is unable to set the read mode for
scanning, so all scans are done as READ_LATEST. This patch adds
the ability to set the read mode (and a snapshot timestamp) so that
the Python client can read at a snapshot. It also includes tests.

Change-Id: I2c61ef09f6e15bad2c44d9caf85b2cc2582b8a49
Reviewed-on: http://gerrit.cloudera.org:8080/4520
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dralves@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4db8851c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4db8851c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4db8851c

Branch: refs/heads/master
Commit: 4db8851cf31f68f2db8dafc41a4609017371501b
Parents: bda9b94
Author: Jordan Birdsell <jordantbirdsell@gmail.com>
Authored: Thu Sep 22 19:11:15 2016 -0500
Committer: David Ribeiro Alves <dralves@apache.org>
Committed: Wed Oct 5 21:53:07 2016 +0000

----------------------------------------------------------------------
 python/kudu/__init__.py             |   4 +-
 python/kudu/client.pyx              | 168 ++++++++++++++++++++++++++++++-
 python/kudu/libkudu_client.pxd      |  11 +-
 python/kudu/tests/test_scanner.py   |  32 ++++++
 python/kudu/tests/test_scantoken.py |  33 ++++++
 python/kudu/tests/util.py           |  33 ++++++
 python/kudu/util.py                 |  15 +++
 7 files changed, 289 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/__init__.py
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 1a1ff39..99ca0d9 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -21,7 +21,9 @@ from kudu.client import (Client, Table, Scanner, Session,  # noqa
                          ScanToken,
                          FLUSH_AUTO_BACKGROUND,
                          FLUSH_AUTO_SYNC,
-                         FLUSH_MANUAL)
+                         FLUSH_MANUAL,
+                         READ_LATEST,
+                         READ_AT_SNAPSHOT)
 
 from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound,  # noqa
                          KuduNotSupported,

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 29004bb..150997d 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -28,12 +28,21 @@ from libkudu_client cimport *
 from kudu.compat import tobytes, frombytes
 from kudu.schema cimport Schema, ColumnSchema
 from kudu.errors cimport check_status
-from kudu.util import to_unixtime_micros, from_unixtime_micros
+from kudu.util import to_unixtime_micros, from_unixtime_micros, from_hybridtime
 from errors import KuduException
 
 import six
 
 
+# Read mode constants, exported as kudu.READ_LATEST / kudu.READ_AT_SNAPSHOT
+READ_LATEST = ReadMode_Latest
+READ_AT_SNAPSHOT = ReadMode_Snapshot
+
+cdef dict _read_modes = {
+    'latest': ReadMode_Latest,
+    'snapshot': ReadMode_Snapshot
+}
+
 cdef dict _type_names = {
     KUDU_INT8 : "KUDU_INT8",
     KUDU_INT16 : "KUDU_INT16",
@@ -240,6 +249,25 @@ cdef class Client:
         # Nothing yet to clean up here
         pass
 
+    def latest_observed_timestamp(self):
+        """
+        Get the highest timestamp observed by this client, as a UTC
+        datetime. Intended for achieving external consistency across clients.
+
+        Note: The returned value can be passed to set_snapshot() to start a
+        snapshot scan that includes all data written or previously read by
+        this client. This API should be treated as experimental; it may
+        change or disappear in a future release. The returned datetime is
+        already 1 microsecond past the raw observed timestamp (added by
+        from_hybridtime) so that it is directly usable for snapshot reads.
+        (The raw value comes from KuduClient::GetLatestObservedTimestamp.)
+
+        Returns
+        -------
+        latest : datetime.datetime
+        """
+        return from_hybridtime(self.cp.GetLatestObservedTimestamp())
+
     def create_table(self, table_name, Schema schema, partitioning, n_replicas=None):
         """
         Creates a new Kudu table from the passed Schema and options.
@@ -1286,6 +1314,75 @@ cdef class Scanner:
         check_status(self.scanner.SetProjectedColumnIndexes(v_indexes))
         return self
 
+    def set_read_mode(self, read_mode):
+        """
+        Set the read mode for scanning.
+
+        Parameters
+        ----------
+        read_mode : {'latest', 'snapshot'} or READ_LATEST / READ_AT_SNAPSHOT
+          Strings are case-insensitive; anything else raises ValueError.
+
+        Returns
+        -------
+        self : Scanner
+        """
+        cdef ReadMode rmode
+
+        if isinstance(read_mode, six.integer_types):
+            # Integers are raw ReadMode values (e.g. kudu.READ_LATEST);
+            # reject anything outside the known range.
+            if not 0 <= read_mode < len(_read_modes):
+                raise ValueError('Invalid read mode: {0}'
+                                 .format(read_mode))
+            rmode = <ReadMode> read_mode
+        else:
+            # Strings are matched case-insensitively against _read_modes;
+            # non-strings (no .lower()) are also rejected as invalid.
+            try:
+                rmode = _read_modes[read_mode.lower()]
+            except (KeyError, AttributeError):
+                raise ValueError('Invalid read mode: {0}'
+                                 .format(read_mode))
+
+        check_status(self.scanner.SetReadMode(rmode))
+        return self
+
+    def set_snapshot(self, timestamp, format=None):
+        """
+        Set the snapshot timestamp for this scanner.
+
+        Parameters
+        ----------
+        timestamp : datetime.datetime or string
+          If a string is provided, a format must be provided as well.
+          NOTE: This should be in UTC. If a timezone aware datetime
+          object is provided, it will be converted to UTC, otherwise,
+          all other input is assumed to be UTC.
+        format : Required if a string timestamp is provided
+          Uses the C strftime() function, see strftime(3) documentation.
+
+        Returns
+        -------
+        self : Scanner
+        """
+        # Confirm that a format is provided if timestamp is a string
+        if isinstance(timestamp, six.string_types) and not format:
+            raise ValueError(
+                "To use a string timestamp you must provide a format. " +
+                "See the strftime(3) documentation.")
+
+        snapshot_micros = to_unixtime_micros(timestamp, format)
+
+        if snapshot_micros >= 0:
+            check_status(self.scanner.SetSnapshotMicros(
+                         <uint64_t> snapshot_micros))
+        else:
+            raise ValueError(
+                "Snapshot timestamps must not precede the unix epoch.")
+
+        return self
+
     def set_fault_tolerant(self):
         """
         Makes the underlying KuduScanner fault tolerant.
@@ -1553,6 +1650,75 @@ cdef class ScanTokenBuilder:
         check_status(self._builder.SetBatchSizeBytes(batch_size))
         return self
 
+    def set_read_mode(self, read_mode):
+        """
+        Set the read mode for scanning.
+
+        Parameters
+        ----------
+        read_mode : {'latest', 'snapshot'} or READ_LATEST / READ_AT_SNAPSHOT
+          Strings are case-insensitive; anything else raises ValueError.
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        cdef ReadMode rmode
+
+        if isinstance(read_mode, six.integer_types):
+            # Integers are raw ReadMode values (e.g. kudu.READ_LATEST);
+            # reject anything outside the known range.
+            if not 0 <= read_mode < len(_read_modes):
+                raise ValueError('Invalid read mode: {0}'
+                                 .format(read_mode))
+            rmode = <ReadMode> read_mode
+        else:
+            # Strings are matched case-insensitively against _read_modes;
+            # non-strings (no .lower()) are also rejected as invalid.
+            try:
+                rmode = _read_modes[read_mode.lower()]
+            except (KeyError, AttributeError):
+                raise ValueError('Invalid read mode: {0}'
+                                 .format(read_mode))
+
+        check_status(self._builder.SetReadMode(rmode))
+        return self
+
+    def set_snapshot(self, timestamp, format=None):
+        """
+        Set the snapshot timestamp for this ScanTokenBuilder.
+
+        Parameters
+        ----------
+        timestamp : datetime.datetime or string
+          If a string is provided, a format must be provided as well.
+          NOTE: This should be in UTC. If a timezone aware datetime
+          object is provided, it will be converted to UTC, otherwise,
+          all other input is assumed to be UTC.
+        format : Required if a string timestamp is provided
+          Uses the C strftime() function, see strftime(3) documentation.
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        # Confirm that a format is provided if timestamp is a string
+        if isinstance(timestamp, six.string_types) and not format:
+            raise ValueError(
+                "To use a string timestamp you must provide a format. " +
+                "See the strftime(3) documentation.")
+
+        snapshot_micros = to_unixtime_micros(timestamp, format)
+
+        if snapshot_micros >= 0:
+            check_status(self._builder.SetSnapshotMicros(
+                         <uint64_t> snapshot_micros))
+        else:
+            raise ValueError(
+                "Snapshot timestamps must not precede the unix epoch.")
+
+        return self
+
     def set_timout_millis(self, millis):
         """
         Sets the timeout in milliseconds.

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index b9022e0..9c9899f 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -456,6 +456,10 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         CLOSEST_REPLICA " kudu::client::KuduClient::CLOSEST_REPLICA"
         FIRST_REPLICA " kudu::client::KuduClient::FIRST_REPLICA"
 
+    enum ReadMode" kudu::client::KuduScanner::ReadMode":
+        ReadMode_Latest " kudu::client::KuduScanner::READ_LATEST"
+        ReadMode_Snapshot " kudu::client::KuduScanner::READ_AT_SNAPSHOT"
+
     cdef cppclass KuduClient:
 
         Status DeleteTable(const string& table_name)
@@ -479,6 +483,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         KuduTableAlterer* NewTableAlterer()
         Status IsAlterTableInProgress(const string& table_name,
                                       c_bool* alter_in_progress)
+        uint64_t GetLatestObservedTimestamp()
 
         shared_ptr[KuduSession] NewSession()
 
@@ -601,9 +606,6 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
 
         KuduClient* client()
 
-    enum ReadMode" kudu::client::KuduScanner::ReadMode":
-        READ_LATEST " kudu::client::KuduScanner::READ_LATEST"
-        READ_AT_SNAPSHOT " kudu::client::KuduScanner::READ_AT_SNAPSHOT"
 
     cdef cppclass KuduScanner:
         KuduScanner(KuduTable* table)
@@ -620,7 +622,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         Status SetSelection(ReplicaSelection selection)
 
         Status SetReadMode(ReadMode read_mode)
-        Status SetSnapshot(uint64_t snapshot_timestamp_micros)
+        Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros)
         Status SetTimeoutMillis(int millis)
         Status SetProjectedColumnNames(const vector[string]& col_names)
         Status SetProjectedColumnIndexes(const vector[int]& col_indexes)
@@ -651,7 +653,6 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         Status SetReadMode(ReadMode read_mode)
         Status SetFaultTolerant()
         Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros)
-        Status SetSnapshotRaw(uint64_t snapshot_timestamp)
         Status SetSelection(ReplicaSelection selection)
         Status SetTimeoutMillis(int millis)
         Status AddConjunctPredicate(KuduPredicate* pred)

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
index e0fcd37..950f0dd 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -23,6 +23,7 @@ from kudu.tests.util import TestScanBase
 from kudu.tests.common import KuduTestBase
 import kudu
 import datetime
+import time
 
 
 class TestScanner(TestScanBase):
@@ -182,3 +183,34 @@ class TestScanner(TestScanBase):
         scanner = self.table.scanner()
         scanner.set_fault_tolerant().open()
         self.assertEqual(sorted(self.tuples), scanner.read_all_tuples())
+
+    def test_read_mode(self):
+        """
+        Test READ_AT_SNAPSHOT (before a row is re-inserted) and
+        READ_LATEST (after it is re-inserted) scans.
+        """
+        # Delete a row, record a snapshot timestamp, then re-insert it
+        self.delete_insert_row_for_read_test()
+
+        # Snapshot predates re-insert; deleted row assumed to be tuples[0]
+        scanner = self.table.scanner()
+        scanner.set_read_mode('snapshot')\
+            .set_snapshot(self.snapshot_timestamp)\
+            .open()
+
+        self.assertEqual(sorted(self.tuples[1:]), sorted(scanner.read_all_tuples()))
+
+        # READ_LATEST should eventually see the re-inserted row again
+        timeout = time.time() + 10
+        check_tuples = []
+        while check_tuples != sorted(self.tuples):
+            if time.time() > timeout:
+                self.fail("Could not validate results in allocated " +
+                          "time.")
+
+            scanner = self.table.scanner()
+            scanner.set_read_mode(kudu.READ_LATEST)\
+                .open()
+            check_tuples = sorted(scanner.read_all_tuples())
+            # Avoid tight looping
+            time.sleep(0.05)

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index a5ae256..5a37486 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -176,3 +176,36 @@ class TestScanToken(TestScanBase):
         # Serialize execute and verify
         self._subtest_serialize_thread_and_verify(builder.build(),
                                                   [self.tuples[98]])
+
+    def test_read_mode(self):
+        """
+        Test READ_AT_SNAPSHOT (before re-insert) and READ_LATEST (after
+        re-insert) scans built from scan tokens.
+        """
+        # Delete a row, record a snapshot timestamp, then re-insert it
+        self.delete_insert_row_for_read_test()
+
+        # Snapshot predates re-insert; deleted row assumed to be tuples[0]
+        builder = self.table.scan_token_builder()
+        tokens = builder.set_read_mode('snapshot') \
+            .set_snapshot(self.snapshot_timestamp) \
+            .build()
+
+        tuples = []
+        for token in tokens:
+            scanner = token.into_kudu_scanner().open()
+            tuples.extend(scanner.read_all_tuples())
+
+        self.assertEqual(sorted(self.tuples[1:]), sorted(tuples))
+
+        # READ_LATEST after re-insert (NOTE(review): no retry; may flake)
+        builder = self.table.scan_token_builder()
+        tokens = builder.set_read_mode(kudu.READ_LATEST) \
+            .build()
+
+        tuples = []
+        for token in tokens:
+            scanner = token.into_kudu_scanner().open()
+            tuples.extend(scanner.read_all_tuples())
+
+        self.assertEqual(sorted(self.tuples), sorted(tuples))

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/tests/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py
index 39520e4..6a72a63 100644
--- a/python/kudu/tests/util.py
+++ b/python/kudu/tests/util.py
@@ -62,6 +62,12 @@ class TestScanBase(KuduTestBase, unittest.TestCase):
         pass
 
     def insert_new_unixtime_micros_rows(self):
+        # Capture the current UTC time for use by read-at-snapshot tests.
+        # The Eastern time value below is not used for this because that
+        # may not be the local timezone of the host running this test,
+        # so it would not be accurately offset to UTC.
+        self.snapshot_timestamp = datetime.datetime.utcnow()
+
         # Insert new rows
         # Also test a timezone other than UTC to confirm that
         # conversion to UTC is properly applied
@@ -94,4 +100,31 @@ class TestScanBase(KuduTestBase, unittest.TestCase):
             # Apply timezone
             list[3] = list[3].replace(tzinfo=pytz.utc)
             self.tuples.append(tuple(list))
+        session.flush()
+
+    def delete_insert_row_for_read_test(self):
+        """Delete a row, save self.snapshot_timestamp, then re-insert it."""
+        # Retrieve a row to delete; keep it so it can be re-inserted into
+        # the table afterwards and other tests do not fail
+        row = self.table.scanner()\
+                    .set_fault_tolerant()\
+                    .open()\
+                    .read_all_tuples()[0]
+
+        # Delete the row by key (assumes 'key' is the first column)
+        session = self.client.new_session()
+        op = self.table.new_delete()
+        op['key'] = row[0]
+        session.apply(op)
+        session.flush()
+
+        # Record the client's latest observed timestamp (post-delete)
+        self.snapshot_timestamp = self.client.latest_observed_timestamp()
+
+        # Insert the row back into the table so other tests don't fail
+        session = self.client.new_session()
+        op = self.table.new_insert()
+        for idx, val in enumerate(row):
+            op[idx] = val
+        session.apply(op)
         session.flush()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/util.py b/python/kudu/util.py
index 603e0e0..8533b04 100644
--- a/python/kudu/util.py
+++ b/python/kudu/util.py
@@ -92,3 +92,18 @@ def from_unixtime_micros(unixtime_micros):
     else:
         raise ValueError("Invalid unixtime_micros value." +
                          "You must provide an integer value.")
+
+def from_hybridtime(hybridtime):
+    """
+    Convert a raw HybridTime value to a UTC datetime one microsecond later.
+
+    Parameters
+    ----------
+    hybridtime : long
+
+    Returns
+    -------
+    timestamp : datetime.datetime in UTC, usable as a snapshot timestamp
+    """
+    # >> 12 drops the logical bits; + 1 micro so snapshots include hybridtime
+    return from_unixtime_micros(int(hybridtime >> 12) + 1)


Mime
View raw message