hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-18178 [C++] Retrying meta location lookup and zookeeper connection
Date Fri, 16 Jun 2017 19:03:25 GMT
Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 f27075a6f -> 73b65b475


HBASE-18178 [C++] Retrying meta location lookup and zookeeper connection


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/73b65b47
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/73b65b47
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/73b65b47

Branch: refs/heads/HBASE-14850
Commit: 73b65b475c2277115cff4ded67e44987a7b2b157
Parents: f27075a
Author: Enis Soztutar <enis@apache.org>
Authored: Fri Jun 16 11:55:51 2017 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Fri Jun 16 11:55:51 2017 -0700

----------------------------------------------------------------------
 .../connection/client-handler.cc                |   9 +-
 hbase-native-client/core/BUCK                   |  12 ++
 .../core/async-rpc-retrying-caller.cc           |  17 +--
 .../core/async-rpc-retrying-test.cc             |   3 +-
 .../core/connection-configuration.h             |   2 +-
 .../core/location-cache-retry-test.cc           | 112 +++++++++++++++++++
 hbase-native-client/core/location-cache.cc      | 100 ++++++++++++-----
 hbase-native-client/core/location-cache.h       |   8 +-
 hbase-native-client/core/meta-utils.cc          |  42 +++++--
 hbase-native-client/core/meta-utils.h           |  24 +++-
 hbase-native-client/core/region-location.h      |  19 +---
 hbase-native-client/core/response-converter.cc  |   2 +-
 hbase-native-client/core/simple-client.cc       |  18 ++-
 hbase-native-client/core/zk-util.cc             |   4 +
 hbase-native-client/core/zk-util.h              |   5 +
 hbase-native-client/serde/region-info.h         |   4 +-
 hbase-native-client/serde/table-name.h          |   5 +-
 hbase-native-client/test-util/mini-cluster.cc   |   4 +
 hbase-native-client/test-util/test-util.cc      |   4 +
 hbase-native-client/test-util/test-util.h       |   1 +
 hbase-native-client/utils/bytes-util-test.cc    |   3 +-
 hbase-native-client/utils/bytes-util.cc         |   4 +-
 22 files changed, 315 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 894ecb3..775df68 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -51,7 +51,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf>
buf) {
 
     int used_bytes = serde_.ParseDelimited(buf.get(), &header);
     VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id="
<< header.call_id()
-            << " has_exception=" << header.has_exception();
+            << " has_exception=" << header.has_exception() << ", server: " << server_;
 
     // Get the response protobuf from the map
     auto search = resp_msgs_->find(header.call_id());
@@ -80,7 +80,8 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf>
buf) {
       }
 
       VLOG(3) << "Read RPCResponse, buf length:" << buf->length()
-              << ", header PB length:" << used_bytes << ", cell_block length:"
<< cell_block_length;
+              << ", header PB length:" << used_bytes << ", cell_block length:"
<< cell_block_length
+              << ", server: " << server_;
 
       // Make sure that bytes were parsed.
       CHECK((used_bytes + cell_block_length) == buf->length());
@@ -113,7 +114,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf>
buf) {
 
       VLOG(3) << "Exception RPC ResponseHeader, call_id=" << header.call_id()
               << " exception.what=" << remote_exception->what()
-              << ", do_not_retry=" << remote_exception->do_not_retry();
+              << ", do_not_retry=" << remote_exception->do_not_retry() << ", server: " << server_;
       received->set_exception(folly::exception_wrapper{*remote_exception});
     }
     ctx->fireRead(std::move(received));
@@ -129,7 +130,7 @@ folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Re
     ctx->fireWrite(std::move(header));
   });
 
