impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [61/61] incubator-impala git commit: IMPALA-4180: Synchronize accesses to RuntimeState::reader_contexts_
Date Fri, 30 Sep 2016 02:15:18 GMT
IMPALA-4180: Synchronize accesses to RuntimeState::reader_contexts_

HdfsScanNodeBase::Close() may add its outstanding DiskIO context to
RuntimeState::reader_contexts_ to be unregistered later when the
fragment is closed. In a plan fragment with multiple HDFS scan nodes,
it's possible for HdfsScanNodeBase::Close() to be called concurrently.
To allow safe concurrent accesses, this change adds a SpinLock to
synchronize accesses to 'reader_contexts_' in RuntimeState.

Change-Id: I911fda526a99514b12f88a3e9fb5952ea4fe1973
Reviewed-on: http://gerrit.cloudera.org:8080/4558
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2a31fbdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2a31fbdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2a31fbdb

Branch: refs/heads/master
Commit: 2a31fbdbfac9a7092c96e4ab9894e0db0e4ce9ca
Parents: f640b3a
Author: Michael Ho <kwho@cloudera.com>
Authored: Wed Sep 28 14:32:55 2016 -0700
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Fri Sep 30 01:21:05 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scan-node-base.cc                   |  2 +-
 be/src/runtime/plan-fragment-executor.cc             |  4 +---
 be/src/runtime/runtime-state.cc                      | 13 +++++++++++++
 be/src/runtime/runtime-state.h                       | 12 +++++++++++-
 .../queries/QueryTest/single-node-nlj.test           | 15 +++++++++++++--
 5 files changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index c03817b..4acf3f5 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -446,7 +446,7 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
     // There may still be io buffers used by parent nodes so we can't unregister the
     // reader context yet. The runtime state keeps a list of all the reader contexts and
     // they are unregistered when the fragment is closed.
-    state->reader_contexts()->push_back(reader_context_);
+    state->AcquireReaderContext(reader_context_);
     // Need to wait for all the active scanner threads to finish to ensure there is no
     // more memory tracked by this scan node's mem tracker.
     state->io_mgr()->CancelContext(reader_context_, true);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 7300f44..e0d314b 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -508,9 +508,7 @@ void PlanFragmentExecutor::Close() {
   // Prepare may not have been called, which sets runtime_state_
   if (runtime_state_.get() != NULL) {
     if (plan_ != NULL) plan_->Close(runtime_state_.get());
-    for (DiskIoRequestContext* context: *runtime_state_->reader_contexts()) {
-      runtime_state_->io_mgr()->UnregisterContext(context);
-    }
+    runtime_state_->UnregisterReaderContexts();
     exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
     runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get());
     runtime_state_->filter_bank()->Close();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 5249076..a05b3ef 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -292,4 +292,17 @@ Status RuntimeState::GetCodegen(LlvmCodeGen** codegen, bool initialize) {
   return Status::OK();
 }
 
+void RuntimeState::AcquireReaderContext(DiskIoRequestContext* reader_context) {
+  boost::lock_guard<SpinLock> l(reader_contexts_lock_);
+  reader_contexts_.push_back(reader_context);
+}
+
+void RuntimeState::UnregisterReaderContexts() {
+  boost::lock_guard<SpinLock> l(reader_contexts_lock_);
+  for (DiskIoRequestContext* context : reader_contexts_) {
+    io_mgr()->UnregisterContext(context);
+  }
+  reader_contexts_.clear();
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 0bf9db5..3496d9c 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -138,7 +138,6 @@ class RuntimeState {
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
   FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
-  std::vector<DiskIoRequestContext*>* reader_contexts() { return &reader_contexts_; }
 
   void set_fragment_root_id(PlanNodeId id) {
     DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
@@ -163,6 +162,14 @@ class RuntimeState {
   /// even when codegen is enabled if nothing has been codegen'd.
   bool codegen_created() const { return codegen_.get() != NULL; }
 
+  /// Takes ownership of a scan node's reader context and plan fragment executor will call
+  /// UnregisterReaderContexts() to unregister it when the fragment is closed. The IO
+  /// buffers may still be in use and thus the deferred unregistration.
+  void AcquireReaderContext(DiskIoRequestContext* reader_context);
+
+  /// Unregisters all reader contexts acquired through AcquireReaderContext().
+  void UnregisterReaderContexts();
+
   /// Returns codegen_ in 'codegen'. If 'initialize' is true, codegen_ will be created if
   /// it has not been initialized by a previous call already. If 'initialize' is false,
   /// 'codegen' will be set to NULL if codegen_ has not been initialized.
@@ -344,6 +351,9 @@ class RuntimeState {
   Status query_status_;
 
   /// Reader contexts that need to be closed when the fragment is closed.
+  /// Synchronization is needed if there are multiple scan nodes in a plan fragment and
+  /// Close() may be called on them concurrently (see IMPALA-4180).
+  SpinLock reader_contexts_lock_;
   std::vector<DiskIoRequestContext*> reader_contexts_;
 
   /// BufferedBlockMgr object used to allocate and manage blocks of input data in memory

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2a31fbdb/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
index 49cdf9d..fa1ccfc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
@@ -149,7 +149,6 @@ group by id) v2;
 ---- TYPES
 TINYINT,INT,BIGINT
 ====
-====
 ---- QUERY
 # Regression test for IMPALA-561: Multiple scan nodes in a plan fragment.
 select count(*)
@@ -161,6 +160,18 @@ left join functional.alltypes a2 on a2.tinyint_col >= 1
 BIGINT
 ====
 ---- QUERY
+# Regression test for IMPALA-4180: a single node plan with blocking join node
+# and multiple top-n + scan nodes to trigger concurrent Close() on scan nodes.
+with t as (select int_col x from functional.alltypestiny order by id limit 2)
+select * from t t1 left join t t2 on t1.x > 0
+---- RESULTS
+0,NULL
+1,0
+1,1
+---- TYPES
+INT,INT
+====
+---- QUERY
 # Right non-equi-join with empty build.
 select straight_join at.id
 from alltypes at
@@ -211,4 +222,4 @@ limit 5
 7295
 ---- TYPES
 INT
-====
+====
\ No newline at end of file


Mime
View raw message