hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject [42/50] [abbrv] hbase git commit: HBASE-15620 Add on Call serialization
Date Mon, 25 Apr 2016 21:13:26 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/43df1506/hbase-native-client/serde/BUCK
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK
new file mode 100644
index 0000000..207607f
--- /dev/null
+++ b/hbase-native-client/serde/BUCK
@@ -0,0 +1,54 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cxx_library(name="serde",
+            exported_headers=[
+                "client-serializer.h",
+                "client-deserializer.h",
+            ],
+            srcs=[
+                "client-serializer.cc",
+                "client-deserializer.cc",
+            ],
+            deps=[
+                "//if:if",
+                "//third-party:folly",
+            ],
+            tests=[
+                ":client-serializer-test",
+                ":client-deserializer-test",
+            ],
+            visibility=[
+                'PUBLIC',
+            ], )
+
+cxx_test(name="client-serializer-test",
+         srcs=[
+             "client-serializer-test.cc",
+         ],
+         deps=[
+             ":serde",
+             "//if:if",
+         ], )
+cxx_test(name="client-deserializer-test",
+         srcs=[
+             "client-deserializer-test.cc",
+         ],
+         deps=[
+             ":serde",
+             "//if:if",
+         ], )

http://git-wip-us.apache.org/repos/asf/hbase/blob/43df1506/hbase-native-client/serde/client-deserializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc
new file mode 100644
index 0000000..bb57e50
--- /dev/null
+++ b/hbase-native-client/serde/client-deserializer-test.cc
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+#include <folly/io/IOBuf.h>
+
+#include "serde/client-deserializer.h"
+#include "serde/client-serializer.h"
+#include "if/Client.pb.h"
+
+using namespace hbase;
+using folly::IOBuf;
+using hbase::pb::GetRequest;
+using hbase::pb::RegionSpecifier;
+using hbase::pb::RegionSpecifier_RegionSpecifierType;
+
+TEST(TestClientDeserializer, TestReturnFalseOnNullPtr) {
+  ClientDeserializer deser;
+  ASSERT_LT(deser.parse_delimited(nullptr, nullptr), 0);
+}
+
+TEST(TestClientDeserializer, TestReturnFalseOnBadInput) {
+  ClientDeserializer deser;
+  auto buf = IOBuf::copyBuffer("test");
+  GetRequest gr;
+
+  ASSERT_LT(deser.parse_delimited(buf.get(), &gr), 0);
+}
+
+TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) {
+  GetRequest in;
+  ClientSerializer ser;
+  ClientDeserializer deser;
+
+  // fill up the GetRequest.
+  in.mutable_region()->set_value("test_region_id");
+  in.mutable_region()->set_type(
+      RegionSpecifier_RegionSpecifierType::
+          RegionSpecifier_RegionSpecifierType_ENCODED_REGION_NAME);
+  in.mutable_get()->set_row("test_row");
+
+  // Create the buffer
+  auto buf = ser.serialize_delimited(in);
+
+  GetRequest out;
+
+  int used_bytes = deser.parse_delimited(buf.get(), &out);
+
+  ASSERT_GT(used_bytes, 0);
+  ASSERT_EQ(used_bytes, buf->length());
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43df1506/hbase-native-client/serde/client-deserializer.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer.cc b/hbase-native-client/serde/client-deserializer.cc
new file mode 100644
index 0000000..118b0d1
--- /dev/null
+++ b/hbase-native-client/serde/client-deserializer.cc
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "serde/client-deserializer.h"
+
+#include <google/protobuf/message.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <folly/Logging.h>
+
+using namespace hbase;
+
+using folly::IOBuf;
+using google::protobuf::Message;
+using google::protobuf::io::ArrayInputStream;
+using google::protobuf::io::CodedInputStream;
+
+int ClientDeserializer::parse_delimited(const IOBuf *buf, Message *msg) {
+  if (buf == nullptr || msg == nullptr) {
+    return -2;
+  }
+
+  DCHECK(!buf->isChained());
+
+  ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())};
+  CodedInputStream coded_stream{&ais};
+
+  uint32_t msg_size;
+
+  // Try and read the varint.
+  if (coded_stream.ReadVarint32(&msg_size) == false) {
+    FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t";
+    return -3;
+  }
+
+  coded_stream.PushLimit(msg_size);
+  // Parse the message.
+  if (msg->MergeFromCodedStream(&coded_stream) == false) {
+    FB_LOG_EVERY_MS(ERROR, 1000)
+        << "Unable to read a protobuf message from data.";
+    return -4;
+  }
+
+  // Make sure all the data was consumed.
+  if (coded_stream.ConsumedEntireMessage() == false) {
+    FB_LOG_EVERY_MS(ERROR, 1000)
+        << "Orphaned data left after reading protobuf message";
+    return -5;
+  }
+
+  return coded_stream.CurrentPosition();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43df1506/hbase-native-client/serde/client-deserializer.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-deserializer.h b/hbase-native-client/serde/client-deserializer.h
new file mode 100644
index 0000000..b9664b0
--- /dev/null
+++ b/hbase-native-client/serde/client-deserializer.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/io/IOBuf.h>
+
+// Forward
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+
+namespace hbase {
+class ClientDeserializer {
+public:
+  int parse_delimited(const folly::IOBuf *buf, google::protobuf::Message *msg);
+};
+
+} // namespace hbase

