hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject [hbase] 57/133: HBASE-15705 Add on meta cache (Mikhail Antonov)
Date Tue, 12 Mar 2019 12:45:45 GMT
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit bb6dc7330fea1a7410a0d020cbdbbf1221d47052
Author: Enis Soztutar <enis@apache.org>
AuthorDate: Thu Jan 19 15:01:11 2017 -0800

    HBASE-15705 Add on meta cache (Mikhail Antonov)
---
 hbase-native-client/connection/connection-pool.cc |  20 +--
 hbase-native-client/connection/rpc-client.cc      |   5 +-
 hbase-native-client/connection/rpc-client.h       |   6 +-
 hbase-native-client/core/client.cc                |  17 ++-
 hbase-native-client/core/client.h                 |   9 +-
 hbase-native-client/core/location-cache-test.cc   |  82 ++++++++++++-
 hbase-native-client/core/location-cache.cc        | 142 +++++++++++++++++++++-
 hbase-native-client/core/location-cache.h         |  97 ++++++++++++++-
 hbase-native-client/core/region-location.h        |   4 +
 hbase-native-client/core/simple-client.cc         |   2 +-
 10 files changed, 349 insertions(+), 35 deletions(-)

diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc
index 07518c5..15dd64e 100644
--- a/hbase-native-client/connection/connection-pool.cc
+++ b/hbase-native-client/connection/connection-pool.cc
@@ -41,15 +41,7 @@ ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_
 ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf)
     : cf_(cf), clients_(), connections_(), map_mutex_() {}
 
-ConnectionPool::~ConnectionPool() {
-  SharedMutexWritePriority::WriteHolder holder(map_mutex_);
-  for (auto &item : connections_) {
-    auto &con = item.second;
-    con->Close();
-  }
-  connections_.clear();
-  clients_.clear();
-}
+ConnectionPool::~ConnectionPool() { Close(); }
 
 std::shared_ptr<RpcConnection> ConnectionPool::GetConnection(
     std::shared_ptr<ConnectionId> remote_id) {
@@ -116,4 +108,12 @@ void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
   connections_.erase(found);
 }
 
-void ConnectionPool::Close() {}
+void ConnectionPool::Close() {
+  SharedMutexWritePriority::WriteHolder holder{map_mutex_};
+  for (auto &item : connections_) {
+    auto &con = item.second;
+    con->Close();
+  }
+  connections_.clear();
+  clients_.clear();
+}
diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc
index 9cfefb8..7621193 100644
--- a/hbase-native-client/connection/rpc-client.cc
+++ b/hbase-native-client/connection/rpc-client.cc
@@ -39,9 +39,8 @@ class RpcChannelImplementation : public AbstractRpcChannel {
 };
 }  // namespace hbase
 
