impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mi...@apache.org
Subject incubator-impala git commit: IMPALA-5625: stress test: write profile when queries fail
Date Thu, 24 Aug 2017 22:15:32 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master d03e7d6ce -> fb735df13


IMPALA-5625: stress test: write profile when queries fail

This change writes query profiles as text files for all of the major
query failure reasons in the concurrent_select stress test.

1) Change the --result-hash-log-dir command-line option to --results-dir
   and update the help text.
2) Introduce two new directories under the directory given by the
   --results-dir command-line argument:
     profiles
     result_hashes
3) Move results into the result_hashes directory.
4) Write the query profile to the profiles directory when a query times
   out or gets an error or incorrect results.
5) Remove the query profile from the log output for unexpected mem
   limit exceeded exceptions. Instead, write those to the profiles
   directory as well.

Testing:
Ran the stress test with a driver that changes the hashes of some of the
query results in the runtime info json file to inject incorrect result
failures. Set tight bounds on the mem limit and timeout to ensure there
would be timeouts and exceeded memory limit failures. Restarted the
NameNode mid test run to induce a query failure. That covers the 4 cases
for which an exception is thrown and a profile is written for query
failures. Verified that the profiles were written for each kind of
query failure.

Change-Id: I1dbdf5fcf97d6c5681c9fc8fb9eb448bc459b3b0
Reviewed-on: http://gerrit.cloudera.org:8080/7376
Reviewed-by: Michael Brown <mikeb@cloudera.com>
Tested-by: Michael Brown <mikeb@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fb735df1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fb735df1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fb735df1

Branch: refs/heads/master
Commit: fb735df134652b4131f77e1807684af0cc424647
Parents: d03e7d6
Author: Matthew Mulder <mmulder@cloudera.com>
Authored: Fri Jun 30 16:18:12 2017 -0700
Committer: Michael Brown <mikeb@cloudera.com>
Committed: Thu Aug 24 19:41:33 2017 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 152 ++++++++++++++++++++-------------
 1 file changed, 92 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fb735df1/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index d77e306..6626f58 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -92,6 +92,9 @@ MEM_LIMIT_EQ_THRESHOLD_MB = 50
 # Regex to extract the estimated memory from an explain plan.
 MEM_ESTIMATE_PATTERN = re.compile(r"Estimated.*Memory=(\d+.?\d*)(T|G|M|K)?B")
 
+PROFILES_DIR = "profiles"
+RESULT_HASHES_DIR = "result_hashes"
+
 # The version of the file format containing the collected query runtime info.
 RUNTIME_INFO_FILE_VERSION = 3
 
@@ -164,6 +167,7 @@ class QueryReport(object):
     self.timed_out = False
     self.was_cancelled = False
     self.profile = None
+    self.query_id = None
 
 
 class MemBroker(object):
@@ -312,7 +316,7 @@ class StressRunner(object):
     self.startup_queries_per_sec = 1.0
     self.num_successive_errors_needed_to_abort = 1
     self._num_successive_errors = Value("i", 0)
-    self.result_hash_log_dir = gettempdir()
+    self.results_dir = gettempdir()
 
     self._status_headers = [
         "Done", "Running", "Mem Lmt Ex", "Time Out", "Cancel",
@@ -594,7 +598,7 @@ class StressRunner(object):
     LOG.debug("New query runner started")
     runner = QueryRunner()
     runner.impalad = impalad
-    runner.result_hash_log_dir = self.result_hash_log_dir
+    runner.results_dir = self.results_dir
     runner.use_kerberos = self.use_kerberos
     runner.common_query_options = self.common_query_options
     runner.connect()
@@ -641,7 +645,7 @@ class StressRunner(object):
         else:
           timeout = solo_runtime * max(
               10, self._num_queries_started.value - self._num_queries_finished.value)
-        report = runner.run_query(query, timeout, mem_limit)
+        report = runner.run_query(query, timeout, mem_limit, should_cancel=should_cancel)
         LOG.debug("Got execution report for query")
         if report.timed_out and should_cancel:
           report.was_cancelled = True
@@ -675,15 +679,16 @@ class StressRunner(object):
             continue
           increment(self._num_successive_errors)
           increment(self._num_other_errors)
-          raise Exception("Query failed: %s" % str(report.non_mem_limit_error))
+          self._write_query_profile(report)
+          raise Exception("Query {0} failed: {1}".format(report.query_id, error_msg))
         if (
             report.mem_limit_exceeded and
             not self._mem_broker.was_overcommitted(reservation_id)
         ):
           increment(self._num_successive_errors)
-          raise Exception(
-              "Unexpected mem limit exceeded; mem was not overcommitted\n"
-              "Profile: %s" % report.profile)
+          self._write_query_profile(report)
+          raise Exception("Unexpected mem limit exceeded; mem was not overcommitted. "
+                          "Query ID: {0}".format(report.query_id))
         if (
             not report.mem_limit_exceeded and
             not report.timed_out and
@@ -691,9 +696,18 @@ class StressRunner(object):
         ):
           increment(self._num_successive_errors)
           increment(self._num_result_mismatches)
+          self._write_query_profile(report)
+          raise Exception(dedent("""\
+                                 Result hash mismatch; expected {expected}, got {actual}
+                                 Query ID: {id}
+                                 Query: {query}""".format(expected=query.result_hash,
+                                                          actual=report.result_hash,
+                                                          id=report.query_id,
+                                                          query=query.sql)))
+        if report.timed_out and not should_cancel:
+          self._write_query_profile(report)
           raise Exception(
-              "Result hash mismatch; expected %s, got %s\nQuery: %s"
-              % (query.result_hash, report.result_hash, query.sql))
+              "Query unexpectedly timed out. Query ID: {0}".format(report.query_id))
         self._num_successive_errors.value = 0
 
   def _print_status_header(self):
