arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [arrow] branch master updated: ARROW-1522: [Python] Zero copy buffer deserialization
Date Sun, 22 Oct 2017 11:49:01 GMT
This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ee5508  ARROW-1522: [Python] Zero copy buffer deserialization
9ee5508 is described below

commit 9ee5508ab3ec010b256c88181ba3215d5e966b04
Author: Philipp Moritz <pcmoritz@gmail.com>
AuthorDate: Sun Oct 22 13:48:55 2017 +0200

    ARROW-1522: [Python] Zero copy buffer deserialization
    
    This PR makes it possible to add serialization handlers that allow deserializing Python
objects using zero copy. If the serialization handler returns an Arrow buffer, it will be
appended to the serialized object, and the object can be reconstructed from there without
copying the buffer.
    
    TODO before merge:
    
    - [x] Add pandas zero copy buffer read
    - [x] See if fixing the TODO about union tags makes the code cleaner
    
    Author: Philipp Moritz <pcmoritz@gmail.com>
    
    Closes #1231 from pcmoritz/buffer-serialization and squashes the following commits:
    
    10a01c0 [Philipp Moritz] run clang-format
    529366d [Philipp Moritz] fix linting
    7d88a67 [Philipp Moritz] fixes and make pandas deserialization zero-copy
    2b75a0d [Philipp Moritz] fix linting
    e078cb0 [Philipp Moritz] use union array fields to distinguish between ints and tensor/buffer
references
    dbd2364 [Philipp Moritz] add read path
    0cc6d45 [Philipp Moritz] add read path
    9f0173d [Philipp Moritz] buffer serialization write path
---
 cpp/src/arrow/python/arrow_to_python.cc    | 127 ++++++++++++++++-------------
 cpp/src/arrow/python/python_to_arrow.cc    |  94 +++++++++++++--------
 cpp/src/arrow/python/python_to_arrow.h     |   3 +
 python/pyarrow/serialization.py            |   5 +-
 python/pyarrow/tests/test_serialization.py |  19 +++++
 5 files changed, 153 insertions(+), 95 deletions(-)

diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc
index 761d290..9686050 100644
--- a/cpp/src/arrow/python/arrow_to_python.cc
+++ b/cpp/src/arrow/python/arrow_to_python.cc
@@ -21,6 +21,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <string>
 #include <vector>
 
 #include <numpy/arrayobject.h>
@@ -32,6 +33,7 @@
 #include "arrow/python/common.h"
 #include "arrow/python/helpers.h"
 #include "arrow/python/numpy_convert.h"
+#include "arrow/python/pyarrow.h"
 #include "arrow/python/python_to_arrow.h"
 #include "arrow/python/util/datetime.h"
 #include "arrow/table.h"
@@ -44,23 +46,19 @@ Status CallDeserializeCallback(PyObject* context, PyObject* value,
                                PyObject** deserialized_object);
 
 Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx,
-                        int64_t stop_idx, PyObject* base,
-                        const std::vector<std::shared_ptr<Tensor>>& tensors,
+                        int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                         PyObject** out);
 
 Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx,
-                       int64_t stop_idx, PyObject* base,
-                       const std::vector<std::shared_ptr<Tensor>>& tensors,
+                       int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                        PyObject** out);
 
 Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
-                      int64_t stop_idx, PyObject* base,
-                      const std::vector<std::shared_ptr<Tensor>>& tensors,
+                      int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                       PyObject** out);
 
 Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx,
