impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [15/16] incubator-impala git commit: IMPALA-4835 (prep only): create io subfolder and namespace
Date Sat, 18 Nov 2017 00:31:58 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-reader-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.h b/be/src/runtime/disk-io-mgr-reader-context.h
deleted file mode 100644
index 90426d9..0000000
--- a/be/src/runtime/disk-io-mgr-reader-context.h
+++ /dev/null
@@ -1,406 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_READER_CONTEXT_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_READER_CONTEXT_H
-
-#include "runtime/disk-io-mgr.h"
-#include "util/condition-variable.h"
-
-namespace impala {
-
-/// A request context is used to group together I/O requests belonging to a client of the
-/// I/O manager for management and scheduling. For most I/O manager clients it is an
-/// opaque pointer, but some clients may need to include this header, e.g. to make the
-/// unique_ptr<DiskIoRequestContext> destructor work correctly.
-///
-/// Implementation Details
-/// ======================
-/// This object maintains a lot of state that is carefully synchronized. The context
-/// maintains state across all disks as well as per disk state.
-/// The unit for an IO request is a RequestRange, which may be a ScanRange or a
-/// WriteRange.
-/// A scan range for the reader is in one of five states:
-/// 1) PerDiskState's unstarted_ranges: This range has only been queued
-///    and nothing has been read from it.
-/// 2) DiskIoRequestContext's ready_to_start_ranges_: This range is about to be started.
-///    As soon as the reader picks it up, it will move to the in_flight_ranges
-///    queue.
-/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
-///    be read from the next time a disk thread picks it up in GetNextRequestRange()
-/// 4) ScanRange's queue of outgoing ready buffers is full. We can't read for this range
-///    anymore. We need the caller to pull a buffer off which will put this in
-///    the in_flight_ranges queue. These ranges are in the DiskIoRequestContext's
-///    blocked_ranges_ queue.
-/// 5) ScanRange is cached and in the cached_ranges_ queue.
-//
-/// If the scan range is read and does not get blocked on the outgoing queue, the
-/// transitions are: 1 -> 2 -> 3.
-/// If the scan range does get blocked, the transitions are
-/// 1 -> 2 -> 3 -> (4 -> 3)*
-//
-/// In the case of a cached scan range, the range is immediately put in cached_ranges_.
-/// When the caller asks for the next range to process, we first pull ranges from
-/// the cached_ranges_ queue. If the range was cached, the range is removed and
-/// done (ranges are either entirely cached or not at all). If the cached read attempt
-/// fails, we put the range in state 1.
-//
-/// A write range for a context may be in one of two lists:
-/// 1) unstarted_write_ranges_ : Ranges that have been queued but not processed.
-/// 2) in_flight_ranges_: The write range is ready to be processed by the next disk thread
-///    that picks it up in GetNextRequestRange().
-//
-/// AddWriteRange() adds WriteRanges for a disk.
-/// It is the responsibility of the client to pin the data to be written via a WriteRange
-/// in memory. After a WriteRange has been written, a callback is invoked to inform the
-/// client that the write has completed.
-//
-/// An important assumption is that write does not exceed the maximum read size and that
-/// the entire range is written when the write request is handled. (In other words, writes
-/// are not broken up.)
-//
-/// When a DiskIoRequestContext is processed by a disk thread in GetNextRequestRange(),
-/// a write range is always removed from the list of unstarted write ranges and appended
-/// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read
-/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one
-/// exists).  And since at most one WriteRange can be present in in_flight_ranges_ at any
-/// time (once a write range is returned from GetNextRequestRange() it is completed and
-/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up
-/// behind at most one write range.
-class DiskIoRequestContext {
-  using RequestRange = DiskIoMgr::RequestRange;
-  using ScanRange = DiskIoMgr::ScanRange;
-  using WriteRange = DiskIoMgr::WriteRange;
-  using RequestType = DiskIoMgr::RequestType;
-
- public:
-  ~DiskIoRequestContext() { DCHECK_EQ(state_, Inactive) << "Must be unregistered."; }
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(DiskIoRequestContext);
-  friend class DiskIoMgr;
-
-  class PerDiskState;
-
-  enum State {
-    /// Reader is initialized and maps to a client
-    Active,
-
-    /// Reader is in the process of being cancelled.  Cancellation is coordinated between
-    /// different threads and when they are all complete, the reader context is moved to
-    /// the inactive state.
-    Cancelled,
-
-    /// Reader context does not map to a client.  Accessing memory in this context
-    /// is invalid (i.e. it is equivalent to a dangling pointer).
-    Inactive,
-  };
-
-  DiskIoRequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
-
-  /// Decrements the number of active disks for this reader.  If the disk count
-  /// goes to 0, the disk complete condition variable is signaled.
-  /// Reader lock must be taken before this call.
-  void DecrementDiskRefCount() {
-    // boost doesn't let us dcheck that the reader lock is taken
-    DCHECK_GT(num_disks_with_ranges_, 0);
-    if (--num_disks_with_ranges_ == 0) {
-      disks_complete_cond_var_.NotifyAll();
-    }
-    DCHECK(Validate()) << std::endl << DebugString();
-  }
-
-  /// Reader & Disk Scheduling: Readers that currently can't do work are not on
-  /// the disk's queue. These readers are ones that don't have any ranges in the
-  /// in_flight_queue AND have not prepared a range by setting next_range_to_start.
-  /// The rule to make sure readers are scheduled correctly is to ensure anytime a
-  /// range is put on the in_flight_queue or anytime next_range_to_start is set to
-  /// NULL, the reader is scheduled.
-
-  /// Adds range to in_flight_ranges, scheduling this reader on the disk threads
-  /// if necessary.
-  /// Reader lock must be taken before this.
-  void ScheduleScanRange(ScanRange* range) {
-    DCHECK_EQ(state_, Active);
-    DCHECK(range != NULL);
-    DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
-    state.in_flight_ranges()->Enqueue(range);
-    state.ScheduleContext(this, range->disk_id());
-  }
-
-  /// Cancels the context with status code 'status'
-  void Cancel(const Status& status);
-
-  /// Cancel the context if not already cancelled, wait for all scan ranges to finish
-  /// and mark the context as inactive, after which it cannot be used.
-  void CancelAndMarkInactive();
-
-  /// Adds request range to disk queue for this request context. Currently,
-  /// schedule_immediately must be false if RequestRange is a write range.
-  void AddRequestRange(RequestRange* range, bool schedule_immediately);
-
-  /// Validates invariants of reader.  Reader lock must be taken beforehand.
-  bool Validate() const;
-
-  /// Dumps out reader information.  Lock should be taken by caller
-  std::string DebugString() const;
-
-  /// Parent object
-  DiskIoMgr* const parent_;
-
-  /// Memory used for this reader.  This is unowned by this object.
-  MemTracker* const mem_tracker_;
-
-  /// Total bytes read for this reader
-  RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
-
-  /// Total time spent in hdfs reading
-  RuntimeProfile::Counter* read_timer_ = nullptr;
-
-  /// Number of active read threads
-  RuntimeProfile::Counter* active_read_thread_counter_ = nullptr;
-
-  /// Disk access bitmap. The counter's bit[i] is set if disk id i has been accessed.
-  /// TODO: we can only support up to 64 disks with this bitmap but it lets us use a
-  /// builtin atomic instruction. Probably good enough for now.
-  RuntimeProfile::Counter* disks_accessed_bitmap_ = nullptr;
-
-  /// Total number of bytes read locally, updated at end of each range scan
-  AtomicInt64 bytes_read_local_{0};
-
-  /// Total number of bytes read via short circuit read, updated at end of each range scan
-  AtomicInt64 bytes_read_short_circuit_{0};
-
-  /// Total number of bytes read from data node cache, updated at end of each range scan
-  AtomicInt64 bytes_read_dn_cache_{0};
-
-  /// Total number of bytes from remote reads that were expected to be local.
-  AtomicInt64 unexpected_remote_bytes_{0};
-
-  /// The number of buffers that have been returned to the reader (via GetNext) that the
-  /// reader has not returned. Only included for debugging and diagnostics.
-  AtomicInt32 num_buffers_in_reader_{0};
-
-  /// The number of scan ranges that have been completed for this reader.
-  AtomicInt32 num_finished_ranges_{0};
-
-  /// The number of scan ranges that required a remote read, updated at the end of each
-  /// range scan. Only used for diagnostics.
-  AtomicInt32 num_remote_ranges_{0};
-
-  /// The total number of scan ranges that have not been started. Only used for
-  /// diagnostics. This is the sum of all unstarted_scan_ranges across all disks.
-  AtomicInt32 num_unstarted_scan_ranges_{0};
-
-  /// Total number of file handle opens where the file handle was present in the cache
-  AtomicInt32 cached_file_handles_hit_count_{0};
-
-  /// Total number of file handle opens where the file handle was not in the cache
-  AtomicInt32 cached_file_handles_miss_count_{0};
-
-  /// The number of buffers that are being used for this reader. This is the sum
-  /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
-  /// to be queued). This includes both IOMgr-allocated buffers and client-provided
-  /// buffers.
-  AtomicInt32 num_used_buffers_{0};
-
-  /// The total number of ready buffers across all ranges.  Ready buffers are buffers
-  /// that have been read from disk but not retrieved by the caller.
-  /// This is the sum of all queued buffers in all ranges for this reader context.
-  AtomicInt32 num_ready_buffers_{0};
-
-  /// All fields below are accessed by multiple threads and the lock needs to be
-  /// taken before accessing them. Must be acquired before ScanRange::lock_ if both
-  /// are held simultaneously.
-  boost::mutex lock_;
-
-  /// Current state of the reader
-  State state_ = Active;
-
-  /// Status of this reader.  Set to non-ok if cancelled.
-  Status status_;
-
-  /// The number of disks with scan ranges remaining (always equal to the sum of
-  /// disks with ranges).
-  int num_disks_with_ranges_ = 0;
-
-  /// This is the list of ranges that are expected to be cached on the DN.
-  /// When the reader asks for a new range (GetNextScanRange()), we first
-  /// return ranges from this list.
-  InternalQueue<ScanRange> cached_ranges_;
-
-  /// A list of ranges that should be returned in subsequent calls to
-  /// GetNextRange.
-  /// There is a trade-off with when to populate this list.  Populating it on
-  /// demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()).
-  /// Populating it preemptively means we make worse scheduling decisions.
-  /// We currently populate one range per disk.
-  /// TODO: think about this some more.
-  InternalQueue<ScanRange> ready_to_start_ranges_;
-  ConditionVariable ready_to_start_ranges_cv_; // used with lock_
-
-  /// Ranges that are blocked due to back pressure on outgoing buffers.
-  InternalQueue<ScanRange> blocked_ranges_;
-
-  /// Condition variable for UnregisterContext() to wait for all disks to complete
-  ConditionVariable disks_complete_cond_var_;
-
-  /// Struct containing state per disk. See comments in the disk read loop on how
-  /// they are used.
-  class PerDiskState {
-   public:
-    bool done() const { return done_; }
-    void set_done(bool b) { done_ = b; }
-
-    int num_remaining_ranges() const { return num_remaining_ranges_; }
-    int& num_remaining_ranges() { return num_remaining_ranges_; }
-
-    ScanRange* next_scan_range_to_start() { return next_scan_range_to_start_; }
-    void set_next_scan_range_to_start(ScanRange* range) {
-      next_scan_range_to_start_ = range;
-    }
-
-    /// We need to have a memory barrier to prevent this load from being reordered
-    /// with num_threads_in_op(), since these variables are set without the reader
-    /// lock taken
-    bool is_on_queue() const {
-      bool b = is_on_queue_;
-      __sync_synchronize();
-      return b;
-    }
-
-    int num_threads_in_op() const {
-      int v = num_threads_in_op_.Load();
-      // TODO: determine whether this barrier is necessary for any callsites.
-      AtomicUtil::MemoryBarrier();
-      return v;
-    }
-
-    const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
-      return &unstarted_scan_ranges_;
-    }
-    const InternalQueue<WriteRange>* unstarted_write_ranges() const {
-      return &unstarted_write_ranges_;
-    }
-    const InternalQueue<RequestRange>* in_flight_ranges() const {
-      return &in_flight_ranges_;
-    }
-
-    InternalQueue<ScanRange>* unstarted_scan_ranges() { return &unstarted_scan_ranges_; }
-    InternalQueue<WriteRange>* unstarted_write_ranges() {
-      return &unstarted_write_ranges_;
-    }
-    InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
-
-    /// Schedules the request context on this disk if it's not already on the queue.
-    /// Context lock must be taken before this.
-    void ScheduleContext(DiskIoRequestContext* context, int disk_id);
-
-    /// Increment the ref count on reader.  We need to track the number of threads per
-    /// reader per disk that are in the unlocked hdfs read code section. This is updated
-    /// by multiple threads without a lock so we need to use an atomic int.
-    void IncrementRequestThreadAndDequeue() {
-      num_threads_in_op_.Add(1);
-      is_on_queue_ = false;
-    }
-
-    void DecrementRequestThread() { num_threads_in_op_.Add(-1); }
-
-    /// Decrement request thread count and do final cleanup if this is the last
-    /// thread. RequestContext lock must be taken before this.
-    void DecrementRequestThreadAndCheckDone(DiskIoRequestContext* context) {
-      num_threads_in_op_.Add(-1); // Also acts as a barrier.
-      if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
-        // This thread is the last one for this reader on this disk, do final cleanup
-        context->DecrementDiskRefCount();
-        done_ = true;
-      }
-    }
-
-   private:
-    /// If true, this disk is all done for this request context, including any cleanup.
-    /// If done is true, it means that this request must not be on this disk's queue
-    /// *AND* there are no threads currently working on this context. To satisfy
-    /// this, only the last thread (per disk) can set this to true.
-    bool done_ = true;
-
-    /// For each disk, keeps track if the context is on this disk's queue, indicating
-    /// the disk must do some work for this context. The disk needs to do work in 4 cases:
-    ///  1) in_flight_ranges is not empty, the disk needs to read for this reader.
-    ///  2) next_range_to_start is NULL, the disk needs to prepare a scan range to be
-    ///     read next.
-    ///  3) the reader has been cancelled and this disk needs to participate in the
-    ///     cleanup.
-    ///  4) A write range is added to queue.
-    /// In general, we only want to put a context on the disk queue if there is something
-    /// useful that can be done. If there's nothing useful, the disk queue will wake up
-    /// and then remove the reader from the queue. Doing this causes thrashing of the
-    /// threads.
-    bool is_on_queue_ = false;
-
-    /// For each disk, the number of request ranges that have not been fully read.
-    /// In the non-cancellation path, this will hit 0, and done will be set to true
-    /// by the disk thread. This is undefined in the cancellation path (the various
-    /// threads notice by looking at the DiskIoRequestContext's state_).
-    int num_remaining_ranges_ = 0;
-
-    /// Queue of ranges that have not started being read.  This list is exclusive
-    /// with in_flight_ranges.
-    InternalQueue<ScanRange> unstarted_scan_ranges_;
-
-    /// Queue of pending IO requests for this disk in the order that they will be
-    /// processed. A ScanRange is added to this queue when it is returned in
-    /// GetNextRange(), or when it is added with schedule_immediately = true.
-    /// A WriteRange is added to this queue from unstarted_write_ranges_ for each
-    /// invocation of GetNextRequestRange() in WorkLoop().
-    /// The size of this queue is always less than or equal to num_remaining_ranges.
-    InternalQueue<RequestRange> in_flight_ranges_;
-
-    /// The next range to start for this reader on this disk. Each disk (for each reader)
-    /// picks the next range to start. The range is set here and also added to the
-    /// ready_to_start_ranges_ queue. The reader pulls from the queue in FIFO order,
-    /// so the ranges from different disks are round-robined. When the range is pulled
-    /// off the ready_to_start_ranges_ queue, it sets this variable to NULL, so the disk
-    /// knows to populate it again and add it to ready_to_start_ranges_ i.e. it is used
-    /// as a flag by DiskIoMgr::GetNextScanRange to determine if it needs to add another
-    /// range to ready_to_start_ranges_.
-    ScanRange* next_scan_range_to_start_ = nullptr;
-
-    /// For each disk, the number of threads issuing the underlying read/write on behalf
-    /// of this context. There are a few places where we release the context lock, do some
-    /// work, and then grab the lock again.  Because we don't hold the lock for the
-    /// entire operation, we need this ref count to keep track of which thread should do
-    /// final resource cleanup during cancellation.
-    /// Only the thread that sees the count at 0 should do the final cleanup.
-    AtomicInt32 num_threads_in_op_{0};
-
-    /// Queue of write ranges to process for this disk. A write range is always added
-    /// to in_flight_ranges_ in GetNextRequestRange(). There are separate
-    /// unstarted_scan_ranges_ and unstarted_write_ranges_ to alternate between reads
-    /// and writes. (Otherwise, since next_scan_range_to_start is set
-    /// in GetNextRequestRange() whenever it is null, repeated calls to
-    /// GetNextRequestRange() and GetNextRange() may result in only reads being processed)
-    InternalQueue<WriteRange> unstarted_write_ranges_;
-  };
-
-  /// Per disk states to synchronize multiple disk threads accessing the same request
-  /// context.
-  std::vector<PerDiskState> disk_states_;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
deleted file mode 100644
index 7f0692e..0000000
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ /dev/null
@@ -1,591 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/disk-io-mgr.h"
-#include "runtime/disk-io-mgr-internal.h"
-#include "util/error-util.h"
-#include "util/hdfs-util.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() "
-    "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
-    "(assuming the HDFS client is configured to do so).");
-
-// TODO: Run perf tests and empirically settle on the optimal default value for the
-// read buffer size. Currently setting it as 128k for the same reason as for S3, i.e.
-// due to JNI array allocation and memcpy overhead, 128k was empirically found to have the
-// least overhead.
-DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
-    "reading from ADLS.");
-
-// Implementation of the ScanRange functionality. Each ScanRange contains a queue
-// of ready buffers. For each ScanRange, there is only a single producer and
-// consumer thread, i.e. only one disk thread will push to a scan range at
-// any time and only one thread will remove from the queue. This is to guarantee
-// that buffers are queued and read in file order.
-
-bool DiskIoMgr::ScanRange::EnqueueBuffer(
-    const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
-  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
-  {
-    unique_lock<mutex> scan_range_lock(lock_);
-    DCHECK(Validate()) << DebugString();
-    DCHECK(!eosr_returned_);
-    DCHECK(!eosr_queued_);
-    if (is_cancelled_) {
-      // Return the buffer, this range has been cancelled
-      if (buffer->buffer_ != nullptr) {
-        io_mgr_->num_buffers_in_readers_.Add(1);
-        reader_->num_buffers_in_reader_.Add(1);
-      }
-      reader_->num_used_buffers_.Add(-1);
-      io_mgr_->ReturnBuffer(move(buffer));
-      return false;
-    }
-    reader_->num_ready_buffers_.Add(1);
-    eosr_queued_ = buffer->eosr();
-    ready_buffers_.emplace_back(move(buffer));
-
-    DCHECK_LE(ready_buffers_.size(), SCAN_RANGE_READY_BUFFER_LIMIT);
-    blocked_on_queue_ = ready_buffers_.size() == SCAN_RANGE_READY_BUFFER_LIMIT;
-  }
-
-  buffer_ready_cv_.NotifyOne();
-
-  return blocked_on_queue_;
-}
-
-Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
-  DCHECK(*buffer == nullptr);
-  bool eosr;
-  {
-    unique_lock<mutex> scan_range_lock(lock_);
-    if (eosr_returned_) return Status::OK();
-    DCHECK(Validate()) << DebugString();
-
-    while (ready_buffers_.empty() && !is_cancelled_) {
-      buffer_ready_cv_.Wait(scan_range_lock);
-    }
-
-    if (is_cancelled_) {
-      DCHECK(!status_.ok());
-      return status_;
-    }
-
-    // Remove the first ready buffer from the queue and return it
-    DCHECK(!ready_buffers_.empty());
-    DCHECK_LE(ready_buffers_.size(), SCAN_RANGE_READY_BUFFER_LIMIT);
-    *buffer = move(ready_buffers_.front());
-    ready_buffers_.pop_front();
-    eosr_returned_ = (*buffer)->eosr();
-    eosr = (*buffer)->eosr();
-  }
-
-  // Update tracking counters. The buffer has now moved from the IoMgr to the
-  // caller.
-  io_mgr_->num_buffers_in_readers_.Add(1);
-  reader_->num_buffers_in_reader_.Add(1);
-  reader_->num_ready_buffers_.Add(-1);
-  reader_->num_used_buffers_.Add(-1);
-  if (eosr) reader_->num_finished_ranges_.Add(1);
-
-  Status status = (*buffer)->status_;
-  if (!status.ok()) {
-    io_mgr_->ReturnBuffer(move(*buffer));
-    return status;
-  }
-
-  unique_lock<mutex> reader_lock(reader_->lock_);
-
-  DCHECK(reader_->Validate()) << endl << reader_->DebugString();
-  if (reader_->state_ == DiskIoRequestContext::Cancelled) {
-    reader_->blocked_ranges_.Remove(this);
-    Cancel(reader_->status_);
-    io_mgr_->ReturnBuffer(move(*buffer));
-    return status_;
-  }
-
-  {
-    // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
-    // may have been called after we released 'lock_' above so we need to re-check
-    // whether the queue is full.
-    unique_lock<mutex> scan_range_lock(lock_);
-    if (blocked_on_queue_ && ready_buffers_.size() < SCAN_RANGE_READY_BUFFER_LIMIT
-        && !eosr_queued_) {
-      blocked_on_queue_ = false;
-      // This scan range was blocked and is no longer, add it to the reader
-      // queue again.
-      reader_->blocked_ranges_.Remove(this);
-      reader_->ScheduleScanRange(this);
-    }
-  }
-  return Status::OK();
-}
-
-void DiskIoMgr::ScanRange::Cancel(const Status& status) {
-  // Cancelling a range that was never started, ignore.
-  if (io_mgr_ == nullptr) return;
-
-  DCHECK(!status.ok());
-  {
-    // Grab both locks to make sure that all working threads see is_cancelled_.
-    unique_lock<mutex> scan_range_lock(lock_);
-    unique_lock<mutex> hdfs_lock(hdfs_lock_);
-    DCHECK(Validate()) << DebugString();
-    if (is_cancelled_) return;
-    is_cancelled_ = true;
-    status_ = status;
-  }
-  buffer_ready_cv_.NotifyAll();
-  CleanupQueuedBuffers();
-
-  // For cached buffers, we can't close the range until the cached buffer is returned.
-  // Close() is called from DiskIoMgr::ReturnBuffer().
-  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
-}
-
-void DiskIoMgr::ScanRange::CleanupQueuedBuffers() {
-  DCHECK(is_cancelled_);
-  io_mgr_->num_buffers_in_readers_.Add(ready_buffers_.size());
-  reader_->num_buffers_in_reader_.Add(ready_buffers_.size());
-  reader_->num_used_buffers_.Add(-ready_buffers_.size());
-  reader_->num_ready_buffers_.Add(-ready_buffers_.size());
-
-  while (!ready_buffers_.empty()) {
-    io_mgr_->ReturnBuffer(move(ready_buffers_.front()));
-    ready_buffers_.pop_front();
-  }
-}
-
-string DiskIoMgr::ScanRange::DebugString() const {
-  stringstream ss;
-  ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
-     << " len=" << len_ << " bytes_read=" << bytes_read_
-     << " buffer_queue=" << ready_buffers_.size()
-     << " hdfs_file=" << exclusive_hdfs_fh_;
-  return ss.str();
-}
-
-bool DiskIoMgr::ScanRange::Validate() {
-  if (bytes_read_ > len_) {
-    LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
-                 << " bytes_read_=" << bytes_read_ << " len_=" << len_;
-    return false;
-  }
-  if (eosr_returned_ && !eosr_queued_) {
-    LOG(WARNING) << "Returned eosr to reader before finishing reading the scan range"
-                 << " eosr_returned_=" << eosr_returned_
-                 << " eosr_queued_=" << eosr_queued_;
-    return false;
-  }
-  return true;
-}
-
-DiskIoMgr::ScanRange::ScanRange()
-  : RequestRange(RequestType::READ),
-    num_remote_bytes_(0),
-    external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
-    mtime_(-1) {}
-
-DiskIoMgr::ScanRange::~ScanRange() {
-  DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
-  DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
-      << "Cached buffer was not released.";
-}
-
-void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
-  DCHECK(ready_buffers_.empty());
-  DCHECK(file != nullptr);
-  DCHECK_GE(len, 0);
-  DCHECK_GE(offset, 0);
-  DCHECK(buffer_opts.client_buffer_ == nullptr ||
-         buffer_opts.client_buffer_len_ >= len_);
-  fs_ = fs;
-  file_ = file;
-  len_ = len;
-  offset_ = offset;
-  disk_id_ = disk_id;
-  try_cache_ = buffer_opts.try_cache_;
-  mtime_ = buffer_opts.mtime_;
-  expected_local_ = expected_local;
-  num_remote_bytes_ = 0;
-  meta_data_ = meta_data;
-  if (buffer_opts.client_buffer_ != nullptr) {
-    external_buffer_tag_ = ExternalBufferTag::CLIENT_BUFFER;
-    client_buffer_.data = buffer_opts.client_buffer_;
-    client_buffer_.len = buffer_opts.client_buffer_len_;
-  } else {
-    external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
-  }
-  io_mgr_ = nullptr;
-  reader_ = nullptr;
-  exclusive_hdfs_fh_ = nullptr;
-}
-
-void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader) {
-  DCHECK(exclusive_hdfs_fh_ == nullptr);
-  DCHECK(local_file_ == nullptr);
-  // Reader must provide MemTracker or a buffer.
-  DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
-      || reader->mem_tracker_ != nullptr);
-  io_mgr_ = io_mgr;
-  reader_ = reader;
-  local_file_ = nullptr;
-  exclusive_hdfs_fh_ = nullptr;
-  bytes_read_ = 0;
-  is_cancelled_ = false;
-  eosr_queued_= false;
-  eosr_returned_= false;
-  blocked_on_queue_ = false;
-  DCHECK(Validate()) << DebugString();
-}
-
-Status DiskIoMgr::ScanRange::Open(bool use_file_handle_cache) {
-  unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  if (is_cancelled_) return Status::CANCELLED;
-
-  if (fs_ != nullptr) {
-    if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
-    // With file handle caching, the scan range does not maintain its own
-    // hdfs file handle. File handle caching is only used for local files,
-    // so s3 and remote filesystems should obtain an exclusive file handle
-    // for each scan range.
-    if (use_file_handle_cache && expected_local_) return Status::OK();
-    // Get a new exclusive file handle.
-    exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
-        mtime(), reader_, true);
-    if (exclusive_hdfs_fh_ == nullptr) {
-      return Status(TErrorCode::DISK_IO_ERROR,
-          GetHdfsErrorMsg("Failed to open HDFS file ", file_));
-    }
-
-    if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) != 0) {
-      // Destroy the file handle and remove it from the cache.
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
-      exclusive_hdfs_fh_ = nullptr;
-      return Status(TErrorCode::DISK_IO_ERROR,
-          Substitute("Error seeking to $0 in file: $1 $2", offset_, file_,
-          GetHdfsErrorMsg("")));
-    }
-  } else {
-    if (local_file_ != nullptr) return Status::OK();
-
-    local_file_ = fopen(file(), "r");
-    if (local_file_ == nullptr) {
-      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not open file: $0: $1",
-            file_, GetStrErrMsg()));
-    }
-    if (fseek(local_file_, offset_, SEEK_SET) == -1) {
-      fclose(local_file_);
-      local_file_ = nullptr;
-      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not seek to $0 "
-          "for file: $1: $2", offset_, file_, GetStrErrMsg()));
-    }
-  }
-  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
-  }
-  return Status::OK();
-}
-
-// Releases the file resources held by this scan range while holding 'hdfs_lock_'.
-// For ranges read through the HDFS API ('fs_' is set) this folds in the read
-// statistics of the exclusive handle, frees an attached cached buffer, destroys the
-// exclusive handle, refreshes the hedged-read metrics and accounts for remote reads.
-// For local files it closes the FILE*. The open-files metric is decremented only if
-// a handle was actually released by this call.
-void DiskIoMgr::ScanRange::Close() {
-  unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  bool released_handle = false;
-  if (fs_ == nullptr) {
-    // Local file path: only a FILE* can be outstanding.
-    if (local_file_ != nullptr) {
-      fclose(local_file_);
-      local_file_ = nullptr;
-      released_handle = true;
-    }
-  } else {
-    if (exclusive_hdfs_fh_ != nullptr) {
-      // Pick up the statistics before the handle goes away.
-      GetHdfsStatistics(exclusive_hdfs_fh_->file());
-
-      const bool holds_cached_buffer =
-          external_buffer_tag_ == ExternalBufferTag::CACHED_BUFFER;
-      if (holds_cached_buffer) {
-        hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
-        cached_buffer_ = nullptr;
-        external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
-      }
-
-      // The exclusive handle is destroyed and dropped from the cache.
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
-      exclusive_hdfs_fh_ = nullptr;
-      released_handle = true;
-    }
-
-    if (FLAGS_use_hdfs_pread) {
-      // Hedged read metrics are refreshed only when --use_hdfs_pread is set, so that
-      // the libhdfs client does not malloc and free a hdfsHedgedReadMetrics object
-      // for nothing. 'hedged_metrics' is only set when the call succeeds.
-      struct hdfsHedgedReadMetrics* hedged_metrics;
-      if (hdfsGetHedgedReadMetrics(fs_, &hedged_metrics) == 0) {
-        ImpaladMetrics::HEDGED_READ_OPS->set_value(hedged_metrics->hedgedReadOps);
-        ImpaladMetrics::HEDGED_READ_OPS_WIN->set_value(hedged_metrics->hedgedReadOpsWin);
-        hdfsFreeHedgedReadMetrics(hedged_metrics);
-      }
-    }
-
-    if (num_remote_bytes_ > 0) {
-      reader_->num_remote_ranges_.Add(1L);
-      if (expected_local_) {
-        // The range was expected to be local, so remote bytes are worth reporting.
-        reader_->unexpected_remote_bytes_.Add(num_remote_bytes_);
-        VLOG_FILE << "Unexpected remote HDFS read of "
-                  << PrettyPrinter::Print(num_remote_bytes_, TUnit::BYTES)
-                  << " for file '" << file_ << "'";
-      }
-    }
-  }
-
-  if (!released_handle) return;
-  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
-  }
-}
-
-// Returns the largest number of bytes that a single read call should request for
-// this scan range, chosen by the disk queue the range is assigned to.
-int64_t DiskIoMgr::ScanRange::MaxReadChunkSize() const {
-  if (disk_id_ == io_mgr_->RemoteS3DiskId()) {
-    // S3 InputStreams don't support DIRECT_READ (i.e. the java.nio.ByteBuffer read()
-    // interface), so hdfsRead() has to allocate a Java byte[] and copy the data out.
-    // Profiles show that the JNI array allocation and the memcpy both get much more
-    // expensive for larger buffers, hence the cap per read request. 128K was picked
-    // empirically by trying values between 4K and 8M and optimizing for lower CPU
-    // utilization and higher S3 throughput.
-    const int64_t s3_chunk_bytes = 128 * 1024;
-    DCHECK(IsS3APath(file()));
-    return s3_chunk_bytes;
-  }
-  if (disk_id_ == io_mgr_->RemoteAdlsDiskId()) {
-    DCHECK(IsADLSPath(file()));
-    return FLAGS_adls_read_chunk_size;
-  }
-  // hdfsRead() takes its length argument as an int, so never ask for more than that.
-  return numeric_limits<int>::max();
-}
-
-// Reads the next piece of this scan range into 'buffer', holding 'hdfs_lock_' for the
-// whole call.
-//
-// At most min('buffer_len', bytes left in the range) bytes are read. On success
-// '*bytes_read' holds the number of bytes placed in 'buffer', 'bytes_read_' is advanced
-// by the same amount and '*eosr' is set when the range is exhausted or the file ended
-// early. Returns CANCELLED if the range was cancelled and DISK_IO_ERROR if opening,
-// seeking or reading fails; in the error case 'bytes_read_' is left untouched.
-//
-// For ranges read through the HDFS API the data is fetched in chunks of at most
-// MaxReadChunkSize() bytes. If the range has no exclusive file handle, one is borrowed
-// from the file handle cache and a failed operation is retried once after reopening
-// the borrowed handle.
-//
-// TODO: how do we best use the disk here.  e.g. is it good to break up a
-// 1MB read into 8 128K reads?
-// TODO: look at linux disk scheduling
-Status DiskIoMgr::ScanRange::Read(
-    uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
-  unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  if (is_cancelled_) return Status::CANCELLED;
-
-  *eosr = false;
-  *bytes_read = 0;
-  // Read until the end of the scan range or the end of the buffer. This must be a
-  // 64-bit value: both operands are 64-bit and a narrower type would silently
-  // truncate requests of 2GB or more.
-  int64_t bytes_to_read = min(len_ - bytes_read_, buffer_len);
-  DCHECK_GE(bytes_to_read, 0);
-
-  if (fs_ != nullptr) {
-    HdfsFileHandle* borrowed_hdfs_fh = nullptr;
-    hdfsFile hdfs_file;
-
-    // If the scan range has an exclusive file handle, use it. Otherwise, borrow
-    // a file handle from the cache.
-    if (exclusive_hdfs_fh_ != nullptr) {
-      hdfs_file = exclusive_hdfs_fh_->file();
-    } else {
-      borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
-          mtime(), reader_, false);
-      if (borrowed_hdfs_fh == nullptr) {
-        return Status(TErrorCode::DISK_IO_ERROR,
-            GetHdfsErrorMsg("Failed to open HDFS file ", file_));
-      }
-      hdfs_file = borrowed_hdfs_fh->file();
-    }
-
-    int64_t max_chunk_size = MaxReadChunkSize();
-    Status status = Status::OK();
-    while (*bytes_read < bytes_to_read) {
-      // MaxReadChunkSize() never exceeds the int range, so the chunk fits in an int.
-      int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
-      DCHECK_GE(chunk_size, 0);
-      // The hdfsRead() length argument is an int.
-      DCHECK_LE(chunk_size, numeric_limits<int>::max());
-      int current_bytes_read = -1;
-      // bytes_read_ is only updated after the while loop
-      int64_t position_in_file = offset_ + bytes_read_ + *bytes_read;
-      int num_retries = 0;
-      while (true) {
-        status = Status::OK();
-        // For file handles from the cache, any of the below file operations may fail
-        // due to a bad file handle. In each case, record the error, but allow for a
-        // retry to fix it.
-        if (FLAGS_use_hdfs_pread) {
-          current_bytes_read = hdfsPread(fs_, hdfs_file, position_in_file,
-              buffer + *bytes_read, chunk_size);
-          if (current_bytes_read == -1) {
-            status = Status(TErrorCode::DISK_IO_ERROR,
-                GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
-          }
-        } else {
-          // If the file handle is borrowed, it may not be at the appropriate
-          // location. Seek to the appropriate location.
-          bool seek_failed = false;
-          if (borrowed_hdfs_fh != nullptr) {
-            if (hdfsSeek(fs_, hdfs_file, position_in_file) != 0) {
-              status = Status(TErrorCode::DISK_IO_ERROR, Substitute("Error seeking to $0 "
-                  " in file: $1: $2", position_in_file, file_, GetHdfsErrorMsg("")));
-              seek_failed = true;
-            }
-          }
-          if (!seek_failed) {
-            current_bytes_read = hdfsRead(fs_, hdfs_file, buffer + *bytes_read,
-                chunk_size);
-            if (current_bytes_read == -1) {
-              status = Status(TErrorCode::DISK_IO_ERROR,
-                  GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
-            }
-          }
-        }
-
-        // Do not retry:
-        // - if read was successful (current_bytes_read != -1)
-        // - or if already retried once
-        // - or if this is not using a borrowed file handle
-        DCHECK_LE(num_retries, 1);
-        if (current_bytes_read != -1 || borrowed_hdfs_fh == nullptr ||
-            num_retries == 1) {
-          break;
-        }
-        // The error may be due to a bad file handle. Reopen the file handle and retry.
-        ++num_retries;
-        RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
-            mtime(), &borrowed_hdfs_fh));
-        hdfs_file = borrowed_hdfs_fh->file();
-      }
-      if (!status.ok()) break;
-      if (current_bytes_read == 0) {
-        // No more bytes in the file. The scan range went past the end.
-        *eosr = true;
-        break;
-      }
-      *bytes_read += current_bytes_read;
-
-      // Collect and accumulate statistics
-      GetHdfsStatistics(hdfs_file);
-    }
-
-    // Hand a borrowed handle back to the cache before reporting any error.
-    if (borrowed_hdfs_fh != nullptr) {
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh, false);
-    }
-    if (!status.ok()) return status;
-  } else {
-    DCHECK(local_file_ != nullptr);
-    *bytes_read = fread(buffer, 1, bytes_to_read, local_file_);
-    DCHECK_GE(*bytes_read, 0);
-    DCHECK_LE(*bytes_read, bytes_to_read);
-    if (*bytes_read < bytes_to_read) {
-      if (ferror(local_file_) != 0) {
-        return Status(TErrorCode::DISK_IO_ERROR, Substitute("Error reading from $0 "
-            "at byte offset: $1: $2", file_, offset_ + bytes_read_, GetStrErrMsg()));
-      } else {
-        // On Linux, we should only get partial reads from block devices on error or eof.
-        DCHECK(feof(local_file_) != 0);
-        *eosr = true;
-      }
-    }
-  }
-  bytes_read_ += *bytes_read;
-  DCHECK_LE(bytes_read_, len_);
-  if (bytes_read_ == len_) *eosr = true;
-  return Status::OK();
-}
-
-// Tries to serve the whole scan range with a single zero-copy read through
-// hadoopReadZero(). The caller must hold the reader's lock via 'reader_lock' and no
-// bytes of the range may have been read yet.
-//
-// '*read_succeeded' is set to true only if the complete range was obtained and
-// enqueued as one buffer. In every other non-error outcome (local filesystem, no
-// buffer returned, short buffer) it stays false and OK is returned so that the caller
-// can fall back to the regular read path. Returns the Open() error if the file cannot
-// be opened and CANCELLED if the range was cancelled.
-Status DiskIoMgr::ScanRange::ReadFromCache(
-    const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
-  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
-  DCHECK(try_cache_);
-  DCHECK_EQ(bytes_read_, 0);
-  *read_succeeded = false;
-  RETURN_IF_ERROR(Open(false));
-
-  // There is no cached read support for the local filesystem.
-  if (fs_ == nullptr) return Status::OK();
-
-  {
-    unique_lock<mutex> hdfs_lock(hdfs_lock_);
-    if (is_cancelled_) return Status::CANCELLED;
-
-    DCHECK(exclusive_hdfs_fh_ != nullptr);
-    DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
-    cached_buffer_ =
-      hadoopReadZero(exclusive_hdfs_fh_->file(), io_mgr_->cached_read_options_, len());
-    if (cached_buffer_ != nullptr) {
-      external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
-    }
-  }
-
-  const bool got_cached_buffer =
-      external_buffer_tag_ == ExternalBufferTag::CACHED_BUFFER;
-  if (!got_cached_buffer) {
-    // Nothing was cached; the caller falls back to the normal read path.
-    VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
-               << ". Switching to disk read path.";
-    // Reset the scan range state so that it can be re-issued.
-    Close();
-    return Status::OK();
-  }
-
-  // A buffer came back from the cache; check that it covers the whole range.
-  void* buffer = const_cast<void*>(hadoopRzBufferGet(cached_buffer_));
-  int32_t cached_len = hadoopRzBufferLength(cached_buffer_);
-  // A partial read can happen when files are truncated.
-  // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
-  // between errors and partially cached blocks here.
-  if (cached_len < len()) {
-    VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
-      << len() << " bytes, but read " << cached_len << ". Switching to disk read path.";
-    // 'read_succeeded' is still false here, so after closing the range the caller
-    // falls back to a non-cached read of it.
-    Close();
-    return Status::OK();
-  }
-
-  // The whole scan range is handed over as one buffer descriptor. 'mem_tracker' is
-  // nullptr because the memory belongs to the HDFS java client rather than to the
-  // Impala backend.
-  unique_ptr<BufferDescriptor> desc(new BufferDescriptor(
-      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr));
-  desc->len_ = cached_len;
-  desc->scan_range_offset_ = 0;
-  desc->eosr_ = true;
-  bytes_read_ = cached_len;
-  EnqueueBuffer(reader_lock, move(desc));
-  if (reader_->bytes_read_counter_ != nullptr) {
-    COUNTER_ADD(reader_->bytes_read_counter_, cached_len);
-  }
-  *read_succeeded = true;
-  reader_->num_used_buffers_.Add(1);
-  return Status::OK();
-}
-
-// Adds the read statistics that libhdfs collected for 'hdfs_file' to the reader's
-// counters and to 'num_remote_bytes_', then clears them on the file so that the next
-// call only sees new activity. Does nothing for files that are not on an HDFS path.
-void DiskIoMgr::ScanRange::GetHdfsStatistics(hdfsFile hdfs_file) {
-  struct hdfsReadStatistics* read_stats;
-  if (!IsHdfsPath(file())) return;
-
-  if (hdfsFileGetReadStatistics(hdfs_file, &read_stats) == 0) {
-    reader_->bytes_read_local_.Add(read_stats->totalLocalBytesRead);
-    reader_->bytes_read_short_circuit_.Add(read_stats->totalShortCircuitBytesRead);
-    reader_->bytes_read_dn_cache_.Add(read_stats->totalZeroCopyBytesRead);
-    // Whatever was not read locally counts as remote.
-    if (read_stats->totalLocalBytesRead != read_stats->totalBytesRead) {
-      num_remote_bytes_ += read_stats->totalBytesRead - read_stats->totalLocalBytesRead;
-    }
-    hdfsFileFreeReadStatistics(read_stats);
-  }
-  // The statistics are cleared even if fetching them failed.
-  hdfsFileClearReadStatistics(hdfs_file);
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress-test.cc b/be/src/runtime/disk-io-mgr-stress-test.cc
deleted file mode 100644
index 7ae9515..0000000
--- a/be/src/runtime/disk-io-mgr-stress-test.cc
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/disk-io-mgr-stress.h"
-#include "util/cpu-info.h"
-#include "util/string-parser.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-// Simple utility to run the disk io stress test.  An optional command line argument
-// can be passed to control how many seconds to run this test (0 for forever).
-
-// Fixed parameters of the stress run. main() passes them to the DiskIoMgrStress
-// constructor; DEFAULT_DURATION_SEC is used when no duration argument is given.
-// TODO: make these configurable once we decide how to run BE tests with args
-const int DEFAULT_DURATION_SEC = 1;
-const int NUM_DISKS = 5;
-const int NUM_THREADS_PER_DISK = 5;
-const int NUM_CLIENTS = 10;
-const bool TEST_CANCELLATION = true;
-
-// Entry point of the stress test binary. Initializes logging, CPU/OS info and
-// threading, then runs the stress test either for DEFAULT_DURATION_SEC seconds or,
-// if exactly one argument is given, for that many seconds (0 means run forever).
-// Returns 1 if the argument is not a valid integer, 0 otherwise.
-int main(int argc, char** argv) {
-  google::InitGoogleLogging(argv[0]);
-  CpuInfo::Init();
-  OsInfo::Init();
-  impala::InitThreading();
-
-  int duration_sec = DEFAULT_DURATION_SEC;
-  const bool has_duration_arg = argc == 2;
-  if (has_duration_arg) {
-    const char* duration_arg = argv[1];
-    StringParser::ParseResult parse_result;
-    duration_sec = StringParser::StringToInt<int>(
-        duration_arg, strlen(duration_arg), &parse_result);
-    if (parse_result != StringParser::PARSE_SUCCESS) {
-      printf("Invalid arg: %s\n", duration_arg);
-      return 1;
-    }
-  }
-
-  if (duration_sec == 0) {
-    printf("Running stress test indefinitely.\n");
-  } else {
-    printf("Running stress test for %d seconds.\n", duration_sec);
-  }
-
-  DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
-  test.Run(duration_sec);
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
deleted file mode 100644
index a98c3a4..0000000
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ /dev/null
@@ -1,246 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <boost/thread/mutex.hpp>
-
-#include "runtime/disk-io-mgr-stress.h"
-
-#include "runtime/disk-io-mgr-reader-context.h"
-#include "util/time.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-// Probability that a newly set up client is given an abort point inside its file
-// (only applied when cancellation is enabled, see NewClient()).
-static const float ABORT_CHANCE = .10f;
-// Bounds for the length of a single scan range. NewClient() draws lengths with
-// rand() % (MAX - MIN) + MIN, so MAX_READ_LEN itself is never produced.
-static const int MIN_READ_LEN = 1;
-static const int MAX_READ_LEN = 20;
-
-// Bounds for the length of a generated test file, drawn the same way as above, so
-// MAX_FILE_LEN is exclusive. MAX_FILE_LEN is also the size of the per-thread result
-// buffer in ClientThread().
-static const int MIN_FILE_LEN = 10;
-static const int MAX_FILE_LEN = 1024;
-
-// Min/max read buffer sizes passed to the DiskIoMgr constructor.
-// Make sure this is between MIN/MAX FILE_LEN to test more cases
-static const int MIN_READ_BUFFER_SIZE = 64;
-static const int MAX_READ_BUFFER_SIZE = 128;
-
-// Pause between two attempts of Run() to cancel a random reader.
-static const int CANCEL_READER_PERIOD_MS = 20;  // in ms
-
-// Creates (or truncates) 'filename' and writes the NUL-terminated string 'data' to
-// it. Aborts via CHECK if the file cannot be opened, if not all bytes are written or
-// if closing (and thereby flushing) the file fails, so that a broken test fixture is
-// reported here rather than as a data mismatch later in the test.
-static void CreateTempFile(const char* filename, const char* data) {
-  FILE* file = fopen(filename, "w");
-  CHECK(file != nullptr) << "Could not create file: " << filename;
-  const size_t data_len = strlen(data);
-  const size_t bytes_written = fwrite(data, 1, data_len, file);
-  CHECK_EQ(bytes_written, data_len) << "Short write to file: " << filename;
-  // Buffered data is only flushed on close, so a failure here also means data loss.
-  CHECK_EQ(fclose(file), 0) << "Could not close file: " << filename;
-}
-
-// Returns a string of random lowercase letters. The length is drawn from
-// [MIN_FILE_LEN, MAX_FILE_LEN) first, then one rand() call is made per character.
-string GenerateRandomData() {
-  const int target_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
-  string data;
-  data.reserve(target_len);
-  while (static_cast<int>(data.size()) < target_len) {
-    const char letter = rand() % 26 + 'a';
-    data.push_back(letter);
-  }
-  return data;
-}
-
-// Per-client state of the stress test. Clients are created with 'new Client[n]',
-// which leaves members without an initializer indeterminate, so every scalar gets a
-// default here; NewClient() increments 'files_processed' before ever assigning it.
-struct DiskIoMgrStress::Client {
-  // Taken around uses of 'reader' by the client thread and by the cancelling thread.
-  boost::mutex lock;
-  // Request context currently registered with the io mgr for this client.
-  unique_ptr<DiskIoRequestContext> reader;
-  // Index into DiskIoMgrStress::files_ of the file this client is reading.
-  int file_idx = 0;
-  // Scan ranges covering the current file. Allocated and deleted by NewClient().
-  vector<DiskIoMgr::ScanRange*> scan_ranges;
-  // The client thread stops reading the file once it has read more bytes than this.
-  int abort_at_byte = 0;
-  // Number of times NewClient() has set this client up with a file.
-  int files_processed = 0;
-};
-
-// Sets up the stress test: seeds rand() from the current time (the seed is logged),
-// creates and initializes the io mgr, writes 2 * 'num_clients' random data files
-// under /tmp and sets up 'num_clients' clients, each with a file to read.
-DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
-     int num_clients, bool includes_cancellation) :
-    num_clients_(num_clients),
-    includes_cancellation_(includes_cancellation) {
-
-  const time_t seed = time(NULL);
-  LOG(INFO) << "Running with rand seed: " << seed;
-  srand(seed);
-
-  io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
-      MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
-  const Status init_status = io_mgr_->Init(&mem_tracker_);
-  CHECK(init_status.ok());
-
-  // Create the data files. The exact number of files is not important.
-  files_.resize(num_clients * 2);
-  int file_no = 0;
-  for (File& file : files_) {
-    stringstream path;
-    path << "/tmp/disk_io_mgr_stress_file" << file_no;
-    ++file_no;
-    file.filename = path.str();
-    file.data = GenerateRandomData();
-    CreateTempFile(file.filename.c_str(), file.data.c_str());
-  }
-
-  clients_ = new Client[num_clients_];
-  client_mem_trackers_.resize(num_clients_);
-  int client_id = 0;
-  while (client_id < num_clients_) {
-    NewClient(client_id);
-    ++client_id;
-  }
-}
-
-// Body of one client thread. Until 'shutdown_' is set, it repeatedly reads the file
-// currently assigned to client 'client_id' through the io mgr, validates every
-// returned buffer against the expected file contents, and then unregisters the
-// reader and sets the client up with a new file via NewClient(). Reading a file stops
-// early when there are no more ranges/buffers (e.g. after a cancel) or once more than
-// 'abort_at_byte' bytes have been read.
-void DiskIoMgrStress::ClientThread(int client_id) {
-  Client* client = &clients_[client_id];
-  // NOTE(review): this function-level 'status' is never assigned; the declaration
-  // inside the scan range loop below shadows it. The CHECK(status.ok()) after the
-  // loop therefore tests this variable, not the result of the last io mgr call.
-  Status status;
-  // Reassembled file contents; files are shorter than MAX_FILE_LEN bytes.
-  char read_buffer[MAX_FILE_LEN];
-
-  while (!shutdown_) {
-    bool eos = false;
-    int bytes_read = 0;
-
-    const string& expected = files_[client->file_idx].data;
-
-    while (!eos) {
-      DiskIoMgr::ScanRange* range;
-      Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
-      CHECK(status.ok() || status.IsCancelled());
-      // No range is returned once all ranges were handed out or on cancellation.
-      if (range == NULL) break;
-
-      while (true) {
-        unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
-        status = range->GetNext(&buffer);
-        CHECK(status.ok() || status.IsCancelled());
-        if (buffer == NULL) break;
-
-        int64_t scan_range_offset = buffer->scan_range_offset();
-        int len = buffer->len();
-        CHECK_GE(scan_range_offset, 0);
-        CHECK_LT(scan_range_offset, expected.size());
-        CHECK_GT(len, 0);
-
-        // We get scan ranges back in arbitrary order, so convert the offset within
-        // the scan range to the offset within the file.
-        int64_t file_offset = scan_range_offset + range->offset();
-
-        // Validate the bytes read
-        CHECK_LE(file_offset + len, expected.size());
-        CHECK_EQ(strncmp(reinterpret_cast<char*>(buffer->buffer()),
-                     &expected.c_str()[file_offset], len), 0);
-
-        // Copy the bytes from this read into the result buffer.
-        memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
-        io_mgr_->ReturnBuffer(move(buffer));
-        bytes_read += len;
-
-        CHECK_GE(bytes_read, 0);
-        CHECK_LE(bytes_read, expected.size());
-
-        // Simulate a client that stops consuming in the middle of the file.
-        if (bytes_read > client->abort_at_byte) {
-          eos = true;
-          break;
-        }
-      } // End of buffer
-    } // End of scan range
-
-    if (bytes_read == expected.size()) {
-      // This entire file was read without being cancelled, validate the entire result
-      CHECK(status.ok());
-      CHECK_EQ(strncmp(read_buffer, expected.c_str(), bytes_read), 0);
-    }
-
-    // Unregister the old client and get a new one. The client lock is held so that
-    // the cancelling thread does not use the reader while it is being replaced.
-    unique_lock<mutex> lock(client->lock);
-    io_mgr_->UnregisterContext(client->reader.get());
-    NewClient(client_id);
-  }
-
-  // Shutting down: unregister the reader set up by the last NewClient() call and
-  // clear it so that Run() does not try to cancel it.
-  unique_lock<mutex> lock(client->lock);
-  io_mgr_->UnregisterContext(client->reader.get());
-  client->reader = NULL;
-}
-
-// Cancels the request context of one randomly picked client while holding that
-// client's lock. Does nothing if the test was set up without cancellation.
-void DiskIoMgrStress::CancelRandomReader() {
-  if (includes_cancellation_) {
-    Client& victim = clients_[rand() % num_clients_];
-    unique_lock<mutex> lock(victim.lock);
-    io_mgr_->CancelContext(victim.reader.get());
-  }
-}
-
-// Runs the stress test for 'sec' seconds, or forever if 'sec' is 0. Starts one thread
-// per client, cancels a random reader every CANCEL_READER_PERIOD_MS while the test is
-// running, then signals shutdown, cancels all remaining readers and joins the client
-// threads.
-void DiskIoMgrStress::Run(int sec) {
-  shutdown_ = false;
-  int client_id = 0;
-  while (client_id < num_clients_) {
-    readers_.add_thread(
-        new thread(&DiskIoMgrStress::ClientThread, this, client_id));
-    ++client_id;
-  }
-
-  // Let the clients work for 'sec' one-second rounds of periodic cancellations.
-  const int cancels_per_round = (1000) / CANCEL_READER_PERIOD_MS;
-  int loop_count = 1;
-  while (sec == 0 || loop_count <= sec) {
-    for (int n = 0; n < cancels_per_round; ++n) {
-      SleepForMs(CANCEL_READER_PERIOD_MS);
-      CancelRandomReader();
-    }
-    LOG(ERROR) << "Finished iteration: " << loop_count;
-    ++loop_count;
-  }
-
-  // Tell the client threads to stop picking up new files.
-  shutdown_ = true;
-
-  // Cancel whatever is still registered so that blocked clients wake up.
-  for (client_id = 0; client_id < num_clients_; ++client_id) {
-    Client& client = clients_[client_id];
-    unique_lock<mutex> lock(client.lock);
-    if (client.reader != NULL) io_mgr_->CancelContext(client.reader.get());
-  }
-
-  readers_.join_all();
-}
-
-// (Re)initializes client 'i' to read one randomly chosen file. The file is split into
-// scan ranges of random length, a fresh MemTracker and request context are created
-// for the client and the ranges are handed to the io mgr. With cancellation enabled,
-// the client is given a random abort point inside the file with probability
-// ABORT_CHANCE; otherwise the abort point is the file length.
-void DiskIoMgrStress::NewClient(int i) {
-  Client& client = clients_[i];
-  ++client.files_processed;
-  client.file_idx = rand() % files_.size();
-  const File& chosen_file = files_[client.file_idx];
-  int file_len = chosen_file.data.size();
-
-  client.abort_at_byte = file_len;
-
-  if (includes_cancellation_) {
-    float abort_draw = rand() / (float)RAND_MAX;
-    // On a hit, abort at a random byte inside the file.
-    if (abort_draw < ABORT_CHANCE) client.abort_at_byte = rand() % file_len;
-  }
-
-  // The ranges of the previous file are owned by the client; free them first.
-  for (DiskIoMgr::ScanRange* old_range : client.scan_ranges) {
-    delete old_range;
-  }
-  client.scan_ranges.clear();
-
-  // Cover the file back to back with ranges of random length.
-  for (int assigned_len = 0; assigned_len < file_len;) {
-    int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
-    range_len = min(range_len, file_len - assigned_len);
-
-    DiskIoMgr::ScanRange* range = new DiskIoMgr::ScanRange();
-    range->Reset(NULL, chosen_file.filename.c_str(), range_len, assigned_len,
-        0, false, DiskIoMgr::BufferOpts::Uncached());
-    client.scan_ranges.push_back(range);
-    assigned_len += range_len;
-  }
-
-  client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
-  client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
-  const Status add_status =
-      io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
-  CHECK(add_status.ok());
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.h b/be/src/runtime/disk-io-mgr-stress.h
deleted file mode 100644
index 0a66f2c..0000000
--- a/be/src/runtime/disk-io-mgr-stress.h
+++ /dev/null
@@ -1,94 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
-
-#include <memory>
-#include <vector>
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread/thread.hpp>
-
-#include "runtime/disk-io-mgr.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/thread-resource-mgr.h"
-
-namespace impala {
-
-/// Test utility to stress the disk io mgr.  It allows for a configurable
-/// number of clients.  The clients continuously issue work to the io mgr and
-/// asynchronously get cancelled.  The stress test can be run forever or for
-/// a fixed duration.  The unit test runs this for a fixed duration.
-/// Each client is served by its own thread, which reads randomly generated files
-/// through the io mgr and validates the returned bytes.
-class DiskIoMgrStress {
- public:
-  /// Creates the io mgr with 'num_disks' disks and 'num_threads_per_disk' threads per
-  /// disk, the test data files and 'num_clients' clients. If 'includes_cancellation'
-  /// is true, readers are cancelled at random while the test runs.
-  DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients,
-      bool includes_cancellation);
-
-  /// Run the test for 'sec' seconds.  If 0, run forever
-  void Run(int sec);
-
- private:
-  /// Per-client state; defined in the .cc file.
-  struct Client;
-
-  struct File {
-    std::string filename;
-    std::string data;  // the data in the file, used to validate
-  };
-
-
-  /// Files used for testing.  These are created at startup and recycled
-  /// during the test
-  std::vector<File> files_;
-
-  /// Root mem tracker.
-  MemTracker mem_tracker_;
-
-  /// io manager
-  boost::scoped_ptr<DiskIoMgr> io_mgr_;
-
-  /// Thread group for reader threads
-  boost::thread_group readers_;
-
-  /// Array of clients, 'num_clients_' entries long.
-  /// NOTE(review): allocated with new[] in the constructor; this class declares no
-  /// destructor, so the array does not appear to be freed.
-  int num_clients_;
-  Client* clients_;
-
-  /// Client MemTrackers, one per client.
-  std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
-
-  /// If true, tests cancelling readers
-  bool includes_cancellation_;
-
-  /// Flag to signal that client reader threads should exit
-  /// NOTE(review): written by Run() and read by the client threads; 'volatile' does
-  /// not provide inter-thread synchronization, consider std::atomic<bool>.
-  volatile bool shutdown_;
-
-  /// Helper to initialize a new reader client, registering a new reader with the
-  /// io mgr and initializing the scan ranges
-  void NewClient(int i);
-
-  /// Thread running the reader.  When the current reader is done (either normally
-  /// or cancelled), it picks up a new reader
-  void ClientThread(int client_id);
-
-  /// Possibly cancels a random reader.
-  void CancelRandomReader();
-};
-
-}
-
-#endif


Mime
View raw message