impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject incubator-impala git commit: IMPALA-6137: fix text scanner split delim mem mgmt
Date Fri, 03 Nov 2017 22:23:34 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 1803b403e -> 6b37a793a


IMPALA-6137: fix text scanner split delim mem mgmt

The bug was that the buffer pointed to by byte_buffer_ptr_ could be freed
by ReleaseCompletedResources() before CheckForSplitDelimiter() was called.

The simple fix is to copy out the single byte that is needed each time
the buffer is filled.

Testing:
Ran exhaustive query tests under ASAN with --disable_mem_pools=true.

Before the change test_text_split_delimiters reliably caused an ASAN
failure when run with --disable_mem_pools=true. We should get this
coverage automatically once the I/O mgr switches to using the buffer
pool, which uses ASAN poisoning on freed buffers.

Change-Id: Iddbb5cf6acc8f0b0e0b4c205c334f21e03d06f1c
Reviewed-on: http://gerrit.cloudera.org:8080/8438
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/6b37a793
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/6b37a793
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/6b37a793

Branch: refs/heads/master
Commit: 6b37a793a0af74ba01c67bb5ac7285af7c5c4d52
Parents: 1803b40
Author: Tim Armstrong <tarmstrong@cloudera.com>
Authored: Thu Nov 2 13:04:49 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Fri Nov 3 21:38:20 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-text-scanner.cc | 27 ++++++++++++++++++++-------
 be/src/exec/hdfs-text-scanner.h  | 23 +++++++++++++++++++++++
 2 files changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6b37a793/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index a548ca1..f1c1ff5 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -57,6 +57,7 @@ HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* stat
       byte_buffer_ptr_(nullptr),
       byte_buffer_end_(nullptr),
       byte_buffer_read_size_(0),
+      byte_buffer_filled_(false),
       only_parsing_header_(false),
       scan_state_(CONSTRUCTED),
       boundary_pool_(new MemPool(scan_node->mem_tracker())),