-                       int64_t stop_idx, PyObject* base,
-                       const std::vector<std::shared_ptr<Tensor>>& tensors,
+                       int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                        PyObject** out) {
   const auto& data = static_cast<const StructArray&>(array);
   ScopedRef keys, vals;
@@ -69,10 +67,10 @@ Status DeserializeDict(PyObject* context, const Array& array, int64_t
start_idx,
 
   DCHECK_EQ(2, data.num_fields());
 
-  RETURN_NOT_OK(DeserializeList(context, *data.field(0), start_idx, stop_idx, base,
-                                tensors, keys.ref()));
-  RETURN_NOT_OK(DeserializeList(context, *data.field(1), start_idx, stop_idx, base,
-                                tensors, vals.ref()));
+  RETURN_NOT_OK(DeserializeList(context, *data.field(0), start_idx, stop_idx, base, blobs,
+                                keys.ref()));
+  RETURN_NOT_OK(DeserializeList(context, *data.field(1), start_idx, stop_idx, base, blobs,
+                                vals.ref()));
   for (int64_t i = start_idx; i < stop_idx; ++i) {
     // PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem.
     // The latter two steal references whereas PyDict_SetItem does not. So we need
@@ -91,10 +89,9 @@ Status DeserializeDict(PyObject* context, const Array& array, int64_t
start_idx,
 }
 
 Status DeserializeArray(const Array& array, int64_t offset, PyObject* base,
-                        const std::vector<std::shared_ptr<arrow::Tensor>>&
tensors,
-                        PyObject** out) {
+                        const SerializedPyObject& blobs, PyObject** out) {
   int32_t index = static_cast<const Int32Array&>(array).Value(offset);
-  RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out));
+  RETURN_NOT_OK(py::TensorToNdarray(*blobs.tensors[index], base, out));
   // Mark the array as immutable
   ScopedRef flags(PyObject_GetAttrString(*out, "flags"));
   DCHECK(flags.get() != NULL) << "Could not mark Numpy array immutable";
@@ -104,9 +101,9 @@ Status DeserializeArray(const Array& array, int64_t offset, PyObject*
base,
   return Status::OK();
 }
 
-Status GetValue(PyObject* context, const Array& arr, int64_t index, int32_t type,
-                PyObject* base, const std::vector<std::shared_ptr<Tensor>>&
tensors,
-                PyObject** result) {
+Status GetValue(PyObject* context, const UnionArray& parent, const Array& arr,
+                int64_t index, int32_t type, PyObject* base,
+                const SerializedPyObject& blobs, PyObject** result) {
   switch (arr.type()->id()) {
     case Type::BOOL:
       *result = PyBool_FromLong(static_cast<const BooleanArray&>(arr).Value(index));
@@ -151,69 +148,72 @@ Status GetValue(PyObject* context, const Array& arr, int64_t index,
int32_t type
       const auto& l = static_cast<const ListArray&>(*s.field(0));
       if (s.type()->child(0)->name() == "list") {
         return DeserializeList(context, *l.values(), l.value_offset(index),
-                               l.value_offset(index + 1), base, tensors, result);
+                               l.value_offset(index + 1), base, blobs, result);
       } else if (s.type()->child(0)->name() == "tuple") {
         return DeserializeTuple(context, *l.values(), l.value_offset(index),
-                                l.value_offset(index + 1), base, tensors, result);
+                                l.value_offset(index + 1), base, blobs, result);
       } else if (s.type()->child(0)->name() == "dict") {
         return DeserializeDict(context, *l.values(), l.value_offset(index),
-                               l.value_offset(index + 1), base, tensors, result);
+                               l.value_offset(index + 1), base, blobs, result);
       } else if (s.type()->child(0)->name() == "set") {
         return DeserializeSet(context, *l.values(), l.value_offset(index),
-                              l.value_offset(index + 1), base, tensors, result);
+                              l.value_offset(index + 1), base, blobs, result);
       } else {
         DCHECK(false) << "unexpected StructArray type " << s.type()->child(0)->name();
       }
     }
-    // We use an Int32Builder here to distinguish the tensor indices from
-    // the Type::INT64 above (see tensor_indices_ in SequenceBuilder).
-    case Type::INT32: {
-      return DeserializeArray(arr, index, base, tensors, result);
+    default: {
+      const std::string& child_name = parent.type()->child(type)->name();
+      if (child_name == "tensor") {
+        return DeserializeArray(arr, index, base, blobs, result);
+      } else if (child_name == "buffer") {
+        int32_t ref = static_cast<const Int32Array&>(arr).Value(index);
+        *result = wrap_buffer(blobs.buffers[ref]);
+        return Status::OK();
+      } else {
+        DCHECK(false) << "union tag " << type << " with child name '" << child_name
+                      << "' not recognized";
+      }
     }
-    default:
-      DCHECK(false) << "union tag " << type << " not recognized";
   }
   return Status::OK();
 }
 
-#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN)                               \
-  const auto& data = static_cast<const UnionArray&>(array);               
        \
