From: abehm@apache.org
To: commits@impala.incubator.apache.org
Date: Sat, 30 Jul 2016 01:49:36 -0000
Subject: [5/8] incubator-impala git commit: IMPALA-3857: KuduScanNode race on returning "optional" threads

IMPALA-3857: KuduScanNode race on returning "optional" threads

The KuduScanNode could return all of its active scanner threads (i.e. every
scanner thread could exit) when no more "optional" thread tokens were
available. In that case, any remaining scan ranges were never picked up and
the query produced incorrect results.

This change fixes the issue by cleaning up the ScannerThread code and making
sure the last active scanner thread never exits early.

This was tested by running the tpch workload repeatedly under load. Work to
incorporate tpch data loading for Kudu is still in progress, so the testing
was done manually.
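The race can be reproduced in a stripped-down model. The sketch below is not Impala code: ScanState, NextRange, Scanner and RunScan are hypothetical names, each thread starts with one key range, and the "optional threads exceeded" check is assumed to always be true. Without the last-thread guard, every thread exits after its first range and the rest of the queue is never scanned; with the guard, the last active thread keeps pulling ranges until none remain.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical shared state standing in for KuduScanNode's members (not Impala code).
struct ScanState {
  std::mutex lock;
  std::deque<int> ranges;  // Pending key ranges, identified by an int.
  int num_active_scanners = 0;
  int ranges_done = 0;
};

// Plays the role of GetNextKeyRange(): returns -1 when no ranges remain.
int NextRange(ScanState* s) {
  std::lock_guard<std::mutex> l(s->lock);
  if (s->ranges.empty()) return -1;
  int r = s->ranges.front();
  s->ranges.pop_front();
  return r;
}

// Scanner thread body. Assumes optional_exceeded() is always true, so every thread
// tries to exit after each range; 'guard_last_thread' toggles the fix.
void Scanner(ScanState* s, int initial_range, bool guard_last_thread) {
  bool optional_thread_exiting = false;
  for (int range = initial_range; range != -1; range = NextRange(s)) {
    std::lock_guard<std::mutex> l(s->lock);
    ++s->ranges_done;  // "Process" the range.
    // Without the guard: always exit. With it: never exit as the last active thread.
    if (!guard_last_thread || s->num_active_scanners > 1) {
      --s->num_active_scanners;
      optional_thread_exiting = true;
      break;
    }
  }
  if (!optional_thread_exiting) {
    std::lock_guard<std::mutex> l(s->lock);
    --s->num_active_scanners;
  }
}

// Spawns up to 'num_threads' scanners, each with one initial range, and returns
// the number of ranges processed once every thread has exited.
int RunScan(int total_ranges, int num_threads, bool guard_last_thread) {
  ScanState s;
  for (int i = 0; i < total_ranges; ++i) s.ranges.push_back(i);
  std::vector<std::thread> threads;
  for (int t = 0; t < num_threads; ++t) {
    int initial = NextRange(&s);
    if (initial == -1) break;
    {
      std::lock_guard<std::mutex> l(s.lock);
      ++s.num_active_scanners;  // Counted before the thread starts.
    }
    threads.emplace_back(Scanner, &s, initial, guard_last_thread);
  }
  for (std::thread& th : threads) th.join();
  return s.ranges_done;
}
```

With 10 ranges and 3 threads, RunScan(10, 3, false) processes only the 3 initial ranges, while RunScan(10, 3, true) processes all 10. The check and the decrement happen under the same lock, so two threads cannot both see themselves as "not last" and leave nobody behind.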
Change-Id: I22adf2109b43b1b37d9a597de85e063431dff155
Reviewed-on: http://gerrit.cloudera.org:8080/3798
Reviewed-by: Dan Hecht
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/857b94d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/857b94d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/857b94d0

Branch: refs/heads/master
Commit: 857b94d03cf719da37d6bb95695c57944f004d36
Parents: c77fb62
Author: Matthew Jacobs
Authored: Mon Jul 25 16:59:56 2016 -0700
Committer: Internal Jenkins
Committed: Fri Jul 29 21:36:50 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scan-node.cc | 98 +++++++++++++++++++++++---------------
 be/src/exec/kudu-scan-node.h  | 10 ++--
 2 files changed, 66 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/857b94d0/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index a08034f..c2ed16c 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -408,60 +408,80 @@ void KuduScanNode::ThreadTokenAvailableCb(
   }
 }
 
-void KuduScanNode::ScannerThread(const string& name, const TKuduKeyRange* key_range) {
+Status KuduScanNode::ProcessRange(KuduScanner* scanner, const TKuduKeyRange* key_range) {
+  RETURN_IF_ERROR(scanner->OpenNextRange(*key_range));
+  bool eos = false;
+  while (!eos) {
+    gscoped_ptr<RowBatch> row_batch(new RowBatch(
+        row_desc(), runtime_state_->batch_size(), mem_tracker()));
+    RETURN_IF_ERROR(scanner->GetNext(row_batch.get(), &eos));
+    while (!done_) {
+      scanner->KeepKuduScannerAlive();
+      if (materialized_row_batches_->AddBatchWithTimeout(row_batch.get(), 1000000)) {
+        ignore_result(row_batch.release());
+        break;
+      }
+    }
+  }
+  // Mark the current scan range as complete.
+  if (eos) scan_ranges_complete_counter()->Add(1);
+  return Status::OK();
+}
+
+void KuduScanNode::ScannerThread(const string& name, const TKuduKeyRange* initial_range) {
+  DCHECK(initial_range != NULL);
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_TIMER(runtime_state_->total_cpu_timer());
 
+  // Set to true if this thread observes that the number of optional threads has been
+  // exceeded and is exiting early.
+  bool optional_thread_exiting = false;
   KuduScanner scanner(this, runtime_state_);
   Status status = scanner.Open();
-  if (!status.ok()) goto done;
-
-  while (!done_) {
-    status = scanner.OpenNextRange(*key_range);
-    if (!status.ok()) goto done;
-    // Keep looping through all the ranges.
-    bool eos = false;
-    while (!eos) {
-      // Keep looping through all the rows.
-      gscoped_ptr<RowBatch> row_batch(new RowBatch(
-          row_desc(), runtime_state_->batch_size(), mem_tracker()));
-      status = scanner.GetNext(row_batch.get(), &eos);
-      if (!status.ok()) goto done;
-      while (true) {
-        if (done_) goto done;
-        scanner.KeepKuduScannerAlive();
-        if (materialized_row_batches_->AddBatchWithTimeout(row_batch.get(), 1000000)) {
-          ignore_result(row_batch.release());
+  if (status.ok()) {
+    const TKuduKeyRange* key_range = initial_range;
+    while (!done_ && key_range != NULL) {
+      status = ProcessRange(&scanner, key_range);
+      if (!status.ok()) break;
+
+      // Check if the number of optional threads has been exceeded.
+      if (runtime_state_->resource_pool()->optional_exceeded()) {
+        unique_lock<mutex> l(lock_);
+        // Don't exit if this is the last thread. Otherwise, the scan will indicate it's
+        // done before all ranges have been processed.
+        if (num_active_scanners_ > 1) {
+          --num_active_scanners_;
+          optional_thread_exiting = true;
           break;
         }
       }
+      key_range = GetNextKeyRange();
     }
-    // Mark the current scan range as complete.
-    scan_ranges_complete_counter()->Add(1);
-    if (runtime_state_->resource_pool()->optional_exceeded()) goto done;
-    key_range = GetNextKeyRange();
-    if (key_range == NULL) goto done;
+    scanner.Close();
   }
 
-done:
-  VLOG(1) << "Thread done: " << name;
-  scanner.Close();
-  runtime_state_->resource_pool()->ReleaseThreadToken(false);
-
-  unique_lock<mutex> l(lock_);
-  if (!status.ok()) {
-    if (status_.ok()) {
-      status_ = status;
+  {
+    unique_lock<mutex> l(lock_);
+    if (!status.ok()) {
+      if (status_.ok()) {
+        status_ = status;
+        done_ = true;
+      }
+    }
+    // Decrement num_active_scanners_ unless handling the case of an early exit when
+    // optional threads have been exceeded, in which case it already was decremented.
+    if (!optional_thread_exiting) --num_active_scanners_;
+    if (num_active_scanners_ == 0) {
       done_ = true;
+      materialized_row_batches_->Shutdown();
     }
   }
-  --num_active_scanners_;
-  if (num_active_scanners_ == 0) {
-    // If we got here and we are the last thread, we're all done.
-    done_ = true;
-    materialized_row_batches_->Shutdown();
-  }
+
+  // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() which
+  // invokes ThreadTokenAvailableCb() which attempts to take the same lock.
+  VLOG(1) << "Thread done: " << name;
+  runtime_state_->resource_pool()->ReleaseThreadToken(false);
 }
 
 }  // namespace impala


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/857b94d0/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 329f52a..5dfb309 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -163,11 +163,15 @@ class KuduScanNode : public ScanNode {
   void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);
 
   /// Main function for scanner thread which executes a KuduScanner. Begins by processing
-  /// 'initial_range', once that range is completed it fetches more ranges with 'GetNextKeyRange()'
-  /// until there are no more ranges to fetch, an error occurred or the limit has been reached.
-  /// Scanned batches are enqueued in 'materialized_row_batches_'.
+  /// 'initial_range', and continues processing ranges returned by 'GetNextKeyRange()'
+  /// until there are no more ranges, an error occurs, or the limit is reached.
   void ScannerThread(const string& name, const TKuduKeyRange* initial_range);
 
+  /// Processes a single scan range. Row batches are fetched using 'scanner' and enqueued
+  /// in 'materialized_row_batches_' until the scanner reports eos for 'key_range', an
+  /// error occurs, or the limit is reached.
+  Status ProcessRange(KuduScanner* scanner, const TKuduKeyRange* key_range);
+
   /// Returns the next partition key range to read. Thread safe. Returns NULL if there are
   /// no more ranges.
   TKuduKeyRange* GetNextKeyRange();
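The inner loop of ProcessRange() in the diff above retries AddBatchWithTimeout() until the batch is accepted or done_ is set, calling KeepKuduScannerAlive() before each attempt. A minimal sketch of that pattern follows, assuming a hypothetical BoundedQueue built on std::condition_variable::wait_for; it is not Impala's row batch queue, EnqueueUntilDone is an illustrative name, and the timeout is far shorter than the 1000000 passed in the diff so the example runs quickly.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded queue (not Impala code): AddWithTimeout() waits up to
// 'timeout' for free space and reports whether the item was enqueued.
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

  bool AddWithTimeout(T item, std::chrono::microseconds timeout) {
    std::unique_lock<std::mutex> l(mu_);
    bool has_space =
        not_full_.wait_for(l, timeout, [this] { return items_.size() < capacity_; });
    if (!has_space) return false;
    items_.push_back(std::move(item));
    return true;
  }

  std::size_t size() {
    std::lock_guard<std::mutex> l(mu_);
    return items_.size();
  }

 private:
  const std::size_t capacity_;
  std::mutex mu_;
  std::condition_variable not_full_;  // Signaled by a consumer (omitted here).
  std::deque<T> items_;
};

// Models the inner loop of ProcessRange(): keep the producer's session alive and
// retry the enqueue until it succeeds or 'done' is set. Returns true if enqueued.
template <typename T, typename KeepAlive>
bool EnqueueUntilDone(BoundedQueue<T>* queue, const T& item,
                      const std::atomic<bool>& done, KeepAlive keep_alive,
                      int* attempts) {
  while (!done.load()) {
    keep_alive();
    ++*attempts;
    if (queue->AddWithTimeout(item, std::chrono::microseconds(1000))) return true;
  }
  return false;
}
```

Bounding each wait, rather than blocking until space frees up, is what lets the producer notice that the scan was cancelled (done_) and periodically ping the remote scanner so it does not expire while the consumer is slow.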