-RpcClient::RpcClient() {
-  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
-
+RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
+    : io_executor_(io_executor) {
   cp_ = std::make_shared<ConnectionPool>(io_executor_);
 }
 
diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h
index 407d588..aeb9b56 100644
--- a/hbase-native-client/connection/rpc-client.h
+++ b/hbase-native-client/connection/rpc-client.h
@@ -26,6 +26,8 @@
 
 #include <google/protobuf/service.h>
 
+#include <utility>
+
 using hbase::security::User;
 using hbase::pb::ServerName;
 using hbase::Request;
@@ -49,7 +51,7 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
   friend class RpcChannelImplementation;
 
  public:
-  RpcClient();
+  RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
 
   virtual ~RpcClient() { Close(); }
 
@@ -77,6 +79,8 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> {
                                                        std::shared_ptr<User> ticket,
                                                        int rpc_timeout);
 
+  std::shared_ptr<ConnectionPool> connection_pool() const { return cp_; }
+
  private:
   void CallMethod(const MethodDescriptor *method, RpcController *controller, const Message *req_msg,
                   Message *resp_msg, Closure *done, const std::string &host, uint16_t port,
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 6eb3d8f..dd568ce 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -33,15 +33,22 @@ Client::Client() {
                   "hbase-site.xml is absent in the search path or problems in XML parsing";
     throw std::runtime_error("Configuration object not present.");
   }
-  conf_ = std::make_shared<hbase::Configuration>(conf.value());
-  auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
-  location_cache_ = std::make_shared<hbase::LocationCache>(zk_quorum, cpu_executor_, io_executor_);
+  init(conf.value());
 }
 
-Client::Client(const hbase::Configuration &conf) {
+Client::Client(const hbase::Configuration &conf) { init(conf); }
+
+void Client::init(const hbase::Configuration &conf) {
   conf_ = std::make_shared<hbase::Configuration>(conf);
   auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
-  location_cache_ = std::make_shared<hbase::LocationCache>(zk_quorum, cpu_executor_, io_executor_);
+
+  cpu_executor_ =
+      std::make_shared<wangle::CPUThreadPoolExecutor>(4);  // TODO: read num threads from conf
+  io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
+
+  rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_);
+  location_cache_ = std::make_shared<hbase::LocationCache>(zk_quorum, cpu_executor_,
+                                                           rpc_client_->connection_pool());
 }
 
 // We can't have the threads continue running after everything is done
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index da71624..730981d 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -67,14 +67,13 @@ class Client {
   void Close();
 
  private:
+  void init(const hbase::Configuration &conf);
   const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
   const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181";
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_ =
-      std::make_shared<wangle::CPUThreadPoolExecutor>(4);
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_ =
-      std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
   std::shared_ptr<hbase::LocationCache> location_cache_;
-  std::shared_ptr<hbase::RpcClient> rpc_client_ = std::make_shared<hbase::RpcClient>();
+  std::shared_ptr<hbase::RpcClient> rpc_client_;
   std::shared_ptr<hbase::Configuration> conf_;
   bool is_closed_ = false;
 };
diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc
index 53db6fc..42e7bb3 100644
--- a/hbase-native-client/core/location-cache-test.cc
+++ b/hbase-native-client/core/location-cache-test.cc
@@ -34,19 +34,24 @@ TEST(LocationCacheTest, TestGetMetaNodeContents) {
 
   auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
   auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
-  LocationCache cache{"localhost:2181", cpu, io};
+  auto cp = std::make_shared<ConnectionPool>(io);
+  LocationCache cache{"localhost:2181", cpu, cp};
   auto f = cache.LocateMeta();
   auto result = f.get();
   ASSERT_FALSE(f.hasException());
   ASSERT_TRUE(result.has_port());
   ASSERT_TRUE(result.has_host_name());
+  cpu->stop();
+  io->stop();
+  cp->Close();
 }
 
 TEST(LocationCacheTest, TestGetRegionLocation) {
   TestUtil test_util{};
   auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
   auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
-  LocationCache cache{"localhost:2181", cpu, io};
+  auto cp = std::make_shared<ConnectionPool>(io);
+  LocationCache cache{"localhost:2181", cpu, cp};
 
   // If there is no table this should throw an exception
   auto tn = folly::to<hbase::pb::TableName>("t");
@@ -57,4 +62,77 @@ TEST(LocationCacheTest, TestGetRegionLocation) {
   ASSERT_TRUE(loc != nullptr);
   cpu->stop();
   io->stop();
+  cp->Close();
+}
+
+TEST(LocationCacheTest, TestCaching) {
+  TestUtil test_util{};
+  auto cpu = std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  auto io = std::make_shared<wangle::IOThreadPoolExecutor>(4);
+  auto cp = std::make_shared<ConnectionPool>(io);
+  LocationCache cache{"localhost:2181", cpu, cp};
+
+  auto tn_1 = folly::to<hbase::pb::TableName>("t1");
+  auto tn_2 = folly::to<hbase::pb::TableName>("t2");
+  auto tn_3 = folly::to<hbase::pb::TableName>("t3");
+  auto row_a = "a";
+
+  // test location pulled from meta gets cached
+  ASSERT_ANY_THROW(cache.LocateRegion(tn_1, row_a).get(milliseconds(1000)));
+  ASSERT_ANY_THROW(cache.LocateFromMeta(tn_1, row_a).get(milliseconds(1000)));
+
+  test_util.RunShellCmd("create 't1', 'd'");
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_1, row_a));
+  auto loc = cache.LocateRegion(tn_1, row_a).get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_1, row_a));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_1, row_a));
+
+  // test with two regions
+  test_util.RunShellCmd("create 't2', 'd', SPLITS => ['b']");
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_2, "a"));
+  loc = cache.LocateRegion(tn_2, "a").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_2, "a"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "a"));
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_2, "b"));
+  loc = cache.LocateRegion(tn_2, "b").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_2, "b"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "b"));
+  ASSERT_TRUE(cache.IsLocationCached(tn_2, "ba"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_2, "ba"));
+
+  // test with three regions
+  test_util.RunShellCmd("create 't3', 'd', SPLITS => ['b', 'c']");
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "c"));
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "ca"));
+  loc = cache.LocateRegion(tn_3, "ca").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "c"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "c"));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "ca"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "ca"));
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "b"));
+  loc = cache.LocateRegion(tn_3, "b").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "b"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "b"));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "ba"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "ba"));
+
+  // clear second region
+  cache.ClearCachedLocation(tn_3, "b");
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "b"));
+
+  ASSERT_FALSE(cache.IsLocationCached(tn_3, "a"));
+  loc = cache.LocateRegion(tn_3, "a").get(milliseconds(1000));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "a"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "a"));
+  ASSERT_TRUE(cache.IsLocationCached(tn_3, "abc"));
+  ASSERT_EQ(loc, cache.GetCachedLocation(tn_3, "abc"));
+
+  cpu->stop();
+  io->stop();
+  cp->Close();
 }
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index 6c2a790..66f3eb7 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -55,14 +55,16 @@ static const char META_ZNODE_NAME[] = "/hbase/meta-region-server";
 
 LocationCache::LocationCache(std::string quorum_spec,
                              std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
-                             std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor)
+                             std::shared_ptr<ConnectionPool> cp)
     : quorum_spec_(quorum_spec),
       cpu_executor_(cpu_executor),
       meta_promise_(nullptr),
       meta_lock_(),
