From: tarmstrong@apache.org
To: commits@impala.incubator.apache.org
Reply-To: dev@impala.incubator.apache.org
Date: Tue, 18 Oct 2016 16:43:34 -0000
Message-Id: <31502ea3c9b84bf2b510deeb84a1082a@git.apache.org>
In-Reply-To: <886e1319fd9c44ada8dfd5cba1a29672@git.apache.org>
References: <886e1319fd9c44ada8dfd5cba1a29672@git.apache.org>
Subject: [11/32] incubator-impala git commit: IMPALA-3002/IMPALA-1473: Cardinality observability cleanup

IMPALA-3002/IMPALA-1473: Cardinality observability cleanup

IMPALA-3002:
The shell prints an incorrect value for '#Rows' in the exec
summary for broadcast nodes due to incorrect logic around
whether to use max or agg stats. This patch makes the behavior
consistent with the way the be treats exec summaries in
summary-util.cc. This incorrect logic was also duplicated in
the impala_beeswax test framework.

IMPALA-1473:
When there is a merging exchange with a limit, we may copy rows
into the output batch beyond the limit. In this case, we currently
update the output batch's size to reflect the limit, but we also
need to update ExecNode::num_rows_returned_ or the exec summary
may show that the exchange node returned more rows than it really
did.
Additionally, PlanFragmentExecutor::GetNext does not update
rows_produced_counter_ in some cases, leading the runtime profile
to display an incorrect value for 'RowsProduced'.

Change-Id: I386719370386c9cff09b8b35d15dc712dc6480aa
Reviewed-on: http://gerrit.cloudera.org:8080/4679
Reviewed-by: Matthew Jacobs
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7fad3e5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7fad3e5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7fad3e5d

Branch: refs/heads/hadoop-next
Commit: 7fad3e5dc38c1097db6be24da0cda6941f554150
Parents: a1c9cb3
Author: Thomas Tauber-Marshall
Authored: Mon Oct 10 10:32:55 2016 -0700
Committer: Internal Jenkins
Committed: Sat Oct 15 01:25:51 2016 +0000

----------------------------------------------------------------------
 be/src/exec/exchange-node.cc             |  1 +
 be/src/runtime/plan-fragment-executor.cc |  2 +-
 shell/impala_client.py                   |  5 ++-
 tests/beeswax/impala_beeswax.py          | 10 +++---
 tests/query_test/test_observability.py   | 52 +++++++++++++++++++++++++++
 5 files changed, 63 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 22dfe40..833949b 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -207,6 +207,7 @@ Status ExchangeNode::GetNextMerging(RuntimeState* state, RowBatch* output_batch,
   num_rows_returned_ += output_batch->num_rows();
   if (ReachedLimit()) {
     output_batch->set_num_rows(output_batch->num_rows() - (num_rows_returned_ - limit_));
+    num_rows_returned_ = limit_;
     *eos = true;
   }
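
The limit handling in the exchange-node.cc hunk above can be modelled in a few lines of Python. This is an illustrative sketch, not Impala code: the function name clamp_to_limit and its tuple return value are hypothetical, and treating a limit of -1 as "no limit" is an assumption about what ReachedLimit() checks.

```python
def clamp_to_limit(num_rows_returned, batch_rows, limit):
    """Model of the limit handling in a merging exchange (hypothetical helper).

    Returns (rows_kept_in_batch, new_num_rows_returned, eos).
    Assumption: limit == -1 means the node has no limit.
    """
    # Count every row copied into the batch, as the C++ code does first.
    num_rows_returned += batch_rows
    if limit != -1 and num_rows_returned >= limit:
        # Rows copied past the limit are trimmed from the batch...
        overshoot = num_rows_returned - limit
        rows_kept = batch_rows - overshoot
        # ...and the running counter is reset to the limit. Without this
        # second step the counter would keep the overshoot, which is the
        # situation the commit message describes for the exec summary.
        return rows_kept, limit, True
    return batch_rows, num_rows_returned, False


if __name__ == "__main__":
    # 3 rows already returned, a batch of 4 arrives, limit is 5.
    print(clamp_to_limit(3, 4, 5))
```

With 3 rows already returned, a 4-row batch and a limit of 5, the batch is trimmed to 2 rows and the counter ends at exactly 5 rather than 7.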

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index e0d314b..aba4a26 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -410,6 +410,7 @@ Status PlanFragmentExecutor::GetNext(RowBatch** batch) {
     row_batch_->Reset();
   }
   UpdateStatus(status);
+  COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
   if (done_) {
     VLOG_QUERY << "Finished executing fragment query_id=" << PrintId(query_id_)
@@ -421,7 +422,6 @@ Status PlanFragmentExecutor::GetNext(RowBatch** batch) {
   }
   *batch = row_batch_.get();
-  COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
   return status;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index bc20b09..0d1c835 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -115,6 +115,9 @@ class ImpalaClient(object):
     Returns the index of the next exec node in summary.exec_nodes that should be
     processed, used internally to this method only.
+
+    NOTE: This is duplicated in impala_beeswax.py, and changes made here should also be
+    made there.
     """
     attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
@@ -142,7 +145,7 @@ class ImpalaClient(object):
     # is the max over all instances (which should all have received the same number of
     # rows). Otherwise, the cardinality is the sum over all instances which process
     # disjoint partitions.
-    if node.is_broadcast and is_fragment_root:
+    if node.is_broadcast:
       cardinality = max_stats.cardinality
     else:
       cardinality = agg_stats.cardinality

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/tests/beeswax/impala_beeswax.py
----------------------------------------------------------------------
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index 79a106f..e0f5d55 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -30,15 +30,15 @@ import shlex
 import getpass
 import re
-from impala._thrift_gen.beeswax import BeeswaxService
-from impala._thrift_gen.beeswax.BeeswaxService import QueryState
+from beeswaxd import BeeswaxService
+from beeswaxd.BeeswaxService import QueryState
 from datetime import datetime
 try:
   # If Exec Summary is not implemented in Impala, this cannot be imported
-  from impala._thrift_gen.ExecStats.ttypes import TExecStats
+  from ExecStats.ttypes import TExecStats
 except ImportError:
   pass
-from impala._thrift_gen.ImpalaService import ImpalaService
+from ImpalaService import ImpalaService
 from tests.util.thrift_util import create_transport
 from thrift.transport.TTransport import TTransportException
 from thrift.protocol import TBinaryProtocol
@@ -265,7 +265,7 @@ class ImpalaBeeswaxClient(object):
     # is the max over all instances (which should all have received the same number of
     # rows). Otherwise, the cardinality is the sum over all instances which process
     # disjoint partitions.
-    if node.is_broadcast and is_fragment_root:
+    if node.is_broadcast:
       cardinality = max_stats.cardinality
     else:
       cardinality = agg_stats.cardinality

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7fad3e5d/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
new file mode 100644
index 0000000..59e6a73
--- /dev/null
+++ b/tests/query_test/test_observability.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+class TestObservability(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  def test_merge_exchange_num_rows(self):
+    """Regression test for IMPALA-1473 - checks that the exec summary for a merging
+    exchange with a limit reports the number of rows returned as equal to the limit,
+    and that the coordinator fragment portion of the runtime profile reports the number
+    of rows returned correctly."""
+    query = """select tinyint_col, count(*) from functional.alltypes
+        group by tinyint_col order by tinyint_col limit 5"""
+    result = self.execute_query(query)
+    assert result.exec_summary[0]['operator'] == '05:MERGING-EXCHANGE'
+    assert result.exec_summary[0]['num_rows'] == 5
+    assert result.exec_summary[0]['est_num_rows'] == 5
+
+    for line in result.runtime_profile.split('\n'):
+      # The first 'RowsProduced' we find is for the coordinator fragment.
+      if 'RowsProduced' in line:
+        assert '(5)' in line
+        break
+
+  def test_broadcast_num_rows(self):
+    """Regression test for IMPALA-3002 - checks that the num_rows for a broadcast node
+    in the exec summary is correctly set as the max over all instances, not the sum."""
+    query = """select distinct a.int_col, a.string_col from functional.alltypes a
+        inner join functional.alltypessmall b on (a.id = b.id)
+        where a.year = 2009 and b.month = 2"""
+    result = self.execute_query(query)
+    assert result.exec_summary[5]['operator'] == '04:EXCHANGE'
+    assert result.exec_summary[5]['num_rows'] == 25
+    assert result.exec_summary[5]['est_num_rows'] == 25
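
The max-versus-sum rule that the impala_client.py and impala_beeswax.py hunks apply can be sketched on its own. This is a minimal sketch, assuming per-instance stats are available as a plain list: InstanceStats and node_cardinality are hypothetical names that do not appear in the patch, and the three-instance figures are made up for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for the per-instance exec stats of one plan node.
InstanceStats = namedtuple("InstanceStats", ["cardinality"])


def node_cardinality(instance_stats, is_broadcast):
    """Return the row count to report for one node in the exec summary.

    For a broadcast node every instance should have received the same
    rows, so the max over instances is reported. Otherwise the instances
    process disjoint partitions and their counts are summed.
    """
    cardinalities = [s.cardinality for s in instance_stats]
    if not cardinalities:
        return 0
    if is_broadcast:
        return max(cardinalities)
    return sum(cardinalities)


if __name__ == "__main__":
    # Three illustrative instances that each received the same 25 rows.
    stats = [InstanceStats(25), InstanceStats(25), InstanceStats(25)]
    print(node_cardinality(stats, is_broadcast=True))
    print(node_cardinality(stats, is_broadcast=False))
```

For three instances that each received 25 broadcast rows, the max gives 25 while the sum gives 75. The patch drops the extra is_fragment_root condition so that broadcast nodes always take the max branch.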