arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-1194: [Python] Expose MockOutputStream in pyarrow.
Date Thu, 13 Jul 2017 12:26:21 GMT
Repository: arrow
Updated Branches:
  refs/heads/master f0ecc0673 -> 28e06d870


ARROW-1194: [Python] Expose MockOutputStream in pyarrow.

This allows you to get the size of a record batch and schema through pyarrow by writing to
a mock output stream. You can then use the resulting size to allocate an appropriately sized
buffer to actually write to.

Example usage.

```python
import pyarrow as pa
import pandas as pd

val = pd.DataFrame({'a': [1, 2, 3]})
record_batch = pa.RecordBatch.from_pandas(val)

# Get the size of the record batch and schema
sink = pa.MockOutputStream()
stream_writer = pa.RecordBatchStreamWriter(sink, record_batch.schema)
stream_writer.write_batch(record_batch)
size = sink.size()
```

Author: Robert Nishihara <robertnishihara@gmail.com>

Closes #830 from robertnishihara/mockoutputstream and squashes the following commits:

4e15cd9 [Robert Nishihara] Expose MockOutputStream to Python.


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/28e06d87
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/28e06d87
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/28e06d87

Branch: refs/heads/master
Commit: 28e06d870b6159a9dc3b40d88244a5a7ce08b1b8
Parents: f0ecc06
Author: Robert Nishihara <robertnishihara@gmail.com>
Authored: Thu Jul 13 08:26:13 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Thu Jul 13 08:26:13 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/io/memory.cc               | 17 ++++++++++++
 cpp/src/arrow/io/memory.h                | 16 +++++++++++
 cpp/src/arrow/ipc/ipc-read-write-test.cc |  2 +-
 cpp/src/arrow/ipc/util.h                 | 23 ----------------
 cpp/src/arrow/ipc/writer.cc              |  4 +--
 python/pyarrow/__init__.py               |  2 +-
 python/pyarrow/includes/libarrow.pxd     |  5 ++++
 python/pyarrow/io.pxi                    | 12 +++++++++
 python/pyarrow/tests/test_io.py          | 39 +++++++++++++++++++++++++++
 9 files changed, 93 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/28e06d87/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 22721f1..4d8bf63 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -100,6 +100,23 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
 }
 
 // ----------------------------------------------------------------------
+// OutputStream that doesn't write anything
+
+Status MockOutputStream::Close() {
+  return Status::OK();
+}
+
+Status MockOutputStream::Tell(int64_t* position) {
+  *position = extent_bytes_written_;
+  return Status::OK();
+}
+
+Status MockOutputStream::Write(const uint8_t* data, int64_t nbytes) {
+  extent_bytes_written_ += nbytes;
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
 // In-memory buffer writer
 
 static constexpr int kMemcopyDefaultNumThreads = 1;

http://git-wip-us.apache.org/repos/asf/arrow/blob/28e06d87/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index f1b5990..06384f0 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -67,6 +67,22 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
   uint8_t* mutable_data_;
 };
 
+// A helper class to tracks the size of allocations
+class ARROW_EXPORT MockOutputStream : public OutputStream {
+ public:
+  MockOutputStream() : extent_bytes_written_(0) {}
+
+  // Implement the OutputStream interface
+  Status Close() override;
+  Status Tell(int64_t* position) override;
+  Status Write(const uint8_t* data, int64_t nbytes) override;
+
+  int64_t GetExtentBytesWritten() const { return extent_bytes_written_; }
+
+ private:
+  int64_t extent_bytes_written_;
+};
+
 /// \brief Enables random writes into a fixed-size mutable buffer
 ///
 class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {

http://git-wip-us.apache.org/repos/asf/arrow/blob/28e06d87/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index c99816c..c71d046 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -340,7 +340,7 @@ TEST_F(TestWriteRecordBatch, SliceTruncatesBuffers) {
 }
 
 void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
-  ipc::MockOutputStream mock;
+  io::MockOutputStream mock;
   int32_t mock_metadata_length = -1;
   int64_t mock_body_length = -1;
   int64_t size = -1;

http://git-wip-us.apache.org/repos/asf/arrow/blob/28e06d87/cpp/src/arrow/ipc/util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 2000c61..49a7d01 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -37,29 +37,6 @@ static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAli
   return ((nbytes + alignment - 1) / alignment) * alignment;
 }
 
