parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-827: Account for arrow::MemoryPool API change and fix bug in reading Int96 timestamps
Date Sat, 07 Jan 2017 20:05:19 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 52d36960e -> fb9c1166c


PARQUET-827: Account for arrow::MemoryPool API change and fix bug in reading Int96 timestamps

Prior to ARROW-427, the "length()" field was not being properly checked in the implementation
of `PrimitiveArray::Equals`.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #215 from wesm/PARQUET-825 and squashes the following commits:

afa41f9 [Wes McKinney] Do not build arrow_jemalloc in arrow external project
ff9f22e [Wes McKinney] cpplint, clang-format
802b325 [Wes McKinney] Update Arrow version
5f155b6 [Wes McKinney] Fix bug exposed by accidental bug fix in ARROW-427
a3c75bb [Wes McKinney] Implement TrackingAllocator based on Arrow default allocator


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/fb9c1166
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/fb9c1166
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/fb9c1166

Branch: refs/heads/master
Commit: fb9c1166c5d7e77d54d7871ff55dfcf2990332bf
Parents: 52d3696
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Sat Jan 7 15:05:12 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sat Jan 7 15:05:12 2017 -0500

----------------------------------------------------------------------
 cmake_modules/ThirdpartyToolchain.cmake      |  3 +-
 src/parquet/arrow/reader.cc                  |  1 +
 src/parquet/schema/descriptor.cc             | 12 +++----
 src/parquet/schema/schema-descriptor-test.cc | 16 ++++------
 src/parquet/util/memory.cc                   | 39 +++++++++++++----------
 src/parquet/util/memory.h                    | 16 +++++-----
 6 files changed, 43 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/cmake_modules/ThirdpartyToolchain.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index 48fee19..54033ec 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -22,7 +22,7 @@ set(THRIFT_VERSION "0.9.1")
 
 # Brotli 0.5.2 does not install headers/libraries yet, but 0.6.0.dev does
 set(BROTLI_VERSION "5db62dcc9d386579609540cdf8869e95ad334bbd")
-set(ARROW_VERSION "e15c6a0b3c05b5b42c204f34369d127182450ca0")
+set(ARROW_VERSION "74685f386307171a90a9f97316e25b7f39cdd0a1")
 
 # find boost headers and libs
 set(Boost_DEBUG TRUE)
@@ -325,6 +325,7 @@ if (NOT ARROW_FOUND)
   set(ARROW_IO_STATIC_LIB "${ARROW_PREFIX}/lib/libarrow_io.a")
   set(ARROW_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
     -DCMAKE_INSTALL_PREFIX=${ARROW_PREFIX}
+    -DARROW_JEMALLOC=OFF
     -DARROW_BUILD_TESTS=OFF)
 
   if (CMAKE_VERSION VERSION_GREATER "3.2")

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index d1eec05..db281d9 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -252,6 +252,7 @@ void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96T
   for (int64_t i = 0; i < values_read; i++) {
     out_ptr[i] = impala_timestamp_to_nanoseconds(values[i]);
   }
+  valid_bits_idx_ += values_read;
 }
 
 template <>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/schema/descriptor.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/descriptor.cc b/src/parquet/schema/descriptor.cc
index c5250d1..0b0d006 100644
--- a/src/parquet/schema/descriptor.cc
+++ b/src/parquet/schema/descriptor.cc
@@ -48,14 +48,10 @@ void SchemaDescriptor::Init(const NodePtr& schema) {
 }
 
 bool SchemaDescriptor::Equals(const SchemaDescriptor& other) const {
-  if (this->num_columns() != other.num_columns()) {
-    return false;
-  }
+  if (this->num_columns() != other.num_columns()) { return false; }
 
   for (int i = 0; i < this->num_columns(); ++i) {
-    if (!this->Column(i)->Equals(*other.Column(i))) {
-      return false;
-    }
+    if (!this->Column(i)->Equals(*other.Column(i))) { return false; }
   }
 
   return true;
@@ -98,8 +94,8 @@ ColumnDescriptor::ColumnDescriptor(const schema::NodePtr& node,
 
 bool ColumnDescriptor::Equals(const ColumnDescriptor& other) const {
   return primitive_node_->Equals(other.primitive_node_) &&
-    max_repetition_level() == other.max_repetition_level() &&
-    max_definition_level() == other.max_definition_level();
+         max_repetition_level() == other.max_repetition_level() &&
+         max_definition_level() == other.max_definition_level();
 }
 
 const ColumnDescriptor* SchemaDescriptor::Column(int i) const {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/schema/schema-descriptor-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc
index 467d63c..4b7f67c 100644
--- a/src/parquet/schema/schema-descriptor-test.cc
+++ b/src/parquet/schema/schema-descriptor-test.cc
@@ -89,30 +89,26 @@ TEST_F(TestSchemaDescriptor, Equals) {
   NodePtr bag2(GroupNode::Make("bag", Repetition::REQUIRED, {list}));
 
   SchemaDescriptor descr1;
-  descr1.Init(GroupNode::Make("schema", Repetition::REPEATED,
-          {inta, intb, intc, bag}));
+  descr1.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag}));
 
   ASSERT_TRUE(descr1.Equals(descr1));
 
   SchemaDescriptor descr2;
-  descr2.Init(GroupNode::Make("schema", Repetition::REPEATED,
-          {inta, intb, intc, bag2}));
+  descr2.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag2}));
   ASSERT_FALSE(descr1.Equals(descr2));
 
   SchemaDescriptor descr3;
