arrow-commits mailing list archives

From u..@apache.org
Subject arrow git commit: ARROW-837: [Python] Add public pyarrow.allocate_buffer API. Rename FixedSizeBufferOutputStream
Date Wed, 30 Aug 2017 07:38:34 GMT
Repository: arrow
Updated Branches:
  refs/heads/master af7829309 -> 7510ae6ef


ARROW-837: [Python] Add public pyarrow.allocate_buffer API. Rename FixedSizeBufferOutputStream

We may want to refactor to support random writes in `NativeFile` at some point, so I've renamed the class accordingly.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #1014 from wesm/ARROW-837 and squashes the following commits:

0fe22b1 [Wes McKinney] Restore readable assertion check
0d5ebe4 [Wes McKinney] Add public pyarrow.allocate_buffer API. Rename FixedSizeBufferOutputStream
to FixedSizeBufferWriter for when we can support seeking at some point
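For context, a minimal sketch of how the two pieces introduced here fit together, mirroring the test added in this commit (assumes pyarrow built from this commit, where ``pa.allocate_buffer`` and ``pa.FixedSizeBufferWriter`` are the new public names):

    import pyarrow as pa

    # Allocate a 100-byte mutable buffer from the default memory pool
    buf = pa.allocate_buffer(100)

    # Write into it through the renamed writer class
    writer = pa.FixedSizeBufferWriter(buf)
    writer.write(b'abcde')

    # The first five bytes of the buffer now hold the written data
    assert buf.to_pybytes()[:5] == b'abcde'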


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/7510ae6e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/7510ae6e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/7510ae6e

Branch: refs/heads/master
Commit: 7510ae6ef51b0b27688028b9719e612971375e46
Parents: af78293
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Wed Aug 30 09:38:29 2017 +0200
Committer: Uwe L. Korn <uwelk@xhochy.com>
Committed: Wed Aug 30 09:38:29 2017 +0200

----------------------------------------------------------------------
 python/doc/source/api.rst                 |  1 +
 python/doc/source/plasma.rst              | 10 ++++----
 python/examples/plasma/sorting/sort_df.py |  4 ++--
 python/pyarrow/__init__.py                |  9 +++++--
 python/pyarrow/includes/libarrow.pxd      |  6 +++++
 python/pyarrow/io.pxi                     | 33 +++++++++++++++++++++++---
 python/pyarrow/plasma.pyx                 |  2 +-
 python/pyarrow/tests/test_io.py           | 12 ++++++++++
 python/pyarrow/tests/test_plasma.py       |  4 ++--
 9 files changed, 66 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/7510ae6e/python/doc/source/api.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index 4761c7f..473b16d 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -158,6 +158,7 @@ Input / Output and Shared Memory
 .. autosummary::
    :toctree: generated/
 
+   allocate_buffer
    Buffer
    BufferReader
    BufferOutputStream

http://git-wip-us.apache.org/repos/asf/arrow/blob/7510ae6e/python/doc/source/plasma.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst
index 9a5a74b..74837b9 100644
--- a/python/doc/source/plasma.rst
+++ b/python/doc/source/plasma.rst
@@ -235,14 +235,14 @@ API such as ``pyarrow.get_tensor_size``.
   buf = client.create(object_id, data_size)
 
 To write the Arrow ``Tensor`` object into the buffer, you can use Plasma to
-convert the ``memoryview`` buffer into a ``pyarrow.FixedSizeBufferOutputStream``
-object. A ``pyarrow.FixedSizeBufferOutputStream`` is a format suitable for Arrow's
+convert the ``memoryview`` buffer into a ``pyarrow.FixedSizeBufferWriter``
+object. A ``pyarrow.FixedSizeBufferWriter`` is a format suitable for Arrow's
 ``pyarrow.write_tensor``:
 
 .. code-block:: python
 
   # Write the tensor into the Plasma-allocated buffer
-  stream = pa.FixedSizeBufferOutputStream(buf)
+  stream = pa.FixedSizeBufferWriter(buf)
   pa.write_tensor(tensor, stream)  # Writes tensor's 552 bytes to Plasma stream
 
 To finish storing the Arrow object in Plasma, call ``seal``:
@@ -328,7 +328,7 @@ The DataFrame can now be written to the buffer as follows.
 .. code-block:: python
 
   # Write the PyArrow RecordBatch to Plasma
-  stream = pa.FixedSizeBufferOutputStream(buf)
+  stream = pa.FixedSizeBufferWriter(buf)
   stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
   stream_writer.write_batch(record_batch)
   stream_writer.close()
@@ -414,7 +414,7 @@ You can test this with the following script:
   object_id = plasma.ObjectID(np.random.bytes(20))
   buf = client.create(object_id, pa.get_tensor_size(tensor))
 
-  stream = pa.FixedSizeBufferOutputStream(buf)
+  stream = pa.FixedSizeBufferWriter(buf)
   stream.set_memcopy_threads(4)
   a = time.time()
   pa.write_tensor(tensor, stream)

http://git-wip-us.apache.org/repos/asf/arrow/blob/7510ae6e/python/examples/plasma/sorting/sort_df.py
----------------------------------------------------------------------
diff --git a/python/examples/plasma/sorting/sort_df.py b/python/examples/plasma/sorting/sort_df.py
index 0181ed7..0805592 100644
--- a/python/examples/plasma/sorting/sort_df.py
+++ b/python/examples/plasma/sorting/sort_df.py
@@ -39,7 +39,7 @@ import multimerge
 #     num_cols = 1
 
 client = None
-object_store_size = 2 * 10 ** 9 # 2 GB
+object_store_size = 2 * 10 ** 9  # 2 GB
 num_cores = 8
 num_rows = 200000
 num_cols = 2
@@ -69,7 +69,7 @@ def put_df(df):
     buf = client.create(object_id, data_size)
 
     # Write the serialized DataFrame to the object store
-    sink = pa.FixedSizeBufferOutputStream(buf)
+    sink = pa.FixedSizeBufferWriter(buf)
     stream_writer = pa.RecordBatchStreamWriter(sink, record_batch.schema)
     stream_writer.write_batch(record_batch)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/7510ae6e/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index ecdcfcd..f97d356 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -68,10 +68,10 @@ from pyarrow.lib import (null, bool_,
                          Date32Value, Date64Value, TimestampValue)
 
 from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
-                         FixedSizeBufferOutputStream,
+                         FixedSizeBufferWriter,
                          Buffer, BufferReader, BufferOutputStream,
                          OSFile, MemoryMappedFile, memory_map,
-                         frombuffer,
+                         allocate_buffer, frombuffer,
                          memory_map, create_memory_map,
                          have_libhdfs, have_libhdfs3, MockOutputStream)
 
@@ -134,6 +134,11 @@ def _plasma_store_entry_point():
 
 from pyarrow.util import _deprecate_class
 
+FixedSizeBufferOutputStream = (
+    _deprecate_class('FixedSizeBufferOutputStream',
+                     'FixedSizeBufferWriter',
+                     FixedSizeBufferWriter, '0.7.0'))
+
 # Backwards compatibility with pyarrow < 0.6.0
 HdfsClient = _deprecate_class('HdfsClient', 'pyarrow.hdfs.connect',
                               hdfs.connect, '0.6.0')

http://git-wip-us.apache.org/repos/asf/arrow/blob/7510ae6e/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 98eda8b..e032448 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -160,6 +160,12 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         CMutableBuffer(const uint8_t* data, int64_t size)
         uint8_t* mutable_data()
 
+    CStatus AllocateBuffer(CMemoryPool* pool, const int64_t size,
+                           shared_ptr[CBuffer]* out)
+
+    CStatus AllocateResizableBuffer(CMemoryPool* pool, const int64_t size,
+                                    shared_ptr[ResizableBuffer]* out)
+
     cdef cppclass ResizableBuffer(CBuffer):
         CStatus Resize(int64_t nbytes)
         CStatus Reserve(int64_t nbytes)

http://git-wip-us.apache.org/repos/asf/arrow/blob/7510ae6e/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index b5858ab..495e31b 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -151,6 +151,7 @@ cdef class NativeFile:
         """
         cdef int64_t offset
         self._assert_readable()
+
         with nogil:
             if whence == 0:
                 offset = position
@@ -534,7 +535,7 @@ cdef class OSFile(NativeFile):
         self.wr_file = <shared_ptr[OutputStream]> handle
 
 
-cdef class FixedSizeBufferOutputStream(NativeFile):
+cdef class FixedSizeBufferWriter(NativeFile):
 
     def __cinit__(self, Buffer buffer):
         self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer))
@@ -580,6 +581,11 @@ cdef class Buffer:
         def __get__(self):
             return self.buffer.get().size()
 
+    property is_mutable:
+
+        def __get__(self):
+            return self.buffer.get().is_mutable()
+
     property parent:
 
         def __get__(self):
@@ -638,19 +644,40 @@ cdef class Buffer:
         return self.size
 
 
-cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
+cdef shared_ptr[PoolBuffer] _allocate_buffer(CMemoryPool* pool):
     cdef shared_ptr[PoolBuffer] result
     result.reset(new PoolBuffer(pool))
     return result
 
 
+def allocate_buffer(int64_t size, MemoryPool pool=None):
+    """
+    Allocate mutable fixed-size buffer
+
+    Parameters
+    ----------
+    size : int
+        Number of bytes to allocate (plus internal padding)
+    pool : MemoryPool, optional
+        Uses default memory pool if not provided
+    """
+    cdef:
+        shared_ptr[CBuffer] buffer
+        CMemoryPool* cpool = maybe_unbox_memory_pool(pool)
+
+    with nogil:
+        check_status(AllocateBuffer(cpool, size, &buffer))
+
+    return pyarrow_wrap_buffer(buffer)
+
+
 cdef class BufferOutputStream(NativeFile):
 
     cdef:
         shared_ptr[PoolBuffer] buffer
 
     def __cinit__(self, MemoryPool memory_pool=None):
-        self.buffer = allocate_buffer(maybe_unbox_memory_pool(memory_pool))
+        self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
         self.wr_file.reset(new CBufferOutputStream(
             <shared_ptr[ResizableBuffer]> self.buffer))
         self.is_readable = 0
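To illustrate the ``pool`` parameter of the new ``allocate_buffer`` above, a hedged usage sketch (assumes ``pa.default_memory_pool()`` is available, as in other pyarrow code of this era; it is not part of this diff):

    import pyarrow as pa

    # With no pool argument, the default memory pool is used
    buf = pa.allocate_buffer(64)
    assert buf.size == 64
    assert buf.is_mutable

    # An explicit MemoryPool can also be supplied
    pool = pa.default_memory_pool()
    buf2 = pa.allocate_buffer(64, pool)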

http://git-wip-us.apache.org/repos/asf/arrow/blob/7510ae6e/python/pyarrow/plasma.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index b72e49b..cb19bea 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -390,7 +390,7 @@ cdef class PlasmaClient:
                                    else ObjectID.from_random())
         serialized = pyarrow.serialize(value)
         buffer = self.create(target_id, serialized.total_bytes)
-        stream = pyarrow.FixedSizeBufferOutputStream(buffer)
+        stream = pyarrow.FixedSizeBufferWriter(buffer)
         stream.set_memcopy_threads(4)
         serialized.write_to(stream)
         self.seal(target_id)

http://git-wip-us.apache.org/repos/asf/arrow/blob/7510ae6e/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index d503ea2..98c465a 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -170,6 +170,18 @@ def test_buffer_numpy():
     assert array.base == buf
 
 
+def test_allocate_buffer():
+    buf = pa.allocate_buffer(100)
+    assert buf.size == 100
+    assert buf.is_mutable
+
+    bit = b'abcde'
+    writer = pa.FixedSizeBufferWriter(buf)
+    writer.write(bit)
+
+    assert buf.to_pybytes()[:5] == bit
+
+
 def test_buffer_memoryview_is_immutable():
     val = b'some data'
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/7510ae6e/python/pyarrow/tests/test_plasma.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index a831ef2..9af97df 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -294,7 +294,7 @@ class TestPlasmaClient(object):
         tensor = pa.Tensor.from_numpy(data)
         data_size = pa.get_tensor_size(tensor)
         buf = self.plasma_client.create(object_id, data_size)
-        stream = pa.FixedSizeBufferOutputStream(buf)
+        stream = pa.FixedSizeBufferWriter(buf)
         pa.write_tensor(tensor, stream)
         self.plasma_client.seal(object_id)
         # Read the arrow object.
@@ -320,7 +320,7 @@ class TestPlasmaClient(object):
         object_id = plasma.ObjectID(np.random.bytes(20))
 
         buf = self.plasma_client.create(object_id, data_size)
-        stream = pa.FixedSizeBufferOutputStream(buf)
+        stream = pa.FixedSizeBufferWriter(buf)
         stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
         stream_writer.write_batch(record_batch)
 

