hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject [hbase] 94/133: HBASE-18174 Implement Table#checkAndPut()
Date Tue, 12 Mar 2019 12:46:22 GMT
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 56be04bb058efda4b65e806fc4d59fad42cb9dac
Author: tedyu <yuzhihong@gmail.com>
AuthorDate: Fri Jun 9 17:19:57 2017 -0700

    HBASE-18174 Implement Table#checkAndPut()
---
 .../core/async-rpc-retrying-caller.cc              |  1 +
 hbase-native-client/core/client-test.cc            | 29 ++++++++++++++++++++++
 hbase-native-client/core/raw-async-table.cc        | 26 +++++++++++++++++++
 hbase-native-client/core/raw-async-table.h         |  5 ++++
 hbase-native-client/core/request-converter.cc      | 21 ++++++++++++++++
 hbase-native-client/core/request-converter.h       |  5 ++++
 hbase-native-client/core/response-converter.cc     |  5 ++++
 hbase-native-client/core/response-converter.h      |  2 ++
 hbase-native-client/core/table.cc                  |  7 ++++++
 hbase-native-client/core/table.h                   | 17 +++++++++++++
 10 files changed, 118 insertions(+)

diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc
index b9d01bb..0302ad3 100644
--- a/hbase-native-client/core/async-rpc-retrying-caller.cc
+++ b/hbase-native-client/core/async-rpc-retrying-caller.cc
@@ -216,4 +216,5 @@ class OpenScannerResponse;
 template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<hbase::Result>>;
 template class AsyncSingleRequestRpcRetryingCaller<folly::Unit>;
 template class AsyncSingleRequestRpcRetryingCaller<std::shared_ptr<OpenScannerResponse>>;
+template class AsyncSingleRequestRpcRetryingCaller<bool>;
 } /* namespace hbase */
diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc
index 4972e05..ed413e8 100644
--- a/hbase-native-client/core/client-test.cc
+++ b/hbase-native-client/core/client-test.cc
@@ -247,6 +247,35 @@ TEST_F(ClientTest, Increment) {
   EXPECT_EQ(incr1 + incr2, hbase::BytesUtil::ToInt64(*(result->Value("d", "1"))));
 }
 
