arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-1438: [Python] Pull serialization context through PlasmaClient put and get
Date Fri, 01 Sep 2017 12:42:45 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 9ab0c9552 -> c6295f3b7


ARROW-1438: [Python] Pull serialization context through PlasmaClient put and get

Author: Philipp Moritz <pcmoritz@gmail.com>

Closes #1019 from pcmoritz/plasma-serialization-context and squashes the following commits:

caf85165 [Philipp Moritz] fix
8e8e9011 [Philipp Moritz] add error handling
1a4fade0 [Philipp Moritz] add tests and fixes
2389659e [Philipp Moritz] Pull serialization context through PlasmaClient put and get


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/c6295f3b
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/c6295f3b
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/c6295f3b

Branch: refs/heads/master
Commit: c6295f3b74bcc2fa9ea1b9442f922bf564669b8e
Parents: 9ab0c95
Author: Philipp Moritz <pcmoritz@gmail.com>
Authored: Fri Sep 1 08:42:38 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Fri Sep 1 08:42:38 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/python/python_to_arrow.cc |  8 +++++++-
 cpp/src/arrow/status.h                  |  7 +++++++
 python/pyarrow/__init__.py              |  4 +++-
 python/pyarrow/error.pxi                |  6 ++++++
 python/pyarrow/includes/common.pxd      |  1 +
 python/pyarrow/plasma.pyx               | 17 ++++++++++++-----
 python/pyarrow/serialization.pxi        | 17 ++++++++---------
 python/pyarrow/tests/test_plasma.py     | 22 ++++++++++++++++++++++
 8 files changed, 66 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/c6295f3b/cpp/src/arrow/python/python_to_arrow.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index 9ae66dc..c5cfd6e 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -325,10 +325,16 @@ Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* el
     std::stringstream ss;
     ScopedRef repr(PyObject_Repr(elem));
     RETURN_IF_PYERROR();
+#if PY_MAJOR_VERSION >= 3
     ScopedRef ascii(PyUnicode_AsASCIIString(repr.get()));
+    RETURN_IF_PYERROR();
     ss << "error while calling callback on " << PyBytes_AsString(ascii.get())
        << ": handler not registered";
