impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbap...@apache.org
Subject [1/8] incubator-impala git commit: IMPALA-5892: Allow reporting status independent of fragment instance
Date Thu, 07 Sep 2017 03:50:19 GMT
Repository: incubator-impala
Updated Branches:
  refs/heads/master 545eab6d6 -> e993b9712


IMPALA-5892: Allow reporting status independent of fragment instance

Queries can hit an error that is not specific to a
particular fragment instance. For example, QueryState::StartFInstances()
calls DescriptorTbl::Create() before any fragment instances
start. No fragment instance is responsible for an error raised
there, so there is no fragment through which to report it, and
there is currently no other way to report status. The query then
hangs, because the error is never propagated back to the coordinator.

This adds the ability to report status that is not associated
with a particular fragment instance. Once the backend reports
such a status, the coordinator correctly aborts the query, which
fixes the QueryState::StartFInstances() scenario described above.

Change-Id: I4cd98022f1d62a999c7c80ff5474fa8d069eb12c
Reviewed-on: http://gerrit.cloudera.org:8080/7943
Reviewed-by: Lars Volker <lv@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/91f7bc19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/91f7bc19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/91f7bc19

Branch: refs/heads/master
Commit: 91f7bc1947c1800e689fee040d4820fd8dbf94e4
Parents: 545eab6
Author: Joe McDonnell <joemcdonnell@cloudera.com>
Authored: Fri Sep 1 14:50:45 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenkins@gerrit.cloudera.org>
Committed: Wed Sep 6 21:26:24 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.cc | 14 ++++++++++-
 be/src/runtime/coordinator-backend-state.h  | 21 +++++++++++++---
 be/src/runtime/coordinator.cc               | 32 ++++++++++++++----------
 be/src/runtime/coordinator.h                | 15 +++++++----
 be/src/runtime/query-state.cc               |  1 +
 common/thrift/ImpalaInternalService.thrift  | 12 +++++++++
 6 files changed, 73 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 34e0671..1b7fd20 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -211,9 +211,12 @@ void Coordinator::BackendState::Exec(
   VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_);
 }
 
-Status Coordinator::BackendState::GetStatus(TUniqueId* failed_instance_id) {
+Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure,
+    TUniqueId* failed_instance_id) {
   lock_guard<mutex> l(lock_);
+  DCHECK_EQ(is_fragment_failure == nullptr, failed_instance_id == nullptr);
   if (!status_.ok() && failed_instance_id != nullptr) {
+    *is_fragment_failure = is_fragment_failure_;
     *failed_instance_id = failed_instance_id_;
   }
   return status_;
@@ -278,6 +281,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
       if (status_.ok() || status_.IsCancelled()) {
         status_ = instance_status;
         failed_instance_id_ = instance_exec_status.fragment_instance_id;
+        is_fragment_failure_ = true;
       }
     }
     DCHECK_GT(num_remaining_instances_, 0);
@@ -302,6 +306,14 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     }
   }
 
