arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-1255: [Plasma] Fix typo in plasma protocol; add DCHECK for ReadXXX in plasma protocol.
Date Wed, 26 Jul 2017 03:40:28 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 2eeaa95d4 -> 676a4a947


ARROW-1255: [Plasma] Fix typo in plasma protocol; add DCHECK for ReadXXX in plasma protocol.

Related to https://github.com/apache/arrow/pull/878, add DCHECK for ReadXXX.

Author: Yeolar <yeolar@gmail.com>

Closes #887 from Yeolar/fixtypo_plasma_and_add_DCHECK and squashes the following commits:

4df63bc [Yeolar] clang-format for too long lines.
143d254 [Yeolar] Update, compile passed.
09ff103 [Yeolar] Fix conflicts.
b951d8d [Yeolar] Merge pull request #1 from apache/master
ebae611 [Yeolar] Fix typo in plasma protocol; add DCHECK for ReadXXX in plasma protocol.


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/676a4a94
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/676a4a94
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/676a4a94

Branch: refs/heads/master
Commit: 676a4a9476b042bd9c7e801c083a49357ff4d4ac
Parents: 2eeaa95
Author: Yeolar <yeolar@gmail.com>
Authored: Tue Jul 25 23:40:21 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Tue Jul 25 23:40:21 2017 -0400

----------------------------------------------------------------------
 cpp/src/plasma/client.cc                   | 16 ++---
 cpp/src/plasma/protocol.cc                 | 84 ++++++++++++++++---------
 cpp/src/plasma/protocol.h                  | 67 +++++++++++---------
 cpp/src/plasma/store.cc                    | 19 +++---
 cpp/src/plasma/test/serialization_tests.cc | 53 +++++++++-------
 5 files changed, 143 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/676a4a94/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index bbbeb55..e14b3d9 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -152,7 +152,7 @@ Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
   RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
   ObjectID id;
   PlasmaObject object;
-  RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
+  RETURN_NOT_OK(ReadCreateReply(buffer.data(), buffer.size(), &id, &object));
   // If the CreateReply included an error, then the store will not send a file
   // descriptor.
   int fd = recv_fd(store_conn_);
@@ -227,7 +227,7 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
   std::vector<ObjectID> received_object_ids(num_objects);
   std::vector<PlasmaObject> object_data(num_objects);
   PlasmaObject* object;
-  RETURN_NOT_OK(ReadGetReply(buffer.data(), received_object_ids.data(),
+  RETURN_NOT_OK(ReadGetReply(buffer.data(), buffer.size(), received_object_ids.data(),
                              object_data.data(), num_objects));
 
   for (int i = 0; i < num_objects; ++i) {
@@ -356,7 +356,8 @@ Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
     std::vector<uint8_t> buffer;
     RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer));
     ObjectID object_id2;
-    RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object));
+    RETURN_NOT_OK(
+        ReadContainsReply(buffer.data(), buffer.size(), &object_id2, has_object));
   }
   return Status::OK();
 }
@@ -451,7 +452,7 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
   std::vector<uint8_t> buffer;
   int64_t type;
   RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
-  return ReadEvictReply(buffer.data(), num_bytes_evicted);
+  return ReadEvictReply(buffer.data(), buffer.size(), num_bytes_evicted);
 }
 
 Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
@@ -524,7 +525,7 @@ Status PlasmaClient::Connect(const std::string& store_socket_name,
   RETURN_NOT_OK(SendConnectRequest(store_conn_));
   std::vector<uint8_t> buffer;
   RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer));
-  RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_));
+  RETURN_NOT_OK(ReadConnectReply(buffer.data(), buffer.size(), &store_capacity_));
   return Status::OK();
 }
 
@@ -564,7 +565,7 @@ Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
   std::vector<uint8_t> buffer;
   RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer));
   ObjectID id;
-  RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
+  RETURN_NOT_OK(ReadStatusReply(buffer.data(), buffer.size(), &id, object_status, 1));
   ARROW_CHECK(object_id == id);
   return Status::OK();
 }
@@ -586,7 +587,8 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req
                                 num_ready_objects, timeout_ms));
   std::vector<uint8_t> buffer;
   RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer));