-  ScopedRef result(CREATE_FN(stop_idx - start_idx));                               \
-  const uint8_t* type_ids = data.raw_type_ids();                                   \
-  const int32_t* value_offsets = data.raw_value_offsets();                         \
-  for (int64_t i = start_idx; i < stop_idx; ++i) {                                 \
-    if (data.IsNull(i)) {                                                          \
-      Py_INCREF(Py_None);                                                          \
-      SET_ITEM_FN(result.get(), i - start_idx, Py_None);                           \
-    } else {                                                                       \
-      int64_t offset = value_offsets[i];                                           \
-      uint8_t type = type_ids[i];                                                  \
-      PyObject* value;                                                             \
-      RETURN_NOT_OK(GetValue(context, *data.UnsafeChild(type), offset, type, base, \
-                             tensors, &value));                                    \
-      SET_ITEM_FN(result.get(), i - start_idx, value);                             \
-    }                                                                              \
-  }                                                                                \
-  *out = result.release();                                                         \
+#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN)                                    
\
+  const auto& data = static_cast<const UnionArray&>(array);               
              \
+  ScopedRef result(CREATE_FN(stop_idx - start_idx));                                    
\
+  const uint8_t* type_ids = data.raw_type_ids();                                        
\
+  const int32_t* value_offsets = data.raw_value_offsets();                              
\
+  for (int64_t i = start_idx; i < stop_idx; ++i) {                                   
   \
+    if (data.IsNull(i)) {                                                               
\
+      Py_INCREF(Py_None);                                                               
\
+      SET_ITEM_FN(result.get(), i - start_idx, Py_None);                                
\
+    } else {                                                                            
\
+      int64_t offset = value_offsets[i];                                                
\
+      uint8_t type = type_ids[i];                                                       
\
+      PyObject* value;                                                                  
\
+      RETURN_NOT_OK(GetValue(context, data, *data.UnsafeChild(type), offset, type, base,
\
+                             blobs, &value));                                       
    \
+      SET_ITEM_FN(result.get(), i - start_idx, value);                                  
\
+    }                                                                                   
\
+  }                                                                                     
\
+  *out = result.release();                                                              
\
   return Status::OK()
 
 Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx,
-                       int64_t stop_idx, PyObject* base,
-                       const std::vector<std::shared_ptr<Tensor>>& tensors,
+                       int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                        PyObject** out) {
   DESERIALIZE_SEQUENCE(PyList_New, PyList_SET_ITEM);
 }
 
 Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx,
-                        int64_t stop_idx, PyObject* base,
-                        const std::vector<std::shared_ptr<Tensor>>& tensors,
+                        int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                         PyObject** out) {
   DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM);
 }
 
 Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
-                      int64_t stop_idx, PyObject* base,
-                      const std::vector<std::shared_ptr<Tensor>>& tensors,
+                      int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                       PyObject** out) {
   const auto& data = static_cast<const UnionArray&>(array);
   ScopedRef result(PySet_New(nullptr));
@@ -229,8 +229,8 @@ Status DeserializeSet(PyObject* context, const Array& array, int64_t
start_idx,
       int32_t offset = value_offsets[i];
       int8_t type = type_ids[i];
       PyObject* value;
-      RETURN_NOT_OK(GetValue(context, *data.UnsafeChild(type), offset, type, base,
-                             tensors, &value));
+      RETURN_NOT_OK(GetValue(context, data, *data.UnsafeChild(type), offset, type, base,
+                             blobs, &value));
       if (PySet_Add(result.get(), value) < 0) {
         RETURN_IF_PYERROR();
       }
@@ -244,9 +244,12 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject*
out)
   int64_t offset;
   int64_t bytes_read;
   int32_t num_tensors;
+  int32_t num_buffers;
   // Read number of tensors
   RETURN_NOT_OK(
       src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
+  RETURN_NOT_OK(
+      src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_buffers)));
 
   std::shared_ptr<RecordBatchReader> reader;
   RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
@@ -260,6 +263,18 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject*
out)
     out->tensors.push_back(tensor);
     RETURN_NOT_OK(src->Tell(&offset));
   }
+
+  for (int i = 0; i < num_buffers; ++i) {
+    int64_t size;
+    RETURN_NOT_OK(src->ReadAt(offset, sizeof(int64_t), &bytes_read,
+                              reinterpret_cast<uint8_t*>(&size)));
+    RETURN_NOT_OK(src->Tell(&offset));
+    std::shared_ptr<Buffer> buffer;
+    RETURN_NOT_OK(src->ReadAt(offset, size, &buffer));
+    out->buffers.push_back(buffer);
+    RETURN_NOT_OK(src->Tell(&offset));
+  }
+
   return Status::OK();
 }
 
