kudu-commits mailing list archives

From t...@apache.org
Subject [1/2] incubator-kudu git commit: KUDU-1165: Implement partition pruning for C++ client and server
Date Fri, 25 Mar 2016 19:46:19 GMT
Repository: incubator-kudu
Updated Branches:
  refs/heads/master 7f3691a82 -> 1ad79bccd


KUDU-1165: Implement partition pruning for C++ client and server

Change-Id: I9f4fc14b44af63a17b34cd9c2a9134127b1afe4d
Reviewed-on: http://gerrit.cloudera.org:8080/2413
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <adar@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/96b7291f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/96b7291f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/96b7291f

Branch: refs/heads/master
Commit: 96b7291ff2533a01f14f0722cfa196948bfd3006
Parents: 7f3691a
Author: Dan Burkert <dan@cloudera.com>
Authored: Wed Mar 2 13:23:08 2016 -0800
Committer: Dan Burkert <dan@cloudera.com>
Committed: Fri Mar 25 17:40:01 2016 +0000

----------------------------------------------------------------------
 .../scan-optimization-partition-pruning.md      |  69 +-
 src/kudu/client/client.cc                       |  51 +-
 src/kudu/client/scanner-internal.cc             |  42 +-
 src/kudu/client/scanner-internal.h              |  12 +-
 src/kudu/common/CMakeLists.txt                  |   4 +-
 src/kudu/common/partial_row.h                   |   7 +-
 src/kudu/common/partition.cc                    |  14 +-
 src/kudu/common/partition.h                     |   5 +-
 src/kudu/common/partition_pruner-test.cc        | 640 +++++++++++++++++++
 src/kudu/common/partition_pruner.cc             | 428 +++++++++++++
 src/kudu/common/partition_pruner.h              |  75 +++
 src/kudu/util/memory/arena.cc                   |   3 +
 src/kudu/util/memory/arena.h                    |   3 +
 13 files changed, 1229 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/docs/design-docs/scan-optimization-partition-pruning.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/scan-optimization-partition-pruning.md b/docs/design-docs/scan-optimization-partition-pruning.md
index 740dfd0..75f0e3b 100644
--- a/docs/design-docs/scan-optimization-partition-pruning.md
+++ b/docs/design-docs/scan-optimization-partition-pruning.md
@@ -21,17 +21,32 @@ tablets through a combination of hash and range partitioning. The design allows
 operators to have control over data locality in order to optimize for the
 expected workload.
 
+Every table has a partition schema, which consists of zero or more hash
+components and a range component. Each hash component has one or more columns.
+Each column must be part of the primary key column set, and no column may
+appear in more than one hash component. The range component may have zero or
+more columns, all of which must be part of the primary key.
+
+Rows in a Kudu table are mapped to tablets using a partition key. A row's
+partition key is created by encoding the column values of the row according to
+the table's partition schema. For each hash bucket component, the partition key
+contains the hash of the column values in the component. For each column in the
+range component, the key will contain the row's encoded value for that column.
+Every tablet has a start and end partition key which cover the hash bucket and
+range assignment of the tablet. Finding the tablet for a row requires finding
+the tablet with the partition key range which contains the row's partition key.
+
 Currently, Kudu does not take full advantage of partition information when
 executing scans. This results in missed opportunities to 'prune', or skip
 tablets during a scan based on the scan's predicates and the tablet's hash
 bucket and range assignments. This remainder of this design doc will detail the
 specific opportunities we can take advantage of to prune partitions, provide an
-overview of how we will accomplish this is the clients and on the server, and
-provide some alternatives for discussion.
+overview of how we will accomplish this on the client and on the server, and
+provide alternatives for discussion.
 
 ### Sample Schemas
 
-The following sections will reference two example table schema:
+The following sections will reference two example table schemas:
 
 ```sql
 CREATE TABLE 'machine_metrics'
@@ -42,7 +57,7 @@ DISTRIBUTE BY
   RANGE (time) SPLIT ROWS [(1451606400000)];
 ```
 
-which the following tablets:
+with the following tablets:
 
 ```
 A: bucket(host, metric) = 0, range(time) = [(min), (1451606400000))
@@ -63,8 +78,8 @@ DISTRIBUTE BY RANGE (user_id, target_id) SPLIT ROWS [(1000, 1000)];
 with the following tablets:
 
 ```
-A: range(user_id) = [(min, min), (1000, 1000))
-B: range(user_id) = [(1000, 1000), (max, max))
+A: range(user_id, target_id) = [(min, min), (1000, 1000))
+B: range(user_id, target_id) = [(1000, 1000), (max, max))
 ```
 
 ## Scan Constraints
@@ -119,12 +134,12 @@ primary_key  < (500, 700)
 ### Range Pruning
 
 If the table is range partitioned with split rows and the scan contains
-predicates over a prefix subset of the range columns, then the scan may be able
-to prune tablets based on those predicates. For example, the query:
+predicates over a prefix of the range columns, then the scan may be able to
+prune tablets based on those predicates. For example, the query:
 
 ```sql
 SELECT * FROM 'machine_metrics'
-WHERE timestamp < 500;
+WHERE time < 500;
 ```
 
 can prune tablets `B` and `D`.
@@ -169,20 +184,16 @@ WHERE host = "host001.example.com"
   AND metric = "load-avg-1min";
 
 -- Does not allow hash bucket pruning
-SELECT * from 't'
+SELECT * from 'machine_metrics'
 WHERE host = "host001.example.com";
 ```
 
 ## Tablet Lookup Optimization
 
-In order for the client to prune tablets, it must look up the tablet partition
-information from the master. In order to limit these lookups for queries which
-only touch a small portion of the table, predicates and primary key bounds are
-pushed into partition key bounds in a manner similar to how predicates are
-pushed into the primary key bounds. The partition key bounds limit the set of
-tablets that are looked up from the master and evaluated for pruning. This can
-limit the high upfront cost of looking up the entire tablet metadata set for a
-new client which is performing a scan that is constrained to a few rows.
+In order for the client to scan a tablet, it must retrieve the tablet location
+from the master. When a tablet is going to be pruned from a scan, its tablet
+location is not needed, so the client can speed up metadata operations by not
+looking up metadata for pruned tablets.
 
 ## Implementation
 
@@ -192,12 +203,10 @@ Duplicating the work on the server is not strictly necessary, but it is a
 low-overhead operation in comparison with accessing disk, and it allows for
 client implementations which don't implement the optimizations to benefit.
 
-In the client, the scan's range, hash bucket, and partition key constraints will
-be evaluated once per scan. As the scan progresses through the set of tablets in
-the partition key range, each tablet's partition information will be compared
-against the range bounds and hash buckets to determine whether pruning can
-occur. Pruned tablets never need to be contacted (though their partition
-information needs to be looked up from the master).
+In the client, the scan's constraints will be evaluated once per scan into a set
+of partition key ranges which cover the non-pruned tablets in the scan. Using
+these partition key ranges, only the tablet metadata necessary for the scan can
+be requested from the master.
 
 The server will go one step further by adding the tablet's primary key bounds to
 the scan spec during scan initialization, which may provide additional pruning
@@ -217,15 +226,3 @@ yield better pruning oppurtunities often.  On the server, it is a lighter weight
 operation since the original scan predicate does not need to be copied (because
 it can be mutated in place), and most of the optimization steps are already
 happening anyway.
-
-### Prune Tablets by Partition Key Interval Set
-
-The method of pruning tablets described above is 'lazy' in that it retrieves
-partition metadata for each tablet in the partition key range, and then compares
-the partition against the precomputed hash bucket and range constraints to
-determine if the tablet should be pruned.
-
-As an alternative, the client could create a set of partition key ranges from
-the hash bucket and range constraints. Taking the intersection of this interval
-set with the interval set of tablet partition key ranges would yield exactly the
-set of tablets necessary to satisfy the scan.
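
The revised design doc has the client turn a scan's constraints into a set of partition key ranges and contact only the tablets those ranges cover. A minimal sketch of that selection step follows. It is not the code from this commit: `KeyRange`, `Overlaps`, and `RemainingTablets` are hypothetical names, and partition keys are modeled as short strings (a bucket digit followed by a range letter) rather than Kudu's binary encoding.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// A half-open partition key range [start, end). An empty `start` means
// "unbounded below" and an empty `end` means "unbounded above".
struct KeyRange {
  std::string start;
  std::string end;
};

// True if the range contains at least one key.
bool IsNonEmpty(const KeyRange& r) {
  return r.end.empty() || r.start < r.end;
}

// True if the two half-open ranges share at least one key.
bool Overlaps(const KeyRange& a, const KeyRange& b) {
  bool a_starts_before_b_ends = b.end.empty() || a.start < b.end;
  bool b_starts_before_a_ends = a.end.empty() || b.start < a.end;
  return a_starts_before_b_ends && b_starts_before_a_ends;
}

// Returns the indexes of the tablets that are NOT pruned, i.e. those whose
// partition key range overlaps at least one of the scan's key ranges.
std::vector<std::size_t> RemainingTablets(
    const std::vector<KeyRange>& tablets,
    const std::vector<KeyRange>& scan_ranges) {
  std::vector<std::size_t> remaining;
  for (std::size_t i = 0; i < tablets.size(); ++i) {
    assert(IsNonEmpty(tablets[i]));
    for (const KeyRange& range : scan_ranges) {
      if (Overlaps(tablets[i], range)) {
        remaining.push_back(i);
        break;
      }
    }
  }
  return remaining;
}
```

With four toy tablets `["", "0m")`, `["0m", "1")`, `["1", "1m")`, `["1m", "")` standing in for `A` through `D`, and scan ranges `["0", "0c")` and `["1", "1c")` standing in for a range predicate that applies in both hash buckets, `RemainingTablets` returns indexes 0 and 2. That matches the doc's range-pruning example, where tablets `B` and `D` are pruned.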

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 74c4a4d..862e459 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1049,8 +1049,11 @@ Status KuduScanner::Open() {
   CHECK(data_->projection_ != nullptr) << "No projection provided";
 
   data_->spec_.OptimizeScan(*data_->table_->schema().schema_, &data_->arena_, &data_->pool_, false);
+  data_->partition_pruner_.Init(*data_->table_->schema().schema_,
+                                data_->table_->partition_schema(),
+                                data_->spec_);
 
-  if (data_->spec_.CanShortCircuit()) {
+  if (data_->spec_.CanShortCircuit() || !data_->partition_pruner_.HasMorePartitionKeyRanges()) {
     VLOG(1) << "Short circuiting scan " << ToString();
     data_->open_ = true;
     data_->short_circuit_ = true;
@@ -1063,43 +1066,7 @@ Status KuduScanner::Open() {
   deadline.AddDelta(data_->timeout_);
   set<string> blacklist;
 
-  bool is_simple_range_partitioned =
-    data_->table_->partition_schema().IsSimplePKRangePartitioning(*data_->table_->schema().schema_);
-
-  if (!is_simple_range_partitioned &&
-      (data_->spec_.lower_bound_key() != nullptr ||
-       data_->spec_.exclusive_upper_bound_key() != nullptr ||
-       !data_->spec_.predicates().empty())) {
-    KLOG_FIRST_N(WARNING, 1) << "Starting full table scan. In the future this scan may be "
-                                "automatically optimized with partition pruning.";
-  }
-
-  if (is_simple_range_partitioned) {
-    // If the table is simple range partitioned, then the partition key space is
-    // isomorphic to the primary key space. We can potentially reduce the scan
-    // length by only scanning the intersection of the primary key range and the
-    // partition key range. This is a stop-gap until real partition pruning is
-    // in place that will work across any partition type.
-    Slice start_primary_key = data_->spec_.lower_bound_key() == nullptr ? Slice()
-                            : data_->spec_.lower_bound_key()->encoded_key();
-    Slice end_primary_key = data_->spec_.exclusive_upper_bound_key() == nullptr ? Slice()
-                          : data_->spec_.exclusive_upper_bound_key()->encoded_key();
-    Slice start_partition_key = data_->spec_.lower_bound_partition_key();
-    Slice end_partition_key = data_->spec_.exclusive_upper_bound_partition_key();
-
-    if ((!end_partition_key.empty() && start_primary_key.compare(end_partition_key) >= 0) ||
-        (!end_primary_key.empty() && start_partition_key.compare(end_primary_key) >= 0)) {
-      // The primary key range and the partition key range do not intersect;
-      // the scan will be empty. Keep the existing partition key range.
-    } else {
-      // Assign the scan's partition key range to the intersection of the
-      // primary key and partition key ranges.
-      data_->spec_.SetLowerBoundPartitionKey(start_primary_key);
-      data_->spec_.SetExclusiveUpperBoundPartitionKey(end_primary_key);
-    }
-  }
-
-  RETURN_NOT_OK(data_->OpenTablet(data_->spec_.lower_bound_partition_key(), deadline, &blacklist));
+  RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
 
   data_->open_ = true;
   return Status::OK();
@@ -1227,9 +1194,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* result) {
                                       batch_deadline, candidates, &blacklist));
 
     LOG(WARNING) << "Attempting to retry scan of tablet " << ToString() << " elsewhere.";
-    // Use the start partition key of the current tablet as the start partition key.
-    const string& partition_key_start = data_->remote_->partition().partition_key_start();
-    return data_->OpenTablet(partition_key_start, batch_deadline, &blacklist);
+    return data_->ReopenCurrentTablet(batch_deadline, &blacklist);
   } else if (data_->MoreTablets()) {
     // More data may be available in other tablets.
     // No need to close the current tablet; we scanned all the data so the
@@ -1239,8 +1204,8 @@ Status KuduScanner::NextBatch(KuduScanBatch* result) {
     MonoTime deadline = MonoTime::Now(MonoTime::FINE);
     deadline.AddDelta(data_->timeout_);
     set<string> blacklist;
-    RETURN_NOT_OK(data_->OpenTablet(data_->remote_->partition().partition_key_end(),
-                                    deadline, &blacklist));
+
+    RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
     // No rows written, the next invocation will pick them up.
     return Status::OK();
   } else {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index b9931e9..568f381 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -251,6 +251,20 @@ Status KuduScanner::Data::CanBeRetried(const bool isNewScan,
   return Status::OK();
 }
 
+Status KuduScanner::Data::OpenNextTablet(const MonoTime& deadline,
+                                         std::set<std::string>* blacklist) {
+  return OpenTablet(partition_pruner_.NextPartitionKey(),
+                    deadline,
+                    blacklist);
+}
+
+Status KuduScanner::Data::ReopenCurrentTablet(const MonoTime& deadline,
+                                              std::set<std::string>* blacklist) {
+  return OpenTablet(remote_->partition().partition_key_start(),
+                    deadline,
+                    blacklist);
+}
+
 Status KuduScanner::Data::OpenTablet(const string& partition_key,
                                      const MonoTime& deadline,
                                      set<string>* blacklist) {
@@ -387,6 +401,8 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
                                candidates, blacklist));
   }
 
+  partition_pruner_.RemovePartitionKeyRange(remote_->partition().partition_key_end());
+
   next_req_.clear_new_scan_request();
   data_in_open_ = last_response_.has_data();
   if (last_response_.has_more_results()) {
@@ -440,31 +456,7 @@ Status KuduScanner::Data::KeepAlive() {
 bool KuduScanner::Data::MoreTablets() const {
   CHECK(open_);
   // TODO(KUDU-565): add a test which has a scan end on a tablet boundary
-
-  if (remote_->partition().partition_key_end().empty()) {
-    // Last tablet -- nothing more to scan.
-    return false;
-  }
-
-  if (!spec_.exclusive_upper_bound_partition_key().empty() &&
-      spec_.exclusive_upper_bound_partition_key() <= remote_->partition().partition_key_end()) {
-    // We are not past the scan's upper bound partition key.
-    return false;
-  }
-
-  if (!table_->partition_schema().IsSimplePKRangePartitioning(*table_->schema().schema_)) {
-    // We can't do culling yet if the partitioning isn't simple.
-    return true;
-  }
-
-  if (spec_.exclusive_upper_bound_key() == nullptr) {
-    // No upper bound - keep going!
-    return true;
-  }
-
-  // Otherwise, we have to compare the upper bound.
-  return spec_.exclusive_upper_bound_key()->encoded_key()
-          .compare(remote_->partition().partition_key_end()) > 0;
+  return partition_pruner_.HasMorePartitionKeyRanges();
 }
 
 void KuduScanner::Data::PrepareRequest(RequestType state) {
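
The scanner changes above drive the scan with three pruner calls: `NextPartitionKey()` picks the tablet to open, `RemovePartitionKeyRange()` is called with the opened tablet's end key, and `HasMorePartitionKeyRanges()` decides whether to continue. The sketch below shows one way such a loop could behave. The method names are taken from the diff, but `ToyPruner`, `Tablet`, `ScanOrder`, the string key encoding, and all internals are assumptions for illustration and are not the contents of `partition_pruner.cc`.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// [start, end) pairs; an empty `end` means "unbounded above".
using KeyRanges = std::vector<std::pair<std::string, std::string>>;

// Hypothetical stand-in for a partition pruner. `ranges` must be sorted
// ascending and non-overlapping. They are stored reversed so that the next
// range to scan is always at the back of the vector.
class ToyPruner {
 public:
  explicit ToyPruner(KeyRanges ranges)
      : ranges_(ranges.rbegin(), ranges.rend()) {}

  bool HasMorePartitionKeyRanges() const { return !ranges_.empty(); }

  // Start key of the next range that still needs to be scanned.
  const std::string& NextPartitionKey() const {
    assert(HasMorePartitionKeyRanges());
    return ranges_.back().first;
  }

  // Drops every key below `upper_bound`, the end key of the tablet that was
  // just opened. An empty `upper_bound` means that tablet was the last one.
  void RemovePartitionKeyRange(const std::string& upper_bound) {
    if (upper_bound.empty()) {
      ranges_.clear();
      return;
    }
    while (!ranges_.empty()) {
      const std::string& end = ranges_.back().second;
      if (!end.empty() && end <= upper_bound) {
        ranges_.pop_back();  // Fully covered by the opened tablet.
        continue;
      }
      if (ranges_.back().first < upper_bound) {
        ranges_.back().first = upper_bound;  // Partially covered: trim.
      }
      break;
    }
  }

 private:
  KeyRanges ranges_;
};

struct Tablet {
  std::string name;
  std::string start;  // Inclusive; empty means unbounded below.
  std::string end;    // Exclusive; empty means unbounded above.
};

// Returns the names of the tablets a scan would open, in order.
std::vector<std::string> ScanOrder(const std::vector<Tablet>& tablets,
                                   ToyPruner pruner) {
  std::vector<std::string> opened;
  while (pruner.HasMorePartitionKeyRanges()) {
    const std::string key = pruner.NextPartitionKey();
    bool found = false;
    for (const Tablet& t : tablets) {
      if (t.start <= key && (t.end.empty() || key < t.end)) {
        opened.push_back(t.name);
        pruner.RemovePartitionKeyRange(t.end);
        found = true;
        break;
      }
    }
    if (!found) break;  // No tablet covers the key; stop rather than spin.
  }
  return opened;
}
```

In this model a pruner with no ranges reports `HasMorePartitionKeyRanges() == false` immediately, which corresponds to the new short-circuit condition in `KuduScanner::Open()`.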

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 8e17c59..e2a3f63 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -23,6 +23,7 @@
 
 #include "kudu/client/client.h"
 #include "kudu/client/row_result.h"
+#include "kudu/common/partition_pruner.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/tserver/tserver_service.proxy.h"
@@ -58,10 +59,17 @@ class KuduScanner::Data {
                       const std::vector<internal::RemoteTabletServer*>& candidates,
                       std::set<std::string>* blacklist);
 
-  // Open a tablet.
+  // Open the next tablet in the scan.
   // The deadline is the time budget for this operation.
   // The blacklist is used to temporarily filter out nodes that are experiencing transient errors.
   // This blacklist may be modified by the callee.
+  Status OpenNextTablet(const MonoTime& deadline, std::set<std::string>* blacklist);
+
+  // Open the current tablet in the scan again.
+  // See OpenNextTablet for options.
+  Status ReopenCurrentTablet(const MonoTime& deadline, std::set<std::string>* blacklist);
+
+  // Open the tablet to scan.
   Status OpenTablet(const std::string& partition_key,
                     const MonoTime& deadline,
                     std::set<std::string>* blacklist);
@@ -147,6 +155,8 @@ class KuduScanner::Data {
   // encoded keys.
   ScanSpec spec_;
 
+  PartitionPruner partition_pruner_;
+
   // The tablet we're scanning.
   scoped_refptr<internal::RemoteTablet> remote_;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/common/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt
index 04afe3e..28415b5 100644
--- a/src/kudu/common/CMakeLists.txt
+++ b/src/kudu/common/CMakeLists.txt
@@ -49,6 +49,7 @@ set(COMMON_SRCS
   key_util.cc
   partial_row.cc
   partition.cc
+  partition_pruner.cc
   rowblock.cc
   row_changelist.cc
   row_operations.cc
@@ -81,11 +82,12 @@ ADD_KUDU_TEST(column_predicate-test)
 ADD_KUDU_TEST(encoded_key-test)
 ADD_KUDU_TEST(generic_iterators-test)
 ADD_KUDU_TEST(id_mapping-test)
+ADD_KUDU_TEST(key_util-test)
 ADD_KUDU_TEST(partial_row-test)
 ADD_KUDU_TEST(partition-test)
+ADD_KUDU_TEST(partition_pruner-test)
 ADD_KUDU_TEST(predicate-test)
 ADD_KUDU_TEST(row_changelist-test)
-ADD_KUDU_TEST(key_util-test)
 ADD_KUDU_TEST(row_operations-test)
 ADD_KUDU_TEST(scan_spec-test)
 ADD_KUDU_TEST(schema-test)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/common/partial_row.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index eac4fab..66b430f 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -24,6 +24,7 @@
 #ifdef KUDU_HEADERS_NO_STUBS
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
+#include <gtest/gtest_prod.h>
 #else
 // This is a poor module interdependency, but the stubs are header-only and
 // it's only for exported header builds, so we'll make an exception.
@@ -189,14 +190,16 @@ class KUDU_EXPORT KuduPartialRow {
   const Schema* schema() const { return schema_; }
 
  private:
+  friend class client::KuduWriteOperation;   // for row_data_.
   friend class KeyUtilTest;
+  friend class PartitionSchema;
   friend class RowOperationsPBDecoder;
   friend class RowOperationsPBEncoder;
-  friend class client::KuduWriteOperation;   // for row_data_.
-  friend class PartitionSchema;
   friend class TestScanSpec;
   template<typename KeyTypeWrapper> friend struct client::SliceKeysTestSetup;
   template<typename KeyTypeWrapper> friend struct client::IntKeysTestSetup;
+  FRIEND_TEST(TestPartitionPruner, TestPrimaryKeyRangePruning);
+  FRIEND_TEST(TestPartitionPruner, TestPartialPrimaryKeyRangePruning);
 
   template<typename T>
   Status Set(const Slice& col_name, const typename T::cpp_type& val,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/common/partition.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition.cc b/src/kudu/common/partition.cc
index d8910ce..86b63a4 100644
--- a/src/kudu/common/partition.cc
+++ b/src/kudu/common/partition.cc
@@ -247,8 +247,8 @@ Status PartitionSchema::CreatePartitions(const vector<KuduPartialRow>& split_row
 
     // Check for an empty split row.
     if (column_count == 0) {
-    return Status::InvalidArgument("Split rows must contain a value for at "
-                                   "least one range partition column");
+      return Status::InvalidArgument("Split rows must contain a value for at "
+                                     "least one range partition column");
     }
 
     start_key.clear();
@@ -655,16 +655,6 @@ bool PartitionSchema::Equals(const PartitionSchema& other) const {
   return true;
 }
 
-bool PartitionSchema::IsSimplePKRangePartitioning(const Schema& schema) const {
-  if (!hash_bucket_schemas_.empty()) return false;
-  if (range_schema_.column_ids.size() != schema.num_key_columns()) return false;
-
-  for (int i = 0; i < schema.num_key_columns(); i++) {
-    if (range_schema_.column_ids[i] != schema.column_id(i)) return false;
-  }
-  return true;
-}
-
 // Encodes the specified primary key columns of the supplied row into the buffer.
 Status PartitionSchema::EncodeColumns(const ConstContiguousRow& row,
                                       const vector<ColumnId>& column_ids,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/common/partition.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition.h b/src/kudu/common/partition.h
index b8988c4..06527dc 100644
--- a/src/kudu/common/partition.h
+++ b/src/kudu/common/partition.h
@@ -179,11 +179,8 @@ class PartitionSchema {
   // Returns true if the other partition schema is equivalent to this one.
   bool Equals(const PartitionSchema& other) const;
 
-  // Return true if the partitioning scheme simply range-partitions on the full primary key,
-  // with no bucketing components, etc.
-  bool IsSimplePKRangePartitioning(const Schema& schema) const;
-
  private:
+  friend class PartitionPruner;
 
   struct RangeSchema {
     std::vector<ColumnId> column_ids;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/common/partition_pruner-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
new file mode 100644
index 0000000..f819db8
--- /dev/null
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -0,0 +1,640 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/common/partition_pruner.h"
+
+#include <algorithm>
+#include <boost/optional.hpp>
+#include <memory>
+#include <string>
+#include <tuple>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/partition.h"
+#include "kudu/common/row.h"
+#include "kudu/common/scan_predicate.h"
+#include "kudu/common/scan_spec.h"
+#include "kudu/common/schema.h"
+#include "kudu/util/test_util.h"
+
+using boost::optional;
+using std::count_if;
+using std::get;
+using std::make_tuple;
+using std::move;
+using std::string;
+using std::tuple;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+
+void CheckPrunedPartitions(const Schema& schema,
+                            const PartitionSchema& partition_schema,
+                            const vector<Partition>& partitions,
+                            const ScanSpec& spec,
+                            size_t remaining_tablets) {
+
+  PartitionPruner pruner;
+  pruner.Init(schema, partition_schema, spec);
+
+  SCOPED_TRACE(strings::Substitute("schema: $0", schema.ToString()));
+  SCOPED_TRACE(strings::Substitute("partition schema: $0", partition_schema.DebugString(schema)));
+  SCOPED_TRACE(strings::Substitute("partition pruner: $0",
+                                    pruner.ToString(schema, partition_schema)));
+  SCOPED_TRACE(strings::Substitute("scan spec: $0", spec.ToString(schema)));
+
+  int pruned_partitions = count_if(partitions.begin(), partitions.end(),
+                                    [&] (const Partition& partition) {
+                                      return pruner.ShouldPruneForTests(partition);
+                                    });
+  ASSERT_EQ(remaining_tablets, partitions.size() - pruned_partitions);
+}
+
+TEST(TestPartitionPruner, TestPrimaryKeyRangePruning) {
+  // CREATE TABLE t
+  // (a INT8, b INT8, c INT8, PRIMARY KEY (a, b, c))
+  // DISTRIBUTE BY RANGE(a, b, c)
+  // SPLIT ROWS [(0, 0, 0), (10, 10, 10)];
+  Schema schema({ ColumnSchema("a", INT8),
+                  ColumnSchema("b", INT8),
+                  ColumnSchema("c", INT8) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) },
+                3);
+
+  PartitionSchema partition_schema;
+  ASSERT_OK(PartitionSchema::FromPB(PartitionSchemaPB(), schema, &partition_schema));
+
+  KuduPartialRow split1(&schema);
+  ASSERT_OK(split1.SetInt8("a", 0));
+  ASSERT_OK(split1.SetInt8("b", 0));
+  ASSERT_OK(split1.SetInt8("c", 0));
+
+  KuduPartialRow split2(&schema);
+  ASSERT_OK(split2.SetInt8("a", 10));
+  ASSERT_OK(split2.SetInt8("b", 10));
+  ASSERT_OK(split2.SetInt8("c", 10));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, schema, &partitions));
+
+  // Creates a scan with optional lower and upper bounds, and checks that the
+  // expected number of tablets are pruned.
+  auto Check = [&] (optional<tuple<int8_t, int8_t, int8_t>> lower,
+                    optional<tuple<int8_t, int8_t, int8_t>> upper,
+                    size_t remaining_tablets) {
+    ScanSpec spec;
+    KuduPartialRow lower_bound(&schema);
+    KuduPartialRow upper_bound(&schema);
+    gscoped_ptr<EncodedKey> enc_lower_bound;
+    gscoped_ptr<EncodedKey> enc_upper_bound;
+
+    if (lower) {
+      CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
+      CHECK_OK(lower_bound.SetInt8("b", get<1>(*lower)));
+      CHECK_OK(lower_bound.SetInt8("c", get<2>(*lower)));
+      ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
+      enc_lower_bound = EncodedKey::FromContiguousRow(row);
+      spec.SetLowerBoundKey(enc_lower_bound.get());
+    }
+    if (upper) {
+      CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
+      CHECK_OK(upper_bound.SetInt8("b", get<1>(*upper)));
+      CHECK_OK(upper_bound.SetInt8("c", get<2>(*upper)));
+      ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
+      enc_upper_bound = EncodedKey::FromContiguousRow(row);
+      spec.SetExclusiveUpperBoundKey(enc_upper_bound.get());
+    }
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+  };
+
+  // No bounds
+  Check(boost::none, boost::none, 3);
+
+  // PK < (-1, min, min)
+  Check(boost::none,
+        make_tuple<int8_t, int8_t, int8_t>(-1, INT8_MIN, INT8_MIN),
+        1);
+
+  // PK < (10, 10, 10)
+  Check(boost::none,
+        make_tuple<int8_t, int8_t, int8_t>(10, 10, 10),
+        2);
+
+  // PK < (100, min, min)
+  Check(boost::none,
+        make_tuple<int8_t, int8_t, int8_t>(100, INT8_MIN, INT8_MIN),
+        3);
+
+  // PK >= (-10, -10, -10)
+  Check(make_tuple<int8_t, int8_t, int8_t>(-10, -10, -10),
+        boost::none,
+        3);
+
+  // PK >= (0, 0, 0)
+  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
+        boost::none,
+        2);
+
+  // PK >= (100, 0, 0)
+  Check(make_tuple<int8_t, int8_t, int8_t>(100, 0, 0),
+        boost::none,
+        1);
+
+  // PK >= (-10, 0, 0)
+  // PK  < (100, 0, 0)
+  Check(make_tuple<int8_t, int8_t, int8_t>(-10, 0, 0),
+        make_tuple<int8_t, int8_t, int8_t>(100, 0, 0),
+        3);
+
+  // PK >= (0, 0, 0)
+  // PK  < (10, 10, 10)
+  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
+        make_tuple<int8_t, int8_t, int8_t>(10, 10, 10),
+        1);
+
+  // PK >= (0, 0, 0)
+  // PK  < (10, 10, 11)
+  Check(make_tuple<int8_t, int8_t, int8_t>(0, 0, 0),
+        make_tuple<int8_t, int8_t, int8_t>(10, 10, 11),
+        2);
+}
+
+TEST(TestPartitionPruner, TestPartialPrimaryKeyRangePruning) {
+  // CREATE TABLE t
+  // (a INT8, b STRING, c STRING, PRIMARY KEY (a, b, c))
+  // DISTRIBUTE BY RANGE(a, b)
+  // SPLIT ROWS [(0, "m"), (10, "r")];
+
+  // Set up the schema.
+  Schema schema({ ColumnSchema("a", INT8),
+                  ColumnSchema("b", STRING),
+                  ColumnSchema("c", STRING) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) },
+                3);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  auto range_schema = pb.mutable_range_schema();
+  range_schema->add_columns()->set_name("a");
+  range_schema->add_columns()->set_name("b");
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  KuduPartialRow split1(&schema);
+  ASSERT_OK(split1.SetInt8("a", 0));
+  ASSERT_OK(split1.SetStringCopy("b", "m"));
+
+  KuduPartialRow split2(&schema);
+  ASSERT_OK(split2.SetInt8("a", 10));
+  ASSERT_OK(split2.SetStringCopy("b", "r"));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, schema, &partitions));
+
+  // Applies the specified lower and upper bound primary keys against the
+  // schema, and checks that the expected number of partitions are pruned.
+  auto Check = [&] (optional<tuple<int8_t, string>> lower,
+                    optional<tuple<int8_t, string>> upper,
+                    size_t remaining_tablets) {
+    ScanSpec spec;
+    KuduPartialRow lower_bound(&schema);
+    KuduPartialRow upper_bound(&schema);
+    gscoped_ptr<EncodedKey> enc_lower_bound;
+    gscoped_ptr<EncodedKey> enc_upper_bound;
+
+    if (lower) {
+      CHECK_OK(lower_bound.SetInt8("a", get<0>(*lower)));
+      CHECK_OK(lower_bound.SetStringCopy("b", get<1>(*lower)));
+      CHECK_OK(lower_bound.SetStringCopy("c", "fuzz"));
+      ConstContiguousRow row(lower_bound.schema(), lower_bound.row_data_);
+      enc_lower_bound = EncodedKey::FromContiguousRow(row);
+      spec.SetLowerBoundKey(enc_lower_bound.get());
+    }
+    if (upper) {
+      CHECK_OK(upper_bound.SetInt8("a", get<0>(*upper)));
+      CHECK_OK(upper_bound.SetStringCopy("b", get<1>(*upper)));
+      CHECK_OK(upper_bound.SetStringCopy("c", "fuzzy"));
+      ConstContiguousRow row(upper_bound.schema(), upper_bound.row_data_);
+      enc_upper_bound = EncodedKey::FromContiguousRow(row);
+      spec.SetExclusiveUpperBoundKey(enc_upper_bound.get());
+    }
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+  };
+
+  // No bounds
+  Check(boost::none, boost::none, 3);
+
+  // PK < (-1, min, _)
+  Check(boost::none, make_tuple<int8_t, string>(-1, ""), 1);
+
+  // PK < (10, "r", _)
+  Check(boost::none, make_tuple<int8_t, string>(10, "r"), 2);
+
+  // PK < (100, min)
+  Check(boost::none, make_tuple<int8_t, string>(100, ""), 3);
+
+  // PK >= (-10, "m")
+  Check(make_tuple<int8_t, string>(-10, "m"), boost::none, 3);
+
+  // PK >= (0, "")
+  Check(make_tuple<int8_t, string>(0, ""), boost::none, 3);
+
+  // PK >= (0, "m")
+  Check(make_tuple<int8_t, string>(0, "m"), boost::none, 2);
+
+  // PK >= (100, "")
+  Check(make_tuple<int8_t, string>(100, ""), boost::none, 1);
+
+  // PK >= (-10, "")
+  // PK  < (100, "")
+  Check(make_tuple<int8_t, string>(-10, ""),
+        make_tuple<int8_t, string>(100, ""), 3);
+
+  // PK >= (0, "m")
+  // PK  < (10, "r")
+  Check(make_tuple<int8_t, string>(0, "m"),
+        make_tuple<int8_t, string>(10, "r"), 1);
+
+  // PK >= (0, "")
+  // PK  < (10, "m")
+  Check(make_tuple<int8_t, string>(0, ""),
+        make_tuple<int8_t, string>(10, "m"), 2);
+
+  // PK >= (10, "m")
+  // PK  < (10, "m")
+  Check(make_tuple<int8_t, string>(10, "m"),
+        make_tuple<int8_t, string>(10, "m"), 1);
+}
+
+TEST(TestPartitionPruner, TestRangePruning) {
+  // CREATE TABLE t
+  // (a INT8, b STRING, c INT8)
+  // PRIMARY KEY (a, b, c)
+  // DISTRIBUTE BY RANGE(c, b)
+  // SPLIT ROWS [(0, "m"), (10, "r")];
+  Schema schema({ ColumnSchema("a", INT8),
+                  ColumnSchema("b", STRING),
+                  ColumnSchema("c", INT8) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) },
+                3);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  auto range_schema = pb.mutable_range_schema();
+  range_schema->add_columns()->set_name("c");
+  range_schema->add_columns()->set_name("b");
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  KuduPartialRow split1(&schema);
+  ASSERT_OK(split1.SetInt8("c", 0));
+  ASSERT_OK(split1.SetStringCopy("b", "m"));
+
+  KuduPartialRow split2(&schema);
+  ASSERT_OK(split2.SetInt8("c", 10));
+  ASSERT_OK(split2.SetStringCopy("b", "r"));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions({ split1, split2 }, schema, &partitions));
+
+  // Applies the specified predicates to a scan and checks that the expected
+  // number of partitions are pruned.
+  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
+    ScanSpec spec;
+
+    for (const auto& pred : predicates) {
+      spec.AddPredicate(pred);
+    }
+
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+  };
+
+  int8_t neg_ten = -10;
+  int8_t zero = 0;
+  int8_t five = 5;
+  int8_t ten = 10;
+  int8_t hundred = 100;
+
+  Slice empty = "";
+  Slice a = "a";
+  Slice m = "m";
+  Slice m0 = Slice("m\0", 2);
+  Slice r = "r";
+  Slice z = "z";
+
+  // No Bounds
+  Check({}, 3);
+
+  // c < -10
+  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &neg_ten) }, 1);
+
+  // c = -10
+  Check({ ColumnPredicate::Equality(schema.column(2), &neg_ten) }, 1);
+
+  // c < 10
+  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &ten) }, 2);
+
+  // c < 100
+  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &hundred) }, 3);
+
+  // c >= -10
+  Check({ ColumnPredicate::Range(schema.column(2), &neg_ten, nullptr) }, 3);
+
+  // c >= 0
+  Check({ ColumnPredicate::Range(schema.column(2), &zero, nullptr) }, 3);
+
+  // c >= 5
+  Check({ ColumnPredicate::Range(schema.column(2), &five, nullptr) }, 2);
+
+  // c >= 10
+  Check({ ColumnPredicate::Range(schema.column(2), &ten, nullptr) }, 2);
+
+  // c >= 100
+  Check({ ColumnPredicate::Range(schema.column(2), &hundred, nullptr) }, 1);
+
+  // c >= -10
+  // c < 0
+  Check({ ColumnPredicate::Range(schema.column(2), &neg_ten, &zero) }, 1);
+
+  // c >= -10
+  // c < 0
+  Check({ ColumnPredicate::Range(schema.column(2), &neg_ten, &zero) }, 1);
+
+  // c >= 5
+  // c < 100
+  Check({ ColumnPredicate::Range(schema.column(2), &five, &hundred) }, 2);
+
+  // b = ""
+  Check({ ColumnPredicate::Equality(schema.column(1), &empty) }, 3);
+
+  // b >= "z"
+  Check({ ColumnPredicate::Range(schema.column(1), &z, nullptr) }, 3);
+
+  // b < "a"
+  Check({ ColumnPredicate::Range(schema.column(1), nullptr, &a) }, 3);
+
+  // b >= "m"
+  // b < "z"
+  Check({ ColumnPredicate::Range(schema.column(1), &m, &z) }, 3);
+
+  // c >= 10
+  // b >= "r"
+  Check({ ColumnPredicate::Range(schema.column(2), &ten, nullptr),
+          ColumnPredicate::Range(schema.column(1), &r, nullptr) },
+        1);
+
+  // c >= 10
+  // b < "r"
+  Check({ ColumnPredicate::Range(schema.column(2), &ten, nullptr),
+          ColumnPredicate::Range(schema.column(1), nullptr, &r) },
+        2);
+
+  // c = 10
+  // b < "r"
+  Check({ ColumnPredicate::Equality(schema.column(2), &ten),
+          ColumnPredicate::Range(schema.column(1), nullptr, &r) },
+        1);
+
+  // c < 0
+  // b < "m"
+  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &zero),
+          ColumnPredicate::Range(schema.column(1), nullptr, &m) },
+        1);
+
+  // c < 0
+  // b < "z"
+  Check({ ColumnPredicate::Range(schema.column(2), nullptr, &zero),
+          ColumnPredicate::Range(schema.column(1), nullptr, &z) },
+        1);
+
+  // c = 0
+  // b = "m\0"
+  Check({ ColumnPredicate::Equality(schema.column(2), &zero),
+          ColumnPredicate::Equality(schema.column(1), &m0) },
+        1);
+
+  // c = 0
+  // b < "m"
+  Check({ ColumnPredicate::Equality(schema.column(2), &zero),
+          ColumnPredicate::Range(schema.column(1), nullptr, &m) },
+        1);
+
+  // c = 0
+  // b < "m\0"
+  Check({ ColumnPredicate::Equality(schema.column(2), &zero),
+          ColumnPredicate::Range(schema.column(1), nullptr, &m0) },
+        2);
+}
+
+TEST(TestPartitionPruner, TestHashPruning) {
+  // CREATE TABLE t
+  // (a INT8, b INT8, c INT8)
+  // PRIMARY KEY (a, b, c)
+  // DISTRIBUTE BY HASH(a) INTO 2 BUCKETS,
+  //               HASH(b, c) INTO 2 BUCKETS;
+  Schema schema({ ColumnSchema("a", INT8),
+                  ColumnSchema("b", INT8),
+                  ColumnSchema("c", INT8) },
+                { ColumnId(0), ColumnId(1), ColumnId(2) },
+                3);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  pb.mutable_range_schema()->Clear();
+  auto hash_component_1 = pb.add_hash_bucket_schemas();
+  hash_component_1->add_columns()->set_name("a");
+  hash_component_1->set_num_buckets(2);
+  auto hash_component_2 = pb.add_hash_bucket_schemas();
+  hash_component_2->add_columns()->set_name("b");
+  hash_component_2->add_columns()->set_name("c");
+  hash_component_2->set_num_buckets(2);
+
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>(), schema, &partitions));
+
+
+  // Applies the specified predicates to a scan and checks that the expected
+  // number of partitions are pruned.
+  auto Check = [&] (const vector<ColumnPredicate>& predicates, size_t remaining_tablets) {
+    ScanSpec spec;
+
+    for (const auto& pred : predicates) {
+      spec.AddPredicate(pred);
+    }
+
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+  };
+
+  int8_t zero = 0;
+  int8_t one = 1;
+  int8_t two = 2;
+
+  // No Bounds
+  Check({}, 4);
+
+  // a = 0;
+  Check({ ColumnPredicate::Equality(schema.column(0), &zero) }, 2);
+
+  // a >= 0;
+  Check({ ColumnPredicate::Range(schema.column(0), &zero, nullptr) }, 4);
+
+  // a >= 0;
+  // a < 1;
+  Check({ ColumnPredicate::Range(schema.column(0), &zero, &one) }, 2);
+
+  // a >= 0;
+  // a < 2;
+  Check({ ColumnPredicate::Range(schema.column(0), &zero, &two) }, 4);
+
+  // b = 1;
+  Check({ ColumnPredicate::Equality(schema.column(1), &one) }, 4);
+
+  // b = 1;
+  // c = 2;
+  Check({ ColumnPredicate::Equality(schema.column(1), &one),
+          ColumnPredicate::Equality(schema.column(2), &two) },
+        2);
+
+  // a = 0;
+  // b = 1;
+  // c = 2;
+  Check({ ColumnPredicate::Equality(schema.column(0), &zero),
+          ColumnPredicate::Equality(schema.column(1), &one),
+          ColumnPredicate::Equality(schema.column(2), &two) },
+        1);
+}
+
+TEST(TestPartitionPruner, TestPruning) {
+  // CREATE TABLE timeseries
+  // (host STRING, metric STRING, time TIMESTAMP, value DOUBLE)
+  // PRIMARY KEY (host, metric, time)
+  // DISTRIBUTE BY RANGE(time) SPLIT ROWS [(10)],
+  //               HASH(host, metric) INTO 2 BUCKETS;
+  Schema schema({ ColumnSchema("host", STRING),
+                  ColumnSchema("metric", STRING),
+                  ColumnSchema("time", TIMESTAMP),
+                  ColumnSchema("value", DOUBLE) },
+                { ColumnId(0), ColumnId(1), ColumnId(2), ColumnId(3) },
+                3);
+
+  PartitionSchema partition_schema;
+  auto pb = PartitionSchemaPB();
+  pb.mutable_range_schema()->add_columns()->set_name("time");
+
+  auto hash = pb.add_hash_bucket_schemas();
+  hash->add_columns()->set_name("host");
+  hash->add_columns()->set_name("metric");
+  hash->set_num_buckets(2);
+
+  ASSERT_OK(PartitionSchema::FromPB(pb, schema, &partition_schema));
+
+  KuduPartialRow split(&schema);
+  ASSERT_OK(split.SetTimestamp("time", 10));
+
+  vector<Partition> partitions;
+  ASSERT_OK(partition_schema.CreatePartitions(vector<KuduPartialRow>{ move(split) },
+                                              schema, &partitions));
+  ASSERT_EQ(4, partitions.size());
+
+  // Applies the specified predicates to a scan and checks that the expected
+  // number of partitions are pruned.
+  auto Check = [&] (const vector<ColumnPredicate>& predicates,
+                    string lower_bound_partition_key,
+                    string upper_bound_partition_key,
+                    size_t remaining_tablets) {
+    ScanSpec spec;
+
+    spec.SetLowerBoundPartitionKey(lower_bound_partition_key);
+    spec.SetExclusiveUpperBoundPartitionKey(upper_bound_partition_key);
+    for (const auto& pred : predicates) {
+      spec.AddPredicate(pred);
+    }
+
+    CheckPrunedPartitions(schema, partition_schema, partitions, spec, remaining_tablets);
+  };
+
+  Slice a = "a";
+
+  int64_t nine = 9;
+  int64_t ten = 10;
+  int64_t twenty = 20;
+
+  // host = "a"
+  // metric = "a"
+  // timestamp >= 9;
+  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+          ColumnPredicate::Equality(schema.column(1), &a),
+          ColumnPredicate::Range(schema.column(2), &nine, nullptr) },
+        "", "",
+        2);
+
+  // host = "a"
+  // metric = "a"
+  // timestamp >= 10;
+  // timestamp < 20;
+  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+          ColumnPredicate::Equality(schema.column(1), &a),
+          ColumnPredicate::Range(schema.column(2), &ten, &twenty) },
+        "", "",
+        1);
+
+  // host = "a"
+  // metric = "a"
+  // timestamp < 10;
+  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+          ColumnPredicate::Equality(schema.column(1), &a),
+          ColumnPredicate::Range(schema.column(2), nullptr, &ten) },
+        "", "",
+        1);
+
+  // host = "a"
+  // metric = "a"
+  // timestamp = 10;
+  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+          ColumnPredicate::Equality(schema.column(1), &a),
+          ColumnPredicate::Equality(schema.column(2), &ten) },
+        "", "",
+        1);
+
+  // host = "a"
+  // metric = "a"
+  // timestamp = 10;
+  Check({ ColumnPredicate::Equality(schema.column(0), &a),
+          ColumnPredicate::Equality(schema.column(1), &a),
+          ColumnPredicate::Equality(schema.column(2), &ten) },
+        "", "",
+        1);
+
+  // partition key < (hash=1)
+  Check({}, "", string("\0\0\0\1", 4), 2);
+
+  // partition key >= (hash=1)
+  Check({}, string("\0\0\0\1", 4), "", 2);
+
+  // time = 10
+  // partition key < (hash=1)
+  Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
+        "", string("\0\0\0\1", 4), 1);
+
+  // time = 10
+  // partition key >= (hash=1)
+  Check({ ColumnPredicate::Equality(schema.column(2), &ten) },
+        string("\0\0\0\1", 4), "", 1);
+}
+} // namespace kudu
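
Note on TestHashPruning above: for a table partitioned only by hash, the
expected tablet counts follow from multiplying the bucket counts of every hash
component that is not fully covered by equality predicates. The following is a
minimal standalone sketch of that counting rule, not code from this commit.
HashComponent and RemainingHashPartitions are hypothetical names, and the model
assumes no range partitioning and no partition key bounds on the scan.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for one HASH(...) INTO N BUCKETS clause.
struct HashComponent {
  std::vector<std::string> columns;
  std::size_t num_buckets;
};

// Returns the number of hash partitions a scan still has to visit, given the
// set of columns that carry an equality predicate.
std::size_t RemainingHashPartitions(const std::vector<HashComponent>& components,
                                    const std::set<std::string>& equality_columns) {
  std::size_t remaining = 1;
  for (const HashComponent& component : components) {
    assert(component.num_buckets > 0);
    // A component pins a single bucket only if every one of its columns has
    // an equality predicate; otherwise all of its buckets remain candidates.
    bool constrained = true;
    for (const std::string& column : component.columns) {
      if (equality_columns.count(column) == 0) {
        constrained = false;
        break;
      }
    }
    if (!constrained) {
      remaining *= component.num_buckets;
    }
  }
  return remaining;
}
```

With HASH(a) INTO 2 BUCKETS and HASH(b, c) INTO 2 BUCKETS, an equality
predicate on b alone leaves 4 tablets, while equality predicates on both b and
c leave 2, which matches the expectations in the test.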

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/common/partition_pruner.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
new file mode 100644
index 0000000..88278e2
--- /dev/null
+++ b/src/kudu/common/partition_pruner.cc
@@ -0,0 +1,428 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/common/partition_pruner.h"
+
+#include <algorithm>
+#include <boost/optional.hpp>
+#include <cstring>
+#include <memory>
+#include <numeric>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "kudu/common/key_encoder.h"
+#include "kudu/common/key_util.h"
+#include "kudu/common/partition.h"
+#include "kudu/common/scan_spec.h"
+#include "kudu/common/schema.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using boost::optional;
+using std::distance;
+using std::find_if;
+using std::get;
+using std::iota;
+using std::lower_bound;
+using std::make_tuple;
+using std::max;
+using std::memcpy;
+using std::move;
+using std::string;
+using std::tuple;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
+
+namespace kudu {
+namespace {
+
+// Returns true if the partition schema's range columns are a prefix of the
+// primary key columns.
+bool AreRangeColumnsPrefixOfPrimaryKey(const Schema& schema,
+                                       const vector<ColumnId>& range_columns) {
+  CHECK(range_columns.size() <= schema.num_key_columns());
+  for (int32_t col_idx = 0; col_idx < range_columns.size(); col_idx++) {
+    if (schema.column_id(col_idx) != range_columns[col_idx]) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Translates the scan primary key bounds into range keys. This should only be
+// used when the range columns are a prefix of the primary key columns.
+void EncodeRangeKeysFromPrimaryKeyBounds(const Schema& schema,
+                                         const ScanSpec& scan_spec,
+                                         size_t num_range_columns,
+                                         string* range_key_start,
+                                         string* range_key_end) {
+  if (scan_spec.lower_bound_key() == nullptr && scan_spec.exclusive_upper_bound_key() == nullptr) {
+    // Nothing to do if there are neither lower nor upper PK bounds.
+    return;
+  }
+
+  if (num_range_columns == schema.num_key_columns()) {
+    // The range columns are the primary key columns, so the range key is the
+    // primary key.
+    if (scan_spec.lower_bound_key() != nullptr) {
+      *range_key_start = scan_spec.lower_bound_key()->encoded_key().ToString();
+    }
+    if (scan_spec.exclusive_upper_bound_key() != nullptr) {
+      *range_key_end = scan_spec.exclusive_upper_bound_key()->encoded_key().ToString();
+    }
+  } else {
+    // The range columns are a prefix of the primary key columns. Copy
+    // the column values over to a row, and then encode the row as a range key.
+
+    vector<int32_t> col_idxs(num_range_columns);
+    iota(col_idxs.begin(), col_idxs.end(), 0);
+
+    unique_ptr<uint8_t[]> buf(new uint8_t[schema.key_byte_size()]);
+    ContiguousRow row(&schema, buf.get());
+
+    if (scan_spec.lower_bound_key() != nullptr) {
+      for (int32_t idx : col_idxs) {
+        memcpy(row.mutable_cell_ptr(idx),
+               scan_spec.lower_bound_key()->raw_keys()[idx],
+               schema.column(idx).type_info()->size());
+      }
+      key_util::EncodeKey(col_idxs, row, range_key_start);
+    }
+
+    if (scan_spec.exclusive_upper_bound_key() != nullptr) {
+      for (int32_t idx : col_idxs) {
+        memcpy(row.mutable_cell_ptr(idx),
+               scan_spec.exclusive_upper_bound_key()->raw_keys()[idx],
+               schema.column(idx).type_info()->size());
+      }
+      key_util::EncodeKey(col_idxs, row, range_key_end);
+    }
+  }
+}
+
+// Push the scan predicates into the range keys.
+void EncodeRangeKeysFromPredicates(const Schema& schema,
+                                   const unordered_map<string, ColumnPredicate>& predicates,
+                                   const vector<ColumnId>& range_columns,
+                                   string* range_key_start,
+                                   string* range_key_end) {
+  // Find the column indexes of the range columns.
+  vector<int32_t> col_idxs;
+  col_idxs.reserve(range_columns.size());
+  for (ColumnId column : range_columns) {
+    int32_t col_idx = schema.find_column_by_id(column);
+    CHECK(col_idx != Schema::kColumnNotFound);
+    CHECK(col_idx < schema.num_key_columns());
+    col_idxs.push_back(col_idx);
+  }
+
+  // Arenas must be at least the minimum chunk size, and we require at least
+  // enough space for the range key columns.
+  Arena arena(max<size_t>(Arena::kMinimumChunkSize, schema.key_byte_size()), 4096);
+  uint8_t* buf = static_cast<uint8_t*>(CHECK_NOTNULL(arena.AllocateBytes(schema.key_byte_size())));
+  ContiguousRow row(&schema, buf);
+
+  if (key_util::PushLowerBoundKeyPredicates(col_idxs, predicates, &row, &arena) > 0) {
+    key_util::EncodeKey(col_idxs, row, range_key_start);
+  }
+
+  if (key_util::PushUpperBoundKeyPredicates(col_idxs, predicates, &row, &arena) > 0) {
+    key_util::EncodeKey(col_idxs, row, range_key_end);
+  }
+}
+} // anonymous namespace
+
+void PartitionPruner::Init(const Schema& schema,
+                           const PartitionSchema& partition_schema,
+                           const ScanSpec& scan_spec) {
+  // If we can already short circuit the scan we don't need to bother with
+  // partition pruning. This also allows us to assume some invariants of the
+  // scan spec, such as no None predicates and that the lower bound PK < upper
+  // bound PK.
+  if (scan_spec.CanShortCircuit()) { return; }
+
+  // Build a set of partition key ranges which cover the tablets necessary for
+  // the scan.
+  //
+  // Example predicate sets and resulting partition key ranges, based on the
+  // following table schema:
+  //
+  // CREATE TABLE t (a INT32, b INT32, c INT32) PRIMARY KEY (a, b, c)
+  // DISTRIBUTE BY RANGE (c)
+  //               HASH (a) INTO 2 BUCKETS
+  //               HASH (b) INTO 3 BUCKETS;
+  //
+  // Assume that hash(0) = 0 and hash(2) = 2.
+  //
+  // | Predicates | Partition Key Ranges                                   |
+  // +------------+--------------------------------------------------------+
+  // | a = 0      | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
+  // | b = 2      |                                                        |
+  // | c = 0      |                                                        |
+  // +------------+--------------------------------------------------------+
+  // | a = 0      | [(bucket=0, bucket=2), (bucket=0, bucket=3))           |
+  // | b = 2      |                                                        |
+  // +------------+--------------------------------------------------------+
+  // | a = 0      | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
+  // | c = 0      | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
+  // |            | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
+  // +------------+--------------------------------------------------------+
+  // | b = 2      | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
+  // | c = 0      | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
+  // +------------+--------------------------------------------------------+
+  // | a = 0      | [(bucket=0), (bucket=1))                               |
+  // +------------+--------------------------------------------------------+
+  // | b = 2      | [(bucket=0, bucket=2), (bucket=0, bucket=3))           |
+  // |            | [(bucket=1, bucket=2), (bucket=1, bucket=3))           |
+  // +------------+--------------------------------------------------------+
+  // | c = 0      | [(bucket=0, bucket=0, c=0), (bucket=0, bucket=0, c=1)) |
+  // |            | [(bucket=0, bucket=1, c=0), (bucket=0, bucket=1, c=1)) |
+  // |            | [(bucket=0, bucket=2, c=0), (bucket=0, bucket=2, c=1)) |
+  // |            | [(bucket=1, bucket=0, c=0), (bucket=1, bucket=0, c=1)) |
+  // |            | [(bucket=1, bucket=1, c=0), (bucket=1, bucket=1, c=1)) |
+  // |            | [(bucket=1, bucket=2, c=0), (bucket=1, bucket=2, c=1)) |
+  // +------------+--------------------------------------------------------+
+  // | None       | [(), ())                                               |
+  //
+  // If the partition key is considered as a sequence of the hash bucket
+  // components and a range component, then a few patterns emerge from the
+  // examples above:
+  //
+  // 1) The partition keys are truncated after the final constrained component
+  //    (hash bucket components are constrained when the scan is limited to a
+  //    single bucket via equality predicates on that component, while range
+  //    components are constrained if they have an upper or lower bound via
+  //    range or equality predicates on that component).
+  //
+  // 2) If the final constrained component is a hash bucket, then the
+  //    corresponding bucket in the upper bound is incremented in order to make
+  //    it an exclusive key.
+  //
+  // 3) The number of partition key ranges in the result is equal to the product
+  //    of the number of buckets of each unconstrained hash component which come
+  //    before a final constrained component. If there are no unconstrained hash
+  //    components, then the number of partition key ranges is one.
+
+  // Step 1: Build the range portion of the partition key.
+  string range_lower_bound;
+  string range_upper_bound;
+  const vector<ColumnId>& range_columns = partition_schema.range_schema_.column_ids;
+  if (!range_columns.empty()) {
+    if (AreRangeColumnsPrefixOfPrimaryKey(schema, range_columns)) {
+      EncodeRangeKeysFromPrimaryKeyBounds(schema,
+                                          scan_spec,
+                                          range_columns.size(),
+                                          &range_lower_bound,
+                                          &range_upper_bound);
+    } else {
+      EncodeRangeKeysFromPredicates(schema,
+                                    scan_spec.predicates(),
+                                    range_columns,
+                                    &range_lower_bound,
+                                    &range_upper_bound);
+    }
+  }
+
+  // Step 2: Create the hash bucket portion of the partition key.
+
+  // The list of hash buckets per hash component, or none if the component is
+  // not constrained.
+  vector<optional<uint32_t>> hash_buckets;
+  hash_buckets.reserve(partition_schema.hash_bucket_schemas_.size());
+  for (int hash_idx = 0; hash_idx < partition_schema.hash_bucket_schemas_.size(); hash_idx++) {
+    const auto& hash_bucket_schema = partition_schema.hash_bucket_schemas_[hash_idx];
+    string encoded_columns;
+    bool can_prune = true;
+    for (int col_offset = 0; col_offset < hash_bucket_schema.column_ids.size(); col_offset++) {
+      const ColumnSchema& column = schema.column_by_id(hash_bucket_schema.column_ids[col_offset]);
+      const ColumnPredicate* predicate = FindOrNull(scan_spec.predicates(), column.name());
+      if (predicate == nullptr || predicate->predicate_type() != PredicateType::Equality) {
+        can_prune = false;
+        break;
+      }
+
+      const KeyEncoder<string>& encoder = GetKeyEncoder<string>(column.type_info());
+      encoder.Encode(predicate->raw_lower(),
+                     col_offset + 1 == hash_bucket_schema.column_ids.size(),
+                     &encoded_columns);
+    }
+    if (can_prune) {
+      hash_buckets.push_back(partition_schema.BucketForEncodedColumns(encoded_columns,
+                                                                      hash_bucket_schema));
+    } else {
+      hash_buckets.push_back(boost::none);
+    }
+  }
+
+  // The index of the final constrained component in the partition key.
+  int constrained_index;
+  if (!range_lower_bound.empty() || !range_upper_bound.empty()) {
+    // The range component is constrained.
+    constrained_index = partition_schema.hash_bucket_schemas_.size();
+  } else {
+    // Search the hash bucket constraints from right to left, looking for the
+    // first constrained component.
+    constrained_index = partition_schema.hash_bucket_schemas_.size() -
+                        distance(hash_buckets.rbegin(),
+                                 find_if(hash_buckets.rbegin(),
+                                         hash_buckets.rend(),
+                                         [] (const optional<uint32_t>& x) { return x; }));
+  }
+
+  // Build up a set of partition key ranges out of the hash components.
+  //
+  // Each constrained hash component simply appends its bucket number to the
+  // partition key ranges (possibly incrementing the upper bound by one bucket
+  // number if this is the final constraint, see note 2 in the example above).
+  //
+  // Each unconstrained hash component results in creating a new partition key
+  // range for each bucket of the hash component.
+  vector<tuple<string, string>> partition_key_ranges(1);
+  const KeyEncoder<string>& hash_encoder = GetKeyEncoder<string>(GetTypeInfo(UINT32));
+  for (int hash_idx = 0; hash_idx < constrained_index; hash_idx++) {
+    // This is the final partition key component if this is the final constrained
+    // bucket, and the range upper bound is empty. In this case we need to
+    // increment the bucket on the upper bound to convert from inclusive to
+    // exclusive.
+    bool is_last = hash_idx + 1 == constrained_index && range_upper_bound.empty();
+
+    if (hash_buckets[hash_idx]) {
+      // This hash component is constrained by equality predicates to a single
+      // hash bucket.
+      uint32_t bucket = *hash_buckets[hash_idx];
+      uint32_t bucket_upper = is_last ? bucket + 1 : bucket;
+      for (auto& partition_key_range : partition_key_ranges) {
+        hash_encoder.Encode(&bucket, &get<0>(partition_key_range));
+        hash_encoder.Encode(&bucket_upper, &get<1>(partition_key_range));
+      }
+    } else {
+      const auto& hash_bucket_schema = partition_schema.hash_bucket_schemas_[hash_idx];
+      // Add a partition key range for each possible hash bucket.
+      vector<tuple<string, string>> new_partition_key_ranges;
+      for (const auto& partition_key_range : partition_key_ranges) {
+        for (uint32_t bucket = 0; bucket < hash_bucket_schema.num_buckets; bucket++) {
+          uint32_t bucket_upper = is_last ? bucket + 1 : bucket;
+          string lower = get<0>(partition_key_range);
+          string upper = get<1>(partition_key_range);
+          hash_encoder.Encode(&bucket, &lower);
+          hash_encoder.Encode(&bucket_upper, &upper);
+          new_partition_key_ranges.push_back(make_tuple(move(lower), move(upper)));
+        }
+      }
+      partition_key_ranges.swap(new_partition_key_ranges);
+    }
+  }
+
+  // Step 3: append the (possibly empty) range bounds to the partition key ranges.
+  for (auto& range : partition_key_ranges) {
+    get<0>(range).append(range_lower_bound);
+    get<1>(range).append(range_upper_bound);
+  }
+
+  // Step 4: remove all partition key ranges past the scan spec's upper bound partition key.
+  if (!scan_spec.exclusive_upper_bound_partition_key().empty()) {
+    for (auto range = partition_key_ranges.rbegin();
+         range != partition_key_ranges.rend();
+         range++) {
+      if (!get<1>(*range).empty() &&
+          scan_spec.exclusive_upper_bound_partition_key() >= get<1>(*range)) {
+        break;
+      }
+      if (scan_spec.exclusive_upper_bound_partition_key() <= get<0>(*range)) {
+        partition_key_ranges.pop_back();
+      } else {
+        get<1>(*range) = scan_spec.exclusive_upper_bound_partition_key();
+      }
+    }
+  }
+
+  // Step 5: Reverse the order of the partition key ranges, so that it is
+  // efficient to remove the partition key ranges from the vector in ascending order.
+  partition_key_ranges_.resize(partition_key_ranges.size());
+  move(partition_key_ranges.rbegin(), partition_key_ranges.rend(), partition_key_ranges_.begin());
+
+  // Step 6: Remove all partition key ranges before the scan spec's lower bound partition key.
+  if (!scan_spec.lower_bound_partition_key().empty()) {
+    RemovePartitionKeyRange(scan_spec.lower_bound_partition_key());
+  }
+}
+
+bool PartitionPruner::HasMorePartitionKeyRanges() const {
+  return !partition_key_ranges_.empty();
+}
+
+const string& PartitionPruner::NextPartitionKey() const {
+  CHECK(HasMorePartitionKeyRanges());
+  return get<0>(partition_key_ranges_.back());
+}
+
+void PartitionPruner::RemovePartitionKeyRange(const string& upper_bound) {
+  if (upper_bound.empty()) {
+    partition_key_ranges_.clear();
+    return;
+  }
+
+  for (auto range = partition_key_ranges_.rbegin();
+       range != partition_key_ranges_.rend();
+       range++) {
+    if (upper_bound <= get<0>(*range)) { break; }
+    if (get<1>(*range).empty() || upper_bound < get<1>(*range)) {
+      get<0>(*range) = upper_bound;
+    } else {
+      partition_key_ranges_.pop_back();
+    }
+  }
+}
+
+bool PartitionPruner::ShouldPruneForTests(const Partition& partition) const {
+  // range is an iterator that points to the first partition key range which
+  // overlaps or is greater than the partition.
+  auto range = lower_bound(partition_key_ranges_.rbegin(), partition_key_ranges_.rend(), partition,
+    [] (const tuple<string, string>& scan_range, const Partition& partition) {
+      // return true if scan_range < partition
+      const string& scan_upper = get<1>(scan_range);
+      return !scan_upper.empty() && scan_upper <= partition.partition_key_start();
+    });
+
+  return range == partition_key_ranges_.rend() ||
+         (!partition.partition_key_end().empty() &&
+          partition.partition_key_end() <= get<0>(*range));
+}
+
+string PartitionPruner::ToString(const Schema& schema,
+                                 const PartitionSchema& partition_schema) const {
+  vector<string> strings;
+  for (auto range = partition_key_ranges_.rbegin();
+       range != partition_key_ranges_.rend();
+       range++) {
+    strings.push_back(strings::Substitute(
+          "[($0), ($1))",
+          get<0>(*range).empty() ? "<start>" :
+              partition_schema.PartitionKeyDebugString(get<0>(*range), schema),
+          get<1>(*range).empty() ? "<end>" :
+              partition_schema.PartitionKeyDebugString(get<1>(*range), schema)));
+  }
+
+  return JoinStrings(strings, ", ");
+}
+
+} // namespace kudu
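
The check in ShouldPruneForTests above comes down to an overlap test between half-open key ranges, where an empty upper bound means "unbounded". A minimal sketch of that idea follows. It is not code from this commit: the Interval struct and the Precedes, Overlaps, and ShouldPrune helpers are hypothetical names introduced for illustration, and the sketch uses a linear scan where the commit uses std::lower_bound over the reverse-sorted vector.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical type for illustration: a half-open range [lower, upper).
// An empty upper bound means the range is unbounded above. An empty lower
// bound needs no special case because "" sorts before every other string.
struct Interval {
  std::string lower;
  std::string upper;
};

// Returns true if every key in a sorts before every key in b.
bool Precedes(const Interval& a, const Interval& b) {
  return !a.upper.empty() && a.upper <= b.lower;
}

// Two ranges overlap exactly when neither one precedes the other.
bool Overlaps(const Interval& a, const Interval& b) {
  return !Precedes(a, b) && !Precedes(b, a);
}

// A partition can be pruned when no scan range overlaps it.
// Assumption: linear scan for clarity; order of scan_ranges does not matter.
bool ShouldPrune(const std::vector<Interval>& scan_ranges,
                 const Interval& partition) {
  for (const Interval& range : scan_ranges) {
    if (Overlaps(range, partition)) {
      return false;
    }
  }
  return true;
}
```

With scan ranges [a, c) and [g, <end>), a partition covering [c, g) is pruned because it touches neither range, while a partition covering [b, d) is kept because it overlaps [a, c).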

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/common/partition_pruner.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner.h b/src/kudu/common/partition_pruner.h
new file mode 100644
index 0000000..dc57098
--- /dev/null
+++ b/src/kudu/common/partition_pruner.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+
+class Partition;
+class PartitionSchema;
+class ScanSpec;
+class Schema;
+
+// Provides partition key ranges to a scanner in order to prune tablets which
+// are not necessary for the scan. The scanner retrieves the partition key of
+// the next tablet to scan through the NextPartitionKey method, and notifies the
+// partition pruner that a tablet has been scanned by calling
+// RemovePartitionKeyRange with the tablet's upper bound partition key.
+//
+// Partition keys are in the same encoded format as used by the Partition class.
+class PartitionPruner {
+ public:
+
+  PartitionPruner() = default;
+
+  // Initializes the partition pruner for a new scan. The scan spec should
+  // already be optimized by the ScanSpec::Optimize method.
+  void Init(const Schema& schema,
+            const PartitionSchema& partition_schema,
+            const ScanSpec& scan_spec);
+
+  // Returns whether there are more partition key ranges to scan.
+  bool HasMorePartitionKeyRanges() const;
+
+  // Returns the inclusive lower bound partition key of the next tablet to scan.
+  const std::string& NextPartitionKey() const;
+
+  // Removes all partition key ranges through the provided exclusive upper bound.
+  void RemovePartitionKeyRange(const std::string& upper_bound);
+
+  // Returns true if the provided partition should be pruned.
+  //
+  // Used for testing.
+  bool ShouldPruneForTests(const Partition& partition) const;
+
+  std::string ToString(const Schema& schema, const PartitionSchema& partition_schema) const;
+
+ private:
+  // The reverse sorted set of partition key ranges. Each range has an inclusive
+  // lower and exclusive upper bound.
+  std::vector<std::tuple<std::string, std::string>> partition_key_ranges_;
+
+  DISALLOW_COPY_AND_ASSIGN(PartitionPruner);
+};
+
+} // namespace kudu
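
The class comment in partition_pruner.h describes a protocol: the scanner asks for the next partition key, scans the tablet covering it, then reports that tablet's upper bound back to the pruner. The sketch below walks through that loop under stated assumptions. MiniPruner, Tablet, and TabletsVisited are hypothetical stand-ins, not Kudu classes. MiniPruner takes precomputed ranges instead of deriving them from a ScanSpec, tablet lookup is a linear search, and RemovePartitionKeyRange is written as a loop over back() rather than with reverse iterators.

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <vector>

// [inclusive lower, exclusive upper); an empty upper bound is unbounded.
using KeyRange = std::tuple<std::string, std::string>;

// Hypothetical stand-in for PartitionPruner holding only the range list.
class MiniPruner {
 public:
  // Takes ranges in ascending order and stores them reversed, so that the
  // next range to scan is always at back().
  explicit MiniPruner(const std::vector<KeyRange>& ascending)
      : ranges_(ascending.rbegin(), ascending.rend()) {}

  bool HasMorePartitionKeyRanges() const { return !ranges_.empty(); }

  const std::string& NextPartitionKey() const {
    assert(HasMorePartitionKeyRanges());
    return std::get<0>(ranges_.back());
  }

  // Drops or truncates every range that lies below upper_bound.
  void RemovePartitionKeyRange(const std::string& upper_bound) {
    if (upper_bound.empty()) {
      ranges_.clear();
      return;
    }
    while (!ranges_.empty()) {
      KeyRange& range = ranges_.back();
      if (upper_bound <= std::get<0>(range)) break;
      if (std::get<1>(range).empty() || upper_bound < std::get<1>(range)) {
        std::get<0>(range) = upper_bound;  // Partially consumed: truncate.
        break;
      }
      ranges_.pop_back();  // Fully consumed.
    }
  }

 private:
  std::vector<KeyRange> ranges_;
};

// Hypothetical tablet covering [start, end); an empty end is unbounded.
struct Tablet {
  std::string start;
  std::string end;
};

// Runs the scan loop and returns the start key of each tablet visited.
std::vector<std::string> TabletsVisited(MiniPruner* pruner,
                                        const std::vector<Tablet>& tablets) {
  std::vector<std::string> visited;
  while (pruner->HasMorePartitionKeyRanges()) {
    const std::string key = pruner->NextPartitionKey();
    const Tablet* found = nullptr;
    for (const Tablet& t : tablets) {
      if (t.start <= key && (t.end.empty() || key < t.end)) {
        found = &t;
        break;
      }
    }
    if (found == nullptr) break;  // No tablet covers the key.
    visited.push_back(found->start);
    pruner->RemovePartitionKeyRange(found->end);
  }
  return visited;
}
```

For tablets split at "b", "d", and "f" and scan ranges [a, c) and [g, <end>), the loop visits the tablets starting at "", "b", and "f". The tablet covering [d, f) is never contacted, which is the pruning effect the commit aims for.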

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/util/memory/arena.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/memory/arena.cc b/src/kudu/util/memory/arena.cc
index 8922f44..5f935df 100644
--- a/src/kudu/util/memory/arena.cc
+++ b/src/kudu/util/memory/arena.cc
@@ -40,6 +40,9 @@ TAG_FLAG(arena_warn_threshold_bytes, hidden);
 namespace kudu {
 
 template <bool THREADSAFE>
+const size_t ArenaBase<THREADSAFE>::kMinimumChunkSize = 16;
+
+template <bool THREADSAFE>
 ArenaBase<THREADSAFE>::ArenaBase(
   BufferAllocator* const buffer_allocator,
   size_t initial_buffer_size,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/96b7291f/src/kudu/util/memory/arena.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/memory/arena.h b/src/kudu/util/memory/arena.h
index 21af74d..2362f21 100644
--- a/src/kudu/util/memory/arena.h
+++ b/src/kudu/util/memory/arena.h
@@ -70,6 +70,9 @@ template <> struct ArenaTraits<false> {
 template <bool THREADSAFE>
 class ArenaBase {
  public:
+  // Arenas are required to have a size of at least this amount.
+  static const size_t kMinimumChunkSize;
+
   // Creates a new arena, with a single buffer of size up-to
   // initial_buffer_size, upper size limit for later-allocated buffers capped
   // at max_buffer_size, and maximum capacity (i.e. total sizes of all buffers)

