impala-commits mailing list archives

From taras...@apache.org
Subject [3/3] incubator-impala git commit: IMPALA-5186: Handle failed CreateAndOpenScanner() in MT scan.
Date Fri, 14 Apr 2017 19:20:49 GMT
IMPALA-5186: Handle failed CreateAndOpenScanner() in MT scan.

The bug was that a failed CreateAndOpenScanner() could cause
a scanner to be closed twice, leading to freed memory being
accessed. The fix resets the scanner pointer once the scanner
has been closed, so callers see a NULL scanner on failure, and
adds an is_closed_ flag with DCHECKs to catch any double
Close(). The MT scan node also cancels the scan range on
failure to avoid leaking unread buffers.
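For illustration, a minimal, self-contained sketch of the
double-close hazard and the guard pattern; the Scanner class and
CreateAndOpenScanner() below are hypothetical stand-ins, not the
actual Impala types:

  #include <cassert>
  #include <iostream>
  #include <memory>

  // Hypothetical stand-in for HdfsScanner. Close() releases resources and
  // must only be called once; is_closed_ mirrors the flag added by this
  // commit and turns a second Close() into a loud assertion failure
  // instead of a silent use-after-free.
  class Scanner {
   public:
    void Close() {
      assert(!is_closed_ && "Scanner closed twice");
      // ... release streams, decompressor, mem pools ...
      is_closed_ = true;
    }
   private:
    bool is_closed_ = false;
  };

  // Hypothetical stand-in for CreateAndOpenScanner(): if opening fails it
  // closes the scanner itself and, as in the fix, resets the pointer so
  // the caller cannot close the same object a second time.
  bool CreateAndOpenScanner(std::unique_ptr<Scanner>* scanner) {
    scanner->reset(new Scanner());
    bool open_ok = false;  // simulate a failed Open()
    if (!open_ok) {
      (*scanner)->Close();
      scanner->reset();  // the fix: the caller now sees a null scanner
    }
    return open_ok;
  }

  int main() {
    std::unique_ptr<Scanner> scanner;
    if (!CreateAndOpenScanner(&scanner)) {
      // Before the fix, the caller's error path could reach a second
      // Close() on the already-closed scanner here.
      std::cout << "scanner is " << (scanner ? "live" : "null") << std::endl;
    }
    return 0;
  }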

Testing:
- I cleaned up test_failpoints.py and added an MT_DOP test
  dimension to cover this bug.
- Core tests passed.

Change-Id: I777c9b8ef2eb5b556c9b145d231c543b3b8ae270
Reviewed-on: http://gerrit.cloudera.org:8080/6618
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/491154c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/491154c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/491154c8

Branch: refs/heads/master
Commit: 491154c8ef7a9b6b1be2430c1bf119e21ce171c3
Parents: fcefe47
Author: Alex Behm <alex.behm@cloudera.com>
Authored: Mon Apr 10 23:31:08 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Fri Apr 14 01:50:30 2017 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc |   1 +
 be/src/exec/hdfs-parquet-scanner.cc  |   1 +
 be/src/exec/hdfs-scan-node-base.cc   |   2 +
 be/src/exec/hdfs-scan-node-base.h    |   3 +-
 be/src/exec/hdfs-scan-node-mt.cc     |   8 ++-
 be/src/exec/hdfs-scanner.cc          |   9 ++-
 be/src/exec/hdfs-scanner.h           |   4 ++
 be/src/exec/hdfs-text-scanner.cc     |   1 +
 tests/failure/test_failpoints.py     | 112 +++++++++++++-----------------
 9 files changed, 73 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 432dac8..411d87c 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -97,6 +97,7 @@ Status BaseSequenceScanner::Open(ScannerContext* context) {
 }
 
 void BaseSequenceScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
   VLOG_FILE << "Bytes read past scan range: " << -stream_->bytes_left();
   VLOG_FILE << "Average block size: "
             << (num_syncs_ > 1 ? total_block_size_ / (num_syncs_ - 1) : 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index aba2bef..046ec46 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -267,6 +267,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 }
 
 void HdfsParquetScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
   if (row_batch != nullptr) {
     FlushRowGroupResources(row_batch);
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 238155f..24802e3 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -684,9 +684,11 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
     if (!status.ok()) {
       RowBatch* batch = (HasRowBatchQueue()) ? scanner->get()->batch() : NULL;
       scanner->get()->Close(batch);
+      scanner->reset();
     }
   } else {
     context->ClearStreams();
+    scanner->reset();
   }
   return status;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 7777d4d..1711bb5 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -452,8 +452,7 @@ class HdfsScanNodeBase : public ScanNode {
   Status IssueInitialScanRanges(RuntimeState* state);
 
   /// Create and open new scanner for this partition type.
-  /// If the scanner is successfully created, it is returned in 'scanner'.
-  /// Passes 'add_batches_to_queue' to the scanner constructor.
+  /// If the scanner is successfully created and opened, it is returned in 'scanner'.
   Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
       ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 9fad46e..53aa026 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -89,7 +89,13 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
     scanner_ctx_.reset(new ScannerContext(
         runtime_state_, this, partition, scan_range_, filter_ctxs()));
-    RETURN_IF_ERROR(CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_));
+    Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
+    if (!status.ok()) {
+      DCHECK(scanner_ == NULL);
+      // Avoid leaking unread buffers in the scan range.
+      scan_range_->Cancel(status);
+      return status;
+    }
   }
 
   Status status = scanner_->GetNext(row_batch);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index d613b15..a529668 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -63,6 +63,7 @@ HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
       context_(NULL),
       stream_(NULL),
       eos_(false),
+      is_closed_(false),
       scanner_conjunct_ctxs_(NULL),
       template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
       template_tuple_(NULL),
@@ -83,6 +84,7 @@ HdfsScanner::HdfsScanner()
       context_(NULL),
       stream_(NULL),
       eos_(false),
+      is_closed_(false),
       scanner_conjunct_ctxs_(NULL),
       template_tuple_pool_(NULL),
       template_tuple_(NULL),
@@ -132,12 +134,17 @@ Status HdfsScanner::Open(ScannerContext* context) {
 }
 
 void HdfsScanner::Close(RowBatch* row_batch) {
-  if (decompressor_.get() != NULL) decompressor_->Close();
+  DCHECK(!is_closed_);
+  if (decompressor_.get() != NULL) {
+    decompressor_->Close();
+    decompressor_.reset();
+  }
   for (const auto& entry: scanner_conjuncts_map_) Expr::Close(entry.second, state_);
   for (const auto& entry: scanner_dict_filter_map_) Expr::Close(entry.second, state_);
   obj_pool_.Clear();
   stream_ = NULL;
   context_->ClearStreams();
+  is_closed_ = true;
 }
 
 Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index f61c5fc..417ade7 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -142,6 +142,7 @@ class HdfsScanner {
   /// and memory in mem pools to the given row batch. If the row batch is NULL,
   /// those resources are released instead. In any case, releases all other resources
   /// that are not backing returned rows (e.g. temporary decompression buffers).
+  /// This function is not idempotent and must only be called once.
   virtual void Close(RowBatch* row_batch);
 
   /// Only valid to call if the parent scan node is single-threaded.
@@ -202,6 +203,9 @@ class HdfsScanner {
   /// Only relevant when calling the GetNext() interface.
   bool eos_;
 
+  /// Starts as false and is set to true in Close().
+  bool is_closed_;
+
   /// Clones of the conjuncts ExprContexts in scan_node_->conjuncts_map(). Each scanner
   /// has its own ExprContexts so the conjuncts can be safely evaluated in parallel.
   HdfsScanNodeBase::ConjunctsMap scanner_conjuncts_map_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index ced7ab1..0a66460 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -169,6 +169,7 @@ Status HdfsTextScanner::ProcessSplit() {
 }
 
 void HdfsTextScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
   // Need to close the decompressor before transferring the remaining resources to
   // 'row_batch' because in some cases there is memory allocated in the decompressor_'s
   // temp_memory_pool_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index b51950e..301762b 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -30,32 +30,26 @@ from tests.common.skip import SkipIf, SkipIfS3, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 
-FAILPOINT_ACTION = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
-FAILPOINT_LOCATION = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE']
+FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
+FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE']
 # Map debug actions to their corresponding query options' values.
 FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
                         'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}
-
-# The goal of this query is to use all of the node types.
-# TODO: This query could be simplified a bit...
-QUERY = """
-select a.int_col, count(b.int_col) int_sum, count(c.c_name)
-from functional_hbase.alltypesagg a, tpch_parquet.customer c
-join
-  (select * from alltypes
-   where year=2009 and month=1 order by int_col limit 2500
-   union all
-   select * from alltypes
-   where year=2009 and month=2 limit 3000) b
-on (a.int_col = b.int_col) and (a.int_col = c.c_custkey)
-where c.c_mktsegment = 'BUILDING'
-group by a.int_col
-order by int_sum
-"""
-
-# TODO: Update to include INSERT when we support failpoints in the HDFS/Hbase sinks using
-# a similar pattern as test_cancellation.py
-QUERY_TYPE = ["SELECT"]
+MT_DOP_VALUES = [0, 4]
+
+# Queries should cover all exec nodes.
+QUERIES = [
+  "select * from alltypessmall",
+  "select count(*) from alltypessmall",
+  "select count(int_col) from alltypessmall group by id",
+  "select 1 from alltypessmall a join alltypessmall b on a.id = b.id",
+  "select 1 from alltypessmall a join alltypessmall b on a.id != b.id",
+  "select 1 from alltypessmall order by id",
+  "select 1 from alltypessmall order by id limit 100",
+  "select * from alltypessmall union all select * from alltypessmall",
+  "select row_number() over (partition by int_col order by id) from alltypessmall",
+  "select c from (select id c from alltypessmall order by id limit 10) v where c = 1"
+]
 
 @SkipIf.skip_hbase # -skip_hbase argument specified
 @SkipIfS3.hbase # S3: missing coverage: failures
@@ -67,69 +61,46 @@ class TestFailpoints(ImpalaTestSuite):
     return 'functional-query'
 
   @classmethod
-  def parse_plan_nodes_from_explain_output(cls, query, use_db="default"):
-    """Parses the EXPLAIN <query> output and returns a map of node_name->list(node_id)"""
-    client = cls.create_impala_client()
-    client.execute("use %s" % use_db)
-    explain_result = client.execute("explain " + QUERY)
-    # Maps plan node names to their respective node ids. Expects format of <ID>:<NAME>
-    node_id_map = defaultdict(list)
-    for row in explain_result.data:
-      match = re.search(r'\s*(?P<node_id>\d+)\:(?P<node_type>\S+\s*\S+)', row)
-      if match is not None:
-        node_id_map[match.group('node_type')].append(int(match.group('node_id')))
-    return node_id_map
-
-  @classmethod
   def add_test_dimensions(cls):
     super(TestFailpoints, cls).add_test_dimensions()
-    # Executing an explain on the the test query will fail in an enviornment where hbase
-    # tables don't exist (s3). Since this happens before the tests are run, the skipif
-    # marker won't catch it. If 's3' is detected as a file system, return immedietely.
-    if os.getenv("TARGET_FILESYSTEM") in ["s3", "isilon", "local"]: return
-    node_id_map = TestFailpoints.parse_plan_nodes_from_explain_output(QUERY, "functional")
-    assert node_id_map
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('location', *FAILPOINT_LOCATION))
+        ImpalaTestDimension('query', *QUERIES))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('target_node', *(node_id_map.items())))
+        ImpalaTestDimension('action', *FAILPOINT_ACTIONS))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('action', *FAILPOINT_ACTION))
+        ImpalaTestDimension('location', *FAILPOINT_LOCATIONS))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('query_type', *QUERY_TYPE))
+        ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
     cls.ImpalaTestMatrix.add_dimension(
         create_exec_option_dimension([0], [False], [0]))
 
-    # These are invalid test cases.
-    # For more info see IMPALA-55 and IMPALA-56.
-    cls.ImpalaTestMatrix.add_constraint(lambda v: not (
-        v.get_value('action') == 'FAIL' and
-        v.get_value('location') in ['CLOSE'] and
-        v.get_value('target_node')[0] in ['AGGREGATE', 'HASH JOIN']) and
-        not (v.get_value('location') in ['PREPARE'] and
-             v.get_value('action') == 'CANCEL'))
-
     # Don't create CLOSE:WAIT debug actions to avoid leaking plan fragments (there's no
     # way to cancel a plan fragment once Close() has been called)
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: not (v.get_value('action') == 'CANCEL'
                      and v.get_value('location') == 'CLOSE'))
 
-    # No need to test error in scanner preparation for non-scan nodes.
-    cls.ImpalaTestMatrix.add_constraint(
-        lambda v: (v.get_value('location') != 'PREPARE_SCANNER' or
-            v.get_value('target_node')[0] == 'SCAN HDFS'))
-
   # Run serially because this can create enough memory pressure to invoke the Linux OOM
   # killer on machines with 30GB RAM. This makes the test run in 4 minutes instead of 1-2.
   @pytest.mark.execute_serially
   def test_failpoints(self, vector):
-    query = QUERY
-    node_type, node_ids = vector.get_value('target_node')
+    query = vector.get_value('query')
     action = vector.get_value('action')
     location = vector.get_value('location')
+    vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
 
-    for node_id in node_ids:
+    if action == "CANCEL" and location == "PREPARE":
+      pytest.xfail(reason="IMPALA-5202 leads to a hang.")
+
+    try:
+      plan_node_ids = self.__parse_plan_nodes_from_explain(query, vector)
+    except ImpalaBeeswaxException as e:
+      if "MT_DOP not supported" in str(e):
+        pytest.xfail(reason="MT_DOP not supported.")
+      else:
+        raise e
+
+    for node_id in plan_node_ids:
       debug_action = '%d:%s:%s' % (node_id, location, FAILPOINT_ACTION_MAP[action])
       LOG.info('Current debug action: SET DEBUG_ACTION=%s' % debug_action)
       vector.get_value('exec_option')['debug_action'] = debug_action
@@ -146,6 +117,19 @@ class TestFailpoints(ImpalaTestSuite):
     del vector.get_value('exec_option')['debug_action']
     self.execute_query(query, vector.get_value('exec_option'))
 
+  def __parse_plan_nodes_from_explain(self, query, vector):
+    """Parses the EXPLAIN <query> output and returns a list of node ids.
+    Expects format of <ID>:<NAME>"""
+    explain_result =\
+        self.execute_query("explain " + query, vector.get_value('exec_option'),
+                           table_format=vector.get_value('table_format'))
+    node_ids = []
+    for row in explain_result.data:
+      match = re.search(r'\s*(?P<node_id>\d+)\:(?P<node_type>\S+\s*\S+)', row)
+      if match is not None:
+        node_ids.append(int(match.group('node_id')))
+    return node_ids
+
   def __execute_fail_action(self, query, vector):
     try:
       self.execute_query(query, vector.get_value('exec_option'),

