impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [03/50] [abbrv] incubator-impala git commit: IMPALA-3202: DiskIoMgr improvements for new buffer pool
Date Thu, 17 Nov 2016 16:09:14 GMT
IMPALA-3202: DiskIoMgr improvements for new buffer pool

The main goal of this patch is to add support to the DiskIoMgr
to read into client-provided buffers, instead of the DiskIoMgr
allocating intermediate buffers. This is important for the new
buffer pool because we want it to entirely manage its own memory,
rather than delegating part of its memory management to the DiskIoMgr.

Also do some cleanup:
* Consolidate some correlated ScanRange parameters into a parameter
  struct.
* Remove the "untracked" buffers mem tracker, which is no longer
  necessary.
* Change the buffer type in DiskIoMgr to use uint8_t* to be consistent
  with the rest of Impala.

Testing:
Added a unit test. We also get coverage from the BufferedBlockMgr unit
tests, from any spilling tests, and from the Parquet tests with large footers.

Change-Id: I913fbb8ae6c242f1585992eb2a622509344dccec
Reviewed-on: http://gerrit.cloudera.org:8080/4631
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/120f34b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/120f34b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/120f34b0

Branch: refs/heads/hadoop-next
Commit: 120f34b0b171a0194f734f49acae4ab56f0361f3
Parents: ac51667
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Wed Sep 21 09:37:59 2016 -0700
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Sun Nov 6 00:32:06 2016 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc      |   6 +-
 be/src/exec/hdfs-parquet-scanner.cc       |  71 +++++------
 be/src/exec/hdfs-scan-node-base.cc        |  23 ++--
 be/src/exec/hdfs-scan-node-base.h         |  11 +-
 be/src/exec/hdfs-text-scanner.cc          |   8 +-
 be/src/exec/scanner-context.cc            |   2 +-
 be/src/runtime/buffered-block-mgr.cc      |  27 ++--
 be/src/runtime/disk-io-mgr-internal.h     |   3 +-
 be/src/runtime/disk-io-mgr-scan-range.cc  |  75 ++++++++----
 be/src/runtime/disk-io-mgr-stress-test.cc |   1 +
 be/src/runtime/disk-io-mgr-stress.cc      |  16 ++-
 be/src/runtime/disk-io-mgr-stress.h       |   8 +-
 be/src/runtime/disk-io-mgr-test.cc        |  64 ++++++++--
 be/src/runtime/disk-io-mgr.cc             | 154 ++++++++++++-----------
 be/src/runtime/disk-io-mgr.h              | 163 +++++++++++++++++++------
 15 files changed, 406 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index cc0722d..432dac8 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -55,9 +55,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     // it is not cached.
     // TODO: add remote disk id and plumb that through to the io mgr.  It should have
     // 1 queue for each NIC as well?
-    DiskIoMgr::ScanRange* header_range = scan_node->AllocateScanRange(
-        files[i]->fs, files[i]->filename.c_str(), header_size, 0, metadata->partition_id,
-        -1, false, false, files[i]->mtime);
+    DiskIoMgr::ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
+        files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1, false,
+        DiskIoMgr::BufferOpts::Uncached());
     header_ranges.push_back(header_range);
   }
   // Issue the header ranges only.  ProcessSplit() will issue the files' scan ranges

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 79925a4..bd5d65c 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -108,13 +108,14 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
           footer_range = scan_node->AllocateScanRange(files[i]->fs,
               files[i]->filename.c_str(), footer_size, footer_start,
               split_metadata->partition_id, footer_split->disk_id(),
-              footer_split->try_cache(), footer_split->expected_local(), files[i]->mtime,
-              split);
+              footer_split->expected_local(),
+              DiskIoMgr::BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
         } else {
           // If we did not find the last split, we know it is going to be a remote read.
-          footer_range = scan_node->AllocateScanRange(files[i]->fs,
-              files[i]->filename.c_str(), footer_size, footer_start,
-              split_metadata->partition_id, -1, false, false, files[i]->mtime, split);
+          footer_range =
+              scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
+                  footer_size, footer_start, split_metadata->partition_id, -1, false,
+                  DiskIoMgr::BufferOpts::Uncached(), split);
         }
 
         footer_ranges.push_back(footer_range);
@@ -841,8 +842,8 @@ Status HdfsParquetScanner::ProcessFooter() {
   uint8_t* metadata_size_ptr = magic_number_ptr - sizeof(int32_t);
   uint32_t metadata_size = *reinterpret_cast<uint32_t*>(metadata_size_ptr);
   uint8_t* metadata_ptr = metadata_size_ptr - metadata_size;
-  // If the metadata was too big, we need to stitch it before deserializing it.
-  // In that case, we stitch the data in this buffer.
+  // If the metadata was too big, we need to read it into a contiguous buffer before
+  // deserializing it.
   ScopedBuffer metadata_buffer(scan_node_->mem_tracker());
 
   DCHECK(metadata_range_ != NULL);
@@ -854,45 +855,34 @@ Status HdfsParquetScanner::ProcessFooter() {
     DCHECK(file_desc != NULL);
     // The start of the metadata is:
     // file_length - 4-byte metadata size - footer-size - metadata size
-    int64_t metadata_start = file_desc->file_length -
-      sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER) - metadata_size;
-    int64_t metadata_bytes_to_read = metadata_size;
+    int64_t metadata_start = file_desc->file_length - sizeof(int32_t)
+        - sizeof(PARQUET_VERSION_NUMBER) - metadata_size;
     if (metadata_start < 0) {
       return Status(Substitute("File '$0' is invalid. Invalid metadata size in file "
           "footer: $1 bytes. File size: $2 bytes.", filename(), metadata_size,
           file_desc->file_length));
     }
-    // IoMgr can only do a fixed size Read(). The metadata could be larger
-    // so we stitch it here.
-    // TODO: consider moving this stitching into the scanner context. The scanner
-    // context usually handles the stitching but no other scanner need this logic
-    // now.
 
     if (!metadata_buffer.TryAllocate(metadata_size)) {
       return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
           "metadata for file '$1'.", metadata_size, filename()));
     }
     metadata_ptr = metadata_buffer.buffer();
-    int64_t copy_offset = 0;
     DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
 
-    while (metadata_bytes_to_read > 0) {
-      int64_t to_read = ::min<int64_t>(io_mgr->max_read_buffer_size(),
-          metadata_bytes_to_read);
-      DiskIoMgr::ScanRange* range = scan_node_->AllocateScanRange(
-          metadata_range_->fs(), filename(), to_read, metadata_start + copy_offset, -1,
-          metadata_range_->disk_id(), metadata_range_->try_cache(),
-          metadata_range_->expected_local(), file_desc->mtime);
-
-      DiskIoMgr::BufferDescriptor* io_buffer = NULL;
-      RETURN_IF_ERROR(io_mgr->Read(scan_node_->reader_context(), range, &io_buffer));
-      memcpy(metadata_ptr + copy_offset, io_buffer->buffer(), io_buffer->len());
-      io_buffer->Return();
-
-      metadata_bytes_to_read -= to_read;
-      copy_offset += to_read;
-    }
-    DCHECK_EQ(metadata_bytes_to_read, 0);
+    // Read the header into the metadata buffer.
+    DiskIoMgr::ScanRange* metadata_range = scan_node_->AllocateScanRange(
+        metadata_range_->fs(), filename(), metadata_size, metadata_start, -1,
+        metadata_range_->disk_id(), metadata_range_->expected_local(),
+        DiskIoMgr::BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
+
+    DiskIoMgr::BufferDescriptor* io_buffer;
+    RETURN_IF_ERROR(
+        io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer));
+    DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
+    DCHECK_EQ(io_buffer->len(), metadata_size);
+    DCHECK(io_buffer->eosr());
+    io_buffer->Return();
   }
 
   // Deserialize file header
