arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [06/14] arrow git commit: [C++] Remove Plasma source tree for 0.5.0 release pending IP Clearance
Date Sun, 23 Jul 2017 18:39:13 GMT
[C++] Remove Plasma source tree for 0.5.0 release pending IP Clearance

Change-Id: I1244ada7297d2f6f567d679452d21a26b91c0360


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/62ef2cd8
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/62ef2cd8
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/62ef2cd8

Branch: refs/heads/master
Commit: 62ef2cd8a39fc93e7fa4bb790d7cd92adb77571f
Parents: db181d1
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Thu Jul 20 12:58:35 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Thu Jul 20 12:58:35 2017 -0400

----------------------------------------------------------------------
 cpp/src/plasma/CMakeLists.txt              |  113 -
 cpp/src/plasma/client.cc                   |  557 --
 cpp/src/plasma/client.h                    |  343 --
 cpp/src/plasma/common.cc                   |   83 -
 cpp/src/plasma/common.h                    |   63 -
 cpp/src/plasma/events.cc                   |   81 -
 cpp/src/plasma/events.h                    |   99 -
 cpp/src/plasma/eviction_policy.cc          |  107 -
 cpp/src/plasma/eviction_policy.h           |  134 -
 cpp/src/plasma/extension.cc                |  456 --
 cpp/src/plasma/extension.h                 |   50 -
 cpp/src/plasma/fling.cc                    |   90 -
 cpp/src/plasma/fling.h                     |   52 -
 cpp/src/plasma/format/.gitignore           |    1 -
 cpp/src/plasma/format/common.fbs           |   34 -
 cpp/src/plasma/format/plasma.fbs           |  291 --
 cpp/src/plasma/io.cc                       |  212 -
 cpp/src/plasma/io.h                        |   55 -
 cpp/src/plasma/malloc.cc                   |  178 -
 cpp/src/plasma/malloc.h                    |   26 -
 cpp/src/plasma/plasma.cc                   |   64 -
 cpp/src/plasma/plasma.h                    |  191 -
 cpp/src/plasma/protocol.cc                 |  502 --
 cpp/src/plasma/protocol.h                  |  170 -
 cpp/src/plasma/store.cc                    |  683 ---
 cpp/src/plasma/store.h                     |  169 -
 cpp/src/plasma/test/client_tests.cc        |  132 -
 cpp/src/plasma/test/run_tests.sh           |   61 -
 cpp/src/plasma/test/run_valgrind.sh        |   27 -
 cpp/src/plasma/test/serialization_tests.cc |  388 --
 cpp/src/plasma/thirdparty/ae/ae.c          |  465 --
 cpp/src/plasma/thirdparty/ae/ae.h          |  123 -
 cpp/src/plasma/thirdparty/ae/ae_epoll.c    |  135 -
 cpp/src/plasma/thirdparty/ae/ae_evport.c   |  320 --
 cpp/src/plasma/thirdparty/ae/ae_kqueue.c   |  138 -
 cpp/src/plasma/thirdparty/ae/ae_select.c   |  106 -
 cpp/src/plasma/thirdparty/ae/config.h      |   54 -
 cpp/src/plasma/thirdparty/ae/zmalloc.h     |   45 -
 cpp/src/plasma/thirdparty/dlmalloc.c       | 6281 -----------------------
 cpp/src/plasma/thirdparty/xxhash.cc        |  889 ----
 cpp/src/plasma/thirdparty/xxhash.h         |  293 --
 41 files changed, 14261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