+TEST_F(ClientTest, CheckAndPut) {
+  // Using TestUtil to populate test data
+  ClientTest::test_util->CreateTable("check", "d");
+
+  // Create TableName and Row to be fetched from HBase
+  auto tn = folly::to<hbase::pb::TableName>("check");
+  auto row = "test1";
+
+  // Create a client
+  hbase::Client client(*ClientTest::test_util->conf());
+
+  // Get connection to HBase Table
+  auto table = client.Table(tn);
+  ASSERT_TRUE(table) << "Unable to get connection to Table.";
+
+  // Perform Puts
+  table->Put(Put{row}.AddColumn("d", "1", "value1"));
+  auto result = table->CheckAndPut(row, "d", "1", "value1", Put{row}.AddColumn("d", "1", "value2"));
+  ASSERT_TRUE(result) << "CheckAndPut didn't replace value";
+
+  result = table->CheckAndPut(row, "d", "1", "value1", Put{row}.AddColumn("d", "1", "value3"));
+
+  // Perform the Get
+  hbase::Get get(row);
+  auto result1 = table->Get(get);
+  EXPECT_EQ("value2", *(result1->Value("d", "1")));
+  ASSERT_FALSE(result) << "CheckAndPut shouldn't replace value";
+}
+
 TEST_F(ClientTest, PutGet) {
   // Using TestUtil to populate test data
   ClientTest::test_util->CreateTable("t", "d");
diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc
index 413dc6c..46d9dfd 100644
--- a/hbase-native-client/core/raw-async-table.cc
+++ b/hbase-native-client/core/raw-async-table.cc
@@ -111,6 +111,32 @@ folly::Future<folly::Unit> RawAsyncTable::Put(const hbase::Put& put) {
   return caller->Call().then([caller](const auto r) { return r; });
 }
 
+folly::Future<bool> RawAsyncTable::CheckAndPut(const std::string& row, const std::string& family,
+                                               const std::string& qualifier,
+                                               const std::string& value, const hbase::Put& put,
+                                               const pb::CompareType& compare_op) {
+  auto caller =
+      CreateCallerBuilder<bool>(row, connection_conf_->write_rpc_timeout())
+          ->action([=, &put](std::shared_ptr<hbase::HBaseRpcController> controller,
+                             std::shared_ptr<hbase::RegionLocation> loc,
+                             std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<bool> {
+            return Call<hbase::Put, hbase::Request, hbase::Response, bool>(
+                rpc_client, controller, loc, put,
+                // request conversion
+                [=, &put](const hbase::Put& put,
+                          const std::string& region_name) -> std::unique_ptr<Request> {
+                  auto checkReq = RequestConverter::CheckAndPutToMutateRequest(
+                      row, family, qualifier, value, compare_op, put, region_name);
+                  return checkReq;
+                },
+                // response conversion
+                &ResponseConverter::BoolFromMutateResponse);
+          })
+          ->Build();
+
+  return caller->Call().then([caller](const auto r) { return r; });
+}
+
 folly::Future<folly::Unit> RawAsyncTable::Delete(const hbase::Delete& del) {
   auto caller =
       CreateCallerBuilder<folly::Unit>(del.row(), connection_conf_->write_rpc_timeout())
diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h
index 6088d1b..068f230 100644
--- a/hbase-native-client/core/raw-async-table.h
+++ b/hbase-native-client/core/raw-async-table.h
@@ -67,6 +67,11 @@ class RawAsyncTable {
 
   folly::Future<folly::Unit> Put(const hbase::Put& put);
 
+  folly::Future<bool> CheckAndPut(const std::string& row, const std::string& family,
+                                  const std::string& qualifier, const std::string& value,
+                                  const hbase::Put& put,
+                                  const pb::CompareType& compare_op = pb::CompareType::EQUAL);
+
   void Scan(const hbase::Scan& scan, std::shared_ptr<RawScanResultConsumer> consumer);
 
   void Close() {}
diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc
index 042d5b1..0cd9c7c 100644
--- a/hbase-native-client/core/request-converter.cc
+++ b/hbase-native-client/core/request-converter.cc
@@ -276,6 +276,27 @@ std::unique_ptr<Request> RequestConverter::ToMutateRequest(const Put &put,
   return pb_req;
 }
 
+std::unique_ptr<Request> RequestConverter::CheckAndPutToMutateRequest(
+    const std::string &row, const std::string &family, const std::string &qualifier,
+    const std::string &value, const pb::CompareType compare_op, const hbase::Put &put,
+    const std::string &region_name) {
+  auto pb_req = Request::mutate();
+  auto pb_msg = std::static_pointer_cast<hbase::pb::MutateRequest>(pb_req->req_msg());
+
+  pb_msg->set_allocated_mutation(
+      ToMutation(MutationType::MutationProto_MutationType_PUT, put, -1).release());
+  ::hbase::pb::Condition *cond = pb_msg->mutable_condition();
+  cond->set_row(row);
+  cond->set_family(family);
+  cond->set_qualifier(qualifier);
+  cond->set_allocated_comparator(
+      Comparator::ToProto(*(ComparatorFactory::BinaryComparator(value).get())).release());
+  cond->set_compare_type(compare_op);
+
+  RequestConverter::SetRegion(region_name, pb_msg->mutable_region());
+  return pb_req;
+}
+
 std::unique_ptr<Request> RequestConverter::DeleteToMutateRequest(const Delete &del,
                                                                  const std::string &region_name) {
   auto pb_req = Request::mutate();
diff --git a/hbase-native-client/core/request-converter.h b/hbase-native-client/core/request-converter.h
index 51440d8..0755f42 100644
--- a/hbase-native-client/core/request-converter.h
+++ b/hbase-native-client/core/request-converter.h
@@ -84,6 +84,11 @@ class RequestConverter {
 
   static std::unique_ptr<Request> ToMutateRequest(const Put &put, const std::string &region_name);
 
+  static std::unique_ptr<Request> CheckAndPutToMutateRequest(
+      const std::string &row, const std::string &family, const std::string &qualifier,
+      const std::string &value, const pb::CompareType compare_op, const hbase::Put &put,
+      const std::string &region_name);
+
   static std::unique_ptr<Request> IncrementToMutateRequest(const Increment &incr,
                                                            const std::string &region_name);
 
diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc
index d2d719b..9bc4892 100644
--- a/hbase-native-client/core/response-converter.cc
+++ b/hbase-native-client/core/response-converter.cc
@@ -53,6 +53,11 @@ std::shared_ptr<Result> ResponseConverter::FromMutateResponse(const Response& re
   return ToResult(mutate_resp->result(), resp.cell_scanner());
 }
 
+bool ResponseConverter::BoolFromMutateResponse(const Response& resp) {
+  auto mutate_resp = std::static_pointer_cast<MutateResponse>(resp.resp_msg());
+  return mutate_resp->processed();
+}
+
 std::shared_ptr<Result> ResponseConverter::ToResult(
     const hbase::pb::Result& result, const std::shared_ptr<CellScanner> cell_scanner) {
   std::vector<std::shared_ptr<Cell>> vcells;
diff --git a/hbase-native-client/core/response-converter.h b/hbase-native-client/core/response-converter.h
index b518d1c..2f8f279 100644
--- a/hbase-native-client/core/response-converter.h
+++ b/hbase-native-client/core/response-converter.h
@@ -49,6 +49,8 @@ class ResponseConverter {
 
   static std::shared_ptr<hbase::Result> FromMutateResponse(const Response& resp);
 
+  static bool BoolFromMutateResponse(const Response& resp);
+
   static std::vector<std::shared_ptr<Result>> FromScanResponse(const Response& resp);
 
   static std::vector<std::shared_ptr<Result>> FromScanResponse(
diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc
index bf22169..f20078d 100644
--- a/hbase-native-client/core/table.cc
+++ b/hbase-native-client/core/table.cc
@@ -73,6 +73,13 @@ void Table::Put(const hbase::Put &put) {
   future.get(operation_timeout());
 }
 
+bool Table::CheckAndPut(const std::string &row, const std::string &family,
+                        const std::string &qualifier, const std::string &value,
+                        const hbase::Put &put, const pb::CompareType &compare_op) {
+  auto context = async_table_->CheckAndPut(row, family, qualifier, value, put, compare_op);
+  return context.get(operation_timeout());
+}
+
 void Table::Delete(const hbase::Delete &del) {
   auto future = async_table_->Delete(del);
   future.get(operation_timeout());
diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h
index 781c6f1..3c486af 100644
--- a/hbase-native-client/core/table.h
+++ b/hbase-native-client/core/table.h
@@ -63,6 +63,23 @@ class Table {
   void Put(const hbase::Put &put);
 
   /**
+   * Atomically checks if a row/family/qualifier value matches the expected
+   * value. If it does, it adds the put.  If the passed value is null, the check
+   * is for the lack of column (i.e. non-existence)
+   *
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @param compare_op comparison operator to use
+   * @throws IOException e
+   * @return true if the new put was executed, false otherwise
+   */
+  bool CheckAndPut(const std::string &row, const std::string &family, const std::string &qualifier,
+                   const std::string &value, const hbase::Put &put,
+                   const pb::CompareType &compare_op = pb::CompareType::EQUAL);
+  /**
    * @brief - Deletes some data in the table.
    * @param - del Delete object to perform HBase Delete operation.
    */


Mime
View raw message