@@ -268,7 +283,7 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject&
obj, PyObj
   PyAcquireGIL lock;
   PyDateTime_IMPORT;
   return DeserializeList(context, *obj.batch->column(0), 0, obj.batch->num_rows(),
base,
-                         obj.tensors, out);
+                         obj, out);
 }
 
 }  // namespace py
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index a46d10d..b0c6287 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -63,6 +63,7 @@ class SequenceBuilder {
         doubles_(::arrow::float64(), pool),
         date64s_(::arrow::date64(), pool),
         tensor_indices_(::arrow::int32(), pool),
+        buffer_indices_(::arrow::int32(), pool),
         list_offsets_({0}),
         tuple_offsets_({0}),
         dict_offsets_({0}),
@@ -146,6 +147,14 @@ class SequenceBuilder {
     return tensor_indices_.Append(tensor_index);
   }
 
+  /// Append a buffer to the sequence
+  ///
+  /// \param buffer_index Index of the buffer in the object.
+  Status AppendBuffer(const int32_t buffer_index) {
+    RETURN_NOT_OK(Update(buffer_indices_.length(), &buffer_tag_));
+    return buffer_indices_.Append(buffer_index);
+  }
+
   /// Add a sublist to the sequence. The data contained in the sublist will be
   /// specified in the "Finish" method.
   ///
@@ -184,9 +193,9 @@ class SequenceBuilder {
   }
 
   template <typename BuilderType>
-  Status AddElement(const int8_t tag, BuilderType* out) {
+  Status AddElement(const int8_t tag, BuilderType* out, const std::string& name = "")
{
     if (tag != -1) {
-      fields_[tag] = ::arrow::field("", out->type());
+      fields_[tag] = ::arrow::field(name, out->type());
       RETURN_NOT_OK(out->Finish(&children_[tag]));
       RETURN_NOT_OK(nones_.AppendToBitmap(true));
       type_ids_.push_back(tag);
@@ -232,7 +241,8 @@ class SequenceBuilder {
     RETURN_NOT_OK(AddElement(float_tag_, &floats_));
     RETURN_NOT_OK(AddElement(double_tag_, &doubles_));
     RETURN_NOT_OK(AddElement(date64_tag_, &date64s_));
-    RETURN_NOT_OK(AddElement(tensor_tag_, &tensor_indices_));
+    RETURN_NOT_OK(AddElement(tensor_tag_, &tensor_indices_, "tensor"));
+    RETURN_NOT_OK(AddElement(buffer_tag_, &buffer_indices_, "buffer"));
 
     RETURN_NOT_OK(AddSubsequence(list_tag_, list_data, list_offsets_, "list"));
     RETURN_NOT_OK(AddSubsequence(tuple_tag_, tuple_data, tuple_offsets_, "tuple"));
@@ -262,11 +272,8 @@ class SequenceBuilder {
   DoubleBuilder doubles_;
   Date64Builder date64s_;
 
-  // We use an Int32Builder here to distinguish the tensor indices from
-  // the ints_ above (see the case Type::INT32 in get_value in python.cc).
-  // TODO(pcm): Replace this by using the union tags to distinguish between
-  // these two cases.
   Int32Builder tensor_indices_;
+  Int32Builder buffer_indices_;
 
   std::vector<int32_t> list_offsets_;
   std::vector<int32_t> tuple_offsets_;
@@ -288,6 +295,7 @@ class SequenceBuilder {
   int8_t date64_tag_ = -1;
 
   int8_t tensor_tag_ = -1;
+  int8_t buffer_tag_ = -1;
   int8_t list_tag_ = -1;
   int8_t tuple_tag_ = -1;
   int8_t dict_tag_ = -1;
@@ -390,15 +398,14 @@ Status CallDeserializeCallback(PyObject* context, PyObject* value,
 
 Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
                      int32_t recursion_depth, std::shared_ptr<Array>* out,
-                     std::vector<std::shared_ptr<Tensor>>* tensors_out);
+                     SerializedPyObject* blobs_out);
 
 Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
-                      std::vector<PyObject*>* subdicts,
-                      std::vector<std::shared_ptr<Tensor>>* tensors_out);
+                      std::vector<PyObject*>* subdicts, SerializedPyObject* blobs_out);
 
 Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
                           int32_t recursion_depth, std::shared_ptr<Array>* out,
-                          std::vector<std::shared_ptr<Tensor>>* tensors_out);
+                          SerializedPyObject* blobs_out);
 
 Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
   if (PyArray_IsScalar(obj, Bool)) {
@@ -444,12 +451,12 @@ Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
 Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
               std::vector<PyObject*>* sublists, std::vector<PyObject*>* subtuples,
               std::vector<PyObject*>* subdicts, std::vector<PyObject*>* subsets,
-              std::vector<std::shared_ptr<Tensor>>* tensors_out) {
+              SerializedPyObject* blobs_out) {
   // The bool case must precede the int case (PyInt_Check passes for bools)
   if (PyBool_Check(elem)) {
     RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
   } else if (PyArray_DescrFromScalar(elem)->type_num == NPY_HALF) {
-    npy_half halffloat = reinterpret_cast<PyHalfScalarObject *>(elem)->obval;
+    npy_half halffloat = reinterpret_cast<PyHalfScalarObject*>(elem)->obval;
     RETURN_NOT_OK(builder->AppendHalfFloat(halffloat));
   } else if (PyFloat_Check(elem)) {
     RETURN_NOT_OK(builder->AppendDouble(PyFloat_AS_DOUBLE(elem)));
@@ -506,12 +513,17 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
     RETURN_NOT_OK(AppendScalar(elem, builder));
   } else if (PyArray_Check(elem)) {
     RETURN_NOT_OK(SerializeArray(context, reinterpret_cast<PyArrayObject*>(elem), builder,
-                                 subdicts, tensors_out));
+                                 subdicts, blobs_out));
   } else if (elem == Py_None) {
     RETURN_NOT_OK(builder->AppendNone());
   } else if (PyDateTime_CheckExact(elem)) {
     PyDateTime_DateTime* datetime = reinterpret_cast<PyDateTime_DateTime*>(elem);
     RETURN_NOT_OK(builder->AppendDate64(PyDateTime_to_us(datetime)));
+  } else if (is_buffer(elem)) {
+    RETURN_NOT_OK(builder->AppendBuffer(static_cast<int32_t>(blobs_out->buffers.size())));
+    std::shared_ptr<Buffer> buffer;
+    RETURN_NOT_OK(unwrap_buffer(elem, &buffer));
+    blobs_out->buffers.push_back(buffer);
   } else {
     // Attempt to serialize the object using the custom callback.
     PyObject* serialized_object;
@@ -524,8 +536,7 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
 }
 
 Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
-                      std::vector<PyObject*>* subdicts,
-                      std::vector<std::shared_ptr<Tensor>>* tensors_out) {
+                      std::vector<PyObject*>* subdicts, SerializedPyObject* blobs_out)
{
   int dtype = PyArray_TYPE(array);
   switch (dtype) {
     case NPY_UINT8:
@@ -539,11 +550,12 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder*
     case NPY_HALF:
     case NPY_FLOAT:
     case NPY_DOUBLE: {
-      RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(tensors_out->size())));
+      RETURN_NOT_OK(
+          builder->AppendTensor(static_cast<int32_t>(blobs_out->tensors.size())));
       std::shared_ptr<Tensor> tensor;
       RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
                                     reinterpret_cast<PyObject*>(array), &tensor));