+  // status_ has incorporated the status from all fragment instances. If the overall
+  // backend status is not OK, but no specific fragment instance reported an error, then
+  // this is a general backend error. Incorporate the general error into status_.
+  Status overall_backend_status(backend_exec_status.status);
+  if (!overall_backend_status.ok() && (status_.ok() || status_.IsCancelled())) {
+    status_ = overall_backend_status;
+  }
+
   // Log messages aggregated by type
   if (backend_exec_status.__isset.error_log && backend_exec_status.error_log.size()
> 0) {
     // Append the log messages from each update with the global state of the query

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index ccc3618..4ea2f33 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -90,9 +90,19 @@ class Coordinator::BackendState {
   /// if cancellation was attempted, false otherwise.
   bool Cancel();
 
-  /// Return the overall execution status. For an error status, also return the id
-  /// of the instance that caused that status, if failed_instance_id != nullptr.
-  Status GetStatus(TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT;
+  /// Return the overall execution status. For an error status, the error could come
+  /// from the fragment instance level or it can be a general error from the backend
+  /// (with no specific fragment responsible). For a caller to distinguish between
+  /// these errors and to determine the specific fragment instance (if applicable),
+  /// both 'is_fragment_failure' and 'failed_instance_id' must be non-null.
+  /// A general error will set *is_fragment_failure to false and leave
+  /// failed_instance_id untouched.
+  /// A fragment-specific error will set *is_fragment_failure to true and set
+  /// *failed_instance_id to the id of the fragment instance that failed.
+  /// If the caller does not need this information, both 'is_fragment_failure' and
+  /// 'failed_instance_id' must be omitted (using the default value of nullptr).
+  Status GetStatus(bool* is_fragment_failure = nullptr,
+      TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT;
 
   /// Return peak memory consumption.
   int64_t GetPeakConsumption();
@@ -199,6 +209,11 @@ class Coordinator::BackendState {
   /// initiated; either way, execution must not be cancelled.
   Status status_;
 
+  /// Used to distinguish between errors reported by a specific fragment instance,
+  /// which would set failed_instance_id_, rather than an error independent of any
+  /// specific fragment.
+  bool is_fragment_failure_ = false;
+
   /// Id of the first fragment instance that reports an error status.
   /// Invalid if no fragment instance has reported an error status.
   TUniqueId failed_instance_id_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 029e0bc..c8df1f5 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -471,8 +471,8 @@ Status Coordinator::GetStatus() {
   return query_status_;
 }
 
-Status Coordinator::UpdateStatus(const Status& status, const TUniqueId& instance_id,
-    const string& instance_hostname) {
+  Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
+     bool is_fragment_failure, const TUniqueId& instance_id) {
   {
     lock_guard<mutex> l(lock_);
 
@@ -490,10 +490,14 @@ Status Coordinator::UpdateStatus(const Status& status, const TUniqueId&
instance
     CancelInternal();
   }
 
-  // Log the id of the fragment that first failed so we can track it down more easily.
-  VLOG_QUERY << "Query id=" << query_id() << " failed because instance
id="
-             << instance_id << " on host=" << instance_hostname <<
" failed.";
-
+  if (is_fragment_failure) {
+    // Log the id of the fragment that first failed so we can track it down more easily.
+    VLOG_QUERY << "Query id=" << query_id() << " failed because instance
id="
+               << instance_id << " on host=" << backend_hostname <<
" failed.";
+  } else {
+    VLOG_QUERY << "Query id=" << query_id() << " failed due to error on
host="
+               << backend_hostname;
+  }
   return query_status_;
 }
 
@@ -822,8 +826,8 @@ Status Coordinator::Wait() {
 
   if (stmt_type_ == TStmtType::QUERY) {
     DCHECK(coord_instance_ != nullptr);
-    return UpdateStatus(coord_instance_->WaitForOpen(),
-        runtime_state()->fragment_instance_id(), FLAGS_hostname);
+    return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true,
+        runtime_state()->fragment_instance_id());
   }
 
   DCHECK_EQ(stmt_type_, TStmtType::DML);
@@ -867,8 +871,8 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool*
eos) {
   // if there was an error, we need to return the query's error status rather than
   // the status we just got back from the local executor (which may well be CANCELLED
   // in that case).  Coordinator fragment failed in this case so we log the query_id.
-  RETURN_IF_ERROR(
-      UpdateStatus(status, runtime_state()->fragment_instance_id(), FLAGS_hostname));
+  RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true,
+      runtime_state()->fragment_instance_id()));
 
   if (*eos) {
     returned_all_results_ = true;
@@ -950,11 +954,13 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams&
param
     // true (UpdateStatus() initiates cancellation, if it hasn't already been)
     // TODO: clarify control flow here, it's unclear we should even process this status
     // report if returned_all_results_ is true
+    bool is_fragment_failure;
     TUniqueId failed_instance_id;
-    Status status = backend_state->GetStatus(&failed_instance_id);
+    Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
     if (!status.ok() && !returned_all_results_) {
-      Status ignored = UpdateStatus(status, failed_instance_id,
-          TNetworkAddressToString(backend_state->impalad_address()));
+      Status ignored =
+          UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()),
+              is_fragment_failure, failed_instance_id);
       return Status::OK();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 4edef88..e67ef13 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -363,12 +363,17 @@ class Coordinator { // NOLINT: The member variables could be re-ordered
to save
   void CancelInternal();
 
   /// Acquires lock_ and updates query_status_ with 'status' if it's not already
-  /// an error status, and returns the current query_status_.
+  /// an error status, and returns the current query_status_. The status may be
+  /// due to an error in a specific fragment instance, or it can be a general error
+  /// not tied to a specific fragment instance.
   /// Calls CancelInternal() when switching to an error status.
-  /// failed_fragment is the fragment_id that has failed, used for error reporting along
-  /// with instance_hostname.
-  Status UpdateStatus(const Status& status, const TUniqueId& failed_fragment,
-      const std::string& instance_hostname) WARN_UNUSED_RESULT;
+  /// When an error is due to a specific fragment instance, 'is_fragment_failure' must
+  /// be true and 'failed_fragment' is the fragment_id that has failed, used for error
+  /// reporting. For a general error not tied to a specific instance,
+  /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored.
+  /// 'backend_hostname' is used for error reporting in either case.
+  Status UpdateStatus(const Status& status, const std::string& backend_hostname,
+      bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
 
   /// Update per_partition_status_ and files_to_move_.
   void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 5ac4998..4311e27 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -207,6 +207,7 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   params.__set_query_id(query_ctx().query_id);
   DCHECK(rpc_params().__isset.coord_state_idx);
   params.__set_coord_state_idx(rpc_params().coord_state_idx);
+  status.SetTStatus(&params);
 
   if (fis != nullptr) {
     // create status for 'fis'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 39df289..db6ffbb 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -624,6 +624,18 @@ struct TReportExecStatusParams {
   // New errors that have not been reported to the coordinator by any of the
   // instances included in instance_exec_status
   6: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
+
+  // Cumulative status for this backend. A backend can have an error from a specific
+  // fragment instance, or it can have a general error that is independent of any
+  // individual fragment. If reporting a single error, this status is always set to
+  // the error being reported. If reporting multiple errors, the status is set by the
+  // following rules:
+  // 1. A general error takes precedence over any fragment instance error.
+  // 2. Any fragment instance error takes precedence over any cancelled status.
+  // 3. If multiple fragments have errors, prefer the error that comes first in the
+  // 'instance_exec_status' list.
+  // This status is only OK if all fragment instances included are OK.
+  7: optional Status.TStatus status;
 }
 
 struct TReportExecStatusResult {


Mime
View raw message