-  VLOG(3) << "Writing RPC Request:" << r->DebugString();
+  VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_;
 
   // Now store the call id to response.
   resp_msgs_->insert(r->call_id(), r->resp_msg());

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index 47e97f5..464c010 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -137,6 +137,18 @@ cxx_test(
     ],
     run_test_separately=True,)
 cxx_test(
+    name="location-cache-retry-test",
+    srcs=[
+        "location-cache-retry-test.cc",
+    ],
+    deps=[
+        ":core",
+        "//if:if",
+        "//serde:serde",
+        "//test-util:test-util",
+    ],
+    run_test_separately=True,)
+cxx_test(
     name="cell-test",
     srcs=[
         "cell-test.cc",

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/async-rpc-retrying-caller.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index 0302ad3..aee7d0b 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -168,14 +168,17 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation&
loc)
 
   ResetController(controller_, call_timeout_ns);
 
-  callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client)
-      .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp));
})
-      .onError([&, this](const exception_wrapper& e) {
+  // TODO: RegionLocation should propagate through these method chains as a shared_ptr.
+  // Otherwise, it may get deleted underneat us. We are just copying for now.
+  auto loc_ptr = std::make_shared<RegionLocation>(loc);
+  callable_(controller_, loc_ptr, rpc_client)
+      .then([loc_ptr, this](const RESP& resp) { this->promise_->setValue(std::move(resp));
})
+      .onError([&, loc_ptr, this](const exception_wrapper& e) {
         OnError(e,
                 [&, this]() -> std::string {
-                  return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(),
-                                                     loc.server_name().port()) +
-                         " for '" + row_ + "' in " + loc.DebugString() + " of " +
+                  return "Call to " + folly::sformat("{0}:{1}", loc_ptr->server_name().host_name(),
+                                                     loc_ptr->server_name().port()) +
+                         " for '" + row_ + "' in " + loc_ptr->DebugString() + " of " +
                          table_name_->namespace_() + "::" + table_name_->qualifier() +
                          " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " +
                          std::to_string(max_attempts_) + ", timeout = " +
@@ -184,7 +187,7 @@ void AsyncSingleRequestRpcRetryingCaller<RESP>::Call(const RegionLocation&
loc)
                          " ms";
                 },
                 [&, this](const exception_wrapper& error) {
-                  conn_->region_locator()->UpdateCachedLocation(loc, error);
+                  conn_->region_locator()->UpdateCachedLocation(*loc_ptr, error);
                 });
       });
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/async-rpc-retrying-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc
index 0f83914..f887815 100644
--- a/hbase-native-client/core/async-rpc-retrying-test.cc
+++ b/hbase-native-client/core/async-rpc-retrying-test.cc
@@ -147,8 +147,7 @@ class MockWrongRegionAsyncRegionLocator : public AsyncRegionLocatorBase
{
     folly::Promise<std::shared_ptr<RegionLocation>> promise;
     /* set random region name, simulating invalid region */
     auto result = std::make_shared<RegionLocation>(
-        "whatever-region-name", region_location_->region_info(), region_location_->server_name(),
-        region_location_->service());
+        "whatever-region-name", region_location_->region_info(), region_location_->server_name());
     promise.setValue(result);
     return promise.getFuture();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/connection-configuration.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/connection-configuration.h b/hbase-native-client/core/connection-configuration.h
index adc8d5b..995798e 100644
--- a/hbase-native-client/core/connection-configuration.h
+++ b/hbase-native-client/core/connection-configuration.h
@@ -143,7 +143,7 @@ class ConnectionConfiguration {
    */
   static constexpr const char* kClientRetriesNumber = "hbase.client.retries.number";
 
-  static constexpr const uint32_t kDefaultClientRetriesNumber = 31;
+  static constexpr const uint32_t kDefaultClientRetriesNumber = 35;
 
   /**
     * Configure the number of failures after which the client will start logging. A few failures

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/location-cache-retry-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache-retry-test.cc b/hbase-native-client/core/location-cache-retry-test.cc
new file mode 100644
index 0000000..988f994
--- /dev/null
+++ b/hbase-native-client/core/location-cache-retry-test.cc
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+
+#include "core/append.h"
+#include "core/cell.h"
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/delete.h"
+#include "core/get.h"
+#include "core/hbase-configuration-loader.h"
+#include "core/increment.h"
+#include "core/meta-utils.h"
+#include "core/put.h"
+#include "core/result.h"
+#include "core/table.h"
+#include "exceptions/exception.h"
+#include "serde/table-name.h"
+#include "test-util/test-util.h"
+#include "utils/bytes-util.h"
+
+using hbase::Cell;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::MetaUtil;
+using hbase::RetriesExhaustedException;
+using hbase::Put;
+using hbase::Table;
+using hbase::TestUtil;
+
+using std::chrono_literals::operator""s;
+
+class LocationCacheRetryTest : public ::testing::Test {
+ public:
+  static std::unique_ptr<hbase::TestUtil> test_util;
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util = std::make_unique<hbase::TestUtil>();
+    test_util->StartMiniCluster(2);
+    test_util->conf()->SetInt("hbase.client.retries.number", 5);
+  }
+};
+
+std::unique_ptr<hbase::TestUtil> LocationCacheRetryTest::test_util = nullptr;
+
+TEST_F(LocationCacheRetryTest, GetFromMetaTable) {
+  auto tn = folly::to<hbase::pb::TableName>("hbase:meta");
+  auto row = "test1";
+
+  hbase::Client client(*LocationCacheRetryTest::test_util->conf());
+
+  // do a get against the other table, but not the actual table "t".
+  auto table = client.Table(tn);
+  hbase::Get get(row);
+  auto result = table->Get(get);
+
+  LocationCacheRetryTest::test_util->MoveRegion(MetaUtil::kMetaRegion, "");
+
+  std::this_thread::sleep_for(3s);  // sleep 3 sec
+
+  result = table->Get(get);
+}
+
+TEST_F(LocationCacheRetryTest, PutGet) {
+  LocationCacheRetryTest::test_util->CreateTable("t", "d");
+  LocationCacheRetryTest::test_util->CreateTable("t2", "d");
+
+  auto tn = folly::to<hbase::pb::TableName>("t");
+  auto tn2 = folly::to<hbase::pb::TableName>("t2");
+  auto row = "test1";
+
+  hbase::Client client(*LocationCacheRetryTest::test_util->conf());
+
+  // do a get against the other table, but not the actual table "t".
+  auto table = client.Table(tn);
+  auto table2 = client.Table(tn2);
+  hbase::Get get(row);
+  auto result = table2->Get(get);
+
+  // we should have already cached the location of meta right now. Now
+  // move the meta region to the other server so that we will get a NotServingRegionException
+  // when we do the actual location lookup request. If there is no invalidation
+  // of the meta's own location, then following put/get will result in retries exhausted.
+  LocationCacheRetryTest::test_util->MoveRegion(MetaUtil::kMetaRegion, "");
+
+  std::this_thread::sleep_for(3s);  // sleep 3 sec
+
+  table->Put(Put{row}.AddColumn("d", "1", "value1"));
+
+  result = table->Get(get);
+
+  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ("test1", result->Row());
+  EXPECT_EQ("value1", *(result->Value("d", "1")));
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/location-cache.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc
index dfe3e9f..5f68420 100644
--- a/hbase-native-client/core/location-cache.cc
+++ b/hbase-native-client/core/location-cache.cc
@@ -24,8 +24,12 @@
 #include <wangle/concurrent/CPUThreadPoolExecutor.h>
 #include <wangle/concurrent/IOThreadPoolExecutor.h>
 
+#include <map>
+#include <utility>
+
 #include "connection/response.h"
 #include "connection/rpc-connection.h"
+#include "core/meta-utils.h"
 #include "exceptions/exception.h"
 #include "if/Client.pb.h"
 #include "if/ZooKeeper.pb.h"
@@ -33,8 +37,6 @@
 #include "serde/server-name.h"
 #include "serde/zk.h"
 
-#include <utility>
-
 using hbase::pb::MetaRegionServer;
 using hbase::pb::ServerName;
 using hbase::pb::TableName;
@@ -54,27 +56,49 @@ LocationCache::LocationCache(std::shared_ptr<hbase::Configuration>
conf,
       cached_locations_(),
       locations_lock_() {
   zk_quorum_ = ZKUtil::ParseZooKeeperQuorum(*conf_);
-  zk_ = zookeeper_init(zk_quorum_.c_str(), nullptr, 1000, 0, 0, 0);
+  EnsureZooKeeperConnection();
+}
+
+LocationCache::~LocationCache() { CloseZooKeeperConnection(); }
+
+void LocationCache::CloseZooKeeperConnection() {
+  if (zk_ != nullptr) {
+    zookeeper_close(zk_);
+    zk_ = nullptr;
+    LOG(INFO) << "Closed connection to ZooKeeper.";
+  }
 }
 
-LocationCache::~LocationCache() {
-  zookeeper_close(zk_);
-  zk_ = nullptr;
-  LOG(INFO) << "Closed connection to ZooKeeper.";
+void LocationCache::EnsureZooKeeperConnection() {
+  if (zk_ == nullptr) {
+    LOG(INFO) << "Connecting to ZooKeeper. Quorum:" + zk_quorum_;
+    auto session_timeout = ZKUtil::SessionTimeout(*conf_);
+    zk_ = zookeeper_init(zk_quorum_.c_str(), nullptr, session_timeout, nullptr, nullptr,
0);
+  }
 }
 
 folly::Future<ServerName> LocationCache::LocateMeta() {
-  std::lock_guard<std::mutex> g(meta_lock_);
+  std::lock_guard<std::recursive_mutex> g(meta_lock_);
   if (meta_promise_ == nullptr) {
     this->RefreshMetaLocation();
   }
-  return meta_promise_->getFuture();
+  return meta_promise_->getFuture().onError([&](const folly::exception_wrapper &ew)
{
+    auto promise = InvalidateMeta();
+    promise->setException(ew);
+    return ServerName{};
+  });
 }
 
-void LocationCache::InvalidateMeta() {
+std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> LocationCache::InvalidateMeta()
{
+  VLOG(2) << "Invalidating meta location";
+  std::lock_guard<std::recursive_mutex> g(meta_lock_);
   if (meta_promise_ != nullptr) {
-    std::lock_guard<std::mutex> g(meta_lock_);
-    meta_promise_ = nullptr;
+    // return the unique_ptr back to the caller.
+    std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> ret = nullptr;
+    std::swap(ret, meta_promise_);
+    return ret;
+  } else {
+    return nullptr;
   }
 }
 
@@ -84,18 +108,21 @@ void LocationCache::RefreshMetaLocation() {
   cpu_executor_->add([&] { meta_promise_->setWith([&] { return this->ReadMetaLocation();
}); });
 }
 
+// Note: this is a blocking call to zookeeper
 ServerName LocationCache::ReadMetaLocation() {
   auto buf = folly::IOBuf::create(4096);
   ZkDeserializer derser;
+  EnsureZooKeeperConnection();
 
   // This needs to be int rather than size_t as that's what ZK expects.
   int len = buf->capacity();
   std::string zk_node = ZKUtil::MetaZNode(*conf_);
-  // TODO(elliott): handle disconnects/reconntion as needed.
   int zk_result = zoo_get(this->zk_, zk_node.c_str(), 0,
                           reinterpret_cast<char *>(buf->writableData()), &len,
nullptr);
   if (zk_result != ZOK || len < 9) {
     LOG(ERROR) << "Error getting meta location.";
+    // We just close the zk connection, and let the upper levels retry.
+    CloseZooKeeperConnection();
     throw std::runtime_error("Error getting meta location. Quorum: " + zk_quorum_);
   }
   buf->append(len);
@@ -103,6 +130,8 @@ ServerName LocationCache::ReadMetaLocation() {
   MetaRegionServer mrs;
   if (derser.Parse(buf.get(), &mrs) == false) {
     LOG(ERROR) << "Unable to decode";
+    throw std::runtime_error("Error getting meta location (Unable to decode). Quorum: " +
+                             zk_quorum_);
   }
   return mrs.server();
 }
@@ -118,10 +147,15 @@ folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(
       .then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) {
         return (*rpc_connection->get_service())(std::move(meta_util_.MetaRequest(tn, row)));
       })
-      .then([this](std::unique_ptr<Response> resp) {
+      .onError([&](const folly::exception_wrapper &ew) {
+        auto promise = InvalidateMeta();
+        throw ew;
+        return static_cast<std::unique_ptr<Response>>(nullptr);
+      })
+      .then([tn, this](std::unique_ptr<Response> resp) {
         // take the protobuf response and make it into
         // a region location.
-        return meta_util_.CreateLocation(std::move(*resp));
+        return meta_util_.CreateLocation(std::move(*resp), tn);
       })
       .then([tn, this](std::shared_ptr<RegionLocation> rl) {
         // Make sure that the correct location was found.
@@ -134,9 +168,6 @@ folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(
       .then([this](std::shared_ptr<RegionLocation> rl) {
         auto remote_id =
             std::make_shared<ConnectionId>(rl->server_name().host_name(), rl->server_name().port());
-        // Now fill out the connection.
-        // rl->set_service(cp_->GetConnection(remote_id)->get_service()); TODO:
this causes wangle
-        // assertion errors
         return rl;
       })
       .then([tn, this](std::shared_ptr<RegionLocation> rl) {
@@ -146,9 +177,20 @@ folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(
       });
 }
 
+constexpr const char *MetaUtil::kMetaRegionName;
+
 folly::Future<std::shared_ptr<RegionLocation>> LocationCache::LocateRegion(
     const TableName &tn, const std::string &row, const RegionLocateType locate_type,
     const int64_t locate_ns) {
+  // We maybe asked to locate meta itself
+  if (MetaUtil::IsMeta(tn)) {
+    return LocateMeta().then([this](const ServerName &server_name) {
+      auto rl = std::make_shared<RegionLocation>(MetaUtil::kMetaRegionName,
+                                                 meta_util_.meta_region_info(), server_name);
+      return rl;
+    });
+  }
+
   // TODO: implement region locate type and timeout
   auto cached_loc = this->GetCachedLocation(tn, row);
   if (cached_loc != nullptr) {
@@ -164,34 +206,28 @@ std::shared_ptr<RegionLocation> LocationCache::GetCachedLocation(const
hbase::pb
   auto t_locs = this->GetTableLocations(tn);
   std::shared_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
 
-  if (VLOG_IS_ON(2)) {
-    for (const auto &p : *t_locs) {
-      VLOG(2) << "t_locs[" << p.first << "] = " << p.second->DebugString();
-    }
-  }
-
   // looking for the "floor" key as a start key
   auto possible_region = t_locs->upper_bound(row);
 
   if (t_locs->empty()) {
-    VLOG(2) << "Could not find region in cache, table map is empty";
+    VLOG(5) << "Could not find region in cache, table map is empty";
     return nullptr;
   }
 
   if (possible_region == t_locs->begin()) {
-    VLOG(2) << "Could not find region in cache, all keys are greater, row:" <<
row
+    VLOG(5) << "Could not find region in cache, all keys are greater, row:" <<
row
             << " ,possible_region:" << possible_region->second->DebugString();
     return nullptr;
   }
   --possible_region;
 
-  VLOG(2) << "Found possible region in cache for row:" << row
+  VLOG(5) << "Found possible region in cache for row:" << row
           << " ,possible_region:" << possible_region->second->DebugString();
 
   // found possible start key, now need to check end key
   if (possible_region->second->region_info().end_key() == "" ||
       possible_region->second->region_info().end_key() > row) {
-    VLOG(1) << "Found region in cache for row:" << row
+    VLOG(2) << "Found region in cache for row:" << row
             << " ,region:" << possible_region->second->DebugString();
     return possible_region->second;
   } else {
@@ -261,15 +297,23 @@ void LocationCache::ClearCache() {
 
 // must hold unique lock on locations_lock_
 void LocationCache::ClearCachedLocations(const hbase::pb::TableName &tn) {
+  VLOG(1) << "ClearCachedLocations, table:" << folly::to<std::string>(tn);
   std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
   cached_locations_.erase(tn);
+  if (MetaUtil::IsMeta(tn)) {
+    InvalidateMeta();
+  }
 }
 
 // must hold unique lock on locations_lock_
 void LocationCache::ClearCachedLocation(const hbase::pb::TableName &tn, const std::string
&row) {
+  VLOG(1) << "ClearCachedLocation, table:" << folly::to<std::string>(tn)
<< ", row:" << row;
   auto table_locs = this->GetTableLocations(tn);
   std::unique_lock<folly::SharedMutexWritePriority> lock(locations_lock_);
   table_locs->erase(row);
+  if (MetaUtil::IsMeta(tn)) {
+    InvalidateMeta();
+  }
 }
 
 void LocationCache::UpdateCachedLocation(const RegionLocation &loc,

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/location-cache.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h
index a3c15cb..a374fb6 100644
--- a/hbase-native-client/core/location-cache.h
+++ b/hbase-native-client/core/location-cache.h
@@ -137,7 +137,7 @@ class LocationCache : public AsyncRegionLocator {
   /**
    * Remove the cached location of meta.
    */
-  void InvalidateMeta();
+  std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> InvalidateMeta();
 
   /**
    * Return cached region location corresponding to this row,
@@ -186,6 +186,10 @@ class LocationCache : public AsyncRegionLocator {
   const std::string &zk_quorum() { return zk_quorum_; }
 
  private:
+  void CloseZooKeeperConnection();
+  void EnsureZooKeeperConnection();
+
+ private:
   void RefreshMetaLocation();
   hbase::pb::ServerName ReadMetaLocation();
   std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
@@ -198,7 +202,7 @@ class LocationCache : public AsyncRegionLocator {
   std::string zk_quorum_;
   std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
   std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_;
-  std::mutex meta_lock_;
+  std::recursive_mutex meta_lock_;
   MetaUtil meta_util_;
   std::shared_ptr<ConnectionPool> cp_;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/meta-utils.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc
index 8efecc8..31349a5 100644
--- a/hbase-native-client/core/meta-utils.cc
+++ b/hbase-native-client/core/meta-utils.cc
@@ -21,10 +21,13 @@
 
 #include <folly/Conv.h>
 #include <memory>
+#include <utility>
+#include <vector>
 
 #include "connection/request.h"
 #include "connection/response.h"
 #include "core/response-converter.h"
+#include "exceptions/exception.h"
 #include "if/Client.pb.h"
 #include "serde/region-info.h"
 #include "serde/server-name.h"
@@ -38,10 +41,17 @@ using hbase::pb::ServerName;
 
 namespace hbase {
 
-static const std::string META_REGION = "1588230740";
-static const std::string CATALOG_FAMILY = "info";
-static const std::string REGION_INFO_COLUMN = "regioninfo";
-static const std::string SERVER_COLUMN = "server";
+MetaUtil::MetaUtil() {
+  meta_region_info_.set_start_key("");
+  meta_region_info_.set_end_key("");
+  meta_region_info_.set_offline(false);
+  meta_region_info_.set_split(false);
+  meta_region_info_.set_replica_id(0);
+  meta_region_info_.set_split(false);
+  meta_region_info_.set_region_id(1);
+  meta_region_info_.mutable_table_name()->set_namespace_(MetaUtil::kSystemNamespace);
+  meta_region_info_.mutable_table_name()->set_qualifier(MetaUtil::kMetaTableQualifier);
+}
 
 std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row)
const {
   return folly::to<std::string>(tn, ",", row, ",", "999999999999999999");
@@ -56,7 +66,7 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn,
const std::st
 
   // Set the region this scan goes to
   auto region = msg->mutable_region();
-  region->set_value(META_REGION);
+  region->set_value(MetaUtil::kMetaRegion);
   region->set_type(
       RegionSpecifier_RegionSpecifierType::RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
 
@@ -78,30 +88,38 @@ std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn,
const std::st
 
   // Set the columns that we need
   auto info_col = scan->add_column();
-  info_col->set_family("info");
-  info_col->add_qualifier("server");
-  info_col->add_qualifier("regioninfo");
+  info_col->set_family(MetaUtil::kCatalogFamily);
+  info_col->add_qualifier(MetaUtil::kServerColumn);
+  info_col->add_qualifier(MetaUtil::kRegionInfoColumn);
 
   scan->set_start_row(RegionLookupRowkey(tn, row));
   return request;
 }
 
-std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp)
{
+std::shared_ptr<RegionLocation> MetaUtil::CreateLocation(const Response &resp,
+                                                         const TableName &tn) {
   std::vector<std::shared_ptr<Result>> results = ResponseConverter::FromScanResponse(resp);
+  if (results.size() == 0) {
+    throw TableNotFoundException(folly::to<std::string>(tn));
+  }
   if (results.size() != 1) {
     throw std::runtime_error("Was expecting exactly 1 result in meta scan response, got:" +
                              std::to_string(results.size()));
   }
   auto result = *results[0];
 
-  auto region_info_str = result.Value(CATALOG_FAMILY, REGION_INFO_COLUMN);
-  auto server_str = result.Value(CATALOG_FAMILY, SERVER_COLUMN);
+  auto region_info_str = result.Value(MetaUtil::kCatalogFamily, MetaUtil::kRegionInfoColumn);
+  auto server_str = result.Value(MetaUtil::kCatalogFamily, MetaUtil::kServerColumn);
   CHECK(region_info_str);
   CHECK(server_str);
 
   auto row = result.Row();
   auto region_info = folly::to<RegionInfo>(*region_info_str);
   auto server_name = folly::to<ServerName>(*server_str);
-  return std::make_shared<RegionLocation>(row, std::move(region_info), server_name,
nullptr);
+  return std::make_shared<RegionLocation>(row, std::move(region_info), server_name);
+}
+
+bool MetaUtil::IsMeta(const hbase::pb::TableName &tn) {
+  return folly::to<std::string>(tn) == MetaUtil::kMetaTableName;
 }
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/meta-utils.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h
index d67f32d..d178179 100644
--- a/hbase-native-client/core/meta-utils.h
+++ b/hbase-native-client/core/meta-utils.h
@@ -34,6 +34,17 @@ namespace hbase {
  */
 class MetaUtil {
  public:
+  static constexpr const char *kSystemNamespace = "hbase";
+  static constexpr const char *kMetaTableQualifier = "meta";
+  static constexpr const char *kMetaTableName = "hbase:meta";
+  static constexpr const char *kMetaRegion = "1588230740";
+  static constexpr const char *kMetaRegionName = "hbase:meta,,1";
+  static constexpr const char *kCatalogFamily = "info";
+  static constexpr const char *kRegionInfoColumn = "regioninfo";
+  static constexpr const char *kServerColumn = "server";
+
+  MetaUtil();
+
   /**
    * Given a table and a row give the row key from which to start a scan to find
    * region locations.
@@ -49,6 +60,17 @@ class MetaUtil {
   /**
    * Return a RegionLocation from the parsed Response
    */
-  std::shared_ptr<RegionLocation> CreateLocation(const Response &resp);
+  std::shared_ptr<RegionLocation> CreateLocation(const Response &resp,
+                                                 const hbase::pb::TableName &tn);
+
+  /**
+   * Return whether the table is the meta table.
+   */
+  static bool IsMeta(const hbase::pb::TableName &tn);
+
+  const pb::RegionInfo &meta_region_info() const { return meta_region_info_; }
+
+ private:
+  pb::RegionInfo meta_region_info_;
 };
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/region-location.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h
index d5d9d67..822180b 100644
--- a/hbase-native-client/core/region-location.h
+++ b/hbase-native-client/core/region-location.h
@@ -44,9 +44,8 @@ class RegionLocation {
    * this region.
    * @param service the connected service to the regionserver.
    */
-  RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, hbase::pb::ServerName
sn,
-                 std::shared_ptr<HBaseService> service)
-      : region_name_(region_name), ri_(ri), sn_(sn), service_(service) {}
+  RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, hbase::pb::ServerName
sn)
+      : region_name_(region_name), ri_(ri), sn_(sn) {}
 
   /**
    * Get a reference to the regio info
@@ -64,19 +63,6 @@ class RegionLocation {
   const std::string &region_name() const { return region_name_; }
 
   /**
-   * Get a service. This could be closed or null. It's the caller's
-   * responsibility to check.
-   */
-  std::shared_ptr<HBaseService> service() { return service_; }
-
-  /**
-   * Set the service.
-   * This should be used if the region moved or if the connection is thought to
-   * be bad and a new tcp connection needs to be made.
-   */
-  void set_service(std::shared_ptr<HBaseService> s) { service_ = s; }
-
-  /**
    * Set the servername if the region has moved.
    */
   void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; }
