impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject incubator-impala git commit: IMPALA-5073: Part 1: add option to use mmap() for buffer pool
Date Tue, 18 Apr 2017 15:55:10 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 42002b91c -> 955b257cf


IMPALA-5073: Part 1: add option to use mmap() for buffer pool

Support allocating with mmap instead of TCMalloc to give more control
over memory usage. Also tell Linux to back larger buffers with huge
pages when possible to reduce TLB pressure. The main complication is
that memory returned by mmap() is not necessarily aligned to a huge
page boundary, so we need to "fix up" the mapping ourselves.

Adds additional memory metrics, since we previously relied on the
assumption that all memory was allocated through TCMalloc.
memory.total-used tracks the total across the buffer pool and
TCMalloc. When the buffer pool is not present, it just reports
the TCMalloc value.

This can be enabled with the --mmap_buffers flag. The transparent
huge pages support can be disabled with the --madvise_huge_pages
startup flag.

At some point this should become the default, but it requires
more work to validate perf and resource usage (virtual address
space, etc).

Testing:
Added some unit tests to test edge cases and the different supported
flags. Many pre-existing tests also exercise the modified code.

Change-Id: Ifbc748f74adcbbdcfa45f3ec7df98284925acbd6
Reviewed-on: http://gerrit.cloudera.org:8080/6474
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/955b257c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/955b257c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/955b257c

Branch: refs/heads/master
Commit: 955b257cfb5f94775318c466ee18771ff167026d
Parents: 42002b9
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Wed Apr 12 11:02:00 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Tue Apr 18 09:53:54 2017 +0000

----------------------------------------------------------------------
 be/src/catalog/catalogd-main.cc                 |  4 +-
 .../runtime/bufferpool/buffer-allocator-test.cc | 77 ++++++++++++++++-
 be/src/runtime/bufferpool/buffer-allocator.h    |  2 +
 be/src/runtime/bufferpool/buffer-pool.cc        |  8 ++
 be/src/runtime/bufferpool/buffer-pool.h         |  2 +
 be/src/runtime/bufferpool/system-allocator.cc   | 83 +++++++++++++++++-
 be/src/runtime/bufferpool/system-allocator.h    | 12 +--
 be/src/runtime/exec-env.cc                      |  7 +-
 be/src/statestore/statestored-main.cc           |  4 +-
 be/src/util/memory-metrics.cc                   | 90 +++++++++++++++++---
 be/src/util/memory-metrics.h                    | 52 ++++++++++-
 be/src/util/metrics-test.cc                     | 25 +++++-
 be/src/util/metrics.h                           | 27 +++++-
 common/thrift/generate_error_codes.py           |  2 +-
 common/thrift/metrics.json                      | 42 +++++++++
 15 files changed, 396 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/catalog/catalogd-main.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index b484c9d..75b7caa 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -69,8 +69,8 @@ int CatalogdMain(int argc, char** argv) {
     LOG(INFO) << "Not starting webserver";
   }
 