-  RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects));
+  RETURN_NOT_OK(
+      ReadWaitReply(buffer.data(), buffer.size(), object_requests, &num_ready_objects));
 
   *num_objects_ready = 0;
   for (int i = 0; i < num_object_requests; ++i) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/676a4a94/cpp/src/plasma/protocol.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 19240bb..77bc8b7 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -62,10 +62,11 @@ Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
   return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message);
 }
 
-Status ReadCreateRequest(uint8_t* data, ObjectID* object_id, int64_t* data_size,
-                         int64_t* metadata_size) {
+Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
+                         int64_t* data_size, int64_t* metadata_size) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *data_size = message->data_size();
   *metadata_size = message->metadata_size();
   *object_id = ObjectID::from_binary(message->object_id()->str());
@@ -83,9 +84,11 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
   return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
 }
 
-Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) {
+Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
+                       PlasmaObject* object) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   object->handle.store_fd = message->plasma_object()->segment_index();
   object->handle.mmap_size = message->plasma_object()->mmap_size();
@@ -106,9 +109,11 @@ Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
   return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message);
 }
 
-Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) {
+Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
+                       unsigned char* digest) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   ARROW_CHECK(message->digest()->size() == kDigestSize);
   memcpy(digest, message->digest()->data(), kDigestSize);
@@ -122,9 +127,10 @@ Status SendSealReply(int sock, ObjectID object_id, int error) {
   return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
 }
 
-Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
+Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaSealReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   return plasma_error_status(message->error());
 }
@@ -133,13 +139,14 @@ Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
 
 Status SendReleaseRequest(int sock, ObjectID object_id) {
   flatbuffers::FlatBufferBuilder fbb;
-  auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()));
+  auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
   return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
 }
 
-Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
+Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   return Status::OK();
 }
@@ -151,9 +158,10 @@ Status SendReleaseReply(int sock, ObjectID object_id, int error) {
   return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
 }
 
-Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
+Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   return plasma_error_status(message->error());
 }
@@ -166,9 +174,10 @@ Status SendDeleteRequest(int sock, ObjectID object_id) {
   return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message);
 }
 
-Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
+Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   return Status::OK();
 }
@@ -180,9 +189,10 @@ Status SendDeleteReply(int sock, ObjectID object_id, int error) {
   return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
 }
 
-Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
+Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   return plasma_error_status(message->error());
 }
@@ -196,9 +206,11 @@ Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objec
   return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message);
 }
 
-Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) {
+Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
+                         int64_t num_objects) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   for (uoffset_t i = 0; i < num_objects; ++i) {
     object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
   }
@@ -214,16 +226,18 @@ Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
   return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message);
 }
 
-int64_t ReadStatusReply_num_objects(uint8_t* data) {
+int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   return message->object_ids()->size();
 }
 
-Status ReadStatusReply(uint8_t* data, ObjectID object_ids[], int object_status[],
-                       int64_t num_objects) {
+Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
+                       int object_status[], int64_t num_objects) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   for (uoffset_t i = 0; i < num_objects; ++i) {
     object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
   }
@@ -241,9 +255,10 @@ Status SendContainsRequest(int sock, ObjectID object_id) {
   return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message);
 }
 
-Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
+Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   return Status::OK();
 }
@@ -255,9 +270,11 @@ Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
   return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message);
 }
 
-Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) {
+Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
+                         bool* has_object) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   *has_object = message->has_object();
   return Status::OK();
@@ -279,9 +296,10 @@ Status SendConnectReply(int sock, int64_t memory_capacity) {
   return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message);
 }
 
-Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
+Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *memory_capacity = message->memory_capacity();
   return Status::OK();
 }
@@ -294,9 +312,10 @@ Status SendEvictRequest(int sock, int64_t num_bytes) {
   return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message);
 }
 
-Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
+Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *num_bytes = message->num_bytes();
   return Status::OK();
 }
@@ -307,9 +326,10 @@ Status SendEvictReply(int sock, int64_t num_bytes) {
   return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message);
 }
 
-Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
+Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   num_bytes = message->num_bytes();
   return Status::OK();
 }
@@ -324,10 +344,11 @@ Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
   return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message);
 }
 
-Status ReadGetRequest(uint8_t* data, std::vector<ObjectID>& object_ids,
+Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
                       int64_t* timeout_ms) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
     auto object_id = message->object_ids()->Get(i)->str();
     object_ids.push_back(ObjectID::from_binary(object_id));
@@ -355,10 +376,11 @@ Status SendGetReply(
   return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
 }
 
-Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
-                    int64_t num_objects) {
+Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
+                    PlasmaObject plasma_objects[], int64_t num_objects) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   for (uoffset_t i = 0; i < num_objects; ++i) {
     object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
   }
@@ -383,9 +405,10 @@ Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_object
   return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message);
 }
 
-Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
+Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
     object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
   }
@@ -410,10 +433,11 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re
   return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message);
 }
 
-Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
+Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
                        int64_t* timeout_ms, int* num_ready_objects) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *num_ready_objects = message->num_ready_objects();
   *timeout_ms = message->timeout();
 
@@ -443,11 +467,12 @@ Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
   return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message);
 }
 
-Status ReadWaitReply(uint8_t* data, ObjectRequest object_requests[],
+Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
                      int* num_ready_objects) {
   DCHECK(data);
 
   auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *num_ready_objects = message->num_ready_objects();
   for (int i = 0; i < *num_ready_objects; i++) {
     object_requests[i].object_id =
@@ -475,9 +500,11 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
   return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message);
 }
 
-Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) {
+Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
+                       int* port) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   DCHECK(message->object_id()->size() == sizeof(ObjectID));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   *address = strdup(message->address()->c_str());
@@ -493,10 +520,11 @@ Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
   return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, message);
 }
 
-Status ReadDataReply(uint8_t* data, ObjectID* object_id, int64_t* object_size,
-                     int64_t* metadata_size) {
+Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
+                     int64_t* object_size, int64_t* metadata_size) {
   DCHECK(data);
   auto message = flatbuffers::GetRoot<PlasmaDataReply>(data);
+  DCHECK(verify_flatbuffer(message, data, size));
   *object_id = ObjectID::from_binary(message->object_id()->str());
   *object_size = (int64_t)message->object_size();
   *metadata_size = (int64_t)message->metadata_size();

http://git-wip-us.apache.org/repos/asf/arrow/blob/676a4a94/cpp/src/plasma/protocol.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index bab08b6..af4b139 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -28,6 +28,12 @@ namespace plasma {
 
 using arrow::Status;
 
+template <class T>
+bool verify_flatbuffer(T* object, uint8_t* data, size_t size) {
+  flatbuffers::Verifier verifier(data, size);
+  return object->Verify(verifier);
+}
+
 /* Plasma receive message. */
 
 Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer);
@@ -37,29 +43,31 @@ Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffe
 Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
                          int64_t metadata_size);
 
-Status ReadCreateRequest(uint8_t* data, ObjectID* object_id, int64_t* data_size,
-                         int64_t* metadata_size);
+Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
+                         int64_t* data_size, int64_t* metadata_size);
 
 Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error);
 
-Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object);
+Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
+                       PlasmaObject* object);
 
 /* Plasma Seal message functions. */
 
 Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
 
-Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest);
+Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
+                       unsigned char* digest);
 
 Status SendSealReply(int sock, ObjectID object_id, int error);
 
-Status ReadSealReply(uint8_t* data, ObjectID* object_id);
+Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id);
 
 /* Plasma Get message functions. */
 
 Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
                       int64_t timeout_ms);
 
-Status ReadGetRequest(uint8_t* data, std::vector<ObjectID>& object_ids,
+Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
                       int64_t* timeout_ms);
 
 Status SendGetReply(
@@ -67,91 +75,93 @@ Status SendGetReply(
     std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
     int64_t num_objects);
 
-Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
-                    int64_t num_objects);
+Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
+                    PlasmaObject plasma_objects[], int64_t num_objects);
 
 /* Plasma Release message functions. */
 
 Status SendReleaseRequest(int sock, ObjectID object_id);
 
-Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id);
+Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id);
 
 Status SendReleaseReply(int sock, ObjectID object_id, int error);
 
-Status ReadReleaseReply(uint8_t* data, ObjectID* object_id);
+Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id);
 
 /* Plasma Delete message functions. */
 
 Status SendDeleteRequest(int sock, ObjectID object_id);
 
-Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id);
+Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id);
 
 Status SendDeleteReply(int sock, ObjectID object_id, int error);
 
-Status ReadDeleteReply(uint8_t* data, ObjectID* object_id);
+Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id);
 
 /* Satus messages. */
 
 Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
 
-Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects);
+Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
+                         int64_t num_objects);
 
 Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
                        int64_t num_objects);
 
-int64_t ReadStatusReply_num_objects(uint8_t* data);
+int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size);
 