@@ -1153,14 +1143,13 @@ Status HdfsParquetScanner::InitColumns(
         static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
 
     // Determine if the column is completely contained within a local split.
-    bool column_range_local = split_range->expected_local() &&
-                              col_start >= split_range->offset() &&
-                              col_end <= split_range->offset() + split_range->len();
-
-    DiskIoMgr::ScanRange* col_range = scan_node_->AllocateScanRange(
-        metadata_range_->fs(), filename(), col_len, col_start, scalar_reader->col_idx(),
-        split_range->disk_id(), split_range->try_cache(), column_range_local,
-        file_desc->mtime);
+    bool col_range_local = split_range->expected_local()
+        && col_start >= split_range->offset()
+        && col_end <= split_range->offset() + split_range->len();
+    DiskIoMgr::ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
+        filename(), col_len, col_start, scalar_reader->col_idx(), split_range->disk_id(),
+        col_range_local,
+        DiskIoMgr::BufferOpts(split_range->try_cache(), file_desc->mtime));
     col_ranges.push_back(col_range);
 
     // Get the stream that will be used for this column

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index bf0697c..d31c6d1 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -277,8 +277,8 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
     }
     file_desc->splits.push_back(
         AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
-            split.offset, split.partition_id, params.volume_id,
-            try_cache, expected_local, file_desc->mtime));
+            split.offset, split.partition_id, params.volume_id, expected_local,
+            DiskIoMgr::BufferOpts(try_cache, file_desc->mtime)));
   }
 
   // Update server wide metrics for number of scan ranges and ranges that have
@@ -554,9 +554,9 @@ bool HdfsScanNodeBase::WaitForRuntimeFilters(int32_t time_ms) {
   return false;
 }
 
-DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(
-    hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
-    int disk_id, bool try_cache, bool expected_local, int64_t mtime,
+DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+    int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
+    const DiskIoMgr::BufferOpts& buffer_opts,
     const DiskIoMgr::ScanRange* original_split) {
   DCHECK_GE(disk_id, -1);
   // Require that the scan range is within [0, file_length). While this cannot be used
@@ -573,15 +573,20 @@ DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(
         new ScanRangeMetadata(partition_id, original_split));
   DiskIoMgr::ScanRange* range =
       runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange());
-  range->Reset(fs, file, len, offset, disk_id, try_cache, expected_local,
-      mtime, metadata);
+  range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, metadata);
   return range;
 }
 
+DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+    int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
+    bool expected_local, int mtime, const DiskIoMgr::ScanRange* original_split) {
+  return AllocateScanRange(fs, file, len, offset, partition_id, disk_id, expected_local,
+      DiskIoMgr::BufferOpts(try_cache, mtime), original_split);
+}
+
 Status HdfsScanNodeBase::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
     int num_files_queued) {
-  RETURN_IF_ERROR(
-      runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
+  RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_, ranges));
   num_unqueued_files_.Add(-num_files_queued);
   DCHECK_GE(num_unqueued_files_.Load(), 0);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3531c9e..785674c 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -189,11 +189,16 @@ class HdfsScanNodeBase : public ScanNode {
   /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
   /// metadata of the scan range that is to be allocated.
   /// This is thread safe.
-  DiskIoMgr::ScanRange* AllocateScanRange(
-      hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
-      int disk_id, bool try_cache, bool expected_local, int64_t mtime,
+  DiskIoMgr::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+      int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
+      const DiskIoMgr::BufferOpts& buffer_opts,
       const DiskIoMgr::ScanRange* original_split = NULL);
 
+  /// Old API for compatibility with text scanners (e.g. LZO text scanner).
+  DiskIoMgr::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+      int64_t offset, int64_t partition_id, int disk_id, bool try_cache,
+      bool expected_local, int mtime, const DiskIoMgr::ScanRange* original_split = NULL);
+
   /// Adds ranges to the io mgr queue. 'num_files_queued' indicates how many file's scan
   /// ranges have been added completely.  A file's scan ranges are added completely if no
   /// new scanner threads will be needed to process that file besides the additional

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index fa267b4..6757815 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -108,10 +108,10 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
           DCHECK_GT(files[i]->file_length, 0);
           ScanRangeMetadata* metadata =
               static_cast<ScanRangeMetadata*>(split->meta_data());
-          DiskIoMgr::ScanRange* file_range = scan_node->AllocateScanRange(
-              files[i]->fs, files[i]->filename.c_str(), files[i]->file_length, 0,
-              metadata->partition_id, split->disk_id(), split->try_cache(),
-              split->expected_local(), files[i]->mtime);
+          DiskIoMgr::ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
+              files[i]->filename.c_str(), files[i]->file_length, 0,
+              metadata->partition_id, split->disk_id(), split->expected_local(),
+              DiskIoMgr::BufferOpts(split->try_cache(), files[i]->mtime));
           compressed_text_scan_ranges.push_back(file_range);
           scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 033ab82..8e8e84c 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -180,7 +180,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     }
     DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, -1,
-        scan_range_->disk_id(), false, false, scan_range_->mtime());
+        scan_range_->disk_id(), false, DiskIoMgr::BufferOpts::Uncached());
     RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
         parent_->scan_node_->reader_context(), range, &io_buffer_));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index f1d947e..b7a48a9 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -671,25 +671,16 @@ Status BufferedBlockMgr::PinBlock(Block* block, bool* pinned, Block* release_blo
     DiskIoMgr::ScanRange* scan_range =
         obj_pool_.Add(new DiskIoMgr::ScanRange());
     scan_range->Reset(NULL, block->write_range_->file(), block->write_range_->len(),
-        block->write_range_->offset(), block->write_range_->disk_id(), false, block,
-        DiskIoMgr::ScanRange::NEVER_CACHE);
-    vector<DiskIoMgr::ScanRange*> ranges(1, scan_range);
-    status = io_mgr_->AddScanRanges(io_request_context_, ranges, true);
+        block->write_range_->offset(), block->write_range_->disk_id(), false,
+        DiskIoMgr::BufferOpts::ReadInto(block->buffer(), block->buffer_len()));
+    DiskIoMgr::BufferDescriptor* io_mgr_buffer;
+    status = io_mgr_->Read(io_request_context_, scan_range, &io_mgr_buffer);
     if (!status.ok()) goto error;
 