-      tensors_out->push_back(tensor);
+      blobs_out->tensors.push_back(tensor);
     } break;
     default: {
       PyObject* serialized_object;
@@ -559,7 +571,7 @@ Status SerializeArray(PyObject* context, PyArrayObject* array, SequenceBuilder*
 
 Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
                           int32_t recursion_depth, std::shared_ptr<Array>* out,
-                          std::vector<std::shared_ptr<Tensor>>* tensors_out)
{
+                          SerializedPyObject* blobs_out) {
   DCHECK(out);
   if (recursion_depth >= kMaxRecursionDepth) {
     return Status::NotImplemented(
@@ -578,35 +590,35 @@ Status SerializeSequences(PyObject* context, std::vector<PyObject*>
sequences,
         break;
       }
       RETURN_NOT_OK(Append(context, item.get(), &builder, &sublists, &subtuples,
-                           &subdicts, &subsets, tensors_out));
+                           &subdicts, &subsets, blobs_out));
     }
   }
   std::shared_ptr<Array> list;
   if (sublists.size() > 0) {
     RETURN_NOT_OK(
-        SerializeSequences(context, sublists, recursion_depth + 1, &list, tensors_out));
+        SerializeSequences(context, sublists, recursion_depth + 1, &list, blobs_out));
   }
   std::shared_ptr<Array> tuple;
   if (subtuples.size() > 0) {
     RETURN_NOT_OK(
-        SerializeSequences(context, subtuples, recursion_depth + 1, &tuple, tensors_out));
+        SerializeSequences(context, subtuples, recursion_depth + 1, &tuple, blobs_out));
   }
   std::shared_ptr<Array> dict;
   if (subdicts.size() > 0) {
     RETURN_NOT_OK(
-        SerializeDict(context, subdicts, recursion_depth + 1, &dict, tensors_out));
+        SerializeDict(context, subdicts, recursion_depth + 1, &dict, blobs_out));
   }
   std::shared_ptr<Array> set;
   if (subsets.size() > 0) {
     RETURN_NOT_OK(
-        SerializeSequences(context, subsets, recursion_depth + 1, &set, tensors_out));
+        SerializeSequences(context, subsets, recursion_depth + 1, &set, blobs_out));
   }
   return builder.Finish(list.get(), tuple.get(), dict.get(), set.get(), out);
 }
 
 Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
                      int32_t recursion_depth, std::shared_ptr<Array>* out,
