arrow-commits mailing list archives

From: w...@apache.org
Subject: arrow git commit: ARROW-759: [Python] Serializing large class of Python objects in Apache Arrow
Date: Sun, 20 Aug 2017 04:15:15 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 10f7158df -> b50f2351e


ARROW-759: [Python] Serializing large class of Python objects in Apache Arrow

This PR adds the capability to serialize a large class of (nested) Python objects in Apache Arrow. The eventual goal is to evolve this into a more modern version of pickle that will make it possible to read the data from other languages supported by Apache Arrow (and might also be faster).

Currently we support lists, tuples, dicts, strings, numpy objects, Python classes and namedtuples. A fallback to (cloud-)pickle can be provided for objects that cannot be natively represented in Arrow (for example lambdas).

NumPy data within objects is represented efficiently using Arrow's Tensor facilities, and nested Python sequences are represented using Arrow's UnionArray.

There are many loose ends that will need to be addressed in follow-up PRs.
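
For orientation, a round trip through the new Python API looks roughly like the sketch below. The function names (serialize_to, deserialize_from) are the ones this PR exports; the exact signatures and the BufferOutputStream/BufferReader plumbing are assumptions based on pyarrow's existing I/O API:

    import pyarrow as pa

    data = {'hello': [1, 2, 3], 'pair': (u'a', b'b'), 'empty': None}

    # Write the object as a record batch plus tensors to an in-memory stream
    sink = pa.BufferOutputStream()
    pa.serialize_to(data, sink)
    buf = sink.get_result()

    # Read it back; the buffer is passed as the `base` object so that any
    # NumPy arrays deserialized as views keep the underlying memory alive
    restored = pa.deserialize_from(pa.BufferReader(buf), buf)
    assert restored == data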

Author: Philipp Moritz <pcmoritz@gmail.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #965 from pcmoritz/python-serialization and squashes the following commits:

31486edd [Wes McKinney] Fix typo
2164db72 [Wes McKinney] Add SerializedPyObject to public API
b70235cd [Wes McKinney] Add pyarrow.deserialize convenience method
a6a402ee [Wes McKinney] Memory map fixture robustness on Windows
114a5fbf [Wes McKinney] Add a Python container for the SerializedPyObject data, total_bytes method
8e596172 [Wes McKinney] Use pytest tmpdir for large memory map fixture so works on Windows
8a42f30f [Wes McKinney] Add doxygen comment to set_serialization_callbacks
a9522c51 [Wes McKinney] Refactoring, address code review comments. fix flake8 issues
ce5784d5 [Wes McKinney] Do not use ARROW_CHECK in production code. Consolidate python_to_arrow code
c8efef94 [Wes McKinney] Fix various Clang compiler warnings due to integer conversions. clang-format
831e2f21 [Philipp Moritz] remove sequence.h
54af39bc [Philipp Moritz] more fixes
a6fdb76a [Philipp Moritz] make tests work
fe56c735 [Philipp Moritz] fixes
84d62f64 [Philipp Moritz] more fixes
49aba8a1 [Philipp Moritz] make it compile on windows
aa1f3009 [Philipp Moritz] linting
95cb9da6 [Philipp Moritz] fix GIL
adcc8f7a [Philipp Moritz] shuffle stuff around
bcebdfef [Philipp Moritz] fix longlong vs int64 and unsigned variant
4cc45cd7 [Philipp Moritz] cleanup
f25f3f3b [Philipp Moritz] cleanups
a88d4107 [Philipp Moritz] convert DESERIALIZE_SEQUENCE back to a macro
c4259785 [Philipp Moritz] prevent possible memory leaks
aeafd827 [Philipp Moritz] fix callbacks
389bfc6e [Philipp Moritz] documentation
2f0760c2 [Philipp Moritz] fix api
faf9a3e6 [Philipp Moritz] make exported API more consistent
e1fc0c59 [Philipp Moritz] restructure
c1f377b7 [Philipp Moritz] more fixes
3e94e6da [Philipp Moritz] clang-format
99e2d1af [Philipp Moritz] cleanups
32983297 [Philipp Moritz] mutable refs and small fixes
e73c1ea8 [Philipp Moritz] make DictBuilder private
39292735 [Philipp Moritz] increase Py_True refcount and hide helper methods
aaf6f095 [Philipp Moritz] remove code duplication
c38c58db [Philipp Moritz] get rid of leaks and clarify reference counting for dicts
74b9e469 [Philipp Moritz] convert DESERIALIZE_SEQUENCE to a template
080db030 [Philipp Moritz] fix first few comments
a6105d2e [Philipp Moritz] lint fix
802e739c [Philipp Moritz] clang-format
2e08de4c [Philipp Moritz] fix namespaces
91b57d57 [Philipp Moritz] fix linting
c4782ac0 [Philipp Moritz] fix
7069e208 [Philipp Moritz] fix imports
2171761b [Philipp Moritz] fix python unicode string
30bb960f [Philipp Moritz] rebase
f229d8d2 [Philipp Moritz] serialization of custom objects
8b2ffe60 [Philipp Moritz] working version
bd36c83e [Philipp Moritz] handle very long longs with custom serialization callback
49a4acb2 [Philipp Moritz] roundtrip working for the first time
44fb98bf [Philipp Moritz] work in progress
3af1c67c [Philipp Moritz] deserialization path (need to figure out if base object and refcounting is handled correctly)
deb3b461 [Philipp Moritz] rename serialization entry point
5766b8ca [Philipp Moritz] python to arrow serialization


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b50f2351
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b50f2351
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b50f2351

Branch: refs/heads/master
Commit: b50f2351e4972adad4d1bac4765cc5fedfa5836c
Parents: 10f7158
Author: Philipp Moritz <pcmoritz@gmail.com>
Authored: Sun Aug 20 00:15:09 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sun Aug 20 00:15:09 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/builder.h                    |  12 +-
 cpp/src/arrow/python/CMakeLists.txt        |   4 +
 cpp/src/arrow/python/api.h                 |   2 +
 cpp/src/arrow/python/arrow_to_python.cc    | 221 ++++++++
 cpp/src/arrow/python/arrow_to_python.h     |  66 +++
 cpp/src/arrow/python/common.h              |  30 ++
 cpp/src/arrow/python/python_to_arrow.cc    | 654 ++++++++++++++++++++++++
 cpp/src/arrow/python/python_to_arrow.h     |  78 +++
 python/doc/source/api.rst                  |  10 +-
 python/pyarrow/__init__.py                 |   4 +
 python/pyarrow/compat.py                   |   7 +
 python/pyarrow/includes/libarrow.pxd       |  20 +
 python/pyarrow/io.pxi                      |   2 +-
 python/pyarrow/lib.pyx                     |   3 +
 python/pyarrow/serialization.pxi           | 279 ++++++++++
 python/pyarrow/tests/test_serialization.py | 236 +++++++++
 16 files changed, 1619 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 46900fc..3b851f9 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -244,7 +244,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
   using PrimitiveBuilder<T>::Reserve;
 
   /// Append a single scalar and increase the size if necessary.
-  Status Append(value_type val) {
+  Status Append(const value_type val) {
     RETURN_NOT_OK(ArrayBuilder::Reserve(1));
     UnsafeAppend(val);
     return Status::OK();
@@ -255,7 +255,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
   ///
   /// This method does not capacity-check; make sure to call Reserve
   /// beforehand.
-  void UnsafeAppend(value_type val) {
+  void UnsafeAppend(const value_type val) {
     BitUtil::SetBit(null_bitmap_data_, length_);
     raw_data_[length_++] = val;
   }
@@ -371,7 +371,7 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase
   using ArrayBuilder::Advance;
 
   /// Scalar append
-  Status Append(uint64_t val) {
+  Status Append(const uint64_t val) {
     RETURN_NOT_OK(Reserve(1));
     BitUtil::SetBit(null_bitmap_data_, length_);
 
@@ -430,7 +430,7 @@ class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase
   using ArrayBuilder::Advance;
 
   /// Scalar append
-  Status Append(int64_t val) {
+  Status Append(const int64_t val) {
     RETURN_NOT_OK(Reserve(1));
     BitUtil::SetBit(null_bitmap_data_, length_);
 
@@ -511,7 +511,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
   std::shared_ptr<Buffer> data() const { return data_; }
 
   /// Scalar append
-  Status Append(bool val) {
+  Status Append(const bool val) {
     RETURN_NOT_OK(Reserve(1));
     BitUtil::SetBit(null_bitmap_data_, length_);
     if (val) {
@@ -523,7 +523,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
     return Status::OK();
   }
 
-  Status Append(uint8_t val) { return Append(val != 0); }
+  Status Append(const uint8_t val) { return Append(val != 0); }
 
   /// Vector append
   ///

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/cpp/src/arrow/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/CMakeLists.txt b/cpp/src/arrow/python/CMakeLists.txt
index 0fdf81e..f2807b9 100644
--- a/cpp/src/arrow/python/CMakeLists.txt
+++ b/cpp/src/arrow/python/CMakeLists.txt
@@ -43,6 +43,7 @@ set(ARROW_PYTHON_TEST_LINK_LIBS ${ARROW_PYTHON_MIN_TEST_LIBS})
 
 set(ARROW_PYTHON_SRCS
   arrow_to_pandas.cc
+  arrow_to_python.cc
   builtin_convert.cc
   common.cc
   config.cc
@@ -51,6 +52,7 @@ set(ARROW_PYTHON_SRCS
   io.cc
   numpy_convert.cc
   pandas_to_arrow.cc
+  python_to_arrow.cc
   pyarrow.cc
 )
 
@@ -83,6 +85,7 @@ endif()
 install(FILES
   api.h
   arrow_to_pandas.h
+  arrow_to_python.h
   builtin_convert.h
   common.h
   config.h
@@ -92,6 +95,7 @@ install(FILES
   numpy_convert.h
   numpy_interop.h
   pandas_to_arrow.h
+  python_to_arrow.h
   platform.h
   pyarrow.h
   type_traits.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/cpp/src/arrow/python/api.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/api.h b/cpp/src/arrow/python/api.h
index 7cb36ad..4ceb3f1 100644
--- a/cpp/src/arrow/python/api.h
+++ b/cpp/src/arrow/python/api.h
@@ -19,11 +19,13 @@
 #define ARROW_PYTHON_API_H
 
 #include "arrow/python/arrow_to_pandas.h"
+#include "arrow/python/arrow_to_python.h"
 #include "arrow/python/builtin_convert.h"
 #include "arrow/python/common.h"
 #include "arrow/python/helpers.h"
 #include "arrow/python/io.h"
 #include "arrow/python/numpy_convert.h"
 #include "arrow/python/pandas_to_arrow.h"
+#include "arrow/python/python_to_arrow.h"
 
 #endif  // ARROW_PYTHON_API_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/cpp/src/arrow/python/arrow_to_python.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc
new file mode 100644
index 0000000..622ef82
--- /dev/null
+++ b/cpp/src/arrow/python/arrow_to_python.cc
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/arrow_to_python.h"
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/python/common.h"
+#include "arrow/python/helpers.h"
+#include "arrow/python/numpy_convert.h"
+#include "arrow/table.h"
+#include "arrow/util/logging.h"
+
+extern "C" {
+extern PyObject* pyarrow_serialize_callback;
+extern PyObject* pyarrow_deserialize_callback;
+}
+
+namespace arrow {
+namespace py {
+
+Status CallCustomCallback(PyObject* callback, PyObject* elem, PyObject** result);
+
+Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
+                        PyObject* base,
+                        const std::vector<std::shared_ptr<Tensor>>& tensors,
+                        PyObject** out);
+
+Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
+                       PyObject* base,
+                       const std::vector<std::shared_ptr<Tensor>>& tensors,
+                       PyObject** out);
+
+Status DeserializeDict(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
+                       PyObject* base,
+                       const std::vector<std::shared_ptr<Tensor>>& tensors,
+                       PyObject** out) {
+  auto data = std::dynamic_pointer_cast<StructArray>(array);
+  ScopedRef keys, vals;
+  ScopedRef result(PyDict_New());
+  RETURN_NOT_OK(
+      DeserializeList(data->field(0), start_idx, stop_idx, base, tensors, keys.ref()));
+  RETURN_NOT_OK(
+      DeserializeList(data->field(1), start_idx, stop_idx, base, tensors, vals.ref()));
+  for (int64_t i = start_idx; i < stop_idx; ++i) {
+    // PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem.
+    // The latter two steal references whereas PyDict_SetItem does not. So we need
+    // to make sure the reference count is decremented by letting the ScopedRef
+    // go out of scope at the end.
+    PyDict_SetItem(result.get(), PyList_GET_ITEM(keys.get(), i - start_idx),
+                   PyList_GET_ITEM(vals.get(), i - start_idx));
+  }
+  static PyObject* py_type = PyUnicode_FromString("_pytype_");
+  if (PyDict_Contains(result.get(), py_type)) {
+    RETURN_NOT_OK(CallCustomCallback(pyarrow_deserialize_callback, result.get(), out));
+  } else {
+    *out = result.release();
+  }
+  return Status::OK();
+}
+
+Status DeserializeArray(std::shared_ptr<Array> array, int64_t offset, PyObject* base,
+                        const std::vector<std::shared_ptr<arrow::Tensor>>& tensors,
+                        PyObject** out) {
+  DCHECK(array);
+  int32_t index = std::static_pointer_cast<Int32Array>(array)->Value(offset);
+  RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out));
+  // Mark the array as immutable
+  ScopedRef flags(PyObject_GetAttrString(*out, "flags"));
+  DCHECK(flags.get() != NULL) << "Could not mark Numpy array immutable";
+  Py_INCREF(Py_False);
+  int flag_set = PyObject_SetAttrString(flags.get(), "writeable", Py_False);
+  DCHECK(flag_set == 0) << "Could not mark Numpy array immutable";
+  return Status::OK();
+}
+
+Status GetValue(std::shared_ptr<Array> arr, int64_t index, int32_t type, PyObject* base,
+                const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** result) {
+  switch (arr->type()->id()) {
+    case Type::BOOL:
+      *result =
+          PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
+      return Status::OK();
+    case Type::INT64:
+      *result =
+          PyLong_FromSsize_t(std::static_pointer_cast<Int64Array>(arr)->Value(index));
+      return Status::OK();
+    case Type::BINARY: {
+      int32_t nchars;
+      const uint8_t* str =
+          std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars);
+      *result = PyBytes_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
+      return CheckPyError();
+    }
+    case Type::STRING: {
+      int32_t nchars;
+      const uint8_t* str =
+          std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars);
+      *result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
+      return CheckPyError();
+    }
+    case Type::FLOAT:
+      *result =
+          PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
+      return Status::OK();
+    case Type::DOUBLE:
+      *result =
+          PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
+      return Status::OK();
+    case Type::STRUCT: {
+      auto s = std::static_pointer_cast<StructArray>(arr);
+      auto l = std::static_pointer_cast<ListArray>(s->field(0));
+      if (s->type()->child(0)->name() == "list") {
+        return DeserializeList(l->values(), l->value_offset(index),
+                               l->value_offset(index + 1), base, tensors, result);
+      } else if (s->type()->child(0)->name() == "tuple") {
+        return DeserializeTuple(l->values(), l->value_offset(index),
+                                l->value_offset(index + 1), base, tensors, result);
+      } else if (s->type()->child(0)->name() == "dict") {
+        return DeserializeDict(l->values(), l->value_offset(index),
+                               l->value_offset(index + 1), base, tensors, result);
+      } else {
+        DCHECK(false) << "unexpected StructArray type " << s->type()->child(0)->name();
+      }
+    }
+    // We use an Int32Builder here to distinguish the tensor indices from
+    // the Type::INT64 above (see tensor_indices_ in SequenceBuilder).
+    case Type::INT32: {
+      return DeserializeArray(arr, index, base, tensors, result);
+    }
+    default:
+      DCHECK(false) << "union tag " << type << " not recognized";
+  }
+  return Status::OK();
+}
+
+#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN)                        \
+  auto data = std::dynamic_pointer_cast<UnionArray>(array);                 \
+  int64_t size = array->length();                                           \
+  ScopedRef result(CREATE_FN(stop_idx - start_idx));                        \
+  auto types = std::make_shared<Int8Array>(size, data->type_ids());         \
+  auto offsets = std::make_shared<Int32Array>(size, data->value_offsets()); \
+  for (int64_t i = start_idx; i < stop_idx; ++i) {                          \
+    if (data->IsNull(i)) {                                                  \
+      Py_INCREF(Py_None);                                                   \
+      SET_ITEM_FN(result.get(), i - start_idx, Py_None);                    \
+    } else {                                                                \
+      int64_t offset = offsets->Value(i);                                   \
+      int8_t type = types->Value(i);                                        \
+      std::shared_ptr<Array> arr = data->child(type);                       \
+      PyObject* value;                                                      \
+      RETURN_NOT_OK(GetValue(arr, offset, type, base, tensors, &value));    \
+      SET_ITEM_FN(result.get(), i - start_idx, value);                      \
+    }                                                                       \
+  }                                                                         \
+  *out = result.release();                                                  \
+  return Status::OK();
+
+Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
+                       PyObject* base,
+                       const std::vector<std::shared_ptr<Tensor>>& tensors,
+                       PyObject** out) {
+  DESERIALIZE_SEQUENCE(PyList_New, PyList_SET_ITEM)
+}
+
+Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
+                        PyObject* base,
+                        const std::vector<std::shared_ptr<Tensor>>& tensors,
+                        PyObject** out) {
+  DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM)
+}
+
+Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
+                            SerializedPyObject* out) {
+  std::shared_ptr<ipc::RecordBatchStreamReader> reader;
+  int64_t offset;
+  int64_t bytes_read;
+  int32_t num_tensors;
+  // Read number of tensors
+  RETURN_NOT_OK(
+      src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
+  RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
+  RETURN_NOT_OK(reader->ReadNextRecordBatch(&out->batch));
+  RETURN_NOT_OK(src->Tell(&offset));
+  offset += 4;  // Skip the end-of-stream message
+  for (int i = 0; i < num_tensors; ++i) {
+    std::shared_ptr<Tensor> tensor;
+    RETURN_NOT_OK(ipc::ReadTensor(offset, src.get(), &tensor));
+    out->tensors.push_back(tensor);
+    RETURN_NOT_OK(src->Tell(&offset));
+  }
+  return Status::OK();
+}
+
+Status DeserializeObject(const SerializedPyObject& obj, PyObject* base, PyObject** out) {
+  PyAcquireGIL lock;
+  return DeserializeList(obj.batch->column(0), 0, obj.batch->num_rows(), base,
+                         obj.tensors, out);
+}
+
+}  // namespace py
+}  // namespace arrow
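
As ReadSerializedObject above shows, the wire layout is a native-endian int32 tensor count, followed by an Arrow record batch stream (including its 4-byte end-of-stream marker), followed by the tensors. A sketch of inspecting that header from Python, reusing the hedged serialize_to usage from the sketch near the top:

    import struct

    import numpy as np
    import pyarrow as pa

    sink = pa.BufferOutputStream()
    pa.serialize_to([np.arange(10), np.ones((2, 2))], sink)
    buf = sink.get_result()

    # The first four bytes are the tensor count written by
    # WriteSerializedObject; '=i' matches the native-order int32 in the C++
    num_tensors, = struct.unpack('=i', buf.to_pybytes()[:4])
    assert num_tensors == 2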

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/cpp/src/arrow/python/arrow_to_python.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h
new file mode 100644
index 0000000..559ce18
--- /dev/null
+++ b/cpp/src/arrow/python/arrow_to_python.h
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_PYTHON_ARROW_TO_PYTHON_H
+#define ARROW_PYTHON_ARROW_TO_PYTHON_H
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/python/python_to_arrow.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class RecordBatch;
+class Tensor;
+
+namespace io {
+
+class RandomAccessFile;
+
+}  // namespace io
+
+namespace py {
+
+/// \brief Read serialized Python sequence from file interface using Arrow IPC
+/// \param[in] src a RandomAccessFile
+/// \param[out] out the reconstructed data
+/// \return Status
+ARROW_EXPORT
+Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
+                            SerializedPyObject* out);
+
+/// \brief Reconstruct Python object from Arrow-serialized representation
+/// \param[in] object the serialized Python object to reconstruct
+/// \param[in] base a Python object holding the underlying data that any NumPy
+/// arrays will reference, to avoid premature deallocation
+/// \param[out] out the returned object
+/// \return Status
+/// This acquires the GIL
+ARROW_EXPORT
+Status DeserializeObject(const SerializedPyObject& object, PyObject* base,
+                         PyObject** out);
+
+}  // namespace py
+}  // namespace arrow
+
+#endif  // ARROW_PYTHON_ARROW_TO_PYTHON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/cpp/src/arrow/python/common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/common.h b/cpp/src/arrow/python/common.h
index ec40d0e..7f94f95 100644
--- a/cpp/src/arrow/python/common.h
+++ b/cpp/src/arrow/python/common.h
@@ -91,6 +91,36 @@ class ARROW_EXPORT OwnedRef {
   PyObject* obj_;
 };
 
+// This is different from OwnedRef in that it assumes that
+// the GIL is held by the caller and doesn't decrement the
+// reference count when release is called.
+class ARROW_EXPORT ScopedRef {
+ public:
+  ScopedRef() : obj_(nullptr) {}
+
+  explicit ScopedRef(PyObject* obj) : obj_(obj) {}
+
+  ~ScopedRef() { Py_XDECREF(obj_); }
+
+  void reset(PyObject* obj) {
+    Py_XDECREF(obj_);
+    obj_ = obj;
+  }
+
+  PyObject* release() {
+    PyObject* result = obj_;
+    obj_ = nullptr;
+    return result;
+  }
+
+  PyObject* get() const { return obj_; }
+
+  PyObject** ref() { return &obj_; }
+
+ private:
+  PyObject* obj_;
+};
+
 struct ARROW_EXPORT PyObjectStringify {
   OwnedRef tmp_obj;
   const char* bytes;

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/cpp/src/arrow/python/python_to_arrow.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
new file mode 100644
index 0000000..47d8ef6
--- /dev/null
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/python_to_arrow.h"
+#include "arrow/python/numpy_interop.h"
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <numpy/arrayobject.h>
+#include <numpy/arrayscalars.h>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/python/common.h"
+#include "arrow/python/helpers.h"
+#include "arrow/python/numpy_convert.h"
+#include "arrow/python/platform.h"
+#include "arrow/tensor.h"
+#include "arrow/util/logging.h"
+
+constexpr int32_t kMaxRecursionDepth = 100;
+
+extern "C" {
+PyObject* pyarrow_serialize_callback = NULL;
+PyObject* pyarrow_deserialize_callback = NULL;
+}
+
+namespace arrow {
+namespace py {
+
+/// A Sequence is a heterogeneous collection of elements. It can contain
+/// scalar Python types, lists, tuples, dictionaries and tensors.
+class SequenceBuilder {
+ public:
+  explicit SequenceBuilder(MemoryPool* pool = nullptr)
+      : pool_(pool),
+        types_(pool, ::arrow::int8()),
+        offsets_(pool, ::arrow::int32()),
+        nones_(pool),
+        bools_(pool, ::arrow::boolean()),
+        ints_(pool, ::arrow::int64()),
+        bytes_(pool, ::arrow::binary()),
+        strings_(pool),
+        floats_(pool, ::arrow::float32()),
+        doubles_(pool, ::arrow::float64()),
+        tensor_indices_(pool, ::arrow::int32()),
+        list_offsets_({0}),
+        tuple_offsets_({0}),
+        dict_offsets_({0}) {}
+
+  /// Appending a None to the sequence
+  Status AppendNone() {
+    RETURN_NOT_OK(offsets_.Append(0));
+    RETURN_NOT_OK(types_.Append(0));
+    return nones_.AppendToBitmap(false);
+  }
+
+  Status Update(int64_t offset, int8_t* tag) {
+    if (*tag == -1) {
+      *tag = num_tags_++;
+    }
+    RETURN_NOT_OK(offsets_.Append(static_cast<int32_t>(offset)));
+    RETURN_NOT_OK(types_.Append(*tag));
+    return nones_.AppendToBitmap(true);
+  }
+
+  template <typename BuilderType, typename T>
+  Status AppendPrimitive(const T val, int8_t* tag, BuilderType* out) {
+    RETURN_NOT_OK(Update(out->length(), tag));
+    return out->Append(val);
+  }
+
+  /// Appending a boolean to the sequence
+  Status AppendBool(const bool data) {
+    return AppendPrimitive(data, &bool_tag_, &bools_);
+  }
+
+  /// Appending an int64_t to the sequence
+  Status AppendInt64(const int64_t data) {
+    return AppendPrimitive(data, &int_tag_, &ints_);
+  }
+
+  /// Appending a uint64_t to the sequence
+  Status AppendUInt64(const uint64_t data) {
+    // TODO(wesm): Bounds check
+    return AppendPrimitive(static_cast<int64_t>(data), &int_tag_, &ints_);
+  }
+
+  /// Append a list of bytes to the sequence
+  Status AppendBytes(const uint8_t* data, int32_t length) {
+    RETURN_NOT_OK(Update(bytes_.length(), &bytes_tag_));
+    return bytes_.Append(data, length);
+  }
+
+  /// Appending a string to the sequence
+  Status AppendString(const char* data, int32_t length) {
+    RETURN_NOT_OK(Update(strings_.length(), &string_tag_));
+    return strings_.Append(data, length);
+  }
+
+  /// Appending a float to the sequence
+  Status AppendFloat(const float data) {
+    return AppendPrimitive(data, &float_tag_, &floats_);
+  }
+
+  /// Appending a double to the sequence
+  Status AppendDouble(const double data) {
+    return AppendPrimitive(data, &double_tag_, &doubles_);
+  }
+
+  /// Appending a tensor to the sequence
+  ///
+  /// \param tensor_index Index of the tensor in the object.
+  Status AppendTensor(const int32_t tensor_index) {
+    RETURN_NOT_OK(Update(tensor_indices_.length(), &tensor_tag_));
+    return tensor_indices_.Append(tensor_index);
+  }
+
+  /// Add a sublist to the sequence. The data contained in the sublist will be
+  /// specified in the "Finish" method.
+  ///
+  /// To construct l = [[11, 22], 33, [44, 55]] you would for example run
+  /// list = ListBuilder();
+  /// list.AppendList(2);
+  /// list.Append(33);
+  /// list.AppendList(2);
+  /// list.Finish([11, 22, 44, 55]);
+  /// list.Finish();
+
+  /// \param size
+  /// The size of the sublist
+  Status AppendList(Py_ssize_t size) {
+    RETURN_NOT_OK(Update(list_offsets_.size() - 1, &list_tag_));
+    list_offsets_.push_back(list_offsets_.back() + static_cast<int32_t>(size));
+    return Status::OK();
+  }
+
+  Status AppendTuple(Py_ssize_t size) {
+    RETURN_NOT_OK(Update(tuple_offsets_.size() - 1, &tuple_tag_));
+    tuple_offsets_.push_back(tuple_offsets_.back() + static_cast<int32_t>(size));
+    return Status::OK();
+  }
+
+  Status AppendDict(Py_ssize_t size) {
+    RETURN_NOT_OK(Update(dict_offsets_.size() - 1, &dict_tag_));
+    dict_offsets_.push_back(dict_offsets_.back() + static_cast<int32_t>(size));
+    return Status::OK();
+  }
+
+  template <typename BuilderType>
+  Status AddElement(const int8_t tag, BuilderType* out) {
+    if (tag != -1) {
+      fields_[tag] = ::arrow::field("", out->type());
+      RETURN_NOT_OK(out->Finish(&children_[tag]));
+      RETURN_NOT_OK(nones_.AppendToBitmap(true));
+      type_ids_.push_back(tag);
+    }
+    return Status::OK();
+  }
+
+  Status AddSubsequence(int8_t tag, const Array* data,
+                        const std::vector<int32_t>& offsets, const std::string& name) {
+    if (data != nullptr) {
+      DCHECK(data->length() == offsets.back());
+      std::shared_ptr<Array> offset_array;
+      Int32Builder builder(pool_, std::make_shared<Int32Type>());
+      RETURN_NOT_OK(builder.Append(offsets.data(), offsets.size()));
+      RETURN_NOT_OK(builder.Finish(&offset_array));
+      std::shared_ptr<Array> list_array;
+      RETURN_NOT_OK(ListArray::FromArrays(*offset_array, *data, pool_, &list_array));
+      auto field = ::arrow::field(name, list_array->type());
+      auto type = ::arrow::struct_({field});
+      fields_[tag] = ::arrow::field("", type);
+      children_[tag] = std::shared_ptr<StructArray>(
+          new StructArray(type, list_array->length(), {list_array}));
+      RETURN_NOT_OK(nones_.AppendToBitmap(true));
+      type_ids_.push_back(tag);
+    } else {
+      DCHECK_EQ(offsets.size(), 1);
+    }
+    return Status::OK();
+  }
+
+  /// Finish building the sequence and return the result.
+  /// Input arrays may be nullptr
+  Status Finish(const Array* list_data, const Array* tuple_data, const Array* dict_data,
+                std::shared_ptr<Array>* out) {
+    fields_.resize(num_tags_);
+    children_.resize(num_tags_);
+
+    RETURN_NOT_OK(AddElement(bool_tag_, &bools_));
+    RETURN_NOT_OK(AddElement(int_tag_, &ints_));
+    RETURN_NOT_OK(AddElement(string_tag_, &strings_));
+    RETURN_NOT_OK(AddElement(bytes_tag_, &bytes_));
+    RETURN_NOT_OK(AddElement(float_tag_, &floats_));
+    RETURN_NOT_OK(AddElement(double_tag_, &doubles_));
+    RETURN_NOT_OK(AddElement(tensor_tag_, &tensor_indices_));
+
+    RETURN_NOT_OK(AddSubsequence(list_tag_, list_data, list_offsets_, "list"));
+    RETURN_NOT_OK(AddSubsequence(tuple_tag_, tuple_data, tuple_offsets_, "tuple"));
+    RETURN_NOT_OK(AddSubsequence(dict_tag_, dict_data, dict_offsets_, "dict"));
+
+    auto type = ::arrow::union_(fields_, type_ids_, UnionMode::DENSE);
+    out->reset(new UnionArray(type, types_.length(), children_, types_.data(),
+                              offsets_.data(), nones_.null_bitmap(),
+                              nones_.null_count()));
+    return Status::OK();
+  }
+
+ private:
+  MemoryPool* pool_;
+
+  Int8Builder types_;
+  Int32Builder offsets_;
+
+  NullBuilder nones_;
+  BooleanBuilder bools_;
+  Int64Builder ints_;
+  BinaryBuilder bytes_;
+  StringBuilder strings_;
+  FloatBuilder floats_;
+  DoubleBuilder doubles_;
+
+  // We use an Int32Builder here to distinguish the tensor indices from
+  // the ints_ above (see the Type::INT32 case in GetValue in arrow_to_python.cc).
+  // TODO(pcm): Replace this by using the union tags to distinguish between
+  // these two cases.
+  Int32Builder tensor_indices_;
+
+  std::vector<int32_t> list_offsets_;
+  std::vector<int32_t> tuple_offsets_;
+  std::vector<int32_t> dict_offsets_;
+
+  // Tags for members of the sequence. If they are set to -1 it means
+  // they are not used and will not be part of the metadata when we call
+  // SequenceBuilder::Finish. If a member with one of the tags is added,
+  // the associated variable gets a unique index starting from 0. This
+  // happens in the Update method above.
+  int8_t bool_tag_ = -1;
+  int8_t int_tag_ = -1;
+  int8_t string_tag_ = -1;
+  int8_t bytes_tag_ = -1;
+  int8_t float_tag_ = -1;
+  int8_t double_tag_ = -1;
+
+  int8_t tensor_tag_ = -1;
+  int8_t list_tag_ = -1;
+  int8_t tuple_tag_ = -1;
+  int8_t dict_tag_ = -1;
+
+  int8_t num_tags_ = 0;
+
+  // Members for the output union constructed in Finish
+  std::vector<std::shared_ptr<Field>> fields_;
+  std::vector<std::shared_ptr<Array>> children_;
+  std::vector<uint8_t> type_ids_;
+};
+
+/// Constructing dictionaries of key/value pairs. Sequences of
+/// keys and values are built separately using a pair of
+/// SequenceBuilders. The resulting Arrow representation
+/// can be obtained via the Finish method.
+class DictBuilder {
+ public:
+  explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {}
+
+  /// Builder for the keys of the dictionary
+  SequenceBuilder& keys() { return keys_; }
+  /// Builder for the values of the dictionary
+  SequenceBuilder& vals() { return vals_; }
+
+  /// Construct an Arrow StructArray representing the dictionary.
+  /// Contains a field "keys" for the keys and "vals" for the values.
+
+  /// \param key_tuple_data Data from tuples nested in the dictionary keys
+  /// \param key_dict_data Data from dictionaries nested in the dictionary keys
+  /// \param val_list_data Data from lists nested in the dictionary values
+  /// \param val_tuple_data Data from tuples nested in the dictionary values
+  /// \param val_dict_data Data from dictionaries nested in the dictionary values
+  /// \param out The resulting Arrow StructArray
+  Status Finish(const Array* key_tuple_data, const Array* key_dict_data,
+                const Array* val_list_data, const Array* val_tuple_data,
+                const Array* val_dict_data, std::shared_ptr<Array>* out) {
+    // Lists and dicts cannot be dict keys in Python, which is why we do
+    // not need to collect sublists for the keys
+    std::shared_ptr<Array> keys, vals;
+    RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, key_dict_data, &keys));
+    RETURN_NOT_OK(vals_.Finish(val_list_data, val_tuple_data, val_dict_data, &vals));
+    auto keys_field = std::make_shared<Field>("keys", keys->type());
+    auto vals_field = std::make_shared<Field>("vals", vals->type());
+    auto type = std::make_shared<StructType>(
+        std::vector<std::shared_ptr<Field>>({keys_field, vals_field}));
+    std::vector<std::shared_ptr<Array>> field_arrays({keys, vals});
+    DCHECK(keys->length() == vals->length());
+    out->reset(new StructArray(type, keys->length(), field_arrays));
+    return Status::OK();
+  }
+
+ private:
+  SequenceBuilder keys_;
+  SequenceBuilder vals_;
+};
+
+Status CallCustomCallback(PyObject* callback, PyObject* elem, PyObject** result) {
+  *result = NULL;
+  if (!callback) {
+    std::stringstream ss;
+    ScopedRef repr(PyObject_Repr(elem));
+    RETURN_IF_PYERROR();
+    ScopedRef ascii(PyUnicode_AsASCIIString(repr.get()));
+    ss << "error while calling callback on " << PyBytes_AsString(ascii.get())
+       << ": handler not registered";
+    return Status::NotImplemented(ss.str());
+  } else {
+    ScopedRef arglist(Py_BuildValue("(O)", elem));
+    *result = PyObject_CallObject(callback, arglist.get());
+    RETURN_IF_PYERROR();
+  }
+  return Status::OK();
+}
+
+void set_serialization_callbacks(PyObject* serialize_callback,
+                                 PyObject* deserialize_callback) {
+  pyarrow_serialize_callback = serialize_callback;
+  pyarrow_deserialize_callback = deserialize_callback;
+}
+
+Status CallCustomSerializationCallback(PyObject* elem, PyObject** serialized_object) {
+  RETURN_NOT_OK(CallCustomCallback(pyarrow_serialize_callback, elem, serialized_object));
+  if (!PyDict_Check(*serialized_object)) {
+    return Status::TypeError("serialization callback must return a valid dictionary");
+  }
+  return Status::OK();
+}
+
+Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
+                     std::shared_ptr<Array>* out, std::vector<PyObject*>* tensors_out);
+
+Status SerializeArray(PyArrayObject* array, SequenceBuilder* builder,
+                      std::vector<PyObject*>* subdicts,
+                      std::vector<PyObject*>* tensors_out);
+
+Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth,
+                          std::shared_ptr<Array>* out,
+                          std::vector<PyObject*>* tensors_out);
+
+Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
+  if (PyArray_IsScalar(obj, Bool)) {
+    return builder->AppendBool(reinterpret_cast<PyBoolScalarObject*>(obj)->obval != 0);
+  } else if (PyArray_IsScalar(obj, Float)) {
+    return builder->AppendFloat(reinterpret_cast<PyFloatScalarObject*>(obj)->obval);
+  } else if (PyArray_IsScalar(obj, Double)) {
+    return builder->AppendDouble(reinterpret_cast<PyDoubleScalarObject*>(obj)->obval);
+  }
+  int64_t value = 0;
+  if (PyArray_IsScalar(obj, Byte)) {
+    value = reinterpret_cast<PyByteScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, UByte)) {
+    value = reinterpret_cast<PyUByteScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, Short)) {
+    value = reinterpret_cast<PyShortScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, UShort)) {
+    value = reinterpret_cast<PyUShortScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, Int)) {
+    value = reinterpret_cast<PyIntScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, UInt)) {
+    value = reinterpret_cast<PyUIntScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, Long)) {
+    value = reinterpret_cast<PyLongScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, ULong)) {
+    value = reinterpret_cast<PyULongScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, LongLong)) {
+    value = reinterpret_cast<PyLongLongScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, Int64)) {
+    value = reinterpret_cast<PyInt64ScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, ULongLong)) {
+    value = reinterpret_cast<PyULongLongScalarObject*>(obj)->obval;
+  } else if (PyArray_IsScalar(obj, UInt64)) {
+    value = reinterpret_cast<PyUInt64ScalarObject*>(obj)->obval;
+  } else {
+    DCHECK(false) << "scalar type not recognized";
+  }
+  return builder->AppendInt64(value);
+}
+
+Status Append(PyObject* elem, SequenceBuilder* builder, std::vector<PyObject*>* sublists,
+              std::vector<PyObject*>* subtuples, std::vector<PyObject*>* subdicts,
+              std::vector<PyObject*>* tensors_out) {
+  // The bool case must precede the int case (PyInt_Check passes for bools)
+  if (PyBool_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
+  } else if (PyFloat_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendDouble(PyFloat_AS_DOUBLE(elem)));
+  } else if (PyLong_Check(elem)) {
+    int overflow = 0;
+    int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow);
+    if (!overflow) {
+      RETURN_NOT_OK(builder->AppendInt64(data));
+    } else {
+      // Attempt to serialize the object using the custom callback.
+      PyObject* serialized_object;
+      // The reference count of serialized_object will be decremented in SerializeDict
+      RETURN_NOT_OK(CallCustomSerializationCallback(elem, &serialized_object));
+      RETURN_NOT_OK(builder->AppendDict(PyDict_Size(serialized_object)));
+      subdicts->push_back(serialized_object);
+    }
+#if PY_MAJOR_VERSION < 3
+  } else if (PyInt_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendInt64(static_cast<int64_t>(PyInt_AS_LONG(elem))));
+#endif
+  } else if (PyBytes_Check(elem)) {
+    auto data = reinterpret_cast<uint8_t*>(PyBytes_AS_STRING(elem));
+    const int64_t size = static_cast<int64_t>(PyBytes_GET_SIZE(elem));
+    if (size > std::numeric_limits<int32_t>::max()) {
+      return Status::Invalid("Cannot writes bytes over 2GB");
+    }
+    RETURN_NOT_OK(builder->AppendBytes(data, static_cast<int32_t>(size)));
+  } else if (PyUnicode_Check(elem)) {
+    Py_ssize_t size;
+#if PY_MAJOR_VERSION >= 3
+    char* data = PyUnicode_AsUTF8AndSize(elem, &size);
+#else
+    ScopedRef str(PyUnicode_AsUTF8String(elem));
+    char* data = PyString_AS_STRING(str.get());
+    size = PyString_GET_SIZE(str.get());
+#endif
+    if (size > std::numeric_limits<int32_t>::max()) {
+      return Status::Invalid("Cannot writes bytes over 2GB");
+    }
+    RETURN_NOT_OK(builder->AppendString(data, static_cast<int32_t>(size)));
+  } else if (PyList_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendList(PyList_Size(elem)));
+    sublists->push_back(elem);
+  } else if (PyDict_Check(elem)) {
+    RETURN_NOT_OK(builder->AppendDict(PyDict_Size(elem)));
+    subdicts->push_back(elem);
+  } else if (PyTuple_CheckExact(elem)) {
+    RETURN_NOT_OK(builder->AppendTuple(PyTuple_Size(elem)));
+    subtuples->push_back(elem);
+  } else if (PyArray_IsScalar(elem, Generic)) {
+    RETURN_NOT_OK(AppendScalar(elem, builder));
+  } else if (PyArray_Check(elem)) {
+    RETURN_NOT_OK(SerializeArray(reinterpret_cast<PyArrayObject*>(elem), builder,
+                                 subdicts, tensors_out));
+  } else if (elem == Py_None) {
+    RETURN_NOT_OK(builder->AppendNone());
+  } else {
+    // Attempt to serialize the object using the custom callback.
+    PyObject* serialized_object;
+    // The reference count of serialized_object will be decremented in SerializeDict
+    RETURN_NOT_OK(CallCustomSerializationCallback(elem, &serialized_object));
+    RETURN_NOT_OK(builder->AppendDict(PyDict_Size(serialized_object)));
+    subdicts->push_back(serialized_object);
+  }
+  return Status::OK();
+}
+
+Status SerializeArray(PyArrayObject* array, SequenceBuilder* builder,
+                      std::vector<PyObject*>* subdicts,
+                      std::vector<PyObject*>* tensors_out) {
+  int dtype = PyArray_TYPE(array);
+  switch (dtype) {
+    case NPY_BOOL:
+    case NPY_UINT8:
+    case NPY_INT8:
+    case NPY_UINT16:
+    case NPY_INT16:
+    case NPY_UINT32:
+    case NPY_INT32:
+    case NPY_UINT64:
+    case NPY_INT64:
+    case NPY_FLOAT:
+    case NPY_DOUBLE: {
+      RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(tensors_out->size())));
+      tensors_out->push_back(reinterpret_cast<PyObject*>(array));
+    } break;
+    default: {
+      PyObject* serialized_object;
+      // The reference count of serialized_object will be decremented in SerializeDict
+      RETURN_NOT_OK(CallCustomSerializationCallback(reinterpret_cast<PyObject*>(array),
+                                                    &serialized_object));
+      RETURN_NOT_OK(builder->AppendDict(PyDict_Size(serialized_object)));
+      subdicts->push_back(serialized_object);
+    }
+  }
+  return Status::OK();
+}
+
+Status SerializeSequences(std::vector<PyObject*> sequences, int32_t recursion_depth,
+                          std::shared_ptr<Array>* out,
+                          std::vector<PyObject*>* tensors_out) {
+  DCHECK(out);
+  if (recursion_depth >= kMaxRecursionDepth) {
+    return Status::NotImplemented(
+        "This object exceeds the maximum recursion depth. It may contain itself "
+        "recursively.");
+  }
+  SequenceBuilder builder(nullptr);
+  std::vector<PyObject *> sublists, subtuples, subdicts;
+  for (const auto& sequence : sequences) {
+    ScopedRef iterator(PyObject_GetIter(sequence));
+    RETURN_IF_PYERROR();
+    ScopedRef item;
+    while (item.reset(PyIter_Next(iterator.get())), item.get()) {
+      RETURN_NOT_OK(
+          Append(item.get(), &builder, &sublists, &subtuples, &subdicts, tensors_out));
+    }
+  }
+  std::shared_ptr<Array> list;
+  if (sublists.size() > 0) {
+    RETURN_NOT_OK(SerializeSequences(sublists, recursion_depth + 1, &list, tensors_out));
+  }
+  std::shared_ptr<Array> tuple;
+  if (subtuples.size() > 0) {
+    RETURN_NOT_OK(
+        SerializeSequences(subtuples, recursion_depth + 1, &tuple, tensors_out));
+  }
+  std::shared_ptr<Array> dict;
+  if (subdicts.size() > 0) {
+    RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict, tensors_out));
+  }
+  return builder.Finish(list.get(), tuple.get(), dict.get(), out);
+}
+
+Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
+                     std::shared_ptr<Array>* out, std::vector<PyObject*>* tensors_out) {
+  DictBuilder result;
+  if (recursion_depth >= kMaxRecursionDepth) {
+    return Status::NotImplemented(
+        "This object exceeds the maximum recursion depth. It may contain itself "
+        "recursively.");
+  }
+  std::vector<PyObject *> key_tuples, key_dicts, val_lists, val_tuples, val_dicts, dummy;
+  for (const auto& dict : dicts) {
+    PyObject *key, *value;
+    Py_ssize_t pos = 0;
+    while (PyDict_Next(dict, &pos, &key, &value)) {
+      RETURN_NOT_OK(
+          Append(key, &result.keys(), &dummy, &key_tuples, &key_dicts, tensors_out));
+      DCHECK_EQ(dummy.size(), 0);
+      RETURN_NOT_OK(Append(value, &result.vals(), &val_lists, &val_tuples, &val_dicts,
+                           tensors_out));
+    }
+  }
+  std::shared_ptr<Array> key_tuples_arr;
+  if (key_tuples.size() > 0) {
+    RETURN_NOT_OK(SerializeSequences(key_tuples, recursion_depth + 1, &key_tuples_arr,
+                                     tensors_out));
+  }
+  std::shared_ptr<Array> key_dicts_arr;
+  if (key_dicts.size() > 0) {
+    RETURN_NOT_OK(
+        SerializeDict(key_dicts, recursion_depth + 1, &key_dicts_arr, tensors_out));
+  }
+  std::shared_ptr<Array> val_list_arr;
+  if (val_lists.size() > 0) {
+    RETURN_NOT_OK(
+        SerializeSequences(val_lists, recursion_depth + 1, &val_list_arr, tensors_out));
+  }
+  std::shared_ptr<Array> val_tuples_arr;
+  if (val_tuples.size() > 0) {
+    RETURN_NOT_OK(SerializeSequences(val_tuples, recursion_depth + 1, &val_tuples_arr,
+                                     tensors_out));
+  }
+  std::shared_ptr<Array> val_dict_arr;
+  if (val_dicts.size() > 0) {
+    RETURN_NOT_OK(
+        SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr, tensors_out));
+  }
+  RETURN_NOT_OK(result.Finish(key_tuples_arr.get(), key_dicts_arr.get(),
+                              val_list_arr.get(), val_tuples_arr.get(),
+                              val_dict_arr.get(), out));
+
+  // This block is used to decrement the reference counts of the results
+  // returned by the serialization callback, which is called in SerializeArray,
+  // in DeserializeDict and in Append
+  static PyObject* py_type = PyUnicode_FromString("_pytype_");
+  for (const auto& dict : dicts) {
+    if (PyDict_Contains(dict, py_type)) {
+      // If the dictionary contains the key "_pytype_", then the user has to
+      // have registered a callback.
+      if (pyarrow_serialize_callback == nullptr) {
+        return Status::Invalid("No serialization callback set");
+      }
+      Py_XDECREF(dict);
+    }
+  }
+
+  return Status::OK();
+}
+
+std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
+  auto field = std::make_shared<Field>("list", data->type());
+  auto schema = ::arrow::schema({field});
+  return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
+}
+
+Status SerializeObject(PyObject* sequence, SerializedPyObject* out) {
+  PyAcquireGIL lock;
+  std::vector<PyObject*> sequences = {sequence};
+  std::shared_ptr<Array> array;
+  std::vector<PyObject*> py_tensors;
+  RETURN_NOT_OK(SerializeSequences(sequences, 0, &array, &py_tensors));
+  out->batch = MakeBatch(array);
+  for (const auto& py_tensor : py_tensors) {
+    std::shared_ptr<Tensor> arrow_tensor;
+    RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(), py_tensor, &arrow_tensor));
+    out->tensors.push_back(arrow_tensor);
+  }
+  return Status::OK();
+}
+
+Status WriteSerializedObject(const SerializedPyObject& obj, io::OutputStream* dst) {
+  int32_t num_tensors = static_cast<int32_t>(obj.tensors.size());
+  std::shared_ptr<ipc::RecordBatchStreamWriter> writer;
+  int32_t metadata_length;
+  int64_t body_length;
+
+  RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&num_tensors), sizeof(int32_t)));
+  RETURN_NOT_OK(ipc::RecordBatchStreamWriter::Open(dst, obj.batch->schema(), &writer));
+  RETURN_NOT_OK(writer->WriteRecordBatch(*obj.batch));
+  RETURN_NOT_OK(writer->Close());
+
+  for (const auto& tensor : obj.tensors) {
+    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
+  }
+
+  return Status::OK();
+}
+
+}  // namespace py
+}  // namespace arrow
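
The custom-callback path above (CallCustomSerializationCallback and the "_pytype_" marker) is what lets objects without a native Arrow representation, such as very large ints or user-defined classes, fall back to a registered handler. On the Python side this is wired up through register_type in serialization.pxi (shown, truncated, at the end of this diff). Usage might look like the following sketch; the keyword names come from the register_type signature below, while the Point class and its handlers are hypothetical:

    from pyarrow.lib import register_type  # defined in serialization.pxi

    class Point(object):
        def __init__(self, x, y):
            self.x = x
            self.y = y

    # Hypothetical handlers: flatten a Point to a tuple on serialization and
    # rebuild it on deserialization. Passing pickle=True instead would route
    # the type through (cloud)pickle, e.g. for lambdas.
    register_type(Point, b'Point',
                  custom_serializer=lambda obj: (obj.x, obj.y),
                  custom_deserializer=lambda data: Point(data[0], data[1]))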

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/cpp/src/arrow/python/python_to_arrow.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python_to_arrow.h b/cpp/src/arrow/python/python_to_arrow.h
new file mode 100644
index 0000000..8ac0396
--- /dev/null
+++ b/cpp/src/arrow/python/python_to_arrow.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_PYTHON_PYTHON_TO_ARROW_H
+#define ARROW_PYTHON_PYTHON_TO_ARROW_H
+
+#include "arrow/python/platform.h"
+
+#include <memory>
+#include <vector>
+
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class RecordBatch;
+class Tensor;
+
+namespace io {
+
+class OutputStream;
+
+}  // namespace io
+
+namespace py {
+
+struct ARROW_EXPORT SerializedPyObject {
+  std::shared_ptr<RecordBatch> batch;
+  std::vector<std::shared_ptr<Tensor>> tensors;
+};
+
+/// \brief Register callback functions to perform conversions to or from other
+/// Python representations en route to/from deserialization
+///
+/// \param[in] serialize_callback a Python callable
+/// \param[in] deserialize_callback a Python callable
+///
+/// Analogous to Python custom picklers / unpicklers
+ARROW_EXPORT
+void set_serialization_callbacks(PyObject* serialize_callback,
+                                 PyObject* deserialize_callback);
+
+/// \brief Serialize a Python sequence as a RecordBatch plus a set of tensors
+/// \param[in] sequence a Python sequence object to serialize to Arrow data
+/// structures
+/// \param[out] out the serialized representation
+/// \return Status
+///
+/// Release GIL before calling
+ARROW_EXPORT
+Status SerializeObject(PyObject* sequence, SerializedPyObject* out);
+
+/// \brief Write serialized Python object to OutputStream
+/// \param[in] object a serialized Python object to write out
+/// \param[out] dst an OutputStream
+/// \return Status
+ARROW_EXPORT
+Status WriteSerializedObject(const SerializedPyObject& object, io::OutputStream* dst);
+
+}  // namespace py
+}  // namespace arrow
+
+#endif  // ARROW_PYTHON_PYTHON_TO_ARROW_H
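
Per commit 114a5fbf in the log above, the C++ SerializedPyObject struct is mirrored on the Python side by a container exposing a total_bytes method. A sketch of what that enables, with the exact spelling assumed from that commit message:

    import pyarrow as pa

    serialized = pa.serialize([1, 2, 3])  # -> SerializedPyObject container
    # Size of the record batch plus tensors when written out, useful for
    # pre-allocating a destination buffer or shared memory segment
    nbytes = serialized.total_bytes()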

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/python/doc/source/api.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index 1aaf89c..846af4c 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -181,8 +181,8 @@ File Systems
 
 .. _api.ipc:
 
-Interprocess Communication and Messaging
-----------------------------------------
+Interprocess Communication and Serialization
+--------------------------------------------
 
 .. autosummary::
    :toctree: generated/
@@ -201,6 +201,12 @@ Interprocess Communication and Messaging
    read_tensor
    write_tensor
    get_tensor_size
+   serialize
+   serialize_to
+   deserialize
+   deserialize_from
+   read_serialized
+   SerializedPyObject
 
 .. _api.memory_pool:
 

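The api.rst additions mirror the C++ split between reading the Arrow data (ReadSerializedObject) and rebuilding Python objects (DeserializeObject). A sketch of the corresponding two-step read, assuming read_serialized returns a SerializedPyObject that can be deserialized separately:

    import pyarrow as pa

    # `buf` holds bytes produced by pa.serialize_to, as in the earlier sketch
    source = pa.BufferReader(buf)
    serialized = pa.read_serialized(source)  # reads the batch and tensors
    value = serialized.deserialize()         # reconstructs Python objects
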
http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index d0348b4..4abf29e 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -87,6 +87,10 @@ from pyarrow.lib import (ArrowException,
                          ArrowNotImplementedError,
                          ArrowTypeError)
 
+# Serialization
+from pyarrow.lib import (deserialize_from, deserialize,
+                         serialize, serialize_to, read_serialized,
+                         SerializedPyObject)
 
 from pyarrow.filesystem import FileSystem, LocalFileSystem
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/python/pyarrow/compat.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/compat.py b/python/pyarrow/compat.py
index 2252e85..df5e4fa 100644
--- a/python/pyarrow/compat.py
+++ b/python/pyarrow/compat.py
@@ -129,6 +129,13 @@ else:
     def frombytes(o):
         return o.decode('utf8')
 
+try:
+    import cloudpickle as pickle
+except ImportError:
+    try:
+        import cPickle as pickle
+    except ImportError:
+        import pickle
 
 def encode_file_path(path):
     import os
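
The compat shim above prefers cloudpickle when available, which matters for the pickle fallback mentioned in the commit message: plain pickle cannot serialize lambdas, while cloudpickle can. A small illustration:

    from pyarrow.compat import pickle

    # Succeeds when cloudpickle is installed; the stdlib pickle would raise
    # PicklingError for a lambda
    payload = pickle.dumps(lambda x: x + 1)
    assert pickle.loads(payload)(1) == 2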

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index c6a9d9d..082fb61 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -778,6 +778,26 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
     cdef struct PandasOptions:
         c_bool strings_to_categorical
 
+cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
+
+    cdef cppclass CSerializedPyObject" arrow::py::SerializedPyObject":
+        shared_ptr[CRecordBatch] batch
+        vector[shared_ptr[CTensor]] tensors
+
+    CStatus SerializeObject(object sequence, CSerializedPyObject* out)
+
+    CStatus WriteSerializedObject(const CSerializedPyObject& obj,
+                                  OutputStream* dst)
+
+    CStatus DeserializeObject(const CSerializedPyObject& obj,
+                              PyObject* base, PyObject** out)
+
+    CStatus ReadSerializedObject(shared_ptr[RandomAccessFile] src,
+                                 CSerializedPyObject* out)
+
+    void set_serialization_callbacks(object serialize_callback,
+                                     object deserialize_callback)
+
 
 cdef extern from 'arrow/python/init.h':
     int arrow_init_numpy() except -1

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 061a7a9..b5858ab 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -749,7 +749,7 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
     if isinstance(source, NativeFile):
         nf = source
 
-        if nf.is_readable:
+        if not nf.is_writeable:
             raise IOError('Native file is not writeable')
 
         nf.write_handle(writer)

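This one-line fix matters for files opened read-write: a memory map
with mode='r+' is both readable and writeable, and the old
`is_readable` test wrongly rejected it as a write sink. A sketch of the
case it unblocks (path and size illustrative; memory_map needs a
pre-existing file):

    import pyarrow as pa

    path = '/tmp/pyarrow-scratch'
    with open(path, 'wb') as f:
        f.write(b'\x00' * (1 << 20))  # 1 MB of scratch space

    # The r+ map is readable AND writeable, so get_writer now accepts it.
    with pa.memory_map(path, mode='r+') as mmap:
        pa.serialize_to([1, 2, 3], mmap)
        mmap.seek(0)
        assert pa.deserialize_from(mmap, None) == [1, 2, 3]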
http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/python/pyarrow/lib.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx
index 789801b..4ea327e 100644
--- a/python/pyarrow/lib.pyx
+++ b/python/pyarrow/lib.pyx
@@ -119,5 +119,8 @@ include "ipc.pxi"
 # Feather format
 include "feather.pxi"
 
+# Python serialization
+include "serialization.pxi"
+
 # Public API
 include "public-api.pxi"

http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/python/pyarrow/serialization.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
new file mode 100644
index 0000000..a6c955b
--- /dev/null
+++ b/python/pyarrow/serialization.pxi
@@ -0,0 +1,279 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from cpython.ref cimport PyObject
+
+from pyarrow.compat import pickle
+
+
+def is_named_tuple(cls):
+    """Return True if cls is a namedtuple and False otherwise."""
+    b = cls.__bases__
+    if len(b) != 1 or b[0] != tuple:
+        return False
+    f = getattr(cls, "_fields", None)
+    if not isinstance(f, tuple):
+        return False
+    return all(type(n) == str for n in f)
+
+
+class SerializationException(Exception):
+    def __init__(self, message, example_object):
+        Exception.__init__(self, message)
+        self.example_object = example_object
+
+
+class DeserializationException(Exception):
+    def __init__(self, message, type_id):
+        Exception.__init__(self, message)
+        self.type_id = type_id
+
+
+# Types with special serialization handlers
+type_to_type_id = dict()
+whitelisted_types = dict()
+types_to_pickle = set()
+custom_serializers = dict()
+custom_deserializers = dict()
+
+
+def register_type(type, type_id, pickle=False,
+                  custom_serializer=None, custom_deserializer=None):
+    """Add type to the list of types we can serialize.
+
+    Parameters
+    ----------
+    type : type
+        The type that we can serialize.
+    type_id : bytes
+        A string of bytes used to identify the type.
+    pickle : bool
+        True if the serialization should be done with pickle.
+        False if it should be done efficiently with Arrow.
+    custom_serializer : callable
+        This argument is optional, but can be provided to
+        serialize objects of the class in a particular way.
+    custom_deserializer : callable
+        This argument is optional, but can be provided to
+        deserialize objects of the class in a particular way.
+    """
+    type_to_type_id[type] = type_id
+    whitelisted_types[type_id] = type
+    if pickle:
+        types_to_pickle.add(type_id)
+    if custom_serializer is not None:
+        custom_serializers[type_id] = custom_serializer
+        custom_deserializers[type_id] = custom_deserializer
+
+
+def _serialization_callback(obj):
+    if type(obj) not in type_to_type_id:
+        raise SerializationException("pyarrow does not know how to "
+                                     "serialize objects of type {}."
+                                     .format(type(obj)),
+                                     obj)
+    type_id = type_to_type_id[type(obj)]
+    if type_id in types_to_pickle:
+        serialized_obj = {"data": pickle.dumps(obj), "pickle": True}
+    elif type_id in custom_serializers:
+        serialized_obj = {"data": custom_serializers[type_id](obj)}
+    else:
+        if is_named_tuple(type(obj)):
+            serialized_obj = {}
+            serialized_obj["_pa_getnewargs_"] = obj.__getnewargs__()
+        elif hasattr(obj, "__dict__"):
+            serialized_obj = obj.__dict__
+        else:
+            raise SerializationException("We do not know how to serialize "
+                                         "the object '{}'".format(obj), obj)
+    return dict(serialized_obj, **{"_pytype_": type_id})
+
+
+def _deserialization_callback(serialized_obj):
+    type_id = serialized_obj["_pytype_"]
+
+    if "pickle" in serialized_obj:
+        # The object was pickled, so unpickle it.
+        obj = pickle.loads(serialized_obj["data"])
+    else:
+        assert type_id not in types_to_pickle
+        if type_id not in whitelisted_types:
+            raise DeserializationException("type id not registered", type_id)
+        type = whitelisted_types[type_id]
+        if type_id in custom_deserializers:
+            obj = custom_deserializers[type_id](serialized_obj["data"])
+        else:
+            # In this case, serialized_obj should just be the __dict__ field.
+            if "_pa_getnewargs_" in serialized_obj:
+                obj = type.__new__(type, *serialized_obj["_pa_getnewargs_"])
+            else:
+                obj = type.__new__(type)
+                serialized_obj.pop("_pytype_")
+                obj.__dict__.update(serialized_obj)
+    return obj
+
+
+set_serialization_callbacks(_serialization_callback,
+                            _deserialization_callback)
+
+
+cdef class SerializedPyObject:
+    """
+    Arrow-serialized representation of a Python object
+    """
+    cdef:
+        CSerializedPyObject data
+
+    cdef readonly:
+        object base
+
+    property total_bytes:
+
+        def __get__(self):
+            cdef CMockOutputStream mock_stream
+            with nogil:
+                check_status(WriteSerializedObject(self.data, &mock_stream))
+
+            return mock_stream.GetExtentBytesWritten()
+
+    def write_to(self, sink):
+        """
+        Write serialized object to a sink
+        """
+        cdef shared_ptr[OutputStream] stream
+        get_writer(sink, &stream)
+        self._write_to(stream.get())
+
+    cdef _write_to(self, OutputStream* stream):
+        with nogil:
+            check_status(WriteSerializedObject(self.data, stream))
+
+    def deserialize(self):
+        """
+        Convert back to Python object
+        """
+        cdef PyObject* result
+
+        with nogil:
+            check_status(DeserializeObject(self.data, <PyObject*> self.base,
+                                           &result))
+
+        # This is necessary to avoid a memory leak
+        return PyObject_to_object(result)
+
+    def to_buffer(self):
+        """
+        Write serialized data as a Buffer
+        """
+        sink = BufferOutputStream()
+        self.write_to(sink)
+        return sink.get_result()
+
+
+def serialize(object value):
+    """EXPERIMENTAL: Serialize a Python sequence
+
+    Parameters
+    ----------
+    value : object
+        Python object to be serialized.
+
+    Returns
+    -------
+    serialized : SerializedPyObject
+    """
+    cdef SerializedPyObject serialized = SerializedPyObject()
+    with nogil:
+        check_status(SerializeObject(value, &serialized.data))
+    return serialized
+
+
+def serialize_to(object value, sink):
+    """EXPERIMENTAL: Serialize a Python sequence to a file.
+
+    Parameters
+    ----------
+    value : object
+        Python object to be serialized.
+    sink : NativeFile or file-like
+        File the serialized data will be written to.
+    """
+    serialized = serialize(value)
+    serialized.write_to(sink)
+
+
+def read_serialized(source, base=None):
+    """EXPERIMENTAL: Read serialized Python sequence from file-like object
+
+    Parameters
+    ----------
+    source : NativeFile
+        File to read the sequence from.
+    base : object
+        This object will be the base object of all the numpy arrays
+        contained in the sequence.
+
+    Returns
+    -------
+    serialized : SerializedPyObject
+    """
+    cdef shared_ptr[RandomAccessFile] stream
+    get_reader(source, &stream)
+
+    cdef SerializedPyObject serialized = SerializedPyObject()
+    serialized.base = base
+    with nogil:
+        check_status(ReadSerializedObject(stream, &serialized.data))
+
+    return serialized
+
+
+def deserialize_from(source, object base):
+    """EXPERIMENTAL: Deserialize a Python sequence from a file.
+
+    Parameters
+    ----------
+    source : NativeFile
+        File to read the sequence from.
+    base : object
+        This object will be the base object of all the numpy arrays
+        contained in the sequence.
+
+    Returns
+    -------
+    object
+        Python object for the deserialized sequence.
+    """
+    serialized = read_serialized(source, base=base)
+    return serialized.deserialize()
+
+
+def deserialize(obj):
+    """
+    EXPERIMENTAL: Deserialize Python object from Buffer or other Python object
+    supporting the buffer protocol
+
+    Parameters
+    ----------
+    obj : pyarrow.Buffer or Python object supporting buffer protocol
+
+    Returns
+    -------
+    deserialized : object
+    """
+    source = BufferReader(obj)
+    return deserialize_from(source, obj)

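Putting the pieces of this file together: a user-defined class becomes
serializable once registered, with the default callback capturing its
__dict__ and deserialization rebuilding the instance via type.__new__.
A minimal sketch (the class name and 20-byte tag are illustrative):

    import pyarrow as pa

    class Coord(object):
        def __init__(self, x, y):
            self.x = x
            self.y = y

    # No custom serializer: _serialization_callback stores
    # Coord.__dict__ plus the "_pytype_" tag.
    pa.lib.register_type(Coord, 20 * b"\x42")

    restored = pa.deserialize(pa.serialize([Coord(1, 2)]).to_buffer())[0]
    assert (restored.x, restored.y) == (1, 2)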
http://git-wip-us.apache.org/repos/asf/arrow/blob/b50f2351/python/pyarrow/tests/test_serialization.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
new file mode 100644
index 0000000..f6f9840
--- /dev/null
+++ b/python/pyarrow/tests/test_serialization.py
@@ -0,0 +1,236 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import division
+
+import pytest
+
+from collections import namedtuple
+import os
+import string
+import sys
+
+import pyarrow as pa
+import numpy as np
+
+
+def assert_equal(obj1, obj2):
+    module_numpy = (type(obj1).__module__ == np.__name__ or
+                    type(obj2).__module__ == np.__name__)
+    if module_numpy:
+        empty_shape = ((hasattr(obj1, "shape") and obj1.shape == ()) or
+                       (hasattr(obj2, "shape") and obj2.shape == ()))
+        if empty_shape:
+            # This is a special case because currently np.testing.assert_equal
+            # fails because we do not properly handle different numerical
+            # types.
+            assert obj1 == obj2, ("Objects {} and {} are "
+                                  "different.".format(obj1, obj2))
+        else:
+            np.testing.assert_equal(obj1, obj2)
+    elif hasattr(obj1, "__dict__") and hasattr(obj2, "__dict__"):
+        special_keys = ["_pytype_"]
+        assert (set(list(obj1.__dict__.keys()) + special_keys) ==
+                set(list(obj2.__dict__.keys()) + special_keys)), ("Objects {} "
+                                                                  "and {} are "
+                                                                  "different."
+                                                                  .format(
+                                                                      obj1,
+                                                                      obj2))
+        for key in obj1.__dict__.keys():
+            if key not in special_keys:
+                assert_equal(obj1.__dict__[key], obj2.__dict__[key])
+    elif type(obj1) is dict or type(obj2) is dict:
+        assert_equal(obj1.keys(), obj2.keys())
+        for key in obj1.keys():
+            assert_equal(obj1[key], obj2[key])
+    elif type(obj1) is list or type(obj2) is list:
+        assert len(obj1) == len(obj2), ("Objects {} and {} are lists with "
+                                        "different lengths."
+                                        .format(obj1, obj2))
+        for i in range(len(obj1)):
+            assert_equal(obj1[i], obj2[i])
+    elif type(obj1) is tuple or type(obj2) is tuple:
+        assert len(obj1) == len(obj2), ("Objects {} and {} are tuples with "
+                                        "different lengths."
+                                        .format(obj1, obj2))
+        for i in range(len(obj1)):
+            assert_equal(obj1[i], obj2[i])
+    elif (pa.lib.is_named_tuple(type(obj1)) or
+          pa.lib.is_named_tuple(type(obj2))):
+        assert len(obj1) == len(obj2), ("Objects {} and {} are named tuples "
+                                        "with different lengths."
+                                        .format(obj1, obj2))
+        for i in range(len(obj1)):
+            assert_equal(obj1[i], obj2[i])
+    else:
+        assert obj1 == obj2, "Objects {} and {} are different.".format(obj1,
+                                                                       obj2)
+
+
+def array_custom_serializer(obj):
+    return obj.tolist(), obj.dtype.str
+
+
+def array_custom_deserializer(serialized_obj):
+    return np.array(serialized_obj[0], dtype=np.dtype(serialized_obj[1]))
+
+pa.lib.register_type(np.ndarray, 20 * b"\x00", pickle=False,
+                     custom_serializer=array_custom_serializer,
+                     custom_deserializer=array_custom_deserializer)
+
+if sys.version_info >= (3, 0):
+    long_extras = [0, np.array([["hi", u"hi"], [1.3, 1]])]
+else:
+    _LONG_ZERO, _LONG_ONE = long(0), long(1)  # noqa: E501,F821
+    long_extras = [_LONG_ZERO, np.array([["hi", u"hi"],
+                                         [1.3, _LONG_ONE]])]
+
+PRIMITIVE_OBJECTS = [
+    0, 0.0, 0.9, 1 << 62, 1 << 100, 1 << 999,
+    [1 << 100, [1 << 100]], "a", string.printable, "\u262F",
+    u"hello world", u"\xff\xfe\x9c\x001\x000\x00", None, True,
+    False, [], (), {}, np.int8(3), np.int32(4), np.int64(5),
+    np.uint8(3), np.uint32(4), np.uint64(5), np.float32(1.9),
+    np.float64(1.9), np.zeros([100, 100]),
+    np.random.normal(size=[100, 100]), np.array(["hi", 3]),
+    np.array(["hi", 3], dtype=object)] + long_extras
+
+COMPLEX_OBJECTS = [
+    [[[[[[[[[[[[]]]]]]]]]]]],
+    {"obj{}".format(i): np.random.normal(size=[100, 100]) for i in range(10)},
+    # {(): {(): {(): {(): {(): {(): {(): {(): {(): {(): {
+    #       (): {(): {}}}}}}}}}}}}},
+    ((((((((((),),),),),),),),),),
+    {"a": {"b": {"c": {"d": {}}}}}]
+
+
+class Foo(object):
+    def __init__(self, value=0):
+        self.value = value
+
+    def __hash__(self):
+        return hash(self.value)
+
+    def __eq__(self, other):
+        return other.value == self.value
+
+
+class Bar(object):
+    def __init__(self):
+        for i, val in enumerate(PRIMITIVE_OBJECTS + COMPLEX_OBJECTS):
+            setattr(self, "field{}".format(i), val)
+
+
+class Baz(object):
+    def __init__(self):
+        self.foo = Foo()
+        self.bar = Bar()
+
+    def method(self, arg):
+        pass
+
+
+class Qux(object):
+    def __init__(self):
+        self.objs = [Foo(), Bar(), Baz()]
+
+
+class SubQux(Qux):
+    def __init__(self):
+        Qux.__init__(self)
+
+
+class CustomError(Exception):
+    pass
+
+
+Point = namedtuple("Point", ["x", "y"])
+NamedTupleExample = namedtuple("Example",
+                               "field1, field2, field3, field4, field5")
+
+
+CUSTOM_OBJECTS = [Exception("Test object."), CustomError(), Point(11, y=22),
+                  Foo(), Bar(), Baz(), Qux(), SubQux(),
+                  NamedTupleExample(1, 1.0, "hi", np.zeros([3, 5]), [1, 2, 3])]
+
+pa.lib.register_type(Foo, 20 * b"\x01")
+pa.lib.register_type(Bar, 20 * b"\x02")
+pa.lib.register_type(Baz, 20 * b"\x03")
+pa.lib.register_type(Qux, 20 * b"\x04")
+pa.lib.register_type(SubQux, 20 * b"\x05")
+pa.lib.register_type(Exception, 20 * b"\x06")
+pa.lib.register_type(CustomError, 20 * b"\x07")
+pa.lib.register_type(Point, 20 * b"\x08")
+pa.lib.register_type(NamedTupleExample, 20 * b"\x09")
+
+# TODO(pcm): This is currently a workaround until arrow supports
+# arbitrary precision integers. This is only called on long integers,
+# see the associated case in the append method in python_to_arrow.cc
+pa.lib.register_type(int, 20 * b"\x10", pickle=False,
+                     custom_serializer=lambda obj: str(obj),
+                     custom_deserializer=(
+                         lambda serialized_obj: int(serialized_obj)))
+
+
+if (sys.version_info < (3, 0)):
+    deserializer = (
+        lambda serialized_obj: long(serialized_obj))  # noqa: E501,F821
+    pa.lib.register_type(long, 20 * b"\x11", pickle=False,  # noqa: E501,F821
+                         custom_serializer=lambda obj: str(obj),
+                         custom_deserializer=deserializer)
+
+
+def serialization_roundtrip(value, f):
+    f.seek(0)
+    pa.serialize_to(value, f)
+    f.seek(0)
+    result = pa.deserialize_from(f, None)
+    assert_equal(value, result)
+
+
+@pytest.yield_fixture(scope='session')
+def large_memory_map(tmpdir_factory):
+    path = (tmpdir_factory.mktemp('data')
+            .join('pyarrow-serialization-tmp-file').strpath)
+
+    # Create a large memory mapped file
+    SIZE = 100 * 1024 * 1024  # 100 MB
+    with open(path, 'wb') as f:
+        f.write(np.random.randint(0, 256, size=SIZE)
+                .astype('u1')
+                .tobytes()
+                [:SIZE])
+    return path
+
+
+def test_primitive_serialization(large_memory_map):
+    with pa.memory_map(large_memory_map, mode="r+") as mmap:
+        for obj in PRIMITIVE_OBJECTS:
+            serialization_roundtrip([obj], mmap)
+
+
+def test_complex_serialization(large_memory_map):
+    with pa.memory_map(large_memory_map, mode="r+") as mmap:
+        for obj in COMPLEX_OBJECTS:
+            serialization_roundtrip([obj], mmap)
+
+
+def test_custom_serialization(large_memory_map):
+    with pa.memory_map(large_memory_map, mode="r+") as mmap:
+        for obj in CUSTOM_OBJECTS:
+            serialization_roundtrip([obj], mmap)

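For reference, once pyarrow is built with this patch the suite above
can be run directly, e.g.:

    py.test python/pyarrow/tests/test_serialization.py -v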