-      cp_(io_executor),
+      cp_(cp),
       meta_util_(),
-      zk_(nullptr) {
+      zk_(nullptr),
+      cached_locations_(),
+      locations_lock_() {
   zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0);
 }
 
@@ -121,7 +123,7 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const Tabl
       .via(cpu_executor_.get())
       .then([this](ServerName sn) {
         auto remote_id = std::make_shared<ConnectionId>(sn.host_name(), sn.port());
-        return this->cp_.GetConnection(remote_id);
+        return this->cp_->GetConnection(remote_id);
       })
       .then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) {
         return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row)));
@@ -143,11 +145,27 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const Tabl
         auto remote_id =
             std::make_shared<ConnectionId>(rl->server_name().host_name(), rl->server_name().port());
         // Now fill out the connection.
-        rl->set_service(cp_.GetConnection(remote_id)->get_service());
+        // rl->set_service(cp_->GetConnection(remote_id)->get_service()); TODO: this causes wangle
+        // assertion errors
+        return rl;
+      })
+      .then([tn, this](shared_ptr<RegionLocation> rl) {
+        // now add fetched location to the cache.
+        this->CacheLocation(tn, rl);
         return rl;
       });
 }
 
+Future<shared_ptr<RegionLocation>> LocationCache::LocateRegion(const hbase::pb::TableName &tn,
+                                                               const std::string &row) {
+  auto cached_loc = this->GetCachedLocation(tn, row);
+  if (cached_loc != nullptr) {
+    return cached_loc;
+  } else {
+    return this->LocateFromMeta(tn, row);
+  }
+}
+
 std::shared_ptr<RegionLocation> LocationCache::CreateLocation(const Response &resp) {
   auto resp_msg = static_pointer_cast<ScanResponse>(resp.resp_msg());
   auto &results = resp_msg->results().Get(0);
@@ -163,3 +181,117 @@ std::shared_ptr<RegionLocation> LocationCache::CreateLocation(const Response &re
   auto server_name = folly::to<ServerName>(cell_one);
   return std::make_shared<RegionLocation>(row, std::move(region_info), server_name, nullptr);
 }
+
+// must hold shared lock on locations_lock_
+shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const hbase::pb::TableName &tn,
+                                                            const std::string &row) {
+  auto t_locs = this->GetTableLocations(tn);
+  std::shared_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
+
+  if (VLOG_IS_ON(2)) {
+    for (const auto &p : *t_locs) {
+      VLOG(2) << "t_locs[" << p.first << "] = " << p.second->DebugString();
+    }
+  }
+
+  // looking for the "floor" key as a start key
+  auto possible_region = t_locs->upper_bound(row);
+
+  if (t_locs->empty()) {
+    VLOG(2) << "Could not find region in cache, table map is empty";
+    return nullptr;
+  }
+
+  if (possible_region == t_locs->begin()) {
+    VLOG(2) << "Could not find region in cache, all keys are greater, row:" << row
+            << " ,possible_region:" << possible_region->second->DebugString();
+    return nullptr;
+  }
+  --possible_region;
+
+  VLOG(2) << "Found possible region in cache for row:" << row
+          << " ,possible_region:" << possible_region->second->DebugString();
+
+  // found possible start key, now need to check end key
+  if (possible_region->second->region_info().end_key() == "" ||
+      possible_region->second->region_info().end_key() > row) {
+    VLOG(1) << "Found region in cache for row:" << row
+            << " ,region:" << possible_region->second->DebugString();
+    return possible_region->second;
+  } else {
+    return nullptr;
+  }
+}
+
+// must hold unique lock on locations_lock_
+void LocationCache::CacheLocation(const hbase::pb::TableName &tn,
+                                  const shared_ptr<RegionLocation> loc) {
+  auto t_locs = this->GetTableLocations(tn);
+  std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
+
+  (*t_locs)[loc->region_info().start_key()] = loc;
+  VLOG(1) << "Cached location for region:" << loc->DebugString();
+}
+
+// must hold shared lock on locations_lock_
+bool LocationCache::IsLocationCached(const hbase::pb::TableName &tn, const std::string &row) {
+  return (this->GetCachedLocation(tn, row) != nullptr);
+}
+
+// shared lock needed for cases when this table has been requested before;
+// in the rare case it hasn't, unique lock will be grabbed to add it to cache
+shared_ptr<hbase::PerTableLocationMap> LocationCache::GetTableLocations(
+    const hbase::pb::TableName &tn) {
+  auto found_locs = this->GetCachedTableLocations(tn);
+  if (found_locs == nullptr) {
+    found_locs = this->GetNewTableLocations(tn);
+  }
+  return found_locs;
+}
+
+shared_ptr<hbase::PerTableLocationMap> LocationCache::GetCachedTableLocations(
+    const hbase::pb::TableName &tn) {
+  SharedMutexWritePriority::ReadHolder r_holder{locations_lock_};
+
+  auto table_locs = cached_locations_.find(tn);
+  if (table_locs != cached_locations_.end()) {
+    return table_locs->second;
+  } else {
+    return nullptr;
+  }
+}
+
+shared_ptr<hbase::PerTableLocationMap> LocationCache::GetNewTableLocations(
+    const hbase::pb::TableName &tn) {
+  // double-check locking under upgradable lock
+  SharedMutexWritePriority::UpgradeHolder u_holder{locations_lock_};
+
+  auto table_locs = cached_locations_.find(tn);
+  if (table_locs != cached_locations_.end()) {
+    return table_locs->second;
+  }
+  SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)};
+
+  auto t_locs_p = make_shared<map<std::string, shared_ptr<RegionLocation>>>();
+  cached_locations_.insert(std::make_pair(tn, t_locs_p));
+  return t_locs_p;
+}
+
+// must hold unique lock on locations_lock_
+void LocationCache::ClearCache() {
+  unique_lock<SharedMutexWritePriority> lock(locations_lock_);
+  cached_locations_.clear();
+}
+
+// must hold unique lock on locations_lock_
+void LocationCache::ClearCachedLocations(const hbase::pb::TableName &tn) {
+  unique_lock<SharedMutexWritePriority> lock(locations_lock_);
+  cached_locations_.erase(tn);
+}
+
+// must hold unique lock on locations_lock_
+void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row) {
+  auto table_locs = this->GetTableLocations(tn);
+  unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
+  table_locs->erase(row);
+}
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index b290a1f..22a8ad5 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include <folly/Executor.h>
+#include <folly/SharedMutex.h>
 #include <folly/futures/Future.h>
 #include <folly/futures/SharedPromise.h>
 #include <wangle/concurrent/CPUThreadPoolExecutor.h>
