hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e...@apache.org
Subject hbase git commit: HBASE-18338 [C++] Implement RpcTestServer (Xiaobing Zhou)
Date Fri, 21 Jul 2017 23:33:00 GMT
Repository: hbase
Updated Branches:
  refs/heads/HBASE-14850 a93c6a998 -> 1193812d7


HBASE-18338 [C++] Implement RpcTestServer (Xiaobing Zhou)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1193812d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1193812d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1193812d

Branch: refs/heads/HBASE-14850
Commit: 1193812d784f407ab8596380e003b65de27a117a
Parents: a93c6a9
Author: Enis Soztutar <enis@apache.org>
Authored: Fri Jul 21 16:29:44 2017 -0700
Committer: Enis Soztutar <enis@apache.org>
Committed: Fri Jul 21 16:29:44 2017 -0700

----------------------------------------------------------------------
 hbase-native-client/connection/BUCK             |  13 ++
 .../connection/client-handler.cc                |  21 +-
 hbase-native-client/connection/client-handler.h |   7 +-
 hbase-native-client/connection/pipeline.cc      |  12 +-
 .../connection/rpc-test-server-handler.cc       |  77 ++++++
 .../connection/rpc-test-server-handler.h        |  47 ++++
 .../connection/rpc-test-server.cc               |  70 ++++++
 .../connection/rpc-test-server.h                |  50 ++++
 hbase-native-client/connection/rpc-test.cc      |  86 +++++++
 hbase-native-client/connection/sasl-handler.h   |   2 +-
 hbase-native-client/if/test.proto               |  43 ++++
 hbase-native-client/if/test_rpc_service.proto   |  35 +++
 hbase-native-client/serde/BUCK                  |   4 +-
 .../serde/client-deserializer-test.cc           |   3 +-
 .../serde/client-serializer-test.cc             |   2 +-
 hbase-native-client/serde/rpc-serde.cc          | 234 +++++++++++++++++++
 hbase-native-client/serde/rpc-serde.h           | 141 +++++++++++
 hbase-native-client/serde/rpc.cc                | 222 ------------------
 hbase-native-client/serde/rpc.h                 | 125 ----------
 19 files changed, 827 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK
index c3119eb..aaf8fdb 100644
--- a/hbase-native-client/connection/BUCK
+++ b/hbase-native-client/connection/BUCK
@@ -33,6 +33,8 @@ cxx_library(
         "service.h",
         "rpc-client.h",
         "sasl-util.h",
+        "rpc-test-server.h",
+        "rpc-test-server-handler.h",
     ],
     srcs=[
         "client-dispatcher.cc",
@@ -44,6 +46,8 @@ cxx_library(
         "rpc-client.cc",
         "sasl-handler.cc",
         "sasl-util.cc",
+        "rpc-test-server.cc",
+        "rpc-test-server-handler.cc",
     ],
     deps=[
         "//if:if",
@@ -68,3 +72,12 @@ cxx_test(
     deps=[
         ":connection",
     ],)
+cxx_test(
+    name="rpc-test",
+    srcs=[
+        "rpc-test.cc",
+    ],
+    deps=[
+        ":connection",
+    ],
+    run_test_separately=True,)

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/client-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc
index 052c171..39227d3 100644
--- a/hbase-native-client/connection/client-handler.cc
+++ b/hbase-native-client/connection/client-handler.cc
@@ -35,9 +35,10 @@ using google::protobuf::Message;
 namespace hbase {
 
 ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
-                             const std::string &server)
+                             std::shared_ptr<Configuration> conf, const std::string &server)
     : user_name_(user_name),
       serde_(codec),
+      conf_(conf),
       server_(server),
       once_flag_(std::make_unique<std::once_flag>()),
       resp_msgs_(
@@ -115,13 +116,17 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) {
 }
 
 folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) {
-  // We need to send the header once.
-  // So use call_once to make sure that only one thread wins this.
-  std::call_once((*once_flag_), [ctx, this]() {
-    VLOG(3) << "Writing RPC Header to server: " << server_;
-    auto header = serde_.Header(user_name_);
-    ctx->fireWrite(std::move(header));
-  });
+  /* for RPC test, there's no need to send connection header */
+  if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+                      RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) {
+    // We need to send the header once.
+    // So use call_once to make sure that only one thread wins this.
+    std::call_once((*once_flag_), [ctx, this]() {
+      VLOG(3) << "Writing RPC Header to server: " << server_;
+      auto header = serde_.Header(user_name_);
+      ctx->fireWrite(std::move(header));
+    });
+  }
 
   VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/client-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h
index 8de3a8b..b6f19a2 100644
--- a/hbase-native-client/connection/client-handler.h
+++ b/hbase-native-client/connection/client-handler.h
@@ -26,9 +26,10 @@
 #include <string>
 #include <utility>
 
+#include "core/configuration.h"
 #include "exceptions/exception.h"
 #include "serde/codec.h"
-#include "serde/rpc.h"
+#include "serde/rpc-serde.h"
 #include "utils/concurrent-map.h"
 
 // Forward decs.
@@ -60,7 +61,8 @@ class ClientHandler
    * Create the handler
    * @param user_name the user name of the user running this process.
    */