-// A helper class to tracks the size of allocations
-class MockOutputStream : public io::OutputStream {
- public:
-  MockOutputStream() : extent_bytes_written_(0) {}
-
-  Status Close() override { return Status::OK(); }
-
-  Status Write(const uint8_t* data, int64_t nbytes) override {
-    extent_bytes_written_ += nbytes;
-    return Status::OK();
-  }
-
-  Status Tell(int64_t* position) override {
-    *position = extent_bytes_written_;
-    return Status::OK();
-  }
-
-  int64_t GetExtentBytesWritten() const { return extent_bytes_written_; }
-
- private:
-  int64_t extent_bytes_written_;
-};
-
 }  // namespace ipc
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/28e06d87/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 592bca2..7563343 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -566,7 +566,7 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
   // emulates the behavior of Write without actually writing
   int32_t metadata_length = 0;
   int64_t body_length = 0;
-  MockOutputStream dst;
+  io::MockOutputStream dst;
   RETURN_NOT_OK(WriteRecordBatch(batch, 0, &dst, &metadata_length, &body_length,
       default_memory_pool(), kMaxNestingDepth, true));
   *size = dst.GetExtentBytesWritten();
@@ -577,7 +577,7 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size) {
   // emulates the behavior of Write without actually writing
   int32_t metadata_length = 0;
   int64_t body_length = 0;
-  MockOutputStream dst;
+  io::MockOutputStream dst;
   RETURN_NOT_OK(WriteTensor(tensor, &dst, &metadata_length, &body_length));
   *size = dst.GetExtentBytesWritten();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/arrow/blob/28e06d87/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 4310954..37aec6c 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -68,7 +68,7 @@ from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
                          frombuffer, read_tensor, write_tensor,
                          memory_map, create_memory_map,
                          get_record_batch_size, get_tensor_size,
-                         have_libhdfs, have_libhdfs3)
+                         have_libhdfs, have_libhdfs3, MockOutputStream)
 
 from pyarrow.lib import (MemoryPool, total_allocated_bytes,
                          set_memory_pool, default_memory_pool)

http://git-wip-us.apache.org/repos/asf/arrow/blob/28e06d87/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 2db1dd1..9fad824 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -540,6 +540,11 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
         (OutputStream):
         CBufferOutputStream(const shared_ptr[ResizableBuffer]& buffer)
 
+    cdef cppclass CMockOutputStream" arrow::io::MockOutputStream"\
+        (OutputStream):
+        CMockOutputStream()
+        int64_t GetExtentBytesWritten()
+
 
 cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
     cdef cppclass SchemaMessage:

http://git-wip-us.apache.org/repos/asf/arrow/blob/28e06d87/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 9f0ad7e..cfa751d 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -534,6 +534,18 @@ cdef class BufferOutputStream(NativeFile):
         return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
 
 
+cdef class MockOutputStream(NativeFile):
+
+    def __cinit__(self):
+        self.wr_file.reset(new CMockOutputStream())
+        self.is_readable = 0
+        self.is_writeable = 1
+        self.is_open = True
+
+    def size(self):
+        return (<CMockOutputStream*>self.wr_file.get()).GetExtentBytesWritten()
+
+
 cdef class BufferReader(NativeFile):
     """
     Zero-copy reader from objects convertible to Arrow buffer

http://git-wip-us.apache.org/repos/asf/arrow/blob/28e06d87/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 397b7a8..cadf786 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -23,6 +23,8 @@ import sys
 
 import numpy as np
 
+import pandas as pd
+
 from pyarrow.compat import u, guid
 import pyarrow as pa
 
@@ -233,6 +235,43 @@ def test_nativefile_write_memoryview():
 
 
 # ----------------------------------------------------------------------
+# Mock output stream
+
+
+def test_mock_output_stream():
+    # Make sure that the MockOutputStream and the BufferOutputStream record the
+    # same size
+
+    # 10 bytes
+    val = b'dataabcdef'
+
+    f1 = pa.MockOutputStream()
+    f2 = pa.BufferOutputStream()
+
+    K = 1000
+    for i in range(K):
+        f1.write(val)
+        f2.write(val)
+
+    assert f1.size() == len(f2.get_result())
+
+    # Do the same test with a pandas DataFrame
+    val = pd.DataFrame({'a': [1, 2, 3]})
+    record_batch = pa.RecordBatch.from_pandas(val)
+
+    f1 = pa.MockOutputStream()
+    f2 = pa.BufferOutputStream()
+
+    stream_writer1 = pa.RecordBatchStreamWriter(f1, record_batch.schema)
+    stream_writer2 = pa.RecordBatchStreamWriter(f2, record_batch.schema)
+
+    stream_writer1.write_batch(record_batch)
+    stream_writer2.write_batch(record_batch)
+
+    assert f1.size() == len(f2.get_result())
+
+
+# ----------------------------------------------------------------------
 # OS files and memory maps
 
 


Mime
View raw message