-    return Status::NotImplemented(ss.str());
+#else
+    ss << "error while calling callback on " << PyString_AsString(repr.get())
+       << ": handler not registered";
+#endif
+    return Status::SerializationError(ss.str());
   } else {
     *result = PyObject_CallMethodObjArgs(context, method_name, elem, NULL);
     RETURN_IF_PYERROR();

http://git-wip-us.apache.org/repos/asf/arrow/blob/c6295f3b/cpp/src/arrow/status.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/status.h b/cpp/src/arrow/status.h
index a02752f..d52addf 100644
--- a/cpp/src/arrow/status.h
+++ b/cpp/src/arrow/status.h
@@ -71,6 +71,7 @@ enum class StatusCode : char {
   IOError = 5,
   UnknownError = 9,
   NotImplemented = 10,
+  SerializationError = 11,
   PlasmaObjectExists = 20,
   PlasmaObjectNonexistent = 21,
   PlasmaStoreFull = 22
@@ -125,6 +126,10 @@ class ARROW_EXPORT Status {
     return Status(StatusCode::IOError, msg);
   }
 
+  static Status SerializationError(const std::string& msg) {
+    return Status(StatusCode::SerializationError, msg);
+  }
+
   static Status PlasmaObjectExists(const std::string& msg) {
     return Status(StatusCode::PlasmaObjectExists, msg);
   }
@@ -147,6 +152,8 @@ class ARROW_EXPORT Status {
   bool IsTypeError() const { return code() == StatusCode::TypeError; }
   bool IsUnknownError() const { return code() == StatusCode::UnknownError; }
   bool IsNotImplemented() const { return code() == StatusCode::NotImplemented; }
+  // An object could not be serialized or deserialized.
+  bool IsSerializationError() const { return code() == StatusCode::SerializationError; }
   // An object with this object ID already exists in the plasma store.
   bool IsPlasmaObjectExists() const { return code() == StatusCode::PlasmaObjectExists; }
   // An object was requested that doesn't exist in the plasma store.

http://git-wip-us.apache.org/repos/asf/arrow/blob/c6295f3b/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index f97d356..a4a6ed4 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -86,13 +86,15 @@ from pyarrow.lib import (ArrowException,
                          ArrowMemoryError,
                          ArrowNotImplementedError,
                          ArrowTypeError,
+                         ArrowSerializationError,
                          PlasmaObjectExists)
 
 # Serialization
 from pyarrow.lib import (deserialize_from, deserialize,
                          serialize, serialize_to, read_serialized,
                          SerializedPyObject, SerializationContext,
-                         SerializationException, DeserializationException)
+                         SerializationCallbackError,
+                         DeserializationCallbackError)
 
 from pyarrow.filesystem import FileSystem, LocalFileSystem
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/c6295f3b/python/pyarrow/error.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi
index 8793c4e..2a21302 100644
--- a/python/pyarrow/error.pxi
+++ b/python/pyarrow/error.pxi
@@ -60,6 +60,10 @@ class PlasmaStoreFull(ArrowException):
     pass
 
 
+class ArrowSerializationError(ArrowException):
+    pass
+
+
 cdef int check_status(const CStatus& status) nogil except -1:
     if status.ok():
         return 0
@@ -84,6 +88,8 @@ cdef int check_status(const CStatus& status) nogil except -1:
             raise PlasmaObjectNonexistent(message)
         elif status.IsPlasmaStoreFull():
             raise PlasmaStoreFull(message)
+        elif status.IsSerializationError():
+            raise ArrowSerializationError(message)
         else:
             message = frombytes(status.ToString())
             raise ArrowException(message)

http://git-wip-us.apache.org/repos/asf/arrow/blob/c6295f3b/python/pyarrow/includes/common.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index 6be08b0..1bd840c 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -51,6 +51,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         c_bool IsKeyError()
         c_bool IsNotImplemented()
         c_bool IsTypeError()
+        c_bool IsSerializationError()
         c_bool IsPlasmaObjectExists()
         c_bool IsPlasmaObjectNonexistent()
         c_bool IsPlasmaStoreFull()

http://git-wip-us.apache.org/repos/asf/arrow/blob/c6295f3b/python/pyarrow/plasma.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index cb19bea..bc0e94e 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -370,7 +370,8 @@ cdef class PlasmaClient:
                                          object_buffers[i].metadata_size))
         return result
 
-    def put(self, object value, ObjectID object_id=None):
+    def put(self, object value, ObjectID object_id=None,
+            serialization_context=None):
         """
         Store a Python value into the object store.
 
@@ -381,6 +382,8 @@ cdef class PlasmaClient:
         object_id : ObjectID, default None
             If this is provided, the specified object ID will be used to refer
             to the object.
+        serialization_context : pyarrow.SerializationContext, default None
+            Custom serialization and deserialization context.
 
         Returns
         -------
@@ -388,7 +391,7 @@ cdef class PlasmaClient:
         """
         cdef ObjectID target_id = (object_id if object_id
                                    else ObjectID.from_random())
-        serialized = pyarrow.serialize(value)
+        serialized = pyarrow.serialize(value, serialization_context)
         buffer = self.create(target_id, serialized.total_bytes)
         stream = pyarrow.FixedSizeBufferWriter(buffer)
         stream.set_memcopy_threads(4)
@@ -396,7 +399,7 @@ cdef class PlasmaClient:
         self.seal(target_id)
         return target_id
 
