Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9C517200BA3 for ; Thu, 6 Oct 2016 02:05:59 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9AF1E160ADE; Thu, 6 Oct 2016 00:05:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3F12A160AEA for ; Thu, 6 Oct 2016 02:05:58 +0200 (CEST) Received: (qmail 64999 invoked by uid 500); 6 Oct 2016 00:05:57 -0000 Mailing-List: contact commits-help@kudu.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kudu.apache.org Delivered-To: mailing list commits@kudu.apache.org Received: (qmail 64945 invoked by uid 99); 6 Oct 2016 00:05:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Oct 2016 00:05:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1ED7EE08B3; Thu, 6 Oct 2016 00:05:57 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: todd@apache.org To: commits@kudu.apache.org Date: Thu, 06 Oct 2016 00:05:59 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/8] kudu git commit: KUDU-1612 - [python] Enable setting of read mode for scanning archived-at: Thu, 06 Oct 2016 00:05:59 -0000 KUDU-1612 - [python] Enable setting of read mode for scanning Currently the python client is unable to set the read mode for scanning, so all scans are done as READ_LATEST. This patch enables the ability to set the read mode so that the python client can read at snapshots. This patch includes multiple tests. 
Change-Id: I2c61ef09f6e15bad2c44d9caf85b2cc2582b8a49 Reviewed-on: http://gerrit.cloudera.org:8080/4520 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4db8851c Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4db8851c Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4db8851c Branch: refs/heads/master Commit: 4db8851cf31f68f2db8dafc41a4609017371501b Parents: bda9b94 Author: Jordan Birdsell Authored: Thu Sep 22 19:11:15 2016 -0500 Committer: David Ribeiro Alves Committed: Wed Oct 5 21:53:07 2016 +0000 ---------------------------------------------------------------------- python/kudu/__init__.py | 4 +- python/kudu/client.pyx | 168 ++++++++++++++++++++++++++++++- python/kudu/libkudu_client.pxd | 11 +- python/kudu/tests/test_scanner.py | 32 ++++++ python/kudu/tests/test_scantoken.py | 33 ++++++ python/kudu/tests/util.py | 33 ++++++ python/kudu/util.py | 15 +++ 7 files changed, 289 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/__init__.py ---------------------------------------------------------------------- diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py index 1a1ff39..99ca0d9 100644 --- a/python/kudu/__init__.py +++ b/python/kudu/__init__.py @@ -21,7 +21,9 @@ from kudu.client import (Client, Table, Scanner, Session, # noqa ScanToken, FLUSH_AUTO_BACKGROUND, FLUSH_AUTO_SYNC, - FLUSH_MANUAL) + FLUSH_MANUAL, + READ_LATEST, + READ_AT_SNAPSHOT) from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound, # noqa KuduNotSupported, http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/client.pyx ---------------------------------------------------------------------- diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index 29004bb..150997d 100644 --- 
a/python/kudu/client.pyx +++ b/python/kudu/client.pyx @@ -28,12 +28,21 @@ from libkudu_client cimport * from kudu.compat import tobytes, frombytes from kudu.schema cimport Schema, ColumnSchema from kudu.errors cimport check_status -from kudu.util import to_unixtime_micros, from_unixtime_micros +from kudu.util import to_unixtime_micros, from_unixtime_micros, from_hybridtime from errors import KuduException import six +# Read mode enums +READ_LATEST = ReadMode_Latest +READ_AT_SNAPSHOT = ReadMode_Snapshot + +cdef dict _read_modes = { + 'latest': ReadMode_Latest, + 'snapshot': ReadMode_Snapshot +} + cdef dict _type_names = { KUDU_INT8 : "KUDU_INT8", KUDU_INT16 : "KUDU_INT16", @@ -240,6 +249,25 @@ cdef class Client: # Nothing yet to clean up here pass + def latest_observed_timestamp(self): + """ + Get the highest timestamp observed by the client in UTC. This + is intended to gain external consistency across clients. + + Note: The latest observed timestamp can also be used to start a + snapshot scan on a table which is guaranteed to contain all data + written or previously read by this client. This should be treated + as experimental, as this method will change or disappear in a + future release. Additionally, note that 1 must be added to the + value to be used in snapshot reads (this is taken care of in the + from_hybridtime method). + + Returns + ------- + latest : datetime.datetime + """ + return from_hybridtime(self.cp.GetLatestObservedTimestamp()) + def create_table(self, table_name, Schema schema, partitioning, n_replicas=None): """ Creates a new Kudu table from the passed Schema and options. @@ -1286,6 +1314,75 @@ cdef class Scanner: check_status(self.scanner.SetProjectedColumnIndexes(v_indexes)) return self + def set_read_mode(self, read_mode): + """ + Set the read mode for scanning.
+ + Parameters + ---------- + read_mode : {'latest', 'snapshot'} + You can also use the constants READ_LATEST, READ_AT_SNAPSHOT + + Returns + ------- + self : Scanner + """ + cdef ReadMode rmode + + def invalid_selection_policy(): + raise ValueError('Invalid read mode: {0}' + .format(read_mode)) + + if isinstance(read_mode, int): + if 0 <= read_mode < len(_read_modes): + check_status(self.scanner.SetReadMode( + read_mode)) + else: + invalid_selection_policy() + else: + try: + check_status(self.scanner.SetReadMode( + _read_modes[read_mode.lower()])) + except KeyError: + invalid_selection_policy() + + return self + + def set_snapshot(self, timestamp, format=None): + """ + Set the snapshot timestamp for this scanner. + + Parameters + --------- + timestamp : datetime.datetime or string + If a string is provided, a format must be provided as well. + NOTE: This should be in UTC. If a timezone aware datetime + object is provided, it will be converted to UTC, otherwise, + all other input is assumed to be UTC. + format : Required if a string timestamp is provided + Uses the C strftime() function, see strftime(3) documentation. + + Returns + ------- + self : Scanner + """ + # Confirm that a format is provided if timestamp is a string + if isinstance(timestamp, six.string_types) and not format: + raise ValueError( + "To use a string timestamp you must provide a format. " + + "See the strftime(3) documentation.") + + snapshot_micros = to_unixtime_micros(timestamp, format) + + if snapshot_micros >= 0: + check_status(self.scanner.SetSnapshotMicros( + snapshot_micros)) + else: + raise ValueError( + "Snapshot Timestamps be greater than the unix epoch.") + + return self + def set_fault_tolerant(self): """ Makes the underlying KuduScanner fault tolerant. @@ -1553,6 +1650,75 @@ cdef class ScanTokenBuilder: check_status(self._builder.SetBatchSizeBytes(batch_size)) return self + def set_read_mode(self, read_mode): + """ + Set the read mode for scanning. 
+ + Parameters + ---------- + read_mode : {'latest', 'snapshot'} + You can also use the constants READ_LATEST, READ_AT_SNAPSHOT + + Returns + ------- + self : ScanTokenBuilder + """ + cdef ReadMode rmode + + def invalid_selection_policy(): + raise ValueError('Invalid read mode: {0}' + .format(read_mode)) + + if isinstance(read_mode, int): + if 0 <= read_mode < len(_read_modes): + check_status(self._builder.SetReadMode( + read_mode)) + else: + invalid_selection_policy() + else: + try: + check_status(self._builder.SetReadMode( + _read_modes[read_mode.lower()])) + except KeyError: + invalid_selection_policy() + + return self + + def set_snapshot(self, timestamp, format=None): + """ + Set the snapshot timestamp for this ScanTokenBuilder. + + Parameters + --------- + timestamp : datetime.datetime or string + If a string is provided, a format must be provided as well. + NOTE: This should be in UTC. If a timezone aware datetime + object is provided, it will be converted to UTC, otherwise, + all other input is assumed to be UTC. + format : Required if a string timestamp is provided + Uses the C strftime() function, see strftime(3) documentation. + + Returns + ------- + self : ScanTokenBuilder + """ + # Confirm that a format is provided if timestamp is a string + if isinstance(timestamp, six.string_types) and not format: + raise ValueError( + "To use a string timestamp you must provide a format. " + + "See the strftime(3) documentation.") + + snapshot_micros = to_unixtime_micros(timestamp, format) + + if snapshot_micros >= 0: + check_status(self._builder.SetSnapshotMicros( + snapshot_micros)) + else: + raise ValueError( + "Snapshot Timestamps be greater than the unix epoch.") + + return self + def set_timout_millis(self, millis): """ Sets the timeout in milliseconds. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/libkudu_client.pxd ---------------------------------------------------------------------- diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd index b9022e0..9c9899f 100644 --- a/python/kudu/libkudu_client.pxd +++ b/python/kudu/libkudu_client.pxd @@ -456,6 +456,10 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: CLOSEST_REPLICA " kudu::client::KuduClient::CLOSEST_REPLICA" FIRST_REPLICA " kudu::client::KuduClient::FIRST_REPLICA" + enum ReadMode" kudu::client::KuduScanner::ReadMode": + ReadMode_Latest " kudu::client::KuduScanner::READ_LATEST" + ReadMode_Snapshot " kudu::client::KuduScanner::READ_AT_SNAPSHOT" + cdef cppclass KuduClient: Status DeleteTable(const string& table_name) @@ -479,6 +483,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: KuduTableAlterer* NewTableAlterer() Status IsAlterTableInProgress(const string& table_name, c_bool* alter_in_progress) + uint64_t GetLatestObservedTimestamp() shared_ptr[KuduSession] NewSession() @@ -601,9 +606,6 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: KuduClient* client() - enum ReadMode" kudu::client::KuduScanner::ReadMode": - READ_LATEST " kudu::client::KuduScanner::READ_LATEST" - READ_AT_SNAPSHOT " kudu::client::KuduScanner::READ_AT_SNAPSHOT" cdef cppclass KuduScanner: KuduScanner(KuduTable* table) @@ -620,7 +622,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: Status SetSelection(ReplicaSelection selection) Status SetReadMode(ReadMode read_mode) - Status SetSnapshot(uint64_t snapshot_timestamp_micros) + Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros) Status SetTimeoutMillis(int millis) Status SetProjectedColumnNames(const vector[string]& col_names) Status SetProjectedColumnIndexes(const vector[int]& col_indexes) @@ -651,7 +653,6 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil: 
Status SetReadMode(ReadMode read_mode) Status SetFaultTolerant() Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros) - Status SetSnapshotRaw(uint64_t snapshot_timestamp) Status SetSelection(ReplicaSelection selection) Status SetTimeoutMillis(int millis) Status AddConjunctPredicate(KuduPredicate* pred) http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/tests/test_scanner.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py index e0fcd37..950f0dd 100644 --- a/python/kudu/tests/test_scanner.py +++ b/python/kudu/tests/test_scanner.py @@ -23,6 +23,7 @@ from kudu.tests.util import TestScanBase from kudu.tests.common import KuduTestBase import kudu import datetime +import time class TestScanner(TestScanBase): @@ -182,3 +183,34 @@ class TestScanner(TestScanBase): scanner = self.table.scanner() scanner.set_fault_tolerant().open() self.assertEqual(sorted(self.tuples), scanner.read_all_tuples()) + + def test_read_mode(self): + """ + Test setting the read mode and scanning against a + snapshot and latest + """ + # Delete row + self.delete_insert_row_for_read_test() + + # Check scanner results prior to delete + scanner = self.table.scanner() + scanner.set_read_mode('snapshot')\ + .set_snapshot(self.snapshot_timestamp)\ + .open() + + self.assertEqual(sorted(self.tuples[1:]), sorted(scanner.read_all_tuples())) + + #Check scanner results after delete + timeout = time.time() + 10 + check_tuples = [] + while check_tuples != sorted(self.tuples): + if time.time() > timeout: + raise TimeoutError("Could not validate results in allocated" + + "time.") + + scanner = self.table.scanner() + scanner.set_read_mode(kudu.READ_LATEST)\ + .open() + check_tuples = sorted(scanner.read_all_tuples()) + # Avoid tight looping + time.sleep(0.05) http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/tests/test_scantoken.py 
---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py index a5ae256..5a37486 100644 --- a/python/kudu/tests/test_scantoken.py +++ b/python/kudu/tests/test_scantoken.py @@ -176,3 +176,36 @@ class TestScanToken(TestScanBase): # Serialize execute and verify self._subtest_serialize_thread_and_verify(builder.build(), [self.tuples[98]]) + + def test_read_mode(self): + """ + Test setting the read mode and scanning against a + snapshot and latest + """ + # Delete row + self.delete_insert_row_for_read_test() + + # Check scanner results prior to delete + builder = self.table.scan_token_builder() + tokens = builder.set_read_mode('snapshot') \ + .set_snapshot(self.snapshot_timestamp) \ + .build() + + tuples = [] + for token in tokens: + scanner = token.into_kudu_scanner().open() + tuples.extend(scanner.read_all_tuples()) + + self.assertEqual(sorted(self.tuples[1:]), sorted(tuples)) + + #Check scanner results after inserts + builder = self.table.scan_token_builder() + tokens = builder.set_read_mode(kudu.READ_LATEST) \ + .build() + + tuples = [] + for token in tokens: + scanner = token.into_kudu_scanner().open() + tuples.extend(scanner.read_all_tuples()) + + self.assertEqual(sorted(self.tuples), sorted(tuples)) http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/tests/util.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py index 39520e4..6a72a63 100644 --- a/python/kudu/tests/util.py +++ b/python/kudu/tests/util.py @@ -62,6 +62,12 @@ class TestScanBase(KuduTestBase, unittest.TestCase): pass def insert_new_unixtime_micros_rows(self): + # Get current UTC datetime to be used for read at snapshot test + # Not using the Eastern time value below as that may not be the + # actual local timezone of the host executing this test and as + # such would not accurately be offset
to UTC + self.snapshot_timestamp = datetime.datetime.utcnow() + # Insert new rows # Also test a timezone other than UTC to confirm that # conversion to UTC is properly applied @@ -94,4 +100,31 @@ class TestScanBase(KuduTestBase, unittest.TestCase): # Apply timezone list[3] = list[3].replace(tzinfo=pytz.utc) self.tuples.append(tuple(list)) + session.flush() + + def delete_insert_row_for_read_test(self): + + # Retrieve row to delete so it can be reinserted into the table so + # that other tests do not fail + row = self.table.scanner()\ + .set_fault_tolerant()\ + .open()\ + .read_all_tuples()[0] + + # Delete row from table + session = self.client.new_session() + op = self.table.new_delete() + op['key'] = row[0] + session.apply(op) + session.flush() + + # Get latest observed timestamp for snapshot + self.snapshot_timestamp = self.client.latest_observed_timestamp() + + # Insert row back into table so that other tests don't fail. + session = self.client.new_session() + op = self.table.new_insert() + for idx, val in enumerate(row): + op[idx] = val + session.apply(op) session.flush() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kudu/blob/4db8851c/python/kudu/util.py ---------------------------------------------------------------------- diff --git a/python/kudu/util.py b/python/kudu/util.py index 603e0e0..8533b04 100644 --- a/python/kudu/util.py +++ b/python/kudu/util.py @@ -92,3 +92,18 @@ def from_unixtime_micros(unixtime_micros): else: raise ValueError("Invalid unixtime_micros value." + "You must provide an integer value.") + +def from_hybridtime(hybridtime): + """ + Convert a raw HybridTime value to a datetime in UTC. + + Parameters + ---------- + hybridtime : long + + Returns + ------- + timestamp : datetime.datetime in UTC + """ + # Add 1 so the value is usable for snapshot scans + return from_unixtime_micros(int(hybridtime >> 12) + 1)