-                     std::vector<std::shared_ptr<Tensor>>* tensors_out) {
+                     SerializedPyObject* blobs_out) {
   DictBuilder result;
   if (recursion_depth >= kMaxRecursionDepth) {
     return Status::NotImplemented(
@@ -616,45 +628,46 @@ Status SerializeDict(PyObject* context, std::vector<PyObject*>
dicts,
   std::vector<PyObject*> key_tuples, key_dicts, val_lists, val_tuples, val_dicts,
       val_sets, dummy;
   for (const auto& dict : dicts) {
-    PyObject *key, *value;
+    PyObject* key;
+    PyObject* value;
     Py_ssize_t pos = 0;
     while (PyDict_Next(dict, &pos, &key, &value)) {
       RETURN_NOT_OK(Append(context, key, &result.keys(), &dummy, &key_tuples,
&key_dicts,
-                           &dummy, tensors_out));
+                           &dummy, blobs_out));
       DCHECK_EQ(dummy.size(), 0);
       RETURN_NOT_OK(Append(context, value, &result.vals(), &val_lists, &val_tuples,
-                           &val_dicts, &val_sets, tensors_out));
+                           &val_dicts, &val_sets, blobs_out));
     }
   }
   std::shared_ptr<Array> key_tuples_arr;
   if (key_tuples.size() > 0) {
     RETURN_NOT_OK(SerializeSequences(context, key_tuples, recursion_depth + 1,
-                                     &key_tuples_arr, tensors_out));
+                                     &key_tuples_arr, blobs_out));
   }
   std::shared_ptr<Array> key_dicts_arr;
   if (key_dicts.size() > 0) {
     RETURN_NOT_OK(SerializeDict(context, key_dicts, recursion_depth + 1, &key_dicts_arr,
-                                tensors_out));
+                                blobs_out));
   }
   std::shared_ptr<Array> val_list_arr;
   if (val_lists.size() > 0) {
     RETURN_NOT_OK(SerializeSequences(context, val_lists, recursion_depth + 1,
-                                     &val_list_arr, tensors_out));
+                                     &val_list_arr, blobs_out));
   }
   std::shared_ptr<Array> val_tuples_arr;
   if (val_tuples.size() > 0) {
     RETURN_NOT_OK(SerializeSequences(context, val_tuples, recursion_depth + 1,
-                                     &val_tuples_arr, tensors_out));
+                                     &val_tuples_arr, blobs_out));
   }
   std::shared_ptr<Array> val_dict_arr;
   if (val_dicts.size() > 0) {
-    RETURN_NOT_OK(SerializeDict(context, val_dicts, recursion_depth + 1, &val_dict_arr,
-                                tensors_out));
+    RETURN_NOT_OK(
+        SerializeDict(context, val_dicts, recursion_depth + 1, &val_dict_arr, blobs_out));
   }
   std::shared_ptr<Array> val_set_arr;
   if (val_sets.size() > 0) {
     RETURN_NOT_OK(SerializeSequences(context, val_sets, recursion_depth + 1, &val_set_arr,
-                                     tensors_out));
+                                     blobs_out));
   }
   RETURN_NOT_OK(result.Finish(key_tuples_arr.get(), key_dicts_arr.get(),
                               val_list_arr.get(), val_tuples_arr.get(),
@@ -687,16 +700,19 @@ std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array>
data) {
 Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out) {
   PyAcquireGIL lock;
   PyDateTime_IMPORT;
+  import_pyarrow();
   std::vector<PyObject*> sequences = {sequence};
   std::shared_ptr<Array> array;
-  RETURN_NOT_OK(SerializeSequences(context, sequences, 0, &array, &out->tensors));
+  RETURN_NOT_OK(SerializeSequences(context, sequences, 0, &array, out));
   out->batch = MakeBatch(array);
   return Status::OK();
 }
 
 Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream* dst) {
   int32_t num_tensors = static_cast<int32_t>(obj.tensors.size());
+  int32_t num_buffers = static_cast<int32_t>(obj.buffers.size());
   RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&num_tensors), sizeof(int32_t)));
