kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [1/2] kudu git commit: [python] Implement Scan Token API
Date Tue, 13 Sep 2016 19:13:22 GMT
Repository: kudu
Updated Branches:
  refs/heads/master 1a062253e -> 74065d1ca


[python] Implement Scan Token API

First attempt at implementing the
[Scan Token API](http://gerrit.cloudera.org:8080/#/c/2443/) for the Python
client. This patch should also resolve KUDU-1401.  I included several
unit tests, most of which were based on the current scanner unit tests.

Change-Id: I710c93e51ab5f0f5ed038aaaf1925b58c576b655
Reviewed-on: http://gerrit.cloudera.org:8080/4367
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <dan@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/609546db
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/609546db
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/609546db

Branch: refs/heads/master
Commit: 609546dbc2f0f7ed9daeb0eaff2bbc07042b0ed8
Parents: 1a06225
Author: Jordan Birdsell <jordantbirdsell@gmail.com>
Authored: Sat Sep 10 18:06:08 2016 -0400
Committer: Dan Burkert <dan@cloudera.com>
Committed: Tue Sep 13 16:58:23 2016 +0000

----------------------------------------------------------------------
 python/kudu/__init__.py             |   3 +-
 python/kudu/client.pyx              | 396 ++++++++++++++++++++++++++++++-
 python/kudu/libkudu_client.pxd      |  49 +++-
 python/kudu/tests/test_scantoken.py | 162 +++++++++++++
 python/setup.py                     |   2 +-
 5 files changed, 604 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/kudu/__init__.py
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 155f2e5..1a1ff39 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -17,7 +17,8 @@
 
 from kudu.client import (Client, Table, Scanner, Session,  # noqa
                          Insert, Update, Delete, Predicate,
-                         TimeDelta, KuduError,
+                         TimeDelta, KuduError, ScanTokenBuilder,
+                         ScanToken,
                          FLUSH_AUTO_BACKGROUND,
                          FLUSH_AUTO_SYNC,
                          FLUSH_MANUAL)

http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index f111da6..5394410 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -300,6 +300,22 @@ cdef class Client:
         check_status(self.cp.TableExists(c_name, &exists))
         return exists
 
+    def deserialize_token_into_scanner(self, serialized_token):
+        """
+        Rebuild a Scanner, bound to this client, from a token previously
+        produced by ScanToken.serialize().
+
+        Parameters
+        ----------
+        serialized_token : String
+          Serialized form of a ScanToken, as returned by
+          ScanToken.serialize().
+
+        Returns
+        -------
+        scanner : Scanner
+        """
+        # An empty ScanToken (its native token pointer is NULL) only serves
+        # as the vehicle for the deserialize call.
+        return ScanToken().deserialize_into_scanner(self, serialized_token)
+
     def table(self, table_name):
         """
         Construct a kudu.Table and retrieve its schema from the cluster.
@@ -506,9 +522,9 @@ cdef class TabletServer:
     """
 
     cdef:
-        KuduTabletServer* _tserver
+        # const so TabletServer can wrap the const reference returned by
+        # KuduReplica::ts() (see Replica.ts()). The pointer is only stored,
+        # not copied.
+        const KuduTabletServer* _tserver
 
-    cdef _init(self, KuduTabletServer* tserver):
+    cdef _init(self, const KuduTabletServer* tserver):
         self._tserver = tserver
         return self
 
@@ -642,6 +658,27 @@ cdef class Table:
         result.scanner = new KuduScanner(self.ptr())
         return result
 
+    def scan_token_builder(self):
+        """
+        Start building scan tokens for this table.
+
+        The returned ScanTokenBuilder describes a scan (projection,
+        predicates, bounds, ...) and turns it into a collection of
+        ScanTokens via build().
+
+        Examples
+        --------
+        builder = table.scan_token_builder()
+        builder.set_fault_tolerant().add_predicate(table['key'] > 10)
+        tokens = builder.build()
+        for token in tokens:
+            scanner = token.into_kudu_scanner()
+            scanner.open()
+            tuples = scanner.read_all_tuples()
+
+        Returns
+        -------
+        builder : ScanTokenBuilder
+        """
+        builder = ScanTokenBuilder(self)
+        return builder
+
     cdef inline KuduTable* ptr(self):
         return self.table.get()
 
@@ -1074,7 +1111,7 @@ cdef class Scanner:
         KuduScanner* scanner
         bint is_open
 
-    def __cinit__(self, Table table):
+    def __cinit__(self, Table table = None):
         self.table = table
         self.scanner = NULL
         self.is_open = 0
@@ -1278,6 +1315,359 @@ cdef class Scanner:
         check_status(self.scanner.NextBatch(&batch.batch))
         return batch
 
+cdef class ScanToken:
+    """
+    A ScanToken describes a partial scan of a Kudu table limited to a single
+    contiguous physical location. Using the ScanTokenBuilder, clients can
+    describe the desired scan, including predicates, bounds, timestamps, and
+    caching, and receive back a collection of scan tokens.
+
+    Tokens returned by ScanTokenBuilder.build() own their native token and
+    free it when garbage collected.
+    """
+    cdef:
+        KuduScanToken* _token
+
+    def __cinit__(self):
+        self._token = NULL
+
+    def __dealloc__(self):
+        if self._token != NULL:
+            del self._token
+
+    cdef _init(self, KuduScanToken* token):
+        # Takes ownership of `token`; it is deleted in __dealloc__.
+        self._token = token
+        return self
+
+    cdef _check_initialized(self):
+        # A ScanToken() constructed directly (not via ScanTokenBuilder.build())
+        # wraps no native token; dereferencing it would crash the interpreter.
+        if self._token == NULL:
+            raise ValueError('ScanToken is not initialized; obtain tokens '
+                             'from ScanTokenBuilder.build()')
+
+    def into_kudu_scanner(self):
+        """
+        Returns a scanner under the current client.
+
+        Returns
+        -------
+        scanner : Scanner
+
+        Raises
+        ------
+        ValueError
+          If this ScanToken does not wrap a native token.
+        """
+        cdef:
+            Scanner result
+            KuduScanner* _scanner = NULL
+        self._check_initialized()
+        check_status(self._token.IntoKuduScanner(&_scanner))
+        result = Scanner()
+        result.scanner = _scanner
+        return result
+
+    def tablet(self):
+        """
+        Returns the Tablet associated with this ScanToken.
+
+        The returned Tablet points at data owned by this token. Keep the
+        token alive for as long as the Tablet, or anything obtained from
+        it, is in use.
+
+        Returns
+        -------
+        tablet : Tablet
+
+        Raises
+        ------
+        ValueError
+          If this ScanToken does not wrap a native token.
+        """
+        self._check_initialized()
+        tablet = Tablet()
+        return tablet._init(&self._token.tablet())
+
+    def serialize(self):
+        """
+        Serialize token into bytes.
+
+        The serialized form is opaque binary data, so it is returned
+        unmodified as bytes. Decoding it as text could fail or corrupt it.
+        Pass the result as-is to Client.deserialize_token_into_scanner().
+
+        Returns
+        -------
+        serialized_token : bytes
+
+        Raises
+        ------
+        ValueError
+          If this ScanToken does not wrap a native token.
+        """
+        cdef string buf
+        self._check_initialized()
+        check_status(self._token.Serialize(&buf))
+        return buf
+
+    def deserialize_into_scanner(self, Client client not None,
+                                 serialized_token):
+        """
+        Returns a new scanner from the serialized token created under
+        the provided Client.
+
+        Parameters
+        ----------
+        client : Client
+        serialized_token : bytes or string
+          Output of ScanToken.serialize(). Text input is encoded with
+          tobytes() for compatibility with older callers.
+
+        Returns
+        -------
+        scanner : Scanner
+        """
+        cdef:
+            Scanner result
+            KuduScanner* _scanner = NULL
+            string c_token
+        # Raw bytes are passed through untouched; only text is re-encoded.
+        if isinstance(serialized_token, bytes):
+            c_token = serialized_token
+        else:
+            c_token = tobytes(serialized_token)
+        # NOTE(review): DeserializeIntoScanner is presumably a static member
+        # in the C++ API (it takes the client explicitly), so it should not
+        # touch self._token. That pointer is NULL when this is invoked via
+        # Client.deserialize_token_into_scanner(). Declaring it @staticmethod
+        # in libkudu_client.pxd would make that explicit; verify.
+        check_status(self._token.DeserializeIntoScanner(client.cp, c_token,
+                                                         &_scanner))
+        result = Scanner()
+        result.scanner = _scanner
+        return result
+
+
+cdef class ScanTokenBuilder:
+    """
+    This class builds ScanTokens for a Table.
+
+    Setters return the builder itself so calls can be chained, e.g.
+    ``table.scan_token_builder().set_fault_tolerant().add_predicates(preds)``.
+    """
+    cdef:
+        KuduScanTokenBuilder* _builder
+        Table _table
+
+    def __cinit__(self, Table table not None):
+        # `not None` turns a None table into a TypeError instead of a NULL
+        # dereference inside table.ptr().
+        self._table = table
+        self._builder = new KuduScanTokenBuilder(table.ptr())
+
+    def __dealloc__(self):
+        if self._builder != NULL:
+            del self._builder
+
+    def set_projected_column_names(self, names):
+        """
+        Sets the columns to be scanned.
+        Returns a reference to itself to facilitate chaining.
+
+        Parameters
+        ----------
+        names : list of strings
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        cdef vector[string] v_names
+        for name in names:
+            v_names.push_back(tobytes(name))
+        check_status(self._builder.SetProjectedColumnNames(v_names))
+        return self
+
+    def set_projected_column_indexes(self, indexes):
+        """
+        Sets the columns to be scanned.
+        Returns a reference to itself to facilitate chaining.
+
+        Parameters
+        ----------
+        indexes : list of integers representing column indexes
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        cdef vector[int] v_indexes = indexes
+        check_status(self._builder.SetProjectedColumnIndexes(v_indexes))
+        return self
+
+    def set_batch_size_bytes(self, batch_size):
+        """
+        Sets the batch size in bytes.
+        Returns a reference to itself to facilitate chaining.
+
+        Parameters
+        ----------
+        batch_size : Size of batch in bytes
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        check_status(self._builder.SetBatchSizeBytes(batch_size))
+        return self
+
+    def set_timeout_millis(self, millis):
+        """
+        Sets the timeout in milliseconds.
+        Returns a reference to itself to facilitate chaining.
+
+        Parameters
+        ----------
+        millis : int
+          timeout in milliseconds
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        check_status(self._builder.SetTimeoutMillis(millis))
+        return self
+
+    def set_timout_millis(self, millis):
+        """
+        Misspelled alias of set_timeout_millis(), kept for backward
+        compatibility. Prefer set_timeout_millis().
+        """
+        return self.set_timeout_millis(millis)
+
+    def set_fault_tolerant(self):
+        """
+        Makes the underlying KuduScanner fault tolerant.
+        Returns a reference to itself to facilitate chaining.
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        check_status(self._builder.SetFaultTolerant())
+        return self
+
+    def add_predicates(self, preds):
+        """
+        Add a list of scan predicates to the ScanTokenBuilder. Select columns
+        from the parent table and make comparisons to create predicates.
+        Returns a reference to itself to facilitate chaining.
+
+        Examples
+        --------
+        c = table[col_name]
+        preds = [c >= 0, c <= 10]
+        builder.add_predicates(preds)
+
+        Parameters
+        ----------
+        preds : list of Predicate
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        for pred in preds:
+            self.add_predicate(pred)
+        return self
+
+    cpdef add_predicate(self, Predicate pred):
+        """
+        Add a scan predicate to the scan token builder. Select columns from
+        the parent table and make comparisons to create predicates.
+        Returns a reference to itself to facilitate chaining.
+
+        Examples
+        --------
+        pred = table[col_name] <= 10
+        builder.add_predicate(pred)
+
+        Parameters
+        ----------
+        pred : kudu.Predicate
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        cdef KuduPredicate* clone
+
+        # We clone the KuduPredicate so that the Predicate wrapper class can be
+        # reused
+        clone = pred.pred.Clone()
+        check_status(self._builder.AddConjunctPredicate(clone))
+        return self
+
+    def new_bound(self):
+        """
+        Returns a new instance of a ScanBound (subclass of PartialRow) to be
+        later set with add_lower_bound()/add_upper_bound().
+
+        Returns
+        -------
+        bound : ScanBound
+        """
+        return ScanBound(self._table)
+
+    def add_lower_bound(self, ScanBound bound):
+        """
+        Sets the lower bound of the scan.
+        Returns a reference to itself to facilitate chaining.
+
+        Parameters
+        ----------
+        bound : ScanBound
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        check_status(self._builder.AddLowerBound(deref(bound.row)))
+        return self
+
+    def add_upper_bound(self, ScanBound bound):
+        """
+        Sets the upper bound of the scan.
+        Returns a reference to itself to facilitate chaining.
+
+        Parameters
+        ----------
+        bound : ScanBound
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        check_status(self._builder.AddUpperBound(deref(bound.row)))
+        return self
+
+    def set_cache_blocks(self, cache_blocks):
+        """
+        Sets the block caching policy.
+        Returns a reference to itself to facilitate chaining.
+
+        Parameters
+        ----------
+        cache_blocks : bool
+
+        Returns
+        -------
+        self : ScanTokenBuilder
+        """
+        check_status(self._builder.SetCacheBlocks(cache_blocks))
+        return self
+
+    def build(self):
+        """
+        Build the set of scan tokens. The builder may be reused after
+        this call. Returns a list of ScanTokens to be serialized and
+        executed in parallel with separate client instances.
+
+        Returns
+        -------
+        tokens : List[ScanToken]
+        """
+
+        cdef:
+            vector[KuduScanToken*] tokens
+            size_t i
+
+        check_status(self._builder.Build(&tokens))
+
+        # Each ScanToken takes ownership of its native token.
+        result = []
+        for i in range(tokens.size()):
+            token = ScanToken()
+            result.append(token._init(tokens[i]))
+        return result
+
+
+cdef class Tablet:
+    """
+    Represents a remote Tablet. Contains the tablet id and Replicas associated
+    with the Kudu Tablet. Retrieved by the ScanToken.tablet() method.
+
+    A Tablet does not own the native object it wraps. It points at data held
+    by the ScanToken it was obtained from, so that token must stay alive
+    while the Tablet is used.
+    """
+    cdef:
+        const KuduTablet* _tablet
+
+    cdef _init(self, const KuduTablet* tablet):
+        self._tablet = tablet
+        return self
+
+    def id(self):
+        """
+        Returns the id of this tablet.
+
+        Returns
+        -------
+        tablet_id : string
+        """
+        return frombytes(self._tablet.id())
+
+    def replicas(self):
+        """
+        Returns wrappers for this tablet's replicas. The replicas themselves
+        remain owned by the native tablet.
+
+        Returns
+        -------
+        replicas : List[Replica]
+        """
+        cdef size_t i
+
+        result = []
+        _replicas = self._tablet.replicas()
+        for i in range(_replicas.size()):
+            replica = Replica()
+            result.append(replica._init(_replicas[i]))
+        return result
+
+cdef class Replica:
+    """
+    Represents a remote Tablet's replica. Retrieve a list of Replicas with the
+    Tablet.replicas() method. Contains the boolean is_leader and its
+    respective TabletServer object.
+    """
+    # Borrowed pointer taken from the tablet's
+    # `const vector[const KuduReplica*]&`. It must NOT be deleted here:
+    # Tablet.replicas() wraps the same pointers in fresh Replica objects on
+    # every call, so freeing them on dealloc double-frees memory the tablet
+    # still owns.
+    cdef const KuduReplica* _replica
+
+    cdef _init(self, const KuduReplica* replica):
+        self._replica = replica
+        return self
+
+    def is_leader(self):
+        """
+        Returns whether this replica is the tablet's leader.
+
+        Returns
+        -------
+        is_leader : bool
+        """
+        return self._replica.is_leader()
+
+    def ts(self):
+        """
+        Returns the TabletServer hosting this replica. It refers to data
+        owned by the replica.
+
+        Returns
+        -------
+        ts : TabletServer
+        """
+        ts = TabletServer()
+        return ts._init(&self._replica.ts())
 
 cdef class KuduError:
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index e6592bb..546b2be 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -446,7 +446,10 @@ cdef extern from "kudu/client/value.h" namespace "kudu::client" nogil:
 
 cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
 
-    # Omitted KuduClient::ReplicaSelection enum
+    # Replica selection policy, mapped onto the nested C++ enum
+    # kudu::client::KuduClient::ReplicaSelection via the quoted C names.
+    enum ReplicaSelection" kudu::client::KuduClient::ReplicaSelection":
+        LEADER_ONLY " kudu::client::KuduClient::LEADER_ONLY"
+        CLOSEST_REPLICA " kudu::client::KuduClient::CLOSEST_REPLICA"
+        FIRST_REPLICA " kudu::client::KuduClient::FIRST_REPLICA"
 
     cdef cppclass KuduClient:
 
@@ -607,8 +610,7 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         Status NextBatch(KuduScanBatch* batch)
         Status SetBatchSizeBytes(uint32_t batch_size)
 
-        # Pending definition of ReplicaSelection enum
-        # Status SetSelection(ReplicaSelection selection)
+        Status SetSelection(ReplicaSelection selection)
 
         Status SetReadMode(ReadMode read_mode)
         Status SetSnapshot(uint64_t snapshot_timestamp_micros)
@@ -622,6 +624,47 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         KuduSchema GetProjectionSchema()
         string ToString()
 
+    cdef cppclass KuduScanToken:
+        KuduScanToken()
+
+        # Returns a reference into this token; client.pyx's Tablet wrapper
+        # stores its address, so it is only valid while the token lives.
+        const KuduTablet& tablet()
+
+        Status IntoKuduScanner(KuduScanner** scanner)
+        # `buf` receives the token's serialized form as raw bytes.
+        Status Serialize(string* buf)
+        # NOTE(review): presumably a static member in the C++ API (it takes
+        # the client explicitly); client.pyx calls it through a possibly-NULL
+        # instance pointer. Consider declaring it @staticmethod. Verify.
+        Status DeserializeIntoScanner(KuduClient* client,
+                                      const string& serialized_token,
+                                      KuduScanner** scanner)
+
+    cdef cppclass KuduScanTokenBuilder:
+        KuduScanTokenBuilder(KuduTable* table)
+
+        Status SetProjectedColumnNames(const vector[string]& col_names)
+        Status SetProjectedColumnIndexes(const vector[int]& col_indexes)
+        Status SetBatchSizeBytes(uint32_t batch_size)
+        Status SetReadMode(ReadMode read_mode)
+        Status SetFaultTolerant()
+        Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros)
+        Status SetSnapshotRaw(uint64_t snapshot_timestamp)
+        Status SetSelection(ReplicaSelection selection)
+        Status SetTimeoutMillis(int millis)
+        # client.pyx passes a Clone() of the Python-side predicate here.
+        Status AddConjunctPredicate(KuduPredicate* pred)
+        Status AddLowerBound(const KuduPartialRow& key)
+        Status AddUpperBound(const KuduPartialRow& key)
+        Status SetCacheBlocks(c_bool cache_blocks)
+        # Fills `tokens` with raw pointers; client.pyx wraps each in a
+        # ScanToken whose __dealloc__ deletes it.
+        Status Build(vector[KuduScanToken*]* tokens)
+
+    cdef cppclass KuduTablet:
+        KuduTablet()
+
+        const string& id()
+        # Returned by const reference: the vector and, presumably, the
+        # replicas it points to belong to the tablet and must not be
+        # deleted by wrappers.
+        const vector[const KuduReplica*]& replicas()
+
+    cdef cppclass KuduReplica:
+        KuduReplica()
+
+        c_bool is_leader()
+        # Reference into the replica; valid only while the replica lives.
+        const KuduTabletServer& ts()
+
     cdef cppclass C_KuduError " kudu::client::KuduError":
 
         Status& status()

http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
new file mode 100644
index 0000000..d855569
--- /dev/null
+++ b/python/kudu/tests/test_scantoken.py
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kudu.compat import unittest
+from kudu.tests.common import KuduTestBase
+import kudu
+from multiprocessing import Pool
+
+def _get_scan_token_results(input):
+    client = kudu.Client("{0}:{1}".format(input[1], input[2]))
+    scanner = client.deserialize_token_into_scanner(input[0])
+    scanner.open()
+    return scanner.read_all_tuples()
+
+class TestScanToken(KuduTestBase, unittest.TestCase):
+
+    @classmethod
+    def setUpClass(self):
+        """
+        Stolen from the the test scanner given the similarity in
+        functionality.
+        """
+        super(TestScanToken, self).setUpClass()
+
+        self.nrows = 100
+        table = self.client.table(self.ex_table)
+        session = self.client.new_session()
+
+        tuples = []
+        for i in range(self.nrows):
+            op = table.new_insert()
+            tup = i, i * 2, 'hello_%d' % i if i % 2 == 0 else None
+            op['key'] = tup[0]
+            op['int_val'] = tup[1]
+            if i % 2 == 0:
+                op['string_val'] = tup[2]
+            elif i % 3 == 0:
+                op['string_val'] = None
+            session.apply(op)
+            tuples.append(tup)
+        session.flush()
+
+        self.table = table
+        self.tuples = tuples
+
+    def setUp(self):
+        pass
+
+    def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples):
+        """
+        Given the input serialized tokens, spawn new threads,
+        execute them and validate the results
+        """
+        input =  [(token.serialize(), self.master_host, self.master_port)
+                for token in tokens]
+
+        # Begin process pool
+        pool = Pool(len(input))
+        results = pool.map(_get_scan_token_results, input)
+
+        #Validate results
+        actual_tuples = []
+        for result in results:
+            actual_tuples += result
+
+        self.assertEqual(sorted(expected_tuples), sorted(actual_tuples))
+
+    def test_scan_token_serde_threaded_with_named_projection(self):
+        """
+        Creates scan tokens, serializes them, delivers them to new
+        threads then executes them in parallel with seperate clients.
+        """
+        builder = self.table.scan_token_builder()
+        builder.set_projected_column_names(['key', 'string_val']).set_fault_tolerant()
+
+        # Serialize execute and verify
+        self._subtest_serialize_thread_and_verify(builder.build(),
+                                                  [(x[0], x[2]) for x in self.tuples])
+
+    def test_scan_token_serde_threaded_simple_predicate_and_index_projection(self):
+        """
+        Creates scan tokens with predicates and an index projection,
+        serializes them, delivers them to new threads then executes
+        them in parallel with seperate clients.
+        """
+        key = self.table['key']
+        preds = [key >= 20, key <= 49]
+
+        builder = self.table.scan_token_builder()
+        builder.set_projected_column_indexes([0, 1])\
+            .set_fault_tolerant()\
+            .add_predicates(preds)
+
+        # Serialize execute and verify
+        self._subtest_serialize_thread_and_verify(builder.build(),
+                                                  [x[0:2] for x in self.tuples[20:50]])
+
+    def test_scan_token_serde_threaded_with_bounds(self):
+        """
+        Creates scan tokens with bounds, serializes them,
+        delivers them to new threads then executes them
+        in parallel with seperate clients.
+        """
+        builder = self.table.scan_token_builder()
+        lower_bound = builder.new_bound()
+        lower_bound['key'] = 50
+        upper_bound = builder.new_bound()
+        upper_bound['key'] = 55
+        builder.set_fault_tolerant()\
+            .add_lower_bound(lower_bound)\
+            .add_upper_bound(upper_bound)
+
+        # Serialize execute and verify
+        self._subtest_serialize_thread_and_verify(builder.build(),
+                                                  self.tuples[50:55])
+
+    def test_scan_token_invalid_predicates(self):
+        builder = self.table.scan_token_builder()
+        sv = self.table['string_val']
+
+        with self.assertRaises(TypeError):
+            builder.add_predicates([sv >= None])
+
+        with self.assertRaises(kudu.KuduInvalidArgument):
+            builder.add_predicates([sv >= 1])
+
+    def test_scan_token_batch_by_batch_with_local_scanner(self):
+        builder = self.table.scan_token_builder()
+        lower_bound = builder.new_bound()
+        lower_bound['key'] = 10
+        upper_bound = builder.new_bound()
+        upper_bound['key'] = 90
+        builder.set_fault_tolerant() \
+            .add_lower_bound(lower_bound) \
+            .add_upper_bound(upper_bound)
+        tokens = builder.build()
+
+        tuples = []
+        for token in tokens:
+            scanner = token.into_kudu_scanner()
+            scanner.open()
+
+            while scanner.has_more_rows():
+                batch = scanner.next_batch()
+                tuples.extend(batch.as_tuples())
+
+        self.assertEqual(sorted(tuples), self.tuples[10:90])

http://git-wip-us.apache.org/repos/asf/kudu/blob/609546db/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 82f0383..830a103 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -154,7 +154,7 @@ setup(
         'build_ext': build_ext
     },
     setup_requires=['pytest-runner'],
-    tests_require=['pytest'],
+    tests_require=['pytest', 'multiprocessing'],
     install_requires=['cython >= 0.21'],
     description=DESCRIPTION,
     long_description=LONG_DESCRIPTION,


Mime
View raw message