hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zg...@apache.org
Subject [hbase] 103/133: HBASE-18338 [C++] Implement RpcTestServer (Xiaobing Zhou)
Date Tue, 12 Mar 2019 12:46:31 GMT
This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch HBASE-14850
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit da28b3e214fbb867d7ad3202632d5d3528ca1bed
Author: Enis Soztutar <enis@apache.org>
AuthorDate: Fri Jul 21 16:29:44 2017 -0700

    HBASE-18338 [C++] Implement RpcTestServer (Xiaobing Zhou)
---
 hbase-native-client/connection/BUCK                | 13 ++++
 hbase-native-client/connection/client-handler.cc   | 21 ++++--
 hbase-native-client/connection/client-handler.h    |  7 +-
 hbase-native-client/connection/pipeline.cc         | 12 ++-
 .../connection/rpc-test-server-handler.cc          | 77 +++++++++++++++++++
 .../connection/rpc-test-server-handler.h           | 47 ++++++++++++
 hbase-native-client/connection/rpc-test-server.cc  | 70 ++++++++++++++++++
 hbase-native-client/connection/rpc-test-server.h   | 50 +++++++++++++
 hbase-native-client/connection/rpc-test.cc         | 86 ++++++++++++++++++++++
 hbase-native-client/connection/sasl-handler.h      |  2 +-
 hbase-native-client/if/test.proto                  | 43 +++++++++++
 hbase-native-client/if/test_rpc_service.proto      | 35 +++++++++
 hbase-native-client/serde/BUCK                     |  4 +-
 .../serde/client-deserializer-test.cc              |  3 +-
 .../serde/client-serializer-test.cc                |  2 +-
 hbase-native-client/serde/{rpc.cc => rpc-serde.cc} | 16 +++-
 hbase-native-client/serde/{rpc.h => rpc-serde.h}   | 16 ++++
 17 files changed, 482 insertions(+), 22 deletions(-)

diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index c3119eb..aaf8fdb 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -33,6 +33,8 @@ cxx_library(
         "service.h",
         "rpc-client.h",
         "sasl-util.h",
+        "rpc-test-server.h",
+        "rpc-test-server-handler.h",
     ],
     srcs=[
         "client-dispatcher.cc",
@@ -44,6 +46,8 @@ cxx_library(
         "rpc-client.cc",
         "sasl-handler.cc",
         "sasl-util.cc",
+        "rpc-test-server.cc",
+        "rpc-test-server-handler.cc",
     ],
     deps=[
         "//if:if",
@@ -68,3 +72,12 @@ cxx_test(
     deps=[
         ":connection",
     ],)
+cxx_test(
+    name="rpc-test",
+    srcs=[
+        "rpc-test.cc",
+    ],
+    deps=[
+        ":connection",
+    ],
+    run_test_separately=True,)
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 052c171..39227d3 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -35,9 +35,10 @@ using google::protobuf::Message;
 namespace hbase {
 
 ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
-                             const std::string &server)
+                             std::shared_ptr<Configuration> conf, const std::string
&server)
     : user_name_(user_name),
       serde_(codec),
+      conf_(conf),
       server_(server),
       once_flag_(std::make_unique<std::once_flag>()),
       resp_msgs_(
@@ -115,13 +116,17 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf>
buf) {
 }
 
 folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request>
r) {
-  // We need to send the header once.
-  // So use call_once to make sure that only one thread wins this.
-  std::call_once((*once_flag_), [ctx, this]() {
-    VLOG(3) << "Writing RPC Header to server: " << server_;
-    auto header = serde_.Header(user_name_);
-    ctx->fireWrite(std::move(header));
-  });
+  /* for RPC test, there's no need to send connection header */
+  if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+                      RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) {
+    // We need to send the header once.
+    // So use call_once to make sure that only one thread wins this.
+    std::call_once((*once_flag_), [ctx, this]() {
+      VLOG(3) << "Writing RPC Header to server: " << server_;
+      auto header = serde_.Header(user_name_);
+      ctx->fireWrite(std::move(header));
+    });
+  }
 
   VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server:
" << server_;
 
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index 8de3a8b..b6f19a2 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -26,9 +26,10 @@
 #include <string>
 #include <utility>
 
+#include "core/configuration.h"
 #include "exceptions/exception.h"
 #include "serde/codec.h"
-#include "serde/rpc.h"
+#include "serde/rpc-serde.h"
 #include "utils/concurrent-map.h"
 
 // Forward decs.
@@ -60,7 +61,8 @@ class ClientHandler
    * Create the handler
    * @param user_name the user name of the user running this process.
    */
-  ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, const std::string
&server);
+  ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
+                std::shared_ptr<Configuration> conf, const std::string &server);
 
   /**
    * Get bytes from the wire.
@@ -79,6 +81,7 @@ class ClientHandler
   std::string user_name_;
   RpcSerde serde_;
   std::string server_;  // for logging
+  std::shared_ptr<Configuration> conf_;
 
   // in flight requests
   std::unique_ptr<concurrent_map<uint32_t, std::shared_ptr<google::protobuf::Message>>>
resp_msgs_;
diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc
index 2844752..9c790b6 100644
--- a/hbase-native-client/connection/pipeline.cc
+++ b/hbase-native-client/connection/pipeline.cc
@@ -32,7 +32,6 @@ namespace hbase {
 RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec,
                                        std::shared_ptr<Configuration> conf)
     : user_util_(), codec_(codec), conf_(conf) {}
-
 SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
     std::shared_ptr<folly::AsyncTransportWrapper> sock) {
   folly::SocketAddress addr;  // for logging
@@ -41,10 +40,15 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
   auto pipeline = SerializePipeline::create();
   pipeline->addBack(wangle::AsyncSocketHandler{sock});
   pipeline->addBack(wangle::EventBaseHandler{});
-  auto secure = security::User::IsSecurityEnabled(*conf_);
-  pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_});
+  bool secure = false;
+  /* for RPC test, there's no need to setup Sasl */
+  if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+                      RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) {
+    secure = security::User::IsSecurityEnabled(*conf_);
+    pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_});
+  }
   pipeline->addBack(wangle::LengthFieldBasedFrameDecoder{});
-  pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, addr.describe()});
+  pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, conf_, addr.describe()});
   pipeline->finalize();
   return pipeline;
 }
diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc
new file mode 100644
index 0000000..7d2f407
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server-handler.cc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/rpc-test-server-handler.h"
+#include "if/RPC.pb.h"
+#include "if/test.pb.h"
+
+namespace hbase {
+
+void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IOBuf>
buf) {
+  buf->coalesce();
+  pb::RequestHeader header;
+
+  int used_bytes = serde_.ParseDelimited(buf.get(), &header);
+  VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id="
<< header.call_id();
+
+  auto received = CreateReceivedRequest(header.method_name());
+
+  buf->trimStart(used_bytes);
+  if (header.has_request_param() && received != nullptr) {
+    used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get());
+    VLOG(3) << "Read RPCRequest, buf length:" << buf->length()
+            << ", header PB length:" << used_bytes;
+    received->set_call_id(header.call_id());
+  }
+
+  if (received != nullptr) {
+    ctx->fireRead(std::move(received));
+  }
+}
+
+folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx,
+                                                                std::unique_ptr<Response>
r) {
+  VLOG(3) << "Writing RPC Request";
+  // Send the data down the pipeline.
+  return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get()));
+}
+
+std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
+    const std::string& method_name) {
+  std::unique_ptr<Request> result = nullptr;
+  ;
+  if (method_name == "ping") {
+    result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                       std::make_shared<EmptyResponseProto>(), method_name);
+  } else if (method_name == "echo") {
+    result = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+                                       std::make_shared<EchoResponseProto>(), method_name);
+  } else if (method_name == "error") {
+    result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                       std::make_shared<EmptyResponseProto>(), method_name);
+  } else if (method_name == "pause") {
+    result = std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
+                                       std::make_shared<EmptyResponseProto>(), method_name);
+  } else if (method_name == "addr") {
+    result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                       std::make_shared<AddrResponseProto>(), method_name);
+  }
+  return result;
+}
+}  // end of namespace hbase
diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h
new file mode 100644
index 0000000..4c84615
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server-handler.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <wangle/channel/Handler.h>
+
+#include "connection/request.h"
+#include "connection/response.h"
+#include "serde/rpc-serde.h"
+
+using namespace hbase;
+
+namespace hbase {
+// A real rpc server would probably use generated client/server stubs
+class RpcTestServerSerializeHandler
+    : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Request>,
+                             std::unique_ptr<Response>, std::unique_ptr<folly::IOBuf>>
{
+ public:
+  RpcTestServerSerializeHandler() : serde_() {}
+
+  void read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override;
+
+  folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> r)
override;
+
+ private:
+  std::unique_ptr<Request> CreateReceivedRequest(const std::string& method_name);
+
+ private:
+  hbase::RpcSerde serde_;
+};
+}  // end of namespace hbase
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
new file mode 100644
index 0000000..d3a30b1
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <wangle/channel/AsyncSocketHandler.h>
+#include <wangle/channel/EventBaseHandler.h>
+#include <wangle/codec/LengthFieldBasedFrameDecoder.h>
+#include <wangle/codec/LengthFieldPrepender.h>
+#include <wangle/service/ServerDispatcher.h>
+
+#include "connection/rpc-test-server-handler.h"
+#include "connection/rpc-test-server.h"
+#include "if/test.pb.h"
+
+namespace hbase {
+
+RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline(
+    std::shared_ptr<AsyncTransportWrapper> sock) {
+  auto pipeline = RpcTestServerSerializePipeline::create();
+  pipeline->addBack(AsyncSocketHandler(sock));
+  // ensure we can write from any thread
+  pipeline->addBack(EventBaseHandler());
+  pipeline->addBack(LengthFieldBasedFrameDecoder());
+  pipeline->addBack(RpcTestServerSerializeHandler());
+  pipeline->addBack(
+      MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>(&service_));
+  pipeline->finalize();
+
+  return pipeline;
+}
+
+Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request>
request) {
+  /* build Response */
+  auto response = std::make_unique<Response>();
+  response->set_call_id(request->call_id());
+  std::string method_name = request->method();
+
+  if (method_name == "ping") {
+    auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+    response->set_resp_msg(pb_resp_msg);
+  } else if (method_name == "echo") {
+    auto pb_resp_msg = std::make_shared<EchoResponseProto>();
+    auto pb_req_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+    pb_resp_msg->set_message(pb_req_msg->message());
+    response->set_resp_msg(pb_resp_msg);
+  } else if (method_name == "error") {
+    // TODO:
+  } else if (method_name == "pause") {
+    // TODO:
+  } else if (method_name == "addr") {
+    // TODO:
+  }
+
+  return folly::makeFuture<std::unique_ptr<Response>>(std::move(response));
+}
+}  // namespace hbase
diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h
new file mode 100644
index 0000000..c3225ff
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <wangle/service/ExecutorFilter.h>
+#include <wangle/service/Service.h>
+
+#include "connection/request.h"
+#include "connection/response.h"
+
+using namespace hbase;
+using namespace folly;
+using namespace wangle;
+
+namespace hbase {
+using RpcTestServerSerializePipeline = wangle::Pipeline<IOBufQueue&, std::unique_ptr<Response>>;
+
+class RpcTestService : public Service<std::unique_ptr<Request>, std::unique_ptr<Response>>
{
+ public:
+  RpcTestService() {}
+  virtual ~RpcTestService() = default;
+  Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request>
request) override;
+};
+
+class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSerializePipeline>
{
+ public:
+  RpcTestServerSerializePipeline::Ptr newPipeline(
+      std::shared_ptr<AsyncTransportWrapper> sock) override;
+
+ private:
+  ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>> service_{
+      std::make_shared<CPUThreadPoolExecutor>(1), std::make_shared<RpcTestService>()};
+};
+}  // end of namespace hbase
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
new file mode 100644
index 0000000..d4cd89f
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <wangle/bootstrap/ClientBootstrap.h>
+#include <wangle/channel/Handler.h>
+
+#include <folly/Logging.h>
+#include <folly/SocketAddress.h>
+#include <folly/String.h>
+#include <folly/experimental/TestUtil.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <boost/thread.hpp>
+
+#include "connection/rpc-client.h"
+#include "if/test.pb.h"
+#include "rpc-test-server.h"
+#include "security/user.h"
+#include "serde/rpc-serde.h"
+
+using namespace wangle;
+using namespace folly;
+using namespace hbase;
+
+DEFINE_int32(port, 0, "test server port");
+
+TEST(RpcTestServer, echo) {
+  /* create conf */
+  auto conf = std::make_shared<Configuration>();
+  conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true");
+
+  /* create rpc test server */
+  auto server = std::make_shared<ServerBootstrap<RpcTestServerSerializePipeline>>();
+  server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>());
+  server->bind(FLAGS_port);
+  folly::SocketAddress server_addr;
+  server->getSockets()[0]->getAddress(&server_addr);
+
+  /* create RpcClient */
+  auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+
+  auto rpc_client = std::make_shared<RpcClient>(io_executor, nullptr, conf);
+
+  /**
+   * test echo
+   */
+  try {
+    std::string greetings = "hello, hbase server!";
+    auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+                                             std::make_shared<EchoResponseProto>(),
"echo");
+    auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+    pb_msg->set_message(greetings);
+
+    /* sending out request */
+    rpc_client
+        ->AsyncCall(server_addr.getAddressStr(), server_addr.getPort(), std::move(request),
+                    hbase::security::User::defaultUser())
+        .then([=](std::unique_ptr<Response> response) {
+          auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
+          VLOG(1) << "message returned: " + pb_resp->message();
+          EXPECT_EQ(greetings, pb_resp->message());
+        });
+  } catch (const std::exception& e) {
+    throw e;
+  }
+
+  server->stop();
+  server->join();
+}
diff --git a/hbase-native-client/connection/sasl-handler.h b/hbase-native-client/connection/sasl-handler.h
index f606a23..81f4e81 100644
--- a/hbase-native-client/connection/sasl-handler.h
+++ b/hbase-native-client/connection/sasl-handler.h
@@ -30,7 +30,7 @@
 #include "connection/sasl-util.h"
 #include "connection/service.h"
 #include "security/user.h"