-  ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, const std::string &server);
+  ClientHandler(std::string user_name, std::shared_ptr<Codec> codec,
+                std::shared_ptr<Configuration> conf, const std::string &server);
 
   /**
    * Get bytes from the wire.
@@ -79,6 +81,7 @@ class ClientHandler
   std::string user_name_;
   RpcSerde serde_;
   std::string server_;  // for logging
+  std::shared_ptr<Configuration> conf_;
 
   // in flight requests
   std::unique_ptr<concurrent_map<uint32_t, std::shared_ptr<google::protobuf::Message>>> resp_msgs_;

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/pipeline.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc
index 2844752..9c790b6 100644
--- a/hbase-native-client/connection/pipeline.cc
+++ b/hbase-native-client/connection/pipeline.cc
@@ -32,7 +32,6 @@ namespace hbase {
 RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec,
                                        std::shared_ptr<Configuration> conf)
     : user_util_(), codec_(codec), conf_(conf) {}
-
 SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
     std::shared_ptr<folly::AsyncTransportWrapper> sock) {
   folly::SocketAddress addr;  // for logging
@@ -41,10 +40,15 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline(
   auto pipeline = SerializePipeline::create();
   pipeline->addBack(wangle::AsyncSocketHandler{sock});
   pipeline->addBack(wangle::EventBaseHandler{});
-  auto secure = security::User::IsSecurityEnabled(*conf_);
-  pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_});
+  bool secure = false;
+  /* for RPC test, there's no need to setup Sasl */
+  if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE,
+                      RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) {
+    secure = security::User::IsSecurityEnabled(*conf_);
+    pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_});
+  }
   pipeline->addBack(wangle::LengthFieldBasedFrameDecoder{});
-  pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, addr.describe()});
+  pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, conf_, addr.describe()});
   pipeline->finalize();
   return pipeline;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server-handler.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc
new file mode 100644
index 0000000..7d2f407
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server-handler.cc
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "connection/rpc-test-server-handler.h"
+#include "if/RPC.pb.h"
+#include "if/test.pb.h"
+
+namespace hbase {
+
+void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) {
+  buf->coalesce();  // ParseDelimited DCHECKs that the buffer is not chained
+  pb::RequestHeader header;
+  // Parse the delimited RequestHeader from the start of the buffer.
+  int used_bytes = serde_.ParseDelimited(buf.get(), &header);
+  VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id=" << header.call_id();
+  // Build an empty Request for the method; nullptr when the method name is not recognized.
+  auto received = CreateReceivedRequest(header.method_name());
+  // NOTE(review): used_bytes is trimmed unchecked - confirm ParseDelimited cannot fail here.
+  buf->trimStart(used_bytes);
+  if (header.has_request_param() && received != nullptr) {
+    used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get());
+    VLOG(3) << "Read RPCRequest, buf length:" << buf->length()
+            << ", header PB length:" << used_bytes;
+    received->set_call_id(header.call_id());
+  }
+  // NOTE(review): call_id is only copied when the header carries a request param.
+  if (received != nullptr) {  // unrecognized methods are not forwarded
+    ctx->fireRead(std::move(received));
+  }
+}
+
+folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx,
+                                                                std::unique_ptr<Response> r) {
+  VLOG(3) << "Writing RPC Response";
+  // Serialize the response (call id + message) and send it down the pipeline.
+  return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get()));
+}
+
+std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest(
+    const std::string& method_name) {
+  std::unique_ptr<Request> result = nullptr;
+  // Pair each known test method with its request/response protos; unknown names yield nullptr.
+  if (method_name == "ping") {
+    result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                       std::make_shared<EmptyResponseProto>(), method_name);
+  } else if (method_name == "echo") {
+    result = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+                                       std::make_shared<EchoResponseProto>(), method_name);
+  } else if (method_name == "error") {
+    result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                       std::make_shared<EmptyResponseProto>(), method_name);
+  } else if (method_name == "pause") {
+    result = std::make_unique<Request>(std::make_shared<PauseRequestProto>(),
+                                       std::make_shared<EmptyResponseProto>(), method_name);
+  } else if (method_name == "addr") {
+    result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(),
+                                       std::make_shared<AddrResponseProto>(), method_name);
+  }
+  return result;
+}
+}  // end of namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h
new file mode 100644
index 0000000..4c84615
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server-handler.h
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <wangle/channel/Handler.h>
+
+#include "connection/request.h"
+#include "connection/response.h"
+#include "serde/rpc-serde.h"
+
+using namespace hbase;
+
+namespace hbase {
+// A real rpc server would probably use generated client/server stubs
+class RpcTestServerSerializeHandler
+    : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Request>,
+                             std::unique_ptr<Response>, std::unique_ptr<folly::IOBuf>> {
+ public:
+  RpcTestServerSerializeHandler() : serde_() {}
+  // Inbound: parses an IOBuf into a Request and fires it up the pipeline.
+  void read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override;
+  // Outbound: serializes a Response and fires it down the pipeline.
+  folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> r) override;
+
+ private:
+  std::unique_ptr<Request> CreateReceivedRequest(const std::string& method_name);
+
+ private:
+  hbase::RpcSerde serde_;
+};
+}  // end of namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc
new file mode 100644
index 0000000..d3a30b1
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server.cc
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <wangle/channel/AsyncSocketHandler.h>
+#include <wangle/channel/EventBaseHandler.h>
+#include <wangle/codec/LengthFieldBasedFrameDecoder.h>
+#include <wangle/codec/LengthFieldPrepender.h>
+#include <wangle/service/ServerDispatcher.h>
+
+#include "connection/rpc-test-server-handler.h"
+#include "connection/rpc-test-server.h"
+#include "if/test.pb.h"
+
+namespace hbase {
+
+RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline(
+    std::shared_ptr<AsyncTransportWrapper> sock) {
+  auto pipeline = RpcTestServerSerializePipeline::create();
+  pipeline->addBack(AsyncSocketHandler(sock));
+  // ensure we can write from any thread
+  pipeline->addBack(EventBaseHandler());
+  pipeline->addBack(LengthFieldBasedFrameDecoder());
+  pipeline->addBack(RpcTestServerSerializeHandler());  // IOBuf <-> Request/Response
+  pipeline->addBack(
+      MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>(&service_));
+  pipeline->finalize();
+
+  return pipeline;
+}
+
+Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request> request) {
+  /* build a Response that carries the call id of the incoming request */
+  auto response = std::make_unique<Response>();
+  response->set_call_id(request->call_id());
+  std::string method_name = request->method();
+  // Only ping and echo attach a response message; the other methods are still stubs.
+  if (method_name == "ping") {
+    auto pb_resp_msg = std::make_shared<EmptyResponseProto>();
+    response->set_resp_msg(pb_resp_msg);
+  } else if (method_name == "echo") {
+    auto pb_resp_msg = std::make_shared<EchoResponseProto>();
+    auto pb_req_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+    pb_resp_msg->set_message(pb_req_msg->message());
+    response->set_resp_msg(pb_resp_msg);
+  } else if (method_name == "error") {
+    // TODO: not implemented yet; no response message is set
+  } else if (method_name == "pause") {
+    // TODO: not implemented yet; no response message is set
+  } else if (method_name == "addr") {
+    // TODO: not implemented yet; no response message is set
+  }
+  // The result is returned as an already-fulfilled future.
+  return folly::makeFuture<std::unique_ptr<Response>>(std::move(response));
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h
new file mode 100644
index 0000000..c3225ff
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test-server.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+#include <wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <wangle/service/ExecutorFilter.h>
+#include <wangle/service/Service.h>
+
+#include "connection/request.h"
+#include "connection/response.h"
+
+using namespace hbase;
+using namespace folly;
+using namespace wangle;
+
+namespace hbase {
+using RpcTestServerSerializePipeline = wangle::Pipeline<IOBufQueue&, std::unique_ptr<Response>>;
+// Test service: operator() builds the Response for an incoming Request.
+class RpcTestService : public Service<std::unique_ptr<Request>, std::unique_ptr<Response>> {
+ public:
+  RpcTestService() {}
+  virtual ~RpcTestService() = default;
+  Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> request) override;
+};
+// Builds the test server pipeline around a transport socket; requests go to service_.
+class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSerializePipeline> {
+ public:
+  RpcTestServerSerializePipeline::Ptr newPipeline(
+      std::shared_ptr<AsyncTransportWrapper> sock) override;
+
+ private:
+  ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>> service_{
+      std::make_shared<CPUThreadPoolExecutor>(1), std::make_shared<RpcTestService>()};
+};
+}  // end of namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc
new file mode 100644
index 0000000..d4cd89f
--- /dev/null
+++ b/hbase-native-client/connection/rpc-test.cc
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <wangle/bootstrap/ClientBootstrap.h>
+#include <wangle/channel/Handler.h>
+
+#include <folly/Logging.h>
+#include <folly/SocketAddress.h>
+#include <folly/String.h>
+#include <folly/experimental/TestUtil.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <boost/thread.hpp>
+
+#include "connection/rpc-client.h"
+#include "if/test.pb.h"
+#include "rpc-test-server.h"
+#include "security/user.h"
+#include "serde/rpc-serde.h"
+
+using namespace wangle;
+using namespace folly;
+using namespace hbase;
+
+DEFINE_int32(port, 0, "test server port");
+
+TEST(RpcTestServer, echo) {
+  /* create conf; test mode makes the client skip the connection header and SASL setup */
+  auto conf = std::make_shared<Configuration>();
+  conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true");
+
+  /* create rpc test server and read back the address it was bound to */
+  auto server = std::make_shared<ServerBootstrap<RpcTestServerSerializePipeline>>();
+  server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>());
+  server->bind(FLAGS_port);
+  folly::SocketAddress server_addr;
+  server->getSockets()[0]->getAddress(&server_addr);
+
+  /* create RpcClient */
+  auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1);
+
+  auto rpc_client = std::make_shared<RpcClient>(io_executor, nullptr, conf);
+
+  /**
+   * test echo: the server must return the message it was sent
+   */
+  try {
+    std::string greetings = "hello, hbase server!";
+    auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(),
+                                             std::make_shared<EchoResponseProto>(), "echo");
+    auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg());
+    pb_msg->set_message(greetings);
+
+    /* sending out request */
+    rpc_client
+        ->AsyncCall(server_addr.getAddressStr(), server_addr.getPort(), std::move(request),
+                    hbase::security::User::defaultUser())
+        .then([=](std::unique_ptr<Response> response) {
+          auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg());
+          VLOG(1) << "message returned: " + pb_resp->message();
+          EXPECT_EQ(greetings, pb_resp->message());
+        });
+  } catch (const std::exception&) {
+    throw;  // rethrow as-is; `throw e;` would copy and slice the exception
+  }
+  // NOTE(review): the returned future is not awaited before the server is stopped - confirm.
+  server->stop();
+  server->join();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/sasl-handler.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/connection/sasl-handler.h b/hbase-native-client/connection/sasl-handler.h
index f606a23..81f4e81 100644
--- a/hbase-native-client/connection/sasl-handler.h
+++ b/hbase-native-client/connection/sasl-handler.h
@@ -30,7 +30,7 @@
 #include "connection/sasl-util.h"
 #include "connection/service.h"
 #include "security/user.h"