deleted file mode 100644
index 4ff3beb..0000000
--- a/cpp/src/plasma/CMakeLists.txt
+++ /dev/null
@@ -1,113 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-cmake_minimum_required(VERSION 2.8)
-
-project(plasma)
-
-find_package(PythonLibsNew REQUIRED)
-find_package(Threads)
-
-option(PLASMA_PYTHON
-  "Build the Plasma Python extensions"
-  OFF)
-
-if(APPLE)
-  SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so")
-endif(APPLE)
-
-include_directories(SYSTEM ${PYTHON_INCLUDE_DIRS})
-include_directories("${FLATBUFFERS_INCLUDE_DIR}" "${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/" "${CMAKE_CURRENT_LIST_DIR}/../")
-
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L")
-
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion")
-
-# Compile flatbuffers
-
-set(PLASMA_FBS_SRC "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs" "${CMAKE_CURRENT_LIST_DIR}/format/common.fbs")
-set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/format/)
-
-set(PLASMA_FBS_OUTPUT_FILES
-  "${OUTPUT_DIR}/common_generated.h"
-  "${OUTPUT_DIR}/plasma_generated.h")
-
-add_custom_target(gen_plasma_fbs DEPENDS ${PLASMA_FBS_OUTPUT_FILES})
-
-if(FLATBUFFERS_VENDORED)
-  add_dependencies(gen_plasma_fbs flatbuffers_ep)
-endif()
-
-add_custom_command(
-  OUTPUT ${PLASMA_FBS_OUTPUT_FILES}
-  # The --gen-object-api flag generates a C++ class MessageT for each
-  # flatbuffers message Message, which can be used to store deserialized
-  # messages in data structures. This is currently used for ObjectInfo for
-  # example.
-  COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${PLASMA_FBS_SRC} --gen-object-api
-  DEPENDS ${PLASMA_FBS_SRC}
-  COMMENT "Running flatc compiler on ${PLASMA_FBS_SRC}"
-  VERBATIM)
-
-if(UNIX AND NOT APPLE)
-  link_libraries(rt)
-endif()
-
-set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
-
-set_source_files_properties(extension.cc PROPERTIES COMPILE_FLAGS -Wno-strict-aliasing)
-
-set(PLASMA_SRCS
-  client.cc
-  common.cc
-  eviction_policy.cc
-  events.cc
-  fling.cc
-  io.cc
-  malloc.cc
-  plasma.cc
-  protocol.cc
-  thirdparty/ae/ae.c
-  thirdparty/xxhash.cc)
-
-ADD_ARROW_LIB(plasma
-  SOURCES ${PLASMA_SRCS}
-  DEPENDENCIES gen_plasma_fbs
-  SHARED_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} arrow_static
-  STATIC_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} arrow_static)
-
-# The optimization flag -O3 is suggested by dlmalloc.c, which is #included in
-# malloc.cc; we set it here regardless of whether we do a debug or release build.
-set_source_files_properties(malloc.cc PROPERTIES COMPILE_FLAGS "-Wno-error -O3")
-
-add_executable(plasma_store store.cc)
-target_link_libraries(plasma_store plasma_static)
-
-ADD_ARROW_TEST(test/serialization_tests)
-ARROW_TEST_LINK_LIBRARIES(test/serialization_tests plasma_static)
-ADD_ARROW_TEST(test/client_tests)
-ARROW_TEST_LINK_LIBRARIES(test/client_tests plasma_static)
-
-if(PLASMA_PYTHON)
-  add_library(plasma_extension SHARED extension.cc)
-
-  if(APPLE)
-    target_link_libraries(plasma_extension plasma_static "-undefined dynamic_lookup")
-  else(APPLE)
-    target_link_libraries(plasma_extension plasma_static -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive)
-  endif(APPLE)
-endif()

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
deleted file mode 100644
index dcb78e7..0000000
--- a/cpp/src/plasma/client.cc
+++ /dev/null
@@ -1,557 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// PLASMA CLIENT: Client library for using the plasma store and manager
-
-#include "plasma/client.h"
-
-#ifdef _WIN32
-#include <Win32_Interop/win32_types.h>
-#endif
-
-#include <assert.h>
-#include <fcntl.h>
-#include <netinet/in.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <strings.h>
-#include <sys/ioctl.h>
-#include <sys/mman.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#include <algorithm>
-#include <thread>
-#include <vector>
-
-#include "plasma/common.h"
-#include "plasma/fling.h"
-#include "plasma/io.h"
-#include "plasma/plasma.h"
-#include "plasma/protocol.h"
-
-#define XXH_STATIC_LINKING_ONLY
-#include "thirdparty/xxhash.h"
-
-#define XXH64_DEFAULT_SEED 0
-
-// Number of threads used for memcopy and hash computations.
-constexpr int64_t kThreadPoolSize = 8;
-constexpr int64_t kBytesInMB = 1 << 20;
-static std::vector<std::thread> threadpool_(kThreadPoolSize);
-
-// If the file descriptor fd has been mmapped in this client process before,
-// return the pointer that was returned by mmap, otherwise mmap it and store the
-// pointer in a hash table.
-uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size) {
-  auto entry = mmap_table_.find(store_fd_val);
-  if (entry != mmap_table_.end()) {
-    close(fd);
-    return entry->second.pointer;
-  } else {
-    uint8_t* result = reinterpret_cast<uint8_t*>(
-        mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
-    // TODO(pcm): Don't fail here, instead return a Status.
-    if (result == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; }
-    close(fd);
-    ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
-    entry.pointer = result;
-    entry.length = map_size;
-    entry.count = 0;
-    return result;
-  }
-}
-
-// Get a pointer to a file that we know has been memory mapped in this client
-// process before.
-uint8_t* PlasmaClient::lookup_mmapped_file(int store_fd_val) {
-  auto entry = mmap_table_.find(store_fd_val);
-  ARROW_CHECK(entry != mmap_table_.end());
-  return entry->second.pointer;
-}
-
-void PlasmaClient::increment_object_count(
-    const ObjectID& object_id, PlasmaObject* object, bool is_sealed) {
-  // Increment the count of the object to track the fact that it is being used.
-  // The corresponding decrement should happen in PlasmaClient::Release.
-  auto elem = objects_in_use_.find(object_id);
-  ObjectInUseEntry* object_entry;
-  if (elem == objects_in_use_.end()) {
-    // Add this object ID to the hash table of object IDs in use. The
-    // corresponding call to free happens in PlasmaClient::Release.
-    objects_in_use_[object_id] =
-        std::unique_ptr<ObjectInUseEntry>(new ObjectInUseEntry());
-    objects_in_use_[object_id]->object = *object;
-    objects_in_use_[object_id]->count = 0;
-    objects_in_use_[object_id]->is_sealed = is_sealed;
-    object_entry = objects_in_use_[object_id].get();
-    // Increment the count of the number of objects in the memory-mapped file
-    // that are being used. The corresponding decrement should happen in
-    // PlasmaClient::Release.
-    auto entry = mmap_table_.find(object->handle.store_fd);
-    ARROW_CHECK(entry != mmap_table_.end());
-    ARROW_CHECK(entry->second.count >= 0);
-    // Update the in_use_object_bytes_.
-    in_use_object_bytes_ +=
-        (object_entry->object.data_size + object_entry->object.metadata_size);
-    entry->second.count += 1;
-  } else {
-    object_entry = elem->second.get();
-    ARROW_CHECK(object_entry->count > 0);
-  }
-  // Increment the count of the number of instances of this object that are
-  // being used by this client. The corresponding decrement should happen in
-  // PlasmaClient::Release.
-  object_entry->count += 1;
-}
-
-Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
-    uint8_t* metadata, int64_t metadata_size, uint8_t** data) {
-  ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
-                   << data_size << " and metadata size " << metadata_size;
-  RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, metadata_size));
-  std::vector<uint8_t> buffer;
-  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
-  ObjectID id;
-  PlasmaObject object;
-  RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
-  // If the CreateReply included an error, then the store will not send a file
-  // descriptor.
-  int fd = recv_fd(store_conn_);
-  ARROW_CHECK(fd >= 0) << "recv not successful";
-  ARROW_CHECK(object.data_size == data_size);
-  ARROW_CHECK(object.metadata_size == metadata_size);
-  // The metadata should come right after the data.
-  ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
-  *data = lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
-          object.data_offset;
-  // If plasma_create is being called from a transfer, then we will not copy the
-  // metadata here. The metadata will be written along with the data streamed
-  // from the transfer.
-  if (metadata != NULL) {
-    // Copy the metadata to the buffer.
-    memcpy(*data + object.data_size, metadata, metadata_size);
-  }
-  // Increment the count of the number of instances of this object that this
-  // client is using. A call to PlasmaClient::Release is required to decrement
-  // this
-  // count. Cache the reference to the object.
-  increment_object_count(object_id, &object, false);
-  // We increment the count a second time (and the corresponding decrement will
-  // happen in a PlasmaClient::Release call in plasma_seal) so even if the
-  // buffer
-  // returned by PlasmaClient::Create goes out of scope, the object does not get
-  // released before the call to PlasmaClient::Seal happens.
-  increment_object_count(object_id, &object, false);
-  return Status::OK();
-}
-
-Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
-    int64_t timeout_ms, ObjectBuffer* object_buffers) {
-  // Fill out the info for the objects that are already in use locally.
-  bool all_present = true;
-  for (int i = 0; i < num_objects; ++i) {
-    auto object_entry = objects_in_use_.find(object_ids[i]);
-    if (object_entry == objects_in_use_.end()) {
-      // This object is not currently in use by this client, so we need to send
-      // a request to the store.
-      all_present = false;
-      // Make a note to ourselves that the object is not present.
-      object_buffers[i].data_size = -1;
-    } else {
-      // NOTE: If the object is still unsealed, we will deadlock, since we must
-      // have been the one who created it.
-      ARROW_CHECK(object_entry->second->is_sealed)
-          << "Plasma client called get on an unsealed object that it created";
-      PlasmaObject* object = &object_entry->second->object;
-      object_buffers[i].data = lookup_mmapped_file(object->handle.store_fd);
-      object_buffers[i].data = object_buffers[i].data + object->data_offset;
-      object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
-      object_buffers[i].metadata_size = object->metadata_size;
-      // Increment the count of the number of instances of this object that this
-      // client is using. A call to PlasmaClient::Release is required to
-      // decrement this
-      // count. Cache the reference to the object.
-      increment_object_count(object_ids[i], object, true);
-    }
-  }
-
-  if (all_present) { return Status::OK(); }
-
-  // If we get here, then the objects aren't all currently in use by this
-  // client, so we need to send a request to the plasma store.
-  RETURN_NOT_OK(SendGetRequest(store_conn_, object_ids, num_objects, timeout_ms));
-  std::vector<uint8_t> buffer;
-  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaGetReply, &buffer));
-  std::vector<ObjectID> received_object_ids(num_objects);
-  std::vector<PlasmaObject> object_data(num_objects);
-  PlasmaObject* object;
-  RETURN_NOT_OK(ReadGetReply(
-      buffer.data(), received_object_ids.data(), object_data.data(), num_objects));
-
-  for (int i = 0; i < num_objects; ++i) {
-    DCHECK(received_object_ids[i] == object_ids[i]);
-    object = &object_data[i];
-    if (object_buffers[i].data_size != -1) {
-      // If the object was already in use by the client, then the store should
-      // have returned it.
-      DCHECK_NE(object->data_size, -1);
-      // We won't use this file descriptor, but the store sent us one, so we
-      // need to receive it and then close it right away so we don't leak file
-      // descriptors.
-      int fd = recv_fd(store_conn_);
-      close(fd);
-      ARROW_CHECK(fd >= 0);
-      // We've already filled out the information for this object, so we can
-      // just continue.
-      continue;
-    }
-    // If we are here, the object was not currently in use, so we need to
-    // process the reply from the object store.
-    if (object->data_size != -1) {
-      // The object was retrieved. The user will be responsible for releasing
-      // this object.
-      int fd = recv_fd(store_conn_);
-      ARROW_CHECK(fd >= 0);
-      object_buffers[i].data =
-          lookup_or_mmap(fd, object->handle.store_fd, object->handle.mmap_size);
-      // Finish filling out the return values.
-      object_buffers[i].data = object_buffers[i].data + object->data_offset;
-      object_buffers[i].data_size = object->data_size;
-      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
-      object_buffers[i].metadata_size = object->metadata_size;
-      // Increment the count of the number of instances of this object that this
-      // client is using. A call to PlasmaClient::Release is required to
-      // decrement this
-      // count. Cache the reference to the object.
-      increment_object_count(received_object_ids[i], object, true);
-    } else {
-      // The object was not retrieved. Make sure we already put a -1 here to
-      // indicate that the object was not retrieved. The caller is not
-      // responsible for releasing this object.
-      DCHECK_EQ(object_buffers[i].data_size, -1);
-      object_buffers[i].data_size = -1;
-    }
-  }
-  return Status::OK();
-}
-
-/// This is a helper method for implementing plasma_release. We maintain a
-/// buffer
-/// of release calls and only perform them once the buffer becomes full (as
-/// judged by the aggregate sizes of the objects). There may be multiple release
-/// calls for the same object ID in the buffer. In this case, the first release
-/// calls will not do anything. The client will only send a message to the store
-/// releasing the object when the client is truly done with the object.
-///
-/// @param conn The plasma connection.
-/// @param object_id The object ID to attempt to release.
-Status PlasmaClient::PerformRelease(const ObjectID& object_id) {
-  // Decrement the count of the number of instances of this object that are
-  // being used by this client. The corresponding increment should have happened
-  // in PlasmaClient::Get.
-  auto object_entry = objects_in_use_.find(object_id);
-  ARROW_CHECK(object_entry != objects_in_use_.end());
-  object_entry->second->count -= 1;
-  ARROW_CHECK(object_entry->second->count >= 0);
-  // Check if the client is no longer using this object.
-  if (object_entry->second->count == 0) {
-    // Decrement the count of the number of objects in this memory-mapped file
-    // that the client is using. The corresponding increment should have
-    // happened in plasma_get.
-    int fd = object_entry->second->object.handle.store_fd;
-    auto entry = mmap_table_.find(fd);
-    ARROW_CHECK(entry != mmap_table_.end());
-    entry->second.count -= 1;
-    ARROW_CHECK(entry->second.count >= 0);
-    // If none are being used then unmap the file.
-    if (entry->second.count == 0) {
-      munmap(entry->second.pointer, entry->second.length);
-      // Remove the corresponding entry from the hash table.
-      mmap_table_.erase(fd);
-    }
-    // Tell the store that the client no longer needs the object.
-    RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
-    // Update the in_use_object_bytes_.
-    in_use_object_bytes_ -= (object_entry->second->object.data_size +
-                             object_entry->second->object.metadata_size);
-    DCHECK_GE(in_use_object_bytes_, 0);
-    // Remove the entry from the hash table of objects currently in use.
-    objects_in_use_.erase(object_id);
-  }
-  return Status::OK();
-}
-
-Status PlasmaClient::Release(const ObjectID& object_id) {
-  // Add the new object to the release history.
-  release_history_.push_front(object_id);
-  // If there are too many bytes in use by the client or if there are too many
-  // pending release calls, and there are at least some pending release calls in
-  // the release_history list, then release some objects.
-  while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 100) ||
-             release_history_.size() > config_.release_delay) &&
-         release_history_.size() > 0) {
-    // Perform a release for the object ID for the first pending release.
-    RETURN_NOT_OK(PerformRelease(release_history_.back()));
-    // Remove the last entry from the release history.
-    release_history_.pop_back();
-  }
-  return Status::OK();
-}
-
-// This method is used to query whether the plasma store contains an object.
-Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
-  // Check if we already have a reference to the object.
-  if (objects_in_use_.count(object_id) > 0) {
-    *has_object = 1;
-  } else {
-    // If we don't already have a reference to the object, check with the store
-    // to see if we have the object.
-    RETURN_NOT_OK(SendContainsRequest(store_conn_, object_id));
-    std::vector<uint8_t> buffer;
-    RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer));
-    ObjectID object_id2;
-    RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object));
-  }
-  return Status::OK();
-}
-
-static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) {
-  XXH64_state_t hash_state;
-  XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
-  XXH64_update(&hash_state, data, nbytes);
-  *hash = XXH64_digest(&hash_state);
-}
-
-static inline bool compute_object_hash_parallel(
-    XXH64_state_t* hash_state, const unsigned char* data, int64_t nbytes) {
-  // Note that this function will likely be faster if the address of data is
-  // aligned on a 64-byte boundary.
-  const int num_threads = kThreadPoolSize;
-  uint64_t threadhash[num_threads + 1];
-  const uint64_t data_address = reinterpret_cast<uint64_t>(data);
-  const uint64_t num_blocks = nbytes / BLOCK_SIZE;
-  const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
-  const uint64_t right_address = data_address + chunk_size * num_threads;
-  const uint64_t suffix = (data_address + nbytes) - right_address;
-  // Now the data layout is | k * num_threads * block_size | suffix | ==
-  // | num_threads * chunk_size | suffix |, where chunk_size = k * block_size.
-  // Each thread gets a "chunk" of k blocks, except the suffix thread.
-
-  for (int i = 0; i < num_threads; i++) {
-    threadpool_[i] = std::thread(ComputeBlockHash,
-        reinterpret_cast<uint8_t*>(data_address) + i * chunk_size, chunk_size,
-        &threadhash[i]);
-  }
-  ComputeBlockHash(
-      reinterpret_cast<uint8_t*>(right_address), suffix, &threadhash[num_threads]);
-
-  // Join the threads.
-  for (auto& t : threadpool_) {
-    if (t.joinable()) { t.join(); }
-  }
-
-  XXH64_update(hash_state, (unsigned char*)threadhash, sizeof(threadhash));
-  return true;
-}
-
-static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
-  XXH64_state_t hash_state;
-  XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
-  if (obj_buffer.data_size >= kBytesInMB) {
-    compute_object_hash_parallel(
-        &hash_state, (unsigned char*)obj_buffer.data, obj_buffer.data_size);
-  } else {
-    XXH64_update(&hash_state, (unsigned char*)obj_buffer.data, obj_buffer.data_size);
-  }
-  XXH64_update(
-      &hash_state, (unsigned char*)obj_buffer.metadata, obj_buffer.metadata_size);
-  return XXH64_digest(&hash_state);
-}
-
-bool plasma_compute_object_hash(
-    PlasmaClient* conn, ObjectID object_id, unsigned char* digest) {
-  // Get the plasma object data. We pass in a timeout of 0 to indicate that
-  // the operation should time out immediately.
-  ObjectBuffer object_buffer;
-  ARROW_CHECK_OK(conn->Get(&object_id, 1, 0, &object_buffer));
-  // If the object was not retrieved, return false.
-  if (object_buffer.data_size == -1) { return false; }
-  // Compute the hash.
-  uint64_t hash = compute_object_hash(object_buffer);
-  memcpy(digest, &hash, sizeof(hash));
-  // Release the plasma object.
-  ARROW_CHECK_OK(conn->Release(object_id));
-  return true;
-}
-
-Status PlasmaClient::Seal(const ObjectID& object_id) {
-  // Make sure this client has a reference to the object before sending the
-  // request to Plasma.
-  auto object_entry = objects_in_use_.find(object_id);
-  ARROW_CHECK(object_entry != objects_in_use_.end())
-      << "Plasma client called seal an object without a reference to it";
-  ARROW_CHECK(!object_entry->second->is_sealed)
-      << "Plasma client called seal an already sealed object";
-  object_entry->second->is_sealed = true;
-  /// Send the seal request to Plasma.
-  static unsigned char digest[kDigestSize];
-  ARROW_CHECK(plasma_compute_object_hash(this, object_id, &digest[0]));
-  RETURN_NOT_OK(SendSealRequest(store_conn_, object_id, &digest[0]));
-  // We call PlasmaClient::Release to decrement the number of instances of this
-  // object
-  // that are currently being used by this client. The corresponding increment
-  // happened in plasma_create and was used to ensure that the object was not
-  // released before the call to PlasmaClient::Seal.
-  return Release(object_id);
-}
-
-Status PlasmaClient::Delete(const ObjectID& object_id) {
-  // TODO(rkn): In the future, we can use this method to give hints to the
-  // eviction policy about when an object will no longer be needed.
-  return Status::NotImplemented("PlasmaClient::Delete is not implemented.");
-}
-
-Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
-  // Send a request to the store to evict objects.
-  RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes));
-  // Wait for a response with the number of bytes actually evicted.
-  std::vector<uint8_t> buffer;
-  int64_t type;
-  RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
-  return ReadEvictReply(buffer.data(), num_bytes_evicted);
-}
-
-Status PlasmaClient::Subscribe(int* fd) {
-  int sock[2];
-  // Create a non-blocking socket pair. This will only be used to send
-  // notifications from the Plasma store to the client.
-  socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
-  // Make the socket non-blocking.
-  int flags = fcntl(sock[1], F_GETFL, 0);
-  ARROW_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0);
-  // Tell the Plasma store about the subscription.
-  RETURN_NOT_OK(SendSubscribeRequest(store_conn_));
-  // Send the file descriptor that the Plasma store should use to push
-  // notifications about sealed objects to this client.
-  ARROW_CHECK(send_fd(store_conn_, sock[1]) >= 0);
-  close(sock[1]);
-  // Return the file descriptor that the client should use to read notifications
-  // about sealed objects.
-  *fd = sock[0];
-  return Status::OK();
-}
-
-Status PlasmaClient::Connect(const std::string& store_socket_name,
-    const std::string& manager_socket_name, int release_delay) {
-  store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
-  if (manager_socket_name != "") {
-    manager_conn_ = connect_ipc_sock_retry(manager_socket_name, -1, -1);
-  } else {
-    manager_conn_ = -1;
-  }
-  config_.release_delay = release_delay;
-  in_use_object_bytes_ = 0;
-  // Send a ConnectRequest to the store to get its memory capacity.
-  RETURN_NOT_OK(SendConnectRequest(store_conn_));
-  std::vector<uint8_t> buffer;
-  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer));
-  RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_));
-  return Status::OK();
-}
-
-Status PlasmaClient::Disconnect() {
-  // NOTE: We purposefully do not finish sending release calls for objects in
-  // use, so that we don't duplicate PlasmaClient::Release calls (when handling
-  // a SIGTERM, for example).
-
-  // Close the connections to Plasma. The Plasma store will release the objects
-  // that were in use by us when handling the SIGPIPE.
-  close(store_conn_);
-  if (manager_conn_ >= 0) { close(manager_conn_); }
-  return Status::OK();
-}
-
-#define h_addr h_addr_list[0]
-
-Status PlasmaClient::Transfer(const char* address, int port, const ObjectID& object_id) {
-  return SendDataRequest(manager_conn_, object_id, address, port);
-}
-
-Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) {
-  ARROW_CHECK(manager_conn_ >= 0);
-  return SendFetchRequest(manager_conn_, object_ids, num_object_ids);
-}
-
-int PlasmaClient::get_manager_fd() {
-  return manager_conn_;
-}
-
-Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
-  ARROW_CHECK(manager_conn_ >= 0);
-
-  RETURN_NOT_OK(SendStatusRequest(manager_conn_, &object_id, 1));
-  std::vector<uint8_t> buffer;
-  RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer));
-  ObjectID id;
-  RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
-  ARROW_CHECK(object_id == id);
-  return Status::OK();
-}
-
-Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_requests,
-    int num_ready_objects, int64_t timeout_ms, int* num_objects_ready) {
-  ARROW_CHECK(manager_conn_ >= 0);
-  ARROW_CHECK(num_object_requests > 0);
-  ARROW_CHECK(num_ready_objects > 0);
-  ARROW_CHECK(num_ready_objects <= num_object_requests);
-
-  for (int i = 0; i < num_object_requests; ++i) {
-    ARROW_CHECK(object_requests[i].type == PLASMA_QUERY_LOCAL ||
-                object_requests[i].type == PLASMA_QUERY_ANYWHERE);
-  }
-
-  RETURN_NOT_OK(SendWaitRequest(manager_conn_, object_requests, num_object_requests,
-      num_ready_objects, timeout_ms));
-  std::vector<uint8_t> buffer;
-  RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer));
-  RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects));
-
-  *num_objects_ready = 0;
-  for (int i = 0; i < num_object_requests; ++i) {
-    int type = object_requests[i].type;
-    int status = object_requests[i].status;
-    switch (type) {
-      case PLASMA_QUERY_LOCAL:
-        if (status == ObjectStatus_Local) { *num_objects_ready += 1; }
-        break;
-      case PLASMA_QUERY_ANYWHERE:
-        if (status == ObjectStatus_Local || status == ObjectStatus_Remote) {
-          *num_objects_ready += 1;
-        } else {
-          ARROW_CHECK(status == ObjectStatus_Nonexistent);
-        }
-        break;
-      default:
-        ARROW_LOG(FATAL) << "This code should be unreachable.";
-    }
-  }
-  return Status::OK();
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/client.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
deleted file mode 100644
index fb3a161..0000000
--- a/cpp/src/plasma/client.h
+++ /dev/null
@@ -1,343 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_CLIENT_H
-#define PLASMA_CLIENT_H
-
-#include <stdbool.h>
-#include <time.h>
-
-#include <deque>
-#include <string>
-
-#include "plasma/plasma.h"
-
-using arrow::Status;
-
-#define PLASMA_DEFAULT_RELEASE_DELAY 64
-
-// Use 100MB as an overestimate of the L3 cache size.
-constexpr int64_t kL3CacheSizeBytes = 100000000;
-
-/// Object buffer data structure.
-struct ObjectBuffer {
-  /// The size in bytes of the data object.
-  int64_t data_size;
-  /// The address of the data object.
-  uint8_t* data;
-  /// The metadata size in bytes.
-  int64_t metadata_size;
-  /// The address of the metadata.
-  uint8_t* metadata;
-};
-
-/// Configuration options for the plasma client.
-struct PlasmaClientConfig {
-  /// Number of release calls we wait until the object is actually released.
-  /// This allows us to avoid invalidating the cpu cache on workers if objects
-  /// are reused across tasks.
-  size_t release_delay;
-};
-
-struct ClientMmapTableEntry {
-  /// The result of mmap for this file descriptor.
-  uint8_t* pointer;
-  /// The length of the memory-mapped file.
-  size_t length;
-  /// The number of objects in this memory-mapped file that are currently being
-  /// used by the client. When this count reaches zero, we unmap the file.
-  int count;
-};
-
-struct ObjectInUseEntry {
-  /// A count of the number of times this client has called PlasmaClient::Create
-  /// or
-  /// PlasmaClient::Get on this object ID minus the number of calls to
-  /// PlasmaClient::Release.
-  /// When this count reaches zero, we remove the entry from the ObjectsInUse
-  /// and decrement a count in the relevant ClientMmapTableEntry.
-  int count;
-  /// Cached information to read the object.
-  PlasmaObject object;
-  /// A flag representing whether the object has been sealed.
-  bool is_sealed;
-};
-
-class PlasmaClient {
- public:
-  /// Connect to the local plasma store and plasma manager. Return
-  /// the resulting connection.
-  ///
-  /// @param store_socket_name The name of the UNIX domain socket to use to
-  ///        connect to the Plasma store.
-  /// @param manager_socket_name The name of the UNIX domain socket to use to
-  ///        connect to the local Plasma manager. If this is "", then this
-  ///        function will not connect to a manager.
-  /// @param release_delay Number of released objects that are kept around
-  ///        and not evicted to avoid too many munmaps.
-  /// @return The return status.
-  Status Connect(const std::string& store_socket_name,
-      const std::string& manager_socket_name, int release_delay);
-
-  /// Create an object in the Plasma Store. Any metadata for this object must be
-  /// passed in when the object is created.
-  ///
-  /// @param object_id The ID to use for the newly created object.
-  /// @param data_size The size in bytes of the space to be allocated for this
-  /// object's
-  ///        data (this does not include space used for metadata).
-  /// @param metadata The object's metadata. If there is no metadata, this
-  /// pointer
-  ///        should be NULL.
-  /// @param metadata_size The size in bytes of the metadata. If there is no
-  ///        metadata, this should be 0.
-  /// @param data The address of the newly created object will be written here.
-  /// @return The return status.
-  Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata,
-      int64_t metadata_size, uint8_t** data);
-
-  /// Get some objects from the Plasma Store. This function will block until the
-  /// objects have all been created and sealed in the Plasma Store or the
-  /// timeout
-  /// expires. The caller is responsible for releasing any retrieved objects,
-  /// but
-  /// the caller should not release objects that were not retrieved.
-  ///
-  /// @param object_ids The IDs of the objects to get.
-  /// @param num_objects The number of object IDs to get.
-  /// @param timeout_ms The amount of time in milliseconds to wait before this
-  ///        request times out. If this value is -1, then no timeout is set.
-  /// @param object_buffers An array where the results will be stored. If the
-  /// data
-  ///        size field is -1, then the object was not retrieved.
-  /// @return The return status.
-  Status Get(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
-      ObjectBuffer* object_buffers);
-
-  /// Tell Plasma that the client no longer needs the object. This should be
-  /// called
-  /// after Get when the client is done with the object. After this call,
-  /// the address returned by Get is no longer valid. This should be called
-  /// once for each call to Get (with the same object ID).
-  ///
-  /// @param object_id The ID of the object that is no longer needed.
-  /// @return The return status.
-  Status Release(const ObjectID& object_id);
-
-  /// Check if the object store contains a particular object and the object has
-  /// been sealed. The result will be stored in has_object.
-  ///
-  /// @todo: We may want to indicate if the object has been created but not
-  /// sealed.
-  ///
-  /// @param object_id The ID of the object whose presence we are checking.
-  /// @param has_object The function will write true at this address if
-  ///        the object is present and false if it is not present.
-  /// @return The return status.
-  Status Contains(const ObjectID& object_id, bool* has_object);
-
-  /// Seal an object in the object store. The object will be immutable after
-  /// this
-  /// call.
-  ///
-  /// @param object_id The ID of the object to seal.
-  /// @return The return status.
-  Status Seal(const ObjectID& object_id);
-
-  /// Delete an object from the object store. This currently assumes that the
-  /// object is present and has been sealed.
-  ///
-  /// @todo We may want to allow the deletion of objects that are not present or
-  ///       haven't been sealed.
-  ///
-  /// @param object_id The ID of the object to delete.
-  /// @return The return status.
-  Status Delete(const ObjectID& object_id);
-
-  /// Delete objects until we have freed up num_bytes bytes or there are no more
-  /// released objects that can be deleted.
-  ///
-  /// @param num_bytes The number of bytes to try to free up.
-  /// @param num_bytes_evicted Out parameter for total number of bytes of space
-  /// retrieved.
-  /// @return The return status.
-  Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);
-
-  /// Subscribe to notifications when objects are sealed in the object store.
-  /// Whenever an object is sealed, a message will be written to the client
-  /// socket
-  /// that is returned by this method.
-  ///
-  /// @param fd Out parameter for the file descriptor the client should use to
-  /// read notifications
-  ///         from the object store about sealed objects.
-  /// @return The return status.
-  Status Subscribe(int* fd);
-
-  /// Disconnect from the local plasma instance, including the local store and
-  /// manager.
-  ///
-  /// @return The return status.
-  Status Disconnect();
-
-  /// Attempt to initiate the transfer of some objects from remote Plasma
-  /// Stores.
-  /// This method does not guarantee that the fetched objects will arrive
-  /// locally.
-  ///
-  /// For an object that is available in the local Plasma Store, this method
-  /// will
-  /// not do anything. For an object that is not available locally, it will
-  /// check
-  /// if the object is already being fetched. If so, it will not do anything.
-  /// If
-  /// not, it will query the object table for a list of Plasma Managers that
-  /// have
-  /// the object. The object table will return a non-empty list, and this Plasma
-  /// Manager will attempt to initiate transfers from one of those Plasma
-  /// Managers.
-  ///
-  /// This function is non-blocking.
-  ///
-  /// This method is idempotent in the sense that it is ok to call it multiple
-  /// times.
-  ///
-  /// @param num_object_ids The number of object IDs fetch is being called on.
-  /// @param object_ids The IDs of the objects that fetch is being called on.
-  /// @return The return status.
-  Status Fetch(int num_object_ids, const ObjectID* object_ids);
-
-  /// Wait for (1) a specified number of objects to be available (sealed) in the
-  /// local Plasma Store or in a remote Plasma Store, or (2) for a timeout to
-  /// expire. This is a blocking call.
-  ///
-  /// @param num_object_requests Size of the object_requests array.
-  /// @param object_requests Object event array. Each element contains a request
-  ///        for a particular object_id. The type of request is specified in the
-  ///        "type" field.
-  ///        - A PLASMA_QUERY_LOCAL request is satisfied when object_id becomes
-  ///          available in the local Plasma Store. In this case, this function
-  ///          sets the "status" field to ObjectStatus_Local. Note, if the
-  ///          status
-  ///          is not ObjectStatus_Local, it will be ObjectStatus_Nonexistent,
-  ///          but it may exist elsewhere in the system.
-  ///        - A PLASMA_QUERY_ANYWHERE request is satisfied when object_id
-  ///        becomes
-  ///          available either at the local Plasma Store or on a remote Plasma
-  ///          Store. In this case, the function sets the "status" field to
-  ///          ObjectStatus_Local or ObjectStatus_Remote.
-  /// @param num_ready_objects The number of requests in object_requests array
-  /// that
-  ///        must be satisfied before the function returns, unless it times out.
-  ///        The num_ready_objects should be no larger than num_object_requests.
-  /// @param timeout_ms Timeout value in milliseconds. If this timeout expires
-  ///        before num_ready_objects requests are satisfied, the
-  ///        function
-  ///        returns.
-  /// @param num_objects_ready Out parameter for number of satisfied requests in
-  ///        the object_requests list. If the returned number is less than
-  ///        num_ready_objects this means that the timeout expired.
-  /// @return The return status.
-  Status Wait(int64_t num_object_requests, ObjectRequest* object_requests,
-      int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
-
-  /// Transfer local object to a different plasma manager.
-  ///
-  /// @param conn The object containing the connection state.
-  /// @param addr IP address of the plasma manager we are transferring to.
-  /// @param port Port of the plasma manager we are transferring to.
-  /// @param object_id ObjectID of the object we are transferring.
-  /// @return The return status.
-  Status Transfer(const char* addr, int port, const ObjectID& object_id);
-
-  /// Return the status of a given object. This method may query the object
-  /// table.
-  ///
-  /// @param conn The object containing the connection state.
-  /// @param object_id The ID of the object whose status we query.
-  /// @param object_status Out parameter for object status. Can take the
-  ///         following values.
-  ///         - PLASMA_CLIENT_LOCAL, if object is stored in the local Plasma
-  ///         Store.
-  ///           has been already scheduled by the Plasma Manager.
-  ///         - PLASMA_CLIENT_TRANSFER, if the object is either currently being
-  ///           transferred or just scheduled.
-  ///         - PLASMA_CLIENT_REMOTE, if the object is stored at a remote
-  ///           Plasma Store.
-  ///         - PLASMA_CLIENT_DOES_NOT_EXIST, if the object doesn’t exist in the
-  ///           system.
-  /// @return The return status.
-  Status Info(const ObjectID& object_id, int* object_status);
-
-  /// Get the file descriptor for the socket connection to the plasma manager.
-  ///
-  /// @param conn The plasma connection.
-  /// @return The file descriptor for the manager connection. If there is no
-  ///         connection to the manager, this is -1.
-  int get_manager_fd();
-
- private:
-  Status PerformRelease(const ObjectID& object_id);
-
-  uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
-
-  uint8_t* lookup_mmapped_file(int store_fd_val);
-
-  void increment_object_count(
-      const ObjectID& object_id, PlasmaObject* object, bool is_sealed);
-
-  /// File descriptor of the Unix domain socket that connects to the store.
-  int store_conn_;
-  /// File descriptor of the Unix domain socket that connects to the manager.
-  int manager_conn_;
-  /// Table of dlmalloc buffer files that have been memory mapped so far. This
-  /// is a hash table mapping a file descriptor to a struct containing the
-  /// address of the corresponding memory-mapped file.
-  std::unordered_map<int, ClientMmapTableEntry> mmap_table_;
-  /// A hash table of the object IDs that are currently being used by this
-  /// client.
-  std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>, UniqueIDHasher>
-      objects_in_use_;
-  /// Object IDs of the last few release calls. This is a deque and
-  /// is used to delay releasing objects to see if they can be reused by
-  /// subsequent tasks so we do not unnecessarily invalidate cpu caches.
-  /// TODO(pcm): replace this with a proper lru cache using the size of the L3
-  /// cache.
-  std::deque<ObjectID> release_history_;
-  /// The number of bytes in the combined objects that are held in the release
-  /// history deque. If this is too large then the client starts
-  /// releasing objects.
-  int64_t in_use_object_bytes_;
-  /// Configuration options for the plasma client.
-  PlasmaClientConfig config_;
-  /// The amount of memory available to the Plasma store. The client needs this
-  /// information to make sure that it does not delay in releasing so much
-  /// memory that the store is unable to evict enough objects to free up space.
-  int64_t store_capacity_;
-};
-
-/// Compute the hash of an object in the object store.
-///
-/// @param conn The object containing the connection state.
-/// @param object_id The ID of the object we want to hash.
-/// @param digest A pointer at which to return the hash digest of the object.
-///        The pointer must have at least DIGEST_SIZE bytes allocated.
-/// @return A boolean representing whether the hash operation succeeded.
-bool plasma_compute_object_hash(
-    PlasmaClient* conn, ObjectID object_id, unsigned char* digest);
-
-#endif  // PLASMA_CLIENT_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/common.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
deleted file mode 100644
index a09a963..0000000
--- a/cpp/src/plasma/common.cc
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/common.h"
-
-#include <random>
-
-#include "format/plasma_generated.h"
-
-using arrow::Status;
-
-UniqueID UniqueID::from_random() {
-  UniqueID id;
-  uint8_t* data = id.mutable_data();
-  std::random_device engine;
-  for (int i = 0; i < kUniqueIDSize; i++) {
-    data[i] = static_cast<uint8_t>(engine());
-  }
-  return id;
-}
-
-UniqueID UniqueID::from_binary(const std::string& binary) {
-  UniqueID id;
-  std::memcpy(&id, binary.data(), sizeof(id));
-  return id;
-}
-
-const uint8_t* UniqueID::data() const {
-  return id_;
-}
-
-uint8_t* UniqueID::mutable_data() {
-  return id_;
-}
-
-std::string UniqueID::binary() const {
-  return std::string(reinterpret_cast<const char*>(id_), kUniqueIDSize);
-}
-
-std::string UniqueID::hex() const {
-  constexpr char hex[] = "0123456789abcdef";
-  std::string result;
-  for (int i = 0; i < kUniqueIDSize; i++) {
-    unsigned int val = id_[i];
-    result.push_back(hex[val >> 4]);
-    result.push_back(hex[val & 0xf]);
-  }
-  return result;
-}
-
-bool UniqueID::operator==(const UniqueID& rhs) const {
-  return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
-}
-
-Status plasma_error_status(int plasma_error) {
-  switch (plasma_error) {
-    case PlasmaError_OK:
-      return Status::OK();
-    case PlasmaError_ObjectExists:
-      return Status::PlasmaObjectExists("object already exists in the plasma store");
-    case PlasmaError_ObjectNonexistent:
-      return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
-    case PlasmaError_OutOfMemory:
-      return Status::PlasmaStoreFull("object does not fit in the plasma store");
-    default:
-      ARROW_LOG(FATAL) << "unknown plasma error code " << plasma_error;
-  }
-  return Status::OK();
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/common.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
deleted file mode 100644
index 85dc74b..0000000
--- a/cpp/src/plasma/common.h
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_COMMON_H
-#define PLASMA_COMMON_H
-
-#include <cstring>
-#include <string>
-// TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++
-// and get rid of the next three lines:
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include "arrow/status.h"
-#include "arrow/util/logging.h"
-
-constexpr int64_t kUniqueIDSize = 20;
-
-class UniqueID {
- public:
-  static UniqueID from_random();
-  static UniqueID from_binary(const std::string& binary);
-  bool operator==(const UniqueID& rhs) const;
-  const uint8_t* data() const;
-  uint8_t* mutable_data();
-  std::string binary() const;
-  std::string hex() const;
-
- private:
-  uint8_t id_[kUniqueIDSize];
-};
-
-static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");
-
-struct UniqueIDHasher {
-  // ObjectID hashing function.
-  size_t operator()(const UniqueID& id) const {
-    size_t result;
-    std::memcpy(&result, id.data(), sizeof(size_t));
-    return result;
-  }
-};
-
-typedef UniqueID ObjectID;
-
-arrow::Status plasma_error_status(int plasma_error);
-
-#endif  // PLASMA_COMMON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/events.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc
deleted file mode 100644
index a9f7356..0000000
--- a/cpp/src/plasma/events.cc
+++ /dev/null
@@ -1,81 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/events.h"
-
-#include <errno.h>
-
-void EventLoop::file_event_callback(
-    aeEventLoop* loop, int fd, void* context, int events) {
-  FileCallback* callback = reinterpret_cast<FileCallback*>(context);
-  (*callback)(events);
-}
-
-int EventLoop::timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context) {
-  TimerCallback* callback = reinterpret_cast<TimerCallback*>(context);
-  return (*callback)(timer_id);
-}
-
-constexpr int kInitialEventLoopSize = 1024;
-
-EventLoop::EventLoop() {
-  loop_ = aeCreateEventLoop(kInitialEventLoopSize);
-}
-
-bool EventLoop::add_file_event(int fd, int events, const FileCallback& callback) {
-  if (file_callbacks_.find(fd) != file_callbacks_.end()) { return false; }
-  auto data = std::unique_ptr<FileCallback>(new FileCallback(callback));
-  void* context = reinterpret_cast<void*>(data.get());
-  // Try to add the file descriptor.
-  int err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
-  // If it cannot be added, increase the size of the event loop.
-  if (err == AE_ERR && errno == ERANGE) {
-    err = aeResizeSetSize(loop_, 3 * aeGetSetSize(loop_) / 2);
-    if (err != AE_OK) { return false; }
-    err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
-  }
-  // In any case, test if there were errors.
-  if (err == AE_OK) {
-    file_callbacks_.emplace(fd, std::move(data));
-    return true;
-  }
-  return false;
-}
-
-void EventLoop::remove_file_event(int fd) {
-  aeDeleteFileEvent(loop_, fd, AE_READABLE | AE_WRITABLE);
-  file_callbacks_.erase(fd);
-}
-
-void EventLoop::run() {
-  aeMain(loop_);
-}
-
-int64_t EventLoop::add_timer(int64_t timeout, const TimerCallback& callback) {
-  auto data = std::unique_ptr<TimerCallback>(new TimerCallback(callback));
-  void* context = reinterpret_cast<void*>(data.get());
-  int64_t timer_id =
-      aeCreateTimeEvent(loop_, timeout, EventLoop::timer_event_callback, context, NULL);
-  timer_callbacks_.emplace(timer_id, std::move(data));
-  return timer_id;
-}
-
-int EventLoop::remove_timer(int64_t timer_id) {
-  int err = aeDeleteTimeEvent(loop_, timer_id);
-  timer_callbacks_.erase(timer_id);
-  return err;
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/events.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h
deleted file mode 100644
index bd93d6b..0000000
--- a/cpp/src/plasma/events.h
+++ /dev/null
@@ -1,99 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_EVENTS
-#define PLASMA_EVENTS
-
-#include <functional>
-#include <memory>
-#include <unordered_map>
-
-extern "C" {
-#include "ae/ae.h"
-}
-
-/// Constant specifying that the timer is done and it will be removed.
-constexpr int kEventLoopTimerDone = AE_NOMORE;
-
-/// Read event on the file descriptor.
-constexpr int kEventLoopRead = AE_READABLE;
-
-/// Write event on the file descriptor.
-constexpr int kEventLoopWrite = AE_WRITABLE;
-
-typedef long long TimerID;  // NOLINT
-
-class EventLoop {
- public:
-  // Signature of the handler that will be called when there is a new event
-  // on the file descriptor that this handler has been registered for.
-  //
-  // The arguments are the event flags (read or write).
-  using FileCallback = std::function<void(int)>;
-
-  // This handler will be called when a timer times out. The timer id is
-  // passed as an argument. The return is the number of milliseconds the timer
-  // shall be reset to or kEventLoopTimerDone if the timer shall not be
-  // triggered again.
-  using TimerCallback = std::function<int(int64_t)>;
-
-  EventLoop();
-
-  /// Add a new file event handler to the event loop.
-  ///
-  /// @param fd The file descriptor we are listening to.
-  /// @param events The flags for events we are listening to (read or write).
-  /// @param callback The callback that will be called when the event happens.
-  /// @return Returns true if the event handler was added successfully.
-  bool add_file_event(int fd, int events, const FileCallback& callback);
-
-  /// Remove a file event handler from the event loop.
-  ///
-  /// @param fd The file descriptor of the event handler.
-  /// @return Void.
-  void remove_file_event(int fd);
-
-  /// Register a handler that will be called after a time slice of
-  ///  "timeout" milliseconds.
-  ///
-  ///  @param timeout The timeout in milliseconds.
-  ///  @param callback The callback for the timeout.
-  ///  @return The ID of the newly created timer.
-  int64_t add_timer(int64_t timeout, const TimerCallback& callback);
-
-  /// Remove a timer handler from the event loop.
-  ///
-  /// @param timer_id The ID of the timer that is to be removed.
-  /// @return The ae.c error code. TODO(pcm): needs to be standardized
-  int remove_timer(int64_t timer_id);
-
-  /// Run the event loop.
-  ///
-  /// @return Void.
-  void run();
-
- private:
-  static void file_event_callback(aeEventLoop* loop, int fd, void* context, int events);
-
-  static int timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context);
-
-  aeEventLoop* loop_;
-  std::unordered_map<int, std::unique_ptr<FileCallback>> file_callbacks_;
-  std::unordered_map<int64_t, std::unique_ptr<TimerCallback>> timer_callbacks_;
-};
-
-#endif  // PLASMA_EVENTS

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/eviction_policy.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
deleted file mode 100644
index 4ae6384..0000000
--- a/cpp/src/plasma/eviction_policy.cc
+++ /dev/null
@@ -1,107 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/eviction_policy.h"
-
-#include <algorithm>
-
-void LRUCache::add(const ObjectID& key, int64_t size) {
-  auto it = item_map_.find(key);
-  ARROW_CHECK(it == item_map_.end());
-  /* Note that it is important to use a list so the iterators stay valid. */
-  item_list_.emplace_front(key, size);
-  item_map_.emplace(key, item_list_.begin());
-}
-
-void LRUCache::remove(const ObjectID& key) {
-  auto it = item_map_.find(key);
-  ARROW_CHECK(it != item_map_.end());
-  item_list_.erase(it->second);
-  item_map_.erase(it);
-}
-
-int64_t LRUCache::choose_objects_to_evict(
-    int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) {
-  int64_t bytes_evicted = 0;
-  auto it = item_list_.end();
-  while (bytes_evicted < num_bytes_required && it != item_list_.begin()) {
-    it--;
-    objects_to_evict->push_back(it->first);
-    bytes_evicted += it->second;
-  }
-  return bytes_evicted;
-}
-
-EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info)
-    : memory_used_(0), store_info_(store_info) {}
-
-int64_t EvictionPolicy::choose_objects_to_evict(
-    int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) {
-  int64_t bytes_evicted =
-      cache_.choose_objects_to_evict(num_bytes_required, objects_to_evict);
-  /* Update the LRU cache. */
-  for (auto& object_id : *objects_to_evict) {
-    cache_.remove(object_id);
-  }
-  /* Update the number of bytes used. */
-  memory_used_ -= bytes_evicted;
-  return bytes_evicted;
-}
-
-void EvictionPolicy::object_created(const ObjectID& object_id) {
-  auto entry = store_info_->objects[object_id].get();
-  cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
-}
-
-bool EvictionPolicy::require_space(
-    int64_t size, std::vector<ObjectID>* objects_to_evict) {
-  /* Check if there is enough space to create the object. */
-  int64_t required_space = memory_used_ + size - store_info_->memory_capacity;
-  int64_t num_bytes_evicted;
-  if (required_space > 0) {
-    /* Try to free up at least as much space as we need right now but ideally
-     * up to 20% of the total capacity. */
-    int64_t space_to_free = std::max(size, store_info_->memory_capacity / 5);
-    ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects";
-    /* Choose some objects to evict, and update the return pointers. */
-    num_bytes_evicted = choose_objects_to_evict(space_to_free, objects_to_evict);
-    ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting "
-                    << objects_to_evict->size() << " objects to free up "
-                    << num_bytes_evicted << " bytes.";
-  } else {
-    num_bytes_evicted = 0;
-  }
-  if (num_bytes_evicted >= required_space) {
-    /* We only increment the space used if there is enough space to create the
-     * object. */
-    memory_used_ += size;
-  }
-  return num_bytes_evicted >= required_space;
-}
-
-void EvictionPolicy::begin_object_access(
-    const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) {
-  /* Remove the object from the LRU cache; remove() checks it is present. */
-  cache_.remove(object_id);
-}
-
-void EvictionPolicy::end_object_access(
-    const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) {
-  auto entry = store_info_->objects[object_id].get();
-  /* Add the object to the LRU cache.*/
-  cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/eviction_policy.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
deleted file mode 100644
index 3815fc6..0000000
--- a/cpp/src/plasma/eviction_policy.h
+++ /dev/null
@@ -1,134 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_EVICTION_POLICY_H
-#define PLASMA_EVICTION_POLICY_H
-
-#include <list>
-#include <unordered_map>
-#include <utility>
-#include <vector>
-
-#include "plasma/common.h"
-#include "plasma/plasma.h"
-
-// ==== The eviction policy ====
-//
-// This file contains declaration for all functions and data structures that
-// need to be provided if you want to implement a new eviction algorithm for the
-// Plasma store.
-
-class LRUCache {
- public:
-  LRUCache() {}
-
-  void add(const ObjectID& key, int64_t size);
-
-  void remove(const ObjectID& key);
-
-  int64_t choose_objects_to_evict(
-      int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict);
-
- private:
-  /// A doubly-linked list containing the items in the cache and
-  /// their sizes in LRU order.
-  typedef std::list<std::pair<ObjectID, int64_t>> ItemList;
-  ItemList item_list_;
-  /// A hash table mapping the object ID of an object in the cache to its
-  /// location in the doubly linked list item_list_.
-  std::unordered_map<ObjectID, ItemList::iterator, UniqueIDHasher> item_map_;
-};
-
-/// The eviction policy.
-class EvictionPolicy {
- public:
-  /// Construct an eviction policy.
-  ///
-  /// @param store_info Information about the Plasma store that is exposed
-  ///        to the eviction policy.
-  explicit EvictionPolicy(PlasmaStoreInfo* store_info);
-
-  /// This method will be called whenever an object is first created in order to
-  /// add it to the LRU cache. This is done so that the first time, the Plasma
-  /// store calls begin_object_access, we can remove the object from the LRU
-  /// cache.
-  ///
-  /// @param object_id The object ID of the object that was created.
-  /// @return Void.
-  void object_created(const ObjectID& object_id);
-
-  /// This method will be called when the Plasma store needs more space, perhaps
-  /// to create a new object. If the required amount of space cannot be freed up,
-  /// then a fatal error will be thrown. When this method is called, the eviction
-  /// policy will assume that the objects chosen to be evicted will in fact be
-  /// evicted from the Plasma store by the caller.
-  ///
-  /// @param size The size in bytes of the new object, including both data and
-  ///        metadata.
-  /// @param objects_to_evict The object IDs that were chosen for eviction will
-  ///        be stored into this vector.
-  /// @return True if enough space can be freed and false otherwise.
-  bool require_space(int64_t size, std::vector<ObjectID>* objects_to_evict);
-
-  /// This method will be called whenever an unused object in the Plasma store
-  /// starts to be used. When this method is called, the eviction policy will
-  /// assume that the objects chosen to be evicted will in fact be evicted from
-  /// the Plasma store by the caller.
-  ///
-  /// @param object_id The ID of the object that is now being used.
-  /// @param objects_to_evict The object IDs that were chosen for eviction will
-  ///        be stored into this vector.
-  /// @return Void.
-  void begin_object_access(
-      const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict);
-
-  /// This method will be called whenever an object in the Plasma store that was
-  /// being used is no longer being used. When this method is called, the
-  /// eviction policy will assume that the objects chosen to be evicted will in
-  /// fact be evicted from the Plasma store by the caller.
-  ///
-  /// @param object_id The ID of the object that is no longer being used.
-  /// @param objects_to_evict The object IDs that were chosen for eviction will
-  ///        be stored into this vector.
-  /// @return Void.
-  void end_object_access(
-      const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict);
-
-  /// Choose some objects to evict from the Plasma store. When this method is
-  /// called, the eviction policy will assume that the objects chosen to be
-  /// evicted will in fact be evicted from the Plasma store by the caller.
-  ///
-  /// @note This method is not part of the API. It is exposed in the header file
-  /// only for testing.
-  ///
-  /// @param num_bytes_required The number of bytes of space to try to free up.
-  /// @param objects_to_evict The object IDs that were chosen for eviction will
-  ///        be stored into this vector.
-  /// @return The total number of bytes of space chosen to be evicted.
-  int64_t choose_objects_to_evict(
-      int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict);
-
- private:
-  /// The amount of memory (in bytes) currently being used.
-  int64_t memory_used_;
-  /// Pointer to the plasma store info.
-  PlasmaStoreInfo* store_info_;
-  /// Datastructure for the LRU cache.
-  LRUCache cache_;
-};
-
-#endif  // PLASMA_EVICTION_POLICY_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/extension.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.cc b/cpp/src/plasma/extension.cc
deleted file mode 100644
index 5d61e33..0000000
--- a/cpp/src/plasma/extension.cc
+++ /dev/null
@@ -1,456 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/extension.h"
-
-#include <algorithm>
-#include <vector>
-
-#include "plasma/client.h"
-#include "plasma/common.h"
-#include "plasma/io.h"
-#include "plasma/protocol.h"
-
-PyObject* PlasmaOutOfMemoryError;
-PyObject* PlasmaObjectExistsError;
-
-PyObject* PyPlasma_connect(PyObject* self, PyObject* args) {
-  const char* store_socket_name;
-  const char* manager_socket_name;
-  int release_delay;
-  if (!PyArg_ParseTuple(
-          args, "ssi", &store_socket_name, &manager_socket_name, &release_delay)) {
-    return NULL;
-  }
-  PlasmaClient* client = new PlasmaClient();
-  ARROW_CHECK_OK(client->Connect(store_socket_name, manager_socket_name, release_delay));
-
-  return PyCapsule_New(client, "plasma", NULL);
-}
-
-PyObject* PyPlasma_disconnect(PyObject* self, PyObject* args) {
-  PyObject* client_capsule;
-  if (!PyArg_ParseTuple(args, "O", &client_capsule)) { return NULL; }
-  PlasmaClient* client;
-  ARROW_CHECK(PyObjectToPlasmaClient(client_capsule, &client));
-  ARROW_CHECK_OK(client->Disconnect());
-  /* We use the context of the connection capsule to indicate if the connection
-   * is still active (if the context is NULL) or if it is closed (if the context
-   * is (void*) 0x1). This is neccessary because the primary pointer of the
-   * capsule cannot be NULL. */
-  PyCapsule_SetContext(client_capsule, reinterpret_cast<void*>(0x1));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_create(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  Py_ssize_t size;
-  PyObject* metadata;
-  if (!PyArg_ParseTuple(args, "O&O&nO", PyObjectToPlasmaClient, &client,
-          PyStringToUniqueID, &object_id, &size, &metadata)) {
-    return NULL;
-  }
-  if (!PyByteArray_Check(metadata)) {
-    PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray");
-    return NULL;
-  }
-  uint8_t* data;
-  Status s = client->Create(object_id, size,
-      reinterpret_cast<uint8_t*>(PyByteArray_AsString(metadata)),
-      PyByteArray_Size(metadata), &data);
-  if (s.IsPlasmaObjectExists()) {
-    PyErr_SetString(PlasmaObjectExistsError,
-        "An object with this ID already exists in the plasma "
-        "store.");
-    return NULL;
-  }
-  if (s.IsPlasmaStoreFull()) {
-    PyErr_SetString(PlasmaOutOfMemoryError,
-        "The plasma store ran out of memory and could not create "
-        "this object.");
-    return NULL;
-  }
-  ARROW_CHECK(s.ok());
-
-#if PY_MAJOR_VERSION >= 3
-  return PyMemoryView_FromMemory(reinterpret_cast<char*>(data), size, PyBUF_WRITE);
-#else
-  return PyBuffer_FromReadWriteMemory(reinterpret_cast<void*>(data), size);
-#endif
-}
-
-PyObject* PyPlasma_hash(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  unsigned char digest[kDigestSize];
-  bool success = plasma_compute_object_hash(client, object_id, digest);
-  if (success) {
-    PyObject* digest_string =
-        PyBytes_FromStringAndSize(reinterpret_cast<char*>(digest), kDigestSize);
-    return digest_string;
-  } else {
-    Py_RETURN_NONE;
-  }
-}
-
-PyObject* PyPlasma_seal(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  ARROW_CHECK_OK(client->Seal(object_id));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_release(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  ARROW_CHECK_OK(client->Release(object_id));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_get(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  PyObject* object_id_list;
-  Py_ssize_t timeout_ms;
-  if (!PyArg_ParseTuple(
-          args, "O&On", PyObjectToPlasmaClient, &client, &object_id_list, &timeout_ms)) {
-    return NULL;
-  }
-
-  Py_ssize_t num_object_ids = PyList_Size(object_id_list);
-  std::vector<ObjectID> object_ids(num_object_ids);
-  std::vector<ObjectBuffer> object_buffers(num_object_ids);
-
-  for (int i = 0; i < num_object_ids; ++i) {
-    PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]);
-  }
-
-  Py_BEGIN_ALLOW_THREADS;
-  ARROW_CHECK_OK(
-      client->Get(object_ids.data(), num_object_ids, timeout_ms, object_buffers.data()));
-  Py_END_ALLOW_THREADS;
-
-  PyObject* returns = PyList_New(num_object_ids);
-  for (int i = 0; i < num_object_ids; ++i) {
-    if (object_buffers[i].data_size != -1) {
-      /* The object was retrieved, so return the object. */
-      PyObject* t = PyTuple_New(2);
-      Py_ssize_t data_size = static_cast<Py_ssize_t>(object_buffers[i].data_size);
-      Py_ssize_t metadata_size = static_cast<Py_ssize_t>(object_buffers[i].metadata_size);
-#if PY_MAJOR_VERSION >= 3
-      char* data = reinterpret_cast<char*>(object_buffers[i].data);
-      char* metadata = reinterpret_cast<char*>(object_buffers[i].metadata);
-      PyTuple_SET_ITEM(t, 0, PyMemoryView_FromMemory(data, data_size, PyBUF_READ));
-      PyTuple_SET_ITEM(
-          t, 1, PyMemoryView_FromMemory(metadata, metadata_size, PyBUF_READ));
-#else
-      void* data = reinterpret_cast<void*>(object_buffers[i].data);
-      void* metadata = reinterpret_cast<void*>(object_buffers[i].metadata);
-      PyTuple_SET_ITEM(t, 0, PyBuffer_FromMemory(data, data_size));
-      PyTuple_SET_ITEM(t, 1, PyBuffer_FromMemory(metadata, metadata_size));
-#endif
-      ARROW_CHECK(PyList_SetItem(returns, i, t) == 0);
-    } else {
-      /* The object was not retrieved, so just add None to the list of return
-       * values. */
-      Py_INCREF(Py_None);
-      ARROW_CHECK(PyList_SetItem(returns, i, Py_None) == 0);
-    }
-  }
-  return returns;
-}
-
-PyObject* PyPlasma_contains(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  bool has_object;
-  ARROW_CHECK_OK(client->Contains(object_id, &has_object));
-
-  if (has_object) {
-    Py_RETURN_TRUE;
-  } else {
-    Py_RETURN_FALSE;
-  }
-}
-
-PyObject* PyPlasma_fetch(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  PyObject* object_id_list;
-  if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaClient, &client, &object_id_list)) {
-    return NULL;
-  }
-  if (client->get_manager_fd() == -1) {
-    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
-    return NULL;
-  }
-  Py_ssize_t n = PyList_Size(object_id_list);
-  ObjectID* object_ids = new ObjectID[n];
-  for (int i = 0; i < n; ++i) {
-    PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]);
-  }
-  ARROW_CHECK_OK(client->Fetch(static_cast<int>(n), object_ids));
-  delete[] object_ids;
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_wait(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  PyObject* object_id_list;
-  Py_ssize_t timeout;
-  int num_returns;
-  if (!PyArg_ParseTuple(args, "O&Oni", PyObjectToPlasmaClient, &client, &object_id_list,
-          &timeout, &num_returns)) {
-    return NULL;
-  }
-  Py_ssize_t n = PyList_Size(object_id_list);
-
-  if (client->get_manager_fd() == -1) {
-    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
-    return NULL;
-  }
-  if (num_returns < 0) {
-    PyErr_SetString(
-        PyExc_RuntimeError, "The argument num_returns cannot be less than zero.");
-    return NULL;
-  }
-  if (num_returns > n) {
-    PyErr_SetString(PyExc_RuntimeError,
-        "The argument num_returns cannot be greater than len(object_ids)");
-    return NULL;
-  }
-  int64_t threshold = 1 << 30;
-  if (timeout > threshold) {
-    PyErr_SetString(
-        PyExc_RuntimeError, "The argument timeout cannot be greater than 2 ** 30.");
-    return NULL;
-  }
-
-  std::vector<ObjectRequest> object_requests(n);
-  for (int i = 0; i < n; ++i) {
-    ARROW_CHECK(PyStringToUniqueID(PyList_GetItem(object_id_list, i),
-                    &object_requests[i].object_id) == 1);
-    object_requests[i].type = PLASMA_QUERY_ANYWHERE;
-  }
-  /* Drop the global interpreter lock while we are waiting, so other threads can
-   * run. */
-  int num_return_objects;
-  Py_BEGIN_ALLOW_THREADS;
-  ARROW_CHECK_OK(
-      client->Wait(n, object_requests.data(), num_returns, timeout, &num_return_objects));
-  Py_END_ALLOW_THREADS;
-
-  int num_to_return = std::min(num_return_objects, num_returns);
-  PyObject* ready_ids = PyList_New(num_to_return);
-  PyObject* waiting_ids = PySet_New(object_id_list);
-  int num_returned = 0;
-  for (int i = 0; i < n; ++i) {
-    if (num_returned == num_to_return) { break; }
-    if (object_requests[i].status == ObjectStatus_Local ||
-        object_requests[i].status == ObjectStatus_Remote) {
-      PyObject* ready = PyBytes_FromStringAndSize(
-          reinterpret_cast<char*>(&object_requests[i].object_id),
-          sizeof(object_requests[i].object_id));
-      PyList_SetItem(ready_ids, num_returned, ready);
-      PySet_Discard(waiting_ids, ready);
-      num_returned += 1;
-    } else {
-      ARROW_CHECK(object_requests[i].status == ObjectStatus_Nonexistent);
-    }
-  }
-  ARROW_CHECK(num_returned == num_to_return);
-  /* Return both the ready IDs and the remaining IDs. */
-  PyObject* t = PyTuple_New(2);
-  PyTuple_SetItem(t, 0, ready_ids);
-  PyTuple_SetItem(t, 1, waiting_ids);
-  return t;
-}
-
-PyObject* PyPlasma_evict(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  Py_ssize_t num_bytes;
-  if (!PyArg_ParseTuple(args, "O&n", PyObjectToPlasmaClient, &client, &num_bytes)) {
-    return NULL;
-  }
-  int64_t evicted_bytes;
-  ARROW_CHECK_OK(client->Evict(static_cast<int64_t>(num_bytes), evicted_bytes));
-  return PyLong_FromSsize_t(static_cast<Py_ssize_t>(evicted_bytes));
-}
-
-PyObject* PyPlasma_delete(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  ARROW_CHECK_OK(client->Delete(object_id));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_transfer(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  const char* addr;
-  int port;
-  if (!PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaClient, &client,
-          PyStringToUniqueID, &object_id, &addr, &port)) {
-    return NULL;
-  }
-
-  if (client->get_manager_fd() == -1) {
-    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
-    return NULL;
-  }
-
-  ARROW_CHECK_OK(client->Transfer(addr, port, object_id));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_subscribe(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  if (!PyArg_ParseTuple(args, "O&", PyObjectToPlasmaClient, &client)) { return NULL; }
-
-  int sock;
-  ARROW_CHECK_OK(client->Subscribe(&sock));
-  return PyLong_FromLong(sock);
-}
-
-PyObject* PyPlasma_receive_notification(PyObject* self, PyObject* args) {
-  int plasma_sock;
-
-  if (!PyArg_ParseTuple(args, "i", &plasma_sock)) { return NULL; }
-  /* Receive object notification from the plasma connection socket. If the
-   * object was added, return a tuple of its fields: ObjectID, data_size,
-   * metadata_size. If the object was deleted, data_size and metadata_size will
-   * be set to -1. */
-  uint8_t* notification = read_message_async(plasma_sock);
-  if (notification == NULL) {
-    PyErr_SetString(
-        PyExc_RuntimeError, "Failed to read object notification from Plasma socket");
-    return NULL;
-  }
-  auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
-  /* Construct a tuple from object_info and return. */
-  PyObject* t = PyTuple_New(3);
-  PyTuple_SetItem(t, 0, PyBytes_FromStringAndSize(object_info->object_id()->data(),
-                            object_info->object_id()->size()));
-  if (object_info->is_deletion()) {
-    PyTuple_SetItem(t, 1, PyLong_FromLong(-1));
-    PyTuple_SetItem(t, 2, PyLong_FromLong(-1));
-  } else {
-    PyTuple_SetItem(t, 1, PyLong_FromLong(object_info->data_size()));
-    PyTuple_SetItem(t, 2, PyLong_FromLong(object_info->metadata_size()));
-  }
-
-  delete[] notification;
-  return t;
-}
-
-static PyMethodDef plasma_methods[] = {
-    {"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."},
-    {"disconnect", PyPlasma_disconnect, METH_VARARGS, "Disconnect from plasma."},
-    {"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."},
-    {"hash", PyPlasma_hash, METH_VARARGS, "Compute the hash of a plasma object."},
-    {"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."},
-    {"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."},
-    {"contains", PyPlasma_contains, METH_VARARGS,
-        "Does the plasma store contain this plasma object?"},
-    {"fetch", PyPlasma_fetch, METH_VARARGS,
-        "Fetch the object from another plasma manager instance."},
-    {"wait", PyPlasma_wait, METH_VARARGS,
-        "Wait until num_returns objects in object_ids are ready."},
-    {"evict", PyPlasma_evict, METH_VARARGS,
-        "Evict some objects until we recover some number of bytes."},
-    {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."},
-    {"delete", PyPlasma_delete, METH_VARARGS, "Delete a plasma object."},
-    {"transfer", PyPlasma_transfer, METH_VARARGS,
-        "Transfer object to another plasma manager."},
-    {"subscribe", PyPlasma_subscribe, METH_VARARGS,
-        "Subscribe to the plasma notification socket."},
-    {"receive_notification", PyPlasma_receive_notification, METH_VARARGS,
-        "Receive next notification from plasma notification socket."},
-    {NULL} /* Sentinel */
-};
-
-#if PY_MAJOR_VERSION >= 3
-static struct PyModuleDef moduledef = {
-    PyModuleDef_HEAD_INIT, "libplasma",    /* m_name */
-    "A Python client library for plasma.", /* m_doc */
-    0,                                     /* m_size */
-    plasma_methods,                        /* m_methods */
-    NULL,                                  /* m_reload */
-    NULL,                                  /* m_traverse */
-    NULL,                                  /* m_clear */
-    NULL,                                  /* m_free */
-};
-#endif
-
-#if PY_MAJOR_VERSION >= 3
-#define INITERROR return NULL
-#else
-#define INITERROR return
-#endif
-
-#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
-#define PyMODINIT_FUNC void
-#endif
-
-#if PY_MAJOR_VERSION >= 3
-#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
-#else
-#define MOD_INIT(name) PyMODINIT_FUNC init##name(void)
-#endif
-
-MOD_INIT(libplasma) {
-#if PY_MAJOR_VERSION >= 3
-  PyObject* m = PyModule_Create(&moduledef);
-#else
-  PyObject* m =
-      Py_InitModule3("libplasma", plasma_methods, "A Python client library for plasma.");
-#endif
-
-  /* Create a custom exception for when an object ID is reused. */
-  char plasma_object_exists_error[] = "plasma_object_exists.error";
-  PlasmaObjectExistsError = PyErr_NewException(plasma_object_exists_error, NULL, NULL);
-  Py_INCREF(PlasmaObjectExistsError);
-  PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError);
-  /* Create a custom exception for when the plasma store is out of memory. */
-  char plasma_out_of_memory_error[] = "plasma_out_of_memory.error";
-  PlasmaOutOfMemoryError = PyErr_NewException(plasma_out_of_memory_error, NULL, NULL);
-  Py_INCREF(PlasmaOutOfMemoryError);
-  PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError);
-
-#if PY_MAJOR_VERSION >= 3
-  return m;
-#endif
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/extension.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.h b/cpp/src/plasma/extension.h
deleted file mode 100644
index cee30ab..0000000
--- a/cpp/src/plasma/extension.h
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_EXTENSION_H
-#define PLASMA_EXTENSION_H
-
-#undef _XOPEN_SOURCE
-#undef _POSIX_C_SOURCE
-#include <Python.h>
-
-#include "bytesobject.h"  // NOLINT
-
-#include "plasma/client.h"
-#include "plasma/common.h"
-
-static int PyObjectToPlasmaClient(PyObject* object, PlasmaClient** client) {
-  if (PyCapsule_IsValid(object, "plasma")) {
-    *client = reinterpret_cast<PlasmaClient*>(PyCapsule_GetPointer(object, "plasma"));
-    return 1;
-  } else {
-    PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule");
-    return 0;
-  }
-}
-
-int PyStringToUniqueID(PyObject* object, ObjectID* object_id) {
-  if (PyBytes_Check(object)) {
-    memcpy(object_id, PyBytes_AsString(object), sizeof(ObjectID));
-    return 1;
-  } else {
-    PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
-    return 0;
-  }
-}
-
-#endif  // PLASMA_EXTENSION_H


Mime
View raw message