arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [1/2] arrow git commit: ARROW-1149: [Plasma] Create Cython client library for Plasma
Date Mon, 24 Jul 2017 16:12:48 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 05f7058ce -> a94f4716b


http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/plasma.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
new file mode 100644
index 0000000..bb17685
--- /dev/null
+++ b/python/pyarrow/plasma.pyx
@@ -0,0 +1,560 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from libcpp cimport bool as c_bool, nullptr
+from libcpp.memory cimport shared_ptr, unique_ptr, make_shared
+from libcpp.string cimport string as c_string
+from libcpp.vector cimport vector as c_vector
+from libc.stdint cimport int64_t, uint8_t, uintptr_t
+from cpython.pycapsule cimport *
+
+from pyarrow.lib cimport Buffer, NativeFile, check_status
+from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer,
+                                        CFixedSizeBufferWriter, CStatus)
+
+
+# Default value of the ``timeout`` argument of PlasmaClient.wait, which that
+# method documents as a number of milliseconds.
+PLASMA_WAIT_TIMEOUT = 2 ** 30
+
+
+# Declarations taken from the C++ header "plasma/common.h". The whole block
+# is declared nogil, so these members may be used inside ``with nogil``.
+cdef extern from "plasma/common.h" nogil:
+
+    # C++ class plasma::UniqueID, referred to as CUniqueID in this module.
+    cdef cppclass CUniqueID" plasma::UniqueID":
+
+        # Static constructor taking the binary string form of an ID; used by
+        # ObjectID.__cinit__.
+        @staticmethod
+        CUniqueID from_binary(const c_string& binary)
+
+        c_bool operator==(const CUniqueID& rhs) const
+
+        # Used by ObjectID.__repr__.
+        c_string hex() const
+
+        # Used by ObjectID for hashing, pickling and ObjectID.binary().
+        c_string binary() const
+
+    # C++ struct plasma::ObjectRequest. PlasmaClient.wait fills in object_id
+    # and type before the call and reads status afterwards.
+    cdef struct CObjectRequest" plasma::ObjectRequest":
+        CUniqueID object_id
+        int type
+        int status
+
+# Constants and enum values from "plasma/common.h" (this block is not
+# declared nogil).
+cdef extern from "plasma/common.h":
+    # Length of the digest vector allocated in PlasmaClient.hash.
+    cdef int64_t kDigestSize" plasma::kDigestSize"
+
+    # Values assigned to CObjectRequest.type; PlasmaClient.wait uses
+    # PLASMA_QUERY_ANYWHERE.
+    cdef enum ObjectRequestType:
+        PLASMA_QUERY_LOCAL"plasma::PLASMA_QUERY_LOCAL",
+        PLASMA_QUERY_ANYWHERE"plasma::PLASMA_QUERY_ANYWHERE"
+
+    # Values that PlasmaClient.wait compares CObjectRequest.status against to
+    # decide whether an object is ready.
+    cdef int ObjectStatusLocal"plasma::ObjectStatusLocal";
+    cdef int ObjectStatusRemote"plasma::ObjectStatusRemote";
+
+# C++ client API from "plasma/client.h", declared nogil. Every method returns
+# a CStatus, which the PlasmaClient wrapper class passes to check_status.
+cdef extern from "plasma/client.h" nogil:
+
+    # C++ class plasma::PlasmaClient, referred to as CPlasmaClient here.
+    cdef cppclass CPlasmaClient" plasma::PlasmaClient":
+
+        CPlasmaClient()
+
+        CStatus Connect(const c_string& store_socket_name,
+                        const c_string& manager_socket_name, int release_delay)
+
+        # On return, *data points at the buffer created for the object.
+        CStatus Create(const CUniqueID& object_id, int64_t data_size,
+                       const uint8_t* metadata, int64_t metadata_size,
+                       uint8_t** data)
+
+        # object_buffers must have room for num_objects entries.
+        CStatus Get(const CUniqueID* object_ids, int64_t num_objects,
+                    int64_t timeout_ms, CObjectBuffer* object_buffers)
+
+        CStatus Seal(const CUniqueID& object_id)
+
+        CStatus Evict(int64_t num_bytes, int64_t& num_bytes_evicted)
+
+        # PlasmaClient.hash passes a buffer of kDigestSize bytes as digest.
+        CStatus Hash(const CUniqueID& object_id, uint8_t* digest)
+
+        CStatus Release(const CUniqueID& object_id)
+
+        CStatus Contains(const CUniqueID& object_id, c_bool* has_object)
+
+        # Writes the notification file descriptor into *fd.
+        CStatus Subscribe(int* fd)
+
+        CStatus GetNotification(int fd, CUniqueID* object_id,
+                                int64_t* data_size, int64_t* metadata_size)
+
+        CStatus Disconnect()
+
+        CStatus Fetch(int num_object_ids, const CUniqueID* object_ids)
+
+        CStatus Wait(int64_t num_object_requests, CObjectRequest* object_requests,
+           int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
+
+        CStatus Transfer(const char* addr, int port, const CUniqueID& object_id)
+
+
+# Result record filled in by CPlasmaClient.Get for each requested object.
+# PlasmaClient.get treats a data_size of -1 as "object not available".
+cdef extern from "plasma/client.h" nogil:
+
+    cdef struct CObjectBuffer" plasma::ObjectBuffer":
+        int64_t data_size
+        uint8_t* data
+        int64_t metadata_size
+        uint8_t* metadata
+
+
+def make_object_id(object_id):
+    """
+    Build an ObjectID from its binary representation.
+
+    ObjectID.__reduce__ returns this function as the callable used to
+    reconstruct an ObjectID when it is unpickled.
+    """
+    restored = ObjectID(object_id)
+    return restored
+
+
+cdef class ObjectID:
+    """
+    An ObjectID represents a string of bytes used to identify Plasma objects.
+
+    Instances are hashable and picklable, and support ``==`` and ``!=``.
+    Ordering comparisons raise ValueError.
+    """
+
+    cdef:
+        CUniqueID data
+
+    def __cinit__(self, object_id):
+        # object_id is the binary form of the ID (bytes).
+        self.data = CUniqueID.from_binary(object_id)
+
+    def __richcmp__(ObjectID self, object_id, operation):
+        """
+        Compare two ObjectIDs for equality or inequality.
+
+        ``operation`` is the CPython rich-comparison opcode: 2 is ``==`` and
+        3 is ``!=``.
+        """
+        # A parameter typed as ObjectID would still accept None, and reading
+        # its C++ member would then be invalid. Check the type explicitly and
+        # let Python fall back to its default handling for None and for
+        # unrelated types.
+        if not isinstance(object_id, ObjectID):
+            return NotImplemented
+        if operation != 2 and operation != 3:
+            raise ValueError("only equality and inequality comparisons "
+                             "are supported")
+        equal = self.data == (<ObjectID> object_id).data
+        if operation == 2:
+            return equal
+        return not equal
+
+    def __hash__(self):
+        return hash(self.data.binary())
+
+    def __repr__(self):
+        return "ObjectID(" + self.data.hex().decode() + ")"
+
+    def __reduce__(self):
+        # Pickle as a call to make_object_id with the binary form of the ID.
+        return (make_object_id, (self.data.binary(),))
+
+    def binary(self):
+        """
+        Return the binary representation of this ObjectID.
+
+        Returns
+        -------
+        bytes
+            Binary representation of the ObjectID.
+        """
+        return self.data.binary()
+
+
+cdef class PlasmaBuffer(Buffer):
+    """
+    This is the type returned by calls to get with a PlasmaClient.
+
+    We define our own class instead of directly returning a buffer object so
+    that we can add a custom destructor which notifies Plasma that the object
+    is no longer being used, so the memory in the Plasma store backing the
+    object can potentially be freed.
+
+    Attributes
+    ----------
+    object_id : ObjectID
+        The ID of the object in the buffer.
+    client : PlasmaClient
+        The PlasmaClient that we use to communicate with the store and manager.
+    """
+
+    cdef:
+        ObjectID object_id
+        PlasmaClient client
+
+    def __cinit__(self, ObjectID object_id, PlasmaClient client):
+        """
+        Initialize a PlasmaBuffer.
+
+        Stores the object ID and the client so that __dealloc__ can release
+        the object through that client.
+        """
+        self.object_id = object_id
+        self.client = client
+
+    def __dealloc__(self):
+        """
+        Notify Plasma that the object is no longer needed.
+
+        This calls release on the owning client unconditionally.
+
+        NOTE(review): earlier documentation said that nothing is done if the
+        plasma client has been shut down, but no such check is present in
+        this method -- confirm whether release tolerates that case.
+        """
+        self.client.release(self.object_id)
+
+
+cdef class PlasmaClient:
+    """
+    The PlasmaClient is used to interface with a plasma store and manager.
+
+    The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a
+    buffer, and get a buffer. Buffers are referred to by object IDs, which
+    are passed as ObjectID instances.
+    """
+
+    cdef:
+        shared_ptr[CPlasmaClient] client
+        int notification_fd
+        c_string store_socket_name
+        c_string manager_socket_name
+
+    def __cinit__(self, store_socket_name, manager_socket_name, int release_delay):
+        """
+        Create a new PlasmaClient that is connected to a plasma store
+        and optionally a plasma manager.
+
+        Parameters
+        ----------
+        store_socket_name : str
+            Name of the socket the plasma store is listening at.
+        manager_socket_name : str
+            Name of the socket the plasma manager is listening at.
+        release_delay : int
+            The maximum number of objects that the client will keep and
+            delay releasing (for caching reasons).
+        """
+        self.client.reset(new CPlasmaClient())
+        # -1 until subscribe() stores a real descriptor here.
+        self.notification_fd = -1
+        self.store_socket_name = store_socket_name.encode()
+        self.manager_socket_name = manager_socket_name.encode()
+        with nogil:
+            check_status(self.client.get().Connect(self.store_socket_name,
+                         self.manager_socket_name, release_delay))
+
+    cdef _get_object_buffers(self, object_ids, int64_t timeout_ms,
+                             c_vector[CObjectBuffer]* result):
+        # Shared helper for get() and get_metadata(): resize ``result`` to one
+        # entry per requested ID and let the C++ client fill it in.
+        cdef c_vector[CUniqueID] ids
+        cdef ObjectID object_id
+        for object_id in object_ids:
+            ids.push_back(object_id.data)
+        result[0].resize(ids.size())
+        with nogil:
+            check_status(self.client.get().Get(ids.data(), ids.size(),
+                         timeout_ms, result[0].data()))
+
+    cdef _make_plasma_buffer(self, ObjectID object_id, uint8_t* data,
+                             int64_t size):
+        # Wrap ``size`` bytes at ``data`` in a CBuffer held by a PlasmaBuffer
+        # tied to this client.
+        cdef shared_ptr[CBuffer] buffer
+        buffer.reset(new CBuffer(data, size))
+        result = PlasmaBuffer(object_id, self)
+        result.init(buffer)
+        return result
+
+    cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data,
+                                     int64_t size):
+        # Same as _make_plasma_buffer, but backed by a CMutableBuffer.
+        cdef shared_ptr[CBuffer] buffer
+        buffer.reset(new CMutableBuffer(data, size))
+        result = PlasmaBuffer(object_id, self)
+        result.init(buffer)
+        return result
+
+    @property
+    def store_socket_name(self):
+        return self.store_socket_name.decode()
+
+    @property
+    def manager_socket_name(self):
+        return self.manager_socket_name.decode()
+
+    def create(self, ObjectID object_id, int64_t data_size, c_string metadata=b""):
+        """
+        Create a new buffer in the PlasmaStore for a particular object ID.
+
+        The returned buffer is mutable until seal is called.
+
+        Parameters
+        ----------
+        object_id : ObjectID
+            The object ID used to identify an object.
+        data_size : int
+            The size in bytes of the created buffer.
+        metadata : bytes
+            An optional string of bytes encoding whatever metadata the user
+            wishes to encode.
+
+        Returns
+        -------
+        PlasmaBuffer
+            A mutable buffer of data_size bytes for the new object.
+
+        Raises
+        ------
+        PlasmaObjectExists
+            This exception is raised if the object could not be created because
+            there already is an object with the same ID in the plasma store.
+
+        PlasmaStoreFull: This exception is raised if the object could
+                not be created because the plasma store is unable to evict
+                enough objects to create room for it.
+        """
+        cdef uint8_t* data
+        with nogil:
+            check_status(self.client.get().Create(object_id.data, data_size,
+                                                  <uint8_t*>(metadata.data()),
+                                                  metadata.size(), &data))
+        return self._make_mutable_plasma_buffer(object_id, data, data_size)
+
+    def get(self, object_ids, timeout_ms=-1):
+        """
+        Returns data buffer from the PlasmaStore based on object ID.
+
+        If the object has not been sealed yet, this call will block. The
+        retrieved buffer is immutable.
+
+        Parameters
+        ----------
+        object_ids : list
+            A list of ObjectIDs used to identify some objects.
+        timeout_ms : int
+            The number of milliseconds that the get call should block before
+            timing out and returning. Pass -1 if the call should block and 0
+            if the call should return immediately.
+
+        Returns
+        -------
+        list
+            List of PlasmaBuffers for the data associated with the object_ids
+            and None if the object was not available.
+        """
+        cdef c_vector[CObjectBuffer] object_buffers
+        self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
+        result = []
+        for i in range(object_buffers.size()):
+            # A data_size of -1 marks an object that was not available.
+            if object_buffers[i].data_size != -1:
+                result.append(self._make_plasma_buffer(
+                                  object_ids[i], object_buffers[i].data,
+                                  object_buffers[i].data_size))
+            else:
+                result.append(None)
+        return result
+
+    def get_metadata(self, object_ids, timeout_ms=-1):
+        """
+        Returns metadata buffer from the PlasmaStore based on object ID.
+
+        If the object has not been sealed yet, this call will block. The
+        retrieved buffer is immutable.
+
+        Parameters
+        ----------
+        object_ids : list
+            A list of ObjectIDs used to identify some objects.
+        timeout_ms : int
+            The number of milliseconds that the get call should block before
+            timing out and returning. Pass -1 if the call should block and 0
+            if the call should return immediately.
+
+        Returns
+        -------
+        list
+            List of PlasmaBuffers for the metadata associated with the
+            object_ids and None if the object was not available.
+        """
+        cdef c_vector[CObjectBuffer] object_buffers
+        self._get_object_buffers(object_ids, timeout_ms, &object_buffers)
+        result = []
+        for i in range(object_buffers.size()):
+            # Use the same availability test as get(): a data_size of -1
+            # means the object was not retrieved, so its metadata fields must
+            # not be wrapped in a buffer.
+            if object_buffers[i].data_size != -1:
+                result.append(self._make_plasma_buffer(
+                                  object_ids[i], object_buffers[i].metadata,
+                                  object_buffers[i].metadata_size))
+            else:
+                result.append(None)
+        return result
+
+    def seal(self, ObjectID object_id):
+        """
+        Seal the buffer in the PlasmaStore for a particular object ID.
+
+        Once a buffer has been sealed, the buffer is immutable and can only be
+        accessed through get.
+
+        Parameters
+        ----------
+        object_id : ObjectID
+            The ID of the object to seal.
+        """
+        with nogil:
+            check_status(self.client.get().Seal(object_id.data))
+
+    def release(self, ObjectID object_id):
+        """
+        Notify Plasma that the object is no longer needed.
+
+        Parameters
+        ----------
+        object_id : ObjectID
+            The ID of the object to release.
+        """
+        with nogil:
+            check_status(self.client.get().Release(object_id.data))
+
+    def contains(self, ObjectID object_id):
+        """
+        Check if the object is present and sealed in the PlasmaStore.
+
+        Parameters
+        ----------
+        object_id : ObjectID
+            The ID of the object to look up.
+
+        Returns
+        -------
+        bool
+            The flag reported by the store for this object ID.
+        """
+        cdef c_bool is_contained
+        with nogil:
+            check_status(self.client.get().Contains(object_id.data,
+                                                    &is_contained))
+        return is_contained
+
+    def hash(self, ObjectID object_id):
+        """
+        Compute the checksum of an object in the object store.
+
+        Parameters
+        ----------
+        object_id : ObjectID
+            The ID of the object to hash.
+
+        Returns
+        -------
+        bytes
+            The contents of the kDigestSize-byte digest buffer filled in by
+            the store.
+        """
+        cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize)
+        with nogil:
+            check_status(self.client.get().Hash(object_id.data,
+                                                digest.data()))
+        return bytes(digest[:])
+
+    def evict(self, int64_t num_bytes):
+        """
+        Evict objects from the store to recover some bytes.
+
+        Recover at least num_bytes bytes if possible.
+
+        Parameters
+        ----------
+        num_bytes : int
+            The number of bytes to attempt to recover.
+
+        Returns
+        -------
+        int
+            The number of bytes evicted, as reported by the store.
+        """
+        cdef int64_t num_bytes_evicted = -1
+        with nogil:
+            check_status(self.client.get().Evict(num_bytes, num_bytes_evicted))
+        return num_bytes_evicted
+
+    def transfer(self, address, int port, ObjectID object_id):
+        """
+        Transfer local object with id object_id to another plasma instance
+
+        Parameters
+        ----------
+        address : str
+            IPv4 address of the plasma instance the object is sent to.
+        port : int
+            Port number of the plasma instance the object is sent to.
+        object_id : ObjectID
+            The ID of the object to transfer.
+        """
+        cdef c_string addr = address.encode()
+        with nogil:
+            check_status(self.client.get().Transfer(addr.c_str(), port, object_id.data))
+
+    def fetch(self, object_ids):
+        """
+        Fetch the objects with the given IDs from other plasma managers.
+
+        Parameters
+        ----------
+        object_ids : list
+            A list of ObjectIDs used to identify the objects.
+        """
+        cdef c_vector[CUniqueID] ids
+        cdef ObjectID object_id
+        for object_id in object_ids:
+            ids.push_back(object_id.data)
+        with nogil:
+            check_status(self.client.get().Fetch(ids.size(), ids.data()))
+
+    def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT, int num_returns=1):
+        """
+        Wait until num_returns objects in object_ids are ready.
+        Currently, the object ID arguments to wait must be unique.
+
+        Parameters
+        ----------
+        object_ids : list
+            List of object IDs to wait for.
+        timeout : int
+            Return to the caller after timeout milliseconds.
+        num_returns : int
+            We are waiting for this number of objects to be ready.
+
+        Returns
+        -------
+        list
+            List of object IDs that are ready.
+        list
+            List of object IDs we might still wait on.
+        """
+        # Check that the object ID arguments are unique. The plasma manager
+        # currently crashes if given duplicate object IDs.
+        if len(object_ids) != len(set(object_ids)):
+            raise Exception("Wait requires a list of unique object IDs.")
+        cdef int64_t num_object_requests = len(object_ids)
+        cdef c_vector[CObjectRequest] object_requests = c_vector[CObjectRequest](num_object_requests)
+        cdef int num_objects_ready = 0
+        cdef ObjectID object_id
+        for i, object_id in enumerate(object_ids):
+            object_requests[i].object_id = object_id.data
+            object_requests[i].type = PLASMA_QUERY_ANYWHERE
+        with nogil:
+            check_status(self.client.get().Wait(num_object_requests, object_requests.data(), num_returns, timeout, &num_objects_ready))
+        # Never report more ready objects than the caller asked for.
+        cdef int num_to_return = min(num_objects_ready, num_returns)
+        ready_ids = []
+        waiting_ids = set(object_ids)
+        cdef int num_returned = 0
+        for i in range(len(object_ids)):
+            if num_returned == num_to_return:
+                break
+            if object_requests[i].status == ObjectStatusLocal or object_requests[i].status == ObjectStatusRemote:
+                # Build the ready ID once and use it for both collections.
+                ready_id = ObjectID(object_requests[i].object_id.binary())
+                ready_ids.append(ready_id)
+                waiting_ids.discard(ready_id)
+                num_returned += 1
+        return ready_ids, list(waiting_ids)
+
+    def subscribe(self):
+        """Subscribe to notifications about sealed objects."""
+        with nogil:
+            check_status(self.client.get().Subscribe(&self.notification_fd))
+
+    def get_next_notification(self):
+        """
+        Get the next notification from the notification socket.
+
+        Returns
+        -------
+        ObjectID
+            The object ID of the object that was stored.
+        int
+            The data size of the object that was stored.
+        int
+            The metadata size of the object that was stored.
+        """
+        # Placeholder ID of 20 zero bytes; GetNotification overwrites it.
+        cdef ObjectID object_id = ObjectID(20 * b"\0")
+        cdef int64_t data_size
+        cdef int64_t metadata_size
+        with nogil:
+            check_status(self.client.get().GetNotification(self.notification_fd,
+                                                           &object_id.data,
+                                                           &data_size,
+                                                           &metadata_size))
+        return object_id, data_size, metadata_size
+
+    def to_capsule(self):
+        """
+        Return a PyCapsule named "plasma" wrapping the raw C++ client pointer.
+
+        The capsule is created without a destructor, so it does not own the
+        client.
+        """
+        return PyCapsule_New(<void *>self.client.get(), "plasma", NULL)
+
+    def disconnect(self):
+        """
+        Disconnect this client from the Plasma store.
+        """
+        with nogil:
+            check_status(self.client.get().Disconnect())

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/tests/conftest.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index 2aeeab7..21288e4 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -18,11 +18,12 @@
 from pytest import skip
 
 