-#include "serde/rpc.h"
+#include "serde/rpc-serde.h"
 
 namespace hbase {
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/if/test.proto
----------------------------------------------------------------------
diff --git a/hbase-native-client/if/test.proto b/hbase-native-client/if/test.proto
new file mode 100644
index 0000000..72b68e9
--- /dev/null
+++ b/hbase-native-client/if/test.proto
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
+option java_outer_classname = "TestProtos";
+option java_generate_equals_and_hash = true;
+
+message EmptyRequestProto {  // request of the ping, error and addr rpcs
+}
+
+message EmptyResponseProto {  // response of the ping, error and pause rpcs
+}
+
+message EchoRequestProto {  // request of the echo rpc
+  required string message = 1;
+}
+
+message EchoResponseProto {  // response of the echo rpc
+  required string message = 1;
+}
+
+message PauseRequestProto {  // request of the pause rpc
+  required uint32 ms = 1;  // presumably a duration in milliseconds - TODO confirm
+}
+
+message AddrResponseProto {  // response of the addr rpc
+  required string addr = 1;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/if/test_rpc_service.proto
----------------------------------------------------------------------
diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto
new file mode 100644
index 0000000..5f91dc4
--- /dev/null
+++ b/hbase-native-client/if/test_rpc_service.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated";
+option java_outer_classname = "TestRpcServiceProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "test.proto";
+
+
+/**
+ * A protobuf service for use in tests. The C++ RpcTestServer recognizes these method names.
+ */
+service TestProtobufRpcProto {
+  rpc ping(EmptyRequestProto) returns (EmptyResponseProto);
+  rpc echo(EchoRequestProto) returns (EchoResponseProto);
+  rpc error(EmptyRequestProto) returns (EmptyResponseProto);
+  rpc pause(PauseRequestProto) returns (EmptyResponseProto);
+  rpc addr(EmptyRequestProto) returns (AddrResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK
index 18e949c..a765884 100644
--- a/hbase-native-client/serde/BUCK
+++ b/hbase-native-client/serde/BUCK
@@ -22,13 +22,13 @@ cxx_library(
         "cell-outputstream.h",
         "codec.h",
         "region-info.h",
-        "rpc.h",
+        "rpc-serde.h",
         "server-name.h",
         "table-name.h",
         "zk.h",
     ],
     srcs=[
-        "rpc.cc",
+        "rpc-serde.cc",
         "zk.cc",
     ],
     deps=[

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/client-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
index 054684d..1856047 100644
--- a/hbase-native-client/serde/client-deserializer-test.cc
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -16,12 +16,11 @@
  * limitations under the License.
  *
  */
-#include "serde/rpc.h"
-
 #include <folly/io/IOBuf.h>
 #include <gtest/gtest.h>
 
 #include "if/Client.pb.h"
+#include "rpc-serde.h"
 
 using namespace hbase;
 using folly::IOBuf;

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/client-serializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
index 7d8b29c..306f2c2 100644
--- a/hbase-native-client/serde/client-serializer-test.cc
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -24,7 +24,7 @@
 
 #include "if/HBase.pb.h"
 #include "if/RPC.pb.h"
-#include "serde/rpc.h"
+#include "rpc-serde.h"
 
 using namespace hbase;
 using namespace hbase::pb;

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc-serde.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.cc b/hbase-native-client/serde/rpc-serde.cc
new file mode 100644
index 0000000..9e1f79a
--- /dev/null
+++ b/hbase-native-client/serde/rpc-serde.cc
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <folly/Conv.h>
+#include <folly/Logging.h>
+#include <folly/io/Cursor.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/message.h>
+#include <boost/algorithm/string.hpp>
+
+#include <utility>
+
+#include "if/RPC.pb.h"
+#include "rpc-serde.h"
+#include "utils/version.h"
+
+using folly::IOBuf;
+using folly::io::RWPrivateCursor;
+using google::protobuf::Message;
+using google::protobuf::io::ArrayInputStream;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::io::CodedOutputStream;
+using google::protobuf::io::ZeroCopyOutputStream;
+
+namespace hbase {
+
+static const std::string PREAMBLE = "HBas";
+static const std::string INTERFACE = "ClientService";
+static const uint8_t RPC_VERSION = 0;
+static const uint8_t DEFAULT_AUTH_TYPE = 80;
+static const uint8_t KERBEROS_AUTH_TYPE = 81;
+
+int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
+  if (buf == nullptr || msg == nullptr) {
+    return -2;
+  }
+
+  DCHECK(!buf->isChained());
+
+  ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())};
+  CodedInputStream coded_stream{&ais};
+
+  uint32_t msg_size;
+
+  // Try and read the varint.
+  if (coded_stream.ReadVarint32(&msg_size) == false) {
+    FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
+    return -3;
+  }
+
+  coded_stream.PushLimit(msg_size);
+  // Parse the message.
+  if (msg->MergeFromCodedStream(&coded_stream) == false) {
+    FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a protobuf message from data.";
+    return -4;
+  }
+
+  // Make sure all the data was consumed.
+  if (coded_stream.ConsumedEntireMessage() == false) {
+    FB_LOG_EVERY_MS(ERROR, 1000) << "Orphaned data left after reading protobuf message";
+    return -5;
+  }
+
+  return coded_stream.CurrentPosition();
+}
+
+RpcSerde::RpcSerde() {}
+
+RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : codec_(codec) {}
+
+std::unique_ptr<IOBuf> RpcSerde::Preamble(bool secure) {
+  auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
+  magic->append(2);
+  RWPrivateCursor c(magic.get());
+  c.skip(4);
+  // Version
+  c.write(RPC_VERSION);
+  if (secure) {
+    // for now support only KERBEROS (DIGEST is not supported)
+    c.write(KERBEROS_AUTH_TYPE);
+  } else {
+    c.write(DEFAULT_AUTH_TYPE);
+  }
+  return magic;
+}
+
+std::unique_ptr<IOBuf> RpcSerde::Header(const std::string &user) {
+  pb::ConnectionHeader h;
+
+  // TODO(eclark): Make this not a total lie.
+  h.mutable_user_info()->set_effective_user(user);
+  // The service name that we want to talk to.
+  //
+  // Right now we're completely ignoring the service interface.
+  // That may or may not be the correct thing to do.
+  // It worked for a while with the java client; until it
+  // didn't.
+  // TODO: send the service name and user from the RpcClient
+  h.set_service_name(INTERFACE);
+
+  std::unique_ptr<pb::VersionInfo> version_info = CreateVersionInfo();
+
+  h.set_allocated_version_info(version_info.release());
+
+  if (codec_ != nullptr) {
+    h.set_cell_block_codec_class(codec_->java_class_name());
+  }
+  return PrependLength(SerializeMessage(h));
+}
+
+std::unique_ptr<pb::VersionInfo> RpcSerde::CreateVersionInfo() {
+  std::unique_ptr<pb::VersionInfo> version_info = std::make_unique<pb::VersionInfo>();
+  version_info->set_user(Version::user);
+  version_info->set_revision(Version::revision);
+  version_info->set_url(Version::url);
+  version_info->set_date(Version::date);
+  version_info->set_src_checksum(Version::src_checksum);
+  version_info->set_version(Version::version);
+
+  std::string version{Version::version};
+  std::vector<std::string> version_parts;
+  boost::split(version_parts, version, boost::is_any_of("."), boost::token_compress_on);
+  // Major/minor are only set when the version has at least two dot-separated parts.
+  if (version_parts.size() >= 2) {
+    version_info->set_version_major(folly::to<uint32_t>(version_parts[0]));
+    version_info->set_version_minor(folly::to<uint32_t>(version_parts[1]));
+  }
+
+  VLOG(1) << "Client VersionInfo:" << version_info->ShortDebugString();
+  return version_info;
+}
+
+std::unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const std::string &method,
+                                         const Message *msg) {
+  pb::RequestHeader rq;
+  rq.set_method_name(method);
+  rq.set_call_id(call_id);
+  rq.set_request_param(msg != nullptr);
+  auto ser_header = SerializeDelimited(rq);
+  if (msg != nullptr) {
+    auto ser_req = SerializeDelimited(*msg);
+    ser_header->appendChain(std::move(ser_req));
+  }
+
+  return PrependLength(std::move(ser_header));
+}
+
+std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id,
+                                                 const google::protobuf::Message *msg) {
+  pb::ResponseHeader rh;
+  rh.set_call_id(call_id);
+  auto ser_header = SerializeDelimited(rh);
+  if (msg != nullptr) {  // like Request(): a null msg yields a header-only frame
+    ser_header->appendChain(SerializeDelimited(*msg));
+  }
+  return PrependLength(std::move(ser_header));
+}
+
+std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
+                                                         uint32_t offset, uint32_t length) {
+  if (codec_ == nullptr) {
+    return nullptr;
+  }
+  return codec_->CreateDecoder(std::move(buf), offset, length);
+}
+
+std::unique_ptr<IOBuf> RpcSerde::PrependLength(std::unique_ptr<IOBuf> msg) {
+  // Java ints are 4 bytes long, so create a buffer that large.
+  auto len_buf = IOBuf::create(4);
+  // Then make those bytes visible.
+  len_buf->append(4);
+
+  RWPrivateCursor c(len_buf.get());
+  // Get the size of the data to be pushed out the network.
+  auto size = msg->computeChainDataLength();
+
+  // Write the length to this IOBuf in big-endian byte order.
+  c.writeBE(static_cast<uint32_t>(size));
+
+  // Then attach the original chain to the back of len_buf.
+  len_buf->appendChain(std::move(msg));
+  return len_buf;
+}
+
+std::unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) {
+  // Get the buffer size needed for just the message.
+  int msg_size = msg.ByteSize();
+  int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
+
+  // Create a buffer big enough to hold the varint and the object.
+  auto buf = IOBuf::create(buf_size);
+  buf->append(buf_size);
+
+  // Create the array output stream.
+  ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
+  // Wrap the ArrayOutputStream in the coded output stream to allow writing
+  // a Varint32.
+  CodedOutputStream cos{&aos};
+
+  // Write out the size.
+  cos.WriteVarint32(msg_size);
+
+  // Now write the rest out.
+  // We're using the protobuf output streams here to keep track
+  // of where in the output array we are rather than IOBuf.
+  msg.SerializeWithCachedSizesToArray(cos.GetDirectBufferForNBytesAndAdvance(msg_size));
+
+  // Return the buffer.
+  return buf;
+}
+// TODO(eclark): Make this 1 copy.
+std::unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) {
+  auto buf = IOBuf::copyBuffer(msg.SerializeAsString());
+  return buf;
+}
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc-serde.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc-serde.h b/hbase-native-client/serde/rpc-serde.h
new file mode 100644
index 0000000..0e1d44e
--- /dev/null
+++ b/hbase-native-client/serde/rpc-serde.h
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "if/HBase.pb.h"
+#include "serde/cell-scanner.h"
+#include "serde/codec.h"
+
+// Forward
+namespace folly {
+class IOBuf;
+}
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+
+namespace hbase {
+
+/**
+ * @brief Class for serializing and deserializing RPC-formatted data.
+ *
+ * RpcSerde is the one stop shop for reading/writing data to HBase daemons.
+ * It should throw exceptions if anything goes wrong.
+ */
+class RpcSerde {
+ public:
+  RpcSerde();
+  /**
+   * Constructs a serde that uses the given codec to decode cells.
+   */
+  RpcSerde(std::shared_ptr<Codec> codec);
+
+  /**
+   * Destructor. This is provided just for testing purposes.
+   */
+  virtual ~RpcSerde() = default;
+
+  /**
+   * Parse a message in the delimited format.
+   *
+   * A message in delimited format consists of the following:
+   *
+   * - a protobuf var int32.
+   * - A protobuf object serialized.
+   */
+  int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
+
+  /**
+   * Create a new connection preamble in a new IOBuf.
+   */
+  static std::unique_ptr<folly::IOBuf> Preamble(bool secure);
+
+  /**
+   * Create the header protobuf object and serialize it to a new IOBuf.
+   * Header is in the following format:
+   *
+   * - Big endian length
+   * - ConnectionHeader object serialized out.
+   */
+  std::unique_ptr<folly::IOBuf> Header(const std::string &user);
+
+  /**
+   * Take ownership of the passed buffer, and create a CellScanner using the
+   * Codec class to parse Cells out of the wire.
+   */
+  std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset,
+                                                 uint32_t length);
+
+  /**
+   * Serialize a request message into a protobuf.
+   * Request consists of:
+   *
+   * - Big endian length
+   * - RequestHeader object
+   * - The passed in Message object
+   */
+  std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id, const std::string &method,
+                                        const google::protobuf::Message *msg);
+
+  /**
+   * Serialize a response message into a protobuf.
+   * Response consists of:
+   *
+   * - Big endian length
+   * - ResponseHeader object
+   * - The passed in Message object
+   */
+  std::unique_ptr<folly::IOBuf> Response(const uint32_t call_id,
+                                         const google::protobuf::Message *msg);
+
+  /**
+   * Serialize a message in the delimited format.
+   * Delimited format consists of the following:
+   *
+   * - A protobuf var int32
+   * - The message object serialized after that.
+   */
+  std::unique_ptr<folly::IOBuf> SerializeDelimited(const google::protobuf::Message &msg);
+
+  /**
+   * Serialize a message. This does not add any length prepend.
+   */
+  std::unique_ptr<folly::IOBuf> SerializeMessage(const google::protobuf::Message &msg);
+
+  /**
+   * Prepend a length IOBuf to the given IOBuf chain.
+   * This involves no copies or moves of the passed in data.
+   */
+  std::unique_ptr<folly::IOBuf> PrependLength(std::unique_ptr<folly::IOBuf> msg);
+
+ public:
+  static constexpr const char *HBASE_CLIENT_RPC_TEST_MODE = "hbase.client.rpc.test.mode";
+  static constexpr const bool DEFAULT_HBASE_CLIENT_RPC_TEST_MODE = false;
+
+ private:
+  /* data */
+  std::shared_ptr<Codec> codec_;
+  std::unique_ptr<pb::VersionInfo> CreateVersionInfo();
+};
+}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc
deleted file mode 100644
index 957a317..0000000
--- a/hbase-native-client/serde/rpc.cc
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "serde/rpc.h"
-
-#include <folly/Conv.h>
-#include <folly/Logging.h>
-#include <folly/io/Cursor.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/message.h>
-#include <boost/algorithm/string.hpp>
-
-#include <utility>
-
-#include "if/RPC.pb.h"
-#include "utils/version.h"
-
-using folly::IOBuf;
-using folly::io::RWPrivateCursor;
-using google::protobuf::Message;
-using google::protobuf::io::ArrayInputStream;
-using google::protobuf::io::ArrayOutputStream;
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::CodedOutputStream;
-using google::protobuf::io::ZeroCopyOutputStream;
-
-namespace hbase {
-
-static const std::string PREAMBLE = "HBas";
-static const std::string INTERFACE = "ClientService";
-static const uint8_t RPC_VERSION = 0;
-static const uint8_t DEFAULT_AUTH_TYPE = 80;
-static const uint8_t KERBEROS_AUTH_TYPE = 81;
-
-int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) {
-  if (buf == nullptr || msg == nullptr) {
-    return -2;
-  }
-
-  DCHECK(!buf->isChained());
-
-  ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())};
-  CodedInputStream coded_stream{&ais};
-
-  uint32_t msg_size;
-
-  // Try and read the varint.
-  if (coded_stream.ReadVarint32(&msg_size) == false) {
-    FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
-    return -3;
-  }
-
-  coded_stream.PushLimit(msg_size);
-  // Parse the message.
-  if (msg->MergeFromCodedStream(&coded_stream) == false) {
-    FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a protobuf message from data.";
-    return -4;
-  }
-
-  // Make sure all the data was consumed.
-  if (coded_stream.ConsumedEntireMessage() == false) {
-    FB_LOG_EVERY_MS(ERROR, 1000) << "Orphaned data left after reading protobuf message";
-    return -5;
-  }
-
-  return coded_stream.CurrentPosition();
-}
-
-RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : codec_(codec) {}
-
-std::unique_ptr<IOBuf> RpcSerde::Preamble(bool secure) {
-  auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
-  magic->append(2);
-  RWPrivateCursor c(magic.get());
-  c.skip(4);
-  // Version
-  c.write(RPC_VERSION);
-  if (secure) {
-    // for now support only KERBEROS (DIGEST is not supported)
-    c.write(KERBEROS_AUTH_TYPE);
-  } else {
-    c.write(DEFAULT_AUTH_TYPE);
-  }
-  return magic;
-}
-
-std::unique_ptr<IOBuf> RpcSerde::Header(const std::string &user) {
-  pb::ConnectionHeader h;
-
-  // TODO(eclark): Make this not a total lie.
-  h.mutable_user_info()->set_effective_user(user);
-  // The service name that we want to talk to.
-  //
-  // Right now we're completely ignoring the service interface.
-  // That may or may not be the correct thing to do.
-  // It worked for a while with the java client; until it
-  // didn't.
-  // TODO: send the service name and user from the RpcClient
-  h.set_service_name(INTERFACE);
-
-  std::unique_ptr<pb::VersionInfo> version_info = CreateVersionInfo();
-
-  h.set_allocated_version_info(version_info.release());
-
-  if (codec_ != nullptr) {
-    h.set_cell_block_codec_class(codec_->java_class_name());
-  }
-  return PrependLength(SerializeMessage(h));
-}
-
-std::unique_ptr<pb::VersionInfo> RpcSerde::CreateVersionInfo() {
-  std::unique_ptr<pb::VersionInfo> version_info = std::make_unique<pb::VersionInfo>();
-  version_info->set_user(Version::user);
-  version_info->set_revision(Version::revision);
-  version_info->set_url(Version::url);
-  version_info->set_date(Version::date);
-  version_info->set_src_checksum(Version::src_checksum);
-  version_info->set_version(Version::version);
-
-  std::string version{Version::version};
-  std::vector<std::string> version_parts;
-  boost::split(version_parts, version, boost::is_any_of("."), boost::token_compress_on);
-  uint32_t major_version = 0, minor_version = 0;
-  if (version_parts.size() >= 2) {
-    version_info->set_version_major(folly::to<uint32_t>(version_parts[0]));
-    version_info->set_version_minor(folly::to<uint32_t>(version_parts[1]));
-  }
-
-  VLOG(1) << "Client VersionInfo:" << version_info->ShortDebugString();
-  return version_info;
-}
-
-std::unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const std::string &method,
-                                         const Message *msg) {
-  pb::RequestHeader rq;
-  rq.set_method_name(method);
-  rq.set_call_id(call_id);
-  rq.set_request_param(msg != nullptr);
-  auto ser_header = SerializeDelimited(rq);
-  if (msg != nullptr) {
-    auto ser_req = SerializeDelimited(*msg);
-    ser_header->appendChain(std::move(ser_req));
-  }
-
-  return PrependLength(std::move(ser_header));
-}
-
-std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf,
-                                                         uint32_t offset, uint32_t length) {
-  if (codec_ == nullptr) {
-    return nullptr;
-  }
-  return codec_->CreateDecoder(std::move(buf), offset, length);
-}
-
-std::unique_ptr<IOBuf> RpcSerde::PrependLength(std::unique_ptr<IOBuf> msg) {
-  // Java ints are 4 long. So create a buffer that large
-  auto len_buf = IOBuf::create(4);
-  // Then make those bytes visible.
-  len_buf->append(4);
-
-  RWPrivateCursor c(len_buf.get());
-  // Get the size of the data to be pushed out the network.
-  auto size = msg->computeChainDataLength();
-
-  // Write the length to this IOBuf.
-  c.writeBE(static_cast<uint32_t>(size));
-
-  // Then attach the origional to the back of len_buf
-  len_buf->appendChain(std::move(msg));
-  return len_buf;
-}
-
-std::unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) {
-  // Get the buffer size needed for just the message.
-  int msg_size = msg.ByteSize();
-  int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
-
-  // Create a buffer big enough to hold the varint and the object.
-  auto buf = IOBuf::create(buf_size);
-  buf->append(buf_size);
-
-  // Create the array output stream.
-  ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
-  // Wrap the ArrayOuputStream in the coded output stream to allow writing
-  // Varint32
-  CodedOutputStream cos{&aos};
-
-  // Write out the size.
-  cos.WriteVarint32(msg_size);
-
-  // Now write the rest out.
-  // We're using the protobuf output streams here to keep track
-  // of where in the output array we are rather than IOBuf.
-  msg.SerializeWithCachedSizesToArray(cos.GetDirectBufferForNBytesAndAdvance(msg_size));
-
-  // Return the buffer.
-  return buf;
-}
-// TODO(eclark): Make this 1 copy.
-std::unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) {
-  auto buf = IOBuf::copyBuffer(msg.SerializeAsString());
-  return buf;
-}
-}  // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc.h
deleted file mode 100644
index 15aa1ee..0000000
--- a/hbase-native-client/serde/rpc.h
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#pragma once
-
-#include <memory>
-#include <string>
-
-#include "if/HBase.pb.h"
-#include "serde/cell-scanner.h"
-#include "serde/codec.h"
-
-// Forward
-namespace folly {
-class IOBuf;
-}
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-
-namespace hbase {
-
-/**
- * @brief Class for serializing a deserializing rpc formatted data.
- *
- * RpcSerde is the one stop shop for reading/writing data to HBase daemons.
- * It should throw exceptions if anything goes wrong.
- */
-class RpcSerde {
- public:
-  /**
-   * Constructor assumes the default auth type.
-   */
-  RpcSerde(std::shared_ptr<Codec> codec);
-
-  /**
-   * Destructor. This is provided just for testing purposes.
-   */
-  virtual ~RpcSerde() = default;
-
-  /**
-   * Pase a message in the delimited format.
-   *
-   * A message in delimited format consists of the following:
-   *
-   * - a protobuf var int32.
-   * - A protobuf object serialized.
-   */
-  int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
-
-  /**
-   * Create a new connection preamble in a new IOBuf.
-   */
-  static std::unique_ptr<folly::IOBuf> Preamble(bool secure);
-
-  /**
-   * Create the header protobuf object and serialize it to a new IOBuf.
-   * Header is in the following format:
-   *
-   * - Big endian length
-   * - ConnectionHeader object serialized out.
-   */
-  std::unique_ptr<folly::IOBuf> Header(const std::string &user);
-
-  /**
-   * Take ownership of the passed buffer, and create a CellScanner using the
-   * Codec class to parse Cells out of the wire.
-   */
-  std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset,
-                                                 uint32_t length);
-
-  /**
-   * Serialize a request message into a protobuf.
-   * Request consists of:
-   *
-   * - Big endian length
-   * - RequestHeader object
-   * - The passed in Message object
-   */
-  std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id, const std::string &method,
-                                        const google::protobuf::Message *msg);
-
-  /**
-   * Serialize a message in the delimited format.
-   * Delimited format consists of the following:
-   *
-   * - A protobuf var int32
-   * - The message object seriailized after that.
-   */
-  std::unique_ptr<folly::IOBuf> SerializeDelimited(const google::protobuf::Message &msg);
-
-  /**
-   * Serilalize a message. This does not add any length prepend.
-   */
-  std::unique_ptr<folly::IOBuf> SerializeMessage(const google::protobuf::Message &msg);
-
-  /**
-   * Prepend a length IOBuf to the given IOBuf chain.
-   * This involves no copies or moves of the passed in data.
-   */
-  std::unique_ptr<folly::IOBuf> PrependLength(std::unique_ptr<folly::IOBuf> msg);
-
- private:
-  /* data */
-  std::shared_ptr<Codec> codec_;
-  std::unique_ptr<pb::VersionInfo> CreateVersionInfo();
-};
-}  // namespace hbase


Mime
View raw message