arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [13/14] arrow git commit: [C++] Restore Plasma source tree after 0.5.0 release
Date Sun, 23 Jul 2017 18:39:20 GMT
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/fling.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/fling.cc b/cpp/src/plasma/fling.cc
new file mode 100644
index 0000000..79da4f4
--- /dev/null
+++ b/cpp/src/plasma/fling.cc
@@ -0,0 +1,90 @@
+// Copyright 2013 Sharvil Nanavati
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "plasma/fling.h"
+
+#include <string.h>
+
+// Fills in `msg` and `iov` for one sendmsg()/recvmsg() call whose ancillary
+// data lives in `buf`.
+//
+// The same buffer serves two roles: its first byte is the one-byte regular
+// payload described by `iov`, and all `buf_len` bytes are the control buffer.
+// No peer address is set.
+void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) {
+  // No explicit peer address.
+  msg->msg_name = NULL;
+  msg->msg_namelen = 0;
+
+  // The whole buffer is available for control (ancillary) data.
+  msg->msg_control = buf;
+  msg->msg_controllen = buf_len;
+
+  // A single one-byte payload segment taken from the start of the buffer.
+  iov->iov_len = 1;
+  iov->iov_base = buf;
+  msg->msg_iovlen = 1;
+  msg->msg_iov = iov;
+}
+
+// Sends the file descriptor `fd` over the socket `conn` as a single
+// SCM_RIGHTS control message. Returns 0 when sendmsg() succeeds, otherwise
+// the (negative) sendmsg() result.
+int send_fd(int conn, int fd) {
+  // Zeroed control buffer with room for exactly one int.
+  char control[CMSG_SPACE(sizeof(int))];
+  memset(control, 0, sizeof(control));
+
+  struct msghdr message;
+  struct iovec payload;
+  init_msg(&message, &payload, control, sizeof(control));
+
+  // Describe the descriptor being passed.
+  struct cmsghdr* cmsg = CMSG_FIRSTHDR(&message);
+  cmsg->cmsg_len = CMSG_LEN(sizeof(int));
+  cmsg->cmsg_type = SCM_RIGHTS;
+  cmsg->cmsg_level = SOL_SOCKET;
+  *reinterpret_cast<int*>(CMSG_DATA(cmsg)) = fd;
+
+  // Send file descriptor.
+  const ssize_t sent = sendmsg(conn, &message, 0);
+  if (sent < 0) { return static_cast<int>(sent); }
+  return 0;
+}
+
+// Receives one file descriptor from the socket `conn`.
+//
+// Returns the descriptor, or -1 if recvmsg() fails or no SCM_RIGHTS control
+// message was present. If more than one descriptor arrives, all of them are
+// closed, errno is set to EBADMSG and -1 is returned.
+int recv_fd(int conn) {
+  char control[CMSG_SPACE(sizeof(int))];
+  struct msghdr message;
+  struct iovec payload;
+  init_msg(&message, &payload, control, sizeof(control));
+
+  if (recvmsg(conn, &message, 0) == -1) { return -1; }
+
+  int received_fd = -1;
+  int got_extra_fds = 0;
+  struct cmsghdr* cmsg = CMSG_FIRSTHDR(&message);
+  while (cmsg != NULL) {
+    if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
+      // Number of ints that fit in the data portion of this control message.
+      ssize_t num_fds =
+          (cmsg->cmsg_len - (CMSG_DATA(cmsg) - (unsigned char*)cmsg)) / sizeof(int);
+      int* fds = reinterpret_cast<int*>(CMSG_DATA(cmsg));
+      for (int i = 0; i < num_fds; ++i) {
+        int fd = fds[i];
+        if (received_fd != -1) {
+          // Keep only the first descriptor; close the surplus ones.
+          close(fd);
+          got_extra_fds = 1;
+        } else {
+          received_fd = fd;
+        }
+      }
+    }
+    cmsg = CMSG_NXTHDR(&message, cmsg);
+  }
+
+  // The sender sent us more than one file descriptor. The surplus ones are
+  // already closed; close the first one as well so nothing leaks, and tell
+  // the caller that the message was malformed.
+  if (got_extra_fds) {
+    close(received_fd);
+    errno = EBADMSG;
+    return -1;
+  }
+
+  return received_fd;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/fling.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/fling.h b/cpp/src/plasma/fling.h
new file mode 100644
index 0000000..78ac9d1
--- /dev/null
+++ b/cpp/src/plasma/fling.h
@@ -0,0 +1,52 @@
+// Copyright 2013 Sharvil Nanavati
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// FLING: Exchanging file descriptors over sockets
+//
+// This is a little library for sending file descriptors over a socket
+// between processes. The reason for doing that (as opposed to using
+// filenames to share the files) is so (a) no files remain in the
+// filesystem after all the processes terminate, (b) to make sure that
+// there are no name collisions and (c) to be able to control who has
+// access to the data.
+//
+// Most of the code is from https://github.com/sharvil/flingfd
+
+#include <errno.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+// This is necessary for Mac OS X, see http://www.apuebook.com/faqs2e.html
+// (10).
+#if !defined(CMSG_SPACE) && !defined(CMSG_LEN)
+#define CMSG_SPACE(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + __DARWIN_ALIGN32(len))
+#define CMSG_LEN(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + (len))
+#endif
+
+// Initialize a message header for a single sendmsg()/recvmsg() call.
+//
+// @param msg Message header to fill in. Its name fields are cleared, its
+//        iovec is set to iov, and its control buffer is set to buf/buf_len.
+// @param iov I/O vector to fill in; it is pointed at the first byte of buf
+//        with a length of 1.
+// @param buf Buffer used both for the one-byte payload and the control data.
+// @param buf_len Size of buf in bytes.
+// @return Void.
+void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len);
+
+// Send a file descriptor over a unix domain socket.
+//
+// @param conn Unix domain socket to send the file descriptor over.
+// @param fd File descriptor to send over.
+// @return Status code which is < 0 on failure.
+int send_fd(int conn, int fd);
+
+// Receive a file descriptor over a unix domain socket.
+//
+// @param conn Unix domain socket to receive the file descriptor from.
+// @return File descriptor or a value < 0 on failure.
+int recv_fd(int conn);

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/format/.gitignore
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/.gitignore b/cpp/src/plasma/format/.gitignore
new file mode 100644
index 0000000..b2ddb05
--- /dev/null
+++ b/cpp/src/plasma/format/.gitignore
@@ -0,0 +1 @@
+*_generated.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/format/common.fbs
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/common.fbs b/cpp/src/plasma/format/common.fbs
new file mode 100644
index 0000000..4d7d285
--- /dev/null
+++ b/cpp/src/plasma/format/common.fbs
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Object information data structure.
+table ObjectInfo {
+  // Object ID of this object.
+  object_id: string;
+  // Number of bytes the content of this object occupies in memory.
+  data_size: long;
+  // Number of bytes the metadata of this object occupies in memory.
+  metadata_size: long;
+  // Unix epoch of when this object was created.
+  create_time: long;
+  // How long creation of this object took.
+  construct_duration: long;
+  // Hash of the object content.
+  digest: string;
+  // Specifies if this object was deleted or added.
+  is_deletion: bool;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/format/plasma.fbs
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
new file mode 100644
index 0000000..23782ad
--- /dev/null
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Plasma protocol specification
+
+enum MessageType:int {
+  // Create a new object.
+  PlasmaCreateRequest = 1,
+  PlasmaCreateReply,
+  // Seal an object.
+  PlasmaSealRequest,
+  PlasmaSealReply,
+  // Get an object that is stored on the local Plasma store.
+  PlasmaGetRequest,
+  PlasmaGetReply,
+  // Release an object.
+  PlasmaReleaseRequest,
+  PlasmaReleaseReply,
+  // Delete an object.
+  PlasmaDeleteRequest,
+  PlasmaDeleteReply,
+  // Get status of an object.
+  PlasmaStatusRequest,
+  PlasmaStatusReply,
+  // See if the store contains an object (will be deprecated).
+  PlasmaContainsRequest,
+  PlasmaContainsReply,
+  // Get information for a newly connecting client.
+  PlasmaConnectRequest,
+  PlasmaConnectReply,
+  // Make room for new objects in the plasma store.
+  PlasmaEvictRequest,
+  PlasmaEvictReply,
+  // Fetch objects from remote Plasma stores.
+  PlasmaFetchRequest,
+  // Wait for objects to be ready either from local or remote Plasma stores.
+  PlasmaWaitRequest,
+  PlasmaWaitReply,
+  // Subscribe to a list of objects or to all objects.
+  PlasmaSubscribeRequest,
+  // Unsubscribe.
+  PlasmaUnsubscribeRequest,
+  // Sending and receiving data.
+  // PlasmaDataRequest initiates sending the data, there will be one
+  // such message per data transfer.
+  PlasmaDataRequest,
+  // PlasmaDataReply contains the actual data and is sent back to the
+  // object store that requested the data. For each transfer, multiple
+  // reply messages get sent. Each one contains a fixed number of bytes.
+  PlasmaDataReply,
+  // Object notifications.
+  PlasmaNotification
+}
+
+enum PlasmaError:int {
+  // Operation was successful.
+  OK,
+  // Trying to create an object that already exists.
+  ObjectExists,
+  // Trying to access an object that doesn't exist.
+  ObjectNonexistent,
+  // Trying to create an object but there isn't enough space in the store.
+  OutOfMemory
+}
+
+// Plasma store messages
+
+struct PlasmaObjectSpec {
+  // Index of the memory segment (= memory mapped file) that
+  // this object is allocated in.
+  segment_index: int;
+  // Size in bytes of this segment (needed to call mmap).
+  mmap_size: ulong;
+  // The offset in bytes in the memory mapped file of the data.
+  data_offset: ulong;
+  // The size in bytes of the data.
+  data_size: ulong;
+  // The offset in bytes in the memory mapped file of the metadata.
+  metadata_offset: ulong;
+  // The size in bytes of the metadata.
+  metadata_size: ulong;
+}
+
+table PlasmaCreateRequest {
+  // ID of the object to be created.
+  object_id: string;
+  // The size of the object's data in bytes.
+  data_size: ulong;
+  // The size of the object's metadata in bytes.
+  metadata_size: ulong;
+}
+
+table PlasmaCreateReply {
+  // ID of the object that was created.
+  object_id: string;
+  // The object that is returned with this reply.
+  plasma_object: PlasmaObjectSpec;
+  // Error that occurred for this call.
+  error: PlasmaError;
+}
+
+table PlasmaSealRequest {
+  // ID of the object to be sealed.
+  object_id: string;
+  // Hash of the object data.
+  digest: string;
+}
+
+table PlasmaSealReply {
+  // ID of the object that was sealed.
+  object_id: string;
+  // Error code.
+  error: PlasmaError;
+}
+
+table PlasmaGetRequest {
+  // IDs of the objects stored at local Plasma store we are getting.
+  object_ids: [string];
+  // The number of milliseconds before the request should timeout.
+  timeout_ms: long;
+}
+
+table PlasmaGetReply {
+  // IDs of the objects being returned.
+  // This number can be smaller than the number of requested
+  // objects if not all requested objects are stored and sealed
+  // in the local Plasma store.
+  object_ids: [string];
+  // Plasma object information, in the same order as their IDs.
+  plasma_objects: [PlasmaObjectSpec];
+  // The number of elements in both object_ids and plasma_objects arrays must agree.
+}
+
+table PlasmaReleaseRequest {
+  // ID of the object to be released.
+  object_id: string;
+}
+
+table PlasmaReleaseReply {
+  // ID of the object that was released.
+  object_id: string;
+  // Error code.
+  error: PlasmaError;
+}
+
+table PlasmaDeleteRequest {
+  // ID of the object to be deleted.
+  object_id: string;
+}
+
+table PlasmaDeleteReply {
+  // ID of the object that was deleted.
+  object_id: string;
+  // Error code.
+  error: PlasmaError;
+}
+
+table PlasmaStatusRequest {
+  // IDs of the objects stored at local Plasma store we request the status of.
+  object_ids: [string];
+}
+
+enum ObjectStatus:int {
+  // Object is stored in the local Plasma Store.
+  Local = 1,
+  // Object is stored on a remote Plasma store, and it is not stored on the
+  // local Plasma Store.
+  Remote,
+  // Object is not stored in the system.
+  Nonexistent,
+  // Object is currently being transferred from a remote Plasma store to the local
+  // Plasma Store.
+  Transfer
+}
+
+table PlasmaStatusReply {
+  // IDs of the objects being returned.
+  object_ids: [string];
+  // Status of the object.
+  status: [ObjectStatus];
+}
+
+// PlasmaContains is a subset of PlasmaStatus which does not
+// involve the plasma manager, only the store. We should consider
+// unifying them in the future and deprecating PlasmaContains.
+
+table PlasmaContainsRequest {
+  // ID of the object we are querying.
+  object_id: string;
+}
+
+table PlasmaContainsReply {
+  // ID of the object we are querying.
+  object_id: string;
+  // 1 if the object is in the store and 0 otherwise.
+  has_object: int;
+}
+
+// PlasmaConnect is used by a plasma client the first time it connects with the
+// store. This is not really necessary, but is used to get some information
+// about the store such as its memory capacity.
+
+table PlasmaConnectRequest {
+}
+
+table PlasmaConnectReply {
+  // The memory capacity of the store.
+  memory_capacity: long;
+}
+
+table PlasmaEvictRequest {
+  // Number of bytes that shall be freed.
+  num_bytes: ulong;
+}
+
+table PlasmaEvictReply {
+  // Number of bytes that have been freed.
+  num_bytes: ulong;
+}
+
+table PlasmaFetchRequest {
+  // IDs of objects to be gotten.
+  object_ids: [string];
+}
+
+table ObjectRequestSpec {
+  // ID of the object.
+  object_id: string;
+  // The type of the object. This specifies whether we
+  // will be waiting for an object store in the local or
+  // global Plasma store.
+  type: int;
+}
+
+table PlasmaWaitRequest {
+  // Array of object requests whose status we are asking for.
+  object_requests: [ObjectRequestSpec];
+  // Number of objects expected to be returned, if available.
+  num_ready_objects: int;
+  // timeout
+  timeout: long;
+}
+
+table ObjectReply {
+  // ID of the object.
+  object_id: string;
+  // The object status. This specifies where the object is stored.
+  status: int;
+}
+
+table PlasmaWaitReply {
+  // Array of object requests being returned.
+  object_requests: [ObjectReply];
+  // Number of objects expected to be returned, if available.
+  num_ready_objects: int;
+}
+
+table PlasmaSubscribeRequest {
+}
+
+table PlasmaDataRequest {
+  // ID of the object that is requested.
+  object_id: string;
+  // The host address where the data shall be sent to.
+  address: string;
+  // The port of the manager the data shall be sent to.
+  port: int;
+}
+
+table PlasmaDataReply {
+  // ID of the object that will be sent.
+  object_id: string;
+  // Size of the object data in bytes.
+  object_size: ulong;
+  // Size of the metadata in bytes.
+  metadata_size: ulong;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
new file mode 100644
index 0000000..5875ebb
--- /dev/null
+++ b/cpp/src/plasma/io.cc
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/io.h"
+
+#include "plasma/common.h"
+
+using arrow::Status;
+
+/* Number of times we try binding to a socket, and the pause between
+ * attempts in milliseconds.
+ * NOTE(review): neither macro is referenced by the code visible in this
+ * file; confirm whether they are still needed. */
+#define NUM_BIND_ATTEMPTS 5
+#define BIND_TIMEOUT_MS 100
+
+/* Default number of times connect_ipc_sock_retry tries connecting to a
+ * socket, and the default pause between attempts in milliseconds. Both are
+ * used when the caller passes a negative value. */
+#define NUM_CONNECT_ATTEMPTS 50
+#define CONNECT_TIMEOUT_MS 100
+
+/* Write exactly 'length' bytes starting at 'cursor' to the file descriptor.
+ * Writes interrupted by EAGAIN, EWOULDBLOCK or EINTR are retried right away;
+ * any other error, or a write that makes no progress, yields an IOError. */
+Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
+  size_t written = 0;
+  while (written < length) {
+    /* Hand the kernel everything that is still outstanding. */
+    const ssize_t result = write(fd, cursor + written, length - written);
+    if (result < 0) {
+      const bool transient = (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
+      if (transient) { continue; }
+      return Status::IOError(std::string(strerror(errno)));
+    }
+    if (result == 0) { return Status::IOError("Encountered unexpected EOF"); }
+    ARROW_CHECK(result > 0);
+    written += result;
+  }
+
+  return Status::OK();
+}
+
+/* Write one framed message: three int64_t header fields (protocol version,
+ * message type, payload length) followed by 'length' payload bytes. Stops
+ * at, and returns, the first write error. */
+Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
+  int64_t version = PLASMA_PROTOCOL_VERSION;
+
+  /* Header fields in wire order. */
+  int64_t* header_fields[] = {&version, &type, &length};
+  for (int64_t* field : header_fields) {
+    RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(field), sizeof(*field)));
+  }
+
+  /* Payload. */
+  return WriteBytes(fd, bytes, length * sizeof(char));
+}
+
+/* Read exactly 'length' bytes from the file descriptor into 'cursor'.
+ * Reads interrupted by EAGAIN, EWOULDBLOCK or EINTR are retried right away;
+ * any other error, or EOF before 'length' bytes arrived, yields an IOError. */
+Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
+  size_t received = 0;
+  /* Termination condition: EOF or read 'length' bytes total. */
+  while (received < length) {
+    const ssize_t result = read(fd, cursor + received, length - received);
+    if (result < 0) {
+      const bool transient = (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
+      if (transient) { continue; }
+      return Status::IOError(std::string(strerror(errno)));
+    }
+    if (result == 0) { return Status::IOError("Encountered unexpected EOF"); }
+    ARROW_CHECK(result > 0);
+    received += result;
+  }
+
+  return Status::OK();
+}
+
+/* Read one framed message as produced by WriteMessage.
+ *
+ * The header consists of the protocol version, the message type and the
+ * payload length. The payload is stored at the start of 'buffer', which is
+ * grown if it is too small and never shrunk.
+ *
+ * If any read fails, or the announced length is negative, '*type' is set to
+ * DISCONNECT_CLIENT and an error status is returned. A protocol version
+ * mismatch is checked with ARROW_CHECK. */
+Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) {
+  int64_t version;
+  RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)),
+      *type = DISCONNECT_CLIENT);
+  ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version;
+  RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)),
+      *type = DISCONNECT_CLIENT);
+  /* WriteMessage sends the length as an int64_t, so read it back with the
+   * same width rather than the platform-dependent size_t. */
+  int64_t length;
+  RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)),
+      *type = DISCONNECT_CLIENT);
+  if (length < 0) {
+    /* A negative length would otherwise turn into a huge allocation. */
+    *type = DISCONNECT_CLIENT;
+    return Status::IOError("Received a message with a negative length");
+  }
+  const size_t payload_size = static_cast<size_t>(length);
+  if (payload_size > buffer->size()) { buffer->resize(payload_size); }
+  RETURN_NOT_OK_ELSE(
+      ReadBytes(fd, buffer->data(), payload_size), *type = DISCONNECT_CLIENT);
+  return Status::OK();
+}
+
+/* Create a Unix domain stream socket bound to 'pathname', removing any file
+ * that already exists at that path. When 'shall_listen' is true the socket
+ * is also put into listening mode with a backlog of 128. Returns the socket
+ * file descriptor, or -1 (after logging and closing the socket) on failure. */
+int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
+  const int sock = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (sock < 0) {
+    ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
+    return -1;
+  }
+
+  /* Tell the system to allow the address to be reused. */
+  int reuse = 1;
+  const int opt_result = setsockopt(
+      sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&reuse), sizeof(reuse));
+  if (opt_result < 0) {
+    ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname;
+    close(sock);
+    return -1;
+  }
+
+  /* Remove whatever is currently at the path (this happens before the
+   * length check below). */
+  unlink(pathname.c_str());
+
+  struct sockaddr_un addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sun_family = AF_UNIX;
+  /* The path plus its terminating NUL must fit into sun_path. */
+  if (pathname.size() + 1 > sizeof(addr.sun_path)) {
+    ARROW_LOG(ERROR) << "Socket pathname is too long.";
+    close(sock);
+    return -1;
+  }
+  strncpy(addr.sun_path, pathname.c_str(), pathname.size() + 1);
+
+  if (bind(sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) != 0) {
+    ARROW_LOG(ERROR) << "Bind failed for pathname " << pathname;
+    close(sock);
+    return -1;
+  }
+
+  if (!shall_listen) { return sock; }
+
+  if (listen(sock, 128) == -1) {
+    ARROW_LOG(ERROR) << "Could not listen to socket " << pathname;
+    close(sock);
+    return -1;
+  }
+  return sock;
+}
+
+/* Try to connect to the Unix domain socket at 'pathname' up to 'num_retries'
+ * times, sleeping 'timeout' milliseconds after every failed attempt. Negative
+ * arguments select NUM_CONNECT_ATTEMPTS and CONNECT_TIMEOUT_MS. An error is
+ * logged after the first failed attempt, and a FATAL message is logged if no
+ * attempt succeeds. Returns the connected file descriptor. */
+int connect_ipc_sock_retry(
+    const std::string& pathname, int num_retries, int64_t timeout) {
+  /* Pick the default values if the user did not specify. */
+  const int max_attempts = (num_retries < 0) ? NUM_CONNECT_ATTEMPTS : num_retries;
+  const int64_t delay_ms = (timeout < 0) ? CONNECT_TIMEOUT_MS : timeout;
+
+  int fd = -1;
+  int attempt = 0;
+  while (attempt < max_attempts) {
+    fd = connect_ipc_sock(pathname);
+    if (fd >= 0) { break; }
+    /* Only report the first failure to keep the log readable. */
+    if (attempt == 0) {
+      ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << pathname;
+    }
+    /* Sleep for delay_ms milliseconds. */
+    usleep(static_cast<int>(delay_ms * 1000));
+    ++attempt;
+  }
+
+  /* If we could not connect to the socket, exit. */
+  if (fd == -1) { ARROW_LOG(FATAL) << "Could not connect to socket " << pathname; }
+  return fd;
+}
+
+/* Connect to the Unix domain stream socket bound at 'pathname'.
+ *
+ * Returns the connected socket file descriptor, or -1 if the socket could
+ * not be created, the pathname does not fit into sockaddr_un, or connect()
+ * fails. The socket is closed on every failure path. */
+int connect_ipc_sock(const std::string& pathname) {
+  struct sockaddr_un socket_address;
+  int socket_fd;
+
+  socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
+  if (socket_fd < 0) {
+    ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
+    return -1;
+  }
+
+  memset(&socket_address, 0, sizeof(socket_address));
+  socket_address.sun_family = AF_UNIX;
+  /* The path plus its terminating NUL must fit into sun_path. */
+  if (pathname.size() + 1 > sizeof(socket_address.sun_path)) {
+    ARROW_LOG(ERROR) << "Socket pathname is too long.";
+    /* Do not leak the socket that was just created. */
+    close(socket_fd);
+    return -1;
+  }
+  strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1);
+
+  if (connect(socket_fd, (struct sockaddr*)&socket_address, sizeof(socket_address)) !=
+      0) {
+    close(socket_fd);
+    return -1;
+  }
+
+  return socket_fd;
+}
+
+/* Accept one pending connection on the listening socket 'socket_fd'.
+ * Returns the new client file descriptor, or -1 after logging an error if
+ * accept() fails. */
+int AcceptClient(int socket_fd) {
+  const int accepted_fd = accept(socket_fd, NULL, NULL);
+  if (accepted_fd >= 0) { return accepted_fd; }
+
+  ARROW_LOG(ERROR) << "Error reading from socket.";
+  return -1;
+}
+
+/* Read one length-prefixed message from 'sock': an int64_t size followed by
+ * that many bytes.
+ *
+ * Returns a buffer obtained from malloc that holds the payload. On any
+ * failure (read error, negative size, failed allocation) the socket is
+ * closed, any buffer allocated so far is freed, and NULL is returned. */
+uint8_t* read_message_async(int sock) {
+  int64_t size;
+  Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
+  if (!s.ok()) {
+    /* The other side has closed the socket. */
+    ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
+    close(sock);
+    return NULL;
+  }
+  /* Reject sizes that cannot describe a buffer, and failed allocations,
+   * instead of handing ReadBytes an invalid destination. malloc(0) may
+   * legitimately return NULL, so that case is not treated as a failure. */
+  uint8_t* message = NULL;
+  if (size >= 0) { message = reinterpret_cast<uint8_t*>(malloc(size)); }
+  if (size < 0 || (message == NULL && size > 0)) {
+    ARROW_LOG(ERROR) << "Could not allocate a buffer for a message of size " << size;
+    close(sock);
+    return NULL;
+  }
+  s = ReadBytes(sock, message, size);
+  if (!s.ok()) {
+    /* The other side has closed the socket. */
+    ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
+    close(sock);
+    /* The partially filled buffer is of no use to the caller. */
+    free(message);
+    return NULL;
+  }
+  return message;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/io.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
new file mode 100644
index 0000000..43c3fb5
--- /dev/null
+++ b/cpp/src/plasma/io.h
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_IO_H
+#define PLASMA_IO_H
+
+#include <inttypes.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <string>
+#include <vector>
+
+#include "arrow/status.h"
+
+// TODO(pcm): Replace our own custom message header (message type,
+// message length, plasma protocol version) with one that is serialized
+// using flatbuffers.
+#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
+#define DISCONNECT_CLIENT 0
+
+// Write exactly `length` bytes starting at `cursor` to `fd`. Writes that
+// fail with EAGAIN, EWOULDBLOCK or EINTR are retried; any other failure, or
+// a write of zero bytes, is returned as an IOError.
+arrow::Status WriteBytes(int fd, uint8_t* cursor, size_t length);
+
+// Write a message header (PLASMA_PROTOCOL_VERSION, `type`, `length`, each as
+// an int64_t) followed by `length` bytes taken from `bytes`.
+arrow::Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes);
+
+// Read exactly `length` bytes from `fd` into `cursor`. Reads that fail with
+// EAGAIN, EWOULDBLOCK or EINTR are retried; any other failure, or EOF before
+// `length` bytes were read, is returned as an IOError.
+arrow::Status ReadBytes(int fd, uint8_t* cursor, size_t length);
+
+// Read a message written by WriteMessage. The message type is stored in
+// `*type` and the payload at the start of `*buffer`, which is grown if it is
+// too small. If a read fails, `*type` is set to DISCONNECT_CLIENT and the
+// error is returned.
+arrow::Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);
+
+// Create a Unix domain stream socket bound to `pathname`, unlinking the path
+// first, and listen on it if `shall_listen` is true. Returns the socket file
+// descriptor, or -1 on failure.
+int bind_ipc_sock(const std::string& pathname, bool shall_listen);
+
+// Connect to the Unix domain stream socket at `pathname`. Returns the socket
+// file descriptor, or -1 on failure.
+int connect_ipc_sock(const std::string& pathname);
+
+// Call connect_ipc_sock up to `num_retries` times, sleeping `timeout`
+// milliseconds after each failed attempt. Negative values select the
+// defaults defined in io.cc. A FATAL message is logged if no attempt
+// succeeds.
+int connect_ipc_sock_retry(const std::string& pathname, int num_retries, int64_t timeout);
+
+// Accept a connection on the listening socket `socket_fd`. Returns the
+// client file descriptor, or -1 on failure.
+int AcceptClient(int socket_fd);
+
+// Read an int64_t size followed by that many bytes from `sock` into a buffer
+// allocated with malloc. Returns the buffer, or NULL after closing `sock` if
+// a read fails.
+// NOTE(review): nothing in io.cc frees the returned buffer; presumably the
+// caller owns it -- confirm.
+uint8_t* read_message_async(int sock);
+
+#endif  // PLASMA_IO_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/malloc.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc
new file mode 100644
index 0000000..97c9a16
--- /dev/null
+++ b/cpp/src/plasma/malloc.cc
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/malloc.h"
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/mman.h>
+#include <unistd.h>
+
+#include <unordered_map>
+
+#include "plasma/common.h"
+
+/* Compile dlmalloc directly into this translation unit, configured so that
+ * every request it makes for memory from the system goes through fake_mmap
+ * and fake_munmap (defined further down in this file). */
+extern "C" {
+void* fake_mmap(size_t);
+int fake_munmap(void*, int64_t);
+
+/* Route both the regular and the "direct" mapping hooks to our versions. */
+#define MMAP(s) fake_mmap(s)
+#define MUNMAP(a, s) fake_munmap(a, s)
+#define DIRECT_MMAP(s) fake_mmap(s)
+#define DIRECT_MUNMAP(a, s) fake_munmap(a, s)
+/* dlmalloc configuration switches; see the option descriptions at the top of
+ * thirdparty/dlmalloc.c for their exact meaning. */
+#define USE_DL_PREFIX
+#define HAVE_MORECORE 0
+#define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T
+#define DEFAULT_GRANULARITY ((size_t)128U * 1024U)
+
+#include "thirdparty/dlmalloc.c"  // NOLINT
+
+/* Remove the configuration macros again so they do not leak into the rest
+ * of this file.
+ * NOTE(review): DEFAULT_MMAP_THRESHOLD is defined above but not undefined
+ * here, unlike the other macros -- confirm whether that is intentional. */
+#undef MMAP
+#undef MUNMAP
+#undef DIRECT_MMAP
+#undef DIRECT_MUNMAP
+#undef USE_DL_PREFIX
+#undef HAVE_MORECORE
+#undef DEFAULT_GRANULARITY
+}
+
+/* Bookkeeping for one segment that fake_mmap obtained from mmap. */
+struct mmap_record {
+  /* File descriptor returned by create_buffer that backs the segment. */
+  int fd;
+  /* Size in bytes that was passed to mmap, i.e. the size requested from
+   * fake_mmap plus sizeof(size_t). */
+  int64_t size;
+};
+
+namespace {
+
+/** Hashtable that contains one entry per segment that we got from the OS
+ *  via mmap. Associates the address of that segment with its file descriptor
+ *  and size. The key is the address returned by mmap itself, not the
+ *  advanced pointer that fake_mmap hands to dlmalloc. */
+std::unordered_map<void*, mmap_record> mmap_records;
+
+} /* namespace */
+
+/* Factor by which fake_mmap multiplies dlmalloc's allocation granularity
+ * after every successful mapping. */
+constexpr int GRANULARITY_MULTIPLIER = 2;
+
+/* Return the address 'n' bytes after 'p'. */
+static void* pointer_advance(void* p, ptrdiff_t n) {
+  unsigned char* base = (unsigned char*)p;
+  return base + n;
+}
+
+/* Return the address 'n' bytes before 'p'. */
+static void* pointer_retreat(void* p, ptrdiff_t n) {
+  unsigned char* base = (unsigned char*)p;
+  return base - n;
+}
+
+/* Return the number of bytes from 'pfrom' to 'pto' (negative if 'pto' lies
+ * before 'pfrom'). */
+static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) {
+  unsigned char const* first = (unsigned char const*)pfrom;
+  unsigned char const* last = (unsigned char const*)pto;
+  return last - first;
+}
+
+/* Create a buffer of 'size' bytes backed by a file. On non-Windows
+ * platforms this creates a temporary file (under /dev/shm on Linux, /tmp
+ * elsewhere), immediately unlinks it so we do not leave traces in the
+ * system, and resizes it with ftruncate. Returns the file descriptor, or -1
+ * on failure. */
+int create_buffer(int64_t size) {
+  int fd;
+#ifdef _WIN32
+  /* NOTE(review): the handle returned by CreateFileMapping is discarded and
+   * 'fd' is only assigned when the call fails, so on success 'fd' is
+   * returned uninitialized -- confirm whether this branch is used at all. */
+  if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE,
+          (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), (DWORD)(uint64_t)size,
+          NULL)) {
+    fd = -1;
+  }
+#else
+#ifdef __linux__
+  constexpr char file_template[] = "/dev/shm/plasmaXXXXXX";
+#else
+  constexpr char file_template[] = "/tmp/plasmaXXXXXX";
+#endif
+  /* mkstemp rewrites the XXXXXX suffix in place, so it needs a writable copy
+   * of the template. Both templates fit into 32 bytes including the NUL. */
+  char file_name[32];
+  strncpy(file_name, file_template, 32);
+  fd = mkstemp(file_name);
+  if (fd < 0) return -1;
+  /* NOTE(review): the FILE object is only checked for NULL and is never used
+   * or closed afterwards -- confirm whether the fdopen call is needed. */
+  FILE* file = fdopen(fd, "a+");
+  if (!file) {
+    close(fd);
+    return -1;
+  }
+  /* Remove the name right away; the file lives on through 'fd'. */
+  if (unlink(file_name) != 0) {
+    /* NOTE(review): 'fd' is not closed on this path or the ftruncate path
+     * below; this only matters if ARROW_LOG(FATAL) returns -- confirm. */
+    ARROW_LOG(FATAL) << "unlink error";
+    return -1;
+  }
+  /* Give the file its final size so it can be mapped. */
+  if (ftruncate(fd, (off_t)size) != 0) {
+    ARROW_LOG(FATAL) << "ftruncate error";
+    return -1;
+  }
+#endif
+  return fd;
+}
+
+/* Replacement for mmap that dlmalloc calls to obtain memory.
+ *
+ * Creates a new file-backed buffer, maps it shared and read/write, records
+ * the mapping in mmap_records, and multiplies dlmalloc's granularity by
+ * GRANULARITY_MULTIPLIER. Returns a pointer sizeof(size_t) bytes past the
+ * start of the mapping, or MAP_FAILED if mmap fails (in which case the
+ * backing file descriptor is closed again). */
+void* fake_mmap(size_t size) {
+  /* Add sizeof(size_t) so that the returned pointer is deliberately not
+   * page-aligned. This ensures that the segments of memory returned by
+   * fake_mmap are never contiguous. */
+  size += sizeof(size_t);
+
+  int fd = create_buffer(size);
+  ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
+  void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+  if (pointer == MAP_FAILED) {
+    /* No record refers to this descriptor yet, so release it here instead
+     * of leaking it. */
+    close(fd);
+    return pointer;
+  }
+
+  /* Increase dlmalloc's allocation granularity directly. */
+  mparams.granularity *= GRANULARITY_MULTIPLIER;
+
+  /* Remember the real mapping address together with its descriptor and
+   * size, for fake_munmap and get_malloc_mapinfo. */
+  mmap_record& record = mmap_records[pointer];
+  record.fd = fd;
+  record.size = size;
+
+  /* We lie to dlmalloc about where mapped memory actually lives. */
+  pointer = pointer_advance(pointer, sizeof(size_t));
+  ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
+  return pointer;
+}
+
+/* Replacement for munmap that dlmalloc calls to release memory.
+ *
+ * 'addr' and 'size' are the values dlmalloc knows, i.e. without the
+ * sizeof(size_t) offset added by fake_mmap. Only requests that exactly match
+ * a mapping recorded by fake_mmap are honoured. Returns 0 on success, -1 if
+ * the request matches no record, or the munmap() result if unmapping fails
+ * (the record is then kept, because the mapping still exists). */
+int fake_munmap(void* addr, int64_t size) {
+  ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")";
+  /* Undo the offset applied in fake_mmap to get the real mapping. */
+  addr = pointer_retreat(addr, sizeof(size_t));
+  size += sizeof(size_t);
+
+  auto entry = mmap_records.find(addr);
+
+  if (entry == mmap_records.end() || entry->second.size != size) {
+    /* Reject requests to munmap that don't directly match previous
+     * calls to mmap, to prevent dlmalloc from trimming. */
+    return -1;
+  }
+
+  int r = munmap(addr, size);
+  if (r != 0) {
+    /* The segment is still mapped: keep its record so the descriptor is not
+     * lost and lookups for addresses inside it keep working. */
+    return r;
+  }
+
+  close(entry->second.fd);
+  mmap_records.erase(entry);
+  return r;
+}
+
+/* Find the recorded segment that contains 'addr' and report its file
+ * descriptor, its size, and the offset of 'addr' from the segment start.
+ * If no recorded segment contains 'addr', '*fd' is set to -1 and
+ * '*map_size' and '*offset' to 0. */
+void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) {
+  /* TODO(rshin): Implement a more efficient search through mmap_records. */
+  for (auto it = mmap_records.begin(); it != mmap_records.end(); ++it) {
+    void* segment_begin = it->first;
+    const mmap_record& record = it->second;
+    void* segment_end = pointer_advance(segment_begin, record.size);
+
+    /* Skip segments that do not cover the address. */
+    if (addr < segment_begin || addr >= segment_end) { continue; }
+
+    *fd = record.fd;
+    *map_size = record.size;
+    *offset = pointer_distance(segment_begin, addr);
+    return;
+  }
+
+  /* Not part of any recorded segment. */
+  *fd = -1;
+  *map_size = 0;
+  *offset = 0;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/malloc.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h
new file mode 100644
index 0000000..b4af2c8
--- /dev/null
+++ b/cpp/src/plasma/malloc.h
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_MALLOC_H
+#define PLASMA_MALLOC_H
+
+#include <inttypes.h>
+#include <stddef.h>
+
+// Find the memory-mapped region that contains addr.
+//
+// If a region is found, *fd receives its file descriptor, *map_length its
+// size and *offset the position of addr relative to the start of the region.
+// If no region contains addr, *fd is set to -1 and *map_length and *offset
+// are set to 0.
+void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);
+
+#endif  // PLASMA_MALLOC_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/plasma.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
new file mode 100644
index 0000000..559d8e7
--- /dev/null
+++ b/cpp/src/plasma/plasma.cc
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/plasma.h"
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "plasma/common.h"
+#include "plasma/protocol.h"
+
+// Check the result of sending a message to a client.
+//
+// Returns 0 when status is non-negative. When status is negative and errno is
+// EPIPE, EBADF or ECONNRESET, a warning is logged and that errno value is
+// returned, because the client may simply have disconnected. Any other
+// failure is logged at FATAL level.
+int warn_if_sigpipe(int status, int client_sock) {
+  if (status >= 0) { return 0; }
+  // Capture errno before logging: the logging call may itself perform I/O and
+  // overwrite errno, which would make the returned value unreliable.
+  const int saved_errno = errno;
+  if (saved_errno == EPIPE || saved_errno == EBADF || saved_errno == ECONNRESET) {
+    ARROW_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when "
+                          "sending a message to client on fd "
+                       << client_sock << ". The client on the other end may "
+                                         "have hung up.";
+    return saved_errno;
+  }
+  ARROW_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << ".";
+  return -1;  // This is never reached.
+}
+
+/**
+ * This will create a new ObjectInfo buffer. The first sizeof(int64_t) bytes
+ * of this buffer are the length of the remaining message and the
+ * remaining message is a serialized version of the object info.
+ *
+ * @param object_info The object info to be serialized
+ * @return The object info buffer. It is allocated with "new[]", so it is the
+ *         caller's responsibility to free this buffer with "delete[]" after
+ *         it has been used.
+ */
+uint8_t* create_object_info_buffer(ObjectInfoT* object_info) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreateObjectInfo(fbb, object_info);
+  fbb.Finish(message);
+  const int64_t payload_size = fbb.GetSize();
+  uint8_t* notification = new uint8_t[sizeof(int64_t) + fbb.GetSize()];
+  // Write the length prefix with memcpy rather than through a casted
+  // int64_t pointer, which avoids type-punning the byte buffer.
+  memcpy(notification, &payload_size, sizeof(int64_t));
+  memcpy(notification + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
+  return notification;
+}
+
+// Look up object_id in store_info->objects. Returns the raw pointer held by
+// the table for that object, or NULL when the ID is not present. The table
+// keeps ownership of the entry.
+ObjectTableEntry* get_object_table_entry(
+    PlasmaStoreInfo* store_info, const ObjectID& object_id) {
+  auto& table = store_info->objects;
+  auto found = table.find(object_id);
+  if (found != table.end()) { return found->second.get(); }
+  return NULL;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/plasma.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
new file mode 100644
index 0000000..275d0c7
--- /dev/null
+++ b/cpp/src/plasma/plasma.h
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_PLASMA_H
+#define PLASMA_PLASMA_H
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>  // pid_t
+
+#include <unordered_map>
+#include <unordered_set>
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+#include "format/common_generated.h"
+#include "plasma/common.h"
+
+/// Evaluate the Status expression s, which is expected to be the result of
+/// sending a message to the client on file descriptor fd_.
+///
+/// If the status is not OK and errno is EPIPE, EBADF or ECONNRESET, a warning
+/// is logged and execution continues. For any other failure the status is
+/// returned from the enclosing function, which must therefore return a type
+/// constructible from Status. The name Status must be visible where the macro
+/// is expanded.
+///
+/// NOTE(review): the expansion ends in "while (0);" with a trailing
+/// semicolon, so "HANDLE_SIGPIPE(...);" produces an extra empty statement and
+/// the macro cannot be the sole unbraced body of an "if" that has an "else".
+#define HANDLE_SIGPIPE(s, fd_)                                              \
+  do {                                                                      \
+    Status _s = (s);                                                        \
+    if (!_s.ok()) {                                                         \
+      if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {        \
+        ARROW_LOG(WARNING)                                                  \
+            << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \
+               "sending a message to client on fd "                         \
+            << fd_ << ". "                                                  \
+                      "The client on the other end may have hung up.";      \
+      } else {                                                              \
+        return _s;                                                          \
+      }                                                                     \
+    }                                                                       \
+  } while (0);
+
+/// Allocation granularity used in plasma for object allocation.
+#define BLOCK_SIZE 64
+
+/// Size of object hash digests, i.e. sizeof(uint64_t) bytes.
+constexpr int64_t kDigestSize = sizeof(uint64_t);
+
+/// Forward declaration. This header only stores pointers to Client (see
+/// ObjectTableEntry::clients), so the full definition is not needed here.
+struct Client;
+
+/// Object request data structure. Used in the plasma_wait_for_objects()
+/// argument.
+typedef struct {
+  /// The ID of the requested object. If ID_NIL request any object.
+  ObjectID object_id;
+  /// Request associated to the object. It can take one of the following values:
+  ///  - PLASMA_QUERY_LOCAL: return if or when the object is available in the
+  ///    local Plasma Store.
+  ///  - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
+  ///    the system (i.e., either in the local or a remote Plasma Store).
+  int type;
+  /// Object status. Same as the status returned by plasma_status() function
+  /// call. This is filled in by plasma_wait_for_objects():
+  ///  - ObjectStatus_Local: object is ready at the local Plasma Store.
+  ///  - ObjectStatus_Remote: object is ready at a remote Plasma Store.
+  ///  - ObjectStatus_Nonexistent: object does not exist in the system.
+  ///  - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
+  ///    for being transferred or it is transferring.
+  int status;
+} ObjectRequest;
+
+/// Map keyed by object ID whose values are the ObjectRequest (request type and
+/// status) for that object.
+using ObjectRequestMap = std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher>;
+
+/// Handle to access memory mapped file and map it into client address space.
+/// It pairs the store-side file descriptor with the size of the mapping.
+typedef struct {
+  /// The file descriptor of the memory mapped file in the store. It is used as
+  /// a unique identifier of the file in the client to look up the corresponding
+  /// file descriptor on the client's side.
+  int store_fd;
+  /// The size in bytes of the memory mapped file.
+  int64_t mmap_size;
+} object_handle;
+
+/// Describes where an object's data and metadata live inside a memory mapped
+/// file: the file handle plus the offset and size of each part.
+// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
+typedef struct {
+  /// Handle for memory mapped file the object is stored in.
+  object_handle handle;
+  /// The offset in bytes in the memory mapped file of the data.
+  ptrdiff_t data_offset;
+  /// The offset in bytes in the memory mapped file of the metadata.
+  ptrdiff_t metadata_offset;
+  /// The size in bytes of the data.
+  int64_t data_size;
+  /// The size in bytes of the metadata.
+  int64_t metadata_size;
+} PlasmaObject;
+
+/// State of an object in the local Plasma Store; this is the type of
+/// ObjectTableEntry::state. Values start at 1, so PLASMA_SEALED is 2.
+typedef enum {
+  /// Object was created but not sealed in the local Plasma Store.
+  PLASMA_CREATED = 1,
+  /// Object is sealed and stored in the local Plasma Store.
+  PLASMA_SEALED
+} object_state;
+
+/// Result of looking for an object: found or not found.
+typedef enum {
+  /// The object was not found.
+  OBJECT_NOT_FOUND = 0,
+  /// The object was found.
+  OBJECT_FOUND = 1
+} object_status;
+
+/// Kinds of object query; these are the values documented for
+/// ObjectRequest::type. Values start at 1, so PLASMA_QUERY_ANYWHERE is 2.
+typedef enum {
+  /// Query for object in the local plasma store.
+  PLASMA_QUERY_LOCAL = 1,
+  /// Query for object in the local plasma store or in a remote plasma store.
+  PLASMA_QUERY_ANYWHERE
+} object_request_type;
+
+/// This type is used by the Plasma store. It is here because it is exposed to
+/// the eviction policy. One entry describes a single object held by the store;
+/// entries are owned by PlasmaStoreInfo::objects.
+struct ObjectTableEntry {
+  /// Object id of this object.
+  ObjectID object_id;
+  /// Object info like size, creation time and owner.
+  ObjectInfoT info;
+  /// Memory mapped file containing the object.
+  int fd;
+  /// Size of the underlying map.
+  int64_t map_size;
+  /// Offset from the base of the mmap.
+  ptrdiff_t offset;
+  /// Pointer to the object data. Needed to free the object.
+  uint8_t* pointer;
+  /// Set of clients currently using this object.
+  std::unordered_set<Client*> clients;
+  /// The state of the object, e.g., whether it is open or sealed.
+  object_state state;
+  /// The digest of the object (kDigestSize bytes). Used to see if two objects
+  /// are the same.
+  unsigned char digest[kDigestSize];
+};
+
+/// The plasma store information that is exposed to the eviction policy.
+///
+/// NOTE(review): std::unique_ptr is used below but <memory> is not included
+/// directly by this header; it presumably arrives through another include.
+/// Confirm, or include <memory> explicitly.
+struct PlasmaStoreInfo {
+  /// Objects that are in the Plasma store. The map owns the entries.
+  std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>, UniqueIDHasher> objects;
+  /// The amount of memory (in bytes) that we allow to be allocated in the
+  /// store.
+  int64_t memory_capacity;
+};
+
+/// Get an entry from the object table and return NULL if the object_id
+/// is not present. The returned pointer is not owning; the entry remains
+/// owned by store_info->objects.
+///
+/// @param store_info The PlasmaStoreInfo that contains the object table.
+/// @param object_id The object_id of the entry we are looking for.
+/// @return The entry associated with the object_id or NULL if the object_id
+///         is not present.
+ObjectTableEntry* get_object_table_entry(
+    PlasmaStoreInfo* store_info, const ObjectID& object_id);
+
+/// Print a warning if the status is less than zero. This should be used to check
+/// the success of messages sent to plasma clients. We print a warning instead of
+/// failing because the plasma clients are allowed to die. This is used to handle
+/// situations where the store writes to a client file descriptor, and the client
+/// may already have disconnected. If we have processed the disconnection and
+/// closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we
+/// have not, then we should get a SIGPIPE. If we write to a TCP socket that
+/// isn't connected yet, then we should get an ECONNRESET.
+///
+/// Failures with any other errno are logged at FATAL level.
+///
+/// @param status The status to check. If it is less than zero, we will
+///        print a warning.
+/// @param client_sock The client socket. This is just used to print some extra
+///        information.
+/// @return 0 if status is not negative; otherwise the errno that was set
+///         (EPIPE, EBADF or ECONNRESET).
+int warn_if_sigpipe(int status, int client_sock);
+
+/// Serialize object_info into a newly allocated buffer. The buffer starts with
+/// an int64_t holding the length of the serialized message, followed by the
+/// serialized message itself.
+///
+/// @param object_info The object info to be serialized.
+/// @return The buffer. It is allocated with new[]; the caller must release it
+///         with delete[].
+uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
+
+#endif  // PLASMA_PLASMA_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/protocol.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
new file mode 100644
index 0000000..246aa29
--- /dev/null
+++ b/cpp/src/plasma/protocol.cc
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/protocol.h"
+
+#include "flatbuffers/flatbuffers.h"
+#include "format/plasma_generated.h"
+
+#include "plasma/common.h"
+#include "plasma/io.h"
+
+using flatbuffers::uoffset_t;
+
+// Serialize the first num_objects IDs of object_ids into fbb as a vector of
+// strings, one string per ID holding its binary() form, and return the offset
+// of that vector.
+flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+to_flatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
+    int64_t num_objects) {
+  std::vector<flatbuffers::Offset<flatbuffers::String>> id_offsets;
+  int64_t index = 0;
+  while (index < num_objects) {
+    id_offsets.push_back(fbb->CreateString(object_ids[index].binary()));
+    ++index;
+  }
+  return fbb->CreateVector(id_offsets);
+}
+
+// Read one message from sock into buffer and check that its type equals
+// message_type. A read failure is returned to the caller; a type mismatch
+// trips ARROW_CHECK.
+Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer) {
+  int64_t type;
+  Status read_status = ReadMessage(sock, &type, buffer);
+  if (!read_status.ok()) { return read_status; }
+  ARROW_CHECK(type == message_type) << "type = " << type
+                                    << ", message_type = " << message_type;
+  return Status::OK();
+}
+
+// Finish the flatbuffer held by fbb with message as its root, then write the
+// finished buffer to sock tagged with message_type. Returns the status of the
+// write.
+template <typename Message>
+Status PlasmaSend(int sock, int64_t message_type, flatbuffers::FlatBufferBuilder* fbb,
+    const Message& message) {
+  fbb->Finish(message);
+  const auto payload_size = fbb->GetSize();
+  auto payload = fbb->GetBufferPointer();
+  return WriteMessage(sock, message_type, payload_size, payload);
+}
+
+// Create messages.
+
+// Send a PlasmaCreateRequest over sock carrying object_id together with the
+// requested data and metadata sizes.
+Status SendCreateRequest(
+    int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto id = builder.CreateString(object_id.binary());
+  auto request = CreatePlasmaCreateRequest(builder, id, data_size, metadata_size);
+  return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &builder, request);
+}
+
+// Decode a PlasmaCreateRequest from data, writing the sizes and the object ID
+// through the output pointers. Always returns Status::OK().
+Status ReadCreateRequest(
+    uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
+  DCHECK(data);
+  const auto* request = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
+  *data_size = request->data_size();
+  *metadata_size = request->metadata_size();
+  const auto* id = request->object_id();
+  *object_id = ObjectID::from_binary(id->str());
+  return Status::OK();
+}
+
+// Send a PlasmaCreateReply over sock. The reply carries object_id, the
+// location of the object described by *object, and error_code converted to
+// PlasmaError.
+Status SendCreateReply(
+    int sock, ObjectID object_id, PlasmaObject* object, int error_code) {
+  flatbuffers::FlatBufferBuilder builder;
+  const object_handle& handle = object->handle;
+  PlasmaObjectSpec spec(handle.store_fd, handle.mmap_size, object->data_offset,
+      object->data_size, object->metadata_offset, object->metadata_size);
+  auto id = builder.CreateString(object_id.binary());
+  auto reply =
+      CreatePlasmaCreateReply(builder, id, &spec, static_cast<PlasmaError>(error_code));
+  return PlasmaSend(sock, MessageType_PlasmaCreateReply, &builder, reply);
+}
+
+// Decode a PlasmaCreateReply from data into *object_id and *object. The
+// returned status is derived from the reply's error field by
+// plasma_error_status().
+Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaCreateReply>(data);
+  *object_id = ObjectID::from_binary(reply->object_id()->str());
+  const auto* spec = reply->plasma_object();
+  object->handle.store_fd = spec->segment_index();
+  object->handle.mmap_size = spec->mmap_size();
+  object->data_offset = spec->data_offset();
+  object->data_size = spec->data_size();
+  object->metadata_offset = spec->metadata_offset();
+  object->metadata_size = spec->metadata_size();
+  return plasma_error_status(reply->error());
+}
+
+// Seal messages.
+
+// Send a PlasmaSealRequest over sock carrying object_id and the kDigestSize
+// bytes pointed to by digest.
+Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
+  flatbuffers::FlatBufferBuilder builder;
+  // The digest string is created before the ID string, as in the wire layout
+  // this function has always produced.
+  auto digest_offset = builder.CreateString(reinterpret_cast<char*>(digest), kDigestSize);
+  auto id = builder.CreateString(object_id.binary());
+  auto request = CreatePlasmaSealRequest(builder, id, digest_offset);
+  return PlasmaSend(sock, MessageType_PlasmaSealRequest, &builder, request);
+}
+
+// Decode a PlasmaSealRequest from data. *object_id receives the object ID and
+// kDigestSize bytes are copied into digest, which must therefore point to a
+// buffer of at least that size. ARROW_CHECK fires if the digest carried by
+// the message does not have exactly kDigestSize bytes.
+Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  // Validate the length before copying a fixed number of bytes.
+  ARROW_CHECK(message->digest()->size() == kDigestSize);
+  memcpy(digest, message->digest()->data(), kDigestSize);
+  return Status::OK();
+}
+
+Status SendSealReply(int sock, ObjectID object_id, int error) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaSealReply(
+      fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
+  return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
+}
+
+// Decode a PlasmaSealReply from data into *object_id. The returned status is
+// derived from the reply's error field by plasma_error_status().
+Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaSealReply>(data);
+  const auto* id = reply->object_id();
+  *object_id = ObjectID::from_binary(id->str());
+  return plasma_error_status(reply->error());
+}
+
+// Release messages.
+
+// Send a PlasmaReleaseRequest over sock carrying object_id.
+//
+// The message is built with CreatePlasmaReleaseRequest so that its table type
+// matches both the MessageType_PlasmaReleaseRequest tag and the type that
+// ReadReleaseRequest decodes.
+Status SendReleaseRequest(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
+  return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
+}
+
+// Decode a PlasmaReleaseRequest from data into *object_id. Always returns
+// Status::OK().
+Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  const auto* request = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
+  const auto* id = request->object_id();
+  *object_id = ObjectID::from_binary(id->str());
+  return Status::OK();
+}
+
+Status SendReleaseReply(int sock, ObjectID object_id, int error) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaReleaseReply(
+      fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
+  return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
+}
+
+// Decode a PlasmaReleaseReply from data into *object_id. The returned status
+// is derived from the reply's error field by plasma_error_status().
+Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
+  const auto* id = reply->object_id();
+  *object_id = ObjectID::from_binary(id->str());
+  return plasma_error_status(reply->error());
+}
+
+// Delete messages.
+
+// Send a PlasmaDeleteRequest over sock carrying object_id.
+Status SendDeleteRequest(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto id = builder.CreateString(object_id.binary());
+  auto request = CreatePlasmaDeleteRequest(builder, id);
+  return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &builder, request);
+}
+
+// Decode a PlasmaDeleteRequest from data into *object_id. Always returns
+// Status::OK().
+//
+// The buffer is interpreted as PlasmaDeleteRequest, the table type that
+// SendDeleteRequest writes, rather than as an unrelated reply type.
+Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  return Status::OK();
+}
+
+Status SendDeleteReply(int sock, ObjectID object_id, int error) {
+  flatbuffers::FlatBufferBuilder fbb;
+  auto message = CreatePlasmaDeleteReply(
+      fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
+  return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
+}
+
+// Decode a PlasmaDeleteReply from data into *object_id. The returned status
+// is derived from the reply's error field by plasma_error_status().
+Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
+  const auto* id = reply->object_id();
+  *object_id = ObjectID::from_binary(id->str());
+  return plasma_error_status(reply->error());
+}
+
+// Status messages.
+
+// Send a PlasmaStatusRequest over sock listing the first num_objects IDs of
+// object_ids.
+Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto ids = to_flatbuffer(&builder, object_ids, num_objects);
+  auto request = CreatePlasmaStatusRequest(builder, ids);
+  return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &builder, request);
+}
+
+// Decode a PlasmaStatusRequest from data, copying the first num_objects IDs
+// into object_ids. num_objects is not checked against the number of IDs in
+// the message, so the caller must supply a consistent value. Always returns
+// Status::OK().
+Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) {
+  DCHECK(data);
+  const auto* request = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
+  const auto* ids = request->object_ids();
+  for (uoffset_t i = 0; i < num_objects; ++i) {
+    object_ids[i] = ObjectID::from_binary(ids->Get(i)->str());
+  }
+  return Status::OK();
+}
+
+// Send a PlasmaStatusReply over sock carrying num_objects object IDs and the
+// matching num_objects entries of object_status.
+Status SendStatusReply(
+    int sock, ObjectID object_ids[], int object_status[], int64_t num_objects) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto reply =
+      CreatePlasmaStatusReply(builder, to_flatbuffer(&builder, object_ids, num_objects),
+          builder.CreateVector(object_status, num_objects));
+  return PlasmaSend(sock, MessageType_PlasmaStatusReply, &builder, reply);
+}
+
+// Return the number of object IDs contained in the PlasmaStatusReply stored
+// in data.
+int64_t ReadStatusReply_num_objects(uint8_t* data) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaStatusReply>(data);
+  const auto* ids = reply->object_ids();
+  return ids->size();
+}
+
+// Decode a PlasmaStatusReply from data, copying the first num_objects IDs
+// into object_ids and the first num_objects status values into
+// object_status. num_objects is not checked against the message contents.
+// Always returns Status::OK().
+Status ReadStatusReply(
+    uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaStatusReply>(data);
+  // First pass: object IDs.
+  for (uoffset_t id_index = 0; id_index < num_objects; ++id_index) {
+    object_ids[id_index] =
+        ObjectID::from_binary(reply->object_ids()->Get(id_index)->str());
+  }
+  // Second pass: status values.
+  for (uoffset_t status_index = 0; status_index < num_objects; ++status_index) {
+    object_status[status_index] = reply->status()->data()[status_index];
+  }
+  return Status::OK();
+}
+
+// Contains messages.
+
+// Send a PlasmaContainsRequest over sock carrying object_id.
+Status SendContainsRequest(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto id = builder.CreateString(object_id.binary());
+  auto request = CreatePlasmaContainsRequest(builder, id);
+  return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &builder, request);
+}
+
+// Decode a PlasmaContainsRequest from data into *object_id. Always returns
+// Status::OK().
+Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
+  DCHECK(data);
+  const auto* request = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
+  const auto* id = request->object_id();
+  *object_id = ObjectID::from_binary(id->str());
+  return Status::OK();
+}
+
+// Send a PlasmaContainsReply over sock carrying object_id and has_object.
+Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto id = builder.CreateString(object_id.binary());
+  auto reply = CreatePlasmaContainsReply(builder, id, has_object);
+  return PlasmaSend(sock, MessageType_PlasmaContainsReply, &builder, reply);
+}
+
+// Decode a PlasmaContainsReply from data into *object_id and *has_object.
+// Always returns Status::OK().
+Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaContainsReply>(data);
+  const auto* id = reply->object_id();
+  *object_id = ObjectID::from_binary(id->str());
+  *has_object = reply->has_object();
+  return Status::OK();
+}
+
+// Connect messages.
+
+// Send a PlasmaConnectRequest, which carries no fields, over sock.
+Status SendConnectRequest(int sock) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto request = CreatePlasmaConnectRequest(builder);
+  return PlasmaSend(sock, MessageType_PlasmaConnectRequest, &builder, request);
+}
+
+// Counterpart of SendConnectRequest. Nothing is decoded from data: the
+// argument is ignored and Status::OK() is always returned.
+Status ReadConnectRequest(uint8_t* data) {
+  return Status::OK();
+}
+
+// Send a PlasmaConnectReply over sock carrying memory_capacity.
+Status SendConnectReply(int sock, int64_t memory_capacity) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto reply = CreatePlasmaConnectReply(builder, memory_capacity);
+  return PlasmaSend(sock, MessageType_PlasmaConnectReply, &builder, reply);
+}
+
+// Decode a PlasmaConnectReply from data into *memory_capacity. Always returns
+// Status::OK().
+Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaConnectReply>(data);
+  *memory_capacity = reply->memory_capacity();
+  return Status::OK();
+}
+
+// Evict messages.
+
+// Send a PlasmaEvictRequest over sock carrying num_bytes.
+Status SendEvictRequest(int sock, int64_t num_bytes) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto request = CreatePlasmaEvictRequest(builder, num_bytes);
+  return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &builder, request);
+}
+
+// Decode a PlasmaEvictRequest from data into *num_bytes. Always returns
+// Status::OK().
+Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
+  DCHECK(data);
+  const auto* request = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
+  *num_bytes = request->num_bytes();
+  return Status::OK();
+}
+
+// Send a PlasmaEvictReply over sock carrying num_bytes.
+Status SendEvictReply(int sock, int64_t num_bytes) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto reply = CreatePlasmaEvictReply(builder, num_bytes);
+  return PlasmaSend(sock, MessageType_PlasmaEvictReply, &builder, reply);
+}
+
+// Decode a PlasmaEvictReply from data into num_bytes (an output reference).
+// Always returns Status::OK().
+Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaEvictReply>(data);
+  num_bytes = reply->num_bytes();
+  return Status::OK();
+}
+
+// Get messages.
+
+// Send a PlasmaGetRequest over sock listing the first num_objects IDs of
+// object_ids together with timeout_ms.
+Status SendGetRequest(
+    int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto ids = to_flatbuffer(&builder, object_ids, num_objects);
+  auto request = CreatePlasmaGetRequest(builder, ids, timeout_ms);
+  return PlasmaSend(sock, MessageType_PlasmaGetRequest, &builder, request);
+}
+
+// Decode a PlasmaGetRequest from data. Every object ID in the message is
+// appended to object_ids (existing elements are kept) and *timeout_ms
+// receives the request's timeout. Always returns Status::OK().
+Status ReadGetRequest(
+    uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) {
+  DCHECK(data);
+  const auto* request = flatbuffers::GetRoot<PlasmaGetRequest>(data);
+  const auto* ids = request->object_ids();
+  for (uoffset_t i = 0; i < ids->size(); ++i) {
+    object_ids.push_back(ObjectID::from_binary(ids->Get(i)->str()));
+  }
+  *timeout_ms = request->timeout_ms();
+  return Status::OK();
+}
+
+// Send a PlasmaGetReply over sock. For each of the first num_objects IDs in
+// object_ids, the matching PlasmaObject is looked up in plasma_objects with
+// operator[] (so a missing ID inserts a default-constructed entry) and sent
+// as a PlasmaObjectSpec alongside the ID.
+Status SendGetReply(int sock, ObjectID object_ids[],
+    std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
+    int64_t num_objects) {
+  flatbuffers::FlatBufferBuilder builder;
+  std::vector<PlasmaObjectSpec> specs;
+
+  for (int i = 0; i < num_objects; ++i) {
+    const PlasmaObject& entry = plasma_objects[object_ids[i]];
+    const object_handle& handle = entry.handle;
+    specs.push_back(PlasmaObjectSpec(handle.store_fd, handle.mmap_size, entry.data_offset,
+        entry.data_size, entry.metadata_offset, entry.metadata_size));
+  }
+  auto reply =
+      CreatePlasmaGetReply(builder, to_flatbuffer(&builder, object_ids, num_objects),
+          builder.CreateVectorOfStructs(specs.data(), num_objects));
+  return PlasmaSend(sock, MessageType_PlasmaGetReply, &builder, reply);
+}
+
+// Decode a PlasmaGetReply from data, filling the first num_objects elements
+// of object_ids and plasma_objects. num_objects is not checked against the
+// message contents. Always returns Status::OK().
+Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
+    int64_t num_objects) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaGetReply>(data);
+  // First pass: object IDs.
+  for (uoffset_t id_index = 0; id_index < num_objects; ++id_index) {
+    object_ids[id_index] =
+        ObjectID::from_binary(reply->object_ids()->Get(id_index)->str());
+  }
+  // Second pass: object locations.
+  for (uoffset_t obj_index = 0; obj_index < num_objects; ++obj_index) {
+    const PlasmaObjectSpec* spec = reply->plasma_objects()->Get(obj_index);
+    PlasmaObject& out = plasma_objects[obj_index];
+    out.handle.store_fd = spec->segment_index();
+    out.handle.mmap_size = spec->mmap_size();
+    out.data_offset = spec->data_offset();
+    out.data_size = spec->data_size();
+    out.metadata_offset = spec->metadata_offset();
+    out.metadata_size = spec->metadata_size();
+  }
+  return Status::OK();
+}
+
+// Fetch messages.
+
+// Send a PlasmaFetchRequest over sock listing the first num_objects IDs of
+// object_ids.
+Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto ids = to_flatbuffer(&builder, object_ids, num_objects);
+  auto request = CreatePlasmaFetchRequest(builder, ids);
+  return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &builder, request);
+}
+
+// Decode a PlasmaFetchRequest from data, appending every object ID in the
+// message to object_ids (existing elements are kept). Always returns
+// Status::OK().
+Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
+  DCHECK(data);
+  const auto* request = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
+  const auto* ids = request->object_ids();
+  for (uoffset_t i = 0; i < ids->size(); ++i) {
+    object_ids.push_back(ObjectID::from_binary(ids->Get(i)->str()));
+  }
+  return Status::OK();
+}
+
+// Wait messages.
+
+// Send a PlasmaWaitRequest over sock. Each of the first num_requests entries
+// of object_requests contributes its object ID and request type; the message
+// also carries num_ready_objects and timeout_ms.
+Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
+    int num_ready_objects, int64_t timeout_ms) {
+  flatbuffers::FlatBufferBuilder builder;
+
+  std::vector<flatbuffers::Offset<ObjectRequestSpec>> specs;
+  for (int i = 0; i < num_requests; i++) {
+    const ObjectRequest& request = object_requests[i];
+    auto id = builder.CreateString(request.object_id.binary());
+    specs.push_back(CreateObjectRequestSpec(builder, id, request.type));
+  }
+
+  auto spec_vector = builder.CreateVector(specs);
+  auto message =
+      CreatePlasmaWaitRequest(builder, spec_vector, num_ready_objects, timeout_ms);
+  return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &builder, message);
+}
+
+// Decode a PlasmaWaitRequest from data. *num_ready_objects and *timeout_ms
+// receive the corresponding message fields. For every request in the message
+// an ObjectRequest with status ObjectStatus_Nonexistent is stored in
+// object_requests under its object ID, replacing any entry already stored
+// for that ID. Always returns Status::OK().
+Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
+    int64_t* timeout_ms, int* num_ready_objects) {
+  DCHECK(data);
+  const auto* request = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
+  *num_ready_objects = request->num_ready_objects();
+  *timeout_ms = request->timeout();
+
+  for (uoffset_t i = 0; i < request->object_requests()->size(); i++) {
+    const auto* spec = request->object_requests()->Get(i);
+    ObjectID object_id = ObjectID::from_binary(spec->object_id()->str());
+    ObjectRequest entry({object_id, spec->type(), ObjectStatus_Nonexistent});
+    object_requests[object_id] = entry;
+  }
+  return Status::OK();
+}
+
+// Send a PlasmaWaitReply over sock. An ObjectReply (object ID plus status) is
+// built for every entry of object_requests, but only the first
+// num_ready_objects of them, in the map's iteration order, are placed in the
+// message. The caller must ensure num_ready_objects does not exceed the
+// number of entries.
+Status SendWaitReply(
+    int sock, const ObjectRequestMap& object_requests, int num_ready_objects) {
+  flatbuffers::FlatBufferBuilder builder;
+
+  std::vector<flatbuffers::Offset<ObjectReply>> replies;
+  for (auto it = object_requests.begin(); it != object_requests.end(); ++it) {
+    const ObjectRequest& request = it->second;
+    auto id = builder.CreateString(request.object_id.binary());
+    replies.push_back(CreateObjectReply(builder, id, request.status));
+  }
+
+  auto reply_vector = builder.CreateVector(replies.data(), num_ready_objects);
+  auto message = CreatePlasmaWaitReply(builder, reply_vector, num_ready_objects);
+  return PlasmaSend(sock, MessageType_PlasmaWaitReply, &builder, message);
+}
+
+// Decode a PlasmaWaitReply from data. *num_ready_objects receives the count
+// stored in the message, and that many elements of object_requests have
+// their object_id and status fields overwritten; the type field is left
+// untouched. Always returns Status::OK().
+Status ReadWaitReply(
+    uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects) {
+  DCHECK(data);
+
+  const auto* reply = flatbuffers::GetRoot<PlasmaWaitReply>(data);
+  *num_ready_objects = reply->num_ready_objects();
+  for (int i = 0; i < *num_ready_objects; i++) {
+    const auto* entry = reply->object_requests()->Get(i);
+    object_requests[i].object_id = ObjectID::from_binary(entry->object_id()->str());
+    object_requests[i].status = entry->status();
+  }
+  return Status::OK();
+}
+
+// Subscribe messages.
+
+// Send a PlasmaSubscribeRequest, which carries no fields, over sock.
+Status SendSubscribeRequest(int sock) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto request = CreatePlasmaSubscribeRequest(builder);
+  return PlasmaSend(sock, MessageType_PlasmaSubscribeRequest, &builder, request);
+}
+
+// Data messages.
+
+// Send a PlasmaDataRequest over sock carrying object_id, the NUL-terminated
+// string address, and port.
+Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port) {
+  flatbuffers::FlatBufferBuilder builder;
+  // The address string is created before the ID string, as in the wire layout
+  // this function has always produced.
+  auto address_offset = builder.CreateString(address, strlen(address));
+  auto id = builder.CreateString(object_id.binary());
+  auto request = CreatePlasmaDataRequest(builder, id, address_offset, port);
+  return PlasmaSend(sock, MessageType_PlasmaDataRequest, &builder, request);
+}
+
+// Decode a PlasmaDataRequest from data into *object_id, *address and *port.
+//
+// *address receives a copy of the address string made with strdup(); the
+// caller owns that copy and must release it with free(). Always returns
+// Status::OK().
+Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) {
+  DCHECK(data);
+  auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
+  // The serialized ID is expected to be exactly sizeof(ObjectID) bytes long.
+  DCHECK(message->object_id()->size() == sizeof(ObjectID));
+  *object_id = ObjectID::from_binary(message->object_id()->str());
+  *address = strdup(message->address()->c_str());
+  *port = message->port();
+  return Status::OK();
+}
+
+// Send a PlasmaDataReply over sock carrying object_id, object_size and
+// metadata_size.
+Status SendDataReply(
+    int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto id = builder.CreateString(object_id.binary());
+  auto reply = CreatePlasmaDataReply(builder, id, object_size, metadata_size);
+  return PlasmaSend(sock, MessageType_PlasmaDataReply, &builder, reply);
+}
+
+// Decode a PlasmaDataReply from data into *object_id, *object_size and
+// *metadata_size. Always returns Status::OK().
+Status ReadDataReply(
+    uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaDataReply>(data);
+  *object_id = ObjectID::from_binary(reply->object_id()->str());
+  *object_size = static_cast<int64_t>(reply->object_size());
+  *metadata_size = static_cast<int64_t>(reply->metadata_size());
+  return Status::OK();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/protocol.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
new file mode 100644
index 0000000..5d9d136
--- /dev/null
+++ b/cpp/src/plasma/protocol.h
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_PROTOCOL_H
+#define PLASMA_PROTOCOL_H
+
+#include <vector>
+
+#include "arrow/status.h"
+#include "format/plasma_generated.h"
+#include "plasma/plasma.h"
+
+using arrow::Status;
+
+/* Plasma receive message.
+ *
+ * Conventions visible in the declarations of this header: every Send*
+ * function takes the socket descriptor `sock` as its first argument, every
+ * Read* function takes a pointer `data` to the message bytes as its first
+ * argument, and every function except ReadStatusReply_num_objects returns an
+ * arrow::Status. */
+
+Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer);
+
+/* Plasma Create message functions. */
+
+Status SendCreateRequest(
+    int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size);
+
+Status ReadCreateRequest(
+    uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size);
+
+Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error);
+
+Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object);
+
+/* Plasma Seal message functions. */
+
+Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
+
+Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest);
+
+Status SendSealReply(int sock, ObjectID object_id, int error);
+
+Status ReadSealReply(uint8_t* data, ObjectID* object_id);
+
+/* Plasma Get message functions.
+ *
+ * NOTE(review): SendGetReply uses std::unordered_map, but the only standard
+ * header included above is <vector>; presumably <unordered_map> arrives via
+ * plasma/plasma.h -- confirm. */
+
+Status SendGetRequest(
+    int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms);
+
+Status ReadGetRequest(
+    uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms);
+
+Status SendGetReply(int sock, ObjectID object_ids[],
+    std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
+    int64_t num_objects);
+
+Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
+    int64_t num_objects);
+
+/* Plasma Release message functions. */
+
+Status SendReleaseRequest(int sock, ObjectID object_id);
+
+Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id);
+
+Status SendReleaseReply(int sock, ObjectID object_id, int error);
+
+Status ReadReleaseReply(uint8_t* data, ObjectID* object_id);
+
+/* Plasma Delete message functions. */
+
+Status SendDeleteRequest(int sock, ObjectID object_id);
+
+Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id);
+
+Status SendDeleteReply(int sock, ObjectID object_id, int error);
+
+Status ReadDeleteReply(uint8_t* data, ObjectID* object_id);
+
+/* Status messages. */
+
+Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
+
+Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects);
+
+Status SendStatusReply(
+    int sock, ObjectID object_ids[], int object_status[], int64_t num_objects);
+
+int64_t ReadStatusReply_num_objects(uint8_t* data);
+
+Status ReadStatusReply(
+    uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects);
+
+/* Plasma Contains message functions. */
+
+Status SendContainsRequest(int sock, ObjectID object_id);
+
+Status ReadContainsRequest(uint8_t* data, ObjectID* object_id);
+
+Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
+
+Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object);
+
+/* Plasma Connect message functions. */
+
+Status SendConnectRequest(int sock);
+
+Status ReadConnectRequest(uint8_t* data);
+
+Status SendConnectReply(int sock, int64_t memory_capacity);
+
+Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity);
+
+/* Plasma Evict message functions (request and reply). */
+
+Status SendEvictRequest(int sock, int64_t num_bytes);
+
+Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes);
+
+Status SendEvictReply(int sock, int64_t num_bytes);
+
+Status ReadEvictReply(uint8_t* data, int64_t& num_bytes);
+
+/* Plasma Fetch Remote message functions. */
+
+Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
+
+Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids);
+
+/* Plasma Wait message functions. */
+
+Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
+    int num_ready_objects, int64_t timeout_ms);
+
+Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
+    int64_t* timeout_ms, int* num_ready_objects);
+
+Status SendWaitReply(
+    int sock, const ObjectRequestMap& object_requests, int num_ready_objects);
+
+Status ReadWaitReply(
+    uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects);
+
+/* Plasma Subscribe message functions. */
+
+Status SendSubscribeRequest(int sock);
+
+/* Data messages.
+ *
+ * SendDataRequest serializes object_id, address and port into a
+ * PlasmaDataRequest and sends it on sock; ReadDataRequest decodes such a
+ * message from data. ReadDataRequest stores a strdup() copy of the address in
+ * *address, so that copy has to be released with free() (presumably by the
+ * caller -- confirm).
+ *
+ * SendDataReply serializes object_id, object_size and metadata_size into a
+ * PlasmaDataReply and sends it on sock; ReadDataReply decodes such a message
+ * from data into its three output arguments. */
+
+Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port);
+
+Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port);
+
+Status SendDataReply(
+    int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size);
+
+Status ReadDataReply(
+    uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size);
+
+#endif /* PLASMA_PROTOCOL_H */


Mime
View raw message