@@ -27,6 +28,7 @@
 
 #include <memory>
 #include <mutex>
+#include <shared_mutex>
 #include <string>
 
 #include "connection/connection-pool.h"
@@ -40,8 +42,34 @@ class Request;
 class Response;
 namespace pb {
 class ServerName;
+class TableName;
 }
 
+/** Equals function for TableName (uses namespace and table name) */
+struct TableNameEquals {
+  /** equals */
+  bool operator()(const hbase::pb::TableName &lht, const hbase::pb::TableName &rht) const {
+    return lht.namespace_() == rht.namespace_() && lht.qualifier() == rht.qualifier();
+  }
+};
+
+/** Hash for TableName. */
+struct TableNameHash {
+  /** hash */
+  std::size_t operator()(hbase::pb::TableName const &t) const {
+    std::size_t h = 0;
+    boost::hash_combine(h, t.namespace_());
+    boost::hash_combine(h, t.qualifier());
+    return h;
+  }
+};
+
+// typedefs for location cache
+typedef std::map<std::string, std::shared_ptr<RegionLocation>> PerTableLocationMap;
+typedef std::unordered_map<hbase::pb::TableName, std::shared_ptr<PerTableLocationMap>,
+                           TableNameHash, TableNameEquals>
+    RegionLocationMap;
+
 /**
  * Class that can look up and cache locations.
  */