+  RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&num_buffers), sizeof(int32_t)));
   RETURN_NOT_OK(ipc::WriteRecordBatchStream({obj.batch}, dst));
 
   int32_t metadata_length;
@@ -705,6 +721,12 @@ Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream*
ds
     RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
   }
 
+  for (const auto& buffer : obj.buffers) {
+    int64_t size = buffer->size();
+    RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&size), sizeof(int64_t)));
+    RETURN_NOT_OK(dst->Write(buffer->data(), size));
+  }
+
   return Status::OK();
 }
 
diff --git a/cpp/src/arrow/python/python_to_arrow.h b/cpp/src/arrow/python/python_to_arrow.h
index 2c80e5d..c5b6396 100644
--- a/cpp/src/arrow/python/python_to_arrow.h
+++ b/cpp/src/arrow/python/python_to_arrow.h
@@ -23,6 +23,8 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
 #include "arrow/status.h"
 #include "arrow/util/visibility.h"
 
@@ -42,6 +44,7 @@ namespace py {
 struct ARROW_EXPORT SerializedPyObject {
   std::shared_ptr<RecordBatch> batch;
   std::vector<std::shared_ptr<Tensor>> tensors;
+  std::vector<std::shared_ptr<Buffer>> buffers;
 };
 
 /// \brief Serialize Python sequence as a RecordBatch plus
diff --git a/python/pyarrow/serialization.py b/python/pyarrow/serialization.py
index 248b51c..eed6aae 100644
--- a/python/pyarrow/serialization.py
+++ b/python/pyarrow/serialization.py
@@ -102,15 +102,14 @@ def register_default_serialization_handlers(serialization_context):
         import pandas as pd
 
         def _serialize_pandas_series(obj):
-            # TODO: serializing Series without extra copy
-            return serialize_pandas(pd.DataFrame({obj.name: obj})).to_pybytes()
+            return serialize_pandas(pd.DataFrame({obj.name: obj}))
 
         def _deserialize_pandas_series(data):
             deserialized = deserialize_pandas(data)
             return deserialized[deserialized.columns[0]]
 
         def _serialize_pandas_dataframe(obj):
-            return serialize_pandas(obj).to_pybytes()
+            return serialize_pandas(obj)
 
         def _deserialize_pandas_dataframe(data):
             return deserialize_pandas(data)
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index 3932948..9321ebc 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -336,6 +336,25 @@ def test_serialization_callback_numpy():
 
     pa.serialize(DummyClass())
 
+def test_buffer_serialization():
+
+    class BufferClass(object):
+        pass
+
+    def serialize_buffer_class(obj):
+        return pa.frombuffer(b"hello")
+
+    def deserialize_buffer_class(serialized_obj):
+        return serialized_obj
+
+    pa._default_serialization_context.register_type(
+        BufferClass, "BufferClass", pickle=False,
+        custom_serializer=serialize_buffer_class,
+        custom_deserializer=deserialize_buffer_class)
+
+    b = pa.serialize(BufferClass()).to_buffer()
+    assert pa.deserialize(b).to_pybytes() == b"hello"
+
 
 @pytest.mark.skip(reason="extensive memory requirements")
 def test_arrow_limits(self):

-- 
To stop receiving notification emails like this one, please contact
"commits@arrow.apache.org" <commits@arrow.apache.org>.

Mime
View raw message