-Status ReadStatusReply(uint8_t* data, ObjectID object_ids[], int object_status[],
-                       int64_t num_objects);
+Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
+                       int object_status[], int64_t num_objects);
 
 /* Plasma Constains message functions. */
 
 Status SendContainsRequest(int sock, ObjectID object_id);
 
-Status ReadContainsRequest(uint8_t* data, ObjectID* object_id);
+Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id);
 
 Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
 
-Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object);
+Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
+                         bool* has_object);
 
 /* Plasma Connect message functions. */
 
 Status SendConnectRequest(int sock);
 
-Status ReadConnectRequest(uint8_t* data);
+Status ReadConnectRequest(uint8_t* data, size_t size);
 
 Status SendConnectReply(int sock, int64_t memory_capacity);
 
-Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity);
+Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity);
 
 /* Plasma Evict message functions (no reply so far). */
 
 Status SendEvictRequest(int sock, int64_t num_bytes);
 
-Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes);
+Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes);
 
 Status SendEvictReply(int sock, int64_t num_bytes);
 
-Status ReadEvictReply(uint8_t* data, int64_t& num_bytes);
+Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes);
 
 /* Plasma Fetch Remote message functions. */
 
 Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
 
-Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids);
+Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids);
 
 /* Plasma Wait message functions. */
 
 Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
                        int num_ready_objects, int64_t timeout_ms);
 
-Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
+Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
                        int64_t* timeout_ms, int* num_ready_objects);
 
 Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
                      int num_ready_objects);
 
-Status ReadWaitReply(uint8_t* data, ObjectRequest object_requests[],
+Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
                      int* num_ready_objects);
 
 /* Plasma Subscribe message functions. */
@@ -162,13 +172,14 @@ Status SendSubscribeRequest(int sock);
 
 Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port);
 
-Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port);
+Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
+                       int* port);
 
 Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
                      int64_t metadata_size);
 
-Status ReadDataReply(uint8_t* data, ObjectID* object_id, int64_t* object_size,
-                     int64_t* metadata_size);
+Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
+                     int64_t* object_size, int64_t* metadata_size);
 
 }  // namespace plasma
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/676a4a94/cpp/src/plasma/store.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 9ceecdc..34adc62 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -553,6 +553,7 @@ Status PlasmaStore::process_message(Client* client) {
   ARROW_CHECK(s.ok() || s.IsIOError());
 
   uint8_t* input = input_buffer_.data();
+  size_t input_size = input_buffer_.size();
   ObjectID object_id;
   PlasmaObject object;
   // TODO(pcm): Get rid of the following.
@@ -563,7 +564,8 @@ Status PlasmaStore::process_message(Client* client) {
     case MessageType_PlasmaCreateRequest: {
       int64_t data_size;
       int64_t metadata_size;
-      RETURN_NOT_OK(ReadCreateRequest(input, &object_id, &data_size, &metadata_size));
+      RETURN_NOT_OK(
+          ReadCreateRequest(input, input_size, &object_id, &data_size, &metadata_size));
       int error_code =
           create_object(object_id, data_size, metadata_size, client, &object);
       HANDLE_SIGPIPE(SendCreateReply(client->fd, object_id, &object, error_code),
@@ -575,15 +577,15 @@ Status PlasmaStore::process_message(Client* client) {
     case MessageType_PlasmaGetRequest: {
       std::vector<ObjectID> object_ids_to_get;
       int64_t timeout_ms;
-      RETURN_NOT_OK(ReadGetRequest(input, object_ids_to_get, &timeout_ms));
+      RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms));
       process_get_request(client, object_ids_to_get, timeout_ms);
     } break;
     case MessageType_PlasmaReleaseRequest:
-      RETURN_NOT_OK(ReadReleaseRequest(input, &object_id));
+      RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
       release_object(object_id, client);
       break;
     case MessageType_PlasmaContainsRequest:
-      RETURN_NOT_OK(ReadContainsRequest(input, &object_id));
+      RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
       if (contains_object(object_id) == OBJECT_FOUND) {
         HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd);
       } else {
@@ -592,13 +594,13 @@ Status PlasmaStore::process_message(Client* client) {
       break;
     case MessageType_PlasmaSealRequest: {
       unsigned char digest[kDigestSize];
-      RETURN_NOT_OK(ReadSealRequest(input, &object_id, &digest[0]));
+      RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0]));
       seal_object(object_id, &digest[0]);
     } break;
     case MessageType_PlasmaEvictRequest: {
       // This code path should only be used for testing.
       int64_t num_bytes;
-      RETURN_NOT_OK(ReadEvictRequest(input, &num_bytes));
+      RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes));
       std::vector<ObjectID> objects_to_evict;
       int64_t num_bytes_evicted =
           eviction_policy_.choose_objects_to_evict(num_bytes, &objects_to_evict);