@@ -56,7 +84,7 @@ class LocationCache {
    */
   LocationCache(std::string quorum_spec,
                 std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
-                std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor);
+                std::shared_ptr<ConnectionPool> cp);
   /**
    * Destructor.
    * This will clean up the zookeeper connections.
@@ -71,7 +99,8 @@ class LocationCache {
   folly::Future<hbase::pb::ServerName> LocateMeta();
 
   /**
-   * Go read meta and find out where a region is located.
+   * Go read meta and find out where a region is located. Most users should
+   * never call this method directly and should use LocateRegion() instead.
    *
    * @param tn Table name of the table to look up. This object must live until
    * after the future is returned
@@ -83,14 +112,72 @@ class LocationCache {
                                                                 const std::string &row);
 
   /**
+   * The only method clients should use for meta lookups. If corresponding
+   * location is cached, it's returned from the cache, otherwise lookup
+   * in meta table is done, location is cached and then returned.
+   * It's expected that tiny fraction of invocations incurs meta scan.
+   * This method is to look up non-meta regions; use LocateMeta() to get the
+   * location of hbase:meta region.
+   *
+   * @param tn Table name of the table to look up. This object must live until
+   * after the future is returned
+   *
+   * @param row of the table to look up. This object must live until after the
+   * future is returned
+   */
+  folly::Future<std::shared_ptr<RegionLocation>> LocateRegion(const hbase::pb::TableName &tn,
+                                                              const std::string &row);
+
+  /**
    * Remove the cached location of meta.
    */
   void InvalidateMeta();
 
+  /**
+   * Return cached region location corresponding to this row,
+   * nullptr if this location isn't cached.
+   */
+  std::shared_ptr<RegionLocation> GetCachedLocation(const hbase::pb::TableName &tn,
+                                                    const std::string &row);
+
+  /**
+   * Add non-meta region location in the cache (location of meta itself
+   * is cached separately).
+   */
+  void CacheLocation(const hbase::pb::TableName &tn, const std::shared_ptr<RegionLocation> loc);
+
+  /**
+   * Check if location corresponding to this row key is cached.
+   */
+  bool IsLocationCached(const hbase::pb::TableName &tn, const std::string &row);
+
+  /**
+   * Return cached location for all region of this table.
+   */
+  std::shared_ptr<PerTableLocationMap> GetTableLocations(const hbase::pb::TableName &tn);
+
+  /**
+   * Completely clear location cache.
+   */
+  void ClearCache();
+
+  /**
+   * Clear all cached locations for one table.
+   */
+  void ClearCachedLocations(const hbase::pb::TableName &tn);
+
+  /**
+   * Clear cached region location.
+   */
+  void ClearCachedLocation(const hbase::pb::TableName &tn, const std::string &row);
+
  private:
   void RefreshMetaLocation();
   hbase::pb::ServerName ReadMetaLocation();
   std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
+  std::shared_ptr<hbase::PerTableLocationMap> GetCachedTableLocations(
+      const hbase::pb::TableName &tn);
+  std::shared_ptr<hbase::PerTableLocationMap> GetNewTableLocations(const hbase::pb::TableName &tn);
 
   /* data */
   std::string quorum_spec_;
@@ -98,7 +185,11 @@ class LocationCache {
   std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_;
   std::mutex meta_lock_;
   MetaUtil meta_util_;
-  ConnectionPool cp_;
+  std::shared_ptr<ConnectionPool> cp_;
+
+  // cached region locations
+  RegionLocationMap cached_locations_;
+  folly::SharedMutexWritePriority locations_lock_;
 
   // TODO: migrate this to a smart pointer with a deleter.
   zhandle_t *zk_;
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index e7b76d3..b0411cb 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -79,6 +79,10 @@ class RegionLocation {
    */
   void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; }
 
+  const std::string DebugString() {
+    return "region_info:" + ri_.ShortDebugString() + ", server_name:" + sn_.ShortDebugString();
+  }
+
  private:
   std::string region_name_;
   hbase::pb::RegionInfo ri_;
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index 90e7cd4..dac0d10 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -94,7 +94,7 @@ int main(int argc, char *argv[]) {
   auto row = FLAGS_row;
   auto tn = folly::to<TableName>(FLAGS_table);
 
-  auto loc = cache.LocateFromMeta(tn, row).get(milliseconds(5000));
+  auto loc = cache.LocateRegion(tn, row).get(milliseconds(5000));
   auto connection = loc->service();
 
   auto num_puts = FLAGS_columns;


Mime
View raw message