-  metrics->Init(FLAGS_enable_webserver ? webserver.get() : NULL);
-  ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), true));
+  metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr);
+  ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), true, nullptr, nullptr));
   StartThreadInstrumentation(metrics.get(), webserver.get(), true);
 
   InitRpcEventTracing(webserver.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
index 515c1ae..9887086 100644
--- a/be/src/runtime/bufferpool/buffer-allocator-test.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -21,14 +21,21 @@
 #include "runtime/bufferpool/buffer-allocator.h"
 #include "runtime/bufferpool/buffer-pool-internal.h"
 #include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/system-allocator.h"
 #include "testutil/cpu-util.h"
 #include "testutil/gtest-util.h"
 #include "util/cpu-info.h"
 
 #include "common/names.h"
 
+DECLARE_bool(mmap_buffers);
+DECLARE_bool(madvise_huge_pages);
+
 namespace impala {
 
+using BufferAllocator = BufferPool::BufferAllocator;
+using BufferHandle = BufferPool::BufferHandle;
+
 class BufferAllocatorTest : public ::testing::Test {
  public:
   virtual void SetUp() {
@@ -45,7 +52,7 @@ class BufferAllocatorTest : public ::testing::Test {
     CpuTestUtil::ResetAffinity(); // Some tests modify affinity.
   }
 
-  int GetFreeListSize(BufferPool::BufferAllocator* allocator, int core, int64_t len) {
+  int GetFreeListSize(BufferAllocator* allocator, int core, int64_t len) {
     return allocator->GetFreeListSize(core, len);
   }
 
@@ -70,7 +77,7 @@ TEST_F(BufferAllocatorTest, FreeListSizes) {
   const int NUM_BUFFERS = 512;
   const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
 
-  BufferPool::BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, TOTAL_BYTES);
 
   // Allocate a bunch of buffers - all free list checks should miss.
   vector<BufferHandle> buffers(NUM_BUFFERS);
@@ -121,5 +128,69 @@ TEST_F(BufferAllocatorTest, FreeListSizes) {
   allocator.ReleaseMemory(TOTAL_BYTES);
   ASSERT_EQ(0, GetFreeListSize(&allocator, CORE, TEST_BUFFER_LEN));
 }
+
+class SystemAllocatorTest : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  virtual void TearDown() {}
+
+  static const int64_t MIN_BUFFER_LEN = 4 * 1024;
+  static const int64_t MAX_BUFFER_LEN = 1024 * 1024 * 1024;
+};
+
+/// Basic test that checks that we can allocate buffers of the expected power-of-two
+/// sizes.
+TEST_F(SystemAllocatorTest, BasicPowersOfTwo) {
+  SystemAllocator allocator(MIN_BUFFER_LEN);
+
+  // Iterate a few times to make sure we can reallocate.
+  for (int iter = 0; iter < 5; ++iter) {
+    // Allocate buffers of a mix of sizes.
+    vector<BufferHandle> buffers;
+    for (int alloc_iter = 0; alloc_iter < 2; ++alloc_iter) {
+      for (int64_t len = MIN_BUFFER_LEN; len <= MAX_BUFFER_LEN; len *= 2) {
+        BufferHandle buffer;
+        ASSERT_OK(allocator.Allocate(len, &buffer));
+        ASSERT_TRUE(buffer.is_open());
+        // Write a few bytes to the buffer to check it's valid memory.
+        buffer.data()[0] = 0;
+        buffer.data()[buffer.len() / 2] = 0;
+        buffer.data()[buffer.len() - 1] = 0;
+        buffers.push_back(move(buffer));
+      }
+    }
+
+    // Free all the buffers.
+    for (BufferHandle& buffer : buffers) allocator.Free(move(buffer));
+  }
+}
+
+/// Make an absurdly large allocation to test the failure path.
+TEST_F(SystemAllocatorTest, LargeAllocFailure) {
+  SystemAllocator allocator(MIN_BUFFER_LEN);
+  BufferHandle buffer;
+  Status status = allocator.Allocate(1LL << 48, &buffer);
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.msg().error(), TErrorCode::BUFFER_ALLOCATION_FAILED);
+}
+}
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  int result = 0;
+  for (bool mmap : {false, true}) {
+    for (bool madvise : {false, true}) {
+      if (madvise && !mmap) continue; // Not an interesting combination.
+      std::cerr << "+==================================================" << std::endl
+                << "| Running tests with mmap=" << mmap << " madvise=" << madvise
+                << std::endl
+                << "+==================================================" << std::endl;
+      FLAGS_mmap_buffers = mmap;
+      FLAGS_madvise_huge_pages = madvise;
+      if (RUN_ALL_TESTS() != 0) result = 1;
+    }
+  }
+  return result;
 }
-IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/runtime/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h
index 68efbc1..97686f0 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.h
+++ b/be/src/runtime/bufferpool/buffer-allocator.h
@@ -122,6 +122,8 @@ class BufferPool::BufferAllocator {
   /// Try to release at least 'bytes_to_free' bytes of memory to the system allocator.
   void ReleaseMemory(int64_t bytes_to_free);
 
+  int64_t system_bytes_limit() const { return system_bytes_limit_; }
+
   /// Return the amount of memory currently allocated from the system.
   int64_t GetSystemBytesAllocated() const {
     return system_bytes_limit_ - system_bytes_remaining_.Load();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 05176a5..a71a7c7 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -240,6 +240,14 @@ void BufferPool::ReleaseMemory(int64_t bytes_to_free) {
   allocator_->ReleaseMemory(bytes_to_free);
 }
 
+int64_t BufferPool::GetSystemBytesLimit() const {
+  return allocator_->system_bytes_limit();
+}
+
+int64_t BufferPool::GetSystemBytesAllocated() const {
+  return allocator_->GetSystemBytesAllocated();
+}
+
 bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) {
   return impl_->reservation()->IncreaseReservation(bytes);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index ad9ab00..9a37923 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -252,6 +252,8 @@ class BufferPool : public CacheLineAligned {
   std::string DebugString();
 
   int64_t min_buffer_len() const { return min_buffer_len_; }
+  int64_t GetSystemBytesLimit() const;
+  int64_t GetSystemBytesAllocated() const;
 
   /// Generous upper bounds on page and buffer size and the number of different
   /// power-of-two buffer sizes.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/runtime/bufferpool/system-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/system-allocator.cc b/be/src/runtime/bufferpool/system-allocator.cc
index 0ea429a..7598d3c 100644
--- a/be/src/runtime/bufferpool/system-allocator.cc
+++ b/be/src/runtime/bufferpool/system-allocator.cc
@@ -17,10 +17,26 @@
 
 #include "runtime/bufferpool/system-allocator.h"
 
+#include <sys/mman.h>
+
 #include "util/bit-util.h"
 
+// TODO: IMPALA-5073: this should eventually become the default once we are confident
+// that it is superior to allocating via TCMalloc.
+DEFINE_bool(mmap_buffers, false,
+    "(Advanced) If true, allocate buffers directly from the operating system instead of "
+    "with TCMalloc.");
+
+DEFINE_bool(madvise_huge_pages, true,
+    "(Advanced) If true and --mmap_buffers is also "
+    "true, advise operating system to back large memory buffers with huge pages");
+
 namespace impala {
 
+/// This is the huge page size on x86-64. We could parse /proc/meminfo to programmatically
+/// get this, but it is unlikely to change unless we port to a different architecture.
+static int64_t HUGE_PAGE_SIZE = 2LL * 1024 * 1024;
+
 SystemAllocator::SystemAllocator(int64_t min_buffer_len)
   : min_buffer_len_(min_buffer_len) {
   DCHECK(BitUtil::IsPowerOf2(min_buffer_len));
@@ -31,14 +47,73 @@ Status SystemAllocator::Allocate(int64_t len, BufferPool::BufferHandle* buffer)
   DCHECK_LE(len, BufferPool::MAX_BUFFER_BYTES);
   DCHECK(BitUtil::IsPowerOf2(len)) << len;
 
-  uint8_t* alloc = reinterpret_cast<uint8_t*>(malloc(len));
-  if (alloc == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len);
-  buffer->Open(alloc, len, CpuInfo::GetCurrentCore());
+  uint8_t* buffer_mem;
+  if (FLAGS_mmap_buffers) {
+    RETURN_IF_ERROR(AllocateViaMMap(len, &buffer_mem));
+  } else {
+    // AddressSanitizer does not instrument mmap(). Use malloc() to preserve
+    // instrumentation.
+    buffer_mem = reinterpret_cast<uint8_t*>(malloc(len));
+    if (buffer_mem == nullptr) {
+      return Status(
+          TErrorCode::BUFFER_ALLOCATION_FAILED, len, "malloc() failed under asan");
+    }
+  }
+  buffer->Open(buffer_mem, len, CpuInfo::GetCurrentCore());
+  return Status::OK();
+}
+
+Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) {
+  int64_t map_len = len;
+  bool use_huge_pages = len % HUGE_PAGE_SIZE == 0 && FLAGS_madvise_huge_pages;
+  if (use_huge_pages) {
+    // Map an extra huge page so we can fix up the alignment if needed.
+    map_len += HUGE_PAGE_SIZE;
+  }
+  uint8_t* mem = reinterpret_cast<uint8_t*>(
+      mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
+  if (mem == MAP_FAILED) {
+    const char* error = strerror(errno);
+    return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len, error);
+  }
+
+  if (use_huge_pages) {
+    // mmap() may return memory that is not aligned to the huge page size. For the
+    // subsequent madvise() call to work well, we need to align it ourselves and
+    // unmap the memory on either side of the buffer that we don't need.
+    uintptr_t misalignment = reinterpret_cast<uintptr_t>(mem) % HUGE_PAGE_SIZE;
+    if (misalignment != 0) {
+      uintptr_t fixup = HUGE_PAGE_SIZE - misalignment;
+      munmap(mem, fixup);
+      mem += fixup;
+      map_len -= fixup;
+    }
+    munmap(mem + len, map_len - len);
+    DCHECK_EQ(reinterpret_cast<uintptr_t>(mem) % HUGE_PAGE_SIZE, 0) << mem;
+    // Mark the buffer as a candidate for promotion to huge pages. The Linux Transparent
+    // Huge Pages implementation will try to back the memory with a huge page if it is
+    // enabled. MADV_HUGEPAGE was introduced in 2.6.38, so we similarly need to skip this
+    // code if we are compiling against an older kernel.
+#ifdef MADV_HUGEPAGE
+    int rc;
+    // According to madvise() docs it may return EAGAIN to signal that we should retry.
+    do {
+      rc = madvise(mem, len, MADV_HUGEPAGE);
+    } while (rc == -1 && errno == EAGAIN);
+    DCHECK(rc == 0) << "madvise(MADV_HUGEPAGE) shouldn't fail" << errno;
+#endif
+  }
+  *buffer_mem = mem;
   return Status::OK();
 }
 
 void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) {
-  free(buffer.data());
+  if (FLAGS_mmap_buffers) {
+    int rc = munmap(buffer.data(), buffer.len());
+    DCHECK_EQ(rc, 0) << "Unexpected munmap() error: " << errno;
+  } else {
+    free(buffer.data());
+  }
   buffer.Reset(); // Avoid DCHECK in ~BufferHandle().
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/runtime/bufferpool/system-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/system-allocator.h b/be/src/runtime/bufferpool/system-allocator.h
index 8b4a544..33ad525 100644
--- a/be/src/runtime/bufferpool/system-allocator.h
+++ b/be/src/runtime/bufferpool/system-allocator.h
@@ -25,12 +25,9 @@
 namespace impala {
 
 /// The underlying memory allocator for the buffer pool that allocates buffer memory from
-/// the system. All buffers are allocated through the BufferPool's SystemAllocator. The
-/// allocator only handles allocating buffers that are power-of-two multiples of the
-/// minimum buffer length.
-///
-/// TODO:
-/// * Allocate memory with mmap() instead of malloc().
+/// the operating system using mmap(). All buffers are allocated through the BufferPool's
+/// SystemAllocator. The allocator only handles allocating buffers that are power-of-two
+/// multiples of the minimum buffer length.
 class SystemAllocator {
  public:
   SystemAllocator(int64_t min_buffer_len);
@@ -43,6 +40,9 @@ class SystemAllocator {
   void Free(BufferPool::BufferHandle&& buffer);
 
  private:
+  /// Allocate 'len' bytes of memory for a buffer via mmap().
+  Status AllocateViaMMap(int64_t len, uint8_t** buffer_mem);
+
   const int64_t min_buffer_len_;
 };
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 1fa863e..e5d0c0b 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -307,12 +307,13 @@ Status ExecEnv::StartServices() {
   metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr);
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
-  RETURN_IF_ERROR(RegisterMemoryMetrics(metrics_.get(), true));
+  RETURN_IF_ERROR(RegisterMemoryMetrics(
+      metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
 
 #ifndef ADDRESS_SANITIZER
   // Limit of -1 means no memory limit.
-  mem_tracker_.reset(new MemTracker(TcmallocMetric::PHYSICAL_BYTES_RESERVED,
-      bytes_limit > 0 ? bytes_limit : -1, "Process"));
+  mem_tracker_.reset(new MemTracker(
+      AggregateMemoryMetric::TOTAL_USED, bytes_limit > 0 ? bytes_limit : -1, "Process"));
 #else
   // tcmalloc metrics aren't defined in ASAN builds, just use the default behavior to
   // track process memory usage (sum of all children trackers).

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/statestore/statestored-main.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc
index 4e5680f..0886c70 100644
--- a/be/src/statestore/statestored-main.cc
+++ b/be/src/statestore/statestored-main.cc
@@ -63,8 +63,8 @@ int StatestoredMain(int argc, char** argv) {
     LOG(INFO) << "Not starting webserver";
   }
 
-  metrics->Init(FLAGS_enable_webserver ? webserver.get() : NULL);
-  ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), false));
+  metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr);
+  ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), false, nullptr, nullptr));
   StartThreadInstrumentation(metrics.get(), webserver.get(), false);
   InitRpcEventTracing(webserver.get());
   // TODO: Add a 'common metrics' method to add standard metrics to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index ae00e5a..3f2d198 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -20,6 +20,8 @@
 #include <boost/algorithm/string.hpp>
 #include <gutil/strings/substitute.h>
 
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "util/jni-util.h"
 #include "util/time.h"
 
@@ -27,25 +29,40 @@ using boost::algorithm::to_lower;
 using namespace impala;
 using namespace strings;
 
+DECLARE_bool(mmap_buffers);
+
+SumGauge<uint64_t>* AggregateMemoryMetric::TOTAL_USED = nullptr;
+
 TcmallocMetric* TcmallocMetric::BYTES_IN_USE = NULL;
 TcmallocMetric* TcmallocMetric::PAGEHEAP_FREE_BYTES = NULL;
 TcmallocMetric* TcmallocMetric::TOTAL_BYTES_RESERVED = NULL;
 TcmallocMetric* TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = NULL;
 TcmallocMetric::PhysicalBytesMetric* TcmallocMetric::PHYSICAL_BYTES_RESERVED = NULL;
 
-TcmallocMetric* TcmallocMetric::CreateAndRegister(MetricGroup* metrics, const string& key,
-  const string& tcmalloc_var) {
-    return metrics->RegisterMetric(
-        new TcmallocMetric(MetricDefs::Get(key), tcmalloc_var));
+BufferPoolMetric* BufferPoolMetric::LIMIT = nullptr;
+BufferPoolMetric* BufferPoolMetric::SYSTEM_ALLOCATED = nullptr;
+BufferPoolMetric* BufferPoolMetric::RESERVED = nullptr;
+
+TcmallocMetric* TcmallocMetric::CreateAndRegister(
+    MetricGroup* metrics, const string& key, const string& tcmalloc_var) {
+  return metrics->RegisterMetric(new TcmallocMetric(MetricDefs::Get(key), tcmalloc_var));
 }
 
-Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_metrics) {
+Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_metrics,
+    ReservationTracker* global_reservations, BufferPool* buffer_pool) {
+  if (global_reservations != nullptr) {
+    DCHECK(buffer_pool != nullptr);
+    RETURN_IF_ERROR(
+        BufferPoolMetric::InitMetrics(metrics, global_reservations, buffer_pool));
+  }
 #ifndef ADDRESS_SANITIZER
-  TcmallocMetric::BYTES_IN_USE = TcmallocMetric::CreateAndRegister(metrics,
-      "tcmalloc.bytes-in-use", "generic.current_allocated_bytes");
+  // We rely on TCMalloc for our global memory metrics, so skip setting them up
+  // if we're not using TCMalloc.
+  TcmallocMetric::BYTES_IN_USE = TcmallocMetric::CreateAndRegister(
+      metrics, "tcmalloc.bytes-in-use", "generic.current_allocated_bytes");
 
-  TcmallocMetric::TOTAL_BYTES_RESERVED = TcmallocMetric::CreateAndRegister(metrics,
-      "tcmalloc.total-bytes-reserved", "generic.heap_size");
+  TcmallocMetric::TOTAL_BYTES_RESERVED = TcmallocMetric::CreateAndRegister(
+      metrics, "tcmalloc.total-bytes-reserved", "generic.heap_size");
 
   TcmallocMetric::PAGEHEAP_FREE_BYTES = TcmallocMetric::CreateAndRegister(metrics,
       "tcmalloc.pageheap-free-bytes", "tcmalloc.pageheap_free_bytes");
@@ -53,11 +70,22 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
   TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = TcmallocMetric::CreateAndRegister(metrics,
       "tcmalloc.pageheap-unmapped-bytes", "tcmalloc.pageheap_unmapped_bytes");
 
-  TcmallocMetric::PHYSICAL_BYTES_RESERVED = metrics->RegisterMetric(
-      new TcmallocMetric::PhysicalBytesMetric(
+  TcmallocMetric::PHYSICAL_BYTES_RESERVED =
+      metrics->RegisterMetric(new TcmallocMetric::PhysicalBytesMetric(
           MetricDefs::Get("tcmalloc.physical-bytes-reserved")));
 #endif
 
+  // Add compound metrics that track totals across TCMalloc and the buffer pool.
+  // total-used should track the total physical memory in use.
+  vector<UIntGauge*> used_metrics{TcmallocMetric::PHYSICAL_BYTES_RESERVED};
+  if (FLAGS_mmap_buffers && global_reservations != nullptr) {
+    // If we mmap() buffers, the buffers are not allocated via TCMalloc. Ensure they are
+    // properly tracked.
+    used_metrics.push_back(BufferPoolMetric::SYSTEM_ALLOCATED);
+  }
+
+  AggregateMemoryMetric::TOTAL_USED = metrics->RegisterMetric(
+      new SumGauge<uint64_t>(MetricDefs::Get("memory.total-used"), used_metrics));
   if (register_jvm_metrics) {
     RETURN_IF_ERROR(JvmMetric::InitMetrics(metrics->GetOrCreateChildGroup("jvm")));
   }
@@ -131,6 +159,44 @@ void JvmMetric::CalculateValue() {
       return;
     case PEAK_COMMITTED: value_ = pool.peak_committed;
       return;
-    default: DCHECK(false) << "Unknown JvmMetricType: " << metric_type_;
+    default:
+      DCHECK(false) << "Unknown JvmMetricType: " << metric_type_;
+  }
+}
+
+Status BufferPoolMetric::InitMetrics(MetricGroup* metrics,
+    ReservationTracker* global_reservations, BufferPool* buffer_pool) {
+  LIMIT = metrics->RegisterMetric(
+      new BufferPoolMetric(MetricDefs::Get("buffer-pool.limit"),
+          BufferPoolMetricType::LIMIT, global_reservations, buffer_pool));
+  SYSTEM_ALLOCATED = metrics->RegisterMetric(
+      new BufferPoolMetric(MetricDefs::Get("buffer-pool.system-allocated"),
+          BufferPoolMetricType::SYSTEM_ALLOCATED, global_reservations, buffer_pool));
+  RESERVED = metrics->RegisterMetric(
+      new BufferPoolMetric(MetricDefs::Get("buffer-pool.reserved"),
+          BufferPoolMetricType::RESERVED, global_reservations, buffer_pool));
+  return Status::OK();
+}
+
+BufferPoolMetric::BufferPoolMetric(const TMetricDef& def, BufferPoolMetricType type,
+    ReservationTracker* global_reservations, BufferPool* buffer_pool)
+  : UIntGauge(def, 0),
+    type_(type),
+    global_reservations_(global_reservations),
+    buffer_pool_(buffer_pool) {}
+
+void BufferPoolMetric::CalculateValue() {
+  switch (type_) {
+    case BufferPoolMetricType::LIMIT:
+      value_ = buffer_pool_->GetSystemBytesLimit();
+      break;
+    case BufferPoolMetricType::SYSTEM_ALLOCATED:
+      value_ = buffer_pool_->GetSystemBytesAllocated();
+      break;
+    case BufferPoolMetricType::RESERVED:
+      value_ = global_reservations_->GetReservation();
+      break;
+    default:
+      DCHECK(false) << "Unknown BufferPoolMetricType: " << static_cast<int>(type_);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 34728e3..e058bcb 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -29,8 +29,20 @@
 
 namespace impala {
 
+class BufferPool;
+class ReservationTracker;
 class Thread;
 
+/// Memory metrics including TCMalloc and BufferPool memory.
+class AggregateMemoryMetric {
+ public:
+  /// The sum of Tcmalloc PHYSICAL_BYTES_RESERVED and BufferPool SYSTEM_ALLOCATED.
+  /// Approximates the total amount of physical memory consumed by the backend (i.e. not
+  /// including JVM memory), which is either in use by queries or cached by the BufferPool
+  /// or TcMalloc.
+  static SumGauge<uint64_t>* TOTAL_USED;
+};
+
 /// Specialised metric which exposes numeric properties from tcmalloc.
 class TcmallocMetric : public UIntGauge {
  public:
@@ -128,10 +140,44 @@ class JvmMetric : public IntGauge {
   JvmMetricType metric_type_;
 };
 
-/// Registers common tcmalloc memory metrics. If register_jvm_metrics is true, the JVM
-/// memory metrics are also registered.
-Status RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_metrics);
+/// Metric that reports information about the buffer pool.
+class BufferPoolMetric : public UIntGauge {
+ public:
+  static Status InitMetrics(MetricGroup* metrics, ReservationTracker* global_reservations,
+      BufferPool* buffer_pool);
+
+  /// Global metrics, initialized by InitMetrics().
+  static BufferPoolMetric* LIMIT;
+  static BufferPoolMetric* SYSTEM_ALLOCATED;
+  static BufferPoolMetric* RESERVED;
+
+ protected:
+  virtual void CalculateValue();
+
+ private:
+  enum class BufferPoolMetricType {
+    LIMIT, // Limit on memory allocated to buffers.
+    // Total amount of buffer memory allocated from the system. Always <= LIMIT.
+    SYSTEM_ALLOCATED,
+    // Total of all buffer reservations. May be < SYSTEM_ALLOCATED if not all reservations
+    // are fulfilled, or > SYSTEM_ALLOCATED because of additional memory cached by
+    // BufferPool. Always <= LIMIT.
+    RESERVED,
+  };
+
+  BufferPoolMetric(const TMetricDef& def, BufferPoolMetricType type,
+      ReservationTracker* global_reservations, BufferPool* buffer_pool);
+
+  BufferPoolMetricType type_;
+  ReservationTracker* global_reservations_;
+  BufferPool* buffer_pool_;
+};
 
+/// Registers common tcmalloc memory metrics. If 'register_jvm_metrics' is true, the JVM
+/// memory metrics are also registered. If 'global_reservations' and 'buffer_pool' are
+/// not NULL, also register buffer pool metrics.
+Status RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_metrics,
+    ReservationTracker* global_reservations, BufferPool* buffer_pool);
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index 10ecb64..543e0e3 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -106,6 +106,27 @@ TEST_F(MetricsTest, GaugeMetrics) {
   AssertValue(int_gauge_with_units, 10, "10s000ms");
 }
 
+TEST_F(MetricsTest, SumGauge) {
+  MetricGroup metrics("SumGauge");
+  AddMetricDef("gauge1", TMetricKind::GAUGE, TUnit::NONE);
+  AddMetricDef("gauge2", TMetricKind::GAUGE, TUnit::NONE);
+  AddMetricDef("sum", TMetricKind::GAUGE, TUnit::NONE);
+  IntGauge* gauge1 = metrics.AddGauge<int64_t>("gauge1", 0);
+  IntGauge* gauge2 = metrics.AddGauge<int64_t>("gauge2", 0);
+
+  vector<IntGauge*> gauges({gauge1, gauge2});
+  IntGauge* sum_gauge =
+      metrics.RegisterMetric(new SumGauge<int64_t>(MetricDefs::Get("sum"), gauges));
+
+  AssertValue(sum_gauge, 0, "0");
+  gauge1->Increment(1);
+  AssertValue(sum_gauge, 1, "1");
+  gauge2->Increment(-1);
+  AssertValue(sum_gauge, 0, "0");
+  gauge2->Increment(100);
+  AssertValue(sum_gauge, 100, "100");
+}
+
 TEST_F(MetricsTest, PropertyMetrics) {
   MetricGroup metrics("PropertyMetrics");
   AddMetricDef("bool_property", TMetricKind::PROPERTY, TUnit::NONE);
@@ -196,7 +217,7 @@ TEST_F(MetricsTest, StatsMetricsSingle) {
 TEST_F(MetricsTest, MemMetric) {
 #ifndef ADDRESS_SANITIZER
   MetricGroup metrics("MemMetrics");
-  RegisterMemoryMetrics(&metrics, false);
+  RegisterMemoryMetrics(&metrics, false, nullptr, nullptr);
   // Smoke test to confirm that tcmalloc metrics are returning reasonable values.
   UIntGauge* bytes_in_use =
       metrics.FindMetricForTesting<UIntGauge>("tcmalloc.bytes-in-use");
@@ -228,7 +249,7 @@ TEST_F(MetricsTest, MemMetric) {
 
 TEST_F(MetricsTest, JvmMetrics) {
   MetricGroup metrics("JvmMetrics");
-  RegisterMemoryMetrics(&metrics, true);
+  RegisterMemoryMetrics(&metrics, true, nullptr, nullptr);
   UIntGauge* jvm_total_used =
       metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<UIntGauge>(
           "jvm.total.current-usage-bytes");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/be/src/util/metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 93497ea..1296e06 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -19,9 +19,10 @@
 #define IMPALA_UTIL_METRICS_H
 
 #include <map>
-#include <string>
 #include <sstream>
 #include <stack>
+#include <string>
+#include <vector>
 #include <boost/function.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/locks.hpp>
@@ -211,6 +212,26 @@ class SimpleMetric : public Metric {
   T value_;
 };
 
+// Gauge metric that computes the sum of several gauges.
+template <typename T>
+class SumGauge : public SimpleMetric<T, TMetricKind::GAUGE> {
+ public:
+  SumGauge(const TMetricDef& metric_def,
+      const std::vector<SimpleMetric<T, TMetricKind::GAUGE>*>& metrics)
+    : SimpleMetric<T, TMetricKind::GAUGE>(metric_def, 0), metrics_(metrics) {}
+  virtual ~SumGauge() {}
+
+ private:
+  virtual void CalculateValue() override {
+    T sum = 0;
+    for (SimpleMetric<T, TMetricKind::GAUGE>* metric : metrics_) sum += metric->value();
+    this->value_ = sum;
+  }
+
+  /// The metrics to be summed.
+  std::vector<SimpleMetric<T, TMetricKind::GAUGE>*> metrics_;
+};
+
 /// Container for a set of metrics. A MetricGroup owns the memory for every metric
 /// contained within it (see Add*() to create commonly used metric
 /// types). Metrics are 'registered' with a MetricGroup, once registered they cannot be
@@ -218,8 +239,8 @@ class SimpleMetric : public Metric {
 //
 /// MetricGroups may be organised hierarchically as a tree.
 //
-/// Typically a metric object is cached by its creator after registration. If a metric must
-/// be retrieved without an available pointer, FindMetricForTesting() will search the
+/// Typically a metric object is cached by its creator after registration. If a metric
+/// must be retrieved without an available pointer, FindMetricForTesting() will search the
 /// MetricGroup and all its descendent MetricGroups in turn.
 //
 /// TODO: Hierarchical naming: that is, resolve "group1.group2.metric-name" to a path

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 39036c5..33bae25 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -281,7 +281,7 @@ error_codes = (
   ("SCRATCH_LIMIT_EXCEEDED", 90, "Scratch space limit of $0 bytes exceeded for query "
    "while spilling data to disk."),
 
-  ("BUFFER_ALLOCATION_FAILED", 91, "Unexpected error allocating $0 byte buffer."),
+  ("BUFFER_ALLOCATION_FAILED", 91, "Unexpected error allocating $0 byte buffer: $1"),
 
   ("PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE", 92, "File '$0' is corrupt: metadata indicates "
    "a zero row count but there is at least one non-empty row group."),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/955b257c/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 0829139..49d13ae 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1092,6 +1092,48 @@
     "key": "tcmalloc.total-bytes-reserved"
   },
   {
+    "description": "Maximum allowed bytes allocated by the buffer pool.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Buffer Pool Allocated Memory Limit.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "buffer-pool.limit"
+  },
+  {
+    "description": "Total buffer memory currently allocated by the buffer pool.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Buffer Pool Total Allocated Memory.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "buffer-pool.system-allocated"
+  },
+  {
+    "description": "Total memory currently reserved for buffers.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Buffer Pool Total Reserved Memory.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "buffer-pool.reserved"
+  },
+  {
+    "description": "Total memory currently used by TCMalloc and buffer pool.",
+    "contexts": [
+      "STATESTORE",
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Total Used Memory.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "memory.total-used"
+  },
+  {
     "description": "The number of running threads in this process.",
     "contexts": [
       "STATESTORE",


Mime
View raw message