@@ -736,6 +750,14 @@ class StressRunner(object):
     if report.timed_out:
       increment(self._num_queries_timedout)
 
+  def _write_query_profile(self, report):
+    if not (report.profile and report.query_id):
+      return
+    file_name = report.query_id.replace(":", "_") + "_profile.txt"
+    profile_log_path = os.path.join(self.results_dir, PROFILES_DIR, file_name)
+    with open(profile_log_path, "w") as profile_log:
+      profile_log.write(report.profile)
+
 
 class QueryTimeout(Exception):
   pass
@@ -795,7 +817,7 @@ class QueryRunner(object):
     self.impalad = None
     self.impalad_conn = None
     self.use_kerberos = False
-    self.result_hash_log_dir = gettempdir()
+    self.results_dir = gettempdir()
     self.check_if_mem_was_spilled = False
     self.common_query_options = {}
 
@@ -807,10 +829,14 @@ class QueryRunner(object):
       self.impalad_conn.close()
       self.impalad_conn = None
 
-  def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False):
+  def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False,
+                should_cancel=False):
     """Run a query and return an execution report. If 'run_set_up' is True, set up sql
     will be executed before the main query. This should be the case during the binary
     search phase of the stress test.
+    If 'should_cancel' is True, don't get the query profile for timed out queries because
+    the query was purposely cancelled by setting the query timeout too short to complete,
+    rather than having some problem that needs to be investigated.
     """
     if not self.impalad_conn:
       raise Exception("connect() must first be called")
@@ -843,13 +869,15 @@ class QueryRunner(object):
           cursor.execute_async(
               "/* Mem: %s MB. Coordinator: %s. */\n"
               % (mem_limit_mb, self.impalad.host_name) + query.sql)
-          LOG.debug(
-              "Query id is %s", op_handle_to_query_id(cursor._last_operation.handle if
-                                                      cursor._last_operation else None))
+          report.query_id = op_handle_to_query_id(cursor._last_operation.handle if
+                                                  cursor._last_operation else None)
+          LOG.debug("Query id is %s", report.query_id)
           sleep_secs = 0.1
           secs_since_log = 0
           while cursor.is_executing():
             if time() > timeout_unix_time:
+              if not should_cancel:
+                fetch_and_set_profile(cursor, report)
               self._cancel(cursor, report)
               return report
             if secs_since_log > 5:
@@ -860,6 +888,8 @@ class QueryRunner(object):
           if query.query_type == QueryType.SELECT:
             try:
               report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
+              if query.result_hash and report.result_hash != query.result_hash:
+                fetch_and_set_profile(cursor, report)
             except QueryTimeout:
               self._cancel(cursor, report)
               return report
@@ -867,18 +897,15 @@ class QueryRunner(object):
             # If query is in error state, this will raise an exception
             cursor._wait_to_finish()
         except Exception as error:
-          LOG.debug(
-              "Error running query with id %s: %s",
-              op_handle_to_query_id(cursor._last_operation.handle if
-                                    cursor._last_operation else None), error)
+          report.query_id = op_handle_to_query_id(cursor._last_operation.handle if
+                                                  cursor._last_operation else None)
+          LOG.debug("Error running query with id %s: %s", report.query_id, error)
           self._check_for_mem_limit_exceeded(report, cursor, error)
         if report.non_mem_limit_error or report.mem_limit_exceeded:
           return report
         report.runtime_secs = time() - start_time
         if cursor.execution_failed() or self.check_if_mem_was_spilled:
