arrow-commits mailing list archives

From w...@apache.org
Subject [1/3] arrow git commit: ARROW-1408: [C++] IPC public API cleanup, refactoring. Add SerializeSchema, ReadSchema public APIs
Date Fri, 25 Aug 2017 03:05:10 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 750b77dc6 -> f50f2eacb
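
This commit's headline additions are the SerializeSchema and ReadSchema C++ IPC APIs, surfaced in Python as Schema.serialize, RecordBatch.serialize, pyarrow.read_schema, and a more permissive pyarrow.read_record_batch. As a quick orientation before the diff, here is a minimal Python round-trip sketch mirroring the test_schema_batch_serialize_methods test added below (assumes a pyarrow build that includes this commit):

    import pyarrow as pa

    # Build a small record batch to serialize
    batch = pa.RecordBatch.from_arrays(
        [pa.array([1, 2, 3]), pa.array(['a', 'b', 'c'])],
        ['ints', 'strs'])

    # Both calls return a Buffer holding an encapsulated IPC message
    schema_buf = batch.schema.serialize()
    batch_buf = batch.serialize()

    # Reconstruct the schema, then the batch against that schema
    schema = pa.read_schema(schema_buf)
    roundtripped = pa.read_record_batch(batch_buf, schema)
    assert roundtripped.equals(batch)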


http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/python/arrow_to_python.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h
index 559ce18..e187d59 100644
--- a/cpp/src/arrow/python/arrow_to_python.h
+++ b/cpp/src/arrow/python/arrow_to_python.h
@@ -46,8 +46,7 @@ namespace py {
 /// \param[out] out the reconstructed data
 /// \return Status
 ARROW_EXPORT
-Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
-                            SerializedPyObject* out);
+Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out);
 
 /// \brief Reconstruct Python object from Arrow-serialized representation
 /// \param[in] object

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/python/python_to_arrow.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index a1ccd99..e00f194 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -53,18 +53,18 @@ namespace py {
 /// scalar Python types, lists, tuples, dictionaries and tensors.
 class SequenceBuilder {
  public:
-  explicit SequenceBuilder(MemoryPool* pool = nullptr)
+  explicit SequenceBuilder(MemoryPool* pool ARROW_MEMORY_POOL_DEFAULT)
       : pool_(pool),
-        types_(pool, ::arrow::int8()),
-        offsets_(pool, ::arrow::int32()),
+        types_(::arrow::int8(), pool),
+        offsets_(::arrow::int32(), pool),
         nones_(pool),
-        bools_(pool, ::arrow::boolean()),
-        ints_(pool, ::arrow::int64()),
-        bytes_(pool, ::arrow::binary()),
+        bools_(::arrow::boolean(), pool),
+        ints_(::arrow::int64(), pool),
+        bytes_(::arrow::binary(), pool),
         strings_(pool),
-        floats_(pool, ::arrow::float32()),
-        doubles_(pool, ::arrow::float64()),
-        tensor_indices_(pool, ::arrow::int32()),
+        floats_(::arrow::float32(), pool),
+        doubles_(::arrow::float64(), pool),
+        tensor_indices_(::arrow::int32(), pool),
         list_offsets_({0}),
         tuple_offsets_({0}),
         dict_offsets_({0}) {}
@@ -184,7 +184,7 @@ class SequenceBuilder {
     if (data != nullptr) {
       DCHECK(data->length() == offsets.back());
       std::shared_ptr<Array> offset_array;
-      Int32Builder builder(pool_, std::make_shared<Int32Type>());
+      Int32Builder builder(::arrow::int32(), pool_);
       RETURN_NOT_OK(builder.Append(offsets.data(), offsets.size()));
       RETURN_NOT_OK(builder.Finish(&offset_array));
       std::shared_ptr<Array> list_array;

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 82cd137..b7963b8 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -313,6 +313,8 @@ Status Schema::AddMetadata(const std::shared_ptr<const KeyValueMetadata>& metadata
   return Status::OK();
 }
 
+std::shared_ptr<const KeyValueMetadata> Schema::metadata() const { return metadata_; }
+
 std::shared_ptr<Schema> Schema::RemoveMetadata() const {
   return std::make_shared<Schema>(fields_);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 30cd71e..4cd17bc 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -730,7 +730,7 @@ class ARROW_EXPORT Schema {
   /// \brief The custom key-value metadata, if any
   ///
   /// \return metadata may be nullptr
-  std::shared_ptr<const KeyValueMetadata> metadata() const { return metadata_; }
+  std::shared_ptr<const KeyValueMetadata> metadata() const;
 
   /// \brief Render a string representation of the schema suitable for debugging
   std::string ToString() const;

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/cpp/src/arrow/util/key_value_metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/key_value_metadata.cc b/cpp/src/arrow/util/key_value_metadata.cc
index 6877a6a..0497f65 100644
--- a/cpp/src/arrow/util/key_value_metadata.cc
+++ b/cpp/src/arrow/util/key_value_metadata.cc
@@ -46,7 +46,9 @@ KeyValueMetadata::KeyValueMetadata() : keys_(), values_() {}
 
 KeyValueMetadata::KeyValueMetadata(
     const std::unordered_map<std::string, std::string>& map)
-    : keys_(UnorderedMapKeys(map)), values_(UnorderedMapValues(map)) {}
+    : keys_(UnorderedMapKeys(map)), values_(UnorderedMapValues(map)) {
+  DCHECK_EQ(keys_.size(), values_.size());
+}
 
 KeyValueMetadata::KeyValueMetadata(const std::vector<std::string>& keys,
                                    const std::vector<std::string>& values)

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index d00286d..68ae017 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -100,8 +100,8 @@ import pyarrow.hdfs as hdfs
 from pyarrow.ipc import (Message, MessageReader,
                          RecordBatchFileReader, RecordBatchFileWriter,
                          RecordBatchStreamReader, RecordBatchStreamWriter,
-                         read_message, read_record_batch, read_tensor,
-                         write_tensor,
+                         read_message, read_record_batch, read_schema,
+                         read_tensor, write_tensor,
                          get_record_batch_size, get_tensor_size,
                          open_stream,
                          open_file,

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 082fb61..fcf27da 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -632,8 +632,8 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     cdef cppclass CRecordBatchStreamReader \
             " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader):
         @staticmethod
-        CStatus Open(const shared_ptr[InputStream]& stream,
-                     shared_ptr[CRecordBatchStreamReader]* out)
+        CStatus Open(const InputStream* stream,
+                     shared_ptr[CRecordBatchReader]* out)
 
         @staticmethod
         CStatus Open2" Open"(unique_ptr[CMessageReader] message_reader,
@@ -643,22 +643,22 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
             " arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter):
         @staticmethod
         CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
-                     shared_ptr[CRecordBatchStreamWriter]* out)
+                     shared_ptr[CRecordBatchWriter]* out)
 
     cdef cppclass CRecordBatchFileWriter \
             " arrow::ipc::RecordBatchFileWriter"(CRecordBatchWriter):
         @staticmethod
         CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
-                     shared_ptr[CRecordBatchFileWriter]* out)
+                     shared_ptr[CRecordBatchWriter]* out)
 
     cdef cppclass CRecordBatchFileReader \
             " arrow::ipc::RecordBatchFileReader":
         @staticmethod
-        CStatus Open(const shared_ptr[RandomAccessFile]& file,
+        CStatus Open(RandomAccessFile* file,
                      shared_ptr[CRecordBatchFileReader]* out)
 
         @staticmethod
-        CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file,
+        CStatus Open2" Open"(RandomAccessFile* file,
                              int64_t footer_offset,
                              shared_ptr[CRecordBatchFileReader]* out)
 
@@ -684,6 +684,19 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
                             const shared_ptr[CSchema]& schema,
                             shared_ptr[CRecordBatch]* out)
 
+    CStatus SerializeSchema(const CSchema& schema, CMemoryPool* pool,
+                            shared_ptr[CBuffer]* out)
+
+    CStatus SerializeRecordBatch(const CRecordBatch& batch,
+                                 CMemoryPool* pool,
+                                 shared_ptr[CBuffer]* out)
+
+    CStatus ReadSchema(InputStream* stream, shared_ptr[CSchema]* out)
+
+    CStatus ReadRecordBatch(const shared_ptr[CSchema]& schema,
+                            InputStream* stream,
+                            shared_ptr[CRecordBatch]* out)
+
 
 cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
 
@@ -792,7 +805,7 @@ cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
     CStatus DeserializeObject(const CSerializedPyObject& obj,
                               PyObject* base, PyObject** out)
 
-    CStatus ReadSerializedObject(shared_ptr[RandomAccessFile] src,
+    CStatus ReadSerializedObject(RandomAccessFile* src,
                                  CSerializedPyObject* out)
 
     void set_serialization_callbacks(object serialize_callback,

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/ipc.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index ceed4b0..027a00d 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -70,7 +70,7 @@ cdef class Message:
 
     def serialize(self, memory_pool=None):
         """
-        Write message to Buffer with length-prefixed metadata, then body
+        Write message as encapsulated IPC message
 
         Parameters
         ----------
@@ -166,18 +166,13 @@ cdef class _RecordBatchWriter:
         pass
 
     def _open(self, sink, Schema schema):
-        cdef:
-            shared_ptr[CRecordBatchStreamWriter] writer
-
         get_writer(sink, &self.sink)
 
         with nogil:
             check_status(
                 CRecordBatchStreamWriter.Open(self.sink.get(),
                                               schema.sp_schema,
-                                              &writer))
-
-        self.writer = <shared_ptr[CRecordBatchWriter]> writer
+                                              &self.writer))
         self.closed = False
 
     def write_batch(self, RecordBatch batch):
@@ -215,6 +210,7 @@ cdef get_input_stream(object source, shared_ptr[InputStream]* out):
 cdef class _RecordBatchReader:
     cdef:
         shared_ptr[CRecordBatchReader] reader
+        shared_ptr[InputStream] in_stream
 
     cdef readonly:
         Schema schema
@@ -223,16 +219,11 @@ cdef class _RecordBatchReader:
         pass
 
     def _open(self, source):
-        cdef:
-            shared_ptr[InputStream] in_stream
-            shared_ptr[CRecordBatchStreamReader] reader
-
-        get_input_stream(source, &in_stream)
-
+        get_input_stream(source, &self.in_stream)
         with nogil:
-            check_status(CRecordBatchStreamReader.Open(in_stream, &reader))
+            check_status(CRecordBatchStreamReader.Open(
+                self.in_stream.get(), &self.reader))
 
-        self.reader = <shared_ptr[CRecordBatchReader]> reader
         self.schema = Schema()
         self.schema.init_schema(self.reader.get().schema())
 
@@ -285,22 +276,20 @@ cdef class _RecordBatchReader:
 cdef class _RecordBatchFileWriter(_RecordBatchWriter):
 
     def _open(self, sink, Schema schema):
-        cdef shared_ptr[CRecordBatchFileWriter] writer
         get_writer(sink, &self.sink)
 
         with nogil:
             check_status(
                 CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema,
-                                            &writer))
+                                            &self.writer))
 
-        # Cast to base class, because has same interface
-        self.writer = <shared_ptr[CRecordBatchWriter]> writer
         self.closed = False
 
 
 cdef class _RecordBatchFileReader:
     cdef:
         shared_ptr[CRecordBatchFileReader] reader
+        shared_ptr[RandomAccessFile] file
 
     cdef readonly:
         Schema schema
@@ -309,8 +298,7 @@ cdef class _RecordBatchFileReader:
         pass
 
     def _open(self, source, footer_offset=None):
-        cdef shared_ptr[RandomAccessFile] reader
-        get_reader(source, &reader)
+        get_reader(source, &self.file)
 
         cdef int64_t offset = 0
         if footer_offset is not None:
@@ -318,10 +306,12 @@ cdef class _RecordBatchFileReader:
 
         with nogil:
             if offset != 0:
-                check_status(CRecordBatchFileReader.Open2(
-                    reader, offset, &self.reader))
+                check_status(
+                    CRecordBatchFileReader.Open2(self.file.get(), offset,
+                                                 &self.reader))
             else:
-                check_status(CRecordBatchFileReader.Open(reader, &self.reader))
+                check_status(
+                    CRecordBatchFileReader.Open(self.file.get(), &self.reader))
 
         self.schema = pyarrow_wrap_schema(self.reader.get().schema())
 
@@ -476,24 +466,57 @@ def read_message(source):
     return result
 
 
-def read_record_batch(Message batch_message, Schema schema):
+def read_schema(obj):
+    """
+    Read Schema from message or buffer
+
+    Parameters
+    ----------
+    obj : buffer or Message
+
+    Returns
+    -------
+    schema : Schema
+    """
+    cdef:
+        shared_ptr[CSchema] result
+        shared_ptr[RandomAccessFile] cpp_file
+
+    if isinstance(obj, Message):
+        raise NotImplementedError(type(obj))
+
+    get_reader(obj, &cpp_file)
+
+    with nogil:
+        check_status(ReadSchema(cpp_file.get(), &result))
+
+    return pyarrow_wrap_schema(result)
+
+
+def read_record_batch(obj, Schema schema):
     """
     Read RecordBatch from message, given a known schema
 
     Parameters
     ----------
-    batch_message : Message
-        Such as that obtained from read_message
+    obj : Message or Buffer-like
     schema : Schema
 
     Returns
     -------
     batch : RecordBatch
     """
-    cdef shared_ptr[CRecordBatch] result
+    cdef:
+        shared_ptr[CRecordBatch] result
+        Message message
+
+    if isinstance(obj, Message):
+        message = obj
+    else:
+        message = read_message(obj)
 
     with nogil:
-        check_status(ReadRecordBatch(deref(batch_message.message.get()),
+        check_status(ReadRecordBatch(deref(message.message.get()),
                                      schema.sp_schema, &result))
 
     return pyarrow_wrap_batch(result)
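
Per the revised docstring, read_record_batch now accepts either a Message or a buffer-like object; in the latter case it calls read_message internally first. A short sketch of both call styles (toy data; API names as defined in this commit):

    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['c0'])
    buf = batch.serialize()  # Buffer containing an encapsulated IPC message

    # Two-step: materialize the Message, then decode against the schema
    msg = pa.read_message(buf)
    from_msg = pa.read_record_batch(msg, batch.schema)

    # One-step: pass the buffer directly; the Message is read internally
    from_buf = pa.read_record_batch(buf, batch.schema)
    assert from_msg.equals(from_buf)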

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py
index d527722..6eb4979 100644
--- a/python/pyarrow/ipc.py
+++ b/python/pyarrow/ipc.py
@@ -20,7 +20,7 @@
 import pyarrow as pa
 
 from pyarrow.lib import (Message, MessageReader,  # noqa
-                         read_message, read_record_batch,
+                         read_message, read_record_batch, read_schema,
                          read_tensor, write_tensor,
                          get_record_batch_size, get_tensor_size)
 import pyarrow.lib as lib

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/serialization.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
index a6c955b..3ee34ee 100644
--- a/python/pyarrow/serialization.pxi
+++ b/python/pyarrow/serialization.pxi
@@ -237,7 +237,7 @@ def read_serialized(source, base=None):
     cdef SerializedPyObject serialized = SerializedPyObject()
     serialized.base = base
     with nogil:
-        check_status(ReadSerializedObject(stream, &serialized.data))
+        check_status(ReadSerializedObject(stream.get(), &serialized.data))
 
     return serialized
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/table.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 976f429..dd3359e 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -453,6 +453,28 @@ cdef class RecordBatch:
         else:
             return self.column(key)
 
+    def serialize(self, memory_pool=None):
+        """
+        Write RecordBatch to Buffer as encapsulated IPC message
+
+        Parameters
+        ----------
+        memory_pool : MemoryPool, default None
+            Uses default memory pool if not specified
+
+        Returns
+        -------
+        serialized : Buffer
+        """
+        cdef:
+            shared_ptr[CBuffer] buffer
+            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+        with nogil:
+            check_status(SerializeRecordBatch(deref(self.batch),
+                                              pool, &buffer))
+        return pyarrow_wrap_buffer(buffer)
+
     def slice(self, offset=0, length=None):
         """
         Compute zero-copy slice of this RecordBatch
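
The optional memory_pool argument controls where the serialized buffer is allocated; when omitted, the default pool is used, as maybe_unbox_memory_pool implies. A hedged sketch, with pa.default_memory_pool() standing in for any MemoryPool:

    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays([pa.array([1.0, 2.0])], ['x'])

    # Implicit default pool
    buf = batch.serialize()

    # Explicit pool, here simply the default one again
    buf2 = batch.serialize(memory_pool=pa.default_memory_pool())
    assert buf.size == buf2.size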

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
index 120a982..ecdbe62 100644
--- a/python/pyarrow/tests/test_ipc.py
+++ b/python/pyarrow/tests/test_ipc.py
@@ -384,6 +384,21 @@ def test_pandas_serialize_round_trip_not_string_columns():
     assert_frame_equal(result, df)
 
 
+def test_schema_batch_serialize_methods():
+    nrows = 5
+    df = pd.DataFrame({
+        'one': np.random.randn(nrows),
+        'two': ['foo', np.nan, 'bar', 'bazbaz', 'qux']})
+    batch = pa.RecordBatch.from_pandas(df)
+
+    s_schema = batch.schema.serialize()
+    s_batch = batch.serialize()
+
+    recons_schema = pa.read_schema(s_schema)
+    recons_batch = pa.read_record_batch(s_batch, recons_schema)
+    assert recons_batch.equals(batch)
+
+
 def write_file(batch, sink):
     writer = pa.RecordBatchFileWriter(sink, batch.schema)
     writer.write_batch(batch)

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 28b98f0..424e518 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -67,6 +67,8 @@ def test_recordbatch_basics():
 
     batch = pa.RecordBatch.from_arrays(data, ['c0', 'c1'])
 
+    batch.schema.metadata
+
     assert len(batch) == 5
     assert batch.num_rows == 5
     assert batch.num_columns == len(data)
@@ -80,6 +82,16 @@ def test_recordbatch_basics():
         batch[2]
 
 
+def test_recordbatch_empty_metadata():
+    data = [
+        pa.array(range(5)),
+        pa.array([-10, -5, 0, 5, 10])
+    ]
+
+    batch = pa.RecordBatch.from_arrays(data, ['c0', 'c1'])
+    assert batch.schema.metadata is None
+
+
 def test_recordbatch_slice_getitem():
     data = [
         pa.array(range(5)),

http://git-wip-us.apache.org/repos/asf/arrow/blob/f50f2eac/python/pyarrow/types.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 592db4f..30c3aa6 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -235,7 +235,9 @@ cdef class Field:
 
         def __get__(self):
             self._check_null()
-            return box_metadata(self.field.metadata().get())
+            cdef shared_ptr[const CKeyValueMetadata] metadata = (
+                self.field.metadata())
+            return box_metadata(metadata.get())
 
     def _check_null(self):
         if self.field == NULL:
@@ -306,6 +308,11 @@ cdef class Schema:
 
         return result
 
+    def _check_null(self):
+        if self.schema == NULL:
+            raise ReferenceError(
+                'Schema not initialized (references NULL pointer)')
+
     cdef void init(self, const vector[shared_ptr[CField]]& fields):
         self.schema = new CSchema(fields)
         self.sp_schema.reset(self.schema)
@@ -327,7 +334,10 @@ cdef class Schema:
     property metadata:
 
         def __get__(self):
-            return box_metadata(self.schema.metadata().get())
+            self._check_null()
+            cdef shared_ptr[const CKeyValueMetadata] metadata = (
+                self.schema.metadata())
+            return box_metadata(metadata.get())
 
     def equals(self, other):
         """
@@ -377,6 +387,28 @@ cdef class Schema:
 
         return pyarrow_wrap_schema(new_schema)
 
+    def serialize(self, memory_pool=None):
+        """
+        Write Schema to Buffer as encapsulated IPC message
+
+        Parameters
+        ----------
+        memory_pool : MemoryPool, default None
+            Uses default memory pool if not specified
+
+        Returns
+        -------
+        serialized : Buffer
+        """
+        cdef:
+            shared_ptr[CBuffer] buffer
+            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+        with nogil:
+            check_status(SerializeSchema(deref(self.schema),
+                                         pool, &buffer))
+        return pyarrow_wrap_buffer(buffer)
+
     def remove_metadata(self):
         """
         Create new schema without metadata, if any
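
Schema.serialize pairs with pyarrow.read_schema for a schema-only round trip, and the metadata property now returns None (rather than crashing) when no custom metadata is attached, as the new test_recordbatch_empty_metadata verifies. A small sketch under those assumptions:

    import pyarrow as pa

    schema = pa.schema([pa.field('x', pa.int64())])
    assert schema.metadata is None       # no custom metadata attached

    buf = schema.serialize()             # encapsulated IPC schema message
    assert pa.read_schema(buf).equals(schema)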