-    def get(self, object_ids, int timeout_ms=-1):
+    def get(self, object_ids, int timeout_ms=-1, serialization_context=None):
         """
         Get one or more Python values from the object store.
 
@@ -409,6 +412,8 @@ cdef class PlasmaClient:
             The number of milliseconds that the get call should block before
             timing out and returning. Pass -1 if the call should block and 0
             if the call should return immediately.
+        serialization_context : pyarrow.SerializationContext, default None
+            Custom serialization and deserialization context.
 
         Returns
         -------
@@ -424,12 +429,14 @@ cdef class PlasmaClient:
                 # buffers[i] is None if this object was not available within
                 # the timeout
                 if buffers[i]:
-                    results.append(pyarrow.deserialize(buffers[i]))
+                    val = pyarrow.deserialize(buffers[i],
+                                              serialization_context)
+                    results.append(val)
                 else:
                     results.append(ObjectNotAvailable)
             return results
         else:
-            return self.get([object_ids], timeout_ms)[0]
+            return self.get([object_ids], timeout_ms, serialization_context)[0]
 
     def seal(self, ObjectID object_id):
         """

http://git-wip-us.apache.org/repos/asf/arrow/blob/c6295f3b/python/pyarrow/serialization.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
index 5c0fbc6..f38845e 100644
--- a/python/pyarrow/serialization.pxi
+++ b/python/pyarrow/serialization.pxi
@@ -31,15 +31,15 @@ def is_named_tuple(cls):
     return all(type(n) == str for n in f)
 
 
-class SerializationException(Exception):
+class SerializationCallbackError(ArrowSerializationError):
     def __init__(self, message, example_object):
-        Exception.__init__(self, message)
+        ArrowSerializationError.__init__(self, message)
         self.example_object = example_object
 
 
-class DeserializationException(Exception):
+class DeserializationCallbackError(ArrowSerializationError):
     def __init__(self, message, type_id):
-        Exception.__init__(self, message)
+        ArrowSerializationError.__init__(self, message)
         self.type_id = type_id
 
 
@@ -89,10 +89,9 @@ cdef class SerializationContext:
 
     def _serialize_callback(self, obj):
         if type(obj) not in self.type_to_type_id:
-            raise SerializationException("pyarrow does not know how to "
-                                         "serialize objects of type {}."
-                                         .format(type(obj)),
-                                         obj)
+            raise SerializationCallbackError(
+                "pyarrow does not know how to "
+                "serialize objects of type {}.".format(type(obj)), obj)
         type_id = self.type_to_type_id[type(obj)]
         if type_id in self.types_to_pickle:
             serialized_obj = {"data": pickle.dumps(obj), "pickle": True}
@@ -107,7 +106,7 @@ cdef class SerializationContext:
             else:
                 msg = "We do not know how to serialize " \
                       "the object '{}'".format(obj)
-                raise SerializationException(msg, obj)
+                raise SerializationCallbackError(msg, obj)
         return dict(serialized_obj, **{"_pytype_": type_id})
 
     def _deserialize_callback(self, serialized_obj):

http://git-wip-us.apache.org/repos/asf/arrow/blob/c6295f3b/python/pyarrow/tests/test_plasma.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index 9af97df..b73d92d 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -287,6 +287,28 @@ class TestPlasmaClient(object):
             [result] = self.plasma_client.get([object_id], timeout_ms=0)
             assert result == pa.plasma.ObjectNotAvailable
 
+    def test_put_and_get_serialization_context(self):
+
+        class CustomType(object):
+            def __init__(self, val):
+                self.val = val
+
+        val = CustomType(42)
+
+        with pytest.raises(pa.ArrowSerializationError):
+            self.plasma_client.put(val)
+
+        serialization_context = pa.SerializationContext()
+        serialization_context.register_type(CustomType, 20*b"\x00")
+
+        object_id = self.plasma_client.put(val, None, serialization_context)
+
+        with pytest.raises(pa.ArrowSerializationError):
+            result = self.plasma_client.get(object_id)
+
+        result = self.plasma_client.get(object_id, -1, serialization_context)
+        assert result.val == val.val
+
     def test_store_arrow_objects(self):
         data = np.random.randn(10, 4)
         # Write an arrow object.


Mime
View raw message