-groups = ['hdfs', 'parquet', 'large_memory']
+groups = ['hdfs', 'parquet', 'plasma', 'large_memory']
 
 defaults = {
     'hdfs': False,
     'parquet': False,
+    'plasma': False,
     'large_memory': False
 }
 
@@ -32,6 +33,11 @@ try:
 except ImportError:
     pass
 
+try:
+    import pyarrow.plasma as plasma
+    defaults['plasma'] = True
+except ImportError:
+    pass
 
 def pytest_configure(config):
     pass

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/tests/test_plasma.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
new file mode 100644
index 0000000..ce684e3
--- /dev/null
+++ b/python/pyarrow/tests/test_plasma.py
@@ -0,0 +1,683 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import glob
+import numpy as np
+import os
+import pytest
+import random
+import signal
+import subprocess
+import sys
+import time
+import unittest
+
+import pyarrow as pa
+import pandas as pd
+
+# Default value that start_plasma_store hands to the plasma_store executable
+# through its "-m" option.
+DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9
+
+def random_name():
+    """Return a random integer from 0 to 99999999 rendered as a string."""
+    number = random.randint(0, 99999999)
+    return "{}".format(number)
+
+
+def random_object_id():
+    """Return a plasma ObjectID built from 20 random bytes."""
+    import pyarrow.plasma as plasma
+    raw_id = np.random.bytes(20)
+    return plasma.ObjectID(raw_id)
+
+
+def generate_metadata(length):
+    """
+    Return a bytearray of ``length`` bytes with some randomised entries.
+
+    For a positive length the first and last bytes are set to random values
+    and 100 further random positions are overwritten with random values.
+    A zero-length request returns an empty bytearray.
+    """
+    blob = bytearray(length)
+    if length <= 0:
+        return blob
+    blob[0] = random.randint(0, 255)
+    blob[-1] = random.randint(0, 255)
+    for _ in range(100):
+        # Draw the byte value before the position so the random numbers are
+        # consumed in the same order as a single subscript assignment would.
+        value = random.randint(0, 255)
+        position = random.randint(0, length - 1)
+        blob[position] = value
+    return blob
+
+
+def write_to_data_buffer(buff, length):
+    """
+    Scribble random bytes into ``buff`` through a uint8 numpy view.
+
+    For a positive ``length`` the first and last elements of the view are
+    randomised, followed by 100 random positions below ``length``. Nothing
+    is written when ``length`` is not positive.
+    """
+    view = np.frombuffer(buff, dtype="uint8")
+    if length <= 0:
+        return
+    view[0] = random.randint(0, 255)
+    view[-1] = random.randint(0, 255)
+    for _ in range(100):
+        # Value first, then position: keeps the order of the random draws.
+        value = random.randint(0, 255)
+        position = random.randint(0, length - 1)
+        view[position] = value
+
+
+def create_object_with_id(client, object_id, data_size, metadata_size,
+                          seal=True):
+    """
+    Create an object with the given ID and fill it with random content.
+
+    Random metadata of ``metadata_size`` bytes is generated, a buffer of
+    ``data_size`` bytes is created through ``client`` and randomised, and
+    the object is sealed unless ``seal`` is false.
+
+    Returns the data buffer and the metadata as a tuple.
+    """
+    object_metadata = generate_metadata(metadata_size)
+    data_buffer = client.create(object_id, data_size, object_metadata)
+    write_to_data_buffer(data_buffer, data_size)
+    if not seal:
+        return data_buffer, object_metadata
+    client.seal(object_id)
+    return data_buffer, object_metadata
+
+
+def create_object(client, data_size, metadata_size, seal=True):
+    """
+    Create an object under a freshly generated random ID.
+
+    Returns the new object ID together with the data buffer and metadata
+    produced by create_object_with_id.
+    """
+    new_id = random_object_id()
+    created = create_object_with_id(client, new_id, data_size, metadata_size,
+                                    seal=seal)
+    data_buffer, object_metadata = created
+    return new_id, data_buffer, object_metadata
+
+
+def assert_get_object_equal(unit_test, client1, client2, object_id,
+                            memory_buffer=None, metadata=None):
+    """
+    Assert that two clients see the same data and metadata for an object.
+
+    The data and metadata buffers fetched through ``client1`` and
+    ``client2`` must have equal lengths and equal contents. When
+    ``memory_buffer`` or ``metadata`` is given, it is additionally compared
+    with what ``client1`` returned. ``unit_test`` is accepted but not used.
+    """
+    import pyarrow.plasma as plasma
+    data1 = client1.get([object_id])[0]
+    data2 = client2.get([object_id])[0]
+    meta1 = client1.get_metadata([object_id])[0]
+    meta2 = client2.get_metadata([object_id])[0]
+    # Lengths first, then contents, for data and then metadata.
+    assert len(data1) == len(data2)
+    assert len(meta1) == len(meta2)
+    assert plasma.buffers_equal(data1, data2)
+    assert plasma.buffers_equal(meta1, meta2)
+    # Optional reference values are checked against client1's view only.
+    for reference, fetched in ((memory_buffer, data1), (metadata, meta1)):
+        if reference is not None:
+            assert plasma.buffers_equal(reference, fetched)
+
+
+def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
+                       use_valgrind=False, use_profiler=False,
+                       stdout_file=None, stderr_file=None):
+    """Launch a plasma store subprocess on a randomly named socket.
+    Args:
+        plasma_store_memory: Value passed to the store's "-m" option.
+        use_valgrind (bool): Run the store under valgrind. Cannot be combined
+            with use_profiler.
+        use_profiler (bool): Run the store under valgrind's callgrind tool.
+            Cannot be combined with use_valgrind.
+        stdout_file: Passed to subprocess.Popen as stdout; None means no
+            redirection.
+        stderr_file: Passed to subprocess.Popen as stderr; None means no
+            redirection.
+    Return:
+        A tuple of the plasma store socket name and the subprocess.Popen
+            object of the started store.
+    """
+    if use_valgrind and use_profiler:
+        raise Exception("Cannot use valgrind and profiler at the same time.")
+    executable = os.path.join(pa.__path__[0], "plasma_store")
+    socket_name = "/tmp/plasma_store{}".format(random_name())
+    store_command = [executable, "-s", socket_name,
+                     "-m", str(plasma_store_memory)]
+    # Pick the wrapper command and how long to wait for the store to start.
+    if use_valgrind:
+        wrapper = ["valgrind",
+                   "--track-origins=yes",
+                   "--leak-check=full",
+                   "--show-leak-kinds=all",
+                   "--leak-check-heuristics=stdstring",
+                   "--error-exitcode=1"]
+        startup_delay = 1.0
+    elif use_profiler:
+        wrapper = ["valgrind", "--tool=callgrind"]
+        startup_delay = 1.0
+    else:
+        wrapper = []
+        startup_delay = 0.1
+    process = subprocess.Popen(wrapper + store_command,
+                               stdout=stdout_file, stderr=stderr_file)
+    time.sleep(startup_delay)
+    return socket_name, process
+
+
+@pytest.mark.plasma
+class TestPlasmaClient(object):
+
+    def setup_method(self, test_method):
+        """Start a fresh plasma store and connect two clients to it."""
+        import pyarrow.plasma as plasma
+        # The store runs under valgrind only when PLASMA_VALGRIND is "1".
+        run_under_valgrind = os.getenv("PLASMA_VALGRIND") == "1"
+        store_socket, self.p = start_plasma_store(
+            use_valgrind=run_under_valgrind)
+        # Main client, connected with a release delay of 64.
+        self.plasma_client = plasma.PlasmaClient(store_socket, "", 64)
+        # Second client with a release delay of 0, for the eviction test.
+        self.plasma_client2 = plasma.PlasmaClient(store_socket, "", 0)
+
+    def teardown_method(self, test_method):
+        """Check that the store survived the test, then shut it down."""
+        # Check that the Plasma store is still alive: poll() returns None
+        # while the process is running.
+        assert self.p.poll() is None
+        # Kill the plasma store process.
+        if os.getenv("PLASMA_VALGRIND") == "1":
+            # Terminate gracefully and require a zero exit status from the
+            # valgrind run.
+            self.p.send_signal(signal.SIGTERM)
+            self.p.wait()
+            assert self.p.returncode == 0
+        else:
+            self.p.kill()
+            # Reap the killed process so it does not linger as a zombie.
+            self.p.wait()
+
+    def test_create(self):
+        """Write a 50-byte object, seal it and read the same bytes back."""
+        object_id = random_object_id()
+        length = 50
+        expected = [i % 256 for i in range(length)]
+        # Fill the freshly created (still mutable) buffer.
+        writable = np.frombuffer(
+            self.plasma_client.create(object_id, length), dtype="uint8")
+        for index, value in enumerate(expected):
+            writable[index] = value
+        self.plasma_client.seal(object_id)
+        # Fetch the sealed object and compare byte by byte.
+        readback = np.frombuffer(
+            self.plasma_client.get([object_id])[0], dtype="uint8")
+        for index, value in enumerate(expected):
+            assert readback[index] == value
+
+    def test_create_with_metadata(self):
+        """Round-trip data and metadata for every length from 0 to 999."""
+        for length in range(1000):
+            object_id = random_object_id()
+            # Random metadata of the same length as the data.
+            metadata = generate_metadata(length)
+            expected = [i % 256 for i in range(length)]
+            # Fill the freshly created (still mutable) buffer.
+            writable = np.frombuffer(
+                self.plasma_client.create(object_id, length, metadata),
+                dtype="uint8")
+            for index, value in enumerate(expected):
+                writable[index] = value
+            self.plasma_client.seal(object_id)
+            # The sealed data must match what was written.
+            readback = np.frombuffer(
+                self.plasma_client.get([object_id])[0], dtype="uint8")
+            for index, value in enumerate(expected):
+                assert readback[index] == value
+            # The stored metadata must match what was passed to create.
+            metadata_buffer = np.frombuffer(
+                self.plasma_client.get_metadata([object_id])[0], dtype="uint8")
+            assert len(metadata) == len(metadata_buffer)
+            for index in range(len(metadata)):
+                assert metadata[index] == metadata_buffer[index]
+
+    def test_create_existing(self):
+        """Creating an object under an ID that already exists must fail."""
+        # This test is partially used to test the code path in which we create
+        # an object with an ID that already exists
+        length = 100
+        for _ in range(1000):
+            object_id = random_object_id()
+            self.plasma_client.create(object_id, length,
+                                      generate_metadata(length))
+            # TODO(pcm): Introduce a more specific error type here.
+            with pytest.raises(pa.lib.ArrowException):
+                self.plasma_client.create(object_id, length,
+                                          generate_metadata(length))
+
+    def test_get(self):
+        """Check get() timeouts for missing and partially present objects."""
+        num_object_ids = 100
+        # Test timing out of get with various timeouts.
+        for timeout in [0, 10, 100, 1000]:
+            object_ids = [random_object_id() for _ in range(num_object_ids)]
+            results = self.plasma_client.get(object_ids, timeout_ms=timeout)
+            assert results == num_object_ids * [None]
+
+        # Create every second object of the last batch of IDs generated above.
+        data_buffers = []
+        metadata_buffers = []
+        for i in range(num_object_ids):
+            if i % 2 == 0:
+                data_buffer, metadata_buffer = create_object_with_id(
+                    self.plasma_client, object_ids[i], 2000, 2000)
+                data_buffers.append(data_buffer)
+                metadata_buffers.append(metadata_buffer)
+
+        # Test timing out from some but not all get calls with various
+        # timeouts.
+        for timeout in [0, 10, 100, 1000]:
+            data_results = self.plasma_client.get(object_ids,
+                                                  timeout_ms=timeout)
+            # metadata_results = self.plasma_client.get_metadata(
+            #     object_ids, timeout_ms=timeout)
+            for i in range(num_object_ids):
+                if i % 2 == 0:
+                    array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8")
+                    array2 = np.frombuffer(data_results[i], dtype="uint8")
+                    np.testing.assert_equal(array1, array2)
+                    # TODO(rkn): We should compare the metadata as well. But
+                    # currently the types are different (e.g., memoryview
+                    # versus bytearray).
+                    # assert plasma.buffers_equal(
+                    #     metadata_buffers[i // 2], metadata_results[i])
+                else:
+                    # Check the result of this get call, not the stale list
+                    # left over from the first loop.
+                    assert data_results[i] is None
+
+    def test_store_arrow_objects(self):
+        import pyarrow.plasma as plasma
+        data = np.random.randn(10, 4)
+        # Write an arrow object.
+        object_id = random_object_id()
+        tensor = pa.Tensor.from_numpy(data)
+        data_size = pa.get_tensor_size(tensor)
+        buf = self.plasma_client.create(object_id, data_size)
+        stream = pa.FixedSizeBufferOutputStream(buf)
+        pa.write_tensor(tensor, stream)
+        self.plasma_client.seal(object_id)
+        # Read the arrow object.
+        [tensor] = self.plasma_client.get([object_id])
+        reader = pa.BufferReader(tensor)
+        array = pa.read_tensor(reader).to_numpy()
+        # Assert that they are equal.
+        np.testing.assert_equal(data, array)
+
+    def test_store_pandas_dataframe(self):
+        import pyarrow.plasma as plasma
+        d = {'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
+             'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
+        df = pd.DataFrame(d)
+
+        # Write the DataFrame.
+        record_batch = pa.RecordBatch.from_pandas(df)
+        # Determine the size.
+        s = pa.MockOutputStream()
+        stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema)
+        stream_writer.write_batch(record_batch)
+        data_size = s.size()
+        object_id = plasma.ObjectID(np.random.bytes(20))
+
+        buf = self.plasma_client.create(object_id, data_size)
+        stream = pa.FixedSizeBufferOutputStream(buf)
+        stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
+        stream_writer.write_batch(record_batch)
+
+        self.plasma_client.seal(object_id)
+
+        # Read the DataFrame.
+        [data] = self.plasma_client.get([object_id])
+        reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
+        result = reader.get_next_batch().to_pandas()
+
+        pd.util.testing.assert_frame_equal(df, result)
+
+    def test_pickle_object_ids(self):
+        # This can be used for sharing object IDs between processes.
+        import pickle
+        object_id = random_object_id()
+        data = pickle.dumps(object_id)
+        object_id2 = pickle.loads(data)
+        assert object_id == object_id2
+
+    def test_store_full(self):
+        # The store is started with 1GB, so make sure that create throws an
+        # exception when it is full.
+        def assert_create_raises_plasma_full(unit_test, size):
+            partial_size = np.random.randint(size)
+            try:
+                _, memory_buffer, _ = create_object(unit_test.plasma_client,
+                                                    partial_size,
+                                                    size - partial_size)
+            # TODO(pcm): More specific error here.
+            except pa.lib.ArrowException as e:
+                pass
+            else:
+                # For some reason the above didn't throw an exception, so fail.
+                assert False
+
+        # Create a list to keep some of the buffers in scope.
+        memory_buffers = []
+        _, memory_buffer, _ = create_object(self.plasma_client, 5 * 10 ** 8, 0)
+        memory_buffers.append(memory_buffer)
+        # Remaining space is 5 * 10 ** 8. Make sure that we can't create an
+        # object of size 5 * 10 ** 8 + 1, but we can create one of size
+        # 2 * 10 ** 8.
+        assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1)
+        _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
+        del memory_buffer
+        _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
+        del memory_buffer
+        assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1)
+
+        _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
+        memory_buffers.append(memory_buffer)
+        # Remaining space is 3 * 10 ** 8.
+        assert_create_raises_plasma_full(self, 3 * 10 ** 8 + 1)
+
+        _, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0)
+        memory_buffers.append(memory_buffer)
+        # Remaining space is 2 * 10 ** 8.
+        assert_create_raises_plasma_full(self, 2 * 10 ** 8 + 1)
+
+    def test_contains(self):
+        fake_object_ids = [random_object_id() for _ in range(100)]
+        real_object_ids = [random_object_id() for _ in range(100)]
+        for object_id in real_object_ids:
+            assert not self.plasma_client.contains(object_id)
+            self.plasma_client.create(object_id, 100)
+            self.plasma_client.seal(object_id)
+            assert self.plasma_client.contains(object_id)
+        for object_id in fake_object_ids:
+            assert not self.plasma_client.contains(object_id)
+        for object_id in real_object_ids:
+            assert self.plasma_client.contains(object_id)
+
+    def test_hash(self):
+        # Check the hash of an object that doesn't exist.
+        object_id1 = random_object_id()
+        try:
+            self.plasma_client.hash(object_id1)
+            # TODO(pcm): Introduce a more specific error type here
+        except pa.lib.ArrowException as e:
+            pass
+        else:
+            assert False
+
+        length = 1000
+        # Create a random object, and check that the hash function always
+        # returns the same value.
+        metadata = generate_metadata(length)
+        memory_buffer = np.frombuffer(self.plasma_client.create(object_id1,
+                                                                length,
+                                                                metadata),
+                                      dtype="uint8")
+        for i in range(length):
+            memory_buffer[i] = i % 256
+        self.plasma_client.seal(object_id1)
+        assert (self.plasma_client.hash(object_id1) ==
+                self.plasma_client.hash(object_id1))
+
+        # Create a second object with the same value as the first, and check
+        # that their hashes are equal.
+        object_id2 = random_object_id()
+        memory_buffer = np.frombuffer(self.plasma_client.create(object_id2,
+                                                                length,
+                                                                metadata),
+                                      dtype="uint8")
+        for i in range(length):
+            memory_buffer[i] = i % 256
+        self.plasma_client.seal(object_id2)
+        assert (self.plasma_client.hash(object_id1) ==
+                self.plasma_client.hash(object_id2))
+
+        # Create a third object with a different value from the first two, and
+        # check that its hash is different.
+        object_id3 = random_object_id()
+        metadata = generate_metadata(length)
+        memory_buffer = np.frombuffer(self.plasma_client.create(object_id3,
+                                                                length,
+                                                                metadata),
+                                      dtype="uint8")
+        for i in range(length):
+            memory_buffer[i] = (i + 1) % 256
+        self.plasma_client.seal(object_id3)
+        assert (self.plasma_client.hash(object_id1) !=
+                self.plasma_client.hash(object_id3))
+
+        # Create a fourth object with the same value as the third, but
+        # different metadata. Check that its hash is different from any of the
+        # previous three.
+        object_id4 = random_object_id()
+        metadata4 = generate_metadata(length)
+        memory_buffer = np.frombuffer(self.plasma_client.create(object_id4,
+                                                                length,
+                                                                metadata4),
+                                      dtype="uint8")
+        for i in range(length):
+            memory_buffer[i] = (i + 1) % 256
+        self.plasma_client.seal(object_id4)
+        assert (self.plasma_client.hash(object_id1) !=
+                self.plasma_client.hash(object_id4))
+        assert (self.plasma_client.hash(object_id3) !=
+                self.plasma_client.hash(object_id4))
+
+    def test_many_hashes(self):
+        hashes = []
+        length = 2 ** 10
+
+        for i in range(256):
+            object_id = random_object_id()
+            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+                                                                    length),
+                                          dtype="uint8")
+            for j in range(length):
+                memory_buffer[j] = i
+            self.plasma_client.seal(object_id)
+            hashes.append(self.plasma_client.hash(object_id))
+
+        # Create objects of equal length. Each pair has two bits different.
+        for i in range(length):
+            object_id = random_object_id()
+            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+                                                                    length),
+                                          dtype="uint8")
+            for j in range(length):
+                memory_buffer[j] = 0
+            memory_buffer[i] = 1
+            self.plasma_client.seal(object_id)
+            hashes.append(self.plasma_client.hash(object_id))
+
+        # Create objects of varying length, all with value 0.
+        for i in range(length):
+            object_id = random_object_id()
+            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+                                                                    i),
+                                          dtype="uint8")
+            for j in range(i):
+                memory_buffer[j] = 0
+            self.plasma_client.seal(object_id)
+            hashes.append(self.plasma_client.hash(object_id))
+
+        # Check that all hashes were unique.
+        assert len(set(hashes)) == 256 + length + length
+
+    # def test_individual_delete(self):
+    #     length = 100
+    #     # Create an object id string.
+    #     object_id = random_object_id()
+    #     # Create a random metadata string.
+    #     metadata = generate_metadata(100)
+    #     # Create a new buffer and write to it.
+    #     memory_buffer = self.plasma_client.create(object_id, length,
+    #                                               metadata)
+    #     for i in range(length):
+    #         memory_buffer[i] = chr(i % 256)
+    #     # Seal the object.
+    #     self.plasma_client.seal(object_id)
+    #     # Check that the object is present.
+    #     assert self.plasma_client.contains(object_id)
+    #     # Delete the object.
+    #     self.plasma_client.delete(object_id)
+    #     # Make sure the object is no longer present.
+    #     self.assertFalse(self.plasma_client.contains(object_id))
+    #
+    # def test_delete(self):
+    #     # Create some objects.
+    #     object_ids = [random_object_id() for _ in range(100)]
+    #     for object_id in object_ids:
+    #         length = 100
+    #         # Create a random metadata string.
+    #         metadata = generate_metadata(100)
+    #         # Create a new buffer and write to it.
+    #         memory_buffer = self.plasma_client.create(object_id, length,
+    #                                                   metadata)
+    #         for i in range(length):
+    #             memory_buffer[i] = chr(i % 256)
+    #         # Seal the object.
+    #         self.plasma_client.seal(object_id)
+    #         # Check that the object is present.
+    #         assert self.plasma_client.contains(object_id)
+    #
+    #     # Delete the objects and make sure they are no longer present.
+    #     for object_id in object_ids:
+    #         # Delete the object.
+    #         self.plasma_client.delete(object_id)
+    #         # Make sure the object is no longer present.
+    #         self.assertFalse(self.plasma_client.contains(object_id))
+
+    def test_illegal_functionality(self):
+        # Create an object id string.
+        object_id = random_object_id()
+        # Create a new buffer and write to it.
+        length = 1000
+        memory_buffer = self.plasma_client.create(object_id, length)
+        # Make sure we cannot access memory out of bounds.
+        with pytest.raises(Exception):
+            memory_buffer[length]
+        # Seal the object.
+        self.plasma_client.seal(object_id)
+        # This test is commented out because it currently fails.
+        # # Make sure the object is read only now.
+        # def illegal_assignment():
+        #     memory_buffer[0] = chr(0)
+        # with pytest.raises(Exception):
+        # illegal_assignment()
+        # Get the object.
+        memory_buffer = self.plasma_client.get([object_id])[0]
+
+        # Make sure the object is read only.
+        def illegal_assignment():
+            memory_buffer[0] = chr(0)
+        with pytest.raises(Exception):
+            illegal_assignment()
+
+    def test_evict(self):
+        client = self.plasma_client2
+        object_id1 = random_object_id()
+        b1 = client.create(object_id1, 1000)
+        client.seal(object_id1)
+        del b1
+        assert client.evict(1) == 1000
+
+        object_id2 = random_object_id()
+        object_id3 = random_object_id()
+        b2 = client.create(object_id2, 999)
+        b3 = client.create(object_id3, 998)
+        client.seal(object_id3)
+        del b3
+        assert client.evict(1000) == 998
+
+        object_id4 = random_object_id()
+        b4 = client.create(object_id4, 997)
+        client.seal(object_id4)
+        del b4
+        client.seal(object_id2)
+        del b2
+        assert client.evict(1) == 997
+        assert client.evict(1) == 999
+
+        object_id5 = random_object_id()
+        object_id6 = random_object_id()
+        object_id7 = random_object_id()
+        b5 = client.create(object_id5, 996)
+        b6 = client.create(object_id6, 995)
+        b7 = client.create(object_id7, 994)
+        client.seal(object_id5)
+        client.seal(object_id6)
+        client.seal(object_id7)
+        del b5
+        del b6
+        del b7
+        assert client.evict(2000) == 996 + 995 + 994
+
+    def test_subscribe(self):
+        # Subscribe to notifications from the Plasma Store.
+        self.plasma_client.subscribe()
+        for i in [1, 10, 100, 1000, 10000, 100000]:
+            object_ids = [random_object_id() for _ in range(i)]
+            metadata_sizes = [np.random.randint(1000) for _ in range(i)]
+            data_sizes = [np.random.randint(1000) for _ in range(i)]
+            for j in range(i):
+                self.plasma_client.create(
+                    object_ids[j], data_sizes[j],
+                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
+                self.plasma_client.seal(object_ids[j])
+            # Check that we received notifications for all of the objects.
+            for j in range(i):
+                notification_info = self.plasma_client.get_next_notification()
+                recv_objid, recv_dsize, recv_msize = notification_info
+                assert object_ids[j] == recv_objid
+                assert data_sizes[j] == recv_dsize
+                assert metadata_sizes[j] == recv_msize
+
+    def test_subscribe_deletions(self):
+        # Subscribe to notifications from the Plasma Store. We use
+        # plasma_client2 to make sure that all used objects will get evicted
+        # properly.
+        self.plasma_client2.subscribe()
+        for i in [1, 10, 100, 1000, 10000, 100000]:
+            object_ids = [random_object_id() for _ in range(i)]
+            # Add 1 to the sizes to make sure we have nonzero object sizes.
+            metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
+            data_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
+            for j in range(i):
+                x = self.plasma_client2.create(
+                        object_ids[j], data_sizes[j],
+                        metadata=bytearray(np.random.bytes(metadata_sizes[j])))
+                self.plasma_client2.seal(object_ids[j])
+            del x
+            # Check that we received notifications for creating all of the
+            # objects.
+            for j in range(i):
+                notification_info = self.plasma_client2.get_next_notification()
+                recv_objid, recv_dsize, recv_msize = notification_info
+                assert object_ids[j] == recv_objid
+                assert data_sizes[j] == recv_dsize
+                assert metadata_sizes[j] == recv_msize
+
+            # Check that we receive notifications for deleting all objects, as
+            # we evict them.
+            for j in range(i):
+                assert (self.plasma_client2.evict(1) ==
+                        data_sizes[j] + metadata_sizes[j])
+                notification_info = self.plasma_client2.get_next_notification()
+                recv_objid, recv_dsize, recv_msize = notification_info
+                assert object_ids[j] == recv_objid
+                assert -1 == recv_dsize
+                assert -1 == recv_msize
+
+        # Test multiple deletion notifications. The first 9 object IDs have
+        # size 0, and the last has a nonzero size. When Plasma evicts 1 byte,
+        # it will evict all objects, so we should receive deletion
+        # notifications for each.
+        num_object_ids = 10
+        object_ids = [random_object_id() for _ in range(num_object_ids)]
+        metadata_sizes = [0] * (num_object_ids - 1)
+        data_sizes = [0] * (num_object_ids - 1)
+        metadata_sizes.append(np.random.randint(1000))
+        data_sizes.append(np.random.randint(1000))
+        for i in range(num_object_ids):
+            x = self.plasma_client2.create(
+                    object_ids[i], data_sizes[i],
+                    metadata=bytearray(np.random.bytes(metadata_sizes[i])))
+            self.plasma_client2.seal(object_ids[i])
+        del x
+        for i in range(num_object_ids):
+            notification_info = self.plasma_client2.get_next_notification()
+            recv_objid, recv_dsize, recv_msize = notification_info
+            assert object_ids[i] == recv_objid
+            assert data_sizes[i] == recv_dsize
+            assert metadata_sizes[i] == recv_msize
+        assert (self.plasma_client2.evict(1) ==
+                data_sizes[-1] + metadata_sizes[-1])
+        for i in range(num_object_ids):
+            notification_info = self.plasma_client2.get_next_notification()
+            recv_objid, recv_dsize, recv_msize = notification_info
+            assert object_ids[i] == recv_objid
+            assert -1 == recv_dsize
+            assert -1 == recv_msize

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 1ea57ae..7425b71 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -99,6 +99,10 @@ class build_ext(_build_ext):
 
         self.with_parquet = strtobool(
             os.environ.get('PYARROW_WITH_PARQUET', '0'))
+        self.with_plasma = strtobool(
+            os.environ.get('PYARROW_WITH_PLASMA', '0'))
+        if self.with_plasma and "plasma" not in self.CYTHON_MODULE_NAMES:
+            self.CYTHON_MODULE_NAMES.append("plasma")
         self.bundle_arrow_cpp = strtobool(
             os.environ.get('PYARROW_BUNDLE_ARROW_CPP', '0'))
 
@@ -242,6 +246,8 @@ class build_ext(_build_ext):
             shutil.move(pjoin(build_prefix, 'include'), pjoin(build_lib, 'pyarrow'))
             move_lib("arrow")
             move_lib("arrow_python")
+            if self.with_plasma:
+                move_lib("plasma")
             if self.with_parquet:
                 move_lib("parquet")
 
@@ -270,11 +276,20 @@ class build_ext(_build_ext):
                 shutil.move(self.get_ext_built_api_header(name),
                             pjoin(os.path.dirname(ext_path), name + '_api.h'))
 
+        # Move the plasma store
+        if self.with_plasma:
+            build_py = self.get_finalized_command('build_py')
+            source = os.path.join(self.build_type, "plasma_store")
+            target = os.path.join(build_lib, build_py.get_package_dir('pyarrow'), "plasma_store")
+            shutil.move(source, target)
+
         os.chdir(saved_cwd)
 
     def _failure_permitted(self, name):
         if name == '_parquet' and not self.with_parquet:
             return True
+        if name == 'plasma' and not self.with_plasma:
+            return True
         return False
 
     def _get_inplace_dir(self):


Mime
View raw message