From: tarmstrong@apache.org
To: commits@impala.incubator.apache.org
Subject: incubator-impala git commit: IMPALA-5823: fix SET_DENY_RESERVATION_PROBABILITY
Date: Wed, 23 Aug 2017 15:41:11 +0000 (UTC)

Repository: incubator-impala
Updated Branches:
  refs/heads/master 679ebc1ac -> d8bc570b6


IMPALA-5823: fix SET_DENY_RESERVATION_PROBABILITY

Sometimes the client is not open when the debug action fires at the start of
Open() or Prepare(). In that case we should set the probability when the
client is opened later.

This caused one of the large row tests to start failing with a "failed to
repartition" error in the aggregation. The error is a false positive caused by
two distinct keys hashing to the same partition. Removing the check allows the
query to succeed because the keys hash to different partitions in the next
round of repartitioning.
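To make the collision scenario concrete, here is a small standalone C++ sketch.
It is not Impala code: the hash function, the per-level seeds, and the 16-way
partitioning are invented for illustration. It assumes, as the commit message
implies, that each repartitioning level hashes keys with a different seed, so
two distinct keys that collide at one level (the situation that made the
removed check report a spurious "failed to repartition" error) typically
separate at the next level.

#include <cstdint>
#include <iostream>

// Toy 64-bit mixer standing in for the real partitioning hash; the constants
// and seeding scheme are placeholders, not Impala's implementation.
uint64_t SeededHash(uint64_t key, uint64_t seed) {
  uint64_t h = key ^ (seed * 0x9E3779B97F4A7C15ULL);
  h ^= h >> 33;
  h *= 0xFF51AFD7ED558CCDULL;
  h ^= h >> 33;
  return h;
}

// Hypothetical per-level partition assignment: each repartitioning level uses
// a different seed, so a collision at level N need not repeat at level N + 1.
int PartitionOf(uint64_t key, int level, int num_partitions) {
  return static_cast<int>(
      SeededHash(key, static_cast<uint64_t>(level) + 1) % num_partitions);
}

int main() {
  const int kNumPartitions = 16;
  // Find two distinct keys that land in the same partition at level 0.
  uint64_t a = 1;
  uint64_t b = 2;
  while (PartitionOf(a, 0, kNumPartitions) != PartitionOf(b, 0, kNumPartitions)) ++b;

  // With only these two rows, level-0 repartitioning cannot make the largest
  // spilled partition any smaller than its input.
  std::cout << "level 0: keys " << a << " and " << b << " both hash to partition "
            << PartitionOf(a, 0, kNumPartitions) << "\n";

  // The next level uses a different seed, so the keys usually split apart and
  // the aggregation makes progress after all.
  std::cout << "level 1: partitions " << PartitionOf(a, 1, kNumPartitions) << " and "
            << PartitionOf(b, 1, kNumPartitions) << "\n";
  return 0;
}

In the two-row case above, a strict "did the largest partition shrink?" check
would flag a healthy query, which is exactly the false positive the patch
removes.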
If we repeatedly get unlucky and have collisions, the query will still fail
when it reaches MAX_PARTITION_DEPTH.

Testing:
Ran TestSpilling in a loop for a couple of hours, including the
exhaustive-only tests.

Change-Id: Ib26b697544d6c2312a8e1fe91b0cf8c0917e5603
Reviewed-on: http://gerrit.cloudera.org:8080/7771
Reviewed-by: Tim Armstrong
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d8bc570b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d8bc570b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d8bc570b

Branch: refs/heads/master
Commit: d8bc570b67930dd136a7a10fc05dfa4b995c65fa
Parents: 679ebc1
Author: Tim Armstrong
Authored: Mon Aug 21 16:47:50 2017 -0700
Committer: Impala Public Jenkins
Committed: Wed Aug 23 07:18:33 2017 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc                    | 38 +++++++++++++++++-------
 be/src/exec/exec-node.h                     |  5 ++++
 be/src/exec/partitioned-aggregation-node.cc | 26 ----------------
 be/src/exec/partitioned-aggregation-node.h  |  5 ----
 common/thrift/generate_error_codes.py       |  4 +--
 5 files changed, 34 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index afd7262..b326d94 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -244,6 +244,13 @@ Status ExecNode::ClaimBufferReservation(RuntimeState* state) {
   VLOG_FILE << id_ << " claiming reservation " << resource_profile_.min_reservation;
   state->query_state()->initial_reservations()->Claim(
       &buffer_pool_client_, resource_profile_.min_reservation);
+  if (debug_action_ == TDebugAction::SET_DENY_RESERVATION_PROBABILITY &&
+      (debug_phase_ == TExecNodePhase::PREPARE || debug_phase_ == TExecNodePhase::OPEN)) {
+    // We may not have been able to enable the debug action at the start of Prepare() or
+    // Open() because the client is not registered then. Do it now to be sure that it is
+    // effective.
+    RETURN_IF_ERROR(EnableDenyReservationDebugAction());
+  }
   return Status::OK();
 }
 
@@ -251,6 +258,22 @@ Status ExecNode::ReleaseUnusedReservation() {
   return buffer_pool_client_.DecreaseReservationTo(resource_profile_.min_reservation);
 }
 
+Status ExecNode::EnableDenyReservationDebugAction() {
+  DCHECK_EQ(debug_action_, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+  DCHECK(buffer_pool_client_.is_registered());
+  // Parse [0.0, 1.0] probability.
+  StringParser::ParseResult parse_result;
+  double probability = StringParser::StringToFloat(
+      debug_action_param_.c_str(), debug_action_param_.size(), &parse_result);
+  if (parse_result != StringParser::PARSE_SUCCESS || probability < 0.0
+      || probability > 1.0) {
+    return Status(Substitute(
+        "Invalid SET_DENY_RESERVATION_PROBABILITY param: '$0'", debug_action_param_));
+  }
+  buffer_pool_client_.SetDebugDenyIncreaseReservation(probability);
+  return Status::OK();
+}
+
 Status ExecNode::CreateTree(
     RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) {
   if (plan.nodes.size() == 0) {
@@ -463,17 +486,12 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s
     return mem_tracker()->MemLimitExceeded(state, "Debug Action: MEM_LIMIT_EXCEEDED");
   } else {
     DCHECK_EQ(debug_action_, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+    // We can only enable the debug action right if the buffer pool client is registered.
+    // If the buffer client is not registered at this point (e.g. if phase is PREPARE or
+    // OPEN), then we will enable the debug action at the time when the client is
+    // registered.
     if (buffer_pool_client_.is_registered()) {
-      // Parse [0.0, 1.0] probability.
-      StringParser::ParseResult parse_result;
-      double probability = StringParser::StringToFloat(
-          debug_action_param_.c_str(), debug_action_param_.size(), &parse_result);
-      if (parse_result != StringParser::PARSE_SUCCESS || probability < 0.0
-          || probability > 1.0) {
-        return Status(Substitute(
-            "Invalid SET_DENY_RESERVATION_PROBABILITY param: '$0'", debug_action_param_));
-      }
-      buffer_pool_client_.SetDebugDenyIncreaseReservation(probability);
+      RETURN_IF_ERROR(EnableDenyReservationDebugAction());
     }
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 04470f2..2f3f714 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -244,6 +244,11 @@ class ExecNode {
   /// fails.
   Status ReleaseUnusedReservation() WARN_UNUSED_RESULT;
 
+  /// Enable the increase reservation denial probability on 'buffer_pool_client_' based on
+  /// the 'debug_action_' set on this node. Returns an error if 'debug_action_param_' is
+  /// invalid.
+  Status EnableDenyReservationDebugAction();
+
   /// Extends blocking queue for row batches. Row batches have a property that
   /// they must be processed in the order they were produced, even in cancellation
   /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index b1d54a6..214810f 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1190,21 +1190,6 @@ Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
   return Status::OK();
 }
 
-int64_t PartitionedAggregationNode::LargestSpilledPartition() const {
-  DCHECK(!is_streaming_preagg_);
-  int64_t max_rows = 0;
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    if (partition == nullptr || partition->is_closed || !partition->is_spilled()) {
-      continue;
-    }
-    int64_t rows = partition->aggregated_row_stream->num_rows()
-        + partition->unaggregated_row_stream->num_rows();
-    if (rows > max_rows) max_rows = rows;
-  }
-  return max_rows;
-}
-
 Status PartitionedAggregationNode::NextPartition() {
   DCHECK(output_partition_ == nullptr);
 
@@ -1336,17 +1321,6 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() {
   // spilled_partitions_/aggregated_partitions_.
   int64_t num_input_rows = partition->aggregated_row_stream->num_rows()
       + partition->unaggregated_row_stream->num_rows();
-
-  // Check if there was any reduction in the size of partitions after repartitioning.
-  int64_t largest_partition = LargestSpilledPartition();
-  DCHECK_GE(num_input_rows, largest_partition) << "Partition had more rows than input";
-  if (UNLIKELY(num_input_rows == largest_partition)) {
-    stringstream ss;
-    DebugString(2, &ss);
-    return Status(TErrorCode::PARTITIONED_AGG_REPARTITION_FAILS, id_,
-        partition->level + 1, num_input_rows, buffer_pool_client_.DebugString(),
-        ss.str());
-  }
   RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index fa8674c..ade223b 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -639,11 +639,6 @@ class PartitionedAggregationNode : public ExecNode {
   Status CheckAndResizeHashPartitions(
       bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
 
-  /// Iterates over all the partitions in hash_partitions_ and returns the number of rows
-  /// of the largest spilled partition (in terms of number of aggregated and unaggregated
-  /// rows).
-  int64_t LargestSpilledPartition() const;
-
   /// Prepares the next partition to return results from. On return, this function
   /// initializes output_iterator_ and output_partition_. This either removes
   /// a partition from aggregated_partitions_ (and is done) or removes the next

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 32a54ca..1d3b7c6 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -240,9 +240,7 @@ error_codes = (
   "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
   "level $1. Number of rows $2:\\n$3\\n$4"),

-  ("PARTITIONED_AGG_REPARTITION_FAILS", 77, "Cannot perform aggregation at node with "
-   "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
-   "level $1. Number of rows $2:\\n$3\\n$4"),
+  ("UNUSED_77", 77, "Not in use."),

   ("AVRO_TRUNCATED_BLOCK", 78, "File '$0' is corrupt: truncated data block at offset $1"),
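
As a side note on the exec-node.cc change above: the new
EnableDenyReservationDebugAction() amounts to parsing the debug-action
parameter as a probability and rejecting anything outside [0.0, 1.0]. The
sketch below approximates that validation in standalone form; it is not Impala
code and uses std::strtod in place of Impala's internal StringParser, so treat
the parse semantics as an approximation.

#include <cerrno>
#include <cstdlib>
#include <iostream>
#include <string>

// Standalone approximation of the [0.0, 1.0] validation performed by
// ExecNode::EnableDenyReservationDebugAction(); std::strtod replaces
// Impala's StringParser here, so edge cases may differ.
bool ParseDenyReservationProbability(
    const std::string& param, double* probability, std::string* error) {
  errno = 0;
  char* end = nullptr;
  double value = std::strtod(param.c_str(), &end);
  bool parsed = errno == 0 && end != param.c_str() && *end == '\0';
  if (!parsed || value < 0.0 || value > 1.0) {
    *error = "Invalid SET_DENY_RESERVATION_PROBABILITY param: '" + param + "'";
    return false;
  }
  *probability = value;
  return true;
}

int main() {
  for (const std::string& param : {"0.5", "1.0", "1.5", "abc", ""}) {
    double p = 0.0;
    std::string error;
    if (ParseDenyReservationProbability(param, &p, &error)) {
      std::cout << "'" << param << "' -> deny reservation increases with probability "
                << p << "\n";
    } else {
      std::cout << error << "\n";
    }
  }
  return 0;
}

On a valid value the real code then calls
buffer_pool_client_.SetDebugDenyIncreaseReservation(probability); on an invalid
one it returns a Status carrying the same message shown above.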