-          # Producing a query profile can be somewhat expensive. A v-tune profile of
-          # impalad showed 10% of cpu time spent generating query profiles.
-          report.profile = cursor.get_profile()
+          fetch_and_set_profile(cursor, report)
           report.mem_was_spilled = any([
               pattern.search(report.profile) is not None
               for pattern in QueryRunner.SPILLED_PATTERNS])
@@ -891,35 +918,27 @@ class QueryRunner(object):
   def _cancel(self, cursor, report):
     report.timed_out = True
 
-    # Copy the operation handle in case another thread causes the handle to be reset.
-    operation_handle = cursor._last_operation.handle if cursor._last_operation else None
-    if not operation_handle:
+    if not report.query_id:
       return
 
-    query_id = op_handle_to_query_id(operation_handle)
     try:
-      LOG.debug("Attempting cancellation of query with id %s", query_id)
+      LOG.debug("Attempting cancellation of query with id %s", report.query_id)
       cursor.cancel_operation()
-      LOG.debug("Sent cancellation request for query with id %s", query_id)
+      LOG.debug("Sent cancellation request for query with id %s", report.query_id)
     except Exception as e:
-      LOG.debug("Error cancelling query with id %s: %s", query_id, e)
+      LOG.debug("Error cancelling query with id %s: %s", report.query_id, e)
       try:
         LOG.debug("Attempting to cancel query through the web server.")
-        self.impalad.cancel_query(query_id)
+        self.impalad.cancel_query(report.query_id)
       except Exception as e:
-        LOG.debug("Error cancelling query %s through the web server: %s", query_id, e)
+        LOG.debug("Error cancelling query %s through the web server: %s",
+                  report.query_id, e)
 
   def _check_for_mem_limit_exceeded(self, report, cursor, caught_exception):
     """To be called after a query failure to check for signs of failed due to a
     mem limit. The report will be updated accordingly.
     """
-    if cursor._last_operation:
-      try:
-        report.profile = cursor.get_profile()
-      except Exception as e:
-        LOG.debug(
-            "Error getting profile for query with id %s: %s",
-            op_handle_to_query_id(cursor._last_operation.handle), e)
+    fetch_and_set_profile(cursor, report)
     caught_msg = str(caught_exception).lower().strip()
 
     # Exceeding a mem limit may result in the message "cancelled". See IMPALA-2234
@@ -941,12 +960,8 @@ class QueryRunner(object):
       report.mem_limit_exceeded = True
       return
 
-    LOG.debug(
-        "Non-mem limit error for query with id %s: %s",
-        op_handle_to_query_id(
-            cursor._last_operation.handle if cursor._last_operation else None),
-        caught_exception,
-        exc_info=True)
+    LOG.debug("Non-mem limit error for query with id %s: %s", report.query_id,
+              caught_exception, exc_info=True)
     report.non_mem_limit_error = caught_exception
 
   def _hash_result(self, cursor, timeout_unix_time, query):
@@ -967,7 +982,8 @@ class QueryRunner(object):
         if query.result_hash is None:
           file_name += "_initial"
         file_name += "_results.txt"
-        result_log = open(os.path.join(self.result_hash_log_dir, file_name), "w")
+        result_log = open(os.path.join(self.results_dir, RESULT_HASHES_DIR, file_name),
+                          "w")
         result_log.write(query.sql)
         result_log.write("\n")
         current_thread().result = 1
@@ -1068,7 +1084,7 @@ def load_queries_from_test_file(file_path, db_name=None):
 
 def load_random_queries_and_populate_runtime_info(
     query_generator, model_translator, tables, db_name, impala, use_kerberos, query_count,
-    query_timeout_secs, result_hash_log_dir
+    query_timeout_secs, results_dir
 ):
   """Returns a list of random queries. Each query will also have its runtime info
   populated. The runtime info population also serves to validate the query.
@@ -1085,12 +1101,11 @@ def load_random_queries_and_populate_runtime_info(
       yield query
   return populate_runtime_info_for_random_queries(
       impala, use_kerberos, generate_candidates(), query_count, query_timeout_secs,
-      result_hash_log_dir)
+      results_dir)
 
 
 def populate_runtime_info_for_random_queries(
-    impala, use_kerberos, candidate_queries,
-    query_count, query_timeout_secs, result_hash_log_dir
+    impala, use_kerberos, candidate_queries, query_count, query_timeout_secs, results_dir
 ):
   """Returns a list of random queries. Each query will also have its runtime info
   populated. The runtime info population also serves to validate the query.
@@ -1102,8 +1117,7 @@ def populate_runtime_info_for_random_queries(
   for query in candidate_queries:
     try:
       populate_runtime_info(
-          query, impala, use_kerberos, result_hash_log_dir,
-          timeout_secs=query_timeout_secs)
+          query, impala, use_kerberos, results_dir, timeout_secs=query_timeout_secs)
       queries.append(query)
     except Exception as e:
       # Ignore any non-fatal errors. These could be query timeouts or bad queries (
