arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-100: [C++] Computing RowBatch size
Date Sat, 23 Apr 2016 15:11:14 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 0b472d860 -> a54164472


ARROW-100: [C++] Computing RowBatch size

Implement RowBatchWriter::DataHeaderSize and arrow::ipc::GetRowBatchSize. To achieve this,
the Flatbuffer metadata is written to a temporary buffer and its size is determined. This
commit also adds MockMemorySource, a new MemorySource that tracks the amount of memory written.

Author: Philipp Moritz <pcmoritz@gmail.com>

Author: Philipp Moritz <pcmoritz@gmail.com>

Closes #61 from pcmoritz/rowbatchsize and squashes the following commits:

e95fc5c [Philipp Moritz] fix formating
253c9f0 [Philipp Moritz] rename MockMemorySource methods to reflect better what they are doing
3484458 [Philipp Moritz] add tests for more datatypes
6b798f8 [Philipp Moritz] fix maximum recursion depth
67af8e1 [Philipp Moritz] merge GetRowBatchSize
9b69f12 [Philipp Moritz] factor out GetRowBatchSize test, use MockMemorySource to implement GetRowBatchSize, unify DataHeaderSize and TotalBytes into GetTotalSize
aa48cdf [Philipp Moritz] ARROW-100: [C++] Computing RowBatch size


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a5416447
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a5416447
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a5416447

Branch: refs/heads/master
Commit: a541644721ba4cb4723931b2a5eff1ac58c8aedd
Parents: 0b472d8
Author: Philipp Moritz <pcmoritz@gmail.com>
Authored: Sat Apr 23 11:11:05 2016 -0400
Committer: Wes McKinney <wesm@apache.org>
Committed: Sat Apr 23 11:11:05 2016 -0400

----------------------------------------------------------------------
 cpp/src/arrow/ipc/adapter.cc          | 29 +++++++++++++++--------------
 cpp/src/arrow/ipc/adapter.h           |  2 +-
 cpp/src/arrow/ipc/ipc-adapter-test.cc | 28 ++++++++++++++++++++++++++++
 cpp/src/arrow/ipc/memory.cc           | 25 +++++++++++++++++++++++++
 cpp/src/arrow/ipc/memory.h            | 22 ++++++++++++++++++++++
 5 files changed, 91 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/a5416447/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index bf6fa94..3470008 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -179,20 +179,13 @@ class RowBatchWriter {
   }
 
   // This must be called after invoking AssemblePayload
-  int64_t DataHeaderSize() {
-    // TODO(wesm): In case it is needed, compute the upper bound for the size
-    // of the buffer containing the flatbuffer data header.
-    return 0;
-  }
-
-  // Total footprint of buffers. This must be called after invoking
-  // AssemblePayload
-  int64_t TotalBytes() {
-    int64_t total = 0;
-    for (const std::shared_ptr<Buffer>& buffer : buffers_) {
-      total += buffer->size();
-    }
-    return total;
+  Status GetTotalSize(int64_t* size) {
+    // emulates the behavior of Write without actually writing
+    int64_t data_header_offset;
+    MockMemorySource source(0);
+    RETURN_NOT_OK(Write(&source, 0, &data_header_offset));
+    *size = source.GetExtentBytesWritten();
+    return Status::OK();
   }
 
  private:
@@ -211,6 +204,14 @@ Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
   RETURN_NOT_OK(serializer.AssemblePayload());
   return serializer.Write(dst, position, header_offset);
 }
+
+Status GetRowBatchSize(const RowBatch* batch, int64_t* size) {
+  RowBatchWriter serializer(batch, kMaxIpcRecursionDepth);
+  RETURN_NOT_OK(serializer.AssemblePayload());
+  RETURN_NOT_OK(serializer.GetTotalSize(size));
+  return Status::OK();
+}
+
 // ----------------------------------------------------------------------
 // Row batch read path
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/a5416447/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 4c9a8a9..0d2b77f 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -62,7 +62,7 @@ Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
 // Compute the precise number of bytes needed in a contiguous memory segment to
 // write the row batch. This involves generating the complete serialized
 // Flatbuffers metadata.