-    // Read from the io mgr buffer into the block's assigned buffer.
-    int64_t offset = 0;
-    bool buffer_eosr;
-    do {
-      DiskIoMgr::BufferDescriptor* io_mgr_buffer;
-      status = scan_range->GetNext(&io_mgr_buffer);
-      if (!status.ok()) goto error;
-      memcpy(block->buffer() + offset, io_mgr_buffer->buffer(), io_mgr_buffer->len());
-      offset += io_mgr_buffer->len();
-      buffer_eosr = io_mgr_buffer->eosr();
-      io_mgr_buffer->Return();
-    } while (!buffer_eosr);
-    DCHECK_EQ(offset, block->write_range_->len());
+    DCHECK_EQ(io_mgr_buffer->buffer(), block->buffer());
+    DCHECK_EQ(io_mgr_buffer->len(), block->valid_data_len());
+    DCHECK(io_mgr_buffer->eosr());
+    io_mgr_buffer->Return();
   }
 
   if (FLAGS_disk_spill_encryption) {
@@ -1285,7 +1276,7 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile,
   unique_lock<mutex> l(lock_);
   if (initialized_) return;
 
-  io_mgr->RegisterContext(&io_request_context_);
+  io_mgr->RegisterContext(&io_request_context_, NULL);
 
   profile_.reset(new RuntimeProfile(&obj_pool_, "BlockMgr"));
   parent_profile->AddChild(profile_.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
index 0b279b5..17b02af 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -250,7 +250,8 @@ class DiskIoRequestContext {
 
   /// The number of buffers that are being used for this reader. This is the sum
   /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
-  /// to be queued).
+  /// to be queued). This includes both IOMgr-allocated buffers and client-provided
+  /// buffers.
   AtomicInt32 num_used_buffers_;
 
   /// The total number of ready buffers across all ranges.  Ready buffers are buffers

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 8ebe7a5..bc676bf 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -162,7 +162,7 @@ void DiskIoMgr::ScanRange::Cancel(const Status& status) {
 
   // For cached buffers, we can't close the range until the cached buffer is returned.
   // Close() is called from DiskIoMgr::ReturnBuffer().
-  if (cached_buffer_ == NULL) Close();
+  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
 }
 
 void DiskIoMgr::ScanRange::CleanupQueuedBuffers() {
@@ -205,36 +205,57 @@ bool DiskIoMgr::ScanRange::Validate() {
 }
 
 DiskIoMgr::ScanRange::ScanRange(int capacity)
-  : ready_buffers_capacity_(capacity) {
-  request_type_ = RequestType::READ;
-  Reset(NULL, "", -1, -1, -1, false, false, NEVER_CACHE);
-}
+  : RequestRange(RequestType::READ),
+    meta_data_(NULL),
+    try_cache_(false),
+    expected_local_(false),
+    io_mgr_(NULL),
+    reader_(NULL),
+    hdfs_file_(NULL),
+    external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
+    ready_buffers_capacity_(capacity),
+    mtime_(-1) {}
 
 DiskIoMgr::ScanRange::~ScanRange() {
   DCHECK(hdfs_file_ == NULL) << "File was not closed.";
-  DCHECK(cached_buffer_ == NULL) << "Cached buffer was not released.";
+  DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
+      << "Cached buffer was not released.";
 }
 
 void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool try_cache, bool expected_local, int64_t mtime, void* meta_data) {
+    int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
   DCHECK(ready_buffers_.empty());
+  DCHECK(file != NULL);
+  DCHECK_GE(len, 0);
+  DCHECK_GE(offset, 0);
+  DCHECK(buffer_opts.client_buffer_ == NULL || buffer_opts.client_buffer_len_ >= len_);
   fs_ = fs;
   file_ = file;
   len_ = len;
   offset_ = offset;
   disk_id_ = disk_id;
-  try_cache_ = try_cache;
+  try_cache_ = buffer_opts.try_cache_;
+  mtime_ = buffer_opts.mtime_;
   expected_local_ = expected_local;
   meta_data_ = meta_data;
-  cached_buffer_ = NULL;
+  if (buffer_opts.client_buffer_ != NULL) {
+    external_buffer_tag_ = ExternalBufferTag::CLIENT_BUFFER;
+    client_buffer_.data = buffer_opts.client_buffer_;
+    client_buffer_.len = buffer_opts.client_buffer_len_;
+  } else {
+    external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
+  }
   io_mgr_ = NULL;
   reader_ = NULL;
   hdfs_file_ = NULL;
-  mtime_ = mtime;
 }
 
 void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader) {
   DCHECK(hdfs_file_ == NULL);
+  DCHECK(local_file_ == NULL);
+  // Reader must provide MemTracker or a buffer.
+  DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
+      || reader->mem_tracker_ != NULL);
   io_mgr_ = io_mgr;
   reader_ = reader;
   local_file_ = NULL;
@@ -321,9 +342,10 @@ void DiskIoMgr::ScanRange::Close() {
         hdfsFileFreeReadStatistics(stats);
       }
     }
-    if (cached_buffer_ != NULL) {
+    if (external_buffer_tag_ == ExternalBufferTag::CACHED_BUFFER) {
       hadoopRzBufferFree(hdfs_file_->file(), cached_buffer_);
       cached_buffer_ = NULL;
+      external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
     }
     io_mgr_->CacheOrCloseFileHandle(file(), hdfs_file_, false);
     VLOG_FILE << "Cache HDFS file handle file=" << file();
@@ -349,23 +371,22 @@ int64_t DiskIoMgr::ScanRange::MaxReadChunkSize() const {
     DCHECK(IsS3APath(file()));
     return 128 * 1024;
   }
-  return numeric_limits<int64_t>::max();
+  // The length argument of hdfsRead() is an int. Ensure we don't overflow it.
+  return numeric_limits<int>::max();
 }
 
 // TODO: how do we best use the disk here.  e.g. is it good to break up a
 // 1MB read into 8 128K reads?
 // TODO: look at linux disk scheduling
-Status DiskIoMgr::ScanRange::Read(char* buffer, int64_t* bytes_read, bool* eosr) {
+Status DiskIoMgr::ScanRange::Read(
+    uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
   unique_lock<mutex> hdfs_lock(hdfs_lock_);
   if (is_cancelled_) return Status::CANCELLED;
 
   *eosr = false;
   *bytes_read = 0;
-  // hdfsRead() length argument is an int.  Since max_buffer_size_ type is no bigger
-  // than an int, this min() will ensure that we don't overflow the length argument.
-  DCHECK_LE(sizeof(io_mgr_->max_buffer_size_), sizeof(int));
-  int bytes_to_read =
-      min(static_cast<int64_t>(io_mgr_->max_buffer_size_), len_ - bytes_read_);
+  // Read until the end of the scan range or the end of the buffer.
+  int bytes_to_read = min(len_ - bytes_read_, buffer_len);
   DCHECK_GE(bytes_to_read, 0);
 
   if (fs_ != NULL) {
@@ -373,6 +394,9 @@ Status DiskIoMgr::ScanRange::Read(char* buffer, int64_t* bytes_read, bool* eosr)
     int64_t max_chunk_size = MaxReadChunkSize();
     while (*bytes_read < bytes_to_read) {
       int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
+      DCHECK_GE(chunk_size, 0);
+      // The hdfsRead() length argument is an int.
+      DCHECK_LE(chunk_size, numeric_limits<int>::max());
       int last_read = hdfsRead(fs_, hdfs_file_->file(), buffer + *bytes_read, chunk_size);
       if (last_read == -1) {
         return Status(GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
@@ -423,14 +447,15 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
     if (is_cancelled_) return Status::CANCELLED;
 
     DCHECK(hdfs_file_ != NULL);
-    DCHECK(cached_buffer_ == NULL);
-    cached_buffer_ = hadoopReadZero(hdfs_file_->file(),
-        io_mgr_->cached_read_options_, len());
+    DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
+    cached_buffer_ =
+        hadoopReadZero(hdfs_file_->file(), io_mgr_->cached_read_options_, len());
+    if (cached_buffer_ != NULL) external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
   }
   // Data was not cached, caller will fall back to normal read path.
-  if (cached_buffer_ == NULL) {
+  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
     VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
-        << ". Switching to disk read path.";
+               << ". Switching to disk read path.";
     // Clean up the scan range state before re-issuing it.
     Close();
     return Status::OK();
@@ -455,8 +480,8 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
   // Create a single buffer desc for the entire scan range and enqueue that.
   // 'mem_tracker' is NULL because the memory is owned by the HDFS java client,
   // not the Impala backend.
-  BufferDescriptor* desc = io_mgr_->GetBufferDesc(
-      reader_, NULL, this, reinterpret_cast<char*>(buffer), 0);
+  BufferDescriptor* desc =
+      io_mgr_->GetBufferDesc(reader_, NULL, this, reinterpret_cast<uint8_t*>(buffer), 0);
   desc->len_ = bytes_read;
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/runtime/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress-test.cc b/be/src/runtime/disk-io-mgr-stress-test.cc
index 4b67abb..7ae9515 100644
--- a/be/src/runtime/disk-io-mgr-stress-test.cc
+++ b/be/src/runtime/disk-io-mgr-stress-test.cc
@@ -36,6 +36,7 @@ const bool TEST_CANCELLATION = true;
 int main(int argc, char** argv) {
   google::InitGoogleLogging(argv[0]);
   CpuInfo::Init();
+  OsInfo::Init();
   impala::InitThreading();
   int duration_sec = DEFAULT_DURATION_SEC;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
index 16fbd39..73e055d 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -73,7 +73,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
 
   io_mgr_.reset(new DiskIoMgr(
       num_disks, num_threads_per_disk, MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
-  Status status = io_mgr_->Init(&dummy_tracker_);
+  Status status = io_mgr_->Init(&mem_tracker_);
   CHECK(status.ok());
 
   // Initialize some data files.  It doesn't really matter how many there are.
@@ -87,6 +87,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
   }
 
   clients_ = new Client[num_clients_];
+  client_mem_trackers_.resize(num_clients_);
   for (int i = 0; i < num_clients_; ++i) {
     NewClient(i);
   }
@@ -127,7 +128,8 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
         // Validate the bytes read
         CHECK_LE(file_offset + len, expected.size());
-        CHECK_EQ(strncmp(buffer->buffer(), &expected.c_str()[file_offset], len), 0);
+        CHECK_EQ(strncmp(reinterpret_cast<char*>(buffer->buffer()),
+                     &expected.c_str()[file_offset], len), 0);
 
         // Copy the bytes from this read into the result buffer.
         memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
@@ -228,13 +230,15 @@ void DiskIoMgrStress::NewClient(int i) {
     int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
     range_len = min(range_len, file_len - assigned_len);
 
-    DiskIoMgr::ScanRange* range = new DiskIoMgr::ScanRange();;
-    range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len,
-        assigned_len, 0, false, false, DiskIoMgr::ScanRange::NEVER_CACHE);
+    DiskIoMgr::ScanRange* range = new DiskIoMgr::ScanRange();
+    range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
+        0, false, DiskIoMgr::BufferOpts::Uncached());
     client.scan_ranges.push_back(range);
     assigned_len += range_len;
   }
-  Status status = io_mgr_->RegisterContext(&client.reader, NULL);
+
+  client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
+  Status status = io_mgr_->RegisterContext(&client.reader, client_mem_trackers_[i].get());
   CHECK(status.ok());
   status = io_mgr_->AddScanRanges(client.reader, client.scan_ranges);
   CHECK(status.ok());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/runtime/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.h b/be/src/runtime/disk-io-mgr-stress.h
index cf81969..6d7549e 100644
--- a/be/src/runtime/disk-io-mgr-stress.h
+++ b/be/src/runtime/disk-io-mgr-stress.h
@@ -19,6 +19,7 @@
 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
 #define IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
 
+#include <memory>
 #include <vector>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
@@ -57,8 +58,8 @@ class DiskIoMgrStress {
   /// during the test
   std::vector<File> files_;
 
-  /// Dummy mem tracker
-  MemTracker dummy_tracker_;
+  /// Root mem tracker.
+  MemTracker mem_tracker_;
 
   /// io manager
   boost::scoped_ptr<DiskIoMgr> io_mgr_;
@@ -70,6 +71,9 @@ class DiskIoMgrStress {
   int num_clients_;
   Client* clients_;
 
+  /// Client MemTrackers, one per client.
+  std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
+
   /// If true, tests cancelling readers
   bool includes_cancellation_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 0f93875..ef7e12f 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -62,7 +62,7 @@ class DiskIoMgrTest : public testing::Test {
     if (status.ok()) {
       DiskIoMgr::ScanRange* scan_range = pool_->Add(new DiskIoMgr::ScanRange());
       scan_range->Reset(NULL, (*written_range)->file(), (*written_range)->len(),
-          (*written_range)->offset(), 0, false, false, DiskIoMgr::ScanRange::NEVER_CACHE);
+          (*written_range)->offset(), 0, false, DiskIoMgr::BufferOpts::Uncached());
       ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
           sizeof(int32_t));
     }
@@ -161,10 +161,16 @@ class DiskIoMgrTest : public testing::Test {
     }
   }
 
+  DiskIoMgr::ScanRange* AllocateRange(int num_buffers) {
+    return pool_->Add(new DiskIoMgr::ScanRange(num_buffers));
+  }
+
   DiskIoMgr::ScanRange* InitRange(int num_buffers, const char* file_path, int offset,
-      int len, int disk_id, int64_t mtime, void* meta_data = NULL, bool is_cached = false) {
-    DiskIoMgr::ScanRange* range = pool_->Add(new DiskIoMgr::ScanRange(num_buffers));
-    range->Reset(NULL, file_path, len, offset, disk_id, is_cached, true, mtime, meta_data);
+      int len, int disk_id, int64_t mtime, void* meta_data = NULL,
+      bool is_cached = false) {
+    DiskIoMgr::ScanRange* range = AllocateRange(num_buffers);
+    range->Reset(NULL, file_path, len, offset, disk_id, true,
+        DiskIoMgr::BufferOpts(is_cached, mtime), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
   }
@@ -242,7 +248,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   DiskIoMgr io_mgr(1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   DiskIoRequestContext* writer;
-  ASSERT_OK(io_mgr.RegisterContext(&writer));
+  ASSERT_OK(io_mgr.RegisterContext(&writer, NULL));
   int32_t* data = pool_->Add(new int32_t);
   *data = rand();
 
@@ -784,7 +790,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
         DiskIoMgr io_mgr(num_disks, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
         io_mgr.Init(&mem_tracker);
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
-          ASSERT_OK(io_mgr.RegisterContext(&contexts[file_index]));
+          ASSERT_OK(io_mgr.RegisterContext(&contexts[file_index], &mem_tracker));
         }
         pool_.reset(new ObjectPool);
         int read_offset = 0;
@@ -898,7 +904,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
           EXPECT_OK(io_mgr.Init(&mem_tracker));
 
           for (int i = 0; i < NUM_READERS; ++i) {
-            ASSERT_OK(io_mgr.RegisterContext(&readers[i], NULL));
+            ASSERT_OK(io_mgr.RegisterContext(&readers[i], &mem_tracker));
 
             vector<DiskIoMgr::ScanRange*> ranges;
             for (int j = 0; j < DATA_LEN; ++j) {
@@ -1042,9 +1048,51 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   EXPECT_EQ(mem_tracker.consumption(), 0);
 }
 
+// Test reading into a client-allocated buffer.
+TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
+  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+  const char* data = "the quick brown fox jumped over the lazy dog";
+  int len = strlen(data);
+  int read_len = 4; // Make buffer size smaller than client-provided buffer.
+  CreateTempFile(tmp_file, data);
+
+  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, read_len, read_len));
+
+  ASSERT_OK(io_mgr->Init(&mem_tracker));
+  // Reader doesn't need to provide mem tracker if it's providing buffers.
+  MemTracker* reader_mem_tracker = NULL;
+  DiskIoRequestContext* reader;
+  ASSERT_OK(io_mgr->RegisterContext(&reader, reader_mem_tracker));
+
+  for (int buffer_len : vector<int>({len - 1, len, len + 1})) {
+    vector<uint8_t> client_buffer(buffer_len);
+    int scan_len = min(len, buffer_len);
+    DiskIoMgr::ScanRange* range = AllocateRange(1);
+    range->Reset(NULL, tmp_file, scan_len, 0, 0, true,
+        DiskIoMgr::BufferOpts::ReadInto(&client_buffer[0], buffer_len));
+    ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
+
+    DiskIoMgr::BufferDescriptor* io_buffer;
+    ASSERT_OK(range->GetNext(&io_buffer));
+    ASSERT_TRUE(io_buffer->eosr());
+    ASSERT_EQ(scan_len, io_buffer->len());
+    ASSERT_EQ(client_buffer.data(), io_buffer->buffer());
+    ASSERT_EQ(memcmp(io_buffer->buffer(), data, scan_len), 0);
+
+    // DiskIoMgr should not have allocated memory.
+    EXPECT_EQ(mem_tracker.consumption(), 0);
+    io_buffer->Return();
+  }
+
+  io_mgr->UnregisterContext(reader);
+  pool_.reset();
+  io_mgr.reset();
+  EXPECT_EQ(mem_tracker.consumption(), 0);
+}
 }
 
-int main(int argc, char **argv) {
+int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   return RUN_ALL_TESTS();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index dd45932..7c5620a 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -223,14 +223,15 @@ void DiskIoMgr::BufferDescriptor::Reset() {
   scan_range_offset_ = 0;
 }
 
-void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader,
-    ScanRange* range, char* buffer, int64_t buffer_len, MemTracker* mem_tracker) {
+void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader, ScanRange* range,
+    uint8_t* buffer, int64_t buffer_len, MemTracker* mem_tracker) {
   DCHECK(io_mgr_ != NULL);
   DCHECK(buffer_ == NULL);
   DCHECK(range != NULL);
   DCHECK(buffer != NULL);
   DCHECK_GE(buffer_len, 0);
-  DCHECK_NE(range->cached_buffer_ == NULL, mem_tracker == NULL);
+  DCHECK_NE(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER,
+      mem_tracker == NULL);
   reader_ = reader;
   scan_range_ = range;
   mem_tracker_ = mem_tracker;
@@ -244,6 +245,7 @@ void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader,
 
 void DiskIoMgr::BufferDescriptor::TransferOwnership(MemTracker* dst) {
   DCHECK(dst != NULL);
+  DCHECK(!is_client_buffer());
   // Memory of cached buffers is not tracked against a tracker.
   if (is_cached()) return;
   DCHECK(mem_tracker_ != NULL);
@@ -257,13 +259,12 @@ void DiskIoMgr::BufferDescriptor::Return() {
   io_mgr_->ReturnBuffer(this);
 }
 
-DiskIoMgr::WriteRange::WriteRange(const string& file, int64_t file_offset, int disk_id,
-    WriteDoneCallback callback) {
+DiskIoMgr::WriteRange::WriteRange(
+    const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback)
+  : RequestRange(RequestType::WRITE), callback_(callback) {
   file_ = file;
   offset_ = file_offset;
   disk_id_ = disk_id;
-  callback_ = callback;
-  request_type_ = RequestType::WRITE;
 }
 
 void DiskIoMgr::WriteRange::SetData(const uint8_t* buffer, int64_t len) {
@@ -360,9 +361,6 @@ DiskIoMgr::~DiskIoMgr() {
   }
 
   if (free_buffer_mem_tracker_ != NULL) free_buffer_mem_tracker_->UnregisterFromParent();
-  if (unowned_buffer_mem_tracker_ != NULL) {
-    unowned_buffer_mem_tracker_->UnregisterFromParent();
-  }
 
   if (cached_read_options_ != NULL) hadoopRzOptionsFree(cached_read_options_);
 }
@@ -371,8 +369,6 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   DCHECK(process_mem_tracker != NULL);
   free_buffer_mem_tracker_.reset(
       new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
-  unowned_buffer_mem_tracker_.reset(
-      new MemTracker(-1, "Untracked Disk IO Buffers", process_mem_tracker, false));
   // If we hit the process limit, see if we can reclaim some memory by removing
   // previously allocated (but unused) io buffers.
   process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this));
@@ -573,6 +569,11 @@ Status DiskIoMgr::AddScanRanges(DiskIoRequestContext* reader,
   return Status::OK();
 }
 
+Status DiskIoMgr::AddScanRange(
+    DiskIoRequestContext* reader, ScanRange* range, bool schedule_immediately) {
+  return AddScanRanges(reader, vector<ScanRange*>({range}), schedule_immediately);
+}
+
 // This function returns the next scan range the reader should work on, checking
 // for eos and error cases. If there isn't already a cached scan range or a scan
 // range prepared by the disk threads, the caller waits on the disk threads.
@@ -636,9 +637,11 @@ Status DiskIoMgr::Read(DiskIoRequestContext* reader,
   DCHECK(buffer != NULL);
   *buffer = NULL;
 
-  if (range->len() > max_buffer_size_) {
-    return Status(Substitute("Cannot perform sync read larger than $0. Request was $1",
-                             max_buffer_size_, range->len()));
+  if (range->len() > max_buffer_size_
+      && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+    return Status(Substitute("Internal error: cannot perform sync read of '$0' bytes "
+                   "that is larger than the max read buffer size '$1'.",
+            range->len(), max_buffer_size_));
   }
 
   vector<DiskIoMgr::ScanRange*> ranges;
@@ -656,8 +659,8 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
 
   DiskIoRequestContext* reader = buffer_desc->reader_;
   if (buffer_desc->buffer_ != NULL) {
-    if (!buffer_desc->is_cached()) {
-      // Cached buffers are not allocated by DiskIoMgr so don't need to be freed.
+    if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
+      // Buffers that were not allocated by DiskIoMgr don't need to be freed.
       FreeBufferMemory(buffer_desc);
     }
     num_buffers_in_readers_.Add(-1);
@@ -686,7 +689,7 @@ void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
 }
 
 DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(DiskIoRequestContext* reader,
-    MemTracker* mem_tracker, ScanRange* range, char* buffer, int64_t buffer_size) {
+    MemTracker* mem_tracker, ScanRange* range, uint8_t* buffer, int64_t buffer_size) {
   BufferDescriptor* buffer_desc;
   {
     unique_lock<mutex> lock(free_buffers_lock_);
@@ -713,14 +716,10 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read
 
   // Track memory against the reader. This is checked the next time we start
   // a read for the next reader in DiskIoMgr::GetNextScanRange().
-  // TODO: IMPALA-3200: BufferedBlockMgr does not expect read buffers to be tracked
-  // against its MemTracker. Once BufferedBlockMgr is removed, we can expect that
-  // all readers provide a MemTracker and remove this NULL check.
-  MemTracker* buffer_mem_tracker = reader->mem_tracker_ != NULL ? reader->mem_tracker_ :
-      unowned_buffer_mem_tracker_.get();
-  buffer_mem_tracker->Consume(buffer_size);
-
-  char* buffer = NULL;
+  DCHECK(reader->mem_tracker_ != NULL);
+  reader->mem_tracker_->Consume(buffer_size);
+
+  uint8_t* buffer = NULL;
   {
     unique_lock<mutex> lock(free_buffers_lock_);
     if (free_buffers_[idx].empty()) {
@@ -732,7 +731,7 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read
         ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
       }
       // We already tracked this memory against the reader's MemTracker.
-      buffer = new char[buffer_size];
+      buffer = new uint8_t[buffer_size];
     } else {
       if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
         ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
@@ -747,7 +746,7 @@ DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* read
   DCHECK(range != NULL);
   DCHECK(reader != NULL);
   DCHECK(buffer != NULL);
-  return GetBufferDesc(reader, buffer_mem_tracker, range, buffer, buffer_size);
+  return GetBufferDesc(reader, reader->mem_tracker_, range, buffer, buffer_size);
 }
 
 void DiskIoMgr::GcIoBuffers() {
@@ -755,7 +754,7 @@ void DiskIoMgr::GcIoBuffers() {
   int buffers_freed = 0;
   int bytes_freed = 0;
   for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    for (char* buffer: free_buffers_[idx]) {
+    for (uint8_t* buffer : free_buffers_[idx]) {
       int64_t buffer_size = (1LL << idx) * min_buffer_size_;
       delete[] buffer;
       free_buffer_mem_tracker_->Release(buffer_size);
@@ -780,7 +779,8 @@ void DiskIoMgr::GcIoBuffers() {
 
 void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
   DCHECK(!desc->is_cached());
-  char* buffer = desc->buffer_;
+  DCHECK(!desc->is_client_buffer());
+  uint8_t* buffer = desc->buffer_;
   int64_t buffer_size = desc->buffer_len_;
   int idx = free_buffers_idx(buffer_size);
   DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
@@ -1062,46 +1062,16 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     ScanRange* range) {
   int64_t bytes_remaining = range->len_ - range->bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
-  int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
-  bool enough_memory = true;
-  if (reader->mem_tracker_ != NULL) {
-    enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
-    if (!enough_memory) {
-      // Low memory, GC and try again.
-      GcIoBuffers();
-      enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
-    }
-  }
-
-  if (!enough_memory) {
-    DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
-    unique_lock<mutex> reader_lock(reader->lock_);
-
-    // Just grabbed the reader lock, check for cancellation.
-    if (reader->state_ == DiskIoRequestContext::Cancelled) {
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-      state.DecrementRequestThreadAndCheckDone(reader);
-      range->Cancel(reader->status_);
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-      return;
-    }
-
-    if (!range->ready_buffers_.empty()) {
-      // We have memory pressure and this range doesn't need another buffer
-      // (it already has one queued). Skip this range and pick it up later.
-      range->blocked_on_queue_ = true;
-      reader->blocked_ranges_.Enqueue(range);
-      state.DecrementRequestThread();
-      return;
-    } else {
-      // We need to get a buffer anyway since there are none queued. The query
-      // is likely to fail due to mem limits but there's nothing we can do about that
-      // now.
-    }
+  BufferDescriptor* buffer_desc = NULL;
+  if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+    buffer_desc = GetBufferDesc(
+        reader, NULL, range, range->client_buffer_.data, range->client_buffer_.len);
+  } else {
+    // Need to allocate a buffer to read into.
+    int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
+    buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
+    if (buffer_desc == NULL) return;
   }
-
-  BufferDescriptor* buffer_desc = GetFreeBuffer(reader, range, buffer_size);
-  DCHECK(buffer_desc != NULL);
   reader->num_used_buffers_.Add(1);
 
   // No locks in this section.  Only working on local vars.  We don't want to hold a
@@ -1119,8 +1089,8 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
     SCOPED_TIMER(&read_timer_);
     SCOPED_TIMER(reader->read_timer_);
 
-    buffer_desc->status_ = range->Read(buffer_desc->buffer_, &buffer_desc->len_,
-        &buffer_desc->eosr_);
+    buffer_desc->status_ = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
+        &buffer_desc->len_, &buffer_desc->eosr_);
     buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
 
     if (reader->bytes_read_counter_ != NULL) {
@@ -1137,6 +1107,48 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
   HandleReadFinished(disk_queue, reader, buffer_desc);
 }
 
+DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange(
+    DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range,
+    int64_t buffer_size) {
+  DCHECK(reader->mem_tracker_ != NULL);
+  bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
+  if (!enough_memory) {
+    // Low memory, GC and try again.
+    GcIoBuffers();
+    enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
+  }
+
+  if (!enough_memory) {
+    DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+    unique_lock<mutex> reader_lock(reader->lock_);
+
+    // Just grabbed the reader lock, check for cancellation.
+    if (reader->state_ == DiskIoRequestContext::Cancelled) {
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      state.DecrementRequestThreadAndCheckDone(reader);
+      range->Cancel(reader->status_);
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+      return NULL;
+    }
+
+    if (!range->ready_buffers_.empty()) {
+      // We have memory pressure and this range doesn't need another buffer
+      // (it already has one queued). Skip this range and pick it up later.
+      range->blocked_on_queue_ = true;
+      reader->blocked_ranges_.Enqueue(range);
+      state.DecrementRequestThread();
+      return NULL;
+    } else {
+      // We need to get a buffer anyway since there are none queued. The query
+      // is likely to fail due to mem limits but there's nothing we can do about that
+      // now.
+    }
+  }
+  BufferDescriptor* buffer_desc = GetFreeBuffer(reader, range, buffer_size);
+  DCHECK(buffer_desc != NULL);
+  return buffer_desc;
+}
+
 void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_range) {
   FILE* file_handle = fopen(write_range->file(), "rb+");
   Status ret_status;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/120f34b0/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 0baac59..4b650a8 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -140,12 +140,15 @@ class MemTracker;
 /// regardless of how many concurrent readers are running.
 //
 /// Buffer Management:
-/// Buffers are allocated by the IoMgr as necessary to service reads. These buffers
-/// are directly returned to the caller. The caller must call Return() on the buffer
-/// when it is done, at which point the buffer will be recycled for another read. In error
-/// cases, the IoMgr will recycle the buffers more promptly but regardless, the caller
-/// must always call Return()
-//
+/// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller,
+/// b) cached HDFS buffers if the scan range uses HDFS caching, or c) provided by the
+/// caller when constructing the scan range.
+///
+/// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors
+/// and returned to the caller. The caller must always call Return() on the buffer
+/// descriptor when it is done, to allow recycling of the buffer descriptor and
+/// the associated buffer (if there is an IoMgr-allocated or HDFS cached buffer).
+///
 /// Caching support:
 /// Scan ranges contain metadata on whether or not it is cached on the DN. In that
 /// case, we use the HDFS APIs to read the cached data without doing any copies. For these
@@ -232,7 +235,7 @@ class DiskIoMgr {
   class BufferDescriptor {
    public:
     ScanRange* scan_range() { return scan_range_; }
-    char* buffer() { return buffer_; }
+    uint8_t* buffer() { return buffer_; }
     int64_t buffer_len() { return buffer_len_; }
     int64_t len() { return len_; }
     bool eosr() { return eosr_; }
@@ -240,10 +243,11 @@ class DiskIoMgr {
     /// Returns the offset within the scan range that this buffer starts at
     int64_t scan_range_offset() const { return scan_range_offset_; }
 
-    /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and
-    /// set 'mem_tracker_' to 'dst'.  'mem_tracker_' and 'dst' must be non-NULL.
-    /// Does not check memory limits on 'dst': the caller should check the memory limit
-    /// if a different memory limit may apply to 'dst'.
+    /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set
+    /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does not
+    /// check memory limits on 'dst': the caller should check the memory limit if a
+    /// different memory limit may apply to 'dst'. If the buffer was a client-provided
+    /// buffer, transferring is not allowed.
     /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
     void TransferOwnership(MemTracker* dst);
 
@@ -257,14 +261,25 @@ class DiskIoMgr {
     friend class DiskIoRequestContext;
     BufferDescriptor(DiskIoMgr* io_mgr);
 
-    bool is_cached() { return scan_range_->cached_buffer_ != NULL; }
+    /// Return true if this is a cached buffer owned by HDFS.
+    bool is_cached() const {
+      return scan_range_->external_buffer_tag_
+          == ScanRange::ExternalBufferTag::CACHED_BUFFER;
+    }
+
+    /// Return true if this is a buffer owned by the client that was provided when
+    /// constructing the scan range.
+    bool is_client_buffer() const {
+      return scan_range_->external_buffer_tag_
+          == ScanRange::ExternalBufferTag::CLIENT_BUFFER;
+    }
 
     /// Reset the buffer descriptor to an uninitialized state.
     void Reset();
 
     /// Resets the buffer descriptor state for a new reader, range and data buffer.
     /// The buffer memory should already be accounted against MemTracker
-    void Reset(DiskIoRequestContext* reader, ScanRange* range, char* buffer,
+    void Reset(DiskIoRequestContext* reader, ScanRange* range, uint8_t* buffer,
         int64_t buffer_len, MemTracker* mem_tracker);
 
     DiskIoMgr* const io_mgr_;
@@ -280,7 +295,7 @@ class DiskIoMgr {
     ScanRange* scan_range_;
 
     /// buffer with the read contents
-    char* buffer_;
+    uint8_t* buffer_;
 
     /// length of buffer_. For buffers from cached reads, the length is 0.
     int64_t buffer_len_;
@@ -318,6 +333,9 @@ class DiskIoMgr {
     RequestType::type request_type() const { return request_type_; }
 
    protected:
+    RequestRange(RequestType::type request_type)
+      : fs_(NULL), offset_(-1), len_(-1), disk_id_(-1), request_type_(request_type) {}
+
     /// Hadoop filesystem that contains file_, or set to NULL for local filesystem.
     hdfsFS fs_;
 
@@ -337,25 +355,76 @@ class DiskIoMgr {
     RequestType::type request_type_;
   };
 
+  /// Param struct for different combinations of buffering.
+  struct BufferOpts {
+   public:
+    /// Set options for a read into an IoMgr-allocated or HDFS-cached buffer. Caching is
+    /// enabled if 'try_cache' is true, the file is in the HDFS cache and 'mtime' matches
+    /// the modified time of the cached file in the HDFS cache.
+    BufferOpts(bool try_cache, int64_t mtime)
+      : try_cache_(try_cache),
+        mtime_(mtime),
+        client_buffer_(NULL),
+        client_buffer_len_(-1) {}
+
+    /// Set options for an uncached read into an IoMgr-allocated buffer.
+    static BufferOpts Uncached() {
+      return BufferOpts(false, NEVER_CACHE, NULL, -1);
+    }
+
+    /// Set options to read the entire scan range into 'client_buffer'. The length of the
+    /// buffer, 'client_buffer_len', must fit the entire scan range. HDFS caching is not
+    /// enabled in this case.
+    static BufferOpts ReadInto(uint8_t* client_buffer, int64_t client_buffer_len) {
+      return BufferOpts(false, NEVER_CACHE, client_buffer, client_buffer_len);
+    }
+
+   private:
+    friend class ScanRange;
+
+    BufferOpts(
+        bool try_cache, int64_t mtime, uint8_t* client_buffer, int64_t client_buffer_len)
+      : try_cache_(try_cache),
+        mtime_(mtime),
+        client_buffer_(client_buffer),
+        client_buffer_len_(client_buffer_len) {}
+
+    /// If 'mtime_' is set to NEVER_CACHE, the file handle will never be cached, because
+    /// the modification time won't match.
+    const static int64_t NEVER_CACHE = -1;
+
+    /// If true, read from HDFS cache if possible.
+    const bool try_cache_;
+
+    /// Last modified time of the file associated with the scan range. If set to
+    /// NEVER_CACHE, caching is disabled.
+    const int64_t mtime_;
+
+    /// A destination buffer provided by the client, NULL and -1 if no buffer.
+    uint8_t* const client_buffer_;
+    const int64_t client_buffer_len_;
+  };
+
   /// ScanRange description. The caller must call Reset() to initialize the fields
   /// before calling AddScanRanges(). The private fields are used internally by
   /// the IoMgr.
   class ScanRange : public RequestRange {
    public:
-
-    /// If the mtime is set to NEVER_CACHE, the file handle should never be cached.
-    const static int64_t NEVER_CACHE = -1;
-
     /// The initial queue capacity for this.  Specify -1 to use IoMgr default.
     ScanRange(int initial_capacity = -1);
 
     virtual ~ScanRange();
 
-    /// Resets this scan range object with the scan range description.  The scan range
-    /// must fall within the file bounds (offset >= 0 and offset + len <= file_length).
-    /// Resets this scan range object with the scan range description.
+    /// Resets this scan range object with the scan range description. The scan range
+    /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is NULL for the
+    /// local filesystem). The scan range must fall within the file bounds (offset >= 0
+    /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range
+    /// to. If 'expected_local' is true, a warning is generated if the read did not
+    /// come from a local disk. 'buffer_opts' specifies buffer management options -
+    /// see the DiskIoMgr class comment and the BufferOpts comments for details.
+    /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
     void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-        bool try_cache, bool expected_local, int64_t mtime, void* metadata = NULL);
+        bool expected_local, const BufferOpts& buffer_opts, void* meta_data = NULL);
 
     void* meta_data() const { return meta_data_; }
     bool try_cache() const { return try_cache_; }
@@ -410,9 +479,9 @@ class DiskIoMgr {
     /// Closes the file for this range. This function only modifies state in this range.
     void Close();
 
-    /// Reads from this range into 'buffer'. Buffer is preallocated. Returns the number
-    /// of bytes read. Updates range to keep track of where in the file we are.
-    Status Read(char* buffer, int64_t* bytes_read, bool* eosr);
+    /// Reads from this range into 'buffer', which has length 'buffer_len' bytes. Returns
+    /// the number of bytes read. The read position in this scan range is updated.
+    Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr);
 
     /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer
     /// and *read_succeeded to true.
@@ -450,9 +519,24 @@ class DiskIoMgr {
       HdfsCachedFileHandle* hdfs_file_;
     };
 
-    /// If non-null, this is DN cached buffer. This means the cached read succeeded
-    /// and all the bytes for the range are in this buffer.
-    struct hadoopRzBuffer* cached_buffer_;
+    /// Tagged union that holds a buffer for the cases when there is a buffer allocated
+    /// externally from DiskIoMgr that is associated with the scan range.
+    enum class ExternalBufferTag { CLIENT_BUFFER, CACHED_BUFFER, NO_BUFFER };
+    ExternalBufferTag external_buffer_tag_;
+    union {
+      /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER.
+      struct {
+        /// Client-provided buffer to read the whole scan range into.
+        uint8_t* data;
+
+        /// Length of the client-provided buffer.
+        int64_t len;
+      } client_buffer_;
+
+      /// Valid and non-NULL if the external_buffer_tag_ is CACHED_BUFFER, which means
+      /// that a cached read succeeded and all the bytes for the range are in this buffer.
+      struct hadoopRzBuffer* cached_buffer_;
+    };
 
     /// Lock protecting fields below.
     /// This lock should not be taken during Open/Read/Close.
@@ -564,7 +648,7 @@ class DiskIoMgr {
   ///    the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
   ///    GetNext().
   Status RegisterContext(DiskIoRequestContext** request_context,
-      MemTracker* reader_mem_tracker = NULL);
+      MemTracker* reader_mem_tracker);
 
   /// Unregisters context from the disk IoMgr. This must be called for every
   /// RegisterContext() regardless of cancellation and must be called in the
@@ -590,8 +674,10 @@ class DiskIoMgr {
   /// (i.e. the caller should not/cannot call GetNextRange for these ranges).
   /// This can be used to do synchronous reads as well as schedule dependent ranges,
   /// as in the case for columnar formats.
-  Status AddScanRanges(DiskIoRequestContext* reader, const std::vector<ScanRange*>& ranges,
-      bool schedule_immediately = false);
+  Status AddScanRanges(DiskIoRequestContext* reader,
+      const std::vector<ScanRange*>& ranges, bool schedule_immediately = false);
+  Status AddScanRange(
+      DiskIoRequestContext* reader, ScanRange* range, bool schedule_immediately = false);
 
   /// Add a WriteRange for the writer. This is non-blocking and schedules the context
   /// on the IoMgr disk queue. Does not create any files.
@@ -609,6 +695,8 @@ class DiskIoMgr {
   /// is read. This can be called while there are outstanding ScanRanges and is
   /// thread safe. Multiple threads can be calling Read() per reader at a time.
   /// range *cannot* have already been added via AddScanRanges.
+  /// This can only be used if the scan range fits in a single IO buffer (i.e. is smaller
+  /// than max_read_buffer_size()) or if reading into a client-provided buffer.
   Status Read(DiskIoRequestContext* reader, ScanRange* range, BufferDescriptor** buffer);
 
   /// Determine which disk queue this file should be assigned to.  Returns an index into
@@ -751,7 +839,7 @@ class DiskIoMgr {
   ///  free_buffers_[10] => list of free buffers with size 1 MB
   ///  free_buffers_[13] => list of free buffers with size 8 MB
   ///  free_buffers_[n]  => list of free buffers with size 2^n * 1024 B
-  std::vector<std::list<char*>> free_buffers_;
+  std::vector<std::list<uint8_t*>> free_buffers_;
 
   /// List of free buffer desc objects that can be handed out to clients
   std::list<BufferDescriptor*> free_buffer_descs_;
@@ -787,8 +875,8 @@ class DiskIoMgr {
   /// Gets a BufferDescriptor initialized with the provided parameters. The object may be
   /// recycled or newly allocated. Does not do anything aside from initialize the
   /// descriptor's fields.
-  BufferDescriptor* GetBufferDesc(DiskIoRequestContext* reader,
-      MemTracker* mem_tracker, ScanRange* range, char* buffer, int64_t buffer_size);
+  BufferDescriptor* GetBufferDesc(DiskIoRequestContext* reader, MemTracker* mem_tracker,
+      ScanRange* range, uint8_t* buffer, int64_t buffer_size);
 
   /// Returns the buffer desc and underlying buffer to the disk IoMgr. This also updates
   /// the reader and disk queue state.
@@ -849,6 +937,13 @@ class DiskIoMgr {
   /// Reads the specified scan range and calls HandleReadFinished when done.
   void ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
       ScanRange* range);
+
+  /// Try to allocate the next buffer for the scan range, returning the new buffer
+  /// if successful. If 'reader' is cancelled, cancels the range and returns NULL.
+  /// If there is memory pressure and buffers are already queued, adds the range
+  /// to the blocked ranges and returns NULL.
+  BufferDescriptor* TryAllocateNextBufferForRange(DiskQueue* disk_queue,
+      DiskIoRequestContext* reader, ScanRange* range, int64_t buffer_size);
 };
 
 }


Mime
View raw message