-  descr3.Init(GroupNode::Make("schema", Repetition::REPEATED,
-          {inta, intb2, intc, bag}));
+  descr3.Init(GroupNode::Make("schema", Repetition::REPEATED, {inta, intb2, intc, bag}));
   ASSERT_FALSE(descr1.Equals(descr3));
 
   // Robust to name of parent node
   SchemaDescriptor descr4;
-  descr4.Init(GroupNode::Make("SCHEMA", Repetition::REPEATED,
-          {inta, intb, intc, bag}));
+  descr4.Init(GroupNode::Make("SCHEMA", Repetition::REPEATED, {inta, intb, intc, bag}));
   ASSERT_TRUE(descr1.Equals(descr4));
 
   SchemaDescriptor descr5;
-  descr5.Init(GroupNode::Make("schema", Repetition::REPEATED,
-          {inta, intb, intc, bag, intb2}));
+  descr5.Init(
+      GroupNode::Make("schema", Repetition::REPEATED, {inta, intb, intc, bag, intb2}));
   ASSERT_FALSE(descr1.Equals(descr5));
 
   // Different max repetition / definition levels

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/util/memory.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
index b490c2e..72ed3ac 100644
--- a/src/parquet/util/memory.cc
+++ b/src/parquet/util/memory.cc
@@ -22,6 +22,8 @@
 #include <cstdio>
 #include <string>
 
+#include "arrow/status.h"
+
 #include "parquet/exception.h"
 #include "parquet/types.h"
 #include "parquet/util/bit-util.h"
@@ -30,31 +32,34 @@
 namespace parquet {
 
 ::arrow::Status TrackingAllocator::Allocate(int64_t size, uint8_t** out) {
-  if (0 == size) {
+  if (size == 0) {
     *out = nullptr;
     return ::arrow::Status::OK();
   }
+  ARROW_RETURN_NOT_OK(allocator_->Allocate(size, out));
+  const int64_t total_memory = allocator_->bytes_allocated();
+  if (total_memory > max_memory_) { max_memory_ = total_memory; }
+  return ::arrow::Status::OK();
+}
 
-  uint8_t* p = static_cast<uint8_t*>(std::malloc(size));
-  if (!p) { return ::arrow::Status::OutOfMemory("memory allocation failed"); }
-  {
-    std::lock_guard<std::mutex> lock(stats_mutex_);
-    total_memory_ += size;
-    if (total_memory_ > max_memory_) { max_memory_ = total_memory_; }
-  }
-  *out = p;
+::arrow::Status TrackingAllocator::Reallocate(
+    int64_t old_size, int64_t new_size, uint8_t** out) {
+  ARROW_RETURN_NOT_OK(allocator_->Reallocate(old_size, new_size, out));
+  const int64_t total_memory = allocator_->bytes_allocated();
+  if (total_memory > max_memory_) { max_memory_ = total_memory; }
   return ::arrow::Status::OK();
 }
 
 void TrackingAllocator::Free(uint8_t* p, int64_t size) {
-  if (nullptr != p && size > 0) {
-    {
-      std::lock_guard<std::mutex> lock(stats_mutex_);
-      DCHECK_GE(total_memory_, size) << "Attempting to free too much memory";
-      total_memory_ -= size;
-    }
-    std::free(p);
-  }
+  allocator_->Free(p, size);
+}
+
+int64_t TrackingAllocator::max_memory() const {
+  return max_memory_.load();
+}
+
+int64_t TrackingAllocator::bytes_allocated() const {
+  return allocator_->bytes_allocated();
 }
 
 MemoryAllocator* default_allocator() {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fb9c1166/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index b915450..2a71c37 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -18,11 +18,11 @@
 #ifndef PARQUET_UTIL_MEMORY_H
 #define PARQUET_UTIL_MEMORY_H
 
+#include <atomic>
 #include <cstdint>
 #include <cstdlib>
 #include <cstring>
 #include <memory>
-#include <mutex>
 #include <string>
 #include <vector>
 
@@ -72,19 +72,19 @@ PARQUET_EXPORT MemoryAllocator* default_allocator();
 
 class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator {
  public:
-  TrackingAllocator() : total_memory_(0), max_memory_(0) {}
+  explicit TrackingAllocator(MemoryAllocator* allocator = ::arrow::default_memory_pool())
+      : allocator_(allocator), max_memory_(0) {}
 
   ::arrow::Status Allocate(int64_t size, uint8_t** out) override;
+  ::arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
   void Free(uint8_t* p, int64_t size) override;
 
-  int64_t bytes_allocated() const override { return total_memory_; }
-
-  int64_t max_memory() { return max_memory_; }
+  int64_t max_memory() const;
+  int64_t bytes_allocated() const override;
 
  private:
-  std::mutex stats_mutex_;
-  int64_t total_memory_;
-  int64_t max_memory_;
+  MemoryAllocator* allocator_;
+  std::atomic<int64_t> max_memory_;
 };
 
 template <class T>


Mime
View raw message