@@ -1119,7 +1133,7 @@ def populate_runtime_info_for_random_queries(
 
 
 def populate_runtime_info(
-    query, impala, use_kerberos, result_hash_log_dir,
+    query, impala, use_kerberos, results_dir,
     timeout_secs=maxint, samples=1, max_conflicting_samples=0
 ):
   """Runs the given query by itself repeatedly until the minimum memory is determined
@@ -1139,7 +1153,7 @@ def populate_runtime_info(
   runner = QueryRunner()
   runner.check_if_mem_was_spilled = True
   runner.impalad = impala.impalads[0]
-  runner.result_hash_log_dir = result_hash_log_dir
+  runner.results_dir = results_dir
   runner.use_kerberos = use_kerberos
   runner.connect()
   limit_exceeded_mem = 0
@@ -1647,13 +1661,25 @@ def populate_all_queries(queries, impala, args, runtime_info_path,
             query.db_name][query.sql][str(sorted(query.options.items()))])
       else:
         populate_runtime_info(
-            query, impala, args.use_kerberos, args.result_hash_log_dir,
+            query, impala, args.use_kerberos, args.results_dir,
             samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
         save_runtime_info(runtime_info_path, query, impala)
         result.append(query)
   return result
 
 
+def fetch_and_set_profile(cursor, report):
+  """Set the report's query profile using the given cursor.
+  Producing a query profile can be somewhat expensive. A v-tune profile of
+  impalad showed 10% of cpu time spent generating query profiles.
+  """
+  if not report.profile and cursor._last_operation:
+    try:
+      report.profile = cursor.get_profile()
+    except Exception as e:
+      LOG.debug("Error getting profile for query with id %s: %s", report.query_id, e)
+
+
 def print_version(cluster):
   """
   Print the cluster impalad version info to the console sorted by hostname.
@@ -1703,10 +1729,13 @@ def main():
       ' max-conflicting-samples=1, then 4/5 queries must not spill at a particular mem'
       ' limit.')
   parser.add_argument(
-      "--result-hash-log-dir", default=gettempdir(),
-      help="If query results do not match, a log file will be left in this dir. The log"
-      " file is also created during the first run when runtime info is collected for"
-      " each query.")
+      "--results-dir", default=gettempdir(),
+      help="Directory under which the profiles and result_hashes directories are created."
+      " Query hash results are written in the result_hashes directory. If query results"
+      " do not match, a log file will be left in that dir. The log file is also created"
+      " during the first run when runtime info is collected for each query. Unexpected"
+      " query timeouts, exceeded memory, failures or result mismatches will result in a"
+      " profile written in the profiles directory.")
   parser.add_argument(
       "--no-status", action="store_true", help="Do not print the status table.")
   parser.add_argument(
@@ -1860,6 +1889,9 @@ def main():
         LOG.debug("Common query option '{query_option}' set to '{value}'".format(
             query_option=query_option, value=value))
 
+  os.mkdir(os.path.join(args.results_dir, RESULT_HASHES_DIR))
+  os.mkdir(os.path.join(args.results_dir, PROFILES_DIR))
+
   cluster = cli_options.create_cluster(args)
   impala = cluster.impala
   if impala.find_stopped_impalads():
@@ -1951,7 +1983,7 @@ def main():
     queries.extend(load_random_queries_and_populate_runtime_info(
         query_generator, SqlWriter.create(), tables, args.random_db, impala,
         args.use_kerberos, args.random_query_count, args.random_query_timeout_seconds,
-        args.result_hash_log_dir))
+        args.results_dir))
 
   if args.query_file_path:
     file_queries = load_queries_from_test_file(
@@ -1959,7 +1991,7 @@ def main():
     shuffle(file_queries)
     queries.extend(populate_runtime_info_for_random_queries(
         impala, args.use_kerberos, file_queries, args.random_query_count,
-        args.random_query_timeout_seconds, args.result_hash_log_dir))
+        args.random_query_timeout_seconds, args.results_dir))
 
   # Apply tweaks to the query's runtime info as requested by CLI options.
   for idx in xrange(len(queries) - 1, -1, -1):
@@ -2022,7 +2054,7 @@ def main():
   LOG.info("Number of queries in the list: {0}".format(len(queries)))
 
   stress_runner = StressRunner()
-  stress_runner.result_hash_log_dir = args.result_hash_log_dir
+  stress_runner.results_dir = args.results_dir
   stress_runner.startup_queries_per_sec = args.startup_queries_per_second
   stress_runner.num_successive_errors_needed_to_abort = args.fail_upon_successive_errors
   stress_runner.use_kerberos = args.use_kerberos


Mime
View raw message