hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject [05/25] hbase git commit: HBASE-18725 [C++] Install header files as well as library
Date Fri, 15 Sep 2017 21:20:22 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/scan.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/scan.cc b/hbase-native-client/src/hbase/client/scan.cc
new file mode 100644
index 0000000..0cadcf0
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/scan.cc
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/scan.h"
+
+#include <algorithm>
+#include <iterator>
+#include <limits>
+#include <stdexcept>
+
+namespace hbase {
+
+// Default constructor: no work beyond default member initialization.
+Scan::Scan() = default;
+
+// Destructor: nothing is released explicitly here.
+Scan::~Scan() = default;
+
+// Builds a scan starting at start_row. The row is validated with CheckRow(),
+// which throws for an invalid row.
+Scan::Scan(const std::string &start_row) : start_row_(start_row) {
+  CheckRow(start_row);
+}
+
+// Builds a scan bounded by start_row and stop_row. Both rows are validated
+// with CheckRow(), the start row first.
+Scan::Scan(const std::string &start_row, const std::string &stop_row)
+    : start_row_(start_row), stop_row_(stop_row) {
+  CheckRow(start_row);
+  CheckRow(stop_row);
+}
+
+/**
+ * Copy constructor. Copies the Query base state and every scan setting from
+ * scan; the time range is copied into a newly allocated TimeRange.
+ */
+Scan::Scan(const Scan &scan) : Query(scan) {
+  // Row boundaries.
+  start_row_ = scan.start_row_;
+  stop_row_ = scan.stop_row_;
+
+  // Limits.
+  max_versions_ = scan.max_versions_;
+  caching_ = scan.caching_;
+  max_result_size_ = scan.max_result_size_;
+
+  // Flags and consistency level.
+  cache_blocks_ = scan.cache_blocks_;
+  load_column_families_on_demand_ = scan.load_column_families_on_demand_;
+  reversed_ = scan.reversed_;
+  allow_partial_results_ = scan.allow_partial_results_;
+  consistency_ = scan.consistency_;
+
+  // scan.tr_ is dereferenced here, so it must not be null.
+  const TimeRange &source_range = *scan.tr_;
+  tr_.reset(new TimeRange(source_range.MinTimeStamp(), source_range.MaxTimeStamp()));
+
+  const auto &source_families = scan.family_map_;
+  family_map_.insert(source_families.begin(), source_families.end());
+}
+
+/**
+ * Copy assignment. Replaces the Query base state and every scan setting of
+ * this object with the values held by scan.
+ *
+ * @param scan the scan to copy from; its time range pointer must not be null
+ * @return reference to this scan
+ */
+Scan &Scan::operator=(const Scan &scan) {
+  if (this == &scan) {
+    return *this;  // self-assignment: nothing to copy
+  }
+  Query::operator=(scan);
+  start_row_ = scan.start_row_;
+  stop_row_ = scan.stop_row_;
+  max_versions_ = scan.max_versions_;
+  caching_ = scan.caching_;
+  max_result_size_ = scan.max_result_size_;
+  cache_blocks_ = scan.cache_blocks_;
+  load_column_families_on_demand_ = scan.load_column_families_on_demand_;
+  reversed_ = scan.reversed_;
+  allow_partial_results_ = scan.allow_partial_results_;
+  consistency_ = scan.consistency_;
+  tr_.reset(new TimeRange(scan.tr_->MinTimeStamp(), scan.tr_->MaxTimeStamp()));
+  // Assign instead of insert(): insert() leaves entries that already exist in
+  // family_map_ untouched, so families and qualifiers selected before the
+  // assignment would survive and the result would not match scan.
+  family_map_ = scan.family_map_;
+  return *this;
+}
+
+/**
+ * Builds a scan from a Get by taking over its cache-blocks flag, max versions,
+ * time range and family map. Nothing else is read from get here.
+ */
+Scan::Scan(const Get &get) {
+  max_versions_ = get.MaxVersions();
+  cache_blocks_ = get.CacheBlocks();
+
+  const auto &get_range = get.Timerange();
+  tr_.reset(new TimeRange(get_range.MinTimeStamp(), get_range.MaxTimeStamp()));
+
+  const auto &get_families = get.FamilyMap();
+  family_map_.insert(get_families.begin(), get_families.end());
+}
+
+/**
+ * Selects a whole column family. If the family is already in the map its
+ * qualifier list is emptied; otherwise the family is added with an empty
+ * qualifier list.
+ *
+ * @return reference to this scan
+ */
+Scan &Scan::AddFamily(const std::string &family) {
+  // operator[] creates the empty entry when the family is missing; clear()
+  // drops any qualifiers recorded earlier for an existing family.
+  family_map_[family].clear();
+  return *this;
+}
+
+/**
+ * Adds a qualifier to the given family, creating the family entry if needed.
+ * A qualifier that is already listed for the family is not added again.
+ *
+ * @param family column family name
+ * @param qualifier column qualifier to select within the family
+ * @return reference to this scan
+ */
+Scan &Scan::AddColumn(const std::string &family, const std::string &qualifier) {
+  // Resolve the family once instead of repeating the map lookup for every use.
+  auto &qualifiers = family_map_[family];
+  if (std::find(qualifiers.begin(), qualifiers.end(), qualifier) == qualifiers.end()) {
+    qualifiers.push_back(qualifier);
+  }
+  return *this;
+}
+
+// Stores the reversed flag.
+void Scan::SetReversed(bool reversed) {
+  reversed_ = reversed;
+}
+
+// Returns the reversed flag.
+bool Scan::IsReversed() const {
+  return reversed_;
+}
+
+// Stores the start row as given; unlike the constructors, no CheckRow() call
+// is made here.
+void Scan::SetStartRow(const std::string &start_row) {
+  start_row_ = start_row;
+}
+
+// Returns the stored start row.
+const std::string &Scan::StartRow() const {
+  return start_row_;
+}
+
+// Stores the stop row as given; unlike the constructors, no CheckRow() call
+// is made here.
+void Scan::SetStopRow(const std::string &stop_row) {
+  stop_row_ = stop_row;
+}
+
+// Returns the stored stop row.
+const std::string &Scan::StopRow() const {
+  return stop_row_;
+}
+
+// Stores the caching value.
+void Scan::SetCaching(int caching) {
+  caching_ = caching;
+}
+
+// Returns the stored caching value.
+int Scan::Caching() const {
+  return caching_;
+}
+
+// Stores the consistency level and returns this scan for chaining.
+Scan &Scan::SetConsistency(const hbase::pb::Consistency consistency) {
+  consistency_ = consistency;
+  return *this;
+}
+
+// Returns the stored consistency level.
+hbase::pb::Consistency Scan::Consistency() const {
+  return consistency_;
+}
+
+// Stores the cache-blocks flag.
+void Scan::SetCacheBlocks(bool cache_blocks) {
+  cache_blocks_ = cache_blocks;
+}
+
+// Returns the cache-blocks flag.
+bool Scan::CacheBlocks() const {
+  return cache_blocks_;
+}
+
+// Stores the allow-partial-results flag.
+void Scan::SetAllowPartialResults(bool allow_partial_results) {
+  allow_partial_results_ = allow_partial_results;
+}
+
+// Returns the allow-partial-results flag.
+bool Scan::AllowPartialResults() const {
+  return allow_partial_results_;
+}
+
+// Stores the load-column-families-on-demand flag.
+void Scan::SetLoadColumnFamiliesOnDemand(bool load_column_families_on_demand) {
+  load_column_families_on_demand_ = load_column_families_on_demand;
+}
+
+// Returns the load-column-families-on-demand flag.
+bool Scan::LoadColumnFamiliesOnDemand() const {
+  return load_column_families_on_demand_;
+}
+
+// Stores the maximum number of versions and returns this scan for chaining.
+// NOTE(review): the setter takes uint32_t while MaxVersions() returns int.
+Scan &Scan::SetMaxVersions(uint32_t max_versions) {
+  max_versions_ = max_versions;
+  return *this;
+}
+
+// Returns the stored maximum number of versions.
+int Scan::MaxVersions() const {
+  return max_versions_;
+}
+
+// Stores the maximum result size.
+void Scan::SetMaxResultSize(int64_t max_result_size) {
+  max_result_size_ = max_result_size;
+}
+
+// Returns the stored maximum result size.
+int64_t Scan::MaxResultSize() const {
+  return max_result_size_;
+}
+
+// Replaces the time range with a new TimeRange(min_stamp, max_stamp) and
+// returns this scan for chaining.
+Scan &Scan::SetTimeRange(int64_t min_stamp, int64_t max_stamp) {
+  tr_.reset(new TimeRange(min_stamp, max_stamp));
+  return *this;
+}
+
+// Restricts the scan to a single timestamp by using the range
+// (timestamp, timestamp + 1).
+// NOTE(review): timestamp + 1 overflows when timestamp is the maximum int64_t.
+Scan &Scan::SetTimeStamp(int64_t timestamp) {
+  return SetTimeRange(timestamp, timestamp + 1);
+}
+
+// Returns the current time range; dereferences tr_, which must not be null.
+const TimeRange &Scan::Timerange() const {
+  return *tr_;
+}
+
+/**
+ * Validates a row key.
+ *
+ * @param row row key to check
+ * @throws std::runtime_error if row is empty or longer than the maximum
+ *         int16_t value
+ */
+void Scan::CheckRow(const std::string &row) {
+  // Compare in the string's own size type: narrowing size() to int could wrap
+  // for a very large row and let it slip past the upper bound.
+  const std::string::size_type kMaxRowLength = std::numeric_limits<int16_t>::max();
+  const std::string::size_type row_length = row.size();
+  if (row_length == 0) {
+    throw std::runtime_error("Row length can't be 0");
+  }
+  if (row_length > kMaxRowLength) {
+    throw std::runtime_error("Length of " + row + " is greater than max row size: " +
+                             std::to_string(kMaxRowLength));
+  }
+}
+
+// True when at least one family has been added to the family map.
+bool Scan::HasFamilies() const {
+  const bool no_families = family_map_.empty();
+  return !no_families;
+}
+
+// Returns the family map (family name -> list of qualifiers) by reference.
+const std::map<std::string, std::vector<std::string>> &Scan::FamilyMap() const {
+  return family_map_;
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/scanner-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/scanner-test.cc b/hbase-native-client/src/hbase/client/scanner-test.cc
new file mode 100644
index 0000000..8a58e34
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/scanner-test.cc
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <thread>
+#include <vector>
+
+#include "hbase/client/async-client-scanner.h"
+#include "hbase/client/async-table-result-scanner.h"
+#include "hbase/client/cell.h"
+#include "hbase/client/client.h"
+#include "hbase/client/configuration.h"
+#include "hbase/client/filter.h"
+#include "hbase/client/get.h"
+#include "hbase/client/hbase-configuration-loader.h"
+#include "hbase/client/put.h"
+#include "hbase/client/result.h"
+#include "hbase/client/row.h"
+#include "hbase/client/table.h"
+#include "hbase/if/Comparator.pb.h"
+#include "hbase/if/Filter.pb.h"
+#include "hbase/serde/table-name.h"
+#include "hbase/test-util/test-util.h"
+#include "hbase/utils/time-util.h"
+
+using hbase::Cell;
+using hbase::ComparatorFactory;
+using hbase::Comparator;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::Put;
+using hbase::Result;
+using hbase::Scan;
+using hbase::Table;
+using hbase::TestUtil;
+using hbase::TimeUtil;
+using hbase::AsyncClientScanner;
+using hbase::AsyncTableResultScanner;
+using hbase::FilterFactory;
+using hbase::pb::CompareType;
+
+// Fixture shared by the scanner tests. SetUpTestCase() creates the TestUtil
+// and starts a mini cluster through StartMiniCluster(2).
+class ScannerTest : public ::testing::Test {
+ public:
+  // Number of rows the tests write into each table.
+  static const uint32_t num_rows;
+  // Test utility owning the mini cluster used by the tests.
+  static std::unique_ptr<hbase::TestUtil> test_util;
+
+  static void SetUpTestCase() {
+    google::InstallFailureSignalHandler();
+    test_util.reset(new hbase::TestUtil());
+    test_util->StartMiniCluster(2);
+  }
+};
+const uint32_t ScannerTest::num_rows = 1000;
+std::unique_ptr<hbase::TestUtil> ScannerTest::test_util;
+
+// Returns the family name for index i: "f" followed by the decimal index.
+std::string Family(uint32_t i) {
+  std::string name{"f"};
+  name += folly::to<std::string>(i);
+  return name;
+}
+
+// Returns the row key for index i: "row" followed by i zero-padded on the left
+// to at least `width` digits.
+std::string Row(uint32_t i, int width) {
+  std::ostringstream padded;
+  padded.width(width);
+  padded.fill('0');
+  padded << i;
+
+  std::string key{"row"};
+  key += padded.str();
+  return key;
+}
+
+// Row key with the default padding of three digits.
+std::string Row(uint32_t i) {
+  const int kDefaultWidth = 3;
+  return Row(i, kDefaultWidth);
+}
+
+/**
+ * Builds a Put for `row` with two columns per family: "q1" holding the row key
+ * and "q2" holding "<row>-<row>".
+ *
+ * @param row row key, also used to derive the cell values
+ * @param num_families number of families (Family(0) .. Family(num_families - 1))
+ * @return the newly created Put
+ */
+std::unique_ptr<Put> MakePut(const std::string &row, uint32_t num_families) {
+  auto put = std::make_unique<Put>(row);
+
+  for (uint32_t i = 0; i < num_families; i++) {
+    put->AddColumn(Family(i), "q1", row);
+    put->AddColumn(Family(i), "q2", row + "-" + row);
+  }
+
+  // Return the local by name: it is moved implicitly, and an explicit
+  // std::move would only prevent copy elision.
+  return put;
+}
+
+// Asserts that r is the row written by MakePut(expected_row, num_families):
+// matching row key, two cells per family, and the expected "q1"/"q2" values.
+// The ASSERT_* macros return from this helper only, not from the calling test.
+void CheckResult(const Result &r, std::string expected_row, uint32_t num_families) {
+  VLOG(1) << r.DebugString();
+  auto actual_row = r.Row();
+  ASSERT_EQ(actual_row, expected_row);
+  ASSERT_EQ(r.Cells().size(), num_families * 2);
+  for (uint32_t f = 0; f != num_families; ++f) {
+    ASSERT_EQ(*r.Value(Family(f), "q1"), actual_row);
+    ASSERT_EQ(*r.Value(Family(f), "q2"), actual_row + "-" + actual_row);
+  }
+}
+
+// Creates table_name with families Family(0) .. Family(num_families - 1).
+// When num_regions > 1 the table is created with num_regions - 1 split keys
+// spaced num_rows / (num_regions - 1) rows apart, starting at Row(0).
+void CreateTable(std::string table_name, uint32_t num_families, uint32_t num_rows,
+                 int32_t num_regions) {
+  LOG(INFO) << "Creating the table " << table_name
+            << " with num_regions:" << folly::to<std::string>(num_regions);
+  std::vector<std::string> families;
+  for (uint32_t f = 0; f != num_families; ++f) {
+    families.push_back(Family(f));
+  }
+
+  if (num_regions <= 1) {
+    ScannerTest::test_util->CreateTable(table_name, families);
+    return;
+  }
+
+  std::vector<std::string> keys;
+  const int32_t num_splits = num_regions - 1;
+  for (int32_t split = 0; split < num_splits; ++split) {
+    keys.push_back(Row(split * (num_rows / num_splits)));
+    LOG(INFO) << "Split key:" << keys.back();
+  }
+  ScannerTest::test_util->CreateTable(table_name, families, keys);
+}
+
+/**
+ * Creates the table via CreateTable() and writes rows Row(0) .. Row(num_rows - 1)
+ * into it, each built by MakePut().
+ *
+ * @return the client that was used for the writes
+ */
+std::unique_ptr<hbase::Client> CreateTableAndWriteData(std::string table_name,
+                                                       uint32_t num_families, uint32_t num_rows,
+                                                       int32_t num_regions) {
+  CreateTable(table_name, num_families, num_rows, num_regions);
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  LOG(INFO) << "Writing data to the table, num_rows:" << num_rows;
+  // Perform Puts
+  for (uint32_t i = 0; i < num_rows; i++) {
+    table->Put(*MakePut(Row(i), num_families));
+  }
+  // Return the local by name: it is moved implicitly, and an explicit
+  // std::move would only prevent copy elision.
+  return client;
+}
+
+// Runs `scan` on `table` and checks that the results are exactly the rows
+// Row(start), Row(start + 1), ... in order, num_rows of them in total.
+void TestScan(const Scan &scan, uint32_t num_families, int32_t start, int32_t num_rows,
+              Table *table) {
+  LOG(INFO) << "Starting scan for the test with start:" << scan.StartRow()
+            << ", stop:" << scan.StopRow() << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = start;
+  for (auto r = scanner->Next(); r != nullptr; r = scanner->Next()) {
+    CheckResult(*r, Row(i++), num_families);
+  }
+  ASSERT_EQ(i - start, num_rows);
+}
+
+// Same check for a table with a single family.
+void TestScan(const Scan &scan, int32_t start, int32_t num_rows, Table *table) {
+  const uint32_t kSingleFamily = 1;
+  TestScan(scan, kSingleFamily, start, num_rows, table);
+}
+
+// Builds a scan from numeric row indices and verifies it. A negative start or
+// stop leaves the corresponding boundary of the scan unset.
+void TestScan(uint32_t num_families, int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  Scan scan{};
+  if (start < 0) {
+    start = 0;  // needed for the verification below, which counts from Row(start)
+  } else {
+    scan.SetStartRow(Row(start));
+  }
+  if (stop >= 0) {
+    scan.SetStopRow(Row(stop));
+  }
+
+  TestScan(scan, num_families, start, num_rows, table);
+}
+
+// Same check for a table with a single family.
+void TestScan(int32_t start, int32_t stop, int32_t num_rows, Table *table) {
+  const uint32_t kSingleFamily = 1;
+  TestScan(kSingleFamily, start, stop, num_rows, table);
+}
+
+// Scans between two raw row keys and checks only the number of results and
+// that each result's Map() has num_families entries.
+void TestScan(uint32_t num_families, std::string start, std::string stop, int32_t num_rows,
+              Table *table) {
+  Scan scan{};
+
+  scan.SetStartRow(start);
+  scan.SetStopRow(stop);
+
+  LOG(INFO) << "Starting scan for the test with start:" << start << ", stop:" << stop
+            << " expected_num_rows:" << num_rows;
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = 0;
+  for (auto r = scanner->Next(); r != nullptr; r = scanner->Next()) {
+    VLOG(1) << r->DebugString();
+    i++;
+    ASSERT_EQ(r->Map().size(), num_families);
+  }
+  ASSERT_EQ(i, num_rows);
+}
+
+// Same check for a table with a single family.
+void TestScan(std::string start, std::string stop, int32_t num_rows, Table *table) {
+  const uint32_t kSingleFamily = 1;
+  TestScan(kSingleFamily, start, stop, num_rows, table);
+}
+
+// Runs a fixed matrix of scans against `table`. The expected counts assume the
+// table holds rows Row(0) .. Row(999). Cases run in the listed order; -1 as a
+// numeric bound means "leave that boundary unset".
+void TestScanCombinations(Table *table, uint32_t num_families) {
+  struct IndexRangeCase {
+    int32_t start;
+    int32_t stop;
+    int32_t expected_rows;
+  };
+  struct KeyRangeCase {
+    std::string start;
+    std::string stop;
+    int32_t expected_rows;
+  };
+
+  const IndexRangeCase index_cases[] = {
+      // full table
+      {-1, -1, 1000},
+      {-1, 999, 999},
+      {0, -1, 1000},
+      {0, 999, 999},
+      {10, 990, 980},
+      {1, 998, 997},
+
+      {123, 345, 222},
+      {234, 456, 222},
+      {345, 567, 222},
+      {456, 678, 222},
+
+      // single results
+      {111, 111, 1},  // split keys are like 111, 222, 333, etc
+      {111, 112, 1},
+      {332, 332, 1},
+      {332, 333, 1},
+      {333, 333, 1},
+      {333, 334, 1},
+      {42, 42, 1},
+      {921, 921, 1},
+      {0, 0, 1},
+      {0, 1, 1},
+      {999, 999, 1},
+
+      // few results
+      {0, 0, 1},
+      {0, 2, 2},
+      {0, 5, 5},
+      {10, 15, 5},
+      {105, 115, 10},
+      {111, 221, 110},
+      {111, 222, 111},  // crossing region boundary 111-222
+      {111, 223, 112},
+      {111, 224, 113},
+      {990, 999, 9},
+      {900, 998, 98},
+  };
+  for (const auto &c : index_cases) {
+    TestScan(num_families, c.start, c.stop, c.expected_rows, table);
+  }
+
+  const KeyRangeCase key_cases[] = {
+      // empty results
+      {"a", "a", 0},
+      {"a", "r", 0},
+      {"", "r", 0},
+      {"s", "", 0},
+      {"s", "z", 0},
+      {Row(110) + "a", Row(111), 0},
+      {Row(111) + "a", Row(112), 0},
+      {Row(123) + "a", Row(124), 0},
+
+      // custom
+      {Row(111, 3), Row(1111, 4), 1},
+      {Row(0, 3), Row(0, 4), 1},
+      {Row(999, 3), Row(9999, 4), 1},
+      {Row(111, 3), Row(1111, 4), 1},
+      {Row(0, 3), Row(9999, 4), 1000},
+      {"a", "z", 1000},
+  };
+  for (const auto &c : key_cases) {
+    TestScan(num_families, c.start, c.stop, c.expected_rows, table);
+  }
+}
+
+// some of these tests are from TestAsyncTableScan* and some from TestFromClientSide* and
+// TestScannersFromClientSide*
+
+// Runs the scan matrix against a one-family table created with num_regions = 1.
+TEST_F(ScannerTest, SingleRegionScan) {
+  const std::string table_name{"t_single_region_scan"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  TestScanCombinations(table.get(), 1);
+}
+
+// Runs the scan matrix against a one-family table created with num_regions = 10.
+TEST_F(ScannerTest, MultiRegionScan) {
+  const std::string table_name{"t_multi_region_scan"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 10);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  TestScanCombinations(table.get(), 1);
+}
+
+// Scans a table with a very small max result size while sleeping between
+// Next() calls, then expects the scanner to report that prefetching was
+// stopped at least once and that every row was returned.
+TEST_F(ScannerTest, ScanWithPauses) {
+  const char *kMaxResultSizeKey = "hbase.client.scanner.max.result.size";
+  auto max_result_size = ScannerTest::test_util->conf()->GetInt(kMaxResultSizeKey, 2097152);
+  ScannerTest::test_util->conf()->SetInt(kMaxResultSizeKey, 100);
+  // Use a table name of its own: "t_multi_region_scan" is already created by
+  // the MultiRegionScan test running in the same mini cluster.
+  auto client = CreateTableAndWriteData("t_scan_with_pauses", 1, num_rows, 5);
+  auto table = client->Table(folly::to<hbase::pb::TableName>("t_scan_with_pauses"));
+
+  VLOG(1) << "Starting scan for the test";
+  Scan scan{};
+  scan.SetCaching(100);
+  auto scanner = table->Scan(scan);
+
+  uint32_t i = 0;
+  auto r = scanner->Next();
+  while (r != nullptr) {
+    CheckResult(*r, Row(i++), 1);
+    r = scanner->Next();
+    std::this_thread::sleep_for(TimeUtil::MillisToNanos(10));
+  }
+
+  // Restore the shared configuration before asserting: a failing ASSERT_*
+  // returns from the test body and would otherwise leave the override in place
+  // for the tests that follow.
+  ScannerTest::test_util->conf()->SetInt(kMaxResultSizeKey, max_result_size);
+
+  auto s = static_cast<AsyncTableResultScanner *>(scanner.get());
+  ASSERT_GT(s->num_prefetch_stopped(), 0);
+
+  ASSERT_EQ(i, num_rows);
+}
+
+// Scans with a value filter (GREATER_OR_EQUAL against Row(800)) and expects
+// the 200 rows starting at Row(800).
+TEST_F(ScannerTest, ScanWithFilters) {
+  const std::string table_name{"t_scan_with_filters"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  auto comparator = ComparatorFactory::BinaryComparator(Row(800));
+  Scan scan{};
+  scan.SetFilter(FilterFactory::ValueFilter(CompareType::GREATER_OR_EQUAL, *comparator));
+
+  TestScan(scan, 800, 200, table.get());
+}
+
+// Runs the scan matrix against a table with three families.
+TEST_F(ScannerTest, ScanMultiFamily) {
+  const std::string table_name{"t_scan_multi_family"};
+  const uint32_t kNumFamilies = 3;
+  auto client = CreateTableAndWriteData(table_name, kNumFamilies, num_rows, 1);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  TestScanCombinations(table.get(), kNumFamilies);
+}
+
+// Writes one row with a "q1" column and an empty-qualifier column, then checks
+// that selecting the empty qualifier returns one cell while selecting the
+// whole family returns both.
+TEST_F(ScannerTest, ScanNullQualifier) {
+  std::string table_name{"t_scan_null_qualifier"};
+  std::string row{"row"};
+  CreateTable(table_name, 1, 1, 1);
+
+  auto tn = folly::to<hbase::pb::TableName>(table_name);
+  auto client = std::make_unique<hbase::Client>(*ScannerTest::test_util->conf());
+  auto table = client->Table(tn);
+
+  // Perform Puts
+  Put put{row};
+  put.AddColumn(Family(0), "q1", row);
+  put.AddColumn(Family(0), "", row);
+  table->Put(put);
+
+  Scan scan1{};
+  scan1.AddColumn(Family(0), "");
+  auto scanner1 = table->Scan(scan1);
+  auto r1 = scanner1->Next();
+  // Fail the test instead of dereferencing null when no row comes back.
+  ASSERT_TRUE(r1 != nullptr);
+  ASSERT_EQ(r1->Cells().size(), 1);
+  ASSERT_EQ(scanner1->Next(), nullptr);
+
+  Scan scan2{};
+  scan2.AddFamily(Family(0));
+  auto scanner2 = table->Scan(scan2);
+  auto r2 = scanner2->Next();
+  // Fail the test instead of dereferencing null when no row comes back.
+  ASSERT_TRUE(r2 != nullptr);
+  ASSERT_EQ(r2->Cells().size(), 2);
+  ASSERT_EQ(scanner2->Next(), nullptr);
+}
+
+// Selecting a qualifier that was never written must produce an empty scan.
+TEST_F(ScannerTest, ScanNoResults) {
+  const std::string table_name{"t_scan_no_results"};
+  auto client = CreateTableAndWriteData(table_name, 1, num_rows, 3);
+  auto table = client->Table(folly::to<hbase::pb::TableName>(table_name));
+
+  Scan scan{};
+  scan.AddColumn(Family(0), "non_existing_qualifier");
+
+  const int32_t kStart = 0;
+  const int32_t kExpectedRows = 0;
+  TestScan(scan, kStart, kExpectedRows, table.get());
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/simple-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/simple-client.cc b/hbase-native-client/src/hbase/client/simple-client.cc
new file mode 100644
index 0000000..25f203a
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/simple-client.cc
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <folly/Random.h>
+#include <gflags/gflags.h>
+
+#include <atomic>
+#include <chrono>
+#include <iostream>
+#include <thread>
+
+#include "hbase/connection/rpc-client.h"
+#include "hbase/client/client.h"
+#include "hbase/client/get.h"
+#include "hbase/client/hbase-configuration-loader.h"
+#include "hbase/client/put.h"
+#include "hbase/client/scan.h"
+#include "hbase/client/table.h"
+#include "hbase/serde/server-name.h"
+#include "hbase/serde/table-name.h"
+#include "hbase/utils/time-util.h"
+
+using hbase::Client;
+using hbase::Configuration;
+using hbase::Get;
+using hbase::HBaseConfigurationLoader;
+using hbase::Scan;
+using hbase::Put;
+using hbase::Result;
+using hbase::Table;
+using hbase::pb::TableName;
+using hbase::pb::ServerName;
+using hbase::TimeUtil;
+
+DEFINE_string(table, "test_table", "What table to do the reads or writes");
+DEFINE_string(row, "row_", "row prefix");
+DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
+DEFINE_string(conf, "", "Conf directory to read the config from (optional)");
+DEFINE_uint64(num_rows, 10000, "How many rows to write and read");
+DEFINE_uint64(batch_num_rows, 10000, "How many rows batch for multi-gets and multi-puts");
+DEFINE_uint64(report_num_rows, 10000, "How frequent we should report the progress");
+DEFINE_bool(puts, true, "Whether to perform puts");
+DEFINE_bool(gets, true, "Whether to perform gets");
+DEFINE_bool(multigets, true, "Whether to perform multi-gets");
+DEFINE_bool(scans, true, "Whether to perform scans");
+DEFINE_bool(display_results, false, "Whether to display the Results from Gets");
+DEFINE_int32(threads, 6, "How many cpu threads");
+
+/**
+ * Builds a Put for `row` with a single column: family "f", qualifier "q",
+ * value equal to the row key.
+ */
+std::unique_ptr<Put> MakePut(const std::string &row) {
+  auto put = std::make_unique<Put>(row);
+  put->AddColumn("f", "q", row);
+  // Return the local by name: it is moved implicitly, and an explicit
+  // std::move would only prevent copy elision.
+  return put;
+}
+
+// Returns the row key for index i: the prefix followed by the decimal index.
+std::string Row(const std::string &prefix, uint64_t i) {
+  return prefix + folly::to<std::string>(i);
+}
+
+// Aborts via CHECK unless `result` is a non-empty, single-cell result for `row`
+// whose "f":"q" value equals the row key (the shape written by MakePut()).
+void ValidateResult(const Result &result, const std::string &row) {
+  CHECK(!result.IsEmpty());
+  CHECK_EQ(result.Row(), row);
+  CHECK_EQ(result.Size(), 1);
+  const auto stored_value = result.Value("f", "q").value();
+  CHECK_EQ(stored_value, row);
+}
+
+/**
+ * Entry point of the sample client. Depending on the command-line flags it
+ * writes FLAGS_num_rows rows, reads them back with single gets and with
+ * batched multi-gets, and finally scans the table, logging the elapsed time of
+ * each phase.
+ *
+ * @return 0 on completion
+ */
+int main(int argc, char *argv[]) {
+  gflags::SetUsageMessage("Simple client to get a single row from HBase on the comamnd line");
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+  google::InstallFailureSignalHandler();
+  FLAGS_logtostderr = 1;
+  FLAGS_stderrthreshold = 1;
+
+  std::shared_ptr<Configuration> conf = nullptr;
+  if (FLAGS_conf == "") {
+    // Configuration
+    conf = std::make_shared<Configuration>();
+    conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper);
+    conf->SetInt("hbase.client.cpu.thread.pool.size", FLAGS_threads);
+  } else {
+    setenv("HBASE_CONF", FLAGS_conf.c_str(), 1);
+    hbase::HBaseConfigurationLoader loader;
+    conf = std::make_shared<Configuration>(loader.LoadDefaultResources().value());
+  }
+
+  auto tn = std::make_shared<TableName>(folly::to<TableName>(FLAGS_table));
+  auto num_puts = FLAGS_num_rows;
+
+  // A reporting interval of zero would make "count % interval" a division by
+  // zero; treat it as "no progress reports" instead.
+  const uint64_t report_every = FLAGS_report_num_rows;
+  auto should_report = [report_every](uint64_t count) {
+    return report_every != 0 && count != 0 && count % report_every == 0;
+  };
+  // A batch size of zero would never advance the multi-get loop below; use at
+  // least one row per batch.
+  const uint64_t batch_num_rows = FLAGS_batch_num_rows == 0 ? 1 : FLAGS_batch_num_rows;
+
+  auto client = std::make_unique<Client>(*conf);
+  auto table = client->Table(*tn);
+
+  auto start_ns = TimeUtil::GetNowNanos();
+
+  // Do the Put requests
+  if (FLAGS_puts) {
+    LOG(INFO) << "Sending put requests";
+    for (uint64_t i = 0; i < num_puts; i++) {
+      table->Put(*MakePut(Row(FLAGS_row, i)));
+      if (should_report(i)) {
+        LOG(INFO) << "Sent  " << i << " Put requests in " << TimeUtil::ElapsedMillis(start_ns)
+                  << " ms.";
+      }
+    }
+
+    LOG(INFO) << "Successfully sent  " << num_puts << " Put requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  }
+
+  // Do the Get requests
+  if (FLAGS_gets) {
+    LOG(INFO) << "Sending get requests";
+    start_ns = TimeUtil::GetNowNanos();
+    for (uint64_t i = 0; i < num_puts; i++) {
+      auto row = Row(FLAGS_row, i);
+      auto result = table->Get(Get{row});
+      if (FLAGS_display_results) {
+        LOG(INFO) << result->DebugString();
+      } else if (should_report(i)) {
+        LOG(INFO) << "Sent  " << i << " Get requests in " << TimeUtil::ElapsedMillis(start_ns)
+                  << " ms.";
+      }
+      ValidateResult(*result, row);
+    }
+
+    LOG(INFO) << "Successfully sent  " << num_puts << " Get requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  }
+
+  // Do the Multi-Gets
+  if (FLAGS_multigets) {
+    LOG(INFO) << "Sending multi-get requests";
+    start_ns = TimeUtil::GetNowNanos();
+    std::vector<hbase::Get> gets;
+
+    for (uint64_t i = 0; i < num_puts;) {
+      gets.clear();
+      // accumulate batch_num_rows at a time
+      for (uint64_t j = 0; j < batch_num_rows && i < num_puts; ++j) {
+        hbase::Get get(Row(FLAGS_row, i));
+        gets.push_back(get);
+        i++;
+      }
+      auto results = table->Get(gets);
+
+      if (FLAGS_display_results) {
+        for (const auto &result : results) LOG(INFO) << result->DebugString();
+      } else if (should_report(i)) {
+        LOG(INFO) << "Sent  " << i << " Multi-Get requests in " << TimeUtil::ElapsedMillis(start_ns)
+                  << " ms.";
+      }
+    }
+
+    LOG(INFO) << "Successfully sent  " << num_puts << " Multi-Get requests in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+  }
+
+  // Do the Scan
+  if (FLAGS_scans) {
+    LOG(INFO) << "Starting scanner";
+    start_ns = TimeUtil::GetNowNanos();
+    Scan scan{};
+    auto scanner = table->Scan(scan);
+
+    uint64_t i = 0;
+    auto r = scanner->Next();
+    while (r != nullptr) {
+      if (FLAGS_display_results) {
+        LOG(INFO) << r->DebugString();
+      }
+      r = scanner->Next();
+      i++;
+      if (!FLAGS_display_results && should_report(i)) {
+        LOG(INFO) << "Scan iterated over " << i << " results " << TimeUtil::ElapsedMillis(start_ns)
+                  << " ms.";
+      }
+    }
+
+    LOG(INFO) << "Successfully iterated over  " << i << " Scan results in "
+              << TimeUtil::ElapsedMillis(start_ns) << " ms.";
+    scanner->Close();
+  }
+
+  table->Close();
+  client->Close();
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/table.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/table.cc b/hbase-native-client/src/hbase/client/table.cc
new file mode 100644
index 0000000..e877ec8
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/table.cc
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/table.h"
+
+#include <chrono>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "hbase/client/async-connection.h"
+#include "hbase/client/async-table-result-scanner.h"
+#include "hbase/client/request-converter.h"
+#include "hbase/client/response-converter.h"
+#include "hbase/if/Client.pb.h"
+#include "hbase/security/user.h"
+#include "hbase/serde/server-name.h"
+#include "hbase/utils/time-util.h"
+
+using hbase::pb::TableName;
+using hbase::security::User;
+using std::chrono::milliseconds;
+
+namespace hbase {
+
+// Keeps a shared copy of the table name, the connection and its configuration,
+// and creates the RawAsyncTable that the other methods delegate to.
+Table::Table(const TableName &table_name, std::shared_ptr<AsyncConnection> async_connection)
+    : table_name_(std::make_shared<TableName>(table_name)),
+      async_connection_(async_connection),
+      conf_(async_connection->conf()) {
+  async_table_.reset(new RawAsyncTable(table_name_, async_connection));
+}
+
+// Nothing is released explicitly here.
+Table::~Table() = default;
+
+// Issues the Get through the async table and waits for the result for at most
+// operation_timeout().
+std::shared_ptr<hbase::Result> Table::Get(const hbase::Get &get) {
+  auto pending = async_table_->Get(get);
+  const auto timeout = operation_timeout();
+  return pending.get(timeout);
+}
+
+// Starts the scan on the async table and returns the scanner that receives
+// its results. The scanner's cache size is derived from the scan's max result
+// size, or from the connection configuration when the scan does not set a
+// positive value.
+std::shared_ptr<ResultScanner> Table::Scan(const hbase::Scan &scan) {
+  int64_t max_result_size = scan.MaxResultSize();
+  if (max_result_size <= 0) {
+    max_result_size = async_connection_->connection_conf()->scanner_max_result_size();
+  }
+  auto max_cache_size = ResultSize2CacheSize(max_result_size);
+
+  auto scanner = std::make_shared<AsyncTableResultScanner>(max_cache_size);
+  async_table_->Scan(scan, scanner);
+  return scanner;
+}
+
+// Returns twice max_results_size, or max_results_size unchanged when it is
+// larger than half the maximum int64_t value.
+int64_t Table::ResultSize2CacheSize(int64_t max_results_size) const {
+  const int64_t kDoublingLimit = std::numeric_limits<int64_t>::max() / 2;
+  if (max_results_size > kDoublingLimit) {
+    return max_results_size;
+  }
+  return max_results_size * 2;
+}
+
+// Issues the Put through the async table and waits for completion for at most
+// operation_timeout().
+void Table::Put(const hbase::Put &put) {
+  auto pending = async_table_->Put(put);
+  const auto timeout = operation_timeout();
+  pending.get(timeout);
+}
+
+// Forwards a check-and-put to the async table and waits for its boolean
+// result for at most operation_timeout().
+bool Table::CheckAndPut(const std::string &row, const std::string &family,
+                        const std::string &qualifier, const std::string &value,
+                        const hbase::Put &put, const pb::CompareType &compare_op) {
+  auto pending = async_table_->CheckAndPut(row, family, qualifier, value, put, compare_op);
+  const auto timeout = operation_timeout();
+  return pending.get(timeout);
+}
+
+// Forwards a check-and-delete to the async table and waits for its boolean
+// result for at most operation_timeout().
+bool Table::CheckAndDelete(const std::string &row, const std::string &family,
+                           const std::string &qualifier, const std::string &value,
+                           const hbase::Delete &del, const pb::CompareType &compare_op) {
+  auto pending = async_table_->CheckAndDelete(row, family, qualifier, value, del, compare_op);
+  const auto timeout = operation_timeout();
+  return pending.get(timeout);
+}
+
+// Issues the Delete through the async table and waits for completion for at
+// most operation_timeout().
+void Table::Delete(const hbase::Delete &del) {
+  auto pending = async_table_->Delete(del);
+  const auto timeout = operation_timeout();
+  pending.get(timeout);
+}
+
+// Issues the Increment through the async table and waits for its result for
+// at most operation_timeout().
+std::shared_ptr<hbase::Result> Table::Increment(const hbase::Increment &increment) {
+  auto pending = async_table_->Increment(increment);
+  const auto timeout = operation_timeout();
+  return pending.get(timeout);
+}
+
+// Issues the Append through the async table and waits for its result for at
+// most operation_timeout().
+std::shared_ptr<hbase::Result> Table::Append(const hbase::Append &append) {
+  auto pending = async_table_->Append(append);
+  const auto timeout = operation_timeout();
+  return pending.get(timeout);
+}
+
+// Returns the connection's configured operation timeout converted to
+// milliseconds.
+milliseconds Table::operation_timeout() const {
+  const auto configured = async_connection_->connection_conf()->operation_timeout();
+  return TimeUtil::ToMillis(configured);
+}
+
+// Closes the underlying async table.
+void Table::Close() {
+  async_table_->Close();
+}
+
+// Asks the connection's region locator for the region of `row` in this table
+// and waits for the answer; no timeout is passed to get() here.
+std::shared_ptr<RegionLocation> Table::GetRegionLocation(const std::string &row) {
+  auto locator = async_connection_->region_locator();
+  return locator->LocateRegion(*table_name_, row).get();
+}
+
+/**
+ * Issues all gets as one batch through the async table and waits for the
+ * outcomes for at most operation_timeout().
+ *
+ * @param gets the Get operations to run
+ * @return the results of the gets that produced a value, in outcome order
+ * @throws the exception held by the first failed outcome, after logging it
+ */
+std::vector<std::shared_ptr<hbase::Result>> Table::Get(const std::vector<hbase::Get> &gets) {
+  auto tresults = async_table_->Get(gets).get(operation_timeout());
+  std::vector<std::shared_ptr<hbase::Result>> results{};
+  // Index of the current outcome. It is advanced on every iteration so that
+  // the error log names the get that failed rather than always gets[0].
+  // NOTE(review): assumes tresults[i] is the outcome of gets[i] — confirm.
+  uint32_t num = 0;
+  for (auto &tresult : tresults) {
+    if (tresult.hasValue()) {
+      results.push_back(tresult.value());
+    } else if (tresult.hasException()) {
+      LOG(ERROR) << "Caught exception:- " << tresult.exception().what() << " for "
+                 << gets[num].row();
+      throw tresult.exception();
+    }
+    ++num;
+  }
+  return results;
+}
+
+/**
+ * Issues all puts as one batch through the async table and waits for the
+ * outcomes for at most operation_timeout().
+ *
+ * @param puts the Put operations to run
+ * @throws the exception held by the first failed outcome, after logging it
+ */
+void Table::Put(const std::vector<hbase::Put> &puts) {
+  auto tresults = async_table_->Put(puts).get(operation_timeout());
+  // Index of the current outcome. It is advanced on every iteration so that
+  // the error log names the put that failed rather than always puts[0].
+  // NOTE(review): assumes tresults[i] is the outcome of puts[i] — confirm.
+  uint32_t num = 0;
+  for (auto &tresult : tresults) {
+    if (tresult.hasException()) {
+      LOG(ERROR) << "Caught exception:- " << tresult.exception().what() << " for "
+                 << puts[num].row();
+      throw tresult.exception();
+    }
+    ++num;
+  }
+}
+
+} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/time-range-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/time-range-test.cc b/hbase-native-client/src/hbase/client/time-range-test.cc
new file mode 100644
index 0000000..a3db877
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/time-range-test.cc
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/time-range.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+using namespace hbase;
+
+// A default-constructed TimeRange must report 0 as its minimum timestamp and
+// the largest int64_t as its maximum timestamp.
+TEST(TimeRange, DefaultObject) {
+  TimeRange *range = nullptr;
+  ASSERT_NO_THROW(range = new TimeRange());
+
+  // Expected defaults.
+  EXPECT_EQ(0, range->MinTimeStamp());
+  EXPECT_EQ(std::numeric_limits<int64_t>::max(), range->MaxTimeStamp());
+
+  // Arbitrary non-default values must not match.
+  EXPECT_NE(1000, range->MinTimeStamp());
+  EXPECT_NE(2000, range->MaxTimeStamp());
+
+  delete range;
+  range = nullptr;
+}
+
+// The two-argument constructor must reject invalid bounds with
+// std::runtime_error.
+TEST(TimeRange, Exception) {
+  // min timestamp below zero
+  ASSERT_THROW(TimeRange(-1000, 2000), std::runtime_error);
+
+  // max timestamp below zero
+  ASSERT_THROW(TimeRange(1000, -2000), std::runtime_error);
+
+  // min timestamp larger than max timestamp
+  ASSERT_THROW(TimeRange(10000, 2000), std::runtime_error);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/time-range.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/time-range.cc b/hbase-native-client/src/hbase/client/time-range.cc
new file mode 100644
index 0000000..b53e6f9
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/time-range.cc
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/time-range.h"
+#include <limits>
+#include <stdexcept>
+#include <string>
+
+namespace hbase {
+
+/** Default range: [0, int64_t max], flagged as covering all time. */
+TimeRange::TimeRange()
+    : min_timestamp_(0L),
+      max_timestamp_(std::numeric_limits<int64_t>::max()),
+      all_time_(true) {}
+
+/** Copy constructor: duplicates both bounds and the all-time flag of `tr`. */
+TimeRange::TimeRange(const TimeRange &tr)
+    : min_timestamp_(tr.min_timestamp_),
+      max_timestamp_(tr.max_timestamp_),
+      all_time_(tr.all_time_) {}
+
+/** Copy assignment: takes over both bounds and the all-time flag of `tr`. */
+TimeRange &TimeRange::operator=(const TimeRange &tr) {
+  min_timestamp_ = tr.min_timestamp_;
+  max_timestamp_ = tr.max_timestamp_;
+  all_time_ = tr.all_time_;
+  return *this;
+}
+
+/** Nothing to release. */
+TimeRange::~TimeRange() {
+}
+
+/**
+ * Range starting at `min_timestamp` with the maximum set to int64_t max.
+ * The all-time flag is cleared. No validation is performed on the argument.
+ */
+TimeRange::TimeRange(int64_t min_timestamp)
+    : min_timestamp_(min_timestamp),
+      max_timestamp_(std::numeric_limits<int64_t>::max()),
+      all_time_(false) {}
+
+/**
+ * Range [min_timestamp, max_timestamp] with the all-time flag cleared.
+ *
+ * @throws std::runtime_error if either bound is negative, or if
+ *         max_timestamp is smaller than min_timestamp.
+ */
+TimeRange::TimeRange(int64_t min_timestamp, int64_t max_timestamp) {
+  const std::string min_str = std::to_string(min_timestamp);
+  const std::string max_str = std::to_string(max_timestamp);
+
+  const bool has_negative = (min_timestamp < 0) || (max_timestamp < 0);
+  if (has_negative) {
+    std::string msg = "Timestamp cannot be negative. min_timestamp: ";
+    msg += min_str;
+    msg += ", max_timestamp:";
+    msg += max_str;
+    throw std::runtime_error(msg);
+  }
+
+  if (max_timestamp < min_timestamp) {
+    std::string msg = "max_timestamp [";
+    msg += max_str;
+    msg += "] should be greater than min_timestamp [";
+    msg += min_str;
+    msg += "]";
+    throw std::runtime_error(msg);
+  }
+
+  min_timestamp_ = min_timestamp;
+  max_timestamp_ = max_timestamp;
+  all_time_ = false;
+}
+
+/** @return the lower bound of this range. */
+int64_t TimeRange::MinTimeStamp() const {
+  return min_timestamp_;
+}
+
+/** @return the upper bound of this range. */
+int64_t TimeRange::MaxTimeStamp() const {
+  return max_timestamp_;
+}
+
+/** @return the all-time flag (set only by the default constructor or by copying). */
+bool TimeRange::IsAllTime() const {
+  return all_time_;
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/zk-util-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/zk-util-test.cc b/hbase-native-client/src/hbase/client/zk-util-test.cc
new file mode 100644
index 0000000..83d22fb
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/zk-util-test.cc
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <gtest/gtest.h>
+
+#include "hbase/client/zk-util.h"
+
+using hbase::Configuration;
+using hbase::ZKUtil;
+
+// Checks that the configured client port is appended only to quorum entries
+// that do not already carry a port.
+TEST(ZKUtilTest, ParseZooKeeperQuorum) {
+  Configuration config{};
+  config.SetInt(ZKUtil::kHBaseZookeeperClientPort_, 100);
+
+  // Single host without a port.
+  config.Set(ZKUtil::kHBaseZookeeperQuorum_, "s1");
+  ASSERT_EQ("s1:100", ZKUtil::ParseZooKeeperQuorum(config));
+
+  // Single host with an explicit port.
+  config.Set(ZKUtil::kHBaseZookeeperQuorum_, "s1:42");
+  ASSERT_EQ("s1:42", ZKUtil::ParseZooKeeperQuorum(config));
+
+  // Several hosts without ports.
+  config.Set(ZKUtil::kHBaseZookeeperQuorum_, "s1,s2,s3");
+  ASSERT_EQ("s1:100,s2:100,s3:100", ZKUtil::ParseZooKeeperQuorum(config));
+
+  // Several hosts with explicit ports.
+  config.Set(ZKUtil::kHBaseZookeeperQuorum_, "s1:42,s2:42,s3:42");
+  ASSERT_EQ("s1:42,s2:42,s3:42", ZKUtil::ParseZooKeeperQuorum(config));
+}
+
+// Checks the meta znode path for the default and for an overridden parent.
+TEST(ZKUtilTest, MetaZNode) {
+  Configuration config{};
+
+  // No parent configured: the default parent is used.
+  ASSERT_EQ("/hbase/meta-region-server", ZKUtil::MetaZNode(config));
+
+  // Explicit parent znode.
+  config.Set(ZKUtil::kHBaseZnodeParent_, "/hbase-secure");
+  ASSERT_EQ("/hbase-secure/meta-region-server", ZKUtil::MetaZNode(config));
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/client/zk-util.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/client/zk-util.cc b/hbase-native-client/src/hbase/client/zk-util.cc
new file mode 100644
index 0000000..38a0a2e
--- /dev/null
+++ b/hbase-native-client/src/hbase/client/zk-util.cc
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/client/zk-util.h"
+
+#include <folly/Conv.h>
+#include <boost/algorithm/string.hpp>
+
+#include <vector>
+
+namespace hbase {
+
+/**
+ * Returns a "proper" zookeeper quorum string, from hbase's broken quorum string formats. In
+ * hbase.zookeeper.quorum, the ports are not listed explicitly per server (eg. s1,s2,s3),
+ * however ZooKeeper expects the string of the format s1:2181,s2:2181,s3:2181. This code
+ * appends the "clientPort" to each node in the quorum string if not there.
+ *
+ * Empty entries (produced by an empty quorum value or by a leading/trailing comma) are
+ * dropped instead of being turned into a host-less ":port" entry.
+ *
+ * @param conf configuration providing the quorum and client port settings
+ * @return comma separated host:port list
+ */
+std::string ZKUtil::ParseZooKeeperQuorum(const hbase::Configuration& conf) {
+  auto zk_quorum = conf.Get(kHBaseZookeeperQuorum_, kDefHBaseZookeeperQuorum_);
+  auto zk_port = conf.GetInt(kHBaseZookeeperClientPort_, kDefHBaseZookeeperClientPort_);
+  // The suffix is the same for every server that lacks a port; build it once.
+  const std::string port_suffix = ":" + folly::to<std::string>(zk_port);
+
+  std::vector<std::string> zk_quorum_parts;
+  boost::split(zk_quorum_parts, zk_quorum, boost::is_any_of(","), boost::token_compress_on);
+  std::vector<std::string> servers;
+  servers.reserve(zk_quorum_parts.size());
+  for (const auto& server : zk_quorum_parts) {
+    // token_compress_on merges adjacent commas, but split still yields an empty
+    // token for a leading/trailing comma or an empty input.
+    if (server.empty()) {
+      continue;
+    }
+    if (boost::contains(server, ":")) {
+      servers.push_back(server);
+    } else {
+      servers.push_back(server + port_suffix);
+    }
+  }
+  return boost::join(servers, ",");
+}
+
+/**
+ * Builds the znode path of the meta region server entry:
+ * <configured or default parent znode> + "/" + kHBaseMetaRegionServer_.
+ */
+std::string ZKUtil::MetaZNode(const hbase::Configuration& conf) {
+  std::string path = conf.Get(kHBaseZnodeParent_, kDefHBaseZnodeParent_);
+  path.append("/");
+  path.append(kHBaseMetaRegionServer_);
+  return path;
+}
+
+/** Returns the configured ZooKeeper session timeout, or the default when unset. */
+int32_t ZKUtil::SessionTimeout(const hbase::Configuration& conf) {
+  const int32_t session_timeout =
+      conf.GetInt(kHBaseZookeeperSessionTimeout_, kDefHBaseZookeeperSessionTimeout_);
+  return session_timeout;
+}
+
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/BUCK b/hbase-native-client/src/hbase/connection/BUCK
new file mode 100644
index 0000000..1a856fb
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/BUCK
@@ -0,0 +1,68 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This is the library dealing with a single connection
+# to a single server.
+cxx_library(
+    name = "connection",
+    # Sources of the connection library. The rpc-test-server* and
+    # rpc-fault-injector sources are listed here as part of the library.
+    srcs = [
+        "client-dispatcher.cc",
+        "client-handler.cc",
+        "connection-factory.cc",
+        "connection-pool.cc",
+        "pipeline.cc",
+        "request.cc",
+        "rpc-client.cc",
+        "sasl-handler.cc",
+        "sasl-util.cc",
+        "rpc-test-server.cc",
+        "rpc-test-server-handler.cc",
+        "rpc-fault-injector.cc",
+    ],
+    deps = [
+        "//include/hbase/connection:connection",
+        "//src/hbase/if:if",
+        "//src/hbase/utils:utils",
+        "//src/hbase/serde:serde",
+        "//src/hbase/security:security",
+        "//third-party:folly",
+        "//third-party:wangle",
+        "//src/hbase/exceptions:exceptions",
+    ],
+    compiler_flags = ["-Weffc++"],
+    # SASL and Kerberos are linked both for this target and for dependents.
+    linker_flags = ["-L/usr/local/lib", "-lsasl2", "-lkrb5"],
+    exported_linker_flags = ["-L/usr/local/lib", "-lsasl2", "-lkrb5"],
+    visibility = [
+        "//src/hbase/client/...",
+    ],
+)
+# Unit test for the connection pool.
+cxx_test(
+    name = "connection-pool-test",
+    srcs = ["connection-pool-test.cc"],
+    deps = [":connection"],
+)
+# RPC test; flagged to run separately from other tests.
+cxx_test(
+    name = "rpc-test",
+    srcs = ["rpc-test.cc"],
+    deps = [":connection"],
+    run_test_separately = True,
+)

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/client-dispatcher.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/client-dispatcher.cc b/hbase-native-client/src/hbase/connection/client-dispatcher.cc
new file mode 100644
index 0000000..302ab6b
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/client-dispatcher.cc
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "hbase/connection/client-dispatcher.h"
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/Format.h>
+#include <folly/io/async/AsyncSocketException.h>
+#include <utility>
+
+#include "hbase/connection/rpc-connection.h"
+#include "hbase/exceptions/exception.h"
+
+using std::unique_ptr;
+
+namespace hbase {
+
+/**
+ * Creates an open dispatcher for `server`.
+ * Call ids start at 9 and the request map is constructed with 5000.
+ * NOTE(review): presumably 5000 is an initial capacity - confirm against concurrent_map.
+ */
+ClientDispatcher::ClientDispatcher(const std::string &server)
+    : current_call_id_(9),
+      requests_(5000),
+      server_(server),
+      is_closed_(false) {}
+
+/**
+ * Handles a decoded response: removes the promise registered under the
+ * response's call id and completes it with either the response or the
+ * exception the response carries.
+ */
+void ClientDispatcher::read(Context *ctx, unique_ptr<Response> in) {
+  VLOG(5) << "ClientDispatcher::read()";
+  auto call_id = in->call_id();
+  auto promise = requests_.find_and_erase(call_id);
+
+  VLOG(3) << folly::sformat("Read hbase::Response, call_id: {}, hasException: {}, what: {}",
+                            in->call_id(), bool(in->exception()), in->exception().what());
+
+  if (!in->exception()) {
+    // Successful call: hand the whole response to the waiting caller.
+    promise.setValue(std::move(in));
+    return;
+  }
+  promise.setException(in->exception());
+}
+
+void ClientDispatcher::readException(Context *ctx, folly::exception_wrapper e) {
+  VLOG(5) << "ClientDispatcher::readException()";
+  CloseAndCleanUpCalls();
+}
+
+/** End of stream: closes the dispatcher and fails every outstanding call. */
+void ClientDispatcher::readEOF(Context *ctx) {
+  VLOG(5) << "ClientDispatcher::readEOF()";
+
+  CloseAndCleanUpCalls();
+}
+
+/**
+ * Sends a request down the pipeline.
+ *
+ * Assigns the next call id to the request, registers a promise under that id
+ * and writes the request. The returned future is the one belonging to the
+ * registered promise.
+ *
+ * @throws ConnectionException if the dispatcher has already been closed
+ */
+folly::Future<unique_ptr<Response>> ClientDispatcher::operator()(unique_ptr<Request> arg) {
+  VLOG(5) << "ClientDispatcher::operator()";
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  if (is_closed_) {
+    throw ConnectionException("Connection closed already");
+  }
+
+  auto call_id = current_call_id_++;
+  arg->set_call_id(call_id);
+
+  // TODO: if the map is full (or we have more than hbase.client.perserver.requests.threshold)
+  // then throw ServerTooBusyException so that upper layers will retry.
+  auto &promise = requests_[call_id];
+
+  auto response_future = promise.getFuture();
+  // On interrupt, drop the bookkeeping entry for this call.
+  promise.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) {
+    LOG(ERROR) << "e = " << call_id;
+    this->requests_.erase(call_id);
+    // TODO: call Promise::SetException()?
+  });
+
+  try {
+    this->pipeline_->write(std::move(arg));
+  } catch (const folly::AsyncSocketException &socket_error) {
+    // Fail the call with a ConnectionException wrapping the socket error.
+    promise.setException(
+        folly::exception_wrapper{ConnectionException{folly::exception_wrapper{socket_error}}});
+    /* clear folly::Promise to avoid overflow. */
+    requests_.erase(call_id);
+  }
+
+  return response_future;
+}
+
+/**
+ * Marks the dispatcher closed. On the first call, every outstanding promise is
+ * failed with an IOException naming the server and the request map is cleared;
+ * later calls return without doing anything.
+ */
+void ClientDispatcher::CloseAndCleanUpCalls() {
+  VLOG(5) << "ClientDispatcher::CloseAndCleanUpCalls()";
+  std::lock_guard<std::recursive_mutex> guard(mutex_);
+  if (!is_closed_) {
+    const std::string reason = "Connection closed to server:" + server_;
+    for (auto &entry : requests_) {
+      entry.second.setException(IOException{reason});
+    }
+    requests_.clear();
+    is_closed_ = true;
+  }
+}
+
+/** Fails outstanding calls, then closes via the base dispatcher. */
+folly::Future<folly::Unit> ClientDispatcher::close() {
+  CloseAndCleanUpCalls();
+
+  return ClientDispatcherBase::close();
+}
+
+/** Fails outstanding calls, then closes via the base dispatcher for `ctx`. */
+folly::Future<folly::Unit> ClientDispatcher::close(Context *ctx) {
+  CloseAndCleanUpCalls();
+
+  return ClientDispatcherBase::close(ctx);
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/client-handler.cc b/hbase-native-client/src/hbase/connection/client-handler.cc
new file mode 100644
index 0000000..c963c20
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/client-handler.cc
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/connection/client-handler.h"
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/Likely.h>
+#include <folly/io/async/AsyncSocketException.h>
+#include <glog/logging.h>
+#include <string>
+
+#include "hbase/connection/request.h"
+#include "hbase/connection/response.h"
+#include "hbase/if/Client.pb.h"
+#include "hbase/if/RPC.pb.h"
+
+using google::protobuf::Message;
+
+namespace hbase {
+
+/**
+ * Stores the user name, codec-backed serde, configuration and server name,
+ * and allocates the once-flag plus the call-id -> response-message map
+ * (constructed with 5000).
+ */
+ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
+                             std::shared_ptr<Configuration> conf, const std::string &server)
+    : user_name_(user_name),
+      serde_(codec),
+      conf_(conf),
+      server_(server),
+      once_flag_(std::make_unique<std::once_flag>()),
+      resp_msgs_(std::make_unique<
+                 concurrent_map<uint32_t, std::shared_ptr<google::protobuf::Message>>>(5000)) {}
+
+/**
+ * Decodes one RPC response from `buf` and fires it up the pipeline.
+ *
+ * The response header is parsed first. If it carries an exception, a
+ * RemoteException is built from it and attached to the response; otherwise
+ * the response message registered for the call id is parsed and, when the
+ * header announces a cell block, a cell scanner over it is attached.
+ * A null buffer is ignored.
+ */
+void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
+  if (UNLIKELY(buf == nullptr)) {
+    return;
+  }
+
+  buf->coalesce();
+  auto response = std::make_unique<Response>();
+  pb::ResponseHeader header;
+
+  int used_bytes = serde_.ParseDelimited(buf.get(), &header);
+  VLOG(3) << "Read RPC ResponseHeader size=" << used_bytes << " call_id=" << header.call_id()
+          << " has_exception=" << header.has_exception() << ", server: " << server_;
+
+  auto resp_msg = resp_msgs_->find_and_erase(header.call_id());
+
+  // The dispatcher uses the call id to match the response with its promise.
+  response->set_call_id(header.call_id());
+
+  if (header.has_exception()) {
+    // With an exception there is no further data on the wire.
+    const hbase::pb::ExceptionResponse &remote = header.exception();
+
+    const std::string exception_class_name =
+        remote.has_exception_class_name() ? remote.exception_class_name() : "";
+    const std::string stack_trace = remote.has_stack_trace() ? remote.stack_trace() : "";
+    // The stack trace doubles as the exception's what() text.
+    const std::string what{stack_trace};
+
+    auto remote_exception = std::make_unique<RemoteException>(what);
+    remote_exception->set_exception_class_name(exception_class_name)
+        ->set_stack_trace(stack_trace)
+        ->set_hostname(remote.has_hostname() ? remote.hostname() : "")
+        ->set_port(remote.has_port() ? remote.port() : 0);
+    if (remote.has_do_not_retry()) {
+      remote_exception->set_do_not_retry(remote.do_not_retry());
+    }
+
+    VLOG(3) << "Exception RPC ResponseHeader, call_id=" << header.call_id()
+            << " exception.what=" << remote_exception->what()
+            << ", do_not_retry=" << remote_exception->do_not_retry() << ", server: " << server_;
+    response->set_exception(folly::exception_wrapper{*remote_exception});
+  } else {
+    // Skip the header, then parse the response message itself.
+    buf->trimStart(used_bytes);
+    used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get());
+
+    int cell_block_length = 0;
+    if (header.has_cell_block_meta() && header.cell_block_meta().has_length()) {
+      cell_block_length = header.cell_block_meta().length();
+    }
+
+    VLOG(3) << "Read RPCResponse, buf length:" << buf->length()
+            << ", header PB length:" << used_bytes << ", cell_block length:" << cell_block_length
+            << ", server: " << server_;
+
+    // Make sure that bytes were parsed.
+    CHECK((used_bytes + cell_block_length) == buf->length());
+
+    if (cell_block_length > 0) {
+      auto cell_scanner = serde_.CreateCellScanner(std::move(buf), used_bytes, cell_block_length);
+      response->set_cell_scanner(std::shared_ptr<CellScanner>{cell_scanner.release()});
+    }
+
+    response->set_resp_msg(resp_msg);
+  }
+
+  ctx->fireRead(std::move(response));
+}
+
+/**
+ * Serializes and writes one request.
+ *
+ * Unless RPC test mode is enabled in the configuration, the connection header
+ * is written first, exactly once per handler. The request's response message
+ * is stored under its call id before the request is sent.
+ *
+ * @throws folly::AsyncSocketException rethrown unchanged when the write fails;
+ *         the stored response message is removed first.
+ */
+folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
+  /* for RPC test, there's no need to send connection header */
+  if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+                      RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) {
+    // We need to send the header once.
+    // So use call_once to make sure that only one thread wins this.
+    std::call_once((*once_flag_), [ctx, this]() {
+      VLOG(3) << "Writing RPC Header to server: " << server_;
+      auto header = serde_.Header(user_name_);
+      ctx->fireWrite(std::move(header));
+    });
+  }
+
+  VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_;
+
+  // Now store the call id to response.
+  resp_msgs_->insert(std::make_pair(r->call_id(), r->resp_msg()));
+
+  try {
+    // Send the data down the pipeline.
+    return ctx->fireWrite(serde_.Request(r->call_id(), r->method(), r->req_msg().get()));
+  } catch (const folly::AsyncSocketException &) {
+    /* clear protobuf::Message to avoid overflow. */
+    resp_msgs_->erase(r->call_id());
+    // Bare rethrow keeps the original exception object; `throw e;` would
+    // copy-initialize a new AsyncSocketException and slice any derived type.
+    throw;
+  }
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/connection-factory.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/connection-factory.cc b/hbase-native-client/src/hbase/connection/connection-factory.cc
new file mode 100644
index 0000000..14ac22b
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/connection-factory.cc
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <glog/logging.h>
+#include <wangle/channel/Handler.h>
+
+#include <chrono>
+
+#include <folly/ExceptionWrapper.h>
+#include <folly/SocketAddress.h>
+#include <folly/io/async/AsyncSocketException.h>
+
+#include "hbase/connection/client-dispatcher.h"
+#include "hbase/connection/connection-factory.h"
+#include "hbase/connection/pipeline.h"
+#include "hbase/connection/sasl-handler.h"
+#include "hbase/connection/service.h"
+#include "hbase/exceptions/exception.h"
+
+using std::chrono::milliseconds;
+using std::chrono::nanoseconds;
+
+namespace hbase {
+
+/**
+ * Stores the executors, configuration and connect timeout, and creates the
+ * RPC pipeline factory from the codec and configuration.
+ */
+ConnectionFactory::ConnectionFactory(
+    std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+    std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor, std::shared_ptr<Codec> codec,
+    std::shared_ptr<Configuration> conf, nanoseconds connect_timeout)
+    : connect_timeout_(connect_timeout),
+      io_executor_(io_executor),
+      cpu_executor_(cpu_executor),
+      conf_(conf),
+      pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec, conf)) {}
+
+/**
+ * Creates a client bootstrap wired to this factory's IO executor and
+ * pipeline factory.
+ */
+std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::MakeBootstrap() {
+  using Bootstrap = wangle::ClientBootstrap<SerializePipeline>;
+
+  auto bootstrap = std::make_shared<Bootstrap>();
+  bootstrap->group(io_executor_);
+  bootstrap->pipelineFactory(pipeline_factory_);
+
+  // TODO: Opened https://github.com/facebook/wangle/issues/85 in wangle so that we can set socket
+  //  options like TCP_NODELAY, SO_KEEPALIVE, CONNECT_TIMEOUT_MILLIS, etc.
+
+  return bootstrap;
+}
+
+/**
+ * Connects to hostname:port through `client_bootstrap` and returns a
+ * ClientDispatcher bound to the resulting pipeline. The connect call is
+ * scheduled on the IO executor and this method blocks until it completes.
+ * `rpc_connection` is not used by this implementation.
+ *
+ * @throws ConnectionException wrapping a folly::AsyncSocketException
+ */
+std::shared_ptr<HBaseService> ConnectionFactory::Connect(
+    std::shared_ptr<RpcConnection> rpc_connection,
+    std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client_bootstrap,
+    const std::string &hostname, uint16_t port) {
+  // connection should happen from an IO thread
+  try {
+    auto connect_future = via(io_executor_.get()).then([=]() {
+      VLOG(1) << "Connecting to server: " << hostname << ":" << port;
+      const folly::SocketAddress address(hostname, port, true);
+      const auto timeout = std::chrono::duration_cast<milliseconds>(connect_timeout_);
+      return client_bootstrap->connect(address, timeout);
+    });
+
+    // See about using shared promise for this.
+    auto pipeline = connect_future.get();
+
+    VLOG(1) << "Connected to server: " << hostname << ":" << port;
+    const std::string server_name = hostname + ":" + folly::to<std::string>(port);
+    auto dispatcher = std::make_shared<ClientDispatcher>(server_name);
+    dispatcher->setPipeline(pipeline);
+    return dispatcher;
+  } catch (const folly::AsyncSocketException &socket_error) {
+    throw ConnectionException(folly::exception_wrapper{socket_error});
+  }
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/connection-pool-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/connection-pool-test.cc b/hbase-native-client/src/hbase/connection/connection-pool-test.cc
new file mode 100644
index 0000000..5886a42
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/connection-pool-test.cc
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Logging.h>
+#include <gmock/gmock.h>
+
+#include "hbase/connection/connection-factory.h"
+#include "hbase/connection/connection-id.h"
+#include "hbase/connection/connection-pool.h"
+#include "hbase/if/HBase.pb.h"
+#include "hbase/serde/server-name.h"
+
+using hbase::pb::ServerName;
+using ::testing::Return;
+using ::testing::_;
+using hbase::ConnectionFactory;
+using hbase::ConnectionPool;
+using hbase::ConnectionId;
+using hbase::HBaseService;
+using hbase::Request;
+using hbase::Response;
+using hbase::RpcConnection;
+using hbase::SerializePipeline;
+
+// ConnectionFactory whose MakeBootstrap() and Connect() are gmock methods, so
+// tests can count how often the pool asks for new connections.
+class MockConnectionFactory : public ConnectionFactory {
+ public:
+  // The base class is constructed with null executors, codec and configuration.
+  MockConnectionFactory() : ConnectionFactory(nullptr, nullptr, nullptr, nullptr) {}
+
+  MOCK_METHOD0(MakeBootstrap, std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>());
+
+  MOCK_METHOD4(Connect,
+               std::shared_ptr<HBaseService>(
+                   std::shared_ptr<RpcConnection> rpc_connection,
+                   std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client_bootstrap,
+                   const std::string &hostname, uint16_t port));
+};
+
+// Bootstrap stand-in returned by the mocked MakeBootstrap(); adds nothing to the base class.
+class MockBootstrap : public wangle::ClientBootstrap<SerializePipeline> {
+};
+
+// HBaseService whose call operator forwards to the mockable do_operation()
+// and wraps its Response in an already-completed future.
+class MockService : public HBaseService {
+ public:
+  folly::Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> req) override {
+    auto response = std::make_unique<Response>(do_operation(req.get()));
+    return folly::makeFuture<std::unique_ptr<Response>>(std::move(response));
+  }
+
+  MOCK_METHOD1(do_operation, Response(Request *));
+};
+
+// Asking the pool twice for the same ConnectionId must create the bootstrap
+// and the connection only once.
+TEST(TestConnectionPool, TestOnlyCreateOnce) {
+  std::string hostname{"hostname"};
+  auto bootstrap = std::make_shared<MockBootstrap>();
+  auto service = std::make_shared<MockService>();
+  auto factory = std::make_shared<MockConnectionFactory>();
+  uint32_t port{999};
+
+  EXPECT_CALL((*factory), Connect(_, _, _, _)).Times(1).WillRepeatedly(Return(service));
+  EXPECT_CALL((*factory), MakeBootstrap()).Times(1).WillRepeatedly(Return(bootstrap));
+  EXPECT_CALL((*service), do_operation(_)).Times(1).WillRepeatedly(Return(Response{}));
+  ConnectionPool pool{factory};
+
+  auto remote_id = std::make_shared<ConnectionId>(hostname, port);
+
+  // First lookup creates the connection.
+  auto connection = pool.GetConnection(remote_id);
+  ASSERT_TRUE(connection != nullptr);
+
+  // Second lookup must not trigger another Connect()/MakeBootstrap().
+  connection = pool.GetConnection(remote_id);
+  connection->SendRequest(nullptr);
+}
+
+// Two distinct hosts need two connections; looking both up again after the
+// first handles went out of scope must not create further connections.
+TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) {
+  std::string first_host{"hostname"};
+  std::string second_host{"hostname_two"};
+  uint32_t port{999};
+
+  auto bootstrap = std::make_shared<MockBootstrap>();
+  auto service = std::make_shared<MockService>();
+  auto factory = std::make_shared<MockConnectionFactory>();
+
+  EXPECT_CALL((*factory), Connect(_, _, _, _)).Times(2).WillRepeatedly(Return(service));
+  EXPECT_CALL((*factory), MakeBootstrap()).Times(2).WillRepeatedly(Return(bootstrap));
+  EXPECT_CALL((*service), do_operation(_)).Times(4).WillRepeatedly(Return(Response{}));
+  ConnectionPool pool{factory};
+
+  {
+    // First round: one request per host.
+    auto first_id = std::make_shared<ConnectionId>(first_host, port);
+    auto first_conn = pool.GetConnection(first_id);
+    first_conn->SendRequest(nullptr);
+    auto second_id = std::make_shared<ConnectionId>(second_host, port);
+    auto second_conn = pool.GetConnection(second_id);
+    second_conn->SendRequest(nullptr);
+  }
+
+  // Second round with fresh ids and handles.
+  auto first_id = std::make_shared<ConnectionId>(first_host, port);
+  auto first_conn = pool.GetConnection(first_id);
+  first_conn->SendRequest(nullptr);
+  auto second_id = std::make_shared<ConnectionId>(second_host, port);
+  auto second_conn = pool.GetConnection(second_id);
+  second_conn->SendRequest(nullptr);
+}
+
+// Same host and port but different service names need two connections;
+// looking both up again must not create further connections.
+TEST(TestConnectionPool, TestCreateOneConnectionForOneService) {
+  std::string hostname{"hostname"};
+  uint32_t port{999};
+  std::string first_service{"service1"};
+  std::string second_service{"service2"};
+
+  auto bootstrap = std::make_shared<MockBootstrap>();
+  auto service = std::make_shared<MockService>();
+  auto factory = std::make_shared<MockConnectionFactory>();
+
+  EXPECT_CALL((*factory), Connect(_, _, _, _)).Times(2).WillRepeatedly(Return(service));
+  EXPECT_CALL((*factory), MakeBootstrap()).Times(2).WillRepeatedly(Return(bootstrap));
+  EXPECT_CALL((*service), do_operation(_)).Times(4).WillRepeatedly(Return(Response{}));
+  ConnectionPool pool{factory};
+
+  {
+    // First round: one request per service.
+    auto first_id = std::make_shared<ConnectionId>(hostname, port, first_service);
+    auto first_conn = pool.GetConnection(first_id);
+    first_conn->SendRequest(nullptr);
+    auto second_id = std::make_shared<ConnectionId>(hostname, port, second_service);
+    auto second_conn = pool.GetConnection(second_id);
+    second_conn->SendRequest(nullptr);
+  }
+
+  // Second round with fresh ids and handles.
+  auto first_id = std::make_shared<ConnectionId>(hostname, port, first_service);
+  auto first_conn = pool.GetConnection(first_id);
+  first_conn->SendRequest(nullptr);
+  auto second_id = std::make_shared<ConnectionId>(hostname, port, second_service);
+  auto second_conn = pool.GetConnection(second_id);
+  second_conn->SendRequest(nullptr);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/connection-pool.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/connection-pool.cc b/hbase-native-client/src/hbase/connection/connection-pool.cc
new file mode 100644
index 0000000..92e87f8
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/connection-pool.cc
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/connection/connection-pool.h"
+
+#include <folly/Conv.h>
+#include <folly/Logging.h>
+#include <wangle/service/Service.h>
+
+#include <memory>
+#include <string>
+#include <utility>
+
+using std::chrono::nanoseconds;
+
+namespace hbase {
+
+ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                               std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+                               std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf,
+                               nanoseconds connect_timeout)
+    : cf_(std::make_shared<ConnectionFactory>(io_executor, cpu_executor, codec, conf,
+                                              connect_timeout)),
+      connections_(),
+      map_mutex_(),
+      conf_(conf) {}
+ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf)
+    : cf_(cf), connections_(), map_mutex_() {}
+
+// Nothing to do beyond destroying the members; Close() is not called from here.
+ConnectionPool::~ConnectionPool() = default;
+
+/**
+ * Returns the connection for remote_id, reusing the cached one when present and
+ * creating (and caching) a new one otherwise.
+ */
+std::shared_ptr<RpcConnection> ConnectionPool::GetConnection(
+    std::shared_ptr<ConnectionId> remote_id) {
+  // Try the cache first.
+  auto cached = GetCachedConnection(remote_id);
+  if (cached != nullptr) {
+    return cached;
+  }
+
+  // Nothing cached for this id, so build a connection.
+  return GetNewConnection(remote_id);
+}
+
+/**
+ * Looks remote_id up in the connection map under a read lock.
+ *
+ * @return the stored connection, or nullptr when the map has no entry for remote_id.
+ */
+std::shared_ptr<RpcConnection> ConnectionPool::GetCachedConnection(
+    std::shared_ptr<ConnectionId> remote_id) {
+  folly::SharedMutexWritePriority::ReadHolder read_guard(map_mutex_);
+  auto entry = connections_.find(remote_id);
+  if (entry != connections_.end()) {
+    return entry->second;
+  }
+  return nullptr;
+}
+
+/**
+ * Creates, caches and returns a connection for remote_id. If a non-null connection is
+ * already stored for remote_id once the upgrade lock is held, that one is returned
+ * instead and nothing is created.
+ */
+std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection(
+    std::shared_ptr<ConnectionId> remote_id) {
+  // Take the upgrade lock for the re-check; it is later converted into the write lock.
+  folly::SharedMutexWritePriority::UpgradeHolder upgrade_guard{map_mutex_};
+
+  // Another caller may have created the connection before this lock was acquired.
+  auto existing = connections_.find(remote_id);
+  if (existing != connections_.end() && existing->second != nullptr) {
+    return existing->second;
+  }
+
+  // Nothing usable is stored: move up to the write lock before changing the map.
+  folly::SharedMutexWritePriority::WriteHolder write_guard{std::move(upgrade_guard)};
+
+  // Remove whatever is still stored under this key (for example a null entry).
+  connections_.erase(remote_id);
+
+  auto connection = std::make_shared<RpcConnection>(remote_id, cf_);
+  connections_.emplace(remote_id, connection);
+  return connection;
+}
+
+/**
+ * Closes the connection stored for remote_id and removes it from the map, under the
+ * write lock. Does nothing when there is no entry or the entry holds a null pointer.
+ */
+void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) {
+  folly::SharedMutexWritePriority::WriteHolder write_guard{map_mutex_};
+  DLOG(INFO) << "Closing RPC Connection to host:" << remote_id->host()
+             << ", port:" << folly::to<std::string>(remote_id->port());
+
+  auto entry = connections_.find(remote_id);
+  if (entry != connections_.end() && entry->second != nullptr) {
+    entry->second->Close();
+    connections_.erase(entry);
+  }
+}
+
+/**
+ * Closes every connection in the pool and then empties the map, holding the write lock
+ * for the whole operation.
+ */
+void ConnectionPool::Close() {
+  folly::SharedMutexWritePriority::WriteHolder holder{map_mutex_};
+  for (auto &item : connections_) {
+    auto &con = item.second;
+    // Skip null entries instead of dereferencing them; Close(remote_id) and
+    // GetNewConnection() likewise treat a null entry as possible.
+    if (con != nullptr) {
+      con->Close();
+    }
+  }
+  connections_.clear();
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/pipeline.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/pipeline.cc b/hbase-native-client/src/hbase/connection/pipeline.cc
new file mode 100644
index 0000000..45ac0c0
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/pipeline.cc
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "hbase/connection/pipeline.h"
+
+#include <folly/Logging.h>
+#include <wangle/channel/AsyncSocketHandler.h>
+#include <wangle/channel/EventBaseHandler.h>
+#include <wangle/channel/OutputBufferingHandler.h>
+#include <wangle/codec/LengthFieldBasedFrameDecoder.h>
+
+#include "hbase/connection/client-handler.h"
+#include "hbase/connection/sasl-handler.h"
+
+namespace hbase {
+
+RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec,
+                                       std::shared_ptr<Configuration> conf)
+    : user_util_(), codec_(codec), conf_(conf) {}
+/**
+ * Assembles the client pipeline for a socket. Handlers are appended in this order:
+ * AsyncSocketHandler, EventBaseHandler, SaslHandler (skipped in RPC test mode),
+ * LengthFieldBasedFrameDecoder, ClientHandler.
+ */
+SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
+    std::shared_ptr<folly::AsyncTransportWrapper> sock) {
+  // The peer address is only passed on to ClientHandler as a description string.
+  folly::SocketAddress peer;
+  sock->getPeerAddress(&peer);
+
+  auto pipeline = SerializePipeline::create();
+  pipeline->addBack(wangle::AsyncSocketHandler{sock});
+  pipeline->addBack(wangle::EventBaseHandler{});
+
+  // In RPC test mode Sasl is not set up and the user name is resolved with secure == false.
+  const bool rpc_test_mode = conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+                                            RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE);
+  bool secure = false;
+  if (!rpc_test_mode) {
+    secure = security::User::IsSecurityEnabled(*conf_);
+    pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_});
+  }
+
+  pipeline->addBack(wangle::LengthFieldBasedFrameDecoder{});
+  pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, conf_, peer.describe()});
+  pipeline->finalize();
+  return pipeline;
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/request.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/request.cc b/hbase-native-client/src/hbase/connection/request.cc
new file mode 100644
index 0000000..91a6119
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/request.cc
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/connection/request.h"
+
+#include "hbase/if/Client.pb.h"
+
+namespace hbase {
+
+Request::Request(std::shared_ptr<google::protobuf::Message> req,
+                 std::shared_ptr<google::protobuf::Message> resp, std::string method)
+    : req_msg_(req), resp_msg_(resp), method_(method), call_id_(0) {}
+
+// Creates a Request for method "Get" with fresh, empty GetRequest/GetResponse messages.
+std::unique_ptr<Request> Request::get() {
+  auto request_msg = std::make_shared<hbase::pb::GetRequest>();
+  auto response_msg = std::make_shared<hbase::pb::GetResponse>();
+  return std::make_unique<Request>(request_msg, response_msg, "Get");
+}
+// Creates a Request for method "Mutate" with fresh, empty MutateRequest/MutateResponse
+// messages.
+std::unique_ptr<Request> Request::mutate() {
+  auto request_msg = std::make_shared<hbase::pb::MutateRequest>();
+  auto response_msg = std::make_shared<hbase::pb::MutateResponse>();
+  return std::make_unique<Request>(request_msg, response_msg, "Mutate");
+}
+// Creates a Request for method "Scan" with fresh, empty ScanRequest/ScanResponse messages.
+std::unique_ptr<Request> Request::scan() {
+  auto request_msg = std::make_shared<hbase::pb::ScanRequest>();
+  auto response_msg = std::make_shared<hbase::pb::ScanResponse>();
+  return std::make_unique<Request>(request_msg, response_msg, "Scan");
+}
+// Creates a Request for method "Multi" with fresh, empty MultiRequest/MultiResponse
+// messages.
+std::unique_ptr<Request> Request::multi() {
+  auto request_msg = std::make_shared<hbase::pb::MultiRequest>();
+  auto response_msg = std::make_shared<hbase::pb::MultiResponse>();
+  return std::make_unique<Request>(request_msg, response_msg, "Multi");
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/rpc-client.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/rpc-client.cc b/hbase-native-client/src/hbase/connection/rpc-client.cc
new file mode 100644
index 0000000..d73829e
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/rpc-client.cc
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/connection/rpc-client.h"
+
+#include <folly/Format.h>
+#include <folly/Logging.h>
+#include <folly/futures/Future.h>
+#include <unistd.h>
+#include <wangle/concurrent/IOThreadPoolExecutor.h>
+#include "hbase/exceptions/exception.h"
+
+using hbase::security::User;
+using std::chrono::nanoseconds;
+
+namespace hbase {
+
+RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor,
+                     std::shared_ptr<wangle::CPUThreadPoolExecutor> cpu_executor,
+                     std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf,
+                     nanoseconds connect_timeout)
+    : io_executor_(io_executor), conf_(conf) {
+  cp_ = std::make_shared<ConnectionPool>(io_executor_, cpu_executor, codec, conf, connect_timeout);
+}
+
+// Stops the IO executor. The connection pool (cp_) is not touched here.
+void RpcClient::Close() {
+  io_executor_->stop();
+}
+
+// Blocking form of AsyncCall(host, port, req, ticket): starts the call and returns the
+// result of get() on its future.
+std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
+                                              std::unique_ptr<Request> req,
+                                              std::shared_ptr<User> ticket) {
+  auto pending = AsyncCall(host, port, std::move(req), ticket);
+  return std::move(pending).get();
+}
+
+// Blocking form of AsyncCall(host, port, req, ticket, service_name): starts the call and
+// returns the result of get() on its future.
+std::unique_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port,
+                                              std::unique_ptr<Request> req,
+                                              std::shared_ptr<User> ticket,
+                                              const std::string& service_name) {
+  auto pending = AsyncCall(host, port, std::move(req), ticket, service_name);
+  return std::move(pending).get();
+}
+
+// Builds a ConnectionId from host, port and ticket and forwards the request to
+// SendRequest().
+folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
+                                                              uint16_t port,
+                                                              std::unique_ptr<Request> req,
+                                                              std::shared_ptr<User> ticket) {
+  return SendRequest(std::make_shared<ConnectionId>(host, port, ticket), std::move(req));
+}
+
+// Builds a ConnectionId from host, port, ticket and service_name and forwards the request
+// to SendRequest().
+folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& host,
+                                                              uint16_t port,
+                                                              std::unique_ptr<Request> req,
+                                                              std::shared_ptr<User> ticket,
+                                                              const std::string& service_name) {
+  return SendRequest(std::make_shared<ConnectionId>(host, port, ticket, service_name),
+                     std::move(req));
+}
+
+/**
+ * Sends req over the pooled connection for remote_id and returns the future response.
+ *
+ * There are two cases for ConnectionException:
+ * 1. The first time connection establishment, i.e. GetConnection(remote_id),
+ *    AsyncSocketException being a cause.
+ * 2. Writing request down the pipeline, i.e. RpcConnection::SendRequest,
+ *    AsyncSocketException being a cause as well.
+ *
+ * In both cases the connection is closed through the pool (cp_->Close(remote_id)) and the
+ * error is returned as a failed future. Other errors reported by the future are passed on
+ * unchanged without closing the connection.
+ */
+folly::Future<std::unique_ptr<Response>> RpcClient::SendRequest(
+    std::shared_ptr<ConnectionId> remote_id, std::unique_ptr<Request> req) {
+  try {
+    return GetConnection(remote_id)
+        ->SendRequest(std::move(req))
+        // remote_id is captured by value: this callback runs when the future fails, which
+        // can be after this function has returned and its by-value parameter is destroyed.
+        // `this` is still captured by pointer, so the RpcClient has to outlive the call.
+        .onError([this, remote_id](const folly::exception_wrapper& ew) {
+          VLOG(3) << folly::sformat("RpcClient Exception: {}", ew.what());
+          // with_exception invokes the lambda right here, so a reference to the copy held
+          // by the enclosing callback is sufficient.
+          ew.with_exception([this, &remote_id](const hbase::ConnectionException&) {
+            /* bad connection, remove it from pool. */
+            cp_->Close(remote_id);
+          });
+          return GetFutureWithException(ew);
+        });
+  } catch (const ConnectionException& e) {
+    CHECK(e.cause().get_exception() != nullptr);
+    VLOG(3) << folly::sformat("RpcClient Exception: {}", e.cause().what());
+    /* bad connection, remove it from pool. */
+    cp_->Close(remote_id);
+    return GetFutureWithException(e);
+  }
+}
+
+// Wraps e in a folly::exception_wrapper and delegates to the exception_wrapper overload.
+template <typename EXCEPTION>
+folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException(const EXCEPTION& e) {
+  folly::exception_wrapper wrapped{e};
+  return GetFutureWithException(wrapped);
+}
+
+// Returns a future that already holds the exception ew, built from a local promise.
+folly::Future<std::unique_ptr<Response>> RpcClient::GetFutureWithException(
+    const folly::exception_wrapper& ew) {
+  folly::Promise<std::unique_ptr<Response>> failed;
+  failed.setException(ew);
+  return failed.getFuture();
+}
+
+// Asks the connection pool (cp_) for the connection that belongs to remote_id.
+std::shared_ptr<RpcConnection> RpcClient::GetConnection(std::shared_ptr<ConnectionId> remote_id) {
+  auto connection = cp_->GetConnection(remote_id);
+  return connection;
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/rpc-fault-injector.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/rpc-fault-injector.cc b/hbase-native-client/src/hbase/connection/rpc-fault-injector.cc
new file mode 100644
index 0000000..202b21d
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/rpc-fault-injector.cc
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "hbase/connection/rpc-fault-injector.h"
+
+namespace hbase {} /* namespace hbase */

http://git-wip-us.apache.org/repos/asf/hbase/blob/128fc306/hbase-native-client/src/hbase/connection/rpc-test-server-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/src/hbase/connection/rpc-test-server-handler.cc b/hbase-native-client/src/hbase/connection/rpc-test-server-handler.cc
new file mode 100644
index 0000000..b371ba9
--- /dev/null
+++ b/hbase-native-client/src/hbase/connection/rpc-test-server-handler.cc
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "hbase/connection/rpc-test-server-handler.h"
+#include "hbase/if/RPC.pb.h"
+#include "hbase/if/test.pb.h"
+
+namespace hbase {
+
+/**
+ * Parses a delimited RequestHeader from buf, creates the matching Request for the header's
+ * method name and fires it up the pipeline. When the method name is not recognised
+ * nothing is fired.
+ *
+ * The request body is parsed, and the call id copied from the header, only when the
+ * header reports a request param.
+ */
+void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) {
+  buf->coalesce();
+
+  pb::RequestHeader header;
+  int used_bytes = serde_.ParseDelimited(buf.get(), &header);
+  VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id=" << header.call_id();
+
+  auto received = CreateReceivedRequest(header.method_name());
+
+  // Step past the header bytes so the buffer starts at the request body.
+  buf->trimStart(used_bytes);
+  if (received == nullptr) {
+    return;
+  }
+
+  if (header.has_request_param()) {
+    used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get());
+    VLOG(3) << "Read RPCRequest, buf length:" << buf->length()
+            << ", header PB length:" << used_bytes;
+    received->set_call_id(header.call_id());
+  }
+
+  ctx->fireRead(std::move(received));
+}
+
+// Serializes resp (call id, response message, exception) with serde_ and passes the
+// result down the pipeline.
+folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx,
+                                                                std::unique_ptr<Response> resp) {
+  VLOG(3) << "Writing RPC Request";
+  auto serialized = serde_.Response(resp->call_id(), resp->resp_msg().get(), resp->exception());
+  return ctx->fireWrite(std::move(serialized));
+}
+
+/**
+ * Creates an empty Request whose request/response message types match method_name.
+ *
+ * @return the new Request, or nullptr when method_name is not one of "ping", "echo",
+ *         "error", "pause", "addr" or "socketNotOpen".
+ */
+std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
+    const std::string& method_name) {
+  // "ping", "error" and "socketNotOpen" all use empty request and response messages.
+  if (method_name == "ping" || method_name == "error" || method_name == "socketNotOpen") {
+    return std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                     std::make_shared<EmptyResponseProto>(), method_name);
+  }
+  if (method_name == "echo") {
+    return std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+                                     std::make_shared<EchoResponseProto>(), method_name);
+  }
+  if (method_name == "pause") {
+    return std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
+                                     std::make_shared<EmptyResponseProto>(), method_name);
+  }
+  if (method_name == "addr") {
+    return std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                     std::make_shared<AddrResponseProto>(), method_name);
+  }
+  return nullptr;
+}
+}  // end of namespace hbase


Mime
View raw message