@@ -688,9 +690,8 @@ int main(int argc, char* argv[]) {
   close(shm_fd);
   if (system_memory > shm_mem_avail) {
     ARROW_LOG(FATAL) << "System memory request exceeds memory available in /dev/shm. The "
-                        "request is for "
-                     << system_memory << " bytes, and the amount available is "
-                     << shm_mem_avail
+                        "request is for " << system_memory
+                     << " bytes, and the amount available is " << shm_mem_avail
                      << " bytes. You may be able to free up space by deleting files in "
                         "/dev/shm. If you are inside a Docker container, you may need to "
                         "pass "

http://git-wip-us.apache.org/repos/asf/arrow/blob/676a4a94/cpp/src/plasma/test/serialization_tests.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index aca47d3..c76f5ce 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -82,8 +82,8 @@ TEST(PlasmaSerialization, CreateRequest) {
   ObjectID object_id2;
   int64_t data_size2;
   int64_t metadata_size2;
-  ARROW_CHECK_OK(
-      ReadCreateRequest(data.data(), &object_id2, &data_size2, &metadata_size2));
+  ARROW_CHECK_OK(ReadCreateRequest(data.data(), data.size(), &object_id2, &data_size2,
+                                   &metadata_size2));
   ASSERT_EQ(data_size1, data_size2);
   ASSERT_EQ(metadata_size1, metadata_size2);
   ASSERT_EQ(object_id1, object_id2);
@@ -99,7 +99,7 @@ TEST(PlasmaSerialization, CreateReply) {
   ObjectID object_id2;
   PlasmaObject object2;
   memset(&object2, 0, sizeof(object2));
-  ARROW_CHECK_OK(ReadCreateReply(data.data(), &object_id2, &object2));
+  ARROW_CHECK_OK(ReadCreateReply(data.data(), data.size(), &object_id2, &object2));
   ASSERT_EQ(object_id1, object_id2);
   ASSERT_EQ(memcmp(&object1, &object2, sizeof(object1)), 0);
   close(fd);
@@ -114,7 +114,7 @@ TEST(PlasmaSerialization, SealRequest) {
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaSealRequest);
   ObjectID object_id2;
   unsigned char digest2[kDigestSize];
-  ARROW_CHECK_OK(ReadSealRequest(data.data(), &object_id2, &digest2[0]));
+  ARROW_CHECK_OK(ReadSealRequest(data.data(), data.size(), &object_id2, &digest2[0]));
   ASSERT_EQ(object_id1, object_id2);
   ASSERT_EQ(memcmp(&digest1[0], &digest2[0], kDigestSize), 0);
   close(fd);
@@ -126,7 +126,7 @@ TEST(PlasmaSerialization, SealReply) {
   ARROW_CHECK_OK(SendSealReply(fd, object_id1, PlasmaError_ObjectExists));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaSealReply);
   ObjectID object_id2;
-  Status s = ReadSealReply(data.data(), &object_id2);
+  Status s = ReadSealReply(data.data(), data.size(), &object_id2);
   ASSERT_EQ(object_id1, object_id2);
   ASSERT_TRUE(s.IsPlasmaObjectExists());
   close(fd);
@@ -142,7 +142,8 @@ TEST(PlasmaSerialization, GetRequest) {
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaGetRequest);
   std::vector<ObjectID> object_ids_return;
   int64_t timeout_ms_return;
-  ARROW_CHECK_OK(ReadGetRequest(data.data(), object_ids_return, &timeout_ms_return));
+  ARROW_CHECK_OK(
+      ReadGetRequest(data.data(), data.size(), object_ids_return, &timeout_ms_return));
   ASSERT_EQ(object_ids[0], object_ids_return[0]);
   ASSERT_EQ(object_ids[1], object_ids_return[1]);
   ASSERT_EQ(timeout_ms, timeout_ms_return);
@@ -162,8 +163,8 @@ TEST(PlasmaSerialization, GetReply) {
   ObjectID object_ids_return[2];
   PlasmaObject plasma_objects_return[2];
   memset(&plasma_objects_return, 0, sizeof(plasma_objects_return));
-  ARROW_CHECK_OK(
-      ReadGetReply(data.data(), object_ids_return, &plasma_objects_return[0], 2));
+  ARROW_CHECK_OK(ReadGetReply(data.data(), data.size(), object_ids_return,
+                              &plasma_objects_return[0], 2));
   ASSERT_EQ(object_ids[0], object_ids_return[0]);
   ASSERT_EQ(object_ids[1], object_ids_return[1]);
   ASSERT_EQ(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0],
@@ -182,7 +183,7 @@ TEST(PlasmaSerialization, ReleaseRequest) {
   std::vector<uint8_t> data =
       read_message_from_file(fd, MessageType_PlasmaReleaseRequest);
   ObjectID object_id2;
-  ARROW_CHECK_OK(ReadReleaseRequest(data.data(), &object_id2));
+  ARROW_CHECK_OK(ReadReleaseRequest(data.data(), data.size(), &object_id2));
   ASSERT_EQ(object_id1, object_id2);
   close(fd);
 }
@@ -193,7 +194,7 @@ TEST(PlasmaSerialization, ReleaseReply) {
   ARROW_CHECK_OK(SendReleaseReply(fd, object_id1, PlasmaError_ObjectExists));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaReleaseReply);
   ObjectID object_id2;
-  Status s = ReadReleaseReply(data.data(), &object_id2);
+  Status s = ReadReleaseReply(data.data(), data.size(), &object_id2);
   ASSERT_EQ(object_id1, object_id2);
   ASSERT_TRUE(s.IsPlasmaObjectExists());
   close(fd);
@@ -205,7 +206,7 @@ TEST(PlasmaSerialization, DeleteRequest) {
   ARROW_CHECK_OK(SendDeleteRequest(fd, object_id1));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaDeleteRequest);
   ObjectID object_id2;
-  ARROW_CHECK_OK(ReadDeleteRequest(data.data(), &object_id2));
+  ARROW_CHECK_OK(ReadDeleteRequest(data.data(), data.size(), &object_id2));
   ASSERT_EQ(object_id1, object_id2);
   close(fd);
 }
@@ -217,7 +218,7 @@ TEST(PlasmaSerialization, DeleteReply) {
   ARROW_CHECK_OK(SendDeleteReply(fd, object_id1, error1));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaDeleteReply);
   ObjectID object_id2;
-  Status s = ReadDeleteReply(data.data(), &object_id2);
+  Status s = ReadDeleteReply(data.data(), data.size(), &object_id2);
   ASSERT_EQ(object_id1, object_id2);
   ASSERT_TRUE(s.IsPlasmaObjectExists());
   close(fd);
@@ -232,7 +233,8 @@ TEST(PlasmaSerialization, StatusRequest) {
   ARROW_CHECK_OK(SendStatusRequest(fd, object_ids, num_objects));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaStatusRequest);
   ObjectID object_ids_read[num_objects];
-  ARROW_CHECK_OK(ReadStatusRequest(data.data(), object_ids_read, num_objects));
+  ARROW_CHECK_OK(
+      ReadStatusRequest(data.data(), data.size(), object_ids_read, num_objects));
   ASSERT_EQ(object_ids[0], object_ids_read[0]);
   ASSERT_EQ(object_ids[1], object_ids_read[1]);
   close(fd);
@@ -246,11 +248,11 @@ TEST(PlasmaSerialization, StatusReply) {
   int object_statuses[2] = {42, 43};
   ARROW_CHECK_OK(SendStatusReply(fd, object_ids, object_statuses, 2));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaStatusReply);
-  int64_t num_objects = ReadStatusReply_num_objects(data.data());
+  int64_t num_objects = ReadStatusReply_num_objects(data.data(), data.size());
   ObjectID object_ids_read[num_objects];
   int object_statuses_read[num_objects];
-  ARROW_CHECK_OK(
-      ReadStatusReply(data.data(), object_ids_read, object_statuses_read, num_objects));
+  ARROW_CHECK_OK(ReadStatusReply(data.data(), data.size(), object_ids_read,
+                                 object_statuses_read, num_objects));
   ASSERT_EQ(object_ids[0], object_ids_read[0]);
   ASSERT_EQ(object_ids[1], object_ids_read[1]);
   ASSERT_EQ(object_statuses[0], object_statuses_read[0]);
@@ -264,7 +266,7 @@ TEST(PlasmaSerialization, EvictRequest) {
   ARROW_CHECK_OK(SendEvictRequest(fd, num_bytes));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaEvictRequest);
   int64_t num_bytes_received;
-  ARROW_CHECK_OK(ReadEvictRequest(data.data(), &num_bytes_received));
+  ARROW_CHECK_OK(ReadEvictRequest(data.data(), data.size(), &num_bytes_received));
   ASSERT_EQ(num_bytes, num_bytes_received);
   close(fd);
 }
@@ -275,7 +277,7 @@ TEST(PlasmaSerialization, EvictReply) {
   ARROW_CHECK_OK(SendEvictReply(fd, num_bytes));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaEvictReply);
   int64_t num_bytes_received;
-  ARROW_CHECK_OK(ReadEvictReply(data.data(), num_bytes_received));
+  ARROW_CHECK_OK(ReadEvictReply(data.data(), data.size(), num_bytes_received));
   ASSERT_EQ(num_bytes, num_bytes_received);
   close(fd);
 }
@@ -288,7 +290,7 @@ TEST(PlasmaSerialization, FetchRequest) {
   ARROW_CHECK_OK(SendFetchRequest(fd, object_ids, 2));
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaFetchRequest);
   std::vector<ObjectID> object_ids_read;
-  ARROW_CHECK_OK(ReadFetchRequest(data.data(), object_ids_read));
+  ARROW_CHECK_OK(ReadFetchRequest(data.data(), data.size(), object_ids_read));
   ASSERT_EQ(object_ids[0], object_ids_read[0]);
   ASSERT_EQ(object_ids[1], object_ids_read[1]);
   close(fd);
@@ -310,8 +312,8 @@ TEST(PlasmaSerialization, WaitRequest) {
   int num_ready_objects_out;
   int64_t timeout_ms_read;
   ObjectRequestMap object_requests_out;
-  ARROW_CHECK_OK(ReadWaitRequest(data.data(), object_requests_out, &timeout_ms_read,
-                                 &num_ready_objects_out));
+  ARROW_CHECK_OK(ReadWaitRequest(data.data(), data.size(), object_requests_out,
+                                 &timeout_ms_read, &num_ready_objects_out));
   ASSERT_EQ(num_objects_in, object_requests_out.size());
   ASSERT_EQ(num_ready_objects_out, num_ready_objects_in);
   for (int i = 0; i < num_objects_in; i++) {
@@ -340,7 +342,8 @@ TEST(PlasmaSerialization, WaitReply) {
   std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaWaitReply);
   ObjectRequest objects_out[2];
   int num_objects_out;
-  ARROW_CHECK_OK(ReadWaitReply(data.data(), &objects_out[0], &num_objects_out));
+  ARROW_CHECK_OK(
+      ReadWaitReply(data.data(), data.size(), &objects_out[0], &num_objects_out));
   ASSERT_EQ(num_objects_in, num_objects_out);
   for (int i = 0; i < num_objects_out; i++) {
     /* Each object request must appear exactly once. */
@@ -364,7 +367,8 @@ TEST(PlasmaSerialization, DataRequest) {
   ObjectID object_id2;
   char* address2;
   int port2;
-  ARROW_CHECK_OK(ReadDataRequest(data.data(), &object_id2, &address2, &port2));
+  ARROW_CHECK_OK(
+      ReadDataRequest(data.data(), data.size(), &object_id2, &address2, &port2));
   ASSERT_EQ(object_id1, object_id2);
   ASSERT_EQ(strcmp(address1, address2), 0);
   ASSERT_EQ(port1, port2);
@@ -383,7 +387,8 @@ TEST(PlasmaSerialization, DataReply) {
   ObjectID object_id2;
   int64_t object_size2;
   int64_t metadata_size2;
-  ARROW_CHECK_OK(ReadDataReply(data.data(), &object_id2, &object_size2, &metadata_size2));
+  ARROW_CHECK_OK(ReadDataReply(data.data(), data.size(), &object_id2, &object_size2,
+                               &metadata_size2));
   ASSERT_EQ(object_id1, object_id2);
   ASSERT_EQ(object_size1, object_size2);
   ASSERT_EQ(metadata_size1, metadata_size2);


Mime
View raw message