hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-17315 [C++] HBase Client and Table Implementation (Sudeep Sunthankar)
Date Sat, 14 Jan 2017 00:03:42 GMT
Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 db30da651 -> 15a51158f


HBASE-17315 [C++] HBase Client and Table Implementation (Sudeep Sunthankar)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15a51158
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15a51158
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15a51158

Branch: refs/heads/HBASE-14850
Commit: 15a51158f9406d778a3168d8c009acdbfbf6a1b0
Parents: db30da6
Author: Enis Soztutar <enis@apache.org>
Authored: Fri Jan 13 16:03:30 2017 -0800
Committer: Enis Soztutar <enis@apache.org>
Committed: Fri Jan 13 16:03:30 2017 -0800

----------------------------------------------------------------------
 hbase-native-client/connection/BUCK     |   2 +
 hbase-native-client/core/BUCK           |  12 ++
 hbase-native-client/core/client-test.cc | 240 +++++++++++++++++++++++++++
 hbase-native-client/core/client.cc      |  46 +++--
 hbase-native-client/core/client.h       |  37 ++++-
 hbase-native-client/core/table.cc       |  74 +++++++++
 hbase-native-client/core/table.h        |  69 ++++++++
 7 files changed, 462 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/15a51158/hbase-native-client/connection/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index c22cc89..19536d5 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -30,6 +30,7 @@ cxx_library(
         "rpc-connection.h",
         "response.h",
         "service.h",
+        "rpc-client.h",
     ],
     srcs=[
         "client-dispatcher.cc",
@@ -38,6 +39,7 @@ cxx_library(
         "connection-pool.cc",
         "pipeline.cc",
         "request.cc",
+        "rpc-client.cc",
     ],
     deps=[
         "//if:if",

http://git-wip-us.apache.org/repos/asf/hbase/blob/15a51158/hbase-native-client/core/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK
index b7db41a..0d1bc93 100644
--- a/hbase-native-client/core/BUCK
+++ b/hbase-native-client/core/BUCK
@@ -35,6 +35,7 @@ cxx_library(
         "result.h",
         "request_converter.h",
         "response_converter.h",
+        "table.h",
     ],
     srcs=[
         "cell.cc",
@@ -49,6 +50,7 @@ cxx_library(
         "result.cc",
         "request_converter.cc",
         "response_converter.cc",
+        "table.cc",
     ],
     deps=[
         "//connection:connection",
@@ -107,6 +109,16 @@ cxx_test(
         "//if:if",
     ],
     run_test_separately=True,)
+cxx_test(
+    name="client-test",
+    srcs=["client-test.cc",],
+    deps=[
+        ":core",
+        "//if:if",
+        "//serde:serde",
+        "//test-util:test-util",
+    ],
+    run_test_separately=True,)
 cxx_binary(
     name="simple-client",
     srcs=["simple-client.cc",],

http://git-wip-us.apache.org/repos/asf/hbase/blob/15a51158/hbase-native-client/core/client-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
new file mode 100644
index 0000000..0fe0225
--- /dev/null
+++ b/hbase-native-client/core/client-test.cc
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/get.h"
+#include "core/hbase_configuration_loader.h"
+#include "core/result.h"
+#include "core/table.h"
+#include "serde/table-name.h"
+#include "test-util/test-util.h"
+
+class ClientTest {
+ public:
+  const static std::string kDefHBaseConfPath;
+
+  const static std::string kHBaseDefaultXml;
+  const static std::string kHBaseSiteXml;
+
+  const static std::string kHBaseXmlData;
+
+  static void WriteDataToFile(const std::string &file, const std::string &xml_data) {
+    std::ofstream hbase_conf;
+    hbase_conf.open(file.c_str());
+    hbase_conf << xml_data;
+    hbase_conf.close();
+  }
+
+  static void CreateHBaseConf(const std::string &dir, const std::string &file,
+                              const std::string xml_data) {
+    // Directory will be created if not present
+    if (!boost::filesystem::exists(dir)) {
+      boost::filesystem::create_directories(dir);
+    }
+    // Remove temp file always
+    boost::filesystem::remove((dir + file).c_str());
+    WriteDataToFile((dir + file), xml_data);
+  }
+
+  static void CreateHBaseConfWithEnv() {
+    // Creating Empty Config Files so that we dont get a Configuration exception @Client
+    CreateHBaseConf(kDefHBaseConfPath, kHBaseDefaultXml, kHBaseXmlData);
+    CreateHBaseConf(kDefHBaseConfPath, kHBaseSiteXml, kHBaseXmlData);
+    setenv("HBASE_CONF", kDefHBaseConfPath.c_str(), 1);
+  }
+};
+
+const std::string ClientTest::kDefHBaseConfPath("./build/test-data/client-test/conf/");
+
+const std::string ClientTest::kHBaseDefaultXml("hbase-default.xml");
+const std::string ClientTest::kHBaseSiteXml("hbase-site.xml");
+
+const std::string ClientTest::kHBaseXmlData(
+    "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" "
+    "href=\"configuration.xsl\"?>\n<!--\n/**\n *\n * Licensed to the Apache "
+    "Software Foundation (ASF) under one\n * or more contributor license "
+    "agreements.  See the NOTICE file\n * distributed with this work for "
+    "additional information\n * regarding copyright ownership.  The ASF "
+    "licenses this file\n * to you under the Apache License, Version 2.0 "
+    "(the\n * \"License\"); you may not use this file except in compliance\n * "
+    "with the License.  You may obtain a copy of the License at\n *\n *     "
+    "http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by "
+    "applicable law or agreed to in writing, software\n * distributed under "
+    "the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES "
+    "OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License "
+    "for the specific language governing permissions and\n * limitations under "
+    "the License.\n "
+    "*/\n-->\n<configuration>\n\n</configuration>");
+
+TEST(Client, EmptyConfigurationPassedToClient) {
+  ASSERT_ANY_THROW(hbase::Client client);
+}
+
+TEST(Client, ConfigurationPassedToClient) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // Create Configuration
+  hbase::HBaseConfigurationLoader loader;
+  auto conf = loader.LoadDefaultResources();
+  // Create a client
+  hbase::Client client(conf.value());
+  client.Close();
+}
+
+TEST(Client, DefaultConfiguration) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // Create Configuration
+  hbase::Client client;
+  client.Close();
+}
+
+TEST(Client, Get) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // Using TestUtil to populate test data
+  hbase::TestUtil *test_util = new hbase::TestUtil();
+  test_util->RunShellCmd("create 't', 'd'");
+  test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'");
+  test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t");
+  auto row = "test2";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create Configuration
+  hbase::HBaseConfigurationLoader loader;
+  auto conf = loader.LoadDefaultResources();
+
+  // Create a client
+  hbase::Client client(conf.value());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Perform the Get
+  auto result = table->Get(get);
+
+  // Stopping the connection as we are getting segfault due to some folly issue
+  // The connection stays open and we don't want that.
+  // So we are stopping the connection.
+  // We can remove this once we have fixed the folly part
+  delete test_util;
+
+  // Test the values, should be same as in put executed on hbase shell
+  ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty.";
+  EXPECT_EQ("test2", result->Row());
+  EXPECT_EQ("value2", *(result->Value("d", "2")));
+  EXPECT_EQ("value for extra", *(result->Value("d", "extra")));
+
+  table->Close();
+  client.Close();
+}
+
+TEST(Client, GetForNonExistentTable) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // Using TestUtil to populate test data
+  hbase::TestUtil *test_util = new hbase::TestUtil();
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t_not_exists");
+  auto row = "test2";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create Configuration
+  hbase::HBaseConfigurationLoader loader;
+  auto conf = loader.LoadDefaultResources();
+
+  // Create a client
+  hbase::Client client(conf.value());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Perform the Get
+  ASSERT_ANY_THROW(table->Get(get)) << "Table does not exist. We should get an exception";
+
+  // Stopping the connection as we are getting segfault due to some folly issue
+  // The connection stays open and we don't want that.
+  // So we are stopping the connection.
+  // We can remove this once we have fixed the folly part
+  delete test_util;
+
+  table->Close();
+  client.Close();
+}
+
+TEST(Client, GetForNonExistentRow) {
+  // Remove already configured env if present.
+  unsetenv("HBASE_CONF");
+  ClientTest::CreateHBaseConfWithEnv();
+
+  // Using TestUtil to populate test data
+  hbase::TestUtil *test_util = new hbase::TestUtil();
+  test_util->RunShellCmd("create 't_exists', 'd'");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("t_exists");
+  auto row = "row_not_exists";
+
+  // Get to be performed on above HBase Table
+  hbase::Get get(row);
+
+  // Create Configuration
+  hbase::HBaseConfigurationLoader loader;
+  auto conf = loader.LoadDefaultResources();
+
+  // Create a client
+  hbase::Client client(conf.value());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Perform the Get
+  auto result = table->Get(get);
+  ASSERT_TRUE(result->IsEmpty()) << "Result should  be empty.";
+
+  // Stopping the connection as we are getting segfault due to some folly issue
+  // The connection stays open and we don't want that.
+  // So we are stopping the connection.
+  // We can remove this once we have fixed the folly part
+  delete test_util;
+
+  table->Close();
+  client.Close();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15a51158/hbase-native-client/core/client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc
index 0389b24..6eb3d8f 100644
--- a/hbase-native-client/core/client.cc
+++ b/hbase-native-client/core/client.cc
@@ -20,27 +20,49 @@
 #include "core/client.h"
 
 #include <glog/logging.h>
-
-#include <unistd.h>
-#include <string>
-
-using namespace folly;
-using namespace std;
-using namespace hbase::pb;
+#include <exception>
+#include <utility>
 
 namespace hbase {
 
-Client::Client(std::string zk_quorum)
-    : cpu_executor_(std::make_shared<wangle::CPUThreadPoolExecutor>(4)),
-      io_executor_(std::make_shared<wangle::IOThreadPoolExecutor>(
-          sysconf(_SC_NPROCESSORS_ONLN))),
-      location_cache_(zk_quorum, cpu_executor_, io_executor_) {}
+Client::Client() {
+  HBaseConfigurationLoader loader;
+  auto conf = loader.LoadDefaultResources();
+  if (!conf) {
+    LOG(ERROR) << "Unable to create default Configuration object. Either hbase-default.xml or "
+                  "hbase-site.xml is absent in the search path or problems in XML parsing";
+    throw std::runtime_error("Configuration object not present.");
+  }
+  conf_ = std::make_shared<hbase::Configuration>(conf.value());
+  auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
+  location_cache_ = std::make_shared<hbase::LocationCache>(zk_quorum, cpu_executor_, io_executor_);
+}
+
+Client::Client(const hbase::Configuration &conf) {
+  conf_ = std::make_shared<hbase::Configuration>(conf);
+  auto zk_quorum = conf_->Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
+  location_cache_ = std::make_shared<hbase::LocationCache>(zk_quorum, cpu_executor_, io_executor_);
+}
 
 // We can't have the threads continue running after everything is done
 // that leads to an error.
 Client::~Client() {
   cpu_executor_->stop();
   io_executor_->stop();
+  if (rpc_client_.get()) rpc_client_->Close();
+}
+
+std::unique_ptr<hbase::Table> Client::Table(const TableName &table_name) {
+  return std::make_unique<hbase::Table>(table_name, location_cache_, rpc_client_, conf_);
+}
+
+void Client::Close() {
+  if (is_closed_) return;
+
+  cpu_executor_->stop();
+  io_executor_->stop();
+  if (rpc_client_.get()) rpc_client_->Close();
+  is_closed_ = true;
 }
 
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/15a51158/hbase-native-client/core/client.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h
index 0ba1276..2bb506b 100644
--- a/hbase-native-client/core/client.h
+++ b/hbase-native-client/core/client.h
@@ -27,11 +27,18 @@
 #include <memory>
 #include <string>
 
+#include "core/configuration.h"
+#include "core/hbase_configuration_loader.h"
 #include "core/location-cache.h"
+#include "connection/rpc-client.h"
+#include "core/table.h"
+#include "serde/table-name.h"
 #include "if/Cell.pb.h"
 
-namespace hbase {
+using hbase::pb::TableName;
 
+namespace hbase {
+class Table;
 /**
  * Client.
  *
@@ -42,16 +49,34 @@ namespace hbase {
 class Client {
  public:
   /**
-   * Create a new client.
+   * @brief Create a new client.
    * @param quorum_spec Where to connect to get Zookeeper bootstrap information.
    */
-  explicit Client(std::string quorum_spec);
+  Client();
+  explicit Client(const hbase::Configuration &conf);
   ~Client();
+  /**
+   * @brief Retrieve a Table implementation for accessing a table.
+   * @param - table_name
+   */
+  std::unique_ptr<hbase::Table> Table(const TableName &table_name);
+
+  /**
+   * @brief Close the Client connection.
+   */
+  void Close();
 
  private:
-  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_;
-  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_;
-  LocationCache location_cache_;
+  const std::string kHBaseZookeeperQuorum_ = "hbase.zookeeper.quorum";
+  const std::string kDefHBaseZookeeperQuorum_ = "localhost:2181";
+  std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor_ =
+      std::make_shared<wangle::CPUThreadPoolExecutor>(4);
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_ =
+      std::make_shared<wangle::IOThreadPoolExecutor>(sysconf(_SC_NPROCESSORS_ONLN));
+  std::shared_ptr<hbase::LocationCache> location_cache_;
+  std::shared_ptr<hbase::RpcClient> rpc_client_ = std::make_shared<hbase::RpcClient>();
+  std::shared_ptr<hbase::Configuration> conf_;
+  bool is_closed_ = false;
 };
 
 }  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/15a51158/hbase-native-client/core/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
new file mode 100644
index 0000000..58125f9
--- /dev/null
+++ b/hbase-native-client/core/table.cc
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "core/table.h"
+
+#include <folly/futures/Future.h>
+#include <chrono>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "core/request_converter.h"
+#include "core/response_converter.h"
+#include "if/Client.pb.h"
+#include "security/user.h"
+#include "serde/server-name.h"
+
+using folly::Future;
+using hbase::pb::TableName;
+using hbase::security::User;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+Table::Table(const TableName &table_name,
+             const std::shared_ptr<hbase::LocationCache> &location_cache,
+             const std::shared_ptr<hbase::RpcClient> &rpc_client,
+             const std::shared_ptr<hbase::Configuration> &conf)
+    : table_name_(std::make_shared<TableName>(table_name)),
+      location_cache_(location_cache),
+      rpc_client_(rpc_client),
+      conf_(conf) {
+  client_retries_ = (conf_) ? conf_->GetInt("hbase.client.retries", client_retries_) : 5;
+}
+
+Table::~Table() {}
+
+std::unique_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
+  auto loc = location_cache_->LocateFromMeta(*table_name_, get.Row()).get(milliseconds(1000));
+  auto req = hbase::RequestConverter::ToGetRequest(get, loc->region_name());
+  auto user = User::defaultUser();  // TODO: make User::current() similar to UserUtil
+
+  Future<Response> f =
+      rpc_client_->AsyncCall(loc->server_name().host_name(), loc->server_name().port(),
+                             std::move(req), user, "ClientService");
+  auto resp = f.get();
+
+  return hbase::ResponseConverter::FromGetResponse(resp);
+}
+
+void Table::Close() {
+  if (is_closed_) return;
+
+  if (rpc_client_.get()) rpc_client_->Close();
+  is_closed_ = true;
+}
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/15a51158/hbase-native-client/core/table.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
new file mode 100644
index 0000000..0e98cd2
--- /dev/null
+++ b/hbase-native-client/core/table.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "connection/rpc-client.h"
+#include "core/client.h"
+#include "core/configuration.h"
+#include "core/get.h"
+#include "core/location-cache.h"
+#include "core/result.h"
+#include "serde/table-name.h"
+
+using hbase::pb::TableName;
+
+namespace hbase {
+class Client;
+
+class Table {
+ public:
+  /**
+   * Constructors
+   */
+  Table(const TableName &table_name, const std::shared_ptr<hbase::LocationCache> &location_cache,
+        const std::shared_ptr<hbase::RpcClient> &rpc_client,
+        const std::shared_ptr<hbase::Configuration> &conf);
+  ~Table();
+
+  /**
+   * @brief - Returns a Result object for the constructed Get.
+   * @param - get Get object to perform HBase Get operation.
+   */
+  std::unique_ptr<hbase::Result> Get(const hbase::Get &get);
+
+  /**
+   * @brief - Close the client connection.
+   */
+  void Close();
+
+ private:
+  std::shared_ptr<TableName> table_name_;
+  std::shared_ptr<hbase::LocationCache> location_cache_;
+  std::shared_ptr<hbase::RpcClient> rpc_client_;
+  std::shared_ptr<hbase::Configuration> conf_;
+  bool is_closed_ = false;
+  // default 5 retries. over-ridden in constructor.
+  int client_retries_ = 5;
+};
+} /* namespace hbase */


Mime
View raw message