@@ -89,7 +75,6 @@ class RegionLocation {
   std::string region_name_;
   hbase::pb::RegionInfo ri_;
   hbase::pb::ServerName sn_;
-  std::shared_ptr<HBaseService> service_;
 };
 
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/response-converter.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index 9bc4892..4f9bfb1 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -93,7 +93,7 @@ std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const R
 std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(
     const std::shared_ptr<ScanResponse> scan_resp, std::shared_ptr<CellScanner> cell_scanner) {
   VLOG(3) << "FromScanResponse:" << scan_resp->ShortDebugString()
-          << " cell_scanner:" << (cell_scanner == nullptr);
+          << " cell_scanner:" << (cell_scanner != nullptr);
   int num_results =
       cell_scanner != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc
index f79d848..2fd7108 100644
--- a/hbase-native-client/core/simple-client.cc
+++ b/hbase-native-client/core/simple-client.cc
@@ -29,6 +29,7 @@
 #include "connection/rpc-client.h"
 #include "core/client.h"
 #include "core/get.h"
+#include "core/hbase-configuration-loader.h"
 #include "core/put.h"
 #include "core/scan.h"
 #include "core/table.h"
@@ -39,6 +40,7 @@
 using hbase::Client;
 using hbase::Configuration;
 using hbase::Get;
+using hbase::HBaseConfigurationLoader;
 using hbase::Scan;
 using hbase::Put;
 using hbase::Table;
@@ -49,6 +51,7 @@ using hbase::TimeUtil;
 DEFINE_string(table, "test_table", "What table to do the reads or writes");
 DEFINE_string(row, "row_", "row prefix");
 DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
+DEFINE_string(conf, "", "Conf directory to read the config from (optional)");
 DEFINE_uint64(num_rows, 10000, "How many rows to write and read");
 DEFINE_bool(puts, true, "Whether to perform puts");
 DEFINE_bool(gets, true, "Whether to perform gets");
@@ -76,10 +79,17 @@ int main(int argc, char *argv[]) {
   FLAGS_logtostderr = 1;
   FLAGS_stderrthreshold = 1;
 
-  // Configuration
-  auto conf = std::make_shared<Configuration>();
-  conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper);
-  conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads);
+  std::shared_ptr<Configuration> conf = nullptr;
+  if (FLAGS_conf == "") {
+    // Configuration
+    conf = std::make_shared<Configuration>();
+    conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper);
+    conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads);
+  } else {
+    setenv("HBASE_CONF", FLAGS_conf.c_str(), 1);
+    hbase::HBaseConfigurationLoader loader;
+    conf = std::make_shared<Configuration>(loader.LoadDefaultResources().value());
+  }
 
   auto row = FLAGS_row;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/zk-util.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/zk-util.cc b/hbase-native-client/core/zk-util.cc