-#include "serde/rpc.h"
+#include "serde/rpc-serde.h"
 
 namespace hbase {
 
diff --git a/hbase-native-client/if/test.proto b/hbase-native-client/if/test.proto
new file mode 100644
index 0000000..72b68e9
--- /dev/null
+++ b/hbase-native-client/if/test.proto
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
+option java_outer_classname = "TestProtos";
+option java_generate_equals_and_hash = true;
+
+message EmptyRequestProto {
+}
+
+message EmptyResponseProto {
+}
+
+message EchoRequestProto {
+  required string message = 1;
+}
+
+message EchoResponseProto {
+  required string message = 1;
+}
+
+message PauseRequestProto {
+  required uint32 ms = 1;
+}
+
+message AddrResponseProto {
+  required string addr = 1;
+}
diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto
new file mode 100644
index 0000000..5f91dc4
--- /dev/null
+++ b/hbase-native-client/if/test_rpc_service.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
+option java_outer_classname = "TestRpcServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "test.proto";
+
+
+/**
+ * A protobuf service for use in tests
+ */
+service TestProtobufRpcProto {
+  rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
+  rpc echo(EchoRequestProto) returns (EchoResponseProto);
+  rpc error(EmptyRequestProto) returns (EmptyResponseProto);
+  rpc pause(PauseRequestProto) returns (EmptyResponseProto);
+  rpc addr(EmptyRequestProto) returns (AddrResponseProto);
+}
diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK
index 18e949c..a765884 100644
--- a/hbase-native-client/serde/BUCK
+++ b/hbase-native-client/serde/BUCK
@@ -22,13 +22,13 @@ cxx_library(
         "cell-outputstream.h",
         "codec.h",
         "region-info.h",
-        "rpc.h",
+        "rpc-serde.h",
         "server-name.h",
         "table-name.h",
         "zk.h",
     ],
     srcs=[
-        "rpc.cc",
+        "rpc-serde.cc",
         "zk.cc",
     ],
     deps=[
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
index 054684d..1856047 100644
--- a/hbase-native-client/serde/client-deserializer-test.cc
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -16,12 +16,11 @@
  * limitations under the License.
  *
  */
-#include "serde/rpc.h"
-
 #include <folly/io/IOBuf.h>
 #include <gtest/gtest.h>
 
 #include "if/Client.pb.h"
+#include "rpc-serde.h"
 
 using namespace hbase;
 using folly::IOBuf;
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
index 7d8b29c..306f2c2 100644
--- a/hbase-native-client/serde/client-serializer-test.cc
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -24,7 +24,7 @@
 
 #include "if/HBase.pb.h"
 #include "if/RPC.pb.h"
-#include "serde/rpc.h"
+#include "rpc-serde.h"
 
 using namespace hbase;
 using namespace hbase::pb;
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc-serde.cc
similarity index 94%
rename from hbase-native-client/serde/rpc.cc
rename to hbase-native-client/serde/rpc-serde.cc
index 957a317..9e1f79a 100644
--- a/hbase-native-client/serde/rpc.cc
+++ b/hbase-native-client/serde/rpc-serde.cc
@@ -17,8 +17,6 @@
  *
  */
 
-#include "serde/rpc.h"
-
 #include <folly/Conv.h>
 #include <folly/Logging.h>
 #include <folly/io/Cursor.h>
@@ -30,6 +28,7 @@
 #include <utility>
 
 #include "if/RPC.pb.h"
+#include "rpc-serde.h"
 #include "utils/version.h"
 
 using folly::IOBuf;
@@ -83,6 +82,8 @@ int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
   return coded_stream.CurrentPosition();
 }
 
+RpcSerde::RpcSerde() {}
+
 RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : codec_(codec) {}
 
 std::unique_ptr<IOBuf> RpcSerde::Preamble(bool secure) {
@@ -162,6 +163,17 @@ std::unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id,
const std::stri
   return PrependLength(std::move(ser_header));
 }
 
+std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
+                                                 const google::protobuf::Message *msg) {
+  pb::ResponseHeader rh;
+  rh.set_call_id(call_id);
+  auto ser_header = SerializeDelimited(rh);
+  auto ser_resp = SerializeDelimited(*msg);
+  ser_header->appendChain(std::move(ser_resp));
+
+  return PrependLength(std::move(ser_header));
+}
+
 std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf>