-int64_t GetRowBatchSize(const RowBatch* batch);
+Status GetRowBatchSize(const RowBatch* batch, int64_t* size);
 
 // ----------------------------------------------------------------------
 // "Read" path; does not copy data if the MemorySource does not

http://git-wip-us.apache.org/repos/asf/arrow/blob/a5416447/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index c243cfb..3b14734 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -195,6 +195,34 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,
     ::testing::Values(&MakeIntRowBatch, &MakeListRowBatch, &MakeNonNullRowBatch,
                             &MakeZeroLengthRowBatch, &MakeDeeplyNestedList));
 
+void TestGetRowBatchSize(std::shared_ptr<RowBatch> batch) {
+  MockMemorySource mock_source(1 << 16);
+  int64_t mock_header_location;
+  int64_t size;
+  ASSERT_OK(WriteRowBatch(&mock_source, batch.get(), 0, &mock_header_location));
+  ASSERT_OK(GetRowBatchSize(batch.get(), &size));
+  ASSERT_EQ(mock_source.GetExtentBytesWritten(), size);
+}
+
+TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) {
+  std::shared_ptr<RowBatch> batch;
+
+  ASSERT_OK(MakeIntRowBatch(&batch));
+  TestGetRowBatchSize(batch);
+
+  ASSERT_OK(MakeListRowBatch(&batch));
+  TestGetRowBatchSize(batch);
+
+  ASSERT_OK(MakeZeroLengthRowBatch(&batch));
+  TestGetRowBatchSize(batch);
+
+  ASSERT_OK(MakeNonNullRowBatch(&batch));
+  TestGetRowBatchSize(batch);
+
+  ASSERT_OK(MakeDeeplyNestedList(&batch));
+  TestGetRowBatchSize(batch);
+}
+
 class RecursionLimits : public ::testing::Test, public MemoryMapFixture {
  public:
   void SetUp() { pool_ = default_memory_pool(); }

http://git-wip-us.apache.org/repos/asf/arrow/blob/a5416447/cpp/src/arrow/ipc/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc
index 84cbc18..caff2c6 100644
--- a/cpp/src/arrow/ipc/memory.cc
+++ b/cpp/src/arrow/ipc/memory.cc
@@ -145,5 +145,30 @@ Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t
   return Status::OK();
 }
 
+MockMemorySource::MockMemorySource(int64_t size)
+    : size_(size), extent_bytes_written_(0) {}
+
+Status MockMemorySource::Close() {
+  return Status::OK();
+}
+
+Status MockMemorySource::ReadAt(
+    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  return Status::OK();
+}
+
+Status MockMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
+  extent_bytes_written_ = std::max(extent_bytes_written_, position + nbytes);
+  return Status::OK();
+}
+
+int64_t MockMemorySource::Size() const {
+  return size_;
+}
+
+int64_t MockMemorySource::GetExtentBytesWritten() const {
+  return extent_bytes_written_;
+}
+
 }  // namespace ipc
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/a5416447/cpp/src/arrow/ipc/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.h b/cpp/src/arrow/ipc/memory.h
index e529603..c6fd7a7 100644
--- a/cpp/src/arrow/ipc/memory.h
+++ b/cpp/src/arrow/ipc/memory.h
@@ -121,6 +121,28 @@ class MemoryMappedSource : public MemorySource {
   std::unique_ptr<Impl> impl_;
 };
 
+// A MemorySource that tracks the size of allocations from a memory source
+class MockMemorySource : public MemorySource {
+ public:
+  explicit MockMemorySource(int64_t size);
+
+  Status Close() override;
+
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+  Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override;
+
+  int64_t Size() const override;
+
+  // @return: the smallest number of bytes containing the modified region of the
+  // MockMemorySource
+  int64_t GetExtentBytesWritten() const;
+
+ private:
+  int64_t size_;
+  int64_t extent_bytes_written_;
+};
+
 }  // namespace ipc
 }  // namespace arrow
 


Mime
View raw message