arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [4/6] arrow git commit: ARROW-1104: Integrate in-memory object store into arrow
Date Thu, 22 Jun 2017 13:35:44 GMT
http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/store.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
new file mode 100644
index 0000000..5151a44
--- /dev/null
+++ b/cpp/src/plasma/store.cc
@@ -0,0 +1,681 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// PLASMA STORE: This is a simple object store server process
+//
+// It accepts incoming client connections on a unix domain socket
+// (name passed in via the -s option of the executable) and uses a
+// single thread to serve the clients. Each client establishes a
+// connection and can create objects, wait for objects and seal
+// objects through that connection.
+//
+// It keeps a hash table that maps object_ids (which are 20 bytes long,
+// just enough to store a SHA1 hash) to memory mapped files.
+
+#include "plasma/store.h"
+
+#include <assert.h>
+#include <fcntl.h>
+#include <getopt.h>
+#include <limits.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/statvfs.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <deque>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "format/common_generated.h"
+#include "plasma/common.h"
+#include "plasma/fling.h"
+#include "plasma/io.h"
+#include "plasma/malloc.h"
+
+extern "C" {
+void* dlmalloc(size_t bytes);
+void* dlmemalign(size_t alignment, size_t bytes);
+void dlfree(void* mem);
+size_t dlmalloc_set_footprint_limit(size_t bytes);
+}
+
+/// Bookkeeping for one outstanding get call issued by a client. Instances are
+/// allocated in process_get_request and freed in return_from_get.
+struct GetRequest {
+  GetRequest(Client* client, const std::vector<ObjectID>& object_ids);
+  /// The client that called get.
+  Client* client;
+  /// The ID of the timer that will time out and cause this wait to return to
+  ///  the client if it hasn't already returned, or -1 if no timer is set.
+  int64_t timer;
+  /// The object IDs involved in this request. This is used in the reply.
+  std::vector<ObjectID> object_ids;
+  /// The object information for the objects in this request. This is used in
+  /// the reply. A data_size of -1 marks an object that is not present.
+  std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> objects;
+  /// The minimum number of objects to wait for in this request, i.e. the
+  /// number of distinct object IDs that were requested.
+  int64_t num_objects_to_wait_for;
+  /// The number of object requests in this wait request that are already
+  /// satisfied.
+  int64_t num_satisfied;
+};
+
+// Set up a get request for the given client. No timer is armed yet and no
+// object counts as satisfied; the request is complete once every distinct
+// requested ID has been satisfied.
+GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids)
+    : client(client),
+      timer(-1),
+      object_ids(object_ids),
+      objects(object_ids.size()),
+      num_satisfied(0) {
+  // An ID that is listed several times only has to be waited for once.
+  const std::unordered_set<ObjectID, UniqueIDHasher> distinct_ids(
+      object_ids.begin(), object_ids.end());
+  num_objects_to_wait_for = distinct_ids.size();
+}
+
+// Remember the file descriptor used to talk to this client.
+Client::Client(int fd) : fd{fd} {}
+
+// Construct a store that is driven by the given event loop. The eviction
+// policy is handed a pointer to this store's own bookkeeping (store_info_),
+// and system_memory is recorded there as the memory capacity.
+PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory)
+    : loop_(loop), eviction_policy_(&store_info_) {
+  store_info_.memory_capacity = system_memory;
+}
+
+// Free every notification buffer that is still queued for a subscriber.
+// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
+PlasmaStore::~PlasmaStore() {
+  for (const auto& element : pending_notifications_) {
+    // Walk the queue in place; copying it only to release its buffers would
+    // be wasted work.
+    for (uint8_t* notification : element.second.object_notifications) {
+      // TODO(pcm): Get rid of this delete.
+      delete[] notification;
+    }
+  }
+}
+
+// Register the client as a user of the object. This does nothing if the
+// client is already registered for it.
+void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) {
+  auto& users = entry->clients;
+  // Nothing to do if this client is already using the object.
+  if (users.count(client) != 0) { return; }
+  // The first user marks the start of an access, which the eviction policy
+  // needs to hear about.
+  if (users.empty()) {
+    std::vector<ObjectID> evicted_ids;
+    eviction_policy_.begin_object_access(entry->object_id, &evicted_ids);
+    delete_objects(evicted_ids);
+  }
+  // Finally record the client itself.
+  users.insert(client);
+}
+
+// Create a new object buffer in the hash table.
+//
+// Allocates data_size + metadata_size bytes (evicting other objects if
+// necessary), registers the object in the PLASMA_CREATED state, describes the
+// buffer in *result and records the client as a user of the object.
+//
+// @param object_id ID of the object to create.
+// @param data_size Size in bytes of the object data.
+// @param metadata_size Size in bytes of the object metadata.
+// @param client The client that creates the object.
+// @param result Output: mapping handle, offsets and sizes of the new buffer.
+// @return PlasmaError_OK on success, PlasmaError_ObjectExists if the ID is
+//         already in the store, PlasmaError_OutOfMemory if not enough space
+//         could be freed.
+int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
+    int64_t metadata_size, Client* client, PlasmaObject* result) {
+  ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
+  if (store_info_.objects.count(object_id) != 0) {
+    // There is already an object with the same ID in the Plasma Store, so
+    // ignore this request.
+    return PlasmaError_ObjectExists;
+  }
+  const int64_t total_size = data_size + metadata_size;
+  // Try to evict objects until there is enough space.
+  uint8_t* pointer = nullptr;
+  while (true) {
+    // Allocate space for the new object. We use dlmemalign instead of dlmalloc
+    // in order to align the allocated region to a 64-byte boundary. This is not
+    // strictly necessary, but it is an optimization that could speed up the
+    // computation of a hash of the data (see compute_object_hash_parallel in
+    // plasma_client.cc). Note that even though this pointer is 64-byte aligned,
+    // it is not guaranteed that the corresponding pointer in the client will be
+    // 64-byte aligned, but in practice it often will be.
+    pointer = reinterpret_cast<uint8_t*>(dlmemalign(BLOCK_SIZE, total_size));
+    if (pointer != nullptr) { break; }
+    // Tell the eviction policy how much space we need to create this object.
+    std::vector<ObjectID> objects_to_evict;
+    bool success = eviction_policy_.require_space(total_size, &objects_to_evict);
+    delete_objects(objects_to_evict);
+    // Return an error to the client if not enough space could be freed to
+    // create the object.
+    if (!success) { return PlasmaError_OutOfMemory; }
+  }
+  int fd = -1;
+  int64_t map_size;
+  ptrdiff_t offset;
+  get_malloc_mapinfo(pointer, &fd, &map_size, &offset);
+  // Use a check that also fires in release builds: a plain assert is compiled
+  // out under NDEBUG and an invalid descriptor would then be sent to the
+  // client.
+  ARROW_CHECK(fd != -1);
+
+  auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+  entry->object_id = object_id;
+  entry->info.object_id = object_id.binary();
+  entry->info.data_size = data_size;
+  entry->info.metadata_size = metadata_size;
+  entry->pointer = pointer;
+  // TODO(pcm): Set the other fields.
+  entry->fd = fd;
+  entry->map_size = map_size;
+  entry->offset = offset;
+  entry->state = PLASMA_CREATED;
+
+  // Keep a raw pointer so the table does not have to be searched again once
+  // ownership has moved into it.
+  ObjectTableEntry* stored_entry = entry.get();
+  store_info_.objects[object_id] = std::move(entry);
+  result->handle.store_fd = fd;
+  result->handle.mmap_size = map_size;
+  result->data_offset = offset;
+  result->metadata_offset = offset + data_size;
+  result->data_size = data_size;
+  result->metadata_size = metadata_size;
+  // Notify the eviction policy that this object was created. This must be done
+  // immediately before the call to add_client_to_object_clients so that the
+  // eviction policy does not have an opportunity to evict the object.
+  eviction_policy_.object_created(object_id);
+  // Record that this client is using this object.
+  add_client_to_object_clients(stored_entry, client);
+  return PlasmaError_OK;
+}
+
+// Fill in a PlasmaObject so that it describes the buffer of a sealed object
+// table entry.
+void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
+  DCHECK(object != NULL);
+  DCHECK(entry != NULL);
+  DCHECK(entry->state == PLASMA_SEALED);
+  const auto& info = entry->info;
+  // The memory mapped file that holds the object.
+  object->handle.store_fd = entry->fd;
+  object->handle.mmap_size = entry->map_size;
+  // The data starts at the entry's offset and is directly followed by the
+  // metadata.
+  object->data_size = info.data_size;
+  object->data_offset = entry->offset;
+  object->metadata_size = info.metadata_size;
+  object->metadata_offset = entry->offset + info.data_size;
+}
+
+// Reply to a get request and dispose of it.
+//
+// Sends the get reply followed by the file descriptor of every object that is
+// present, unregisters the request from all objects it may still be waiting
+// on, removes its timer (if any) and finally deletes the request.
+//
+// @param get_req The request to answer. It is deleted by this call.
+void PlasmaStore::return_from_get(GetRequest* get_req) {
+  // Send the get reply to the client. data() is used rather than &ids[0] so
+  // that an empty request does not index into an empty vector.
+  Status s = SendGetReply(get_req->client->fd, get_req->object_ids.data(),
+      get_req->objects, get_req->object_ids.size());
+  warn_if_sigpipe(s.ok() ? 0 : -1, get_req->client->fd);
+  // If we successfully sent the get reply message to the client, then also send
+  // the file descriptors.
+  if (s.ok()) {
+    // Send all of the file descriptors for the present objects.
+    for (const auto& object_id : get_req->object_ids) {
+      PlasmaObject& object = get_req->objects[object_id];
+      // We use the data size to indicate whether the object is present or not.
+      if (object.data_size != -1) {
+        int error_code = send_fd(get_req->client->fd, object.handle.store_fd);
+        // If we failed to send the file descriptor, loop until we have sent it
+        // successfully. TODO(rkn): This is problematic for two reasons. First
+        // of all, sending the file descriptor should just succeed without any
+        // errors, but sometimes I see a "Message too long" error number.
+        // Second, looping like this allows a client to potentially block the
+        // plasma store event loop which should never happen.
+        while (error_code < 0) {
+          if (errno == EMSGSIZE) {
+            ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying.";
+            error_code = send_fd(get_req->client->fd, object.handle.store_fd);
+            continue;
+          }
+          warn_if_sigpipe(error_code, get_req->client->fd);
+          break;
+        }
+      }
+    }
+  }
+
+  // Remove the get request from each of the relevant object_get_requests hash
+  // tables if it is present there. It should only be present there if the get
+  // request timed out. The table is searched with find() so that no empty
+  // entry is created for objects nobody was waiting for.
+  for (const ObjectID& object_id : get_req->object_ids) {
+    auto waiting = object_get_requests_.find(object_id);
+    if (waiting == object_get_requests_.end()) { continue; }
+    auto& get_requests = waiting->second;
+    // Erase get_req from the vector. The (possibly empty) vector itself stays
+    // in the table because update_object_get_requests holds a reference to it
+    // while it calls this function.
+    auto it = std::find(get_requests.begin(), get_requests.end(), get_req);
+    if (it != get_requests.end()) { get_requests.erase(it); }
+  }
+  // Remove the get request.
+  if (get_req->timer != -1) { ARROW_CHECK(loop_->remove_timer(get_req->timer) == AE_OK); }
+  delete get_req;
+}
+
+// Update all get requests that are waiting for the given object, which must
+// be present in the object table. Every waiting request gets the object
+// information filled in and its client registered as a user of the object;
+// requests that are complete are answered right away.
+//
+// @param object_id The ID of the object that became available.
+void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
+  std::vector<GetRequest*>& get_requests = object_get_requests_[object_id];
+  size_t index = 0;
+  // The bound is re-read on every iteration: return_from_get removes the
+  // request from this vector, and it can remove more than one element when a
+  // request lists the same object ID several times. Iterating a count fixed
+  // up front would then read past the end of the vector.
+  while (index < get_requests.size()) {
+    GetRequest* get_req = get_requests[index];
+    auto entry = get_object_table_entry(&store_info_, object_id);
+    ARROW_CHECK(entry != NULL);
+
+    PlasmaObject_init(&get_req->objects[object_id], entry);
+    get_req->num_satisfied += 1;
+    // Record the fact that this client will be using this object and will
+    // be responsible for releasing this object.
+    add_client_to_object_clients(entry, get_req->client);
+
+    // If this get request is done, reply to the client.
+    if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
+      return_from_get(get_req);
+    } else {
+      // The call to return_from_get will remove the current element in the
+      // array, so we only increment the counter in the else branch.
+      index += 1;
+    }
+  }
+
+  // Remove the array of get requests for this object, since no one should be
+  // waiting for this object anymore.
+  object_get_requests_.erase(object_id);
+}
+
+// Handle a get request from a client.
+//
+// Objects that are already sealed are recorded in the request right away; for
+// all others the request is registered as waiting. The reply is sent
+// immediately if all objects are present or the timeout is 0, otherwise it is
+// sent once the remaining objects have been sealed or the timer fires.
+//
+// @param client The client that issued the request.
+// @param object_ids The IDs of the requested objects.
+// @param timeout_ms Time to wait in milliseconds; 0 returns immediately and
+//        -1 waits without a timer.
+void PlasmaStore::process_get_request(
+    Client* client, const std::vector<ObjectID>& object_ids, int64_t timeout_ms) {
+  // Create a get request for this object.
+  GetRequest* get_req = new GetRequest(client, object_ids);
+
+  // num_objects_to_wait_for counts distinct IDs, so each ID may only be
+  // accounted for once here. Counting repeated IDs again would let
+  // num_satisfied skip past the target (the request would never return) or
+  // reach it before all objects are available.
+  std::unordered_set<ObjectID, UniqueIDHasher> seen_ids;
+  for (const auto& object_id : object_ids) {
+    if (!seen_ids.insert(object_id).second) { continue; }
+    // Check if this object is already present locally. If so, record that the
+    // object is being used and mark it as accounted for.
+    auto entry = get_object_table_entry(&store_info_, object_id);
+    if (entry && entry->state == PLASMA_SEALED) {
+      // Update the get request to take into account the present object.
+      PlasmaObject_init(&get_req->objects[object_id], entry);
+      get_req->num_satisfied += 1;
+      // If necessary, record that this client is using this object. In the case
+      // where entry == NULL, this will be called from seal_object.
+      add_client_to_object_clients(entry, client);
+    } else {
+      // Add a placeholder plasma object to the get request to indicate that the
+      // object is not present. This will be parsed by the client. We set the
+      // data size to -1 to indicate that the object is not present.
+      get_req->objects[object_id].data_size = -1;
+      // Add the get request to the relevant data structures.
+      object_get_requests_[object_id].push_back(get_req);
+    }
+  }
+
+  // If all of the objects are present already or if the timeout is 0, return to
+  // the client.
+  if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms == 0) {
+    return_from_get(get_req);
+  } else if (timeout_ms != -1) {
+    // Set a timer that will cause the get request to return to the client. Note
+    // that a timeout of -1 is used to indicate that no timer should be set.
+    get_req->timer = loop_->add_timer(timeout_ms, [this, get_req](int64_t timer_id) {
+      return_from_get(get_req);
+      return kEventLoopTimerDone;
+    });
+  }
+}
+
+// Stop tracking the client as a user of the object.
+//
+// @return 1 if the client was using the object and has been removed, 0 if the
+//         client was not using the object.
+int PlasmaStore::remove_client_from_object_clients(
+    ObjectTableEntry* entry, Client* client) {
+  auto& users = entry->clients;
+  auto position = users.find(client);
+  // Return 0 to indicate that the client was not removed.
+  if (position == users.end()) { return 0; }
+
+  users.erase(position);
+  // Once the last user is gone the access is over, which the eviction policy
+  // needs to hear about.
+  if (users.empty()) {
+    std::vector<ObjectID> evicted_ids;
+    eviction_policy_.end_object_access(entry->object_id, &evicted_ids);
+    delete_objects(evicted_ids);
+  }
+  // Return 1 to indicate that the client was removed.
+  return 1;
+}
+
+// Record that the client is done with the object. The object has to be in
+// the object table and the client has to be one of its current users.
+void PlasmaStore::release_object(const ObjectID& object_id, Client* client) {
+  auto entry = get_object_table_entry(&store_info_, object_id);
+  ARROW_CHECK(entry != NULL);
+  // Drop the client from the object's set of users; it must have been in it.
+  const int num_removed = remove_client_from_object_clients(entry, client);
+  ARROW_CHECK(num_removed == 1);
+}
+
+// Check if an object is present. Only sealed objects count as present.
+int PlasmaStore::contains_object(const ObjectID& object_id) {
+  auto entry = get_object_table_entry(&store_info_, object_id);
+  if (entry && entry->state == PLASMA_SEALED) { return OBJECT_FOUND; }
+  return OBJECT_NOT_FOUND;
+}
+
+// Seal an object that has been created in the hash table. The object must
+// exist and must not have been sealed before. Subscribers are notified and
+// get requests that wait for the object are updated.
+void PlasmaStore::seal_object(const ObjectID& object_id, unsigned char digest[]) {
+  ARROW_LOG(DEBUG) << "sealing object " << object_id.hex();
+  auto entry = get_object_table_entry(&store_info_, object_id);
+  ARROW_CHECK(entry != NULL);
+  ARROW_CHECK(entry->state == PLASMA_CREATED);
+  // Remember the digest supplied by the client and mark the object as sealed.
+  entry->info.digest.assign(reinterpret_cast<const char*>(digest), kDigestSize);
+  entry->state = PLASMA_SEALED;
+  // Tell every subscriber about the newly sealed object.
+  push_notification(&entry->info);
+
+  // Serve the get requests that were waiting for this object.
+  update_object_get_requests(object_id);
+}
+
+// Delete the given objects from the store and notify all subscribers about
+// each deletion. Every object must be in the object table, sealed and not in
+// use by any client.
+void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
+  for (size_t i = 0; i < object_ids.size(); ++i) {
+    const ObjectID& doomed_id = object_ids[i];
+    ARROW_LOG(DEBUG) << "deleting object " << doomed_id.hex();
+    auto entry = get_object_table_entry(&store_info_, doomed_id);
+    // TODO(rkn): This should probably not fail, but should instead throw an
+    // error. Maybe we should also support deleting objects that have been
+    // created but not sealed.
+    ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table.";
+    ARROW_CHECK(entry->state == PLASMA_SEALED)
+        << "To delete an object it must have been sealed.";
+    ARROW_CHECK(entry->clients.size() == 0)
+        << "To delete an object, there must be no clients currently using it.";
+    // Give the memory back to the allocator before dropping the table entry.
+    dlfree(entry->pointer);
+    store_info_.objects.erase(doomed_id);
+    // Let all subscribers know that the object is gone.
+    ObjectInfoT deletion;
+    deletion.object_id = doomed_id.binary();
+    deletion.is_deletion = true;
+    push_notification(&deletion);
+  }
+}
+
+// Accept a pending connection on the listening socket and start processing
+// the messages of the new client.
+//
+// @param listener_sock The listening socket that has a connection pending.
+void PlasmaStore::connect_client(int listener_sock) {
+  int client_fd = AcceptClient(listener_sock);
+  // Do not register a client for a connection that could not be accepted; a
+  // negative descriptor must never reach the event loop.
+  if (client_fd < 0) {
+    ARROW_LOG(WARNING) << "Failed to accept a client connection on fd "
+                       << listener_sock;
+    return;
+  }
+  // This is freed in disconnect_client.
+  Client* client = new Client(client_fd);
+  // Add a callback to handle events on this socket.
+  // TODO(pcm): Check return value.
+  loop_->add_file_event(
+      client_fd, kEventLoopRead, [this, client](int events) { process_message(client); });
+  ARROW_LOG(DEBUG) << "New connection with fd " << client_fd;
+}
+
+// Disconnect a client: stop listening on its socket, close the socket,
+// release every object the client was using and free the client.
+//
+// @param client The client to disconnect. It is deleted by this call.
+void PlasmaStore::disconnect_client(Client* client) {
+  ARROW_CHECK(client != NULL);
+  ARROW_CHECK(client->fd > 0);
+  loop_->remove_file_event(client->fd);
+  // Close the socket.
+  close(client->fd);
+  ARROW_LOG(INFO) << "Disconnecting client on fd " << client->fd;
+  // If this client was using any objects, remove it from the appropriate
+  // lists. The affected entries are collected first: removing a client can
+  // make the eviction policy delete objects, which erases from the object
+  // table and must not happen while the table is being iterated. Entries that
+  // are still waiting in the list cannot be deleted in the meantime because
+  // delete_objects requires an object to have no clients.
+  std::vector<ObjectTableEntry*> entries_in_use;
+  for (const auto& entry : store_info_.objects) {
+    if (entry.second->clients.count(client) != 0) {
+      entries_in_use.push_back(entry.second.get());
+    }
+  }
+  for (ObjectTableEntry* entry : entries_in_use) {
+    remove_client_from_object_clients(entry, client);
+  }
+  // Note, the store may still attempt to send a message to the disconnected
+  // client (for example, when an object ID that the client was waiting for
+  // is ready). In these cases, the attempt to send the message will fail, but
+  // the store should just ignore the failure.
+  // NOTE(review): pending GetRequests keep a raw pointer to this client, so
+  // such an attempt would read the freed Client -- confirm and clean up the
+  // client's outstanding get requests here.
+  delete client;
+}
+
+/// Send as many queued notifications as possible to one subscriber. This is
+/// called whenever a notification is pushed. If the socket's send buffer is
+/// full, the remaining notifications stay buffered, and this will be called
+/// again when the send buffer has room. If the subscriber has closed its end
+/// of the socket, the subscription is dropped.
+///
+/// @param client_fd The file descriptor the subscriber receives its
+///        notifications on.
+/// @return Void.
+void PlasmaStore::send_notifications(int client_fd) {
+  auto it = pending_notifications_.find(client_fd);
+  // Nothing to do for a file descriptor that is not (or no longer) subscribed.
+  if (it == pending_notifications_.end()) { return; }
+  auto& notifications = it->second.object_notifications;
+
+  int num_processed = 0;
+  bool closed = false;
+  // Loop over the array of pending notifications and send as many of them as
+  // possible.
+  for (size_t i = 0; i < notifications.size(); ++i) {
+    uint8_t* notification = notifications.at(i);
+    // Decode the length, which is the first bytes of the message.
+    int64_t size = *(reinterpret_cast<int64_t*>(notification));
+
+    // Attempt to send a notification about this object ID.
+    ssize_t nbytes = send(client_fd, notification, sizeof(int64_t) + size, 0);
+    // Save errno right away, logging below may overwrite it.
+    const int send_errno = errno;
+    if (nbytes >= 0) {
+      ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
+    } else if (send_errno == EAGAIN || send_errno == EWOULDBLOCK ||
+               send_errno == EINTR) {
+      ARROW_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this "
+                          "notification and will send it later.";
+      // Add a callback to the event loop to send queued notifications whenever
+      // there is room in the socket's send buffer. Callbacks can be added
+      // more than once here and will be overwritten. The callback is removed
+      // at the end of the method.
+      // TODO(pcm): Introduce status codes and check in case the file descriptor
+      // is added twice.
+      loop_->add_file_event(client_fd, kEventLoopWrite,
+          [this, client_fd](int events) { send_notifications(client_fd); });
+      break;
+    } else {
+      ARROW_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd;
+      if (send_errno == EPIPE) {
+        closed = true;
+        break;
+      }
+    }
+    num_processed += 1;
+    // The corresponding malloc happened in create_object_info_buffer
+    // within push_notification.
+    delete[] notification;
+  }
+  // Remove the sent notifications from the array.
+  notifications.erase(notifications.begin(), notifications.begin() + num_processed);
+
+  // Stop sending notifications if the pipe was broken.
+  if (closed) {
+    // Make sure no write callback is left behind for the closed descriptor.
+    loop_->remove_file_event(client_fd);
+    close(client_fd);
+    // Free what could not be delivered, then drop the subscriber. The queue
+    // must not be touched after the erase, it is destroyed with the entry.
+    for (uint8_t* unsent : notifications) { delete[] unsent; }
+    pending_notifications_.erase(it);
+    return;
+  }
+
+  // If we have sent all notifications, remove the fd from the event loop.
+  if (notifications.empty()) { loop_->remove_file_event(client_fd); }
+}
+
+// Queue a notification about the given object for every subscriber and try
+// to deliver it right away.
+//
+// @param object_info The object information to serialize into the
+//        notification.
+void PlasmaStore::push_notification(ObjectInfoT* object_info) {
+  // Work on a snapshot of the subscribers: send_notifications drops a
+  // subscriber whose socket is broken, which would invalidate an iterator
+  // into pending_notifications_.
+  std::vector<int> subscriber_fds;
+  subscriber_fds.reserve(pending_notifications_.size());
+  for (const auto& element : pending_notifications_) {
+    subscriber_fds.push_back(element.first);
+  }
+  for (int subscriber_fd : subscriber_fds) {
+    auto it = pending_notifications_.find(subscriber_fd);
+    if (it == pending_notifications_.end()) { continue; }
+    uint8_t* notification = create_object_info_buffer(object_info);
+    it->second.object_notifications.push_back(notification);
+    send_notifications(subscriber_fd);
+    // The notification gets freed in send_notifications when the notification
+    // is sent over the socket.
+  }
+}
+
+// Subscribe to notifications about sealed objects.
+//
+// Receives the file descriptor on which the client wants to be notified,
+// creates a notification queue for it and brings the new subscriber up to
+// date about the objects that are already in the store.
+//
+// @param client The client that asked to subscribe.
+void PlasmaStore::subscribe_to_updates(Client* client) {
+  ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
+  // TODO(rkn): The store could block here if the client doesn't send a file
+  // descriptor.
+  int fd = recv_fd(client->fd);
+  if (fd < 0) {
+    // This may mean that the client died before sending the file descriptor.
+    ARROW_LOG(WARNING) << "Failed to receive file descriptor from client on fd "
+                       << client->fd << ".";
+    return;
+  }
+
+  // Create a new array to buffer notifications that can't be sent to the
+  // subscriber yet because the socket send buffer is full. TODO(rkn): the queue
+  // never gets freed.
+  auto& notifications = pending_notifications_[fd].object_notifications;
+
+  // Queue notifications about existing objects for the new subscriber only.
+  // Going through push_notification here would deliver them to every other
+  // subscriber a second time.
+  // NOTE(review): this includes objects that are created but not yet sealed,
+  // as before -- confirm whether those should be skipped.
+  for (const auto& entry : store_info_.objects) {
+    notifications.push_back(create_object_info_buffer(&entry.second->info));
+  }
+  send_notifications(fd);
+}
+
+// Read one message from the client's socket and dispatch it according to its
+// type.
+//
+// @param client The client whose socket has data available.
+// @return Status::OK() once the message has been handled; a failure to decode
+//         a request body is propagated through RETURN_NOT_OK.
+Status PlasmaStore::process_message(Client* client) {
+  int64_t type;
+  Status s = ReadMessage(client->fd, &type, &input_buffer_);
+  // Only I/O errors are tolerated when reading the message.
+  ARROW_CHECK(s.ok() || s.IsIOError());
+
+  uint8_t* input = input_buffer_.data();
+  ObjectID object_id;
+  PlasmaObject object;
+  // TODO(pcm): Get rid of the following.
+  memset(&object, 0, sizeof(object));
+
+  // Process the different types of requests.
+  switch (type) {
+    case MessageType_PlasmaCreateRequest: {
+      int64_t data_size;
+      int64_t metadata_size;
+      RETURN_NOT_OK(ReadCreateRequest(input, &object_id, &data_size, &metadata_size));
+      int error_code =
+          create_object(object_id, data_size, metadata_size, client, &object);
+      HANDLE_SIGPIPE(
+          SendCreateReply(client->fd, object_id, &object, error_code), client->fd);
+      // The file descriptor of the mapping is only sent if the object was
+      // created.
+      if (error_code == PlasmaError_OK) {
+        warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd);
+      }
+    } break;
+    case MessageType_PlasmaGetRequest: {
+      std::vector<ObjectID> object_ids_to_get;
+      int64_t timeout_ms;
+      RETURN_NOT_OK(ReadGetRequest(input, object_ids_to_get, &timeout_ms));
+      process_get_request(client, object_ids_to_get, timeout_ms);
+    } break;
+    case MessageType_PlasmaReleaseRequest:
+      RETURN_NOT_OK(ReadReleaseRequest(input, &object_id));
+      release_object(object_id, client);
+      break;
+    case MessageType_PlasmaContainsRequest:
+      RETURN_NOT_OK(ReadContainsRequest(input, &object_id));
+      // Reply with 1 if the object is present and with 0 otherwise.
+      if (contains_object(object_id) == OBJECT_FOUND) {
+        HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd);
+      } else {
+        HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
+      }
+      break;
+    case MessageType_PlasmaSealRequest: {
+      unsigned char digest[kDigestSize];
+      RETURN_NOT_OK(ReadSealRequest(input, &object_id, &digest[0]));
+      seal_object(object_id, &digest[0]);
+    } break;
+    case MessageType_PlasmaEvictRequest: {
+      // This code path should only be used for testing.
+      int64_t num_bytes;
+      RETURN_NOT_OK(ReadEvictRequest(input, &num_bytes));
+      std::vector<ObjectID> objects_to_evict;
+      int64_t num_bytes_evicted =
+          eviction_policy_.choose_objects_to_evict(num_bytes, &objects_to_evict);
+      delete_objects(objects_to_evict);
+      HANDLE_SIGPIPE(SendEvictReply(client->fd, num_bytes_evicted), client->fd);
+    } break;
+    case MessageType_PlasmaSubscribeRequest:
+      subscribe_to_updates(client);
+      break;
+    case MessageType_PlasmaConnectRequest: {
+      HANDLE_SIGPIPE(
+          SendConnectReply(client->fd, store_info_.memory_capacity), client->fd);
+    } break;
+    case DISCONNECT_CLIENT:
+      ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
+      // This deletes the client, so it must not be used afterwards.
+      disconnect_client(client);
+      break;
+    default:
+      // This code should be unreachable.
+      ARROW_CHECK(0);
+  }
+  return Status::OK();
+}
+
+// Exit with status 0 on SIGTERM so that valgrind reports "success". Any
+// other signal is ignored.
+void signal_handler(int signal) {
+  if (signal != SIGTERM) { return; }
+  exit(0);
+}
+
+// Run the store: create the event loop and the store, listen on the given
+// unix domain socket and serve clients until the event loop returns.
+void start_server(char* socket_name, int64_t system_memory) {
+  // A write to a client that has already died must not kill the store, so
+  // SIGPIPE is ignored.
+  signal(SIGPIPE, SIG_IGN);
+  // The event loop has to exist before the store that uses it.
+  EventLoop loop;
+  PlasmaStore store(&loop, system_memory);
+  const int listener_fd = bind_ipc_sock(socket_name, true);
+  ARROW_CHECK(listener_fd >= 0);
+  // Every incoming connection is handed to the store.
+  // TODO(pcm): Check return value.
+  auto on_connection = [&store, listener_fd](int events) {
+    store.connect_client(listener_fd);
+  };
+  loop.add_file_event(listener_fd, kEventLoopRead, on_connection);
+  loop.run();
+}
+
+// Entry point of the plasma store executable.
+//
+// Options:
+//   -s <socket_name>  Unix domain socket for incoming connections (required).
+//   -m <bytes>        Amount of memory the store may use (required).
+int main(int argc, char* argv[]) {
+  signal(SIGTERM, signal_handler);
+  char* socket_name = NULL;
+  int64_t system_memory = -1;
+  int c;
+  while ((c = getopt(argc, argv, "s:m:")) != -1) {
+    switch (c) {
+      case 's':
+        socket_name = optarg;
+        break;
+      case 'm': {
+        // The trailing %c only converts if something follows the number, so
+        // exactly one converted item means the argument was a clean integer.
+        char extra;
+        int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
+        ARROW_CHECK(scanned == 1);
+        ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
+                        << static_cast<double>(system_memory) / 1000000000
+                        << "GB of memory.";
+        break;
+      }
+      default:
+        exit(-1);
+    }
+  }
+  if (!socket_name) {
+    ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch";
+  }
+  if (system_memory == -1) {
+    ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch";
+  }
+#ifdef __linux__
+  // On Linux, check that the amount of memory available in /dev/shm is large
+  // enough to accommodate the request. If it isn't, then fail. The check is
+  // skipped with a warning if /dev/shm cannot be inspected, instead of
+  // comparing against uninitialized statistics.
+  int shm_fd = open("/dev/shm", O_RDONLY);
+  if (shm_fd < 0) {
+    ARROW_LOG(WARNING) << "Could not open /dev/shm, skipping the check of the "
+                          "available shared memory.";
+  } else {
+    struct statvfs shm_vfs_stats;
+    int stat_result = fstatvfs(shm_fd, &shm_vfs_stats);
+    close(shm_fd);
+    if (stat_result != 0) {
+      ARROW_LOG(WARNING) << "Could not query /dev/shm, skipping the check of the "
+                            "available shared memory.";
+    } else {
+      // The value shm_vfs_stats.f_bsize is the block size, and the value
+      // shm_vfs_stats.f_bavail is the number of available blocks. Both are
+      // widened first so that the product is computed in 64 bits.
+      int64_t shm_mem_avail = static_cast<int64_t>(shm_vfs_stats.f_bsize) *
+                              static_cast<int64_t>(shm_vfs_stats.f_bavail);
+      if (system_memory > shm_mem_avail) {
+        ARROW_LOG(FATAL)
+            << "System memory request exceeds memory available in /dev/shm. The "
+               "request is for "
+            << system_memory << " bytes, and the amount available is " << shm_mem_avail
+            << " bytes. You may be able to free up space by deleting files in "
+               "/dev/shm. If you are inside a Docker container, you may need to "
+               "pass "
+               "an argument with the flag '--shm-size' to 'docker run'.";
+      }
+    }
+  }
+#endif
+  // Make it so dlmalloc fails if we try to request more memory than is
+  // available.
+  dlmalloc_set_footprint_limit(static_cast<size_t>(system_memory));
+  ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
+  start_server(socket_name, system_memory);
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/store.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
new file mode 100644
index 0000000..8bd9426
--- /dev/null
+++ b/cpp/src/plasma/store.h
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_STORE_H
+#define PLASMA_STORE_H
+
+#include <deque>
+#include <vector>
+
+#include "plasma/common.h"
+#include "plasma/events.h"
+#include "plasma/eviction_policy.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+
+struct GetRequest;
+
+struct NotificationQueue {
+  /// The object notifications for clients. We notify the client about the
+  /// objects in the order that the objects were sealed or deleted.
+  std::deque<uint8_t*> object_notifications;
+};
+
+/// Contains all information that is associated with a Plasma store client.
+struct Client {
+  explicit Client(int fd);
+
+  /// The file descriptor used to communicate with the client.
+  int fd;
+};
+
+class PlasmaStore {
+ public:
+  PlasmaStore(EventLoop* loop, int64_t system_memory);
+
+  ~PlasmaStore();
+
+  /// Create a new object. The client must do a call to release_object to tell
+  /// the store when it is done with the object.
+  ///
+  /// @param object_id Object ID of the object to be created.
+  /// @param data_size Size in bytes of the object to be created.
+  /// @param metadata_size Size in bytes of the object metadata.
+  /// @return One of the following error codes:
+  ///  - PlasmaError_OK, if the object was created successfully.
+  ///  - PlasmaError_ObjectExists, if an object with this ID is already
+  ///    present in the store. In this case, the client should not call
+  ///    plasma_release.
+  ///  - PlasmaError_OutOfMemory, if the store is out of memory and
+  ///    cannot create the object. In this case, the client should not call
+  ///    plasma_release.
+  int create_object(const ObjectID& object_id, int64_t data_size, int64_t metadata_size,
+      Client* client, PlasmaObject* result);
+
+  /// Delete objects that have been created in the hash table. This should only
+  /// be called on objects that are returned by the eviction policy to evict.
+  ///
+  /// @param object_ids Object IDs of the objects to be deleted.
+  /// @return Void.
+  void delete_objects(const std::vector<ObjectID>& object_ids);
+
+  /// Process a get request from a client. This method assumes that we will
+  /// eventually have these objects sealed. If one of the objects has not yet
+  /// been sealed, the client that requested the object will be notified when it
+  /// is sealed.
+  ///
+  /// For each object, the client must do a call to release_object to tell the
+  /// store when it is done with the object.
+  ///
+  /// @param client The client making this request.
+  /// @param object_ids Object IDs of the objects to be gotten.
+  /// @param timeout_ms The timeout for the get request in milliseconds.
+  /// @return Void.
+  void process_get_request(
+      Client* client, const std::vector<ObjectID>& object_ids, int64_t timeout_ms);
+
+  /// Seal an object. The object is now immutable and can be accessed with get.
+  ///
+  /// @param object_id Object ID of the object to be sealed.
+  /// @param digest The digest of the object. This is used to tell if two
+  /// objects
+  ///        with the same object ID are the same.
+  /// @return Void.
+  void seal_object(const ObjectID& object_id, unsigned char digest[]);
+
+  /// Check if the plasma store contains an object.
+  ///
+  /// @param object_id Object ID that will be checked.
+  /// @return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if
+  /// not
+  int contains_object(const ObjectID& object_id);
+
+  /// Record the fact that a particular client is no longer using an object.
+  ///
+  /// @param object_id The object ID of the object that is being released.
+  /// @param client The client making this request.
+  /// @return Void.
+  void release_object(const ObjectID& object_id, Client* client);
+
+  /// Subscribe a file descriptor to updates about new sealed objects.
+  ///
+  /// @param client The client making this request.
+  /// @return Void.
+  void subscribe_to_updates(Client* client);
+
+  /// Connect a new client to the PlasmaStore.
+  ///
+  /// @param listener_sock The socket that is listening to incoming connections.
+  /// @return Void.
+  void connect_client(int listener_sock);
+
+  /// Disconnect a client from the PlasmaStore.
+  ///
+  /// @param client The client that is disconnected.
+  /// @return Void.
+  void disconnect_client(Client* client);
+
+  void send_notifications(int client_fd);
+
+  Status process_message(Client* client);
+
+ private:
+  void push_notification(ObjectInfoT* object_notification);
+
+  void add_client_to_object_clients(ObjectTableEntry* entry, Client* client);
+
+  void return_from_get(GetRequest* get_req);
+
+  void update_object_get_requests(const ObjectID& object_id);
+
+  int remove_client_from_object_clients(ObjectTableEntry* entry, Client* client);
+
+  /// Event loop of the plasma store.
+  EventLoop* loop_;
+  /// The plasma store information, including the object tables, that is exposed
+  /// to the eviction policy.
+  PlasmaStoreInfo store_info_;
+  /// The state that is managed by the eviction policy.
+  EvictionPolicy eviction_policy_;
+  /// Input buffer. This is allocated only once to avoid mallocs for every
+  /// call to process_message.
+  std::vector<uint8_t> input_buffer_;
+  /// A hash table mapping object IDs to a vector of the get requests that are
+  /// waiting for the object to arrive.
+  std::unordered_map<ObjectID, std::vector<GetRequest*>, UniqueIDHasher>
+      object_get_requests_;
+  /// The pending notifications that have not been sent to subscribers because
+  /// the socket send buffers were full. This is a hash table from client file
+  /// descriptor to the queue of object notifications to send to that client.
+  /// TODO(pcm): Consider putting this into the Client data structure and
+  /// reorganize the code slightly.
+  std::unordered_map<int, NotificationQueue> pending_notifications_;
+};
+
+#endif  // PLASMA_STORE_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/test/client_tests.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
new file mode 100644
index 0000000..dc45773
--- /dev/null
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include <assert.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "plasma/client.h"
+#include "plasma/common.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+
+std::string g_test_executable;
+
+class TestPlasmaStore : public ::testing::Test {
+ public:
+  // TODO(pcm): At the moment, stdout of the test gets mixed up with
+  // stdout of the object store. Consider changing that.
+  void SetUp() {
+    std::string plasma_directory =
+        g_test_executable.substr(0, g_test_executable.find_last_of("/"));
+    std::string plasma_command =
+        plasma_directory +
+        "/plasma_store -m 1000000000 -s /tmp/store 1> /dev/null 2> /dev/null &";
+    system(plasma_command.c_str());
+    ARROW_CHECK_OK(client_.Connect("/tmp/store", "", PLASMA_DEFAULT_RELEASE_DELAY));
+  }
+  virtual void TearDown() {  // gtest's per-test cleanup hook; "Finish" was never run.
+    ARROW_CHECK_OK(client_.Disconnect());
+    system("killall plasma_store");  // synchronous, so it cannot kill the next store
+  }
+
+ protected:
+  PlasmaClient client_;
+};
+
+TEST_F(TestPlasmaStore, ContainsTest) {
+  ObjectID object_id = ObjectID::from_random();
+
+  // Test for object non-existence.
+  bool has_object;
+  ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+  ASSERT_EQ(has_object, false);
+
+  // Test for the object being in local Plasma store.
+  // First create object.
+  int64_t data_size = 100;
+  uint8_t metadata[] = {5};
+  int64_t metadata_size = sizeof(metadata);
+  uint8_t* data;
+  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+  ARROW_CHECK_OK(client_.Seal(object_id));
+  // Avoid race condition of Plasma Manager waiting for notification.
+  ObjectBuffer object_buffer;
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  ARROW_CHECK_OK(client_.Contains(object_id, &has_object));
+  ASSERT_EQ(has_object, true);
+}
+
+TEST_F(TestPlasmaStore, GetTest) {
+  ObjectID object_id = ObjectID::from_random();
+  ObjectBuffer object_buffer;
+
+  // Test for object non-existence.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+  ASSERT_EQ(object_buffer.data_size, -1);
+
+  // Test for the object being in local Plasma store.
+  // First create object.
+  int64_t data_size = 4;
+  uint8_t metadata[] = {5};
+  int64_t metadata_size = sizeof(metadata);
+  uint8_t* data;
+  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+  for (int64_t i = 0; i < data_size; i++) {
+    data[i] = static_cast<uint8_t>(i % 4);
+  }
+  ARROW_CHECK_OK(client_.Seal(object_id));
+
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  for (int64_t i = 0; i < data_size; i++) {
+    ASSERT_EQ(data[i], object_buffer.data[i]);
+  }
+}
+
+TEST_F(TestPlasmaStore, MultipleGetTest) {
+  ObjectID object_id1 = ObjectID::from_random();
+  ObjectID object_id2 = ObjectID::from_random();
+  ObjectID object_ids[2] = {object_id1, object_id2};
+  ObjectBuffer object_buffer[2];
+
+  int64_t data_size = 4;
+  uint8_t metadata[] = {5};
+  int64_t metadata_size = sizeof(metadata);
+  uint8_t* data;
+  ARROW_CHECK_OK(client_.Create(object_id1, data_size, metadata, metadata_size, &data));
+  data[0] = 1;
+  ARROW_CHECK_OK(client_.Seal(object_id1));
+
+  ARROW_CHECK_OK(client_.Create(object_id2, data_size, metadata, metadata_size, &data));
+  data[0] = 2;
+  ARROW_CHECK_OK(client_.Seal(object_id2));
+
+  ARROW_CHECK_OK(client_.Get(object_ids, 2, -1, object_buffer));
+  ASSERT_EQ(object_buffer[0].data[0], 1);
+  ASSERT_EQ(object_buffer[1].data[0], 2);
+}
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  g_test_executable = std::string(argv[0]);
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/test/run_tests.sh
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/run_tests.sh b/cpp/src/plasma/test/run_tests.sh
new file mode 100644
index 0000000..958bd08
--- /dev/null
+++ b/cpp/src/plasma/test/run_tests.sh
@@ -0,0 +1,61 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cause the script to exit if a single command fails.
+set -e
+
+./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 0 &
+sleep 1
+./src/plasma/manager_tests
+killall plasma_store
+./src/plasma/serialization_tests
+
+# Start the Redis shards.
+./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 &
+redis_pid1=$!
+./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 &
+redis_pid2=$!
+sleep 1
+
+# Flush the redis server
+./src/common/thirdparty/redis/src/redis-cli flushall
+# Register the shard location with the primary shard.
+./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1
+./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380
+sleep 1
+./src/plasma/plasma_store -s /tmp/store1 -m 1000000000 &
+plasma1_pid=$!
+./src/plasma/plasma_manager -m /tmp/manager1 -s /tmp/store1 -h 127.0.0.1 -p 11111 -r 127.0.0.1:6379 &
+plasma2_pid=$!
+./src/plasma/plasma_store -s /tmp/store2 -m 1000000000 &
+plasma3_pid=$!
+./src/plasma/plasma_manager -m /tmp/manager2 -s /tmp/store2 -h 127.0.0.1 -p 22222 -r 127.0.0.1:6379 &
+plasma4_pid=$!
+sleep 1
+
+./src/plasma/client_tests
+
+kill $plasma4_pid
+kill $plasma3_pid
+kill $plasma2_pid
+kill $plasma1_pid
+kill $redis_pid1
+wait $redis_pid1
+kill $redis_pid2
+wait $redis_pid2

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/test/run_valgrind.sh
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/run_valgrind.sh b/cpp/src/plasma/test/run_valgrind.sh
new file mode 100644
index 0000000..0472194
--- /dev/null
+++ b/cpp/src/plasma/test/run_valgrind.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cause the script to exit if a single command fails.
+set -e
+
+./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 0 &
+sleep 1
+valgrind --leak-check=full --error-exitcode=1 ./src/plasma/manager_tests
+killall plasma_store
+valgrind --leak-check=full --error-exitcode=1 ./src/plasma/serialization_tests

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/test/serialization_tests.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
new file mode 100644
index 0000000..325cead
--- /dev/null
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "plasma/common.h"
+#include "plasma/io.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+
+/**
+ * Create a temporary file. Needs to be closed by the caller.
+ *
+ * @return File descriptor of the file.
+ */
+int create_temp_file(void) {
+  static char temp[] = "/tmp/tempfileXXXXXX";
+  char file_name[32];
+  strncpy(file_name, temp, 32);
+  return mkstemp(file_name);
+}
+
+/**
+ * Seek to the beginning of a file and read a message from it.
+ *
+ * @param fd File descriptor of the file.
+ * @param message_type Message type that we expect in the file.
+ *
+ * @return The content of the message as a byte vector; the caller owns it
+ * and no manual cleanup is needed.
+ */
+std::vector<uint8_t> read_message_from_file(int fd, int message_type) {
+  /* Go to the beginning of the file. */
+  lseek(fd, 0, SEEK_SET);
+  int64_t type;
+  std::vector<uint8_t> data;
+  ARROW_CHECK_OK(ReadMessage(fd, &type, &data));
+  ARROW_CHECK(type == message_type);
+  return data;
+}
+
+PlasmaObject random_plasma_object(void) {
+  static unsigned int seed = static_cast<unsigned int>(time(NULL));  // seeded once
+  int random = rand_r(&seed) / 2;  // halved so the "+ N" offsets below cannot overflow
+  PlasmaObject object;
+  memset(&object, 0, sizeof(object));
+  object.handle.store_fd = random + 7;
+  object.handle.mmap_size = random + 42;
+  object.data_offset = random + 1;
+  object.metadata_offset = random + 2;
+  object.data_size = random + 3;
+  object.metadata_size = random + 4;
+  return object;
+}
+
+TEST(PlasmaSerialization, CreateRequest) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  int64_t data_size1 = 42;
+  int64_t metadata_size1 = 11;
+  ARROW_CHECK_OK(SendCreateRequest(fd, object_id1, data_size1, metadata_size1));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaCreateRequest);
+  ObjectID object_id2;
+  int64_t data_size2;
+  int64_t metadata_size2;
+  ARROW_CHECK_OK(
+      ReadCreateRequest(data.data(), &object_id2, &data_size2, &metadata_size2));
+  ASSERT_EQ(data_size1, data_size2);
+  ASSERT_EQ(metadata_size1, metadata_size2);
+  ASSERT_EQ(object_id1, object_id2);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, CreateReply) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  PlasmaObject object1 = random_plasma_object();
+  ARROW_CHECK_OK(SendCreateReply(fd, object_id1, &object1, 0));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaCreateReply);
+  ObjectID object_id2;
+  PlasmaObject object2;
+  memset(&object2, 0, sizeof(object2));
+  ARROW_CHECK_OK(ReadCreateReply(data.data(), &object_id2, &object2));
+  ASSERT_EQ(object_id1, object_id2);
+  ASSERT_EQ(memcmp(&object1, &object2, sizeof(object1)), 0);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, SealRequest) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  unsigned char digest1[kDigestSize];
+  memset(&digest1[0], 7, kDigestSize);
+  ARROW_CHECK_OK(SendSealRequest(fd, object_id1, &digest1[0]));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaSealRequest);
+  ObjectID object_id2;
+  unsigned char digest2[kDigestSize];
+  ARROW_CHECK_OK(ReadSealRequest(data.data(), &object_id2, &digest2[0]));
+  ASSERT_EQ(object_id1, object_id2);
+  ASSERT_EQ(memcmp(&digest1[0], &digest2[0], kDigestSize), 0);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, SealReply) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  ARROW_CHECK_OK(SendSealReply(fd, object_id1, PlasmaError_ObjectExists));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaSealReply);
+  ObjectID object_id2;
+  Status s = ReadSealReply(data.data(), &object_id2);
+  ASSERT_EQ(object_id1, object_id2);
+  ASSERT_TRUE(s.IsPlasmaObjectExists());
+  close(fd);
+}
+
+TEST(PlasmaSerialization, GetRequest) {
+  int fd = create_temp_file();
+  ObjectID object_ids[2];
+  object_ids[0] = ObjectID::from_random();
+  object_ids[1] = ObjectID::from_random();
+  int64_t timeout_ms = 1234;
+  ARROW_CHECK_OK(SendGetRequest(fd, object_ids, 2, timeout_ms));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaGetRequest);
+  std::vector<ObjectID> object_ids_return;
+  int64_t timeout_ms_return;
+  ARROW_CHECK_OK(ReadGetRequest(data.data(), object_ids_return, &timeout_ms_return));
+  ASSERT_EQ(object_ids[0], object_ids_return[0]);
+  ASSERT_EQ(object_ids[1], object_ids_return[1]);
+  ASSERT_EQ(timeout_ms, timeout_ms_return);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, GetReply) {
+  int fd = create_temp_file();
+  ObjectID object_ids[2];
+  object_ids[0] = ObjectID::from_random();
+  object_ids[1] = ObjectID::from_random();
+  std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> plasma_objects;
+  plasma_objects[object_ids[0]] = random_plasma_object();
+  plasma_objects[object_ids[1]] = random_plasma_object();
+  ARROW_CHECK_OK(SendGetReply(fd, object_ids, plasma_objects, 2));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaGetReply);
+  ObjectID object_ids_return[2];
+  PlasmaObject plasma_objects_return[2];
+  memset(&plasma_objects_return, 0, sizeof(plasma_objects_return));
+  ARROW_CHECK_OK(
+      ReadGetReply(data.data(), object_ids_return, &plasma_objects_return[0], 2));
+  ASSERT_EQ(object_ids[0], object_ids_return[0]);
+  ASSERT_EQ(object_ids[1], object_ids_return[1]);
+  ASSERT_EQ(memcmp(&plasma_objects[object_ids[0]], &plasma_objects_return[0],
+                sizeof(PlasmaObject)),
+      0);
+  ASSERT_EQ(memcmp(&plasma_objects[object_ids[1]], &plasma_objects_return[1],
+                sizeof(PlasmaObject)),
+      0);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, ReleaseRequest) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  ARROW_CHECK_OK(SendReleaseRequest(fd, object_id1));
+  std::vector<uint8_t> data =
+      read_message_from_file(fd, MessageType_PlasmaReleaseRequest);
+  ObjectID object_id2;
+  ARROW_CHECK_OK(ReadReleaseRequest(data.data(), &object_id2));
+  ASSERT_EQ(object_id1, object_id2);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, ReleaseReply) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  ARROW_CHECK_OK(SendReleaseReply(fd, object_id1, PlasmaError_ObjectExists));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaReleaseReply);
+  ObjectID object_id2;
+  Status s = ReadReleaseReply(data.data(), &object_id2);
+  ASSERT_EQ(object_id1, object_id2);
+  ASSERT_TRUE(s.IsPlasmaObjectExists());
+  close(fd);
+}
+
+TEST(PlasmaSerialization, DeleteRequest) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  ARROW_CHECK_OK(SendDeleteRequest(fd, object_id1));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaDeleteRequest);
+  ObjectID object_id2;
+  ARROW_CHECK_OK(ReadDeleteRequest(data.data(), &object_id2));
+  ASSERT_EQ(object_id1, object_id2);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, DeleteReply) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  int error1 = PlasmaError_ObjectExists;
+  ARROW_CHECK_OK(SendDeleteReply(fd, object_id1, error1));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaDeleteReply);
+  ObjectID object_id2;
+  Status s = ReadDeleteReply(data.data(), &object_id2);
+  ASSERT_EQ(object_id1, object_id2);
+  ASSERT_TRUE(s.IsPlasmaObjectExists());
+  close(fd);
+}
+
+TEST(PlasmaSerialization, StatusRequest) {
+  int fd = create_temp_file();
+  const int64_t num_objects = 2;  // constant, so the arrays below are not VLAs
+  ObjectID object_ids[num_objects];
+  object_ids[0] = ObjectID::from_random();
+  object_ids[1] = ObjectID::from_random();
+  ARROW_CHECK_OK(SendStatusRequest(fd, object_ids, num_objects));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaStatusRequest);
+  ObjectID object_ids_read[num_objects];
+  ARROW_CHECK_OK(ReadStatusRequest(data.data(), object_ids_read, num_objects));
+  ASSERT_EQ(object_ids[0], object_ids_read[0]);
+  ASSERT_EQ(object_ids[1], object_ids_read[1]);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, StatusReply) {
+  int fd = create_temp_file();
+  ObjectID object_ids[2];
+  object_ids[0] = ObjectID::from_random();
+  object_ids[1] = ObjectID::from_random();
+  int object_statuses[2] = {42, 43};
+  ARROW_CHECK_OK(SendStatusReply(fd, object_ids, object_statuses, 2));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaStatusReply);
+  int64_t num_objects = ReadStatusReply_num_objects(data.data());
+  std::vector<ObjectID> object_ids_read(num_objects);  // heap-backed; avoids a VLA
+  std::vector<int> object_statuses_read(num_objects);
+  ARROW_CHECK_OK(ReadStatusReply(data.data(), object_ids_read.data(),
+      object_statuses_read.data(), num_objects));
+  ASSERT_EQ(object_ids[0], object_ids_read[0]);
+  ASSERT_EQ(object_ids[1], object_ids_read[1]);
+  ASSERT_EQ(object_statuses[0], object_statuses_read[0]);
+  ASSERT_EQ(object_statuses[1], object_statuses_read[1]);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, EvictRequest) {
+  int fd = create_temp_file();
+  int64_t num_bytes = 111;
+  ARROW_CHECK_OK(SendEvictRequest(fd, num_bytes));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaEvictRequest);
+  int64_t num_bytes_received;
+  ARROW_CHECK_OK(ReadEvictRequest(data.data(), &num_bytes_received));
+  ASSERT_EQ(num_bytes, num_bytes_received);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, EvictReply) {
+  int fd = create_temp_file();
+  int64_t num_bytes = 111;
+  ARROW_CHECK_OK(SendEvictReply(fd, num_bytes));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaEvictReply);
+  int64_t num_bytes_received;
+  ARROW_CHECK_OK(ReadEvictReply(data.data(), num_bytes_received));
+  ASSERT_EQ(num_bytes, num_bytes_received);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, FetchRequest) {
+  int fd = create_temp_file();
+  ObjectID object_ids[2];
+  object_ids[0] = ObjectID::from_random();
+  object_ids[1] = ObjectID::from_random();
+  ARROW_CHECK_OK(SendFetchRequest(fd, object_ids, 2));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaFetchRequest);
+  std::vector<ObjectID> object_ids_read;
+  ARROW_CHECK_OK(ReadFetchRequest(data.data(), object_ids_read));
+  ASSERT_EQ(object_ids[0], object_ids_read[0]);
+  ASSERT_EQ(object_ids[1], object_ids_read[1]);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, WaitRequest) {
+  int fd = create_temp_file();
+  const int num_objects_in = 2;
+  ObjectRequest object_requests_in[num_objects_in] = {
+      ObjectRequest({ObjectID::from_random(), PLASMA_QUERY_ANYWHERE, 0}),
+      ObjectRequest({ObjectID::from_random(), PLASMA_QUERY_LOCAL, 0})};
+  const int num_ready_objects_in = 1;
+  int64_t timeout_ms = 1000;
+
+  ARROW_CHECK_OK(SendWaitRequest(
+      fd, &object_requests_in[0], num_objects_in, num_ready_objects_in, timeout_ms));
+  /* Read message back. */
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaWaitRequest);
+  int num_ready_objects_out;
+  int64_t timeout_ms_read;
+  ObjectRequestMap object_requests_out;
+  ARROW_CHECK_OK(ReadWaitRequest(
+      data.data(), object_requests_out, &timeout_ms_read, &num_ready_objects_out));
+  ASSERT_EQ(num_objects_in, object_requests_out.size());
+  ASSERT_EQ(num_ready_objects_out, num_ready_objects_in);
+  for (int i = 0; i < num_objects_in; i++) {
+    const ObjectID& object_id = object_requests_in[i].object_id;
+    ASSERT_EQ(1, object_requests_out.count(object_id));
+    const auto& entry = object_requests_out.find(object_id);
+    ASSERT_TRUE(entry != object_requests_out.end());
+    ASSERT_EQ(entry->second.object_id, object_requests_in[i].object_id);
+    ASSERT_EQ(entry->second.type, object_requests_in[i].type);
+  }
+  close(fd);
+}
+
+TEST(PlasmaSerialization, WaitReply) {
+  int fd = create_temp_file();
+  const int num_objects_in = 2;
+  /* Create a map with two ObjectRequests in it. */
+  ObjectRequestMap objects_in(num_objects_in);
+  ObjectID id1 = ObjectID::from_random();
+  objects_in[id1] = ObjectRequest({id1, 0, ObjectStatus_Local});
+  ObjectID id2 = ObjectID::from_random();
+  objects_in[id2] = ObjectRequest({id2, 0, ObjectStatus_Nonexistent});
+
+  ARROW_CHECK_OK(SendWaitReply(fd, objects_in, num_objects_in));
+  /* Read message back. */
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaWaitReply);
+  ObjectRequest objects_out[2];
+  int num_objects_out;
+  ARROW_CHECK_OK(ReadWaitReply(data.data(), &objects_out[0], &num_objects_out));
+  ASSERT_EQ(num_objects_in, num_objects_out);
+  for (int i = 0; i < num_objects_out; i++) {
+    /* Each object request must appear exactly once. */
+    ASSERT_EQ(objects_in.count(objects_out[i].object_id), 1);
+    const auto& entry = objects_in.find(objects_out[i].object_id);
+    ASSERT_TRUE(entry != objects_in.end());
+    ASSERT_EQ(entry->second.object_id, objects_out[i].object_id);
+    ASSERT_EQ(entry->second.status, objects_out[i].status);
+  }
+  close(fd);
+}
+
+TEST(PlasmaSerialization, DataRequest) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  const char* address1 = "address1";
+  int port1 = 12345;
+  ARROW_CHECK_OK(SendDataRequest(fd, object_id1, address1, port1));
+  /* Reading message back. */
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaDataRequest);
+  ObjectID object_id2;
+  char* address2;
+  int port2;
+  ARROW_CHECK_OK(ReadDataRequest(data.data(), &object_id2, &address2, &port2));
+  ASSERT_EQ(object_id1, object_id2);
+  ASSERT_EQ(strcmp(address1, address2), 0);
+  ASSERT_EQ(port1, port2);
+  free(address2);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, DataReply) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  int64_t object_size1 = 146;
+  int64_t metadata_size1 = 198;
+  ARROW_CHECK_OK(SendDataReply(fd, object_id1, object_size1, metadata_size1));
+  /* Reading message back. */
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaDataReply);
+  ObjectID object_id2;
+  int64_t object_size2;
+  int64_t metadata_size2;
+  ARROW_CHECK_OK(ReadDataReply(data.data(), &object_id2, &object_size2, &metadata_size2));
+  ASSERT_EQ(object_id1, object_id2);
+  ASSERT_EQ(object_size1, object_size2);
+  ASSERT_EQ(metadata_size1, metadata_size2);
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/thirdparty/ae/ae.c
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/thirdparty/ae/ae.c b/cpp/src/plasma/thirdparty/ae/ae.c
new file mode 100644
index 0000000..e66808a
--- /dev/null
+++ b/cpp/src/plasma/thirdparty/ae/ae.c
@@ -0,0 +1,465 @@
+/* A simple event-driven programming library. Originally I wrote this code
+ * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
+ * it in form of a library for easy reuse.
+ *
+ * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <poll.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+
+#include "ae.h"
+#include "zmalloc.h"
+#include "config.h"
+
+/* Include the best multiplexing layer supported by this system.
+ * The following should be ordered by performances, descending. */
+#ifdef HAVE_EVPORT
+#include "ae_evport.c"
+#else
+    #ifdef HAVE_EPOLL
+    #include "ae_epoll.c"
+    #else
+        #ifdef HAVE_KQUEUE
+        #include "ae_kqueue.c"
+        #else
+        #include "ae_select.c"
+        #endif
+    #endif
+#endif
+
+/* Allocate and initialize an event loop able to track `setsize` file
+ * descriptors. Returns the new loop, or NULL if an allocation or the
+ * polling backend setup fails (everything allocated so far is released). */
+aeEventLoop *aeCreateEventLoop(int setsize) {
+    aeEventLoop *loop = zmalloc(sizeof(*loop));
+    int slot;
+
+    if (loop == NULL) return NULL;
+
+    loop->events = zmalloc(sizeof(aeFileEvent)*setsize);
+    loop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
+    if (loop->events == NULL || loop->fired == NULL) goto fail;
+
+    loop->setsize = setsize;
+    loop->maxfd = -1;
+    loop->stop = 0;
+    loop->lastTime = time(NULL);
+    loop->timeEventHead = NULL;
+    loop->timeEventNextId = 0;
+    loop->beforesleep = NULL;
+    if (aeApiCreate(loop) == -1) goto fail;
+
+    /* A slot whose mask is AE_NONE holds no registered event, so every slot
+     * starts out in that state. */
+    for (slot = 0; slot < setsize; slot++) {
+        loop->events[slot].mask = AE_NONE;
+    }
+    return loop;
+
+fail:
+    zfree(loop->events);
+    zfree(loop->fired);
+    zfree(loop);
+    return NULL;
+}
+
+/* Report how many file descriptor slots the event loop currently tracks. */
+int aeGetSetSize(aeEventLoop *eventLoop) {
+    int current = eventLoop->setsize;
+
+    return current;
+}
+
+/* Change the number of file descriptor slots tracked by the event loop.
+ *
+ * Returns AE_OK when the size already matches or the resize succeeded.
+ * Returns AE_ERR, leaving the loop untouched, when a registered descriptor
+ * is >= the requested size or when the polling backend cannot be resized. */
+int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
+    int slot;
+
+    if (setsize == eventLoop->setsize) return AE_OK;
+    if (eventLoop->maxfd >= setsize ||
+        aeApiResize(eventLoop,setsize) == -1)
+    {
+        return AE_ERR;
+    }
+
+    eventLoop->events = zrealloc(eventLoop->events,sizeof(aeFileEvent)*setsize);
+    eventLoop->fired = zrealloc(eventLoop->fired,sizeof(aeFiredEvent)*setsize);
+    eventLoop->setsize = setsize;
+
+    /* Slots above the highest registered descriptor may be freshly
+     * allocated: mark them all as holding no event. */
+    slot = eventLoop->maxfd+1;
+    while (slot < setsize) {
+        eventLoop->events[slot].mask = AE_NONE;
+        slot++;
+    }
+    return AE_OK;
+}
+
+/* Destroy an event loop: release the polling backend state first, then the
+ * event tables, then the loop structure itself. */
+void aeDeleteEventLoop(aeEventLoop *eventLoop) {
+    aeApiFree(eventLoop);
+
+    zfree(eventLoop->fired);
+    zfree(eventLoop->events);
+    zfree(eventLoop);
+}
+
+/* Raise the stop flag that aeMain() tests before each iteration. */
+void aeStop(aeEventLoop *eventLoop)
+{
+    eventLoop->stop = 1;
+}
+
+/* Register `proc` to be called when `fd` becomes ready for the events in
+ * `mask` (AE_READABLE and/or AE_WRITABLE). `clientData` is stored and passed
+ * back to the callback.
+ *
+ * Returns AE_OK on success. Returns AE_ERR with errno set to ERANGE when
+ * `fd` is negative or does not fit in the event table, and AE_ERR when the
+ * polling backend rejects the registration. */
+int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
+        aeFileProc *proc, void *clientData)
+{
+    aeFileEvent *fe;
+
+    /* A negative fd (for instance the -1 returned by a failed socket call)
+     * would index before the start of the events table. */
+    if (fd < 0 || fd >= eventLoop->setsize) {
+        errno = ERANGE;
+        return AE_ERR;
+    }
+    fe = &eventLoop->events[fd];
+
+    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
+        return AE_ERR;
+    fe->mask |= mask;
+    if (mask & AE_READABLE) fe->rfileProc = proc;
+    if (mask & AE_WRITABLE) fe->wfileProc = proc;
+    fe->clientData = clientData;
+    /* Keep track of the highest registered descriptor. */
+    if (fd > eventLoop->maxfd)
+        eventLoop->maxfd = fd;
+    return AE_OK;
+}
+
+/* Stop watching `fd` for the events in `mask`. Does nothing when `fd` is
+ * outside the event table (negative or >= setsize) or has no registered
+ * events. When the highest registered descriptor loses its last event,
+ * maxfd is lowered to the next descriptor that still has one (-1 if none). */
+void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
+{
+    aeFileEvent *fe;
+
+    /* A negative fd would index before the start of the events table. */
+    if (fd < 0 || fd >= eventLoop->setsize) return;
+    fe = &eventLoop->events[fd];
+    if (fe->mask == AE_NONE) return;
+
+    aeApiDelEvent(eventLoop, fd, mask);
+    fe->mask = fe->mask & (~mask);
+    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
+        /* Update the max fd */
+        int j;
+
+        for (j = eventLoop->maxfd-1; j >= 0; j--)
+            if (eventLoop->events[j].mask != AE_NONE) break;
+        eventLoop->maxfd = j;
+    }
+}
+
+/* Return the event mask currently registered for `fd`, or 0 when `fd` is
+ * outside the event table (negative or >= setsize). */
+int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
+    /* A negative fd would read before the start of the events table. */
+    if (fd < 0 || fd >= eventLoop->setsize) return 0;
+
+    return eventLoop->events[fd].mask;
+}
+
+/* Store the current gettimeofday() time as whole seconds plus the
+ * millisecond part of the current second. */
+static void aeGetTime(long *seconds, long *milliseconds)
+{
+    struct timeval now;
+
+    gettimeofday(&now, NULL);
+    *seconds = now.tv_sec;
+    *milliseconds = now.tv_usec/1000;
+}
+
+/* Compute the time `milliseconds` from now and store it as seconds (*sec)
+ * plus a millisecond part (*ms). */
+static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
+    long now_sec, now_ms;
+    long target_sec, target_ms;
+
+    aeGetTime(&now_sec, &now_ms);
+    target_sec = now_sec + milliseconds/1000;
+    target_ms = now_ms + milliseconds%1000;
+
+    /* Carry a full second out of the millisecond part. */
+    if (target_ms >= 1000) {
+        target_ms -= 1000;
+        target_sec ++;
+    }
+
+    *sec = target_sec;
+    *ms = target_ms;
+}
+
+/* Schedule `proc` to run `milliseconds` from now and return the id assigned
+ * to the new time event, or AE_ERR if it cannot be allocated. The event is
+ * linked at the head of the loop's time event list. */
+long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
+        aeTimeProc *proc, void *clientData,
+        aeEventFinalizerProc *finalizerProc)
+{
+    /* The id is consumed even when the allocation below fails. */
+    long long id = eventLoop->timeEventNextId++;
+    aeTimeEvent *timer = zmalloc(sizeof(*timer));
+
+    if (timer == NULL) return AE_ERR;
+
+    timer->id = id;
+    timer->timeProc = proc;
+    timer->finalizerProc = finalizerProc;
+    timer->clientData = clientData;
+    aeAddMillisecondsToNow(milliseconds,&timer->when_sec,&timer->when_ms);
+
+    timer->next = eventLoop->timeEventHead;
+    eventLoop->timeEventHead = timer;
+    return id;
+}
+
+/* Mark the time event with the given id as deleted; it is unlinked and
+ * freed later by processTimeEvents(). Returns AE_OK if the id was found,
+ * AE_ERR otherwise. */
+int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
+{
+    aeTimeEvent *cur;
+
+    for (cur = eventLoop->timeEventHead; cur != NULL; cur = cur->next) {
+        if (cur->id != id) continue;
+        cur->id = AE_DELETED_EVENT_ID;
+        return AE_OK;
+    }
+    return AE_ERR; /* NO event with the specified ID found */
+}
+
+/* Find the time event that is due to fire first.
+ * The caller uses it to decide how long the poll call may sleep without
+ * delaying any time event. Returns NULL when there are no timers.
+ *
+ * The search is O(N) because time events are kept unsorted.
+ * Possible optimizations (not needed by Redis so far, but...):
+ * 1) Insert events in order, so that the nearest is always the head.
+ *    Much better, but insertion or deletion of timers is still O(N).
+ * 2) Use a skiplist to make this operation O(1) and insertion O(log(N)).
+ */
+static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
+{
+    aeTimeEvent *best = NULL;
+    aeTimeEvent *cur;
+
+    for (cur = eventLoop->timeEventHead; cur != NULL; cur = cur->next) {
+        if (best == NULL) {
+            best = cur;
+        } else if (cur->when_sec < best->when_sec) {
+            best = cur;
+        } else if (cur->when_sec == best->when_sec &&
+                   cur->when_ms < best->when_ms) {
+            best = cur;
+        }
+    }
+    return best;
+}
+
+/* Process time events: walk the loop's time event list once, unlinking and
+ * freeing events marked as deleted and invoking the callback of every event
+ * whose scheduled time has been reached.
+ *
+ * Returns the number of callbacks invoked. */
+static int processTimeEvents(aeEventLoop *eventLoop) {
+    int processed = 0; /* callbacks invoked during this pass */
+    aeTimeEvent *te, *prev;
+    long long maxId;
+    time_t now = time(NULL);
+
+    /* If the system clock is moved to the future, and then set back to the
+     * right value, time events may be delayed in a random way. Often this
+     * means that scheduled operations will not be performed soon enough.
+     *
+     * Here we try to detect system clock skews, and force all the time
+     * events to be processed ASAP when this happens: the idea is that
+     * processing events earlier is less dangerous than delaying them
+     * indefinitely, and practice suggests it is. */
+    if (now < eventLoop->lastTime) {
+        te = eventLoop->timeEventHead;
+        while(te) {
+            te->when_sec = 0;
+            te = te->next;
+        }
+    }
+    eventLoop->lastTime = now;
+
+    prev = NULL;
+    te = eventLoop->timeEventHead;
+    maxId = eventLoop->timeEventNextId-1; /* highest id handed out before this pass */
+    while(te) {
+        long now_sec, now_ms;
+        long long id;
+
+        /* Remove events scheduled for deletion: unlink the node, run its
+         * finalizer (if any) and free it. `prev` is left unchanged. */
+        if (te->id == AE_DELETED_EVENT_ID) {
+            aeTimeEvent *next = te->next;
+            if (prev == NULL)
+                eventLoop->timeEventHead = te->next;
+            else
+                prev->next = te->next;
+            if (te->finalizerProc)
+                te->finalizerProc(eventLoop, te->clientData);
+            zfree(te);
+            te = next;
+            continue;
+        }
+
+        /* Make sure we don't process time events created by time events in
+         * this iteration. Note that this check is currently useless: we always
+         * add new timers at the head. However, if we change that
+         * implementation detail, this check may become useful again, so we
+         * keep it here as a defense for the future. */
+        if (te->id > maxId) {
+            te = te->next;
+            continue;
+        }
+        aeGetTime(&now_sec, &now_ms);
+        if (now_sec > te->when_sec ||
+            (now_sec == te->when_sec && now_ms >= te->when_ms))
+        {
+            int retval;
+
+            id = te->id;
+            retval = te->timeProc(eventLoop, id, te->clientData);
+            processed++;
+            /* Any return value other than AE_NOMORE is the number of
+             * milliseconds until the event fires again; AE_NOMORE marks the
+             * event deleted, to be removed by a later call. */
+            if (retval != AE_NOMORE) {
+                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
+            } else {
+                te->id = AE_DELETED_EVENT_ID;
+            }
+        }
+        prev = te;
+        te = te->next;
+    }
+    return processed;
+}
+
+/* Poll for file events and run their callbacks, then run every time event
+ * that is due.
+ * Without special flags the function sleeps until some file event
+ * fires, or until the next time event is due (if any).
+ *
+ * If flags is 0, the function does nothing and returns.
+ * If flags has AE_ALL_EVENTS set, all kinds of events are processed.
+ * If flags has AE_FILE_EVENTS set, file events are processed.
+ * If flags has AE_TIME_EVENTS set, time events are processed.
+ * If flags has AE_DONT_WAIT set, the function returns as soon as all the
+ * events that can be processed without waiting have been processed.
+ *
+ * The function returns the number of events processed. */
+int aeProcessEvents(aeEventLoop *eventLoop, int flags)
+{
+    int processed = 0, numevents;
+
+    /* Nothing to do? return ASAP */
+    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
+
+    /* Note that we want to call the poll function even if there are no
+     * file events to process, as long as we want to process time
+     * events, in order to sleep until the next time event is ready
+     * to fire. */
+    if (eventLoop->maxfd != -1 ||
+        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
+        int j;
+        aeTimeEvent *shortest = NULL;
+        struct timeval tv, *tvp;
+
+        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
+            shortest = aeSearchNearestTimer(eventLoop);
+        if (shortest) {
+            long now_sec, now_ms;
+
+            aeGetTime(&now_sec, &now_ms);
+            tvp = &tv;
+
+            /* How many milliseconds do we need to wait for the next
+             * time event to fire? */
+            long long ms =
+                (shortest->when_sec - now_sec)*1000 +
+                shortest->when_ms - now_ms;
+
+            if (ms > 0) {
+                tvp->tv_sec = ms/1000;
+                tvp->tv_usec = (ms % 1000)*1000;
+            } else {
+                /* The timer is already due: poll without blocking. */
+                tvp->tv_sec = 0;
+                tvp->tv_usec = 0;
+            }
+        } else {
+            /* If we have to check for events but need to return
+             * ASAP because of AE_DONT_WAIT, we need to set the timeout
+             * to zero. */
+            if (flags & AE_DONT_WAIT) {
+                tv.tv_sec = tv.tv_usec = 0;
+                tvp = &tv;
+            } else {
+                /* Otherwise we can block */
+                tvp = NULL; /* wait forever */
+            }
+        }
+
+        numevents = aeApiPoll(eventLoop, tvp);
+        for (j = 0; j < numevents; j++) {
+            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
+            int mask = eventLoop->fired[j].mask;
+            int fd = eventLoop->fired[j].fd;
+            int rfired = 0;
+
+	    /* Note the fe->mask & mask & ... test: an event processed earlier
+             * in this loop may have removed an event that fired but that we
+             * have not processed yet, so we check it is still registered. */
+            if (fe->mask & mask & AE_READABLE) {
+                rfired = 1;
+                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
+            }
+            if (fe->mask & mask & AE_WRITABLE) {
+                /* Do not call the same handler twice for one fired event. */
+                if (!rfired || fe->wfileProc != fe->rfileProc)
+                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
+            }
+            processed++;
+        }
+    }
+    /* Check time events */
+    if (flags & AE_TIME_EVENTS)
+        processed += processTimeEvents(eventLoop);
+
+    return processed; /* return the number of processed file/time events */
+}
+
+/* Wait up to `milliseconds` for `fd` to become readable and/or writable, as
+ * requested in `mask`.
+ *
+ * When poll() reports the descriptor, returns a mask of AE_READABLE and
+ * AE_WRITABLE; error and hang-up conditions are reported as AE_WRITABLE.
+ * Otherwise returns poll()'s own result. */
+int aeWait(int fd, int mask, long long milliseconds) {
+    struct pollfd pfd;
+    int ready_mask = 0;
+    int rc;
+
+    memset(&pfd, 0, sizeof(pfd));
+    pfd.fd = fd;
+    if (mask & AE_READABLE) pfd.events |= POLLIN;
+    if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
+
+    rc = poll(&pfd, 1, milliseconds);
+    if (rc != 1) return rc;
+
+    if (pfd.revents & POLLIN) ready_mask |= AE_READABLE;
+    if (pfd.revents & (POLLOUT|POLLERR|POLLHUP)) ready_mask |= AE_WRITABLE;
+    return ready_mask;
+}
+
+/* Run the event loop until the stop flag is raised. Before each round of
+ * event processing the before-sleep hook is called, if one is installed. */
+void aeMain(aeEventLoop *eventLoop) {
+    for (eventLoop->stop = 0; !eventLoop->stop; ) {
+        if (eventLoop->beforesleep != NULL) {
+            eventLoop->beforesleep(eventLoop);
+        }
+        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
+    }
+}
+
+/* Return the name reported by the polling backend compiled into this file. */
+char *aeGetApiName(void)
+{
+    char *backend = aeApiName();
+
+    return backend;
+}
+
+/* Install the hook that aeMain() calls before each round of event
+ * processing. */
+void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep)
+{
+    eventLoop->beforesleep = beforesleep;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/thirdparty/ae/ae.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/thirdparty/ae/ae.h b/cpp/src/plasma/thirdparty/ae/ae.h
new file mode 100644
index 0000000..827c4c9
--- /dev/null
+++ b/cpp/src/plasma/thirdparty/ae/ae.h
@@ -0,0 +1,123 @@
+/* A simple event-driven programming library. Originally I wrote this code
+ * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
+ * it in form of a library for easy reuse.
+ *
+ * Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __AE_H__
+#define __AE_H__
+
+#include <time.h>
+
+#define AE_OK 0 /* success return code */
+#define AE_ERR -1 /* failure return code */
+
+#define AE_NONE 0 /* file event mask: no event registered */
+#define AE_READABLE 1 /* file event mask bit: fd is readable */
+#define AE_WRITABLE 2 /* file event mask bit: fd is writable */
+
+#define AE_FILE_EVENTS 1 /* aeProcessEvents flag: process file events */
+#define AE_TIME_EVENTS 2 /* aeProcessEvents flag: process time events */
+#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
+#define AE_DONT_WAIT 4 /* aeProcessEvents flag: do not block waiting for events */
+
+#define AE_NOMORE -1 /* returned by a time callback that must not fire again */
+#define AE_DELETED_EVENT_ID -1 /* id given to a time event awaiting removal */
+
+/* Macros */
+#define AE_NOTUSED(V) ((void) V)
+
+struct aeEventLoop;
+
+/* Types and data structures
+ *
+ * aeFileProc: file event callback; receives the loop, the fd, the client
+ *   data stored at registration and the mask of events that fired.
+ * aeTimeProc: time event callback; returns AE_NOMORE to stop firing, or the
+ *   number of milliseconds after which it should fire again.
+ * aeEventFinalizerProc: called when a deleted time event is removed.
+ * aeBeforeSleepProc: hook called by aeMain() before each round of event
+ *   processing. */
+typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
+typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
+typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
+typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
+
+/* File event structure: one slot per file descriptor, indexed by fd in
+ * aeEventLoop.events. */
+typedef struct aeFileEvent {
+    int mask; /* AE_NONE, or a bitwise OR of AE_READABLE and AE_WRITABLE */
+    aeFileProc *rfileProc; /* callback for AE_READABLE */
+    aeFileProc *wfileProc; /* callback for AE_WRITABLE */
+    void *clientData; /* passed back to the callbacks */
+} aeFileEvent;
+
+/* Time event structure: a node of the loop's singly linked timer list. */
+typedef struct aeTimeEvent {
+    long long id; /* time event identifier; AE_DELETED_EVENT_ID once deleted */
+    long when_sec; /* seconds */
+    long when_ms; /* milliseconds */
+    aeTimeProc *timeProc; /* callback run when the event is due */
+    aeEventFinalizerProc *finalizerProc; /* optional; run when a deleted event is removed */
+    void *clientData; /* passed back to timeProc and finalizerProc */
+    struct aeTimeEvent *next; /* next time event in the list */
+} aeTimeEvent;
+
+/* A fired event: filled in by the polling backend for each ready fd. */
+typedef struct aeFiredEvent {
+    int fd; /* descriptor that became ready */
+    int mask; /* AE_READABLE / AE_WRITABLE bits that fired */
+} aeFiredEvent;
+
+/* State of an event based program */
+typedef struct aeEventLoop {
+    int maxfd;   /* highest file descriptor currently registered */
+    int setsize; /* max number of file descriptors tracked */
+    long long timeEventNextId; /* id given to the next created time event */
+    time_t lastTime;     /* Used to detect system clock skew */
+    aeFileEvent *events; /* Registered events */
+    aeFiredEvent *fired; /* Fired events */
+    aeTimeEvent *timeEventHead; /* head of the time event list */
+    int stop; /* non-zero makes aeMain() leave its loop */
+    void *apidata; /* This is used for polling API specific data */
+    aeBeforeSleepProc *beforesleep; /* optional hook run by aeMain() before each iteration */
+} aeEventLoop;
+
+/* Prototypes of the event loop API. */
+aeEventLoop *aeCreateEventLoop(int setsize);
+void aeDeleteEventLoop(aeEventLoop *eventLoop);
+void aeStop(aeEventLoop *eventLoop);
+int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
+        aeFileProc *proc, void *clientData);
+void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
+int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
+long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
+        aeTimeProc *proc, void *clientData,
+        aeEventFinalizerProc *finalizerProc);
+int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
+int aeProcessEvents(aeEventLoop *eventLoop, int flags);
+int aeWait(int fd, int mask, long long milliseconds);
+void aeMain(aeEventLoop *eventLoop);
+char *aeGetApiName(void);
+void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
+int aeGetSetSize(aeEventLoop *eventLoop);
+int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
+
+#endif