index 50ea92a..d29c8c3 100644
--- a/hbase-native-client/core/zk-util.cc
+++ b/hbase-native-client/core/zk-util.cc
@@ -55,4 +55,8 @@ std::string ZKUtil::MetaZNode(const hbase::Configuration& conf) {
   return zk_node;
 }
 
+int32_t ZKUtil::SessionTimeout(const hbase::Configuration& conf) {
+  return conf.GetInt(kHBaseZookeeperSessionTimeout_, kDefHBaseZookeeperSessionTimeout_);
+}
+
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/core/zk-util.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/zk-util.h b/hbase-native-client/core/zk-util.h
index 8f2d627..403fbe4 100644
--- a/hbase-native-client/core/zk-util.h
+++ b/hbase-native-client/core/zk-util.h
@@ -34,8 +34,13 @@ class ZKUtil {
   static constexpr const char* kDefHBaseZnodeParent_ = "/hbase";
   static constexpr const char* kHBaseMetaRegionServer_ = "meta-region-server";
 
+  static constexpr const char* kHBaseZookeeperSessionTimeout_ = "zookeeper.session.timeout";
+  static constexpr const int32_t kDefHBaseZookeeperSessionTimeout_ = 90000;
+
   static std::string ParseZooKeeperQuorum(const hbase::Configuration& conf);
 
   static std::string MetaZNode(const hbase::Configuration& conf);
+
+  static int32_t SessionTimeout(const hbase::Configuration& conf);
 };
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/serde/region-info.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h
index 1f08298..8010042 100644
--- a/hbase-native-client/serde/region-info.h
+++ b/hbase-native-client/serde/region-info.h
@@ -19,13 +19,13 @@
 
 #pragma once
 