@@ -234,6 +235,7 @@ Status HdfsTextScanner::ResetScanner() {
   boundary_row_.Clear();
   delimited_text_parser_->ParserReset();
   byte_buffer_ptr_ = byte_buffer_end_ = nullptr;
+  byte_buffer_filled_ = false;
   partial_tuple_ = nullptr;
 
   // Initialize codegen fn
@@ -271,7 +273,8 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
     // TODO: calling FillByteBuffer() at eof() can cause
     // ScannerContext::Stream::GetNextBuffer to DCHECK. Fix this.
     if (decompressor_.get() == nullptr && !stream_->eof()) {
-      status = FillByteBuffer(row_batch->tuple_data_pool(), &eosr, NEXT_BLOCK_READ_SIZE);
+      status =
+        FillByteBufferWrapper(row_batch->tuple_data_pool(), &eosr, NEXT_BLOCK_READ_SIZE);
     }
 
     if (!status.ok() || byte_buffer_read_size_ == 0) {
@@ -342,7 +345,7 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) {
   bool eosr = stream_->eosr() || scan_state_ == PAST_SCAN_RANGE;
   while (true) {
     if (!eosr && byte_buffer_ptr_ == byte_buffer_end_) {
-      RETURN_IF_ERROR(FillByteBuffer(pool, &eosr));
+      RETURN_IF_ERROR(FillByteBufferWrapper(pool, &eosr));
     }
 
     TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow());
@@ -462,6 +465,16 @@ Status HdfsTextScanner::GetNextInternal(RowBatch* row_batch) {
   return Status::OK();
 }
 
+Status HdfsTextScanner::FillByteBufferWrapper(
+    MemPool* pool, bool* eosr, int num_bytes) {
+  RETURN_IF_ERROR(FillByteBuffer(pool, eosr, num_bytes));
+  if (byte_buffer_read_size_ > 0) {
+    byte_buffer_filled_ = true;
+    byte_buffer_last_byte_ = byte_buffer_end_[-1];
+  }
+  return Status::OK();
+}
+
 Status HdfsTextScanner::FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes) {
   *eosr = false;
 
@@ -648,7 +661,7 @@ Status HdfsTextScanner::FindFirstTuple(MemPool* pool) {
     // Offset maybe not point to a tuple boundary, skip ahead to the first tuple start in
     // this scan range (if one exists).
     do {
-      RETURN_IF_ERROR(FillByteBuffer(nullptr, &eosr));
+      RETURN_IF_ERROR(FillByteBufferWrapper(nullptr, &eosr));
 
       delimited_text_parser_->ParserReset();
       SCOPED_TIMER(parse_delimiter_timer_);
@@ -678,7 +691,7 @@ Status HdfsTextScanner::FindFirstTuple(MemPool* pool) {
         } else {
           // Split delimiter at the end of the current buffer, but not eosr. Advance to
           // the correct position in the next buffer.
-          RETURN_IF_ERROR(FillByteBuffer(pool, &eosr));
+          RETURN_IF_ERROR(FillByteBufferWrapper(pool, &eosr));
           DCHECK_GT(byte_buffer_read_size_, 0);
           DCHECK_EQ(*byte_buffer_ptr_, '\n');
           byte_buffer_ptr_ += 1;
@@ -703,13 +716,13 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
   DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
   *split_delimiter = false;
 
-  // Nothing in buffer
-  if (byte_buffer_read_size_ == 0) return Status::OK();
+  // Nothing was ever read for this scan range.
+  if (!byte_buffer_filled_) return Status::OK();
 
   // If the line delimiter is "\n" (meaning we also accept "\r" and "\r\n" as delimiters)
   // and the current buffer ends with '\r', this could be a "\r\n" delimiter.
   bool split_delimiter_possible = context_->partition_descriptor()->line_delim() == '\n'
-      && *(byte_buffer_end_ - 1) == '\r';
+      && byte_buffer_last_byte_ == '\r';
   if (!split_delimiter_possible) return Status::OK();
 
   // The '\r' may be escaped. If it's not the text parser will report a complete tuple.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6b37a793/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 069ca7d..610c612 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -85,6 +85,17 @@ class HdfsTextScanner : public HdfsScanner {
   /// Actual bytes received from last file read.
   int64_t byte_buffer_read_size_;
 
+  /// Last character of the last byte buffer filled, i.e. byte_buffer_end_[-1] when the
+  /// buffer was last filled with data. Set in FillByteBufferWrapper() and used in
+  /// CheckForSplitDelimiter(). Copied out of the byte buffer so that it can be
+  /// referenced after the byte buffer is freed or transferred, e.g. in CommitRows().
+  /// Valid if 'byte_buffer_filled_' is true.
+  char byte_buffer_last_byte_;
+
+  /// True if the byte buffer was filled with at least a byte of data since the last time
+  /// the scanner was reset. Set in FillByteBufferWrapper().
+  bool byte_buffer_filled_;
+
   /// True if we are parsing the header for this scanner.
   bool only_parsing_header_;
 
@@ -127,6 +138,14 @@ class HdfsTextScanner : public HdfsScanner {
   /// advances the scan state to DONE. Only valid to call in state PAST_SCAN_RANGE.
   Status FinishScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT;
 
+  /// Wrapper around FillByteBuffer() that also updates 'byte_buffer_last_byte_'
+  /// and 'byte_buffer_filled_'. Callers should call this instead of calling
+  /// FillByteBuffer() directly.
+  /// TODO: IMPALA-6146: this is a workaround that could be removed if FillByteBuffer()
+  /// was a cleaner interface.
+  Status FillByteBufferWrapper(MemPool* pool, bool* eosr, int num_bytes = 0)
+      WARN_UNUSED_RESULT;
+
   /// Fills the next byte buffer from the context.  This will block if there are no bytes
   /// ready.  Updates byte_buffer_ptr_, byte_buffer_end_ and byte_buffer_read_size_.
   /// If num_bytes is 0, the scanner will read whatever is the io mgr buffer size,
@@ -136,6 +155,10 @@ class HdfsTextScanner : public HdfsScanner {
   /// If applicable, attaches decompression buffers from previous calls that might still
   /// be referenced by returned batches to 'pool'. If 'pool' is nullptr the buffers are
   /// freed instead.
+  ///
+  /// Subclasses can override this function to implement different behaviour.
+  /// TODO: IMPALA-6146: rethink this interface - having subclasses modify member
+  /// variables is brittle.
   virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0)
       WARN_UNUSED_RESULT;
 


Mime
View raw message