http://git-wip-us.apache.org/repos/asf/hbase/blob/43df1506/hbase-native-client/serde/client-serializer-test.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc
new file mode 100644
index 0000000..b32b55d
--- /dev/null
+++ b/hbase-native-client/serde/client-serializer-test.cc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <gtest/gtest.h>
+
+#include <folly/io/Cursor.h>
+
+#include <string>
+
+#include "serde/client-serializer.h"
+#include "if/HBase.pb.h"
+#include "if/RPC.pb.h"
+
+using namespace hbase;
+using namespace hbase::pb;
+using namespace folly;
+using namespace folly::io;
+
+TEST(ClientSerializerTest, PreambleIncludesHBas) {
+  ClientSerializer ser;
+  auto buf = ser.preamble();
+  const char *p = reinterpret_cast<const char *>(buf->data());
+  // Take the first for chars and make sure they are the
+  // magic string
+  EXPECT_EQ("HBas", std::string(p, 4));
+
+  EXPECT_EQ(6, buf->computeChainDataLength());
+}
+
+TEST(ClientSerializerTest, PreambleIncludesVersion) {
+  ClientSerializer ser;
+  auto buf = ser.preamble();
+  EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]);
+  EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]);
+}
+
+TEST(ClientSerializerTest, TestHeaderLengthPrefixed) {
+  ClientSerializer ser;
+  auto header = ser.header("elliott");
+
+  // The header should be prefixed by 4 bytes of length.
+  EXPECT_EQ(4, header->length());
+  EXPECT_TRUE(header->length() < header->computeChainDataLength());
+  EXPECT_TRUE(header->isChained());
+
+  // Now make sure the length is correct.
+  Cursor cursor(header.get());
+  auto prefixed_len = cursor.readBE<uint32_t>();
+  EXPECT_EQ(prefixed_len, header->next()->length());
+}
+
+TEST(ClientSerializerTest, TestHeaderDecode) {
+  ClientSerializer ser;
+  auto buf = ser.header("elliott");
+  auto header_buf = buf->next();
+  ConnectionHeader h;
+
+  EXPECT_TRUE(h.ParseFromArray(header_buf->data(), header_buf->length()));
+  EXPECT_EQ("elliott", h.user_info().effective_user());
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43df1506/hbase-native-client/serde/client-serializer.cc
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer.cc b/hbase-native-client/serde/client-serializer.cc
new file mode 100644
index 0000000..881b6e4
--- /dev/null
+++ b/hbase-native-client/serde/client-serializer.cc
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "serde/client-serializer.h"
+
+#include <folly/io/Cursor.h>
+#include <folly/Logging.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+
+#include "if/HBase.pb.h"
+#include "if/RPC.pb.h"
+
+using namespace hbase;
+
+using folly::IOBuf;
+using folly::io::RWPrivateCursor;
+using google::protobuf::Message;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedOutputStream;
+using google::protobuf::io::ZeroCopyOutputStream;
+using std::string;
+using std::unique_ptr;
+
+static const std::string PREAMBLE = "HBas";
+static const std::string INTERFACE = "ClientService";
+static const uint8_t RPC_VERSION = 0;
+static const uint8_t DEFAULT_AUTH_TYPE = 80;
+
+ClientSerializer::ClientSerializer() : auth_type_(DEFAULT_AUTH_TYPE) {}
+
+unique_ptr<IOBuf> ClientSerializer::preamble() {
+  auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2);
+  magic->append(2);
+  RWPrivateCursor c(magic.get());
+  c.skip(4);
+  // Version
+  c.write(RPC_VERSION);
+  // Standard security aka Please don't lie to me.
+  c.write(auth_type_);
+  return magic;
+}
+
+unique_ptr<IOBuf> ClientSerializer::header(const string &user) {
+  pb::ConnectionHeader h;
+
+  // TODO(eclark): Make this not a total lie.
+  h.mutable_user_info()->set_effective_user(user);
+  // The service name that we want to talk to.
+  //
+  // Right now we're completely ignoring the service interface.
+  // That may or may not be the correct thing to do.
+  // It worked for a while with the java client; until it
+  // didn't.
+  h.set_service_name(INTERFACE);
+  return prepend_length(serialize_message(h));
+}
+
+unique_ptr<IOBuf> ClientSerializer::request(const uint32_t call_id,
+                                            const string &method,
+                                            const Message *msg) {
+  pb::RequestHeader rq;
+  rq.set_method_name(method);
+  rq.set_call_id(call_id);
+  rq.set_request_param(msg != nullptr);
+  auto ser_header = serialize_delimited(rq);
+  if (msg != nullptr) {
+    auto ser_req = serialize_delimited(*msg);
+    ser_header->appendChain(std::move(ser_req));
+  }
+
+  return prepend_length(std::move(ser_header));
+}
+
+unique_ptr<IOBuf> ClientSerializer::prepend_length(unique_ptr<IOBuf> msg) {
+  // Java ints are 4 long. So create a buffer that large
+  auto len_buf = IOBuf::create(4);
+  // Then make those bytes visible.
+  len_buf->append(4);
+
+  RWPrivateCursor c(len_buf.get());
+  // Get the size of the data to be pushed out the network.
+  auto size = msg->computeChainDataLength();
+
+  // Write the length to this IOBuf.
+  c.writeBE(static_cast<uint32_t>(size));
+
+  // Then attach the origional to the back of len_buf
+  len_buf->appendChain(std::move(msg));
+  return len_buf;
+}
+
+unique_ptr<IOBuf> ClientSerializer::serialize_delimited(const Message &msg) {
+  // Get the buffer size needed for just the message.
+  int msg_size = msg.ByteSize();
+  int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size;
+
+  // Create a buffer big enough to hold the varint and the object.
+  auto buf = IOBuf::create(buf_size);
+  buf->append(buf_size);
+
+  // Create the array output stream.
+  ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())};
+  // Wrap the ArrayOuputStream in the coded output stream to allow writing
+  // Varint32
+  CodedOutputStream cos{&aos};
+
+  // Write out the size.
+  cos.WriteVarint32(msg_size);
+
+  // Now write the rest out.
+  // We're using the protobuf output streams here to keep track
+  // of where in the output array we are rather than IOBuf.
+  msg.SerializeWithCachedSizesToArray(
+      cos.GetDirectBufferForNBytesAndAdvance(msg_size));
+
+  // Return the buffer.
+  return buf;
+}
+// TODO(eclark): Make this 1 copy.
+unique_ptr<IOBuf> ClientSerializer::serialize_message(const Message &msg) {
+  auto buf = IOBuf::copyBuffer(msg.SerializeAsString());
+  return buf;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/43df1506/hbase-native-client/serde/client-serializer.h
----------------------------------------------------------------------
diff --git a/hbase-native-client/serde/client-serializer.h b/hbase-native-client/serde/client-serializer.h
new file mode 100644
index 0000000..685095d
--- /dev/null
+++ b/hbase-native-client/serde/client-serializer.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#pragma once
+
+#include <folly/io/IOBuf.h>
+#include <string>
+#include <cstdint>
+
+// Forward
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+namespace hbase {
+class Request;
+}
+
+namespace hbase {
+class ClientSerializer {
+public:
+  ClientSerializer();
+  std::unique_ptr<folly::IOBuf> preamble();
+  std::unique_ptr<folly::IOBuf> header(const std::string &user);
+  std::unique_ptr<folly::IOBuf> request(const uint32_t call_id,
+                                        const std::string &method,
+                                        const google::protobuf::Message *msg);
+  std::unique_ptr<folly::IOBuf>
+  serialize_delimited(const google::protobuf::Message &msg);
+
+  std::unique_ptr<folly::IOBuf>
+  serialize_message(const google::protobuf::Message &msg);
+
+  std::unique_ptr<folly::IOBuf>
+  prepend_length(std::unique_ptr<folly::IOBuf> msg);
+
+  uint8_t auth_type_;
+};
+} // namespace hbase


Mime
View raw message