-#include "if/HBase.pb.h"
-
 #include <folly/Conv.h>
 #include <boost/algorithm/string/predicate.hpp>
 
 #include <string>
 
+#include "if/HBase.pb.h"
+
 namespace hbase {
 namespace pb {
 template <class String>

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/serde/table-name.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/table-name.h b/hbase-native-client/serde/table-name.h
index b8b7776..3594802 100644
--- a/hbase-native-client/serde/table-name.h
+++ b/hbase-native-client/serde/table-name.h
@@ -18,12 +18,13 @@
  */
 #pragma once
 
+#include <folly/Conv.h>
+#include <folly/String.h>
+
 #include <memory>
 #include <string>
 #include <vector>
 
-#include <folly/Conv.h>
-#include <folly/String.h>
 #include "if/HBase.pb.h"
 
 namespace hbase {

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/test-util/mini-cluster.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/test-util/mini-cluster.cc b/hbase-native-client/test-util/mini-cluster.cc
index 688ea8e..56461e1 100644
--- a/hbase-native-client/test-util/mini-cluster.cc
+++ b/hbase-native-client/test-util/mini-cluster.cc
@@ -59,6 +59,7 @@ JNIEnv *MiniCluster::CreateVM(JavaVM **jvm) {
     }
     fd.close();
   }
+
   auto options = std::string{"-Djava.class.path="} + clspath;
   jvm_options.optionString = const_cast<char *>(options.c_str());
   args.options = &jvm_options;
@@ -185,6 +186,9 @@ JNIEnv *MiniCluster::env() {
 }
 // converts C char* to Java byte[]
 jbyteArray MiniCluster::StrToByteChar(const std::string &str) {
+  if (str.size() == 0) {
+    return nullptr;
+  }
   char *p = const_cast<char *>(str.c_str());
   int n = str.length();
   jbyteArray arr = env_->NewByteArray(n);

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/test-util/test-util.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/test-util/test-util.cc b/hbase-native-client/test-util/test-util.cc
index 26862d8..b32c635 100644
--- a/hbase-native-client/test-util/test-util.cc
+++ b/hbase-native-client/test-util/test-util.cc
@@ -79,6 +79,10 @@ void TestUtil::CreateTable(const std::string &table, const std::vector<std::stri
   mini_->CreateTable(table, families, keys);
 }
 
+void TestUtil::MoveRegion(const std::string &region, const std::string &server) {
+  mini_->MoveRegion(region, server);
+}
+
 void TestUtil::StartStandAloneInstance() {
   auto p = temp_dir_.path().string();
   auto cmd = std::string{"bin/start-local-hbase.sh " + p};

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/test-util/test-util.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/test-util/test-util.h b/hbase-native-client/test-util/test-util.h
index e26558b..40e99d1 100644
--- a/hbase-native-client/test-util/test-util.h
+++ b/hbase-native-client/test-util/test-util.h
@@ -68,6 +68,7 @@ class TestUtil {
   void StartStandAloneInstance();
   void StopStandAloneInstance();
   void RunShellCmd(const std::string &);
+  void MoveRegion(const std::string &region, const std::string &server);
 
  private:
   std::unique_ptr<MiniCluster> mini_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/utils/bytes-util-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/utils/bytes-util-test.cc b/hbase-native-client/utils/bytes-util-test.cc
index 16af021..4a49593 100644
--- a/hbase-native-client/utils/bytes-util-test.cc
+++ b/hbase-native-client/utils/bytes-util-test.cc
@@ -23,8 +23,7 @@
 
 #include "utils/bytes-util.h"
 
-using namespace std;
-using namespace hbase;
+using hbase::BytesUtil;
 
 TEST(TestBytesUtil, TestToStringBinary) {
   std::string empty{""};

http://git-wip-us.apache.org/repos/asf/hbase/blob/73b65b47/hbase-native-client/utils/bytes-util.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/utils/bytes-util.cc b/hbase-native-client/utils/bytes-util.cc
index a937782..12037c3 100644
--- a/hbase-native-client/utils/bytes-util.cc
+++ b/hbase-native-client/utils/bytes-util.cc
@@ -21,11 +21,11 @@
 
 #include <bits/stdc++.h>
 #include <boost/predef.h>
+#include <glog/logging.h>
+
 #include <memory>
 #include <string>
 
-#include <glog/logging.h>
-
 namespace hbase {
 
 constexpr char BytesUtil::kHexChars[];


Mime
View raw message