impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [07/11] incubator-impala git commit: IMPALA-4674: Part 2: port backend exec to BufferPool
Date Sat, 05 Aug 2017 03:18:18 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
deleted file mode 100644
index e4737c2..0000000
--- a/be/src/runtime/buffered-block-mgr.cc
+++ /dev/null
@@ -1,1254 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/runtime-state.h"
-#include "runtime/tmp-file-mgr.h"
-#include "util/debug-util.h"
-#include "util/disk-info.h"
-#include "util/filesystem-util.h"
-#include "util/impalad-metrics.h"
-#include "util/runtime-profile-counters.h"
-#include "util/uid-util.h"
-
-#include <gutil/strings/substitute.h>
-
-#include "common/names.h"
-
-using namespace strings; // for Substitute
-
-namespace impala {
-
-BufferedBlockMgr::BlockMgrsMap BufferedBlockMgr::query_to_block_mgrs_;
-SpinLock BufferedBlockMgr::static_block_mgrs_lock_;
-
-
-struct BufferedBlockMgr::Client {
-  Client(const string& debug_info, BufferedBlockMgr* mgr, int num_reserved_buffers,
-      bool tolerates_oversubscription, MemTracker* tracker,
-         RuntimeState* state)
-      : debug_info_(debug_info),
-        mgr_(mgr),
-        state_(state),
-        tracker_(tracker),
-        query_tracker_(mgr_->mem_tracker_->parent()),
-        num_reserved_buffers_(num_reserved_buffers),
-        tolerates_oversubscription_(tolerates_oversubscription),
-        num_tmp_reserved_buffers_(0),
-        num_pinned_buffers_(0),
-        logged_large_allocation_warning_(false) {
-    DCHECK(tracker != NULL);
-  }
-
-  /// A string that will be printed to identify the client, e.g. which exec node it
-  /// belongs to.
-  string debug_info_;
-
-  /// Unowned.
-  BufferedBlockMgr* mgr_;
-
-  /// Unowned.
-  RuntimeState* state_;
-
-  /// Tracker for this client. Unowned.
-  /// When the client gets a buffer, we update the consumption on this tracker. However,
-  /// we don't want to transfer the buffer from the block mgr to the client (i.e. release
-  /// from the block mgr), since the block mgr is where the block mem usage limit is
-  /// enforced. Even when we give a buffer to a client, the buffer is still owned and
-  /// counts against the block mgr tracker (i.e. there is a fixed pool of buffers
-  /// regardless of if they are in the block mgr or the clients).
-  MemTracker* tracker_;
-
-  /// This is the common ancestor between the block mgr tracker and the client tracker.
-  /// When memory is transferred to the client, we want it to stop at this tracker.
-  MemTracker* query_tracker_;
-
-  /// Number of buffers reserved by this client.
-  int num_reserved_buffers_;
-
-  /// If false, return MEM_LIMIT_EXCEEDED when a reserved buffer cannot be allocated.
-  /// If true, return Status::OK() as with a non-reserved buffer.
-  bool tolerates_oversubscription_;
-
-  /// Number of buffers temporarily reserved.
-  int num_tmp_reserved_buffers_;
-
-  /// Number of buffers pinned by this client.
-  int num_pinned_buffers_;
-
-  /// Whether a warning about a large allocation has been made for this client. Used
-  /// to avoid producing excessive log messages.
-  bool logged_large_allocation_warning_;
-
-  void PinBuffer(BufferDescriptor* buffer) {
-    DCHECK(buffer != NULL);
-    if (buffer->len == mgr_->max_block_size()) {
-      ++num_pinned_buffers_;
-      tracker_->ConsumeLocal(buffer->len, query_tracker_);
-    }
-  }
-
-  void UnpinBuffer(BufferDescriptor* buffer) {
-    DCHECK(buffer != NULL);
-    if (buffer->len == mgr_->max_block_size()) {
-      DCHECK_GT(num_pinned_buffers_, 0);
-      --num_pinned_buffers_;
-      tracker_->ReleaseLocal(buffer->len, query_tracker_);
-    }
-  }
-
-  string DebugString() const {
-    stringstream ss;
-    ss << "Client " << this << endl
-       << " " << debug_info_ << endl
-       << "  num_reserved_buffers=" << num_reserved_buffers_ << endl
-       << "  num_tmp_reserved_buffers=" << num_tmp_reserved_buffers_ << endl
-       << "  num_pinned_buffers=" << num_pinned_buffers_;
-    return ss.str();
-  }
-};
-
-// BufferedBlockMgr::Block methods.
-BufferedBlockMgr::Block::Block(BufferedBlockMgr* block_mgr)
-  : buffer_desc_(NULL),
-    block_mgr_(block_mgr),
-    client_(NULL),
-    valid_data_len_(0),
-    num_rows_(0) {}
-
-Status BufferedBlockMgr::Block::Pin(bool* pinned, Block* release_block, bool unpin) {
-  return block_mgr_->PinBlock(this, pinned, release_block, unpin);
-}
-
-Status BufferedBlockMgr::Block::Unpin() {
-  return block_mgr_->UnpinBlock(this);
-}
-
-void BufferedBlockMgr::Block::Delete() {
-  block_mgr_->DeleteBlock(this);
-}
-
-void BufferedBlockMgr::Block::Init() {
-  // No locks are taken because the block is new or has previously been deleted.
-  is_pinned_ = false;
-  in_write_ = false;
-  is_deleted_ = false;
-  valid_data_len_ = 0;
-  client_ = NULL;
-  num_rows_ = 0;
-}
-
-bool BufferedBlockMgr::Block::Validate() const {
-  if (is_deleted_ && (is_pinned_ || (!in_write_ && buffer_desc_ != NULL))) {
-    LOG(ERROR) << "Deleted block in use - " << DebugString();
-    return false;
-  }
-
-  if (buffer_desc_ == NULL && (is_pinned_ || in_write_)) {
-    LOG(ERROR) << "Block without buffer in use - " << DebugString();
-    return false;
-  }
-
-  if (buffer_desc_ == NULL && block_mgr_->unpinned_blocks_.Contains(this)) {
-    LOG(ERROR) << "Unpersisted block without buffer - " << DebugString();
-    return false;
-  }
-
-  if (buffer_desc_ != NULL && (buffer_desc_->block != this)) {
-    LOG(ERROR) << "Block buffer inconsistency - " << DebugString();
-    return false;
-  }
-
-  return true;
-}
-
-string BufferedBlockMgr::Block::TmpFilePath() const {
-  if (write_handle_ == NULL) return "";
-  return write_handle_->TmpFilePath();
-}
-
-string BufferedBlockMgr::Block::DebugString() const {
-  stringstream ss;
-  ss << "Block: " << this << endl
-     << "  Buffer Desc: " << buffer_desc_ << endl
-     << "  Data Len: " << valid_data_len_ << endl
-     << "  Num Rows: " << num_rows_ << endl;
-  if (is_pinned_) ss << "  Buffer Len: " << buffer_len() << endl;
-  ss << "  Deleted: " << is_deleted_ << endl
-     << "  Pinned: " << is_pinned_ << endl
-     << "  Write Issued: " << in_write_ << endl
-     << "  Client Local: " << client_local_ << endl;
-  if (write_handle_ != NULL) {
-    ss << "  Write handle: " << write_handle_->DebugString() << endl;
-  }
-  if (client_ != NULL) ss << "  Client: " << client_->DebugString();
-  return ss.str();
-}
-
-BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr,
-    int64_t block_size, int64_t scratch_limit)
-  : max_block_size_(block_size),
-    // Keep two writes in flight per scratch disk so the disks can stay busy.
-    block_write_threshold_(tmp_file_mgr->NumActiveTmpDevices() * 2),
-    disable_spill_(state->query_ctx().disable_spilling || block_write_threshold_ == 0
-        || scratch_limit == 0),
-    query_id_(state->query_id()),
-    initialized_(false),
-    unfullfilled_reserved_buffers_(0),
-    total_pinned_buffers_(0),
-    non_local_outstanding_writes_(0),
-    tmp_file_group_(NULL),
-    is_cancelled_(false),
-    writes_issued_(0),
-    debug_write_delay_ms_(0) {}
-
-Status BufferedBlockMgr::Create(RuntimeState* state, MemTracker* parent,
-    RuntimeProfile* profile, TmpFileMgr* tmp_file_mgr, int64_t mem_limit,
-    int64_t block_size, shared_ptr<BufferedBlockMgr>* block_mgr) {
-  DCHECK(parent != NULL);
-  int64_t scratch_limit = state->query_options().scratch_limit;
-  block_mgr->reset();
-  {
-    lock_guard<SpinLock> lock(static_block_mgrs_lock_);
-    BlockMgrsMap::iterator it = query_to_block_mgrs_.find(state->query_id());
-    if (it != query_to_block_mgrs_.end()) *block_mgr = it->second.lock();
-    if (*block_mgr == NULL) {
-      // weak_ptr::lock returns NULL if the weak_ptr is expired. This means
-      // all shared_ptr references have gone to 0 and it is in the process of
-      // being deleted. This can happen if the last shared reference is released
-      // but before the weak ptr is removed from the map.
-      block_mgr->reset(
-          new BufferedBlockMgr(state, tmp_file_mgr, block_size, scratch_limit));
-      query_to_block_mgrs_[state->query_id()] = *block_mgr;
-    }
-  }
-  (*block_mgr)
-      ->Init(state->io_mgr(), tmp_file_mgr, profile, parent, mem_limit, scratch_limit);
-  return Status::OK();
-}
-
-int64_t BufferedBlockMgr::available_buffers(Client* client) const {
-  int64_t unused_reserved = client->num_reserved_buffers_ +
-      client->num_tmp_reserved_buffers_ - client->num_pinned_buffers_;
-  return max<int64_t>(0, remaining_unreserved_buffers()) +
-      max<int64_t>(0, unused_reserved);
-}
-
-int64_t BufferedBlockMgr::remaining_unreserved_buffers() const {
-  int64_t num_buffers = free_io_buffers_.size() +
-      unpinned_blocks_.size() + non_local_outstanding_writes_;
-  num_buffers += mem_tracker_->SpareCapacity() / max_block_size();
-  num_buffers -= unfullfilled_reserved_buffers_;
-  return num_buffers;
-}
-
-Status BufferedBlockMgr::RegisterClient(const string& debug_info,
-    int num_reserved_buffers, bool tolerates_oversubscription, MemTracker* tracker,
-    RuntimeState* state, Client** client) {
-  DCHECK_GE(num_reserved_buffers, 0);
-  Client* aClient = new Client(debug_info, this, num_reserved_buffers,
-      tolerates_oversubscription, tracker, state);
-  lock_guard<mutex> lock(lock_);
-  *client = obj_pool_.Add(aClient);
-  unfullfilled_reserved_buffers_ += num_reserved_buffers;
-  return Status::OK();
-}
-
-void BufferedBlockMgr::ClearReservations(Client* client) {
-  lock_guard<mutex> lock(lock_);
-  if (client->num_pinned_buffers_ < client->num_reserved_buffers_) {
-    unfullfilled_reserved_buffers_ -=
-        client->num_reserved_buffers_ - client->num_pinned_buffers_;
-  }
-  client->num_reserved_buffers_ = 0;
-
-  unfullfilled_reserved_buffers_ -= client->num_tmp_reserved_buffers_;
-  client->num_tmp_reserved_buffers_ = 0;
-}
-
-bool BufferedBlockMgr::TryAcquireTmpReservation(Client* client, int num_buffers) {
-  lock_guard<mutex> lock(lock_);
-  DCHECK_EQ(client->num_tmp_reserved_buffers_, 0);
-  if (client->num_pinned_buffers_ < client->num_reserved_buffers_) {
-    // If client has unused reserved buffers, we use those first.
-    num_buffers -= client->num_reserved_buffers_ - client->num_pinned_buffers_;
-  }
-  if (num_buffers < 0) return true;
-  if (available_buffers(client) < num_buffers) return false;
-
-  client->num_tmp_reserved_buffers_ = num_buffers;
-  unfullfilled_reserved_buffers_ += num_buffers;
-  return true;
-}
-
-bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) {
-  int64_t buffers_needed = BitUtil::Ceil(size, max_block_size());
-  if (UNLIKELY(!BitUtil::IsNonNegative32Bit(buffers_needed))) {
-    VLOG_QUERY << "Trying to consume " << size << " which is out of range.";
-    return false;
-  }
-  DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory";
-
-  unique_lock<mutex> lock(lock_);
-  if (size < max_block_size() && mem_tracker_->TryConsume(size)) {
-    // For small allocations (less than a block size), just let the allocation through.
-    client->tracker_->ConsumeLocal(size, client->query_tracker_);
-    return true;
-  }
-
-  if (max<int64_t>(0, remaining_unreserved_buffers()) +
-      client->num_tmp_reserved_buffers_ < buffers_needed) {
-    return false;
-  }
-
-  if (mem_tracker_->TryConsume(size)) {
-    // There was still unallocated memory, don't need to recycle allocated blocks.
-    client->tracker_->ConsumeLocal(size, client->query_tracker_);
-    return true;
-  }
-
-  // Bump up client->num_tmp_reserved_buffers_ to satisfy this request. We don't want
-  // another client to grab the buffer.
-  int additional_tmp_reservations = 0;
-  if (client->num_tmp_reserved_buffers_ < buffers_needed) {
-    additional_tmp_reservations = buffers_needed - client->num_tmp_reserved_buffers_;
-    client->num_tmp_reserved_buffers_ += additional_tmp_reservations;
-    unfullfilled_reserved_buffers_ += additional_tmp_reservations;
-  }
-
-  // Loop until we have freed enough memory.
-  // We free all the memory at the end. We don't want another component to steal the
-  // memory.
-  int buffers_acquired = 0;
-  do {
-    BufferDescriptor* buffer_desc = NULL;
-    Status s = FindBuffer(lock, &buffer_desc); // This waits on the lock.
-    if (buffer_desc == NULL) break;
-    DCHECK(s.ok());
-    all_io_buffers_.erase(buffer_desc->all_buffers_it);
-    if (buffer_desc->block != NULL) buffer_desc->block->buffer_desc_ = NULL;
-    delete[] buffer_desc->buffer;
-    ++buffers_acquired;
-  } while (buffers_acquired != buffers_needed);
-
-  Status status = Status::OK();
-  if (buffers_acquired == buffers_needed) status = WriteUnpinnedBlocks();
-  // If we either couldn't acquire enough buffers or WriteUnpinnedBlocks() failed, undo
-  // the reservation.
-  if (buffers_acquired != buffers_needed || !status.ok()) {
-    if (!status.ok() && !status.IsCancelled()) {
-      VLOG_QUERY << "Query: " << query_id_ << " write unpinned buffers failed.";
-      client->state_->LogError(status.msg());
-    }
-    client->num_tmp_reserved_buffers_ -= additional_tmp_reservations;
-    unfullfilled_reserved_buffers_ -= additional_tmp_reservations;
-    mem_tracker_->Release(buffers_acquired * max_block_size());
-    return false;
-  }
-
-  client->num_tmp_reserved_buffers_ -= buffers_acquired;
-  unfullfilled_reserved_buffers_ -= buffers_acquired;
-
-  DCHECK_GE(buffers_acquired * max_block_size(), size);
-  mem_tracker_->Release(buffers_acquired * max_block_size());
-  if (!mem_tracker_->TryConsume(size)) return false;
-  client->tracker_->ConsumeLocal(size, client->query_tracker_);
-  DCHECK(Validate()) << endl << DebugInternal();
-  return true;
-}
-
-void BufferedBlockMgr::ReleaseMemory(Client* client, int64_t size) {
-  mem_tracker_->Release(size);
-  client->tracker_->ReleaseLocal(size, client->query_tracker_);
-}
-
-void BufferedBlockMgr::Cancel() {
-  {
-    lock_guard<mutex> lock(lock_);
-    if (is_cancelled_) return;
-    is_cancelled_ = true;
-  }
-}
-
-bool BufferedBlockMgr::IsCancelled() {
-  lock_guard<mutex> lock(lock_);
-  return is_cancelled_;
-}
-
-Status BufferedBlockMgr::MemLimitTooLowError(Client* client, int node_id) {
-  VLOG_QUERY << "Query: " << query_id_ << ". Node=" << node_id
-             << " ran out of memory: " << endl
-             << DebugInternal() << endl << client->DebugString();
-  int64_t min_memory = client->num_reserved_buffers_ * max_block_size();
-  string msg = Substitute(
-      "The memory limit is set too low to initialize spilling operator (id=$0). The "
-      "minimum required memory to spill this operator is $1.",
-      node_id, PrettyPrinter::Print(min_memory, TUnit::BYTES));
-  return client->tracker_->MemLimitExceeded(client->state_, msg);
-}
-
-Status BufferedBlockMgr::GetNewBlock(Client* client, Block* unpin_block, Block** block,
-    int64_t len) {
-  DCHECK_LE(len, max_block_size_) << "Cannot request block bigger than max_len";
-  DCHECK_NE(len, 0) << "Cannot request block of zero size";
-  *block = NULL;
-  Block* new_block = NULL;
-  Status status;
-
-  {
-    lock_guard<mutex> lock(lock_);
-    if (is_cancelled_) return Status::CANCELLED;
-    new_block = GetUnusedBlock(client);
-    DCHECK(new_block->Validate()) << endl << new_block->DebugString();
-    DCHECK_EQ(new_block->client_, client);
-    DCHECK_NE(new_block, unpin_block);
-
-    if (len > 0 && len < max_block_size_) {
-      DCHECK(unpin_block == NULL);
-      if (client->tracker_->TryConsume(len)) {
-        uint8_t* buffer = new uint8_t[len];
-        // Descriptors for non-I/O sized buffers are deleted when the block is deleted.
-        new_block->buffer_desc_ = new BufferDescriptor(buffer, len);
-        new_block->buffer_desc_->block = new_block;
-        new_block->is_pinned_ = true;
-        client->PinBuffer(new_block->buffer_desc_);
-        ++total_pinned_buffers_;
-        *block = new_block;
-        return Status::OK();
-      } else {
-        status = Status::OK();
-        goto no_buffer_avail;
-      }
-    }
-  }
-
-  bool in_mem;
-  status = FindBufferForBlock(new_block, &in_mem);
-  if (!status.ok()) goto no_buffer_avail;
-  DCHECK(!in_mem) << "A new block cannot start in mem.";
-  DCHECK(!new_block->is_pinned() || new_block->buffer_desc_ != NULL)
-      << new_block->DebugString();
-
-  if (!new_block->is_pinned()) {
-    if (unpin_block == NULL) {
-      // We couldn't get a new block and no unpin block was provided. Can't return
-      // a block.
-      status = Status::OK();
-      goto no_buffer_avail;
-    } else {
-      // We need to transfer the buffer from unpin_block to new_block.
-      status = TransferBuffer(new_block, unpin_block, true);
-      if (!status.ok()) goto no_buffer_avail;
-    }
-  } else if (unpin_block != NULL) {
-    // Got a new block without needing to transfer. Just unpin this block.
-    status = unpin_block->Unpin();
-    if (!status.ok()) goto no_buffer_avail;
-  }
-
-  DCHECK(new_block->is_pinned());
-  *block = new_block;
-  return Status::OK();
-
-no_buffer_avail:
-  DCHECK(new_block != NULL);
-  DeleteBlock(new_block);
-  return status;
-}
-
-Status BufferedBlockMgr::TransferBuffer(Block* dst, Block* src, bool unpin) {
-  Status status = Status::OK();
-  DCHECK(dst != NULL);
-  DCHECK(src != NULL);
-  unique_lock<mutex> lock(lock_);
-
-  DCHECK(src->is_pinned_);
-  DCHECK(!dst->is_pinned_);
-  DCHECK(dst->buffer_desc_ == NULL);
-  DCHECK_EQ(src->buffer_desc_->len, max_block_size_);
-
-  // Ensure that there aren't any writes in flight for 'src'.
-  WaitForWrite(lock, src);
-  src->is_pinned_ = false;
-
-  if (unpin) {
-    // First write out the src block so we can grab its buffer.
-    src->client_local_ = true;
-    status = WriteUnpinnedBlock(src);
-    if (!status.ok()) {
-      // The transfer failed, return the buffer to src.
-      src->is_pinned_ = true;
-      return status;
-    }
-    // Wait for the write to complete.
-    WaitForWrite(lock, src);
-    if (is_cancelled_) {
-      // We can't be sure the write succeeded, so return the buffer to src.
-      src->is_pinned_ = true;
-      return Status::CANCELLED;
-    }
-    DCHECK(!src->in_write_);
-  }
-  // Assign the buffer to the new block.
-  dst->buffer_desc_ = src->buffer_desc_;
-  dst->buffer_desc_->block = dst;
-  src->buffer_desc_ = NULL;
-  dst->is_pinned_ = true;
-  if (!unpin) DeleteBlockLocked(lock, src);
-  return Status::OK();
-}
-
-BufferedBlockMgr::~BufferedBlockMgr() {
-  shared_ptr<BufferedBlockMgr> other_mgr_ptr;
-  {
-    lock_guard<SpinLock> lock(static_block_mgrs_lock_);
-    BlockMgrsMap::iterator it = query_to_block_mgrs_.find(query_id_);
-    // IMPALA-2286: Another fragment may have called Create() for this query_id_ and
-    // saw that this BufferedBlockMgr is being destructed.  That fragement will
-    // overwrite the map entry for query_id_, pointing it to a different
-    // BufferedBlockMgr object.  We should let that object's destructor remove the
-    // entry.  On the other hand, if the second BufferedBlockMgr is destructed before
-    // this thread acquires the lock, then we'll remove the entry (because we can't
-    // distinguish between the two expired pointers), and when the other
-    // ~BufferedBlockMgr() call occurs, it won't find an entry for this query_id_.
-    if (it != query_to_block_mgrs_.end()) {
-      other_mgr_ptr = it->second.lock();
-      if (other_mgr_ptr.get() == NULL) {
-        // The BufferBlockMgr object referenced by this entry is being deconstructed.
-        query_to_block_mgrs_.erase(it);
-      } else {
-        // The map references another (still valid) BufferedBlockMgr.
-        DCHECK_NE(other_mgr_ptr.get(), this);
-      }
-    }
-  }
-  // IMPALA-4274: releasing the reference count can recursively call ~BufferedBlockMgr().
-  // Do not do that with 'static_block_mgrs_lock_' held.
-  other_mgr_ptr.reset();
-
-  // Delete tmp files and cancel any in-flight writes.
-  tmp_file_group_->Close();
-
-  // If there are any outstanding writes and we are here it means that when the
-  // WriteComplete() callback gets executed it is going to access invalid memory.
-  // See IMPALA-1890.
-  DCHECK_EQ(non_local_outstanding_writes_, 0) << endl << DebugInternal();
-
-  // Validate that clients deleted all of their blocks. Since all writes have
-  // completed at this point, any deleted blocks should be in unused_blocks_.
-  for (auto it = all_blocks_.begin(); it != all_blocks_.end(); ++it) {
-    Block* block = *it;
-    DCHECK(block->Validate()) << block->DebugString();
-    DCHECK(unused_blocks_.Contains(block)) << block->DebugString();
-  }
-
-  // Free memory resources.
-  for (BufferDescriptor* buffer: all_io_buffers_) {
-    mem_tracker_->Release(buffer->len);
-    delete[] buffer->buffer;
-  }
-  DCHECK_EQ(mem_tracker_->consumption(), 0);
-  mem_tracker_->UnregisterFromParent();
-  mem_tracker_.reset();
-}
-
-int64_t BufferedBlockMgr::bytes_allocated() const {
-  return mem_tracker_->consumption();
-}
-
-int BufferedBlockMgr::num_pinned_buffers(Client* client) const {
-  return client->num_pinned_buffers_;
-}
-
-int BufferedBlockMgr::num_reserved_buffers_remaining(Client* client) const {
-  return max<int>(client->num_reserved_buffers_ - client->num_pinned_buffers_, 0);
-}
-
-MemTracker* BufferedBlockMgr::get_tracker(Client* client) const {
-  return client->tracker_;
-}
-
-int64_t BufferedBlockMgr::GetNumWritesOutstanding() {
-  // Acquire lock to avoid returning mid-way through WriteComplete() when the
-  // state may be inconsistent.
-  lock_guard<mutex> lock(lock_);
-  return profile()->GetCounter("BlockWritesOutstanding")->value();
-}
-
-Status BufferedBlockMgr::DeleteOrUnpinBlock(Block* block, bool unpin) {
-  if (block == NULL) {
-    return IsCancelled() ? Status::CANCELLED : Status::OK();
-  }
-  if (unpin) {
-    return block->Unpin();
-  } else {
-    block->Delete();
-    return IsCancelled() ? Status::CANCELLED : Status::OK();
-  }
-}
-
-Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_block,
-    bool unpin) {
-  DCHECK(block != NULL);
-  DCHECK(!block->is_deleted_);
-  Status status;
-  *pinned = false;
-  if (block->is_pinned_) {
-    *pinned = true;
-    return DeleteOrUnpinBlock(release_block, unpin);
-  }
-
-  bool in_mem = false;
-  status = FindBufferForBlock(block, &in_mem);
-  if (!status.ok()) goto error;
-  *pinned = block->is_pinned_;
-
-  if (in_mem) {
-    // The block's buffer is still in memory with the original data.
-    status = CancelWrite(block);
-    if (!status.ok()) goto error;
-    return DeleteOrUnpinBlock(release_block, unpin);
-  }
-
-  if (!block->is_pinned_) {
-    if (release_block == NULL) return Status::OK();
-
-    if (block->buffer_desc_ != NULL) {
-      // The block's buffer is still in memory but we couldn't get an additional buffer
-      // because it would eat into another client's reservation. However, we can use
-      // release_block's reservation, so reclaim the buffer.
-      {
-        lock_guard<mutex> lock(lock_);
-        if (free_io_buffers_.Contains(block->buffer_desc_)) {
-          DCHECK(!block->is_pinned_ && !block->in_write_ &&
-                 !unpinned_blocks_.Contains(block)) << endl << block->DebugString();
-          free_io_buffers_.Remove(block->buffer_desc_);
-        } else if (unpinned_blocks_.Contains(block)) {
-          unpinned_blocks_.Remove(block);
-        } else {
-          DCHECK(block->in_write_);
-        }
-        block->is_pinned_ = true;
-        *pinned = true;
-        block->client_->PinBuffer(block->buffer_desc_);
-        ++total_pinned_buffers_;
-        status = WriteUnpinnedBlocks();
-        if (!status.ok()) goto error;
-      }
-      status = CancelWrite(block);
-      if (!status.ok()) goto error;
-      return DeleteOrUnpinBlock(release_block, unpin);
-    }
-    // FindBufferForBlock() wasn't able to find a buffer so transfer the one from
-    // 'release_block'.
-    status = TransferBuffer(block, release_block, unpin);
-    if (!status.ok()) goto error;
-    DCHECK(!release_block->is_pinned_);
-    release_block = NULL; // Handled by transfer.
-    DCHECK(block->is_pinned_);
-    *pinned = true;
-  }
-
-  DCHECK(block->write_handle_ != NULL) << block->DebugString() << endl << release_block;
-
-  // The block is on disk - read it back into memory.
-  if (block->valid_data_len() > 0) {
-    status = tmp_file_group_->Read(block->write_handle_.get(), block->valid_data());
-    if (!status.ok()) goto error;
-  }
-  tmp_file_group_->DestroyWriteHandle(move(block->write_handle_));
-  return DeleteOrUnpinBlock(release_block, unpin);
-
-error:
-  DCHECK(!status.ok());
-  // Make sure to delete the block if we hit an error before calling DeleteOrUnpin().
-  if (release_block != NULL && !unpin) DeleteBlock(release_block);
-  return status;
-}
-
-Status BufferedBlockMgr::CancelWrite(Block* block) {
-  {
-    unique_lock<mutex> lock(lock_);
-    DCHECK(block->buffer_desc_ != NULL);
-    // If there is an in-flight write, wait for it to finish. This is sub-optimal
-    // compared to just cancelling the write, but reduces the number of possible
-    // code paths in this legacy code.
-    WaitForWrite(lock, block);
-    if (is_cancelled_) return Status::CANCELLED;
-  }
-  if (block->write_handle_ != NULL) {
-    // Make sure the write is not in-flight.
-    block->write_handle_->Cancel();
-    block->write_handle_->WaitForWrite();
-    // Restore the in-memory data without reading from disk (e.g. decrypt it).
-    RETURN_IF_ERROR(
-        tmp_file_group_->RestoreData(move(block->write_handle_), block->valid_data()));
-  }
-  return Status::OK();
-}
-
-Status BufferedBlockMgr::UnpinBlock(Block* block) {
-  DCHECK(!block->is_deleted_) << "Unpin for deleted block.";
-
-  lock_guard<mutex> unpinned_lock(lock_);
-  if (is_cancelled_) return Status::CANCELLED;
-  DCHECK(block->Validate()) << endl << block->DebugString();
-  if (!block->is_pinned_) return Status::OK();
-  DCHECK_EQ(block->buffer_desc_->len, max_block_size_) << "Can only unpin io blocks.";
-  DCHECK(Validate()) << endl << DebugInternal();
-  // Add 'block' to the list of unpinned blocks and set is_pinned_ to false.
-  // Cache its position in the list for later removal.
-  block->is_pinned_ = false;
-  DCHECK(!unpinned_blocks_.Contains(block)) << " Unpin for block in unpinned list";
-  if (!block->in_write_) unpinned_blocks_.Enqueue(block);
-  block->client_->UnpinBuffer(block->buffer_desc_);
-  if (block->client_->num_pinned_buffers_ < block->client_->num_reserved_buffers_) {
-    ++unfullfilled_reserved_buffers_;
-  }
-  --total_pinned_buffers_;
-  RETURN_IF_ERROR(WriteUnpinnedBlocks());
-  DCHECK(Validate()) << endl << DebugInternal();
-  DCHECK(block->Validate()) << endl << block->DebugString();
-  return Status::OK();
-}
-
-Status BufferedBlockMgr::WriteUnpinnedBlocks() {
-  if (disable_spill_) return Status::OK();
-
-  // Assumes block manager lock is already taken.
-  while (non_local_outstanding_writes_ + free_io_buffers_.size() < block_write_threshold_
-      && !unpinned_blocks_.empty()) {
-    // Pop a block from the back of the list (LIFO).
-    Block* write_block = unpinned_blocks_.PopBack();
-    write_block->client_local_ = false;
-    RETURN_IF_ERROR(WriteUnpinnedBlock(write_block));
-    ++non_local_outstanding_writes_;
-  }
-  DCHECK(Validate()) << endl << DebugInternal();
-  return Status::OK();
-}
-
-Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) {
-  // Assumes block manager lock is already taken.
-  DCHECK(!block->is_pinned_) << block->DebugString();
-  DCHECK(!block->in_write_) << block->DebugString();
-  DCHECK(block->write_handle_ == NULL) << block->DebugString();
-  DCHECK_EQ(block->buffer_desc_->len, max_block_size_);
-
-  // The block is on disk - read it back into memory.
-  RETURN_IF_ERROR(tmp_file_group_->Write(block->valid_data(),
-      [this, block](const Status& write_status) { WriteComplete(block, write_status); },
-      &block->write_handle_));
-
-  block->in_write_ = true;
-  DCHECK(block->Validate()) << endl << block->DebugString();
-  outstanding_writes_counter_->Add(1);
-  ++writes_issued_;
-  if (writes_issued_ == 1) {
-    if (ImpaladMetrics::NUM_QUERIES_SPILLED != NULL) {
-      ImpaladMetrics::NUM_QUERIES_SPILLED->Increment(1);
-    }
-  }
-  return Status::OK();
-}
-
-void BufferedBlockMgr::WaitForWrite(unique_lock<mutex>& lock, Block* block) {
-  DCHECK(!block->is_deleted_);
-  while (block->in_write_ && !is_cancelled_) {
-    block->write_complete_cv_.wait(lock);
-  }
-}
-
-void BufferedBlockMgr::WriteComplete(Block* block, const Status& write_status) {
-#ifndef NDEBUG
-  if (debug_write_delay_ms_ > 0) {
-    usleep(static_cast<int64_t>(debug_write_delay_ms_) * 1000);
-  }
-#endif
-  Status status = Status::OK();
-  lock_guard<mutex> lock(lock_);
-  DCHECK(Validate()) << endl << DebugInternal();
-  DCHECK(is_cancelled_ || block->in_write_) << "WriteComplete() for block not in write."
-                                            << endl
-                                            << block->DebugString();
-  DCHECK(block->buffer_desc_ != NULL);
-
-  outstanding_writes_counter_->Add(-1);
-  if (!block->client_local_) {
-    DCHECK_GT(non_local_outstanding_writes_, 0) << block->DebugString();
-    --non_local_outstanding_writes_;
-  }
-  block->in_write_ = false;
-
-  // ReturnUnusedBlock() will clear the block, so save required state in local vars.
-  // state is not valid if the block was deleted because the state may be torn down
-  // after the state's fragment has deleted all of its blocks.
-  RuntimeState* state = block->is_deleted_ ? NULL : block->client_->state_;
-
-  // If the block was re-pinned when it was in the IOMgr queue, don't free it.
-  if (block->is_pinned_) {
-    // The number of outstanding writes has decreased but the number of free buffers
-    // hasn't.
-    DCHECK(!block->is_deleted_);
-    DCHECK(!block->client_local_)
-        << "Client should be waiting. No one should have pinned this block.";
-    if (write_status.ok() && !is_cancelled_ && !state->is_cancelled()) {
-      status = WriteUnpinnedBlocks();
-    }
-  } else if (block->client_local_) {
-    DCHECK(!block->is_deleted_)
-        << "Client should be waiting. No one should have deleted this block.";
-  } else {
-    DCHECK_EQ(block->buffer_desc_->len, max_block_size_)
-        << "Only io sized buffers should spill";
-    free_io_buffers_.Enqueue(block->buffer_desc_);
-  }
-
-  if (!write_status.ok() || !status.ok() || is_cancelled_) {
-    VLOG_FILE << "Query: " << query_id_ << ". Write did not complete successfully: "
-                                           "write_status="
-              << write_status.GetDetail() << ", status=" << status.GetDetail()
-              << ". is_cancelled_=" << is_cancelled_;
-    // If the instance is already cancelled, don't confuse things with these errors.
-    if (!write_status.ok() && !write_status.IsCancelled()) {
-      // Report but do not attempt to recover from write error.
-      VLOG_QUERY << "Query: " << query_id_ << " write complete callback with error.";
-
-      if (state != NULL) state->LogError(write_status.msg());
-    }
-    if (!status.ok() && !status.IsCancelled()) {
-      VLOG_QUERY << "Query: " << query_id_ << " error while writing unpinned blocks.";
-      if (state != NULL) state->LogError(status.msg());
-    }
-    // Set cancelled. Threads waiting for a write will be woken up in the normal way when
-    // one of the writes they are waiting for completes.
-    is_cancelled_ = true;
-  }
-
-  // Notify any threads that may have been expecting to get block's buffer based on
-  // the value of 'non_local_outstanding_writes_'. Wake them all up. If we added
-  // a buffer to 'free_io_buffers_', one thread will get a buffer. All the others
-  // will re-evaluate whether they should continue waiting and if another write needs
-  // to be initiated.
-  if (!block->client_local_) buffer_available_cv_.notify_all();
-  if (block->is_deleted_) {
-    // Finish the DeleteBlock() work.
-    tmp_file_group_->DestroyWriteHandle(move(block->write_handle_));
-    block->buffer_desc_->block = NULL;
-    block->buffer_desc_ = NULL;
-    ReturnUnusedBlock(block);
-    block = NULL;
-  } else {
-    // Wake up the thread waiting on this block (if any).
-    block->write_complete_cv_.notify_one();
-  }
-
-  DCHECK(Validate()) << endl << DebugInternal();
-}
-
-void BufferedBlockMgr::DeleteBlock(Block* block) {
-  unique_lock<mutex> lock(lock_);
-  DeleteBlockLocked(lock, block);
-}
-
-void BufferedBlockMgr::DeleteBlockLocked(const unique_lock<mutex>& lock, Block* block) {
-  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  DCHECK(block->Validate()) << endl << DebugInternal();
-  DCHECK(!block->is_deleted_);
-  block->is_deleted_ = true;
-
-  if (block->is_pinned_) {
-    if (block->is_max_size()) --total_pinned_buffers_;
-    block->is_pinned_ = false;
-    block->client_->UnpinBuffer(block->buffer_desc_);
-    if (block->client_->num_pinned_buffers_ < block->client_->num_reserved_buffers_) {
-      ++unfullfilled_reserved_buffers_;
-    }
-  } else if (unpinned_blocks_.Contains(block)) {
-    // Remove block from unpinned list.
-    unpinned_blocks_.Remove(block);
-  }
-
-  if (block->in_write_) {
-    DCHECK(block->buffer_desc_ != NULL && block->buffer_desc_->len == max_block_size_)
-        << "Should never be writing a small buffer";
-    // If a write is still pending, cancel it and return. Cleanup will be done in
-    // WriteComplete(). Cancelling the write ensures that it won't try to log to the
-    // RuntimeState (which may be torn down before the block manager).
-    DCHECK(block->Validate()) << endl << block->DebugString();
-    return;
-  }
-
-  if (block->buffer_desc_ != NULL) {
-    if (block->buffer_desc_->len != max_block_size_) {
-      // Just delete the block for now.
-      delete[] block->buffer_desc_->buffer;
-      block->client_->tracker_->Release(block->buffer_desc_->len);
-      delete block->buffer_desc_;
-      block->buffer_desc_ = NULL;
-    } else {
-      if (!free_io_buffers_.Contains(block->buffer_desc_)) {
-        free_io_buffers_.Enqueue(block->buffer_desc_);
-        // Wake up one of the waiting threads, which will grab the buffer.
-        buffer_available_cv_.notify_one();
-      }
-      block->buffer_desc_->block = NULL;
-      block->buffer_desc_ = NULL;
-    }
-  }
-
-  // Discard any on-disk data. The write is finished so this won't call back into
-  // BufferedBlockMgr.
-  if (block->write_handle_ != NULL) {
-    tmp_file_group_->DestroyWriteHandle(move(block->write_handle_));
-  }
-  ReturnUnusedBlock(block);
-  DCHECK(block->Validate()) << endl << block->DebugString();
-  DCHECK(Validate()) << endl << DebugInternal();
-}
-
-void BufferedBlockMgr::ReturnUnusedBlock(Block* block) {
-  DCHECK(block->is_deleted_) << block->DebugString();
-  DCHECK(!block->is_pinned_) << block->DebugString();;
-  DCHECK(block->buffer_desc_ == NULL);
-  block->Init();
-  unused_blocks_.Enqueue(block);
-}
-
-Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) {
-  DCHECK(block != NULL);
-  Client* client = block->client_;
-  DCHECK(client != NULL);
-  DCHECK(!block->is_pinned_ && !block->is_deleted_)
-      << "Pinned or deleted block " << endl << block->DebugString();
-  *in_mem = false;
-
-  unique_lock<mutex> l(lock_);
-  if (is_cancelled_) return Status::CANCELLED;
-
-  // First check if there is enough reserved memory to satisfy this request.
-  bool is_reserved_request = false;
-  if (client->num_pinned_buffers_ < client->num_reserved_buffers_) {
-    is_reserved_request = true;
-  } else if (client->num_tmp_reserved_buffers_ > 0) {
-    is_reserved_request = true;
-    --client->num_tmp_reserved_buffers_;
-  }
-
-  DCHECK(Validate()) << endl << DebugInternal();
-  if (is_reserved_request) --unfullfilled_reserved_buffers_;
-
-  if (!is_reserved_request && remaining_unreserved_buffers() < 1) {
-    // The client already has its quota and there are no unreserved blocks left.
-    // Note that even if this passes, it is still possible for the path below to
-    // see OOM because another query consumed memory from the process tracker. This
-    // only happens if the buffer has not already been allocated by the block mgr.
-    // This check should ensure that the memory cannot be consumed by another client
-    // of the block mgr.
-    return Status::OK();
-  }
-
-  if (block->buffer_desc_ != NULL) {
-    // The block is in memory. It may be in 3 states:
-    //  1. In the unpinned list. The buffer will not be in the free list.
-    //  2. in_write_ == true. The buffer will not be in the free list.
-    //  3. The buffer is free, but hasn't yet been reassigned to a different block.
-    DCHECK_EQ(block->buffer_desc_->len, max_block_size())
-        << "Non-I/O blocks are always pinned";
-    DCHECK(unpinned_blocks_.Contains(block) ||
-           block->in_write_ ||
-           free_io_buffers_.Contains(block->buffer_desc_));
-    if (unpinned_blocks_.Contains(block)) {
-      unpinned_blocks_.Remove(block);
-      DCHECK(!free_io_buffers_.Contains(block->buffer_desc_));
-    } else if (block->in_write_) {
-      DCHECK(block->in_write_ && !free_io_buffers_.Contains(block->buffer_desc_));
-    } else {
-      free_io_buffers_.Remove(block->buffer_desc_);
-    }
-    buffered_pin_counter_->Add(1);
-    *in_mem = true;
-  } else {
-    BufferDescriptor* buffer_desc = NULL;
-    RETURN_IF_ERROR(FindBuffer(l, &buffer_desc));
-
-    if (buffer_desc == NULL) {
-      // There are no free buffers or blocks we can evict. We need to fail this request.
-      // If this is an optional request, return OK. If it is required, return OOM.
-      if (!is_reserved_request || client->tolerates_oversubscription_) return Status::OK();
-
-      if (VLOG_QUERY_IS_ON) {
-        stringstream ss;
-        ss << "Query id=" << query_id_ << " was unable to get minimum required buffers."
-           << endl << DebugInternal() << endl << client->DebugString();
-        VLOG_QUERY << ss.str();
-      }
-      return client->tracker_->MemLimitExceeded(client->state_,
-          "Query did not have enough memory to get the minimum required buffers in the "
-          "block manager.");
-    }
-
-    DCHECK(buffer_desc != NULL);
-    DCHECK_EQ(buffer_desc->len, max_block_size()) << "Non-I/O buffer";
-    if (buffer_desc->block != NULL) {
-      // This buffer was assigned to a block but now we are reusing it. Reset the
-      // previous block->buffer link.
-      DCHECK(buffer_desc->block->Validate()) << endl << buffer_desc->block->DebugString();
-      buffer_desc->block->buffer_desc_ = NULL;
-    }
-    buffer_desc->block = block;
-    block->buffer_desc_ = buffer_desc;
-  }
-  DCHECK(block->buffer_desc_ != NULL);
-  DCHECK(block->buffer_desc_->len < max_block_size() || !block->is_pinned_)
-      << "Trying to pin already pinned block. "
-      << block->buffer_desc_->len << " " << block->is_pinned_;
-  block->is_pinned_ = true;
-  client->PinBuffer(block->buffer_desc_);
-  ++total_pinned_buffers_;
-
-  DCHECK(block->Validate()) << endl << block->DebugString();
-  // The number of free buffers has decreased. Write unpinned blocks if the number
-  // of free buffers is less than the threshold.
-  RETURN_IF_ERROR(WriteUnpinnedBlocks());
-  DCHECK(Validate()) << endl << DebugInternal();
-  return Status::OK();
-}
-
-// We need to find a new buffer. We prefer getting this buffer in this order:
-//  1. Allocate a new block if the number of free blocks is less than the write threshold
-//     or if we are running without spilling, until we run out of memory.
-//  2. Pick a buffer from the free list.
-//  3. Wait and evict an unpinned buffer.
-Status BufferedBlockMgr::FindBuffer(unique_lock<mutex>& lock,
-    BufferDescriptor** buffer_desc) {
-  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  *buffer_desc = NULL;
-
-  // First, try to allocate a new buffer.
-  DCHECK(block_write_threshold_ > 0 || disable_spill_);
-  if ((free_io_buffers_.size() < block_write_threshold_ || disable_spill_) &&
-      mem_tracker_->TryConsume(max_block_size_)) {
-    uint8_t* new_buffer = new uint8_t[max_block_size_];
-    *buffer_desc = obj_pool_.Add(new BufferDescriptor(new_buffer, max_block_size_));
-    (*buffer_desc)->all_buffers_it = all_io_buffers_.insert(
-        all_io_buffers_.end(), *buffer_desc);
-    return Status::OK();
-  }
-
-  // Second, try to pick a buffer from the free list.
-  if (free_io_buffers_.empty()) {
-    // There are no free buffers. If spills are disabled or there no unpinned blocks we
-    // can write, return. We can't get a buffer.
-    if (disable_spill_) {
-      if (block_write_threshold_ == 0) {
-        return Status("Spilling has been disabled due to no usable scratch space. "
-            "Please specify a usable scratch space location via the --scratch_dirs "
-            "impalad flag.");
-      } else {
-        return Status("Spilling has been disabled for plans that do not have stats and "
-            "are not hinted to prevent potentially bad plans from using too many cluster "
-            "resources. Please run COMPUTE STATS on these tables, hint the plan or "
-            "disable this behavior via the DISABLE_UNSAFE_SPILLS query option.");
-      }
-    }
-
-    // Third, this block needs to use a buffer that was unpinned from another block.
-    // Get a free buffer from the front of the queue and assign it to the block.
-    do {
-      if (unpinned_blocks_.empty() && non_local_outstanding_writes_ == 0) {
-        return Status::OK();
-      }
-      SCOPED_TIMER(buffer_wait_timer_);
-      // Try to evict unpinned blocks before waiting.
-      RETURN_IF_ERROR(WriteUnpinnedBlocks());
-      DCHECK_GT(non_local_outstanding_writes_, 0) << endl << DebugInternal();
-      buffer_available_cv_.wait(lock);
-      if (is_cancelled_) return Status::CANCELLED;
-    } while (free_io_buffers_.empty());
-  }
-  *buffer_desc = free_io_buffers_.Dequeue();
-  return Status::OK();
-}
-
-BufferedBlockMgr::Block* BufferedBlockMgr::GetUnusedBlock(Client* client) {
-  DCHECK(client != NULL);
-  Block* new_block = NULL;
-  if (unused_blocks_.empty()) {
-    new_block = obj_pool_.Add(new Block(this));
-    all_blocks_.push_back(new_block);
-    new_block->Init();
-    created_block_counter_->Add(1);
-  } else {
-    new_block = unused_blocks_.Dequeue();
-    recycled_blocks_counter_->Add(1);
-  }
-  DCHECK(new_block != NULL);
-  new_block->client_ = client;
-  return new_block;
-}
-
-bool BufferedBlockMgr::Validate() const {
-  int num_free_io_buffers = 0;
-
-  if (total_pinned_buffers_ < 0) {
-    LOG(ERROR) << "total_pinned_buffers_ < 0: " << total_pinned_buffers_;
-    return false;
-  }
-
-  for (BufferDescriptor* buffer: all_io_buffers_) {
-    bool is_free = free_io_buffers_.Contains(buffer);
-    num_free_io_buffers += is_free;
-
-    if (*buffer->all_buffers_it != buffer) {
-      LOG(ERROR) << "All buffers list is corrupt. Buffer iterator is not valid.";
-      return false;
-    }
-
-    if (buffer->block == NULL && !is_free) {
-      LOG(ERROR) << "Buffer with no block not in free list." << endl << DebugInternal();
-      return false;
-    }
-
-    if (buffer->len != max_block_size_) {
-      LOG(ERROR) << "Non-io sized buffers should not end up on free list.";
-      return false;
-    }
-
-    if (buffer->block != NULL) {
-      if (buffer->block->buffer_desc_ != buffer) {
-        LOG(ERROR) << "buffer<->block pointers inconsistent. Buffer: " << buffer
-          << endl << buffer->block->DebugString();
-        return false;
-      }
-
-      if (!buffer->block->Validate()) {
-        LOG(ERROR) << "buffer->block inconsistent."
-          << endl << buffer->block->DebugString();
-        return false;
-      }
-
-      if (is_free && (buffer->block->is_pinned_ || buffer->block->in_write_ ||
-            unpinned_blocks_.Contains(buffer->block))) {
-        LOG(ERROR) << "Block with buffer in free list and"
-          << " is_pinned_ = " << buffer->block->is_pinned_
-          << " in_write_ = " << buffer->block->in_write_
-          << " Unpinned_blocks_.Contains = "
-          << unpinned_blocks_.Contains(buffer->block)
-          << endl << buffer->block->DebugString();
-        return false;
-      }
-    }
-  }
-
-  if (free_io_buffers_.size() != num_free_io_buffers) {
-    LOG(ERROR) << "free_buffer_list_ inconsistency."
-      << " num_free_io_buffers = " << num_free_io_buffers
-      << " free_io_buffers_.size() = " << free_io_buffers_.size()
-      << endl << DebugInternal();
-    return false;
-  }
-
-  Block* block = unpinned_blocks_.head();
-  while (block != NULL) {
-    if (!block->Validate()) {
-      LOG(ERROR) << "Block inconsistent in unpinned list."
-        << endl << block->DebugString();
-      return false;
-    }
-
-    if (block->in_write_ || free_io_buffers_.Contains(block->buffer_desc_)) {
-      LOG(ERROR) << "Block in unpinned list with"
-        << " in_write_ = " << block->in_write_
-        << " free_io_buffers_.Contains = "
-        << free_io_buffers_.Contains(block->buffer_desc_)
-        << endl << block->DebugString();
-      return false;
-    }
-    block = block->Next();
-  }
-
-  // Check if we're writing blocks when the number of free buffers is less than
-  // the write threshold. We don't write blocks after cancellation.
-  if (!is_cancelled_ && !unpinned_blocks_.empty() && !disable_spill_ &&
-      (free_io_buffers_.size() + non_local_outstanding_writes_ <
-       block_write_threshold_)) {
-    // TODO: this isn't correct when WriteUnpinnedBlocks() fails during the call to
-    // WriteUnpinnedBlock() so just log the condition but don't return false. Figure
-    // out a way to re-enable this change?
-    LOG(ERROR) << "Missed writing unpinned blocks";
-  }
-  return true;
-}
-
-string BufferedBlockMgr::DebugString(Client* client) {
-  stringstream ss;
-  unique_lock<mutex> l(lock_);
-  ss <<  DebugInternal();
-  if (client != NULL) ss << endl << client->DebugString();
-  return ss.str();
-}
-
-string BufferedBlockMgr::DebugInternal() const {
-  stringstream ss;
-  ss << "Buffered block mgr " << this << endl
-     << "  Num writes outstanding: " << outstanding_writes_counter_->value() << endl
-     << "  Num free io buffers: " << free_io_buffers_.size() << endl
-     << "  Num unpinned blocks: " << unpinned_blocks_.size() << endl
-     << "  Num available buffers: " << remaining_unreserved_buffers() << endl
-     << "  Total pinned buffers: " << total_pinned_buffers_ << endl
-     << "  Unfullfilled reserved buffers: " << unfullfilled_reserved_buffers_ << endl
-     << "  Remaining memory: " << mem_tracker_->SpareCapacity()
-     << " (#blocks=" << (mem_tracker_->SpareCapacity() / max_block_size_) << ")" << endl
-     << "  Block write threshold: " << block_write_threshold_;
-  if (tmp_file_group_ != NULL) ss << tmp_file_group_->DebugString();
-  return ss.str();
-}
-
-void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr,
-    RuntimeProfile* parent_profile, MemTracker* parent_tracker, int64_t mem_limit,
-    int64_t scratch_limit) {
-  unique_lock<mutex> l(lock_);
-  if (initialized_) return;
-
-  profile_.reset(new RuntimeProfile(&obj_pool_, "BlockMgr"));
-  parent_profile->AddChild(profile_.get());
-
-  tmp_file_group_.reset(new TmpFileMgr::FileGroup(
-      tmp_file_mgr, io_mgr, profile_.get(), query_id_, scratch_limit));
-
-  mem_limit_counter_ = ADD_COUNTER(profile_.get(), "MemoryLimit", TUnit::BYTES);
-  mem_limit_counter_->Set(mem_limit);
-  block_size_counter_ = ADD_COUNTER(profile_.get(), "MaxBlockSize", TUnit::BYTES);
-  block_size_counter_->Set(max_block_size_);
-  created_block_counter_ = ADD_COUNTER(profile_.get(), "BlocksCreated", TUnit::UNIT);
-  recycled_blocks_counter_ = ADD_COUNTER(profile_.get(), "BlocksRecycled", TUnit::UNIT);
-  outstanding_writes_counter_ =
-      ADD_COUNTER(profile_.get(), "BlockWritesOutstanding", TUnit::UNIT);
-  buffered_pin_counter_ = ADD_COUNTER(profile_.get(), "BufferedPins", TUnit::UNIT);
-  buffer_wait_timer_ = ADD_TIMER(profile_.get(), "TotalBufferWaitTime");
-
-  // Create a new mem_tracker and allocate buffers.
-  mem_tracker_.reset(
-      new MemTracker(profile(), mem_limit, "Block Manager", parent_tracker));
-
-  initialized_ = true;
-}
-
-} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h
deleted file mode 100644
index ab05329..0000000
--- a/be/src/runtime/buffered-block-mgr.h
+++ /dev/null
@@ -1,606 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_BUFFERED_BLOCK_MGR
-#define IMPALA_RUNTIME_BUFFERED_BLOCK_MGR
-
-#include "runtime/disk-io-mgr.h"
-#include "runtime/tmp-file-mgr.h"
-#include "util/mem-range.h"
-
-#include <boost/unordered_map.hpp>
-
-namespace impala {
-
-class RuntimeState;
-
-/// The BufferedBlockMgr is used to allocate and manage blocks of data using a fixed memory
-/// budget. Available memory is split into a pool of fixed-size memory buffers. When a
-/// client allocates or requests a block, the block is assigned a buffer from this pool and
-/// is 'pinned' in memory. Clients can also unpin a block, allowing the manager to reassign
-/// its buffer to a different block.
-//
-/// The BufferedBlockMgr typically allocates blocks in IO buffer size to get maximal IO
-/// efficiency when spilling. Clients can also request smaller buffers that cannot spill
-/// (note that it would be possible to spill small buffers, but we currently do not allow
-/// it). This is useful to present the same block API and mem tracking for clients (one can
-/// use the block mgr API to mem track non-spillable (smaller) buffers). Clients that do
-/// partitioning (e.g. PHJ and PAGG) will start with these smaller buffer sizes to reduce
-/// the minimum buffering requirements and grow to max sized buffers as the input grows.
-/// For simplicity, these small buffers are not recycled (there's also not really a need
-/// since they are allocated all at once on query startup). These buffers are not counted
-/// against the reservation.
-//
-/// The BufferedBlockMgr reserves one buffer per disk ('block_write_threshold_') for
-/// itself. When the number of free buffers falls below 'block_write_threshold', unpinned
-/// blocks are flushed in Last-In-First-Out order. (It is assumed that unpinned blocks are
-/// re-read in FIFO order). The TmpFileMgr is used to obtain file handles to write to
-/// within the tmp directories configured for Impala.
-//
-/// It is expected to have one BufferedBlockMgr per query. All allocations that can grow
-/// proportional to the input size and that might need to spill to disk should allocate
-/// from the same BufferedBlockMgr.
-//
-/// A client must pin a block in memory to read/write its contents and unpin it when it is
-/// no longer in active use. The BufferedBlockMgr guarantees that:
-///  a) The memory buffer assigned to a block is not removed or released while it is pinned.
-///  b) The contents of an unpinned block will be available on a subsequent call to pin.
-//
-/// The Client supports the following operations:
-///  GetNewBlock(): Returns a new pinned block.
-///  Close(): Frees all memory and disk space. Called when a query is closed or cancelled.
-///   Close() is idempotent.
-//
-/// A Block supports the following operations:
-///  Pin(): Pins a block to a buffer in memory, and reads its contents from disk if
-///   necessary. If there are no free buffers, waits for a buffer to become available.
-///   Invoked before the contents of a block are read or written. The block
-///   will be maintained in memory until Unpin() is called.
-///  Unpin(): Invoked to indicate the block is not in active use. The block is added to a
-///   list of unpinned blocks. Unpinned blocks are only written when the number of free
-///   blocks falls below the 'block_write_threshold'.
-///  Delete(): Invoked to deallocate a block. The buffer associated with the block is
-///   immediately released and its on-disk location (if any) reused. All blocks must be
-///   deleted before the block manager is torn down.
-///
-/// The block manager is thread-safe with the following caveat: A single block cannot be
-/// used simultaneously by multiple clients in any capacity.
-/// However, the block manager client is not thread-safe. That is, the block manager
-/// allows multiple single-threaded block manager clients.
-///
-/// TODO: replace with BufferPool.
-class BufferedBlockMgr {
- private:
-  struct BufferDescriptor;
-
- public:
-  /// A client of the BufferedBlockMgr. There is a single BufferedBlockMgr per plan
-  /// fragment and all operators that need blocks from it should use a separate client.
-  /// Each client has the option to reserve a number of blocks that it can claim later.
-  /// The remaining memory that is not reserved by any clients is free for all and
-  /// available to all clients.
-  /// This is an opaque handle.
-  struct Client;
-
-  /// A fixed-size block of data that may be be persisted to disk. The state of the block
-  /// is maintained by the block manager and is described by 3 bools:
-  /// is_pinned_ = True if the block is pinned. The block has a non-null buffer_desc_,
-  ///   buffer_desc_ cannot be in the free buffer list and the block cannot be in
-  ///   unused_blocks_ or unpinned_blocks_. Newly allocated blocks are pinned.
-  /// in_write_ = True if a write has been issued but not completed for this block.
-  ///   The block cannot be in the unpinned_blocks_ and must have a non-null buffer_desc_
-  ///   that's not in the free buffer list. It may be pinned or unpinned.
-  /// is_deleted_ = True if Delete() has been called on a block. After this, no API call
-  ///   is valid on the block.
-  //
-  /// Pin() and Unpin() can be invoked on a block any number of times before Delete().
-  /// When a pinned block is unpinned for the first time, it is added to the
-  /// unpinned_blocks_ list and its buffer is removed from the free list.
-  /// If it is pinned or deleted at any time while it is on the unpinned list, it is
-  /// simply removed from that list. When it is dequeued from that list and enqueued
-  /// for writing, in_write_ is set to true. The block may be pinned, unpinned or deleted
-  /// while in_write_ is true. After the write has completed, the block's buffer will be
-  /// returned to the free buffer list if it is no longer pinned, and the block itself
-  /// will be put on the unused blocks list if Delete() was called.
-  //
-  /// A block MUST have a non-null buffer_desc_ if
-  ///  a) is_pinned_ is true (i.e. the client is using it), or
-  ///  b) in_write_ is true, (i.e. IO mgr is using it), or
-  ///  c) It is on the unpinned list (buffer has not been persisted.)
-  //
-  /// In addition to the block manager API, Block exposes Allocate(), ReturnAllocation()
-  /// and BytesRemaining() to allocate and free memory within a block, and buffer() and
-  /// valid_data_len() to read/write the contents of a block. These are not thread-safe.
-  class Block : public InternalQueue<Block>::Node {
-   public:
-    /// Pins a block in memory--assigns a free buffer to a block and reads it from disk if
-    /// necessary. If there are no free blocks and no unpinned blocks, '*pinned' is set to
-    /// false and the block is not pinned. If 'release_block' is non-NULL, if there is
-    /// memory pressure, this block will be pinned using the buffer from 'release_block'.
-    /// If 'unpin' is true, 'release_block' will be unpinned (regardless of whether or not
-    /// the buffer was used for this block). If 'unpin' is false, 'release_block' is
-    /// deleted. 'release_block' must be pinned. If an error occurs and 'unpin' was false,
-    /// 'release_block' is always deleted. If 'unpin' was true and an error occurs,
-    /// 'release_block' may be left pinned or unpinned.
-    Status Pin(bool* pinned, Block* release_block = NULL, bool unpin = true);
-
-    /// Unpins a block by adding it to the list of unpinned blocks maintained by the block
-    /// manager. An unpinned block must be flushed before its buffer is released or
-    /// assigned to a different block. Is non-blocking.
-    Status Unpin();
-
-    /// Delete a block. Its buffer is released and on-disk location can be over-written.
-    /// Non-blocking.
-    void Delete();
-
-    void AddRow() { ++num_rows_; }
-    int num_rows() const { return num_rows_; }
-
-    /// Allocates the specified number of bytes from this block.
-    template <typename T> T* Allocate(int size) {
-      DCHECK_GE(BytesRemaining(), size);
-      uint8_t* current_location = buffer_desc_->buffer + valid_data_len_;
-      valid_data_len_ += size;
-      return reinterpret_cast<T*>(current_location);
-    }
-
-    /// Return the number of remaining bytes that can be allocated in this block.
-    int BytesRemaining() const {
-      DCHECK(buffer_desc_ != NULL);
-      return buffer_desc_->len - valid_data_len_;
-    }
-
-    /// Return size bytes from the most recent allocation.
-    void ReturnAllocation(int size) {
-      DCHECK_GE(valid_data_len_, size);
-      valid_data_len_ -= size;
-    }
-
-    /// Pointer to start of the block data in memory. Only guaranteed to be valid if the
-    /// block is pinned.
-    uint8_t* buffer() const {
-      DCHECK(buffer_desc_ != NULL);
-      return buffer_desc_->buffer;
-    }
-
-    /// Returns a reference to the valid data in the block's buffer. Only guaranteed to
-    /// be valid if the block is pinned.
-    MemRange valid_data() const {
-      DCHECK(buffer_desc_ != NULL);
-      return MemRange(buffer_desc_->buffer, valid_data_len_);
-    }
-
-    /// Return the number of bytes allocated in this block.
-    int64_t valid_data_len() const { return valid_data_len_; }
-
-    /// Returns the length of the underlying buffer. Only callable if the block is
-    /// pinned.
-    int64_t buffer_len() const {
-      DCHECK(is_pinned());
-      return buffer_desc_->len;
-    }
-
-    /// Returns true if this block is the max block size. Only callable if the block
-    /// is pinned.
-    bool is_max_size() const {
-      DCHECK(is_pinned());
-      return buffer_desc_->len == block_mgr_->max_block_size();
-    }
-
-    bool is_pinned() const { return is_pinned_; }
-
-    /// Path of temporary file backing the block. Intended for use in testing.
-    /// Returns empty string if no backing file allocated.
-    std::string TmpFilePath() const;
-
-    /// Debug helper method to print the state of a block.
-    std::string DebugString() const;
-
-   private:
-    friend class BufferedBlockMgr;
-
-    Block(BufferedBlockMgr* block_mgr);
-
-    /// Initialize the state of a block and set the number of bytes allocated to 0.
-    void Init();
-
-    /// Debug helper method to validate the state of a block. block_mgr_ lock must already
-    /// be taken.
-    bool Validate() const;
-
-    /// Pointer to the buffer associated with the block. NULL if the block is not in
-    /// memory and cannot be changed while the block is pinned or being written.
-    BufferDescriptor* buffer_desc_;
-
-    /// Parent block manager object. Responsible for maintaining the state of the block.
-    BufferedBlockMgr* block_mgr_;
-
-    /// The client that owns this block.
-    Client* client_;
-
-    /// Non-NULL when the block data is written to scratch or is in the process of being
-    /// written.
-    std::unique_ptr<TmpFileMgr::WriteHandle> write_handle_;
-
-    /// Length of valid (i.e. allocated) data within the block.
-    int64_t valid_data_len_;
-
-    /// Number of rows in this block.
-    int num_rows_;
-
-    /// Block state variables. The block's buffer can be freed only if is_pinned_ and
-    /// in_write_ are both false.
-
-    /// is_pinned_ is true while the block is pinned by a client.
-    bool is_pinned_;
-
-    /// in_write_ is set to true when the block is enqueued for writing via DiskIoMgr,
-    /// and set to false when the write is complete.
-    bool in_write_;
-
-    /// True if the block is deleted by the client.
-    bool is_deleted_;
-
-    /// Condition variable to wait for the write to this block to finish. If 'in_write_'
-    /// is true, notify_one() will eventually be called on this condition variable. Only
-    /// on thread should wait on this cv at a time.
-    boost::condition_variable write_complete_cv_;
-
-    /// If true, this block is being written out so the underlying buffer can be
-    /// transferred to another block from the same client. We don't want this buffer
-    /// getting picked up by another client.
-    bool client_local_;
-  }; // class Block
-
-  /// Create a block manager with the specified mem_limit. If a block mgr with the
-  /// same query id has already been created, that block mgr is returned.
-  /// - mem_limit: maximum memory that will be used by the block mgr.
-  /// - buffer_size: maximum size of each buffer.
-  static Status Create(RuntimeState* state, MemTracker* parent,
-      RuntimeProfile* profile, TmpFileMgr* tmp_file_mgr, int64_t mem_limit,
-      int64_t buffer_size, std::shared_ptr<BufferedBlockMgr>* block_mgr);
-
-  ~BufferedBlockMgr();
-
-  /// Registers a client with 'num_reserved_buffers'. The returned client is owned
-  /// by the BufferedBlockMgr and has the same lifetime as it.
-  /// We allow oversubscribing the reserved buffers. It is likely that the
-  /// 'num_reserved_buffers' will be very pessimistic for small queries and we don't want
-  /// to
-  /// fail all of them with mem limit exceeded.
-  /// The min reserved buffers is often independent of data size and we still want
-  /// to run small queries with very small limits.
-  /// Buffers used by this client are reflected in tracker.
-  /// 'tolerates_oversubscription' determines how oversubscription is handled. If true,
-  /// failure to allocate a reserved buffer is not an error. If false, failure to
-  /// allocate a reserved buffer is a MEM_LIMIT_EXCEEDED error.
-  /// 'debug_info' is a string that will be printed in debug messages and errors to
-  /// identify the client.
-  Status RegisterClient(const std::string& debug_info, int num_reserved_buffers,
-      bool tolerates_oversubscription, MemTracker* tracker, RuntimeState* state,
-      Client** client);
-
-  /// Clears all reservations for this client.
-  void ClearReservations(Client* client);
-
-  /// Tries to acquire a one-time reservation of num_buffers. The semantics are:
-  ///  - If this call fails, the next 'num_buffers' calls to Pin()/GetNewBlock() might
-  ///    not have enough memory.
-  ///  - If this call succeeds, the next 'num_buffers' calls to Pin()/GetNewBlock() will
-  ///    be guaranteed to get the block. Once these blocks have been pinned, the
-  ///    reservation from this call has no more effect.
-  /// Blocks coming from the tmp reservation also count towards the regular reservation.
-  /// This is useful to Pin() a number of blocks and guarantee all or nothing behavior.
-  bool TryAcquireTmpReservation(Client* client, int num_buffers);
-
-  /// Return a new pinned block. If there is no memory for this block, *block will be set
-  /// to NULL.
-  /// If len > 0, GetNewBlock() will return a block with a buffer of size len. len
-  /// must be less than max_block_size and this block cannot be unpinned.
-  /// This function will try to allocate new memory for the block up to the limit.
-  /// Otherwise it will (conceptually) write out an unpinned block and use that memory.
-  /// The caller can pass a non-NULL 'unpin_block' to transfer memory from 'unpin_block'
-  /// to the new block. If 'unpin_block' is non-NULL, the new block can never fail to
-  /// get a buffer. The semantics of this are:
-  ///   - If 'unpin_block' is non-NULL, it must be pinned.
-  ///   - If the call succeeds, 'unpin_block' is unpinned.
-  ///   - If there is no memory pressure, block will get a newly allocated buffer.
-  ///   - If there is memory pressure, block will get the buffer from 'unpin_block'.
-  Status GetNewBlock(Client* client, Block* unpin_block, Block** block, int64_t len = -1);
-
-  /// Test helper to cancel the block mgr. All subsequent calls that return a Status fail
-  /// with Status::CANCELLED. Idempotent.
-  void Cancel();
-
-  /// Returns true if the block manager was cancelled.
-  bool IsCancelled();
-
-  /// Dumps block mgr state. Grabs lock. If client is not NULL, also dumps its state.
-  std::string DebugString(Client* client = NULL);
-
-  /// Consumes 'size' bytes from the buffered block mgr. This is used by callers that want
-  /// the memory to come from the block mgr pool (and therefore trigger spilling) but need
-  /// the allocation to be more flexible than blocks. Buffer space reserved with
-  /// TryAcquireTmpReservation may be used to fulfill the request if available. If the
-  /// request is unsuccessful, that temporary buffer space is not consumed.
-  /// Returns false if there was not enough memory.
-  ///
-  /// This is used only for the Buckets structure in the hash table, which cannot be
-  /// segmented into blocks.
-  bool ConsumeMemory(Client* client, int64_t size);
-
-  /// Bytes successfully allocated by ConsumeMemory() must be released with a
-  /// corresponding ReleaseMemory() call.
-  void ReleaseMemory(Client* client, int64_t size);
-
-  /// Returns a MEM_LIMIT_EXCEEDED error which includes the minimum memory required by
-  /// this 'client' that acts on behalf of the node with id 'node_id'. 'node_id' is used
-  /// only for error reporting.
-  Status MemLimitTooLowError(Client* client, int node_id);
-
-  int num_pinned_buffers(Client* client) const;
-  int num_reserved_buffers_remaining(Client* client) const;
-  MemTracker* mem_tracker() const { return mem_tracker_.get(); }
-  MemTracker* get_tracker(Client* client) const;
-  int64_t max_block_size() const { return max_block_size_; }
-  int64_t bytes_allocated() const;
-  RuntimeProfile* profile() { return profile_.get(); }
-  int writes_issued() const { return writes_issued_; }
-
-  void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
-
- private:
-  friend class BufferedBlockMgrTest;
-  friend struct Client;
-
-  /// Descriptor for a single memory buffer in the pool.
-  struct BufferDescriptor : public InternalQueue<BufferDescriptor>::Node {
-    /// Start of the buffer.
-    uint8_t* buffer;
-
-    /// Length of the buffer.
-    int64_t len;
-
-    /// Block that this buffer is assigned to. May be NULL.
-    Block* block;
-
-    /// Iterator into all_io_buffers_ for this buffer.
-    std::list<BufferDescriptor*>::iterator all_buffers_it;
-
-    BufferDescriptor(uint8_t* buf, int64_t len) : buffer(buf), len(len), block(NULL) {}
-  };
-
-  BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr, int64_t block_size,
-      int64_t scratch_limit);
-
-  /// Initializes the block mgr. Idempotent and thread-safe.
-  void Init(DiskIoMgr* io_mgr, TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile,
-      MemTracker* parent_tracker, int64_t mem_limit, int64_t scratch_limit);
-
-  /// PinBlock(), UnpinBlock(), DeleteBlock() perform the actual work of Block::Pin(),
-  /// Unpin() and Delete(). DeleteBlock() must be called without the lock_ taken and
-  /// DeleteBlockLocked() must be called with the lock_ taken.
-  Status PinBlock(Block* block, bool* pinned, Block* src, bool unpin);
-  Status UnpinBlock(Block* block);
-  void DeleteBlock(Block* block);
-  void DeleteBlockLocked(const boost::unique_lock<boost::mutex>& lock, Block* block);
-
-  /// If there is an in-flight write, cancel the write and restore the contents of the
-  /// block's buffer. If no write has been started for 'block', does nothing. 'block'
-  /// must have an associated buffer. Returns an error status if an error is encountered
-  /// while cancelling the write or CANCELLED if the block mgr is cancelled.
-  Status CancelWrite(Block* block);
-
-  /// If the 'block' is NULL, checks if cancelled and returns. Otherwise, depending on
-  /// 'unpin' calls either DeleteBlock() or UnpinBlock(), which both first check for
-  /// cancellation. It should be called without the lock_ acquired.
-  Status DeleteOrUnpinBlock(Block* block, bool unpin);
-
-  /// Transfers the buffer from 'src' to 'dst'. 'src' must be pinned. If a write is
-  /// already in flight for 'src', this may block until that write completes.
-  /// If unpin == false, 'src' is simply deleted.
-  /// If unpin == true, 'src' is unpinned and it may block until the write of 'src' is
-  /// completed.
-  /// The caller should not hold 'lock_'.
-  Status TransferBuffer(Block* dst, Block* src, bool unpin);
-
-  /// The number of buffers available for client. That is, if all other clients were
-  /// stopped, the number of buffers this client could get.
-  int64_t available_buffers(Client* client) const;
-
-  /// Returns the total number of unreserved buffers. This is the sum of unpinned,
-  /// free and buffers we can still allocate minus the total number of reserved buffers
-  /// that are not pinned.
-  /// Note this can be negative if the buffers are oversubscribed.
-  /// Must be called with lock_ taken.
-  int64_t remaining_unreserved_buffers() const;
-
-  /// Finds a buffer for a block and pins it. If the block's buffer has not been evicted,
-  /// it removes the block from the unpinned list and sets *in_mem = true.
-  /// If the block is not in memory, it will call FindBuffer() that may block.
-  /// If we can't get a buffer (e.g. no more memory, nothing in the unpinned and free
-  /// lists) this function returns with the block unpinned.
-  /// Uses the lock_, the caller should not have already acquired the lock_.
-  Status FindBufferForBlock(Block* block, bool* in_mem);
-
-  /// Returns a new buffer that can be used. *buffer is set to NULL if there was no
-  /// memory.
-  /// Otherwise, this function gets a new buffer by:
-  ///   1. Allocating a new buffer if possible
-  ///   2. Using a buffer from the free list (which is populated by moving blocks from
-  ///      the unpinned list by writing them out).
-  /// Must be called with the lock_ already taken. This function can block.
-  Status FindBuffer(boost::unique_lock<boost::mutex>& lock, BufferDescriptor** buffer);
-
-  /// Writes unpinned blocks via DiskIoMgr until one of the following is true:
-  ///   1. The number of outstanding writes >= (block_write_threshold_ - num free buffers)
-  ///   2. There are no more unpinned blocks
-  /// Must be called with the lock_ already taken. Is not blocking.
-  Status WriteUnpinnedBlocks();
-
-  /// Issues the write for this block to the DiskIoMgr.
-  Status WriteUnpinnedBlock(Block* block);
-
-  /// Wait until either there is no in-flight write for 'block' or the block mgr is
-  /// cancelled. 'lock_' must be held with 'lock'.
-  void WaitForWrite(boost::unique_lock<boost::mutex>& lock, Block* block);
-
-  /// Callback used by DiskIoMgr to indicate a block write has completed.  write_status
-  /// is the status of the write. is_cancelled_ is set to true if write_status is not
-  /// Status::OK or a re-issue of the write fails. Returns the block's buffer to the
-  /// free buffers list if it is no longer pinned. Returns the block itself to the free
-  /// blocks list if it has been deleted.
-  void WriteComplete(Block* block, const Status& write_status);
-
-  /// Returns a deleted block to the list of free blocks. Assumes the block's buffer has
-  /// already been returned to the free buffers list. Non-blocking.
-  /// Thread-safe and does not need the lock_ acquired.
-  void ReturnUnusedBlock(Block* block);
-
-  /// Checks unused_blocks_ for an unused block object, else allocates a new one.
-  /// Non-blocking and needs no lock_.
-  Block* GetUnusedBlock(Client* client);
-
-  /// Test helper to get the number of block writes currently outstanding.
-  int64_t GetNumWritesOutstanding();
-
-  /// Used to debug the state of the block manager. Lock must already be taken.
-  bool Validate() const;
-  std::string DebugInternal() const;
-
-  /// Size of the largest/default block in bytes.
-  const int64_t max_block_size_;
-
-  /// Unpinned blocks are written when the number of free buffers is below this threshold.
-  /// Equal to two times the number of disks.
-  const int block_write_threshold_;
-
-  /// If true, spilling is disabled. The client calls will fail if there is not enough
-  /// memory.
-  const bool disable_spill_;
-
-  const TUniqueId query_id_;
-
-  ObjectPool obj_pool_;
-
-  /// Track buffers allocated by the block manager.
-  boost::scoped_ptr<MemTracker> mem_tracker_;
-
-  /// This lock protects the block and buffer lists below, except for unused_blocks_.
-  /// It also protects the various counters and changes to block state. Additionally, it
-  /// is used for the blocking condvars: buffer_available_cv_ and
-  /// block->write_complete_cv_.
-  boost::mutex lock_;
-
-  /// If true, Init() has been called.
-  bool initialized_;
-
-  /// The total number of reserved buffers across all clients that are not pinned.
-  int unfullfilled_reserved_buffers_;
-
-  /// The total number of pinned buffers across all clients.
-  int total_pinned_buffers_;
-
-  /// Number of outstanding writes (Writes issued but not completed).
-  /// This does not include client-local writes.
-  int non_local_outstanding_writes_;
-
-  /// Signal availability of free buffers. Also signalled when a write completes for a
-  /// pinned block, in case another thread was expecting to obtain its buffer. If
-  /// 'non_local_outstanding_writes_' > 0, notify_all() will eventually be called on
-  /// this condition variable. To avoid free buffers accumulating while threads wait
-  /// on the cv, a woken thread must grab an available buffer (unless is_cancelled_ is
-  /// true at that time).
-  boost::condition_variable buffer_available_cv_;
-
-  /// All used or unused blocks allocated by the BufferedBlockMgr.
-  vector<Block*> all_blocks_;
-
-  /// List of blocks that have is_pinned_ = false and are not on DiskIoMgr's write queue.
-  /// Blocks are added to and removed from the back of the list. (i.e. in LIFO order).
-  /// Blocks in this list must have is_pinned_ = false, in_write_ = false,
-  /// is_deleted_ = false.
-  InternalQueue<Block> unpinned_blocks_;
-
-  /// List of blocks that have been deleted and are no longer in use.
-  /// Can be reused in GetNewBlock(). Blocks in this list must be in the Init'ed state,
-  /// i.e. buffer_desc_ = NULL, is_pinned_ = false, in_write_ = false,
-  /// is_deleted_ = false, valid_data_len = 0.
-  InternalQueue<Block> unused_blocks_;
-
-  /// List of buffers that can be assigned to a block in Pin() or GetNewBlock().
-  /// These buffers either have no block associated with them or are associated with
-  /// an unpinned block that has been persisted. That is, either block = NULL or
-  /// (!block->is_pinned_  && !block->in_write_  && !unpinned_blocks_.Contains(block)).
-  /// All of these buffers are io sized.
-  InternalQueue<BufferDescriptor> free_io_buffers_;
-
-  /// All allocated io-sized buffers.
-  std::list<BufferDescriptor*> all_io_buffers_;
-
-  /// Group of temporary physical files, (one per tmp device) to which
-  /// blocks may be written. Blocks are round-robined across these files.
-  boost::scoped_ptr<TmpFileMgr::FileGroup> tmp_file_group_;
-
-  /// If true, a disk write failed and all API calls return
-  /// Status::CANCELLED. Set to true if there was an error writing a block, or if
-  /// WriteComplete() needed to reissue the write and that failed.
-  bool is_cancelled_;
-
-  /// Counters and timers to track behavior.
-  boost::scoped_ptr<RuntimeProfile> profile_;
-
-  /// These have a fixed value for the lifetime of the manager and show memory usage.
-  RuntimeProfile::Counter* mem_limit_counter_;
-  RuntimeProfile::Counter* block_size_counter_;
-
-  /// Total number of blocks created.
-  RuntimeProfile::Counter* created_block_counter_;
-
-  /// Number of deleted blocks reused.
-  RuntimeProfile::Counter* recycled_blocks_counter_;
-
-  /// Number of Pin() calls that did not require a disk read.
-  RuntimeProfile::Counter* buffered_pin_counter_;
-
-  /// Time spent waiting for a free buffer.
-  RuntimeProfile::Counter* buffer_wait_timer_;
-
-  /// Number of writes outstanding (issued but not completed).
-  RuntimeProfile::Counter* outstanding_writes_counter_;
-
-  /// Number of writes issued.
-  int writes_issued_;
-
-  /// Protects query_to_block_mgrs_.
-  static SpinLock static_block_mgrs_lock_;
-
-  /// All per-query BufferedBlockMgr objects that are in use.  For memory management, this
-  /// map contains only weak ptrs. BufferedBlockMgrs that are handed out are shared ptrs.
-  /// When all the shared ptrs are no longer referenced, the BufferedBlockMgr
-  /// d'tor will be called at which point the weak ptr will be removed from the map.
-  typedef boost::unordered_map<TUniqueId, std::weak_ptr<BufferedBlockMgr>> BlockMgrsMap;
-  static BlockMgrsMap query_to_block_mgrs_;
-
-  /// Debug option to delay write completion.
-  int debug_write_delay_ms_;
-
-}; // class BufferedBlockMgr
-
-} // namespace impala.
-
-#endif


Mime
View raw message