http://git-wip-us.apache.org/repos/asf/arrow/blob/5e343098/cpp/src/plasma/thirdparty/ae/ae_epoll.c
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/thirdparty/ae/ae_epoll.c b/cpp/src/plasma/thirdparty/ae/ae_epoll.c
new file mode 100644
index 0000000..410aac7
--- /dev/null
+++ b/cpp/src/plasma/thirdparty/ae/ae_epoll.c
@@ -0,0 +1,135 @@
+/* Linux epoll(2) based ae.c module
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include <sys/epoll.h>
+
+typedef struct aeApiState {
+    int epfd; /* epoll descriptor returned by epoll_create() */
+    struct epoll_event *events; /* buffer filled by epoll_wait(); sized to the loop's setsize */
+} aeApiState;
+
+/* Create the epoll state for the event loop and store it in
+ * eventLoop->apidata. Returns 0 on success; on failure returns -1 after
+ * releasing whatever was allocated. */
+static int aeApiCreate(aeEventLoop *eventLoop) {
+    aeApiState *api = zmalloc(sizeof(aeApiState));
+
+    if (api == NULL) return -1;
+
+    api->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
+    if (api->events != NULL) {
+        api->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
+        if (api->epfd != -1) {
+            eventLoop->apidata = api;
+            return 0;
+        }
+        zfree(api->events);
+    }
+    zfree(api);
+    return -1;
+}
+
+/* Resize the epoll_wait() result buffer to hold `setsize` entries.
+ * Always returns 0. */
+static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
+    aeApiState *api = eventLoop->apidata;
+    size_t bytes = sizeof(struct epoll_event)*setsize;
+
+    api->events = zrealloc(api->events, bytes);
+    return 0;
+}
+
+/* Close the epoll descriptor and free the backend state. */
+static void aeApiFree(aeEventLoop *eventLoop) {
+    aeApiState *api = eventLoop->apidata;
+
+    close(api->epfd);
+    zfree(api->events);
+    zfree(api);
+}
+
+/* Start watching `fd` for the events in `mask`, in addition to the events
+ * already registered for it. Returns 0 on success, -1 if epoll_ctl() fails. */
+static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
+    aeApiState *api = eventLoop->apidata;
+    struct epoll_event ee = {0}; /* avoid valgrind warning */
+    int registered = eventLoop->events[fd].mask;
+    int wanted = mask | registered; /* Merge old events */
+    int op;
+
+    /* An fd that is already monitored needs a MOD operation; a new one
+     * needs an ADD operation. */
+    if (registered == AE_NONE) {
+        op = EPOLL_CTL_ADD;
+    } else {
+        op = EPOLL_CTL_MOD;
+    }
+
+    ee.events = 0;
+    if (wanted & AE_READABLE) ee.events |= EPOLLIN;
+    if (wanted & AE_WRITABLE) ee.events |= EPOLLOUT;
+    ee.data.fd = fd;
+
+    return epoll_ctl(api->epfd,op,fd,&ee) == -1 ? -1 : 0;
+}
+
+/* Stop watching `fd` for the events in `delmask`. If other events remain
+ * registered the epoll entry is modified, otherwise it is removed. The
+ * result of epoll_ctl() is not checked. */
+static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
+    aeApiState *api = eventLoop->apidata;
+    struct epoll_event ee = {0}; /* avoid valgrind warning */
+    int remaining = eventLoop->events[fd].mask & (~delmask);
+    int op;
+
+    ee.events = 0;
+    if (remaining & AE_READABLE) ee.events |= EPOLLIN;
+    if (remaining & AE_WRITABLE) ee.events |= EPOLLOUT;
+    ee.data.fd = fd;
+
+    /* Note, Kernel < 2.6.9 requires a non null event pointer even for
+     * EPOLL_CTL_DEL. */
+    op = (remaining == AE_NONE) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
+    epoll_ctl(api->epfd,op,fd,&ee);
+}
+
+/* Wait for events with epoll_wait() and copy them into eventLoop->fired.
+ * A NULL `tvp` blocks indefinitely; otherwise the timeout is `tvp`
+ * converted to milliseconds. Error and hang-up conditions are reported as
+ * AE_WRITABLE. Returns the number of fired events, which is 0 when
+ * epoll_wait() returns zero or a negative value. */
+static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
+    aeApiState *api = eventLoop->apidata;
+    int nready, idx;
+
+    nready = epoll_wait(api->epfd,api->events,eventLoop->setsize,
+            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
+    if (nready <= 0) return 0;
+
+    for (idx = 0; idx < nready; idx++) {
+        struct epoll_event *e = &api->events[idx];
+        int mask = 0;
+
+        if (e->events & EPOLLIN) mask |= AE_READABLE;
+        if (e->events & (EPOLLOUT|EPOLLERR|EPOLLHUP)) mask |= AE_WRITABLE;
+        eventLoop->fired[idx].fd = e->data.fd;
+        eventLoop->fired[idx].mask = mask;
+    }
+    return nready;
+}
+
+/* Name of this polling backend. */
+static char *aeApiName(void)
+{
+    return "epoll";
+}


Mime
View raw message