Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 00B2F200CCF for ; Mon, 24 Jul 2017 18:12:53 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F2E58163905; Mon, 24 Jul 2017 16:12:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 21C4E1638CB for ; Mon, 24 Jul 2017 18:12:50 +0200 (CEST) Received: (qmail 1595 invoked by uid 500); 24 Jul 2017 16:12:48 -0000 Mailing-List: contact commits-help@arrow.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@arrow.apache.org Delivered-To: mailing list commits@arrow.apache.org Received: (qmail 1586 invoked by uid 99); 24 Jul 2017 16:12:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Jul 2017 16:12:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8D755E2F41; Mon, 24 Jul 2017 16:12:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@arrow.apache.org Date: Mon, 24 Jul 2017 16:12:48 -0000 Message-Id: <3495c147aa344d2dab81a1491dc9c5bf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] arrow git commit: ARROW-1149: [Plasma] Create Cython client library for Plasma archived-at: Mon, 24 Jul 2017 16:12:53 -0000 Repository: arrow Updated Branches: refs/heads/master 05f7058ce -> a94f4716b http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/plasma.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/plasma.pyx 
b/python/pyarrow/plasma.pyx new file mode 100644 index 0000000..bb17685 --- /dev/null +++ b/python/pyarrow/plasma.pyx @@ -0,0 +1,560 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from libcpp cimport bool as c_bool, nullptr +from libcpp.memory cimport shared_ptr, unique_ptr, make_shared +from libcpp.string cimport string as c_string +from libcpp.vector cimport vector as c_vector +from libc.stdint cimport int64_t, uint8_t, uintptr_t +from cpython.pycapsule cimport * + +from pyarrow.lib cimport Buffer, NativeFile, check_status +from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer, + CFixedSizeBufferWriter, CStatus) + + +PLASMA_WAIT_TIMEOUT = 2 ** 30 + + +cdef extern from "plasma/common.h" nogil: + + cdef cppclass CUniqueID" plasma::UniqueID": + + @staticmethod + CUniqueID from_binary(const c_string& binary) + + c_bool operator==(const CUniqueID& rhs) const + + c_string hex() const + + c_string binary() const + + cdef struct CObjectRequest" plasma::ObjectRequest": + CUniqueID object_id + int type + int status + + +cdef extern from "plasma/common.h": + cdef int64_t kDigestSize" plasma::kDigestSize" + + cdef enum ObjectRequestType: 
+ PLASMA_QUERY_LOCAL"plasma::PLASMA_QUERY_LOCAL", + PLASMA_QUERY_ANYWHERE"plasma::PLASMA_QUERY_ANYWHERE" + + cdef int ObjectStatusLocal"plasma::ObjectStatusLocal"; + cdef int ObjectStatusRemote"plasma::ObjectStatusRemote"; + +cdef extern from "plasma/client.h" nogil: + + cdef cppclass CPlasmaClient" plasma::PlasmaClient": + + CPlasmaClient() + + CStatus Connect(const c_string& store_socket_name, + const c_string& manager_socket_name, int release_delay) + + CStatus Create(const CUniqueID& object_id, int64_t data_size, + const uint8_t* metadata, int64_t metadata_size, + uint8_t** data) + + CStatus Get(const CUniqueID* object_ids, int64_t num_objects, + int64_t timeout_ms, CObjectBuffer* object_buffers) + + CStatus Seal(const CUniqueID& object_id) + + CStatus Evict(int64_t num_bytes, int64_t& num_bytes_evicted) + + CStatus Hash(const CUniqueID& object_id, uint8_t* digest) + + CStatus Release(const CUniqueID& object_id) + + CStatus Contains(const CUniqueID& object_id, c_bool* has_object) + + CStatus Subscribe(int* fd) + + CStatus GetNotification(int fd, CUniqueID* object_id, + int64_t* data_size, int64_t* metadata_size) + + CStatus Disconnect() + + CStatus Fetch(int num_object_ids, const CUniqueID* object_ids) + + CStatus Wait(int64_t num_object_requests, CObjectRequest* object_requests, + int num_ready_objects, int64_t timeout_ms, int* num_objects_ready); + + CStatus Transfer(const char* addr, int port, const CUniqueID& object_id) + + +cdef extern from "plasma/client.h" nogil: + + cdef struct CObjectBuffer" plasma::ObjectBuffer": + int64_t data_size + uint8_t* data + int64_t metadata_size + uint8_t* metadata + + +def make_object_id(object_id): + return ObjectID(object_id) + + +cdef class ObjectID: + """ + An ObjectID represents a string of bytes used to identify Plasma objects. 
+ """ + + cdef: + CUniqueID data + + def __cinit__(self, object_id): + self.data = CUniqueID.from_binary(object_id) + + def __richcmp__(ObjectID self, ObjectID object_id, operation): + if operation != 2: + raise ValueError("operation != 2 (only equality is supported)") + return self.data == object_id.data + + def __hash__(self): + return hash(self.data.binary()) + + def __repr__(self): + return "ObjectID(" + self.data.hex().decode() + ")" + + def __reduce__(self): + return (make_object_id, (self.data.binary(),)) + + def binary(self): + """ + Return the binary representation of this ObjectID. + + Returns + ------- + bytes + Binary representation of the ObjectID. + """ + return self.data.binary() + + +cdef class PlasmaBuffer(Buffer): + """ + This is the type returned by calls to get with a PlasmaClient. + + We define our own class instead of directly returning a buffer object so + that we can add a custom destructor which notifies Plasma that the object + is no longer being used, so the memory in the Plasma store backing the + object can potentially be freed. + + Attributes + ---------- + object_id : ObjectID + The ID of the object in the buffer. + client : PlasmaClient + The PlasmaClient that we use to communicate with the store and manager. + """ + + cdef: + ObjectID object_id + PlasmaClient client + + def __cinit__(self, ObjectID object_id, PlasmaClient client): + """ + Initialize a PlasmaBuffer. + """ + self.object_id = object_id + self.client = client + + def __dealloc__(self): + """ + Notify Plasma that the object is no longer needed. + + If the plasma client has been shut down, then don't do anything. + """ + self.client.release(self.object_id) + + +cdef class PlasmaClient: + """ + The PlasmaClient is used to interface with a plasma store and manager. + + The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a + buffer, and get a buffer. Buffers are referred to by object IDs, which are + strings. 
+ """ + + cdef: + shared_ptr[CPlasmaClient] client + int notification_fd + c_string store_socket_name + c_string manager_socket_name + + def __cinit__(self, store_socket_name, manager_socket_name, int release_delay): + """ + Create a new PlasmaClient that is connected to a plasma store + and optionally a plasma manager. + + Parameters + ---------- + store_socket_name : str + Name of the socket the plasma store is listening at. + manager_socket_name : str + Name of the socket the plasma manager is listening at. + release_delay : int + The maximum number of objects that the client will keep and + delay releasing (for caching reasons). + """ + self.client.reset(new CPlasmaClient()) + self.notification_fd = -1 + self.store_socket_name = store_socket_name.encode() + self.manager_socket_name = manager_socket_name.encode() + with nogil: + check_status(self.client.get().Connect(self.store_socket_name, + self.manager_socket_name, release_delay)) + + cdef _get_object_buffers(self, object_ids, int64_t timeout_ms, + c_vector[CObjectBuffer]* result): + cdef c_vector[CUniqueID] ids + cdef ObjectID object_id + for object_id in object_ids: + ids.push_back(object_id.data) + result[0].resize(ids.size()) + with nogil: + check_status(self.client.get().Get(ids.data(), ids.size(), + timeout_ms, result[0].data())) + + cdef _make_plasma_buffer(self, ObjectID object_id, uint8_t* data, + int64_t size): + cdef shared_ptr[CBuffer] buffer + buffer.reset(new CBuffer(data, size)) + result = PlasmaBuffer(object_id, self) + result.init(buffer) + return result + + cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data, + int64_t size): + cdef shared_ptr[CBuffer] buffer + buffer.reset(new CMutableBuffer(data, size)) + result = PlasmaBuffer(object_id, self) + result.init(buffer) + return result + + @property + def store_socket_name(self): + return self.store_socket_name.decode() + + @property + def manager_socket_name(self): + return self.manager_socket_name.decode() + + def 
create(self, ObjectID object_id, int64_t data_size, c_string metadata=b""): + """ + Create a new buffer in the PlasmaStore for a particular object ID. + + The returned buffer is mutable until seal is called. + + Parameters + ---------- + object_id : ObjectID + The object ID used to identify an object. + size : int + The size in bytes of the created buffer. + metadata : bytes + An optional string of bytes encoding whatever metadata the user + wishes to encode. + + Raises + ------ + PlasmaObjectExists + This exception is raised if the object could not be created because + there already is an object with the same ID in the plasma store. + + PlasmaStoreFull: This exception is raised if the object could + not be created because the plasma store is unable to evict + enough objects to create room for it. + """ + cdef uint8_t* data + with nogil: + check_status(self.client.get().Create(object_id.data, data_size, + (metadata.data()), + metadata.size(), &data)) + return self._make_mutable_plasma_buffer(object_id, data, data_size) + + def get(self, object_ids, timeout_ms=-1): + """ + Returns data buffer from the PlasmaStore based on object ID. + + If the object has not been sealed yet, this call will block. The + retrieved buffer is immutable. + + Parameters + ---------- + object_ids : list + A list of ObjectIDs used to identify some objects. + timeout_ms :int + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + + Returns + ------- + list + List of PlasmaBuffers for the data associated with the object_ids + and None if the object was not available. 
+ """ + cdef c_vector[CObjectBuffer] object_buffers + self._get_object_buffers(object_ids, timeout_ms, &object_buffers) + result = [] + for i in range(object_buffers.size()): + if object_buffers[i].data_size != -1: + result.append(self._make_plasma_buffer( + object_ids[i], object_buffers[i].data, + object_buffers[i].data_size)) + else: + result.append(None) + return result + + def get_metadata(self, object_ids, timeout_ms=-1): + """ + Returns metadata buffer from the PlasmaStore based on object ID. + + If the object has not been sealed yet, this call will block. The + retrieved buffer is immutable. + + Parameters + ---------- + object_ids : list + A list of ObjectIDs used to identify some objects. + timeout_ms : int + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + + Returns + ------- + list + List of PlasmaBuffers for the metadata associated with the + object_ids and None if the object was not available. + """ + cdef c_vector[CObjectBuffer] object_buffers + self._get_object_buffers(object_ids, timeout_ms, &object_buffers) + result = [] + for i in range(object_buffers.size()): + result.append(self._make_plasma_buffer( + object_ids[i], object_buffers[i].metadata, + object_buffers[i].metadata_size)) + return result + + def seal(self, ObjectID object_id): + """ + Seal the buffer in the PlasmaStore for a particular object ID. + + Once a buffer has been sealed, the buffer is immutable and can only be + accessed through get. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + """ + with nogil: + check_status(self.client.get().Seal(object_id.data)) + + def release(self, ObjectID object_id): + """ + Notify Plasma that the object is no longer needed. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. 
+ """ + with nogil: + check_status(self.client.get().Release(object_id.data)) + + def contains(self, ObjectID object_id): + """ + Check if the object is present and sealed in the PlasmaStore. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + """ + cdef c_bool is_contained + with nogil: + check_status(self.client.get().Contains(object_id.data, + &is_contained)) + return is_contained + + def hash(self, ObjectID object_id): + """ + Compute the checksum of an object in the object store. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + + Returns + ------- + bytes + A digest string object's hash. If the object isn't in the object + store, the string will have length zero. + """ + cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize) + with nogil: + check_status(self.client.get().Hash(object_id.data, + digest.data())) + return bytes(digest[:]) + + def evict(self, int64_t num_bytes): + """ + Evict some objects until to recover some bytes. + + Recover at least num_bytes bytes if possible. + + Parameters + ---------- + num_bytes : int + The number of bytes to attempt to recover. + """ + cdef int64_t num_bytes_evicted = -1 + with nogil: + check_status(self.client.get().Evict(num_bytes, num_bytes_evicted)) + return num_bytes_evicted + + def transfer(self, address, int port, ObjectID object_id): + """ + Transfer local object with id object_id to another plasma instance + + Parameters + ---------- + addr : str + IPv4 address of the plasma instance the object is sent to. + port : int + Port number of the plasma instance the object is sent to. + object_id : str + A string used to identify an object. + """ + cdef c_string addr = address.encode() + with nogil: + check_status(self.client.get().Transfer(addr.c_str(), port, object_id.data)) + + def fetch(self, object_ids): + """ + Fetch the objects with the given IDs from other plasma managers. 
+ + Parameters + ---------- + object_ids : list + A list of strings used to identify the objects. + """ + cdef c_vector[CUniqueID] ids + cdef ObjectID object_id + for object_id in object_ids: + ids.push_back(object_id.data) + with nogil: + check_status(self.client.get().Fetch(ids.size(), ids.data())) + + def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT, int num_returns=1): + """ + Wait until num_returns objects in object_ids are ready. + Currently, the object ID arguments to wait must be unique. + + Parameters + ---------- + object_ids : list + List of object IDs to wait for. + timeout :int + Return to the caller after timeout milliseconds. + num_returns : int + We are waiting for this number of objects to be ready. + + Returns + ------- + list + List of object IDs that are ready. + list + List of object IDs we might still wait on. + """ + # Check that the object ID arguments are unique. The plasma manager + # currently crashes if given duplicate object IDs. + if len(object_ids) != len(set(object_ids)): + raise Exception("Wait requires a list of unique object IDs.") + cdef int64_t num_object_requests = len(object_ids) + cdef c_vector[CObjectRequest] object_requests = c_vector[CObjectRequest](num_object_requests) + cdef int num_objects_ready = 0 + cdef ObjectID object_id + for i, object_id in enumerate(object_ids): + object_requests[i].object_id = object_id.data + object_requests[i].type = PLASMA_QUERY_ANYWHERE + with nogil: + check_status(self.client.get().Wait(num_object_requests, object_requests.data(), num_returns, timeout, &num_objects_ready)) + cdef int num_to_return = min(num_objects_ready, num_returns); + ready_ids = [] + waiting_ids = set(object_ids) + cdef int num_returned = 0 + for i in range(len(object_ids)): + if num_returned == num_to_return: + break + if object_requests[i].status == ObjectStatusLocal or object_requests[i].status == ObjectStatusRemote: + ready_ids.append(ObjectID(object_requests[i].object_id.binary())) + 
waiting_ids.discard(ObjectID(object_requests[i].object_id.binary())) + num_returned += 1 + return ready_ids, list(waiting_ids) + + def subscribe(self): + """Subscribe to notifications about sealed objects.""" + with nogil: + check_status(self.client.get().Subscribe(&self.notification_fd)) + + def get_next_notification(self): + """ + Get the next notification from the notification socket. + + Returns + ------- + ObjectID + The object ID of the object that was stored. + int + The data size of the object that was stored. + int + The metadata size of the object that was stored. + """ + cdef ObjectID object_id = ObjectID(20 * b"\0") + cdef int64_t data_size + cdef int64_t metadata_size + with nogil: + check_status(self.client.get().GetNotification(self.notification_fd, + &object_id.data, + &data_size, + &metadata_size)) + return object_id, data_size, metadata_size + + def to_capsule(self): + return PyCapsule_New(self.client.get(), "plasma", NULL) + + def disconnect(self): + """ + Disconnect this client from the Plasma store. 
+ """ + with nogil: + check_status(self.client.get().Disconnect()) http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/tests/conftest.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index 2aeeab7..21288e4 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -18,11 +18,12 @@ from pytest import skip -groups = ['hdfs', 'parquet', 'large_memory'] +groups = ['hdfs', 'parquet', 'plasma', 'large_memory'] defaults = { 'hdfs': False, 'parquet': False, + 'plasma': False, 'large_memory': False } @@ -32,6 +33,11 @@ try: except ImportError: pass +try: + import pyarrow.plasma as plasma + defaults['plasma'] = True +except ImportError: + pass def pytest_configure(config): pass http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/tests/test_plasma.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py new file mode 100644 index 0000000..ce684e3 --- /dev/null +++ b/python/pyarrow/tests/test_plasma.py @@ -0,0 +1,683 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import glob +import numpy as np +import os +import pytest +import random +import signal +import subprocess +import sys +import time +import unittest + +import pyarrow as pa +import pandas as pd + +DEFAULT_PLASMA_STORE_MEMORY = 10 ** 9 + +def random_name(): + return str(random.randint(0, 99999999)) + + +def random_object_id(): + import pyarrow.plasma as plasma + return plasma.ObjectID(np.random.bytes(20)) + + +def generate_metadata(length): + metadata = bytearray(length) + if length > 0: + metadata[0] = random.randint(0, 255) + metadata[-1] = random.randint(0, 255) + for _ in range(100): + metadata[random.randint(0, length - 1)] = random.randint(0, 255) + return metadata + + +def write_to_data_buffer(buff, length): + array = np.frombuffer(buff, dtype="uint8") + if length > 0: + array[0] = random.randint(0, 255) + array[-1] = random.randint(0, 255) + for _ in range(100): + array[random.randint(0, length - 1)] = random.randint(0, 255) + + +def create_object_with_id(client, object_id, data_size, metadata_size, + seal=True): + metadata = generate_metadata(metadata_size) + memory_buffer = client.create(object_id, data_size, metadata) + write_to_data_buffer(memory_buffer, data_size) + if seal: + client.seal(object_id) + return memory_buffer, metadata + + +def create_object(client, data_size, metadata_size, seal=True): + object_id = random_object_id() + memory_buffer, metadata = create_object_with_id(client, object_id, + data_size, metadata_size, + seal=seal) + return object_id, memory_buffer, metadata + + +def assert_get_object_equal(unit_test, client1, client2, object_id, + memory_buffer=None, metadata=None): + import pyarrow.plasma as plasma + client1_buff = client1.get([object_id])[0] + client2_buff = client2.get([object_id])[0] + client1_metadata = client1.get_metadata([object_id])[0] + client2_metadata = client2.get_metadata([object_id])[0] + assert 
len(client1_buff) == len(client2_buff) + assert len(client1_metadata) == len(client2_metadata) + # Check that the buffers from the two clients are the same. + assert plasma.buffers_equal(client1_buff, client2_buff) + # Check that the metadata buffers from the two clients are the same. + assert plasma.buffers_equal(client1_metadata, client2_metadata) + # If a reference buffer was provided, check that it is the same as well. + if memory_buffer is not None: + assert plasma.buffers_equal(memory_buffer, client1_buff) + # If reference metadata was provided, check that it is the same as well. + if metadata is not None: + assert plasma.buffers_equal(metadata, client1_metadata) + + +def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, + use_valgrind=False, use_profiler=False, + stdout_file=None, stderr_file=None): + """Start a plasma store process. + Args: + use_valgrind (bool): True if the plasma store should be started inside + of valgrind. If this is True, use_profiler must be False. + use_profiler (bool): True if the plasma store should be started inside + a profiler. If this is True, use_valgrind must be False. + stdout_file: A file handle opened for writing to redirect stdout to. If + no redirection should happen, then this should be None. + stderr_file: A file handle opened for writing to redirect stderr to. If + no redirection should happen, then this should be None. + Return: + A tuple of the name of the plasma store socket and the process ID of + the plasma store process. 
+ """ + if use_valgrind and use_profiler: + raise Exception("Cannot use valgrind and profiler at the same time.") + plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store") + plasma_store_name = "/tmp/plasma_store{}".format(random_name()) + command = [plasma_store_executable, + "-s", plasma_store_name, + "-m", str(plasma_store_memory)] + if use_valgrind: + pid = subprocess.Popen(["valgrind", + "--track-origins=yes", + "--leak-check=full", + "--show-leak-kinds=all", + "--leak-check-heuristics=stdstring", + "--error-exitcode=1"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + elif use_profiler: + pid = subprocess.Popen(["valgrind", "--tool=callgrind"] + command, + stdout=stdout_file, stderr=stderr_file) + time.sleep(1.0) + else: + pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file) + time.sleep(0.1) + return plasma_store_name, pid + + +@pytest.mark.plasma +class TestPlasmaClient(object): + + def setup_method(self, test_method): + import pyarrow.plasma as plasma + # Start Plasma store. + plasma_store_name, self.p = start_plasma_store( + use_valgrind=os.getenv("PLASMA_VALGRIND") == "1") + # Connect to Plasma. + self.plasma_client = plasma.PlasmaClient(plasma_store_name, "", 64) + # For the eviction test + self.plasma_client2 = plasma.PlasmaClient(plasma_store_name, "", 0) + + def teardown_method(self, test_method): + # Check that the Plasma store is still alive. + assert self.p.poll() == None + # Kill the plasma store process. + if os.getenv("PLASMA_VALGRIND") == "1": + self.p.send_signal(signal.SIGTERM) + self.p.wait() + if self.p.returncode != 0: + assert False + else: + self.p.kill() + + def test_create(self): + # Create an object id string. + object_id = random_object_id() + # Create a new buffer and write to it. + length = 50 + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + # Seal the object. 
+ self.plasma_client.seal(object_id) + # Get the object. + memory_buffer = np.frombuffer(self.plasma_client.get([object_id])[0], + dtype="uint8") + for i in range(length): + assert memory_buffer[i] == i % 256 + + def test_create_with_metadata(self): + for length in range(1000): + # Create an object id string. + object_id = random_object_id() + # Create a random metadata string. + metadata = generate_metadata(length) + # Create a new buffer and write to it. + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length, + metadata), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + # Seal the object. + self.plasma_client.seal(object_id) + # Get the object. + memory_buffer = np.frombuffer( + self.plasma_client.get([object_id])[0], dtype="uint8") + for i in range(length): + assert memory_buffer[i] == i % 256 + # Get the metadata. + metadata_buffer = np.frombuffer( + self.plasma_client.get_metadata([object_id])[0], dtype="uint8") + assert len(metadata) == len(metadata_buffer) + for i in range(len(metadata)): + assert metadata[i] == metadata_buffer[i] + + def test_create_existing(self): + # This test is partially used to test the code path in which we create + # an object with an ID that already exists + length = 100 + for _ in range(1000): + object_id = random_object_id() + self.plasma_client.create(object_id, length, + generate_metadata(length)) + try: + self.plasma_client.create(object_id, length, + generate_metadata(length)) + # TODO(pcm): Introduce a more specific error type here. + except pa.lib.ArrowException as e: + pass + else: + assert False + + def test_get(self): + num_object_ids = 100 + # Test timing out of get with various timeouts. 
+ for timeout in [0, 10, 100, 1000]: + object_ids = [random_object_id() for _ in range(num_object_ids)] + results = self.plasma_client.get(object_ids, timeout_ms=timeout) + assert results == num_object_ids * [None] + + data_buffers = [] + metadata_buffers = [] + for i in range(num_object_ids): + if i % 2 == 0: + data_buffer, metadata_buffer = create_object_with_id( + self.plasma_client, object_ids[i], 2000, 2000) + data_buffers.append(data_buffer) + metadata_buffers.append(metadata_buffer) + + # Test timing out from some but not all get calls with various + # timeouts. + for timeout in [0, 10, 100, 1000]: + data_results = self.plasma_client.get(object_ids, + timeout_ms=timeout) + # metadata_results = self.plasma_client.get_metadata( + # object_ids, timeout_ms=timeout) + for i in range(num_object_ids): + if i % 2 == 0: + array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8") + array2 = np.frombuffer(data_results[i], dtype="uint8") + np.testing.assert_equal(array1, array2) + # TODO(rkn): We should compare the metadata as well. But + # currently the types are different (e.g., memoryview + # versus bytearray). + # assert plasma.buffers_equal( + # metadata_buffers[i // 2], metadata_results[i]) + else: + assert results[i] is None + + def test_store_arrow_objects(self): + import pyarrow.plasma as plasma + data = np.random.randn(10, 4) + # Write an arrow object. + object_id = random_object_id() + tensor = pa.Tensor.from_numpy(data) + data_size = pa.get_tensor_size(tensor) + buf = self.plasma_client.create(object_id, data_size) + stream = pa.FixedSizeBufferOutputStream(buf) + pa.write_tensor(tensor, stream) + self.plasma_client.seal(object_id) + # Read the arrow object. + [tensor] = self.plasma_client.get([object_id]) + reader = pa.BufferReader(tensor) + array = pa.read_tensor(reader).to_numpy() + # Assert that they are equal. 
+ np.testing.assert_equal(data, array)
+
+    def test_store_pandas_dataframe(self):
+        """Round-trip a pandas DataFrame through the Plasma store.
+
+        The frame is converted to an Arrow record batch, written as a
+        record batch stream into a Plasma object of exactly the serialized
+        size, sealed, fetched again and compared with the original frame.
+        """
+        import pyarrow.plasma as plasma
+        columns = {
+            'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
+            'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd']),
+        }
+        df = pd.DataFrame(columns)
+
+        # Write the DataFrame.
+        record_batch = pa.RecordBatch.from_pandas(df)
+        # Determine the size.
+        s = pa.MockOutputStream()
+        stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema)
+        stream_writer.write_batch(record_batch)
+        data_size = s.size()
+        object_id = plasma.ObjectID(np.random.bytes(20))
+
+        buf = self.plasma_client.create(object_id, data_size)
+        stream = pa.FixedSizeBufferOutputStream(buf)
+        stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
+        stream_writer.write_batch(record_batch)
+
+        self.plasma_client.seal(object_id)
+
+        # Read the DataFrame.
+        [data] = self.plasma_client.get([object_id])
+        reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
+        result = reader.get_next_batch().to_pandas()
+
+        # pd.testing is the public location of the assertion helpers;
+        # the pd.util.testing alias is deprecated.
+        pd.testing.assert_frame_equal(df, result)
+
+    def test_pickle_object_ids(self):
+        """Check that an object ID survives a pickle round trip."""
+        # This can be used for sharing object IDs between processes.
+        import pickle
+        object_id = random_object_id()
+        data = pickle.dumps(object_id)
+        object_id2 = pickle.loads(data)
+        assert object_id == object_id2
+
+    def test_store_full(self):
+        """Check that creating an object fails once the store is full."""
+        # The store is started with 1GB, so make sure that create throws an
+        # exception when it is full.
+        def assert_create_raises_plasma_full(unit_test, size):
+            # Split `size` at a random point into the two size arguments
+            # handed to create_object; the request must be rejected.
+            partial_size = np.random.randint(size)
+            # TODO(pcm): More specific error here.
+            with pytest.raises(pa.lib.ArrowException):
+                create_object(unit_test.plasma_client, partial_size,
+                              size - partial_size)
+
+        # Create a list to keep some of the buffers in scope.
+        memory_buffers = []
+        _, memory_buffer, _ = create_object(self.plasma_client, 5 * 10 ** 8, 0)
+        memory_buffers.append(memory_buffer)
+        # Remaining space is 5 * 10 ** 8. Make sure that we can't create an
+        # object of size 5 * 10 ** 8 + 1, but we can create one of size
+        # 2 * 10 ** 8.
+        assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1)
+        _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
+        del memory_buffer
+        _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
+        del memory_buffer
+        assert_create_raises_plasma_full(self, 5 * 10 ** 8 + 1)
+
+        _, memory_buffer, _ = create_object(self.plasma_client, 2 * 10 ** 8, 0)
+        memory_buffers.append(memory_buffer)
+        # Remaining space is 3 * 10 ** 8.
+        assert_create_raises_plasma_full(self, 3 * 10 ** 8 + 1)
+
+        _, memory_buffer, _ = create_object(self.plasma_client, 10 ** 8, 0)
+        memory_buffers.append(memory_buffer)
+        # Remaining space is 2 * 10 ** 8.
+        assert_create_raises_plasma_full(self, 2 * 10 ** 8 + 1)
+
+    def test_contains(self):
+        """Check contains() for sealed objects and for unknown IDs."""
+        fake_object_ids = [random_object_id() for _ in range(100)]
+        real_object_ids = [random_object_id() for _ in range(100)]
+        for object_id in real_object_ids:
+            assert not self.plasma_client.contains(object_id)
+            self.plasma_client.create(object_id, 100)
+            self.plasma_client.seal(object_id)
+            assert self.plasma_client.contains(object_id)
+        for object_id in fake_object_ids:
+            assert not self.plasma_client.contains(object_id)
+        for object_id in real_object_ids:
+            assert self.plasma_client.contains(object_id)
+
+    def test_hash(self):
+        """Check that hash() is stable and depends on data and metadata."""
+        # Check the hash of an object that doesn't exist.
+        object_id1 = random_object_id()
+        # TODO(pcm): Introduce a more specific error type here
+        with pytest.raises(pa.lib.ArrowException):
+            self.plasma_client.hash(object_id1)
+
+        length = 1000
+
+        def create_and_seal(object_id, metadata, offset):
+            # Store `length` bytes under object_id, byte i holding
+            # (i + offset) % 256, then seal the object.
+            memory_buffer = np.frombuffer(
+                self.plasma_client.create(object_id, length, metadata),
+                dtype="uint8")
+            memory_buffer[:] = (np.arange(length) + offset) % 256
+            self.plasma_client.seal(object_id)
+
+        # Create a random object, and check that the hash function always
+        # returns the same value.
+        metadata = generate_metadata(length)
+        create_and_seal(object_id1, metadata, 0)
+        assert (self.plasma_client.hash(object_id1) ==
+                self.plasma_client.hash(object_id1))
+
+        # Create a second object with the same value as the first, and check
+        # that their hashes are equal.
+        object_id2 = random_object_id()
+        create_and_seal(object_id2, metadata, 0)
+        assert (self.plasma_client.hash(object_id1) ==
+                self.plasma_client.hash(object_id2))
+
+        # Create a third object with a different value from the first two, and
+        # check that its hash is different.
+        object_id3 = random_object_id()
+        metadata = generate_metadata(length)
+        create_and_seal(object_id3, metadata, 1)
+        assert (self.plasma_client.hash(object_id1) !=
+                self.plasma_client.hash(object_id3))
+
+        # Create a fourth object with the same value as the third, but
+        # different metadata. Check that its hash is different from any of the
+        # previous three.
+        object_id4 = random_object_id()
+        metadata4 = generate_metadata(length)
+        create_and_seal(object_id4, metadata4, 1)
+        assert (self.plasma_client.hash(object_id1) !=
+                self.plasma_client.hash(object_id4))
+        assert (self.plasma_client.hash(object_id3) !=
+                self.plasma_client.hash(object_id4))
+
+    def test_many_hashes(self):
+        """Check that many similar objects all get distinct hashes."""
+        hashes = []
+        length = 2 ** 10
+
+        # Create objects of equal length filled with a single byte value.
+        for i in range(256):
+            object_id = random_object_id()
+            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+                                                                    length),
+                                          dtype="uint8")
+            memory_buffer[:] = i
+            self.plasma_client.seal(object_id)
+            hashes.append(self.plasma_client.hash(object_id))
+
+        # Create objects of equal length that are all zero except for a
+        # single byte set to 1. Each pair has two bits different.
+        for i in range(length):
+            object_id = random_object_id()
+            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+                                                                    length),
+                                          dtype="uint8")
+            memory_buffer[:] = 0
+            memory_buffer[i] = 1
+            self.plasma_client.seal(object_id)
+            hashes.append(self.plasma_client.hash(object_id))
+
+        # Create objects of varying length, all with value 0.
+        for i in range(length):
+            object_id = random_object_id()
+            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+                                                                    i),
+                                          dtype="uint8")
+            memory_buffer[:] = 0
+            self.plasma_client.seal(object_id)
+            hashes.append(self.plasma_client.hash(object_id))
+
+        # Check that all hashes were unique.
+        assert len(set(hashes)) == 256 + length + length
+
+ # def test_individual_delete(self): + # length = 100 + # # Create an object id string. + # object_id = random_object_id() + # # Create a random metadata string. + # metadata = generate_metadata(100) + # # Create a new buffer and write to it.
+ # memory_buffer = self.plasma_client.create(object_id, length, + # metadata) + # for i in range(length): + # memory_buffer[i] = chr(i % 256) + # # Seal the object. + # self.plasma_client.seal(object_id) + # # Check that the object is present. + # assert self.plasma_client.contains(object_id) + # # Delete the object. + # self.plasma_client.delete(object_id) + # # Make sure the object is no longer present. + # self.assertFalse(self.plasma_client.contains(object_id)) + # + # def test_delete(self): + # # Create some objects. + # object_ids = [random_object_id() for _ in range(100)] + # for object_id in object_ids: + # length = 100 + # # Create a random metadata string. + # metadata = generate_metadata(100) + # # Create a new buffer and write to it. + # memory_buffer = self.plasma_client.create(object_id, length, + # metadata) + # for i in range(length): + # memory_buffer[i] = chr(i % 256) + # # Seal the object. + # self.plasma_client.seal(object_id) + # # Check that the object is present. + # assert self.plasma_client.contains(object_id) + # + # # Delete the objects and make sure they are no longer present. + # for object_id in object_ids: + # # Delete the object. + # self.plasma_client.delete(object_id) + # # Make sure the object is no longer present. + # self.assertFalse(self.plasma_client.contains(object_id))
+
+    def test_illegal_functionality(self):
+        """Check out-of-bounds reads and writes to a fetched object fail."""
+        # Create an object id string.
+        object_id = random_object_id()
+        # Create a new buffer and write to it.
+        length = 1000
+        memory_buffer = self.plasma_client.create(object_id, length)
+        # Make sure we cannot access memory out of bounds.
+        with pytest.raises(Exception):
+            memory_buffer[length]
+        # Seal the object.
+        self.plasma_client.seal(object_id)
+        # This test is commented out because it currently fails.
+        # # Make sure the object is read only now.
+        # def illegal_assignment():
+        #     memory_buffer[0] = chr(0)
+        # with pytest.raises(Exception):
+        #     illegal_assignment()
+        # Get the object.
+        memory_buffer = self.plasma_client.get([object_id])[0]
+
+        # Make sure the object is read only.
+        # NOTE(review): pytest.raises(Exception) accepts any failure, so a
+        # type error from assigning the str chr(0) would also satisfy it;
+        # consider asserting the specific exception -- confirm against the
+        # buffer API.
+        def illegal_assignment():
+            memory_buffer[0] = chr(0)
+        with pytest.raises(Exception):
+            illegal_assignment()
+
+    def test_evict(self):
+        """Check the byte counts returned by evict() in several scenarios."""
+        client = self.plasma_client2
+        object_id1 = random_object_id()
+        b1 = client.create(object_id1, 1000)
+        client.seal(object_id1)
+        del b1
+        assert client.evict(1) == 1000
+
+        object_id2 = random_object_id()
+        object_id3 = random_object_id()
+        b2 = client.create(object_id2, 999)
+        b3 = client.create(object_id3, 998)
+        client.seal(object_id3)
+        del b3
+        # Object 2 is still unsealed and referenced through b2 here;
+        # presumably that is why only object 3 is expected to be evicted.
+        assert client.evict(1000) == 998
+
+        object_id4 = random_object_id()
+        b4 = client.create(object_id4, 997)
+        client.seal(object_id4)
+        del b4
+        client.seal(object_id2)
+        del b2
+        # NOTE(review): the expected order (object 4, then object 2)
+        # matches the order in which the buffers were released -- confirm
+        # against the store's eviction policy.
+        assert client.evict(1) == 997
+        assert client.evict(1) == 999
+
+        object_id5 = random_object_id()
+        object_id6 = random_object_id()
+        object_id7 = random_object_id()
+        b5 = client.create(object_id5, 996)
+        b6 = client.create(object_id6, 995)
+        b7 = client.create(object_id7, 994)
+        client.seal(object_id5)
+        client.seal(object_id6)
+        client.seal(object_id7)
+        del b5
+        del b6
+        del b7
+        assert client.evict(2000) == 996 + 995 + 994
+
+    def test_subscribe(self):
+        """Check that sealing objects yields matching notifications."""
+        # Subscribe to notifications from the Plasma Store.
+        self.plasma_client.subscribe()
+        for num_objects in [1, 10, 100, 1000, 10000, 100000]:
+            object_ids = [random_object_id() for _ in range(num_objects)]
+            metadata_sizes = [np.random.randint(1000)
+                              for _ in range(num_objects)]
+            data_sizes = [np.random.randint(1000)
+                          for _ in range(num_objects)]
+            for object_id, data_size, metadata_size in zip(
+                    object_ids, data_sizes, metadata_sizes):
+                self.plasma_client.create(
+                    object_id, data_size,
+                    metadata=bytearray(np.random.bytes(metadata_size)))
+                self.plasma_client.seal(object_id)
+            # Check that we received notifications for all of the objects.
+            for object_id, data_size, metadata_size in zip(
+                    object_ids, data_sizes, metadata_sizes):
+                notification_info = self.plasma_client.get_next_notification()
+                recv_objid, recv_dsize, recv_msize = notification_info
+                assert object_id == recv_objid
+                assert data_size == recv_dsize
+                assert metadata_size == recv_msize
+
+    def test_subscribe_deletions(self):
+        """Check creation and deletion notifications around eviction."""
+        # Subscribe to notifications from the Plasma Store. We use
+        # plasma_client2 to make sure that all used objects will get evicted
+        # properly.
+        self.plasma_client2.subscribe()
+        for num_objects in [1, 10, 100, 1000, 10000, 100000]:
+            object_ids = [random_object_id() for _ in range(num_objects)]
+            # Add 1 to the sizes to make sure we have nonzero object sizes.
+            metadata_sizes = [np.random.randint(1000) + 1
+                              for _ in range(num_objects)]
+            data_sizes = [np.random.randint(1000) + 1
+                          for _ in range(num_objects)]
+            for object_id, data_size, metadata_size in zip(
+                    object_ids, data_sizes, metadata_sizes):
+                x = self.plasma_client2.create(
+                    object_id, data_size,
+                    metadata=bytearray(np.random.bytes(metadata_size)))
+                self.plasma_client2.seal(object_id)
+                # Drop the buffer reference right away; presumably needed
+                # so that the object can be evicted below.
+                del x
+            # Check that we received notifications for creating all of the
+            # objects.
+            for object_id, data_size, metadata_size in zip(
+                    object_ids, data_sizes, metadata_sizes):
+                notification_info = self.plasma_client2.get_next_notification()
+                recv_objid, recv_dsize, recv_msize = notification_info
+                assert object_id == recv_objid
+                assert data_size == recv_dsize
+                assert metadata_size == recv_msize
+
+            # Check that we receive notifications for deleting all objects, as
+            # we evict them.
+            for object_id, data_size, metadata_size in zip(
+                    object_ids, data_sizes, metadata_sizes):
+                assert (self.plasma_client2.evict(1) ==
+                        data_size + metadata_size)
+                notification_info = self.plasma_client2.get_next_notification()
+                recv_objid, recv_dsize, recv_msize = notification_info
+                assert object_id == recv_objid
+                assert -1 == recv_dsize
+                assert -1 == recv_msize
+
+        # Test multiple deletion notifications. The first 9 object IDs have
+        # size 0, and the last has a nonzero size. When Plasma evicts 1 byte,
+        # it will evict all objects, so we should receive deletion
+        # notifications for each.
+        num_object_ids = 10
+        object_ids = [random_object_id() for _ in range(num_object_ids)]
+        metadata_sizes = [0] * (num_object_ids - 1)
+        data_sizes = [0] * (num_object_ids - 1)
+        # np.random.randint(1000) can return 0, so add 1 to guarantee that
+        # the last object really has a nonzero size.
+        metadata_sizes.append(np.random.randint(1000) + 1)
+        data_sizes.append(np.random.randint(1000) + 1)
+        for i in range(num_object_ids):
+            x = self.plasma_client2.create(
+                object_ids[i], data_sizes[i],
+                metadata=bytearray(np.random.bytes(metadata_sizes[i])))
+            self.plasma_client2.seal(object_ids[i])
+            del x
+        for i in range(num_object_ids):
+            notification_info = self.plasma_client2.get_next_notification()
+            recv_objid, recv_dsize, recv_msize = notification_info
+            assert object_ids[i] == recv_objid
+            assert data_sizes[i] == recv_dsize
+            assert metadata_sizes[i] == recv_msize
+        assert (self.plasma_client2.evict(1) ==
+                data_sizes[-1] + metadata_sizes[-1])
+        for i in range(num_object_ids):
+            notification_info = self.plasma_client2.get_next_notification()
+            recv_objid, recv_dsize, recv_msize = notification_info
+            assert object_ids[i] == recv_objid
+            assert -1 == recv_dsize
+            assert -1 == recv_msize
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index 1ea57ae..7425b71 100644 --- a/python/setup.py +++ b/python/setup.py @@ -99,6 +99,10 @@ class build_ext(_build_ext): self.with_parquet = strtobool( os.environ.get('PYARROW_WITH_PARQUET', '0')) + self.with_plasma = strtobool( + os.environ.get('PYARROW_WITH_PLASMA', '0')) + if self.with_plasma and "plasma" not in self.CYTHON_MODULE_NAMES: + self.CYTHON_MODULE_NAMES.append("plasma") self.bundle_arrow_cpp = strtobool( os.environ.get('PYARROW_BUNDLE_ARROW_CPP', '0')) @@ -242,6 +246,8 @@ class build_ext(_build_ext): shutil.move(pjoin(build_prefix, 'include'), pjoin(build_lib, 'pyarrow')) move_lib("arrow") move_lib("arrow_python") + if self.with_plasma: + move_lib("plasma") if self.with_parquet: move_lib("parquet") @@
-270,11 +276,20 @@ class build_ext(_build_ext): shutil.move(self.get_ext_built_api_header(name), pjoin(os.path.dirname(ext_path), name + '_api.h')) + # Move the plasma store + if self.with_plasma: + build_py = self.get_finalized_command('build_py') + source = os.path.join(self.build_type, "plasma_store") + target = os.path.join(build_lib, build_py.get_package_dir('pyarrow'), "plasma_store") + shutil.move(source, target) + os.chdir(saved_cwd) def _failure_permitted(self, name): if name == '_parquet' and not self.with_parquet: return True + if name == 'plasma' and not self.with_plasma: + return True return False def _get_inplace_dir(self):