arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pcmor...@apache.org
Subject arrow git commit: ARROW-1695: [Serialization] Fix reference counting of numpy arrays created in custom serializer
Date Fri, 20 Oct 2017 21:27:40 GMT
Repository: arrow
Updated Branches:
  refs/heads/master a8f518588 -> 971e99dde


ARROW-1695: [Serialization] Fix reference counting of numpy arrays created in custom serializer

This uses the NumPyBuffer built into Arrow's Tensor facility to protect the numpy arrays holding
the Tensors to be serialized. See also the problem description in https://issues.apache.org/jira/browse/ARROW-1695.

Author: Philipp Moritz <pcmoritz@gmail.com>

Closes #1220 from pcmoritz/fix-serialize-tensors and squashes the following commits:

7e23bb5 [Philipp Moritz] fix linting
dce92ad [Philipp Moritz] fix handling of numpy arrays generated in the custom serializer methods


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/971e99dd
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/971e99dd
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/971e99dd

Branch: refs/heads/master
Commit: 971e99dde3ebabcd6791f4e936c0273938c45893
Parents: a8f5185
Author: Philipp Moritz <pcmoritz@gmail.com>
Authored: Fri Oct 20 14:27:38 2017 -0700
Committer: Philipp Moritz <pcmoritz@gmail.com>
Committed: Fri Oct 20 14:27:38 2017 -0700

----------------------------------------------------------------------
 cpp/src/arrow/python/python_to_arrow.cc    | 27 +++++++++++--------------
 python/pyarrow/tests/test_serialization.py | 21 +++++++++++++++++++
 2 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/971e99dd/cpp/src/arrow/python/python_to_arrow.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index 47d48d7..a46d10d 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -390,15 +390,15 @@ Status CallDeserializeCallback(PyObject* context, PyObject* value,
 
 Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
                      int32_t recursion_depth, std::shared_ptr<Array>* out,
-                     std::vector<PyObject*>* tensors_out);
+                     std::vector<std::shared_ptr<Tensor>>* tensors_out);
 
 Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
                       std::vector<PyObject*>* subdicts,
-                      std::vector<PyObject*>* tensors_out);
+                      std::vector<std::shared_ptr<Tensor>>* tensors_out);
 
 Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
                           int32_t recursion_depth, std::shared_ptr<Array>* out,
-                          std::vector<PyObject*>* tensors_out);
+                          std::vector<std::shared_ptr<Tensor>>* tensors_out);
 
 Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
   if (PyArray_IsScalar(obj, Bool)) {
@@ -444,7 +444,7 @@ Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
 Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
               std::vector<PyObject*>* sublists, std::vector<PyObject*>* subtuples,
               std::vector<PyObject*>* subdicts, std::vector<PyObject*>* subsets,
-              std::vector<PyObject*>* tensors_out) {
+              std::vector<std::shared_ptr<Tensor>>* tensors_out) {
   // The bool case must precede the int case (PyInt_Check passes for bools)
   if (PyBool_Check(elem)) {
     RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
@@ -525,7 +525,7 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
 
 Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
                       std::vector<PyObject*>* subdicts,
-                      std::vector<PyObject*>* tensors_out) {
+                      std::vector<std::shared_ptr<Tensor>>* tensors_out) {
   int dtype = PyArray_TYPE(array);
   switch (dtype) {
     case NPY_UINT8:
@@ -540,7 +540,10 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder*
     case NPY_FLOAT:
     case NPY_DOUBLE: {
       RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(tensors_out->size())));
-      tensors_out->push_back(reinterpret_cast<PyObject*>(array));
+      std::shared_ptr<Tensor> tensor;
+      RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
+                                    reinterpret_cast<PyObject*>(array), &tensor));
+      tensors_out->push_back(tensor);
     } break;
     default: {
       PyObject* serialized_object;
@@ -556,7 +559,7 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder*
 
 Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
                           int32_t recursion_depth, std::shared_ptr<Array>* out,
-                          std::vector<PyObject*>* tensors_out) {
+                          std::vector<std::shared_ptr<Tensor>>* tensors_out) {
   DCHECK(out);
   if (recursion_depth >= kMaxRecursionDepth) {
     return Status::NotImplemented(
@@ -603,7 +606,7 @@ Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
 
 Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
                      int32_t recursion_depth, std::shared_ptr<Array>* out,
-                     std::vector<PyObject*>* tensors_out) {
+                     std::vector<std::shared_ptr<Tensor>>* tensors_out) {
   DictBuilder result;
   if (recursion_depth >= kMaxRecursionDepth) {
     return Status::NotImplemented(
@@ -686,14 +689,8 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject
   PyDateTime_IMPORT;
   std::vector<PyObject*> sequences = {sequence};
   std::shared_ptr<Array> array;
-  std::vector<PyObject*> py_tensors;
-  RETURN_NOT_OK(SerializeSequences(context, sequences, 0, &array, &py_tensors));
+  RETURN_NOT_OK(SerializeSequences(context, sequences, 0, &array, &out->tensors));
   out->batch = MakeBatch(array);
-  for (const auto& py_tensor : py_tensors) {
-    std::shared_ptr<Tensor> arrow_tensor;
-    RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(), py_tensor, &arrow_tensor));
-    out->tensors.push_back(arrow_tensor);
-  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/971e99dd/python/pyarrow/tests/test_serialization.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index 460a11b..fea7cea 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -297,6 +297,27 @@ def test_numpy_immutable(large_memory_map):
             result[0] = 1.0
 
 
+# see https://issues.apache.org/jira/browse/ARROW-1695
+def test_serialization_callback_numpy():
+
+    class DummyClass(object):
+        pass
+
+    def serialize_dummy_class(obj):
+        x = np.zeros(4)
+        return x
+
+    def deserialize_dummy_class(serialized_obj):
+        return serialized_obj
+
+    pa._default_serialization_context.register_type(
+        DummyClass, "DummyClass", pickle=False,
+        custom_serializer=serialize_dummy_class,
+        custom_deserializer=deserialize_dummy_class)
+
+    pa.serialize(DummyClass())
+
+
 @pytest.mark.skip(reason="extensive memory requirements")
 def test_arrow_limits(self):
     def huge_memory_map(temp_dir):


Mime
View raw message