buf,
                                                          uint32_t offset, uint32_t length)
{
   if (codec_ == nullptr) {
diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc-serde.h
similarity index 87%
rename from hbase-native-client/serde/rpc.h
rename to hbase-native-client/serde/rpc-serde.h
index 15aa1ee..0e1d44e 100644
--- a/hbase-native-client/serde/rpc.h
+++ b/hbase-native-client/serde/rpc-serde.h
@@ -45,6 +45,7 @@ namespace hbase {
  */
 class RpcSerde {
  public:
+  RpcSerde();
   /**
    * Constructor assumes the default auth type.
    */
@@ -98,6 +99,17 @@ class RpcSerde {
                                         const google::protobuf::Message *msg);
 
   /**
+     * Serialize a response message into a protobuf.
+     * Request consists of:
+     *
+     * - Big endian length
+     * - ResponseHeader object
+     * - The passed in Message object
+     */
+  std::unique_ptr<folly::IOBuf> Response(const uint32_t call_id,
+                                         const google::protobuf::Message *msg);
+
+  /**
    * Serialize a message in the delimited format.
    * Delimited format consists of the following:
    *
@@ -117,6 +129,10 @@ class RpcSerde {
    */
   std::unique_ptr<folly::IOBuf> PrependLength(std::unique_ptr<folly::IOBuf> msg);
 
+ public:
+  static constexpr const char *HBASE_CLIENT_RPC_TEST_MODE = "hbase.client.rpc.test.mode";
+  static constexpr const bool DEFAULT_HBASE_CLIENT_RPC_TEST_MODE = false;
+
  private:
   /* data */
   std::shared_ptr<Codec> codec_;


Mime
View raw message