Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AC3CC200CE3 for ; Sun, 30 Jul 2017 05:14:15 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AA93D1647DF; Sun, 30 Jul 2017 03:14:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 78EE91647DE for ; Sun, 30 Jul 2017 05:14:14 +0200 (CEST) Received: (qmail 50985 invoked by uid 500); 30 Jul 2017 03:14:13 -0000 Mailing-List: contact commits-help@arrow.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@arrow.apache.org Delivered-To: mailing list commits@arrow.apache.org Received: (qmail 50976 invoked by uid 99); 30 Jul 2017 03:14:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 30 Jul 2017 03:14:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 81FC0E02F8; Sun, 30 Jul 2017 03:14:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@arrow.apache.org Message-Id: <98d2085465cb41dba60362b4389ab132@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: arrow git commit: ARROW-1264: [Python] Raise exception in Python instead of aborting if cannot connect to Plasma store Date: Sun, 30 Jul 2017 03:14:13 +0000 (UTC) archived-at: Sun, 30 Jul 2017 03:14:15 -0000 Repository: arrow Updated Branches: refs/heads/master 4108bda82 -> 2288bfc18 ARROW-1264: [Python] Raise exception in Python instead of aborting if cannot connect to Plasma store cc @pcmoritz @robertnishihara Author: Wes McKinney Closes #912 from 
wesm/ARROW-1264 and squashes the following commits: bd134d7e [Wes McKinney] Add flags to disable certain classes of unit tests 1d9de777 [Wes McKinney] Raise exception in Python instead of aborting if cannot connect to Plasma store Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2288bfc1 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2288bfc1 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2288bfc1 Branch: refs/heads/master Commit: 2288bfc18fdbd6f50eb6c184d2349bcdd538f469 Parents: 4108bda Author: Wes McKinney Authored: Sat Jul 29 23:14:09 2017 -0400 Committer: Wes McKinney Committed: Sat Jul 29 23:14:09 2017 -0400 ---------------------------------------------------------------------- cpp/src/plasma/client.cc | 8 +++-- cpp/src/plasma/client.h | 4 ++- cpp/src/plasma/io.cc | 30 +++++++++++----- cpp/src/plasma/io.h | 17 ++++++--- python/manylinux1/build_arrow.sh | 5 ++- python/pyarrow/plasma.pyx | 60 ++++++++++++++++++++++---------- python/pyarrow/tests/conftest.py | 9 ++++- python/pyarrow/tests/test_plasma.py | 6 ++++ 8 files changed, 100 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/client.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index e14b3d9..8ea62c6 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -512,10 +512,12 @@ Status PlasmaClient::GetNotification(int fd, ObjectID* object_id, int64_t* data_ } Status PlasmaClient::Connect(const std::string& store_socket_name, - const std::string& manager_socket_name, int release_delay) { - store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1); + const std::string& manager_socket_name, int release_delay, + int num_retries) { + RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, 
num_retries, -1, &store_conn_)); if (manager_socket_name != "") { - manager_conn_ = connect_ipc_sock_retry(manager_socket_name, -1, -1); + RETURN_NOT_OK( + ConnectIpcSocketRetry(manager_socket_name, num_retries, -1, &manager_conn_)); } else { manager_conn_ = -1; } http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/client.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index cc05a06..50ec55f 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -89,9 +89,11 @@ class ARROW_EXPORT PlasmaClient { /// function will not connect to a manager. /// @param release_delay Number of released objects that are kept around /// and not evicted to avoid too many munmaps. + /// @param num_retries number of attempts to connect to IPC socket, default 50 /// @return The return status. Status Connect(const std::string& store_socket_name, - const std::string& manager_socket_name, int release_delay); + const std::string& manager_socket_name, int release_delay, + int num_retries = -1); /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. 
http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/io.cc ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc index e3b6b61..9bb4339 100644 --- a/cpp/src/plasma/io.cc +++ b/cpp/src/plasma/io.cc @@ -17,6 +17,11 @@ #include "plasma/io.h" +#include +#include + +#include "arrow/status.h" + #include "plasma/common.h" using arrow::Status; @@ -29,6 +34,8 @@ using arrow::Status; #define NUM_CONNECT_ATTEMPTS 50 #define CONNECT_TIMEOUT_MS 100 +namespace plasma { + Status WriteBytes(int fd, uint8_t* cursor, size_t length) { ssize_t nbytes = 0; size_t bytesleft = length; @@ -140,8 +147,8 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) { return socket_fd; } -int connect_ipc_sock_retry(const std::string& pathname, int num_retries, - int64_t timeout) { +Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries, + int64_t timeout, int* fd) { /* Pick the default values if the user did not specify. */ if (num_retries < 0) { num_retries = NUM_CONNECT_ATTEMPTS; @@ -150,23 +157,26 @@ int connect_ipc_sock_retry(const std::string& pathname, int num_retries, timeout = CONNECT_TIMEOUT_MS; } - int fd = -1; + *fd = -1; for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) { - fd = connect_ipc_sock(pathname); - if (fd >= 0) { + *fd = connect_ipc_sock(pathname); + if (*fd >= 0) { break; } if (num_attempts == 0) { - ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << pathname; + ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname + << ", retrying " << num_retries << " times"; } /* Sleep for timeout milliseconds. */ usleep(static_cast(timeout * 1000)); } /* If we could not connect to the socket, exit. 
*/ - if (fd == -1) { - ARROW_LOG(FATAL) << "Could not connect to socket " << pathname; + if (*fd == -1) { + std::stringstream ss; + ss << "Could not connect to socket " << pathname; + return Status::IOError(ss.str()); } - return fd; + return Status::OK(); } int connect_ipc_sock(const std::string& pathname) { @@ -224,3 +234,5 @@ uint8_t* read_message_async(int sock) { } return message; } + +} // namespace plasma http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/cpp/src/plasma/io.h ---------------------------------------------------------------------- diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h index 43c3fb5..ef96c06 100644 --- a/cpp/src/plasma/io.h +++ b/cpp/src/plasma/io.h @@ -34,22 +34,29 @@ #define PLASMA_PROTOCOL_VERSION 0x0000000000000000 #define DISCONNECT_CLIENT 0 -arrow::Status WriteBytes(int fd, uint8_t* cursor, size_t length); +namespace plasma { -arrow::Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes); +using arrow::Status; -arrow::Status ReadBytes(int fd, uint8_t* cursor, size_t length); +Status WriteBytes(int fd, uint8_t* cursor, size_t length); -arrow::Status ReadMessage(int fd, int64_t* type, std::vector* buffer); +Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes); + +Status ReadBytes(int fd, uint8_t* cursor, size_t length); + +Status ReadMessage(int fd, int64_t* type, std::vector* buffer); int bind_ipc_sock(const std::string& pathname, bool shall_listen); int connect_ipc_sock(const std::string& pathname); -int connect_ipc_sock_retry(const std::string& pathname, int num_retries, int64_t timeout); +Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries, + int64_t timeout, int* fd); int AcceptClient(int socket_fd); uint8_t* read_message_async(int sock); +} // namespace plasma + #endif // PLASMA_IO_H http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/manylinux1/build_arrow.sh ---------------------------------------------------------------------- diff 
--git a/python/manylinux1/build_arrow.sh b/python/manylinux1/build_arrow.sh index 5725b2a..5a21e36 100755 --- a/python/manylinux1/build_arrow.sh +++ b/python/manylinux1/build_arrow.sh @@ -80,7 +80,10 @@ for PYTHON in ${PYTHON_VERSIONS}; do echo "=== (${PYTHON}) Testing manylinux1 wheel ===" source /venv-test-${PYTHON}/bin/activate pip install repaired_wheels/*.whl - py.test --parquet /venv-test-${PYTHON}/lib/*/site-packages/pyarrow + + # ARROW-1264; for some reason the test case added causes a segfault inside + # the Docker container when writing an error message to stderr + py.test --parquet /venv-test-${PYTHON}/lib/*/site-packages/pyarrow -v -s --disable-plasma deactivate mv repaired_wheels/*.whl /io/dist http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/pyarrow/plasma.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 8aaca99..dd62d47 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -70,7 +70,8 @@ cdef extern from "plasma/client.h" nogil: CPlasmaClient() CStatus Connect(const c_string& store_socket_name, - const c_string& manager_socket_name, int release_delay) + const c_string& manager_socket_name, + int release_delay, int num_retries) CStatus Create(const CUniqueID& object_id, int64_t data_size, const uint8_t* metadata, int64_t metadata_size, @@ -98,10 +99,13 @@ cdef extern from "plasma/client.h" nogil: CStatus Fetch(int num_object_ids, const CUniqueID* object_ids) - CStatus Wait(int64_t num_object_requests, CObjectRequest* object_requests, - int num_ready_objects, int64_t timeout_ms, int* num_objects_ready); + CStatus Wait(int64_t num_object_requests, + CObjectRequest* object_requests, + int num_ready_objects, int64_t timeout_ms, + int* num_objects_ready); - CStatus Transfer(const char* addr, int port, const CUniqueID& object_id) + CStatus Transfer(const char* addr, int port, + const CUniqueID& object_id) cdef extern from 
"plasma/client.h" nogil: @@ -247,7 +251,8 @@ cdef class PlasmaClient: def manager_socket_name(self): return self.manager_socket_name.decode() - def create(self, ObjectID object_id, int64_t data_size, c_string metadata=b""): + def create(self, ObjectID object_id, int64_t data_size, + c_string metadata=b""): """ Create a new buffer in the PlasmaStore for a particular object ID. @@ -439,7 +444,8 @@ cdef class PlasmaClient: """ cdef c_string addr = address.encode() with nogil: - check_status(self.client.get().Transfer(addr.c_str(), port, object_id.data)) + check_status(self.client.get() + .Transfer(addr.c_str(), port, object_id.data)) def fetch(self, object_ids): """ @@ -457,7 +463,8 @@ cdef class PlasmaClient: with nogil: check_status(self.client.get().Fetch(ids.size(), ids.data())) - def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT, int num_returns=1): + def wait(self, object_ids, int64_t timeout=PLASMA_WAIT_TIMEOUT, + int num_returns=1): """ Wait until num_returns objects in object_ids are ready. Currently, the object ID arguments to wait must be unique. 
@@ -483,14 +490,18 @@ cdef class PlasmaClient: if len(object_ids) != len(set(object_ids)): raise Exception("Wait requires a list of unique object IDs.") cdef int64_t num_object_requests = len(object_ids) - cdef c_vector[CObjectRequest] object_requests = c_vector[CObjectRequest](num_object_requests) + cdef c_vector[CObjectRequest] object_requests = ( + c_vector[CObjectRequest](num_object_requests)) cdef int num_objects_ready = 0 cdef ObjectID object_id for i, object_id in enumerate(object_ids): object_requests[i].object_id = object_id.data object_requests[i].type = PLASMA_QUERY_ANYWHERE with nogil: - check_status(self.client.get().Wait(num_object_requests, object_requests.data(), num_returns, timeout, &num_objects_ready)) + check_status(self.client.get().Wait(num_object_requests, + object_requests.data(), + num_returns, timeout, + &num_objects_ready)) cdef int num_to_return = min(num_objects_ready, num_returns); ready_ids = [] waiting_ids = set(object_ids) @@ -498,9 +509,12 @@ cdef class PlasmaClient: for i in range(len(object_ids)): if num_returned == num_to_return: break - if object_requests[i].status == ObjectStatusLocal or object_requests[i].status == ObjectStatusRemote: - ready_ids.append(ObjectID(object_requests[i].object_id.binary())) - waiting_ids.discard(ObjectID(object_requests[i].object_id.binary())) + if (object_requests[i].status == ObjectStatusLocal or + object_requests[i].status == ObjectStatusRemote): + ready_ids.append( + ObjectID(object_requests[i].object_id.binary())) + waiting_ids.discard( + ObjectID(object_requests[i].object_id.binary())) num_returned += 1 return ready_ids, list(waiting_ids) @@ -526,10 +540,11 @@ cdef class PlasmaClient: cdef int64_t data_size cdef int64_t metadata_size with nogil: - check_status(self.client.get().GetNotification(self.notification_fd, - &object_id.data, - &data_size, - &metadata_size)) + check_status(self.client.get() + .GetNotification(self.notification_fd, + &object_id.data, + &data_size, + &metadata_size)) 
return object_id, data_size, metadata_size def to_capsule(self): @@ -542,7 +557,9 @@ cdef class PlasmaClient: with nogil: check_status(self.client.get().Disconnect()) -def connect(store_socket_name, manager_socket_name, int release_delay): + +def connect(store_socket_name, manager_socket_name, int release_delay, + int num_retries=-1): """ Return a new PlasmaClient that is connected a plasma store and optionally a manager. @@ -556,11 +573,16 @@ def connect(store_socket_name, manager_socket_name, int release_delay): release_delay : int The maximum number of objects that the client will keep and delay releasing (for caching reasons). + num_retries : int, default -1 + Number of times to try to connect to plasma store. Default value of -1 + uses the default (50) """ cdef PlasmaClient result = PlasmaClient() result.store_socket_name = store_socket_name.encode() result.manager_socket_name = manager_socket_name.encode() with nogil: - check_status(result.client.get().Connect(result.store_socket_name, - result.manager_socket_name, release_delay)) + check_status(result.client.get() + .Connect(result.store_socket_name, + result.manager_socket_name, + release_delay, num_retries)) return result http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/pyarrow/tests/conftest.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py index f2d67f6..651438b 100644 --- a/python/pyarrow/tests/conftest.py +++ b/python/pyarrow/tests/conftest.py @@ -52,6 +52,11 @@ def pytest_addoption(parser): help=('Enable the {0} test group'.format(group))) for group in groups: + parser.addoption('--disable-{0}'.format(group), action='store_true', + default=False, + help=('Disable the {0} test group'.format(group))) + + for group in groups: parser.addoption('--only-{0}'.format(group), action='store_true', default=False, help=('Run only the {0} test group'.format(group))) @@ -62,12 +67,14 @@ def 
pytest_runtest_setup(item): for group in groups: only_flag = '--only-{0}'.format(group) + disable_flag = '--disable-{0}'.format(group) flag = '--{0}'.format(group) if item.config.getoption(only_flag): only_set = True elif getattr(item.obj, group, None): - if not item.config.getoption(flag): + if (item.config.getoption(disable_flag) or + not item.config.getoption(flag)): skip('{0} NOT enabled'.format(flag)) if only_set: http://git-wip-us.apache.org/repos/asf/arrow/blob/2288bfc1/python/pyarrow/tests/test_plasma.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py index e168d9f..04162bb 100644 --- a/python/pyarrow/tests/test_plasma.py +++ b/python/pyarrow/tests/test_plasma.py @@ -168,6 +168,12 @@ class TestPlasmaClient(object): else: self.p.kill() + def test_connection_failure_raises_exception(self): + import pyarrow.plasma as plasma + # ARROW-1264 + with pytest.raises(IOError): + plasma.connect('unknown-store-name', '', 0, 1) + def test_create(self): # Create an object id string. object_id = random_object_id()