impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l.@apache.org
Subject [05/13] incubator-impala git commit: IMPALA-2550: Switch to per-query exec rpc
Date Tue, 09 May 2017 15:55:59 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 51a04f2..b428512 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -38,7 +38,7 @@
 #include "runtime/raw-value.h"
 #include "runtime/exec-env.h"
 #include "service/hs2-util.h"
-#include "service/query-exec-state.h"
+#include "service/client-request-state.h"
 #include "service/query-options.h"
 #include "service/query-result-set.h"
 #include "util/debug-util.h"
@@ -133,7 +133,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   session->ToThrift(session_id, &query_ctx.session);
   request->__set_session(query_ctx.session);
 
-  shared_ptr<QueryExecState> exec_state;
+  shared_ptr<ClientRequestState> request_state;
   // There is no user-supplied query text available because this metadata operation comes
   // from an RPC. As a best effort, we use the type of the operation.
   map<int, const char*>::const_iterator query_text_it =
@@ -141,9 +141,9 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   const string& query_text = query_text_it == _TMetadataOpcode_VALUES_TO_NAMES.end() ?
       "N/A" : query_text_it->second;
   query_ctx.client_request.stmt = query_text;
-  exec_state.reset(new QueryExecState(query_ctx, exec_env_,
+  request_state.reset(new ClientRequestState(query_ctx, exec_env_,
       exec_env_->frontend(), this, session));
-  Status register_status = RegisterQuery(session, exec_state);
+  Status register_status = RegisterQuery(session, request_state);
   if (!register_status.ok()) {
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
     status->__set_errorMessage(register_status.GetDetail());
@@ -151,20 +151,20 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
     return;
   }
 
-  Status exec_status = exec_state->Exec(*request);
+  Status exec_status = request_state->Exec(*request);
   if (!exec_status.ok()) {
-    UnregisterQuery(exec_state->query_id(), false, &exec_status);
+    (void) UnregisterQuery(request_state->query_id(), false, &exec_status);
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
     status->__set_errorMessage(exec_status.GetDetail());
     status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
     return;
   }
 
-  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
+  request_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
 
-  Status inflight_status = SetQueryInflight(session, exec_state);
+  Status inflight_status = SetQueryInflight(session, request_state);
   if (!inflight_status.ok()) {
-    UnregisterQuery(exec_state->query_id(), false, &inflight_status);
+    (void) UnregisterQuery(request_state->query_id(), false, &inflight_status);
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
     status->__set_errorMessage(inflight_status.GetDetail());
     status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
@@ -172,55 +172,56 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   }
   handle->__set_hasResultSet(true);
   // TODO: create secret for operationId
-  TUniqueId operation_id = exec_state->query_id();
+  TUniqueId operation_id = request_state->query_id();
   TUniqueIdToTHandleIdentifier(operation_id, operation_id, &(handle->operationId));
   status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
 
 Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size,
     bool fetch_first, TFetchResultsResp* fetch_results) {
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (UNLIKELY(exec_state == nullptr)) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+  if (UNLIKELY(request_state == nullptr)) {
     return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
   }
 
   // FetchResults doesn't have an associated session handle, so we presume that this
   // request should keep alive the same session that orignated the query.
   ScopedSessionState session_handle(this);
-  const TUniqueId session_id = exec_state->session_id();
+  const TUniqueId session_id = request_state->session_id();
   shared_ptr<SessionState> session;
   RETURN_IF_ERROR(session_handle.WithSession(session_id, &session));
 
-  // Make sure QueryExecState::Wait() has completed before fetching rows. Wait() ensures
-  // that rows are ready to be fetched (e.g., Wait() opens QueryExecState::output_exprs_,
-  // which are evaluated in QueryExecState::FetchRows() below).
-  exec_state->BlockOnWait();
+  // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait()
+  // ensures that rows are ready to be fetched (e.g., Wait() opens
+  // ClientRequestState::output_exprs_, which are evaluated in
+  // ClientRequestState::FetchRows() below).
+  request_state->BlockOnWait();
 
-  lock_guard<mutex> frl(*exec_state->fetch_rows_lock());
-  lock_guard<mutex> l(*exec_state->lock());
+  lock_guard<mutex> frl(*request_state->fetch_rows_lock());
+  lock_guard<mutex> l(*request_state->lock());
 
   // Check for cancellation or an error.
-  RETURN_IF_ERROR(exec_state->query_status());
+  RETURN_IF_ERROR(request_state->query_status());
 
-  if (exec_state->num_rows_fetched() == 0) {
-    exec_state->query_events()->MarkEvent("First row fetched");
-    exec_state->set_fetched_rows();
+  if (request_state->num_rows_fetched() == 0) {
+    request_state->query_events()->MarkEvent("First row fetched");
+    request_state->set_fetched_rows();
   }
 
-  if (fetch_first) RETURN_IF_ERROR(exec_state->RestartFetch());
+  if (fetch_first) RETURN_IF_ERROR(request_state->RestartFetch());
 
-  fetch_results->results.__set_startRowOffset(exec_state->num_rows_fetched());
+  fetch_results->results.__set_startRowOffset(request_state->num_rows_fetched());
 
   // Child queries should always return their results in row-major format, rather than
   // inheriting the parent session's setting.
-  bool is_child_query = exec_state->parent_query_id() != TUniqueId();
+  bool is_child_query = request_state->parent_query_id() != TUniqueId();
   TProtocolVersion::type version = is_child_query ?
       TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version;
   scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateHS2ResultSet(
-      version, *(exec_state->result_metadata()), &(fetch_results->results)));
-  RETURN_IF_ERROR(exec_state->FetchRows(fetch_size, result_set.get()));
+      version, *(request_state->result_metadata()), &(fetch_results->results)));
+  RETURN_IF_ERROR(request_state->FetchRows(fetch_size, result_set.get()));
   fetch_results->__isset.results = true;
-  fetch_results->__set_hasMoreRows(!exec_state->eos());
+  fetch_results->__set_hasMoreRows(!request_state->eos());
   return Status::OK();
 }
 
@@ -451,36 +452,36 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
     }
   }
 
-  shared_ptr<QueryExecState> exec_state;
-  status = Execute(&query_ctx, session, &exec_state);
+  shared_ptr<ClientRequestState> request_state;
+  status = Execute(&query_ctx, session, &request_state);
   HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
 
-  // Optionally enable result caching on the QueryExecState.
+  // Optionally enable result caching on the ClientRequestState.
   if (cache_num_rows > 0) {
-    status = exec_state->SetResultCache(
+    status = request_state->SetResultCache(
         QueryResultSet::CreateHS2ResultSet(
-            session->hs2_version, *exec_state->result_metadata(), nullptr),
+            session->hs2_version, *request_state->result_metadata(), nullptr),
         cache_num_rows);
     if (!status.ok()) {
-      UnregisterQuery(exec_state->query_id(), false, &status);
+      (void) UnregisterQuery(request_state->query_id(), false, &status);
       HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
     }
   }
-  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
+  request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // Start thread to wait for results to become available.
-  exec_state->WaitAsync();
+  request_state->WaitAsync();
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
-  status = SetQueryInflight(session, exec_state);
+  status = SetQueryInflight(session, request_state);
   if (!status.ok()) {
-    UnregisterQuery(exec_state->query_id(), false, &status);
+    (void) UnregisterQuery(request_state->query_id(), false, &status);
     HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
   return_val.__isset.operationHandle = true;
   return_val.operationHandle.__set_operationType(TOperationType::EXECUTE_STATEMENT);
-  return_val.operationHandle.__set_hasResultSet(exec_state->returns_result_set());
-  // TODO: create secret for operationId and store the secret in exec_state
-  TUniqueIdToTHandleIdentifier(exec_state->query_id(), exec_state->query_id(),
+  return_val.operationHandle.__set_hasResultSet(request_state->returns_result_set());
+  // TODO: create secret for operationId and store the secret in request_state
+  TUniqueIdToTHandleIdentifier(request_state->query_id(), request_state->query_id(),
                                &return_val.operationHandle.operationId);
   return_val.status.__set_statusCode(
       apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS);
@@ -633,30 +634,30 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
       request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
   VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id);
 
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (UNLIKELY(exec_state.get() == nullptr)) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+  if (UNLIKELY(request_state.get() == nullptr)) {
     // No handle was found
     HS2_RETURN_ERROR(return_val,
       Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
   }
 
   ScopedSessionState session_handle(this);
-  const TUniqueId session_id = exec_state->session_id();
+  const TUniqueId session_id = request_state->session_id();
   shared_ptr<SessionState> session;
   HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
       SQLSTATE_GENERAL_ERROR);
 
   {
-    lock_guard<mutex> l(*exec_state->lock());
+    lock_guard<mutex> l(*request_state->lock());
     TOperationState::type operation_state = QueryStateToTOperationState(
-        exec_state->query_state());
+        request_state->query_state());
     return_val.__set_operationState(operation_state);
     if (operation_state == TOperationState::ERROR_STATE) {
-      DCHECK(!exec_state->query_status().ok());
-      return_val.__set_errorMessage(exec_state->query_status().GetDetail());
+      DCHECK(!request_state->query_status().ok());
+      return_val.__set_errorMessage(request_state->query_status().GetDetail());
       return_val.__set_sqlState(SQLSTATE_GENERAL_ERROR);
     } else {
-      DCHECK(exec_state->query_status().ok());
+      DCHECK(request_state->query_status().ok());
     }
   }
 }
@@ -669,14 +670,14 @@ void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
       request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
   VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id);
 
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (UNLIKELY(exec_state.get() == nullptr)) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+  if (UNLIKELY(request_state.get() == nullptr)) {
     // No handle was found
     HS2_RETURN_ERROR(return_val,
       Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
   }
   ScopedSessionState session_handle(this);
-  const TUniqueId session_id = exec_state->session_id();
+  const TUniqueId session_id = request_state->session_id();
   HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
       SQLSTATE_GENERAL_ERROR);
   HS2_RETURN_IF_ERROR(return_val, CancelInternal(query_id, true), SQLSTATE_GENERAL_ERROR);
@@ -691,17 +692,17 @@ void ImpalaServer::CloseOperation(TCloseOperationResp& return_val,
       request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
   VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id);
 
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (UNLIKELY(exec_state.get() == nullptr)) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+  if (UNLIKELY(request_state.get() == nullptr)) {
     // No handle was found
     HS2_RETURN_ERROR(return_val,
       Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
   }
   ScopedSessionState session_handle(this);
-  const TUniqueId session_id = exec_state->session_id();
+  const TUniqueId session_id = request_state->session_id();
   HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
       SQLSTATE_GENERAL_ERROR);
-  // TODO: use timeout to get rid of unwanted exec_state.
+  // TODO: use timeout to get rid of unwanted request_state.
   HS2_RETURN_IF_ERROR(return_val, UnregisterQuery(query_id, true),
       SQLSTATE_GENERAL_ERROR);
   return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
@@ -730,19 +731,19 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
   HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
       SQLSTATE_GENERAL_ERROR);
 
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
-  if (UNLIKELY(exec_state.get() == nullptr)) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true);
+  if (UNLIKELY(request_state.get() == nullptr)) {
     VLOG_QUERY << "GetResultSetMetadata(): invalid query handle";
     // No handle was found
     HS2_RETURN_ERROR(return_val,
       Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
   }
   {
-    // make sure we release the lock on exec_state if we see any error
-    lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
+    // make sure we release the lock on request_state if we see any error
+    lock_guard<mutex> l(*request_state->lock(), adopt_lock_t());
 
     // Convert TResultSetMetadata to TGetResultSetMetadataResp
-    const TResultSetMetadata* result_set_md = exec_state->result_metadata();
+    const TResultSetMetadata* result_set_md = request_state->result_metadata();
     DCHECK(result_set_md != NULL);
     if (result_set_md->columns.size() > 0) {
       return_val.__isset.schema = true;
@@ -793,7 +794,7 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
     if (status.IsRecoverableError()) {
       DCHECK(fetch_first);
     } else {
-      UnregisterQuery(query_id, false, &status);
+      (void) UnregisterQuery(query_id, false, &status);
     }
     HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
@@ -806,8 +807,8 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
       request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
 
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (UNLIKELY(exec_state.get() == nullptr)) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+  if (UNLIKELY(request_state.get() == nullptr)) {
     // No handle was found
     HS2_RETURN_ERROR(return_val,
       Substitute("Invalid query handle: $0", PrintId(query_id)), SQLSTATE_GENERAL_ERROR);
@@ -816,20 +817,20 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
   // GetLog doesn't have an associated session handle, so we presume that this request
   // should keep alive the same session that orignated the query.
   ScopedSessionState session_handle(this);
-  const TUniqueId session_id = exec_state->session_id();
+  const TUniqueId session_id = request_state->session_id();
   HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
                       SQLSTATE_GENERAL_ERROR);
 
   stringstream ss;
-  if (exec_state->coord() != NULL) {
+  if (request_state->coord() != NULL) {
     // Report progress
-    ss << exec_state->coord()->progress().ToString() << "\n";
+    ss << request_state->coord()->progress().ToString() << "\n";
   }
   // Report analysis errors
-  ss << join(exec_state->GetAnalysisWarnings(), "\n");
-  if (exec_state->coord() != NULL) {
+  ss << join(request_state->GetAnalysisWarnings(), "\n");
+  if (request_state->coord() != NULL) {
     // Report execution errors
-    ss << exec_state->coord()->GetErrorLog();
+    ss << request_state->coord()->GetErrorLog();
   }
   return_val.log = ss.str();
   return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index b9f3382..6ccb6df 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -28,7 +28,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "service/impala-server.h"
-#include "service/query-exec-state.h"
+#include "service/client-request-state.h"
 #include "thrift/protocol/TDebugProtocol.h"
 #include "util/coding-util.h"
 #include "util/logging-support.h"
@@ -237,11 +237,11 @@ void ImpalaHttpHandler::QueryProfileEncodedHandler(const Webserver::ArgumentMap&
 
 void ImpalaHttpHandler::InflightQueryIdsHandler(const Webserver::ArgumentMap& args,
     Document* document) {
-  lock_guard<mutex> l(server_->query_exec_state_map_lock_);
+  lock_guard<mutex> l(server_->client_request_state_map_lock_);
   stringstream ss;
-  for (const ImpalaServer::QueryExecStateMap::value_type& exec_state:
-       server_->query_exec_state_map_) {
-    ss << exec_state.second->query_id() << "\n";
+  for (const ImpalaServer::ClientRequestStateMap::value_type& request_state:
+       server_->client_request_state_map_) {
+    ss << request_state.second->query_id() << "\n";
   }
   document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
   Value query_ids(ss.str().c_str(), document->GetAllocator());
@@ -361,11 +361,11 @@ void ImpalaHttpHandler::QueryStateHandler(const Webserver::ArgumentMap& args,
   set<ImpalaServer::QueryStateRecord, ImpalaServer::QueryStateRecordLessThan>
       sorted_query_records;
   {
-    lock_guard<mutex> l(server_->query_exec_state_map_lock_);
-    for (const ImpalaServer::QueryExecStateMap::value_type& exec_state:
-         server_->query_exec_state_map_) {
+    lock_guard<mutex> l(server_->client_request_state_map_lock_);
+    for (const ImpalaServer::ClientRequestStateMap::value_type& request_state:
+         server_->client_request_state_map_) {
       // TODO: Do this in the browser so that sorts on other keys are possible.
-      sorted_query_records.insert(ImpalaServer::QueryStateRecord(*exec_state.second));
+      sorted_query_records.insert(ImpalaServer::QueryStateRecord(*request_state.second));
     }
   }
 
@@ -708,27 +708,26 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include
 
   // Search the in-flight queries first, followed by the archived ones.
   {
-    shared_ptr<ImpalaServer::QueryExecState> exec_state =
-        server_->GetQueryExecState(query_id, true);
-    if (exec_state != NULL) {
+    shared_ptr<ClientRequestState> request_state =
+        server_->GetClientRequestState(query_id, true);
+    if (request_state != NULL) {
       found = true;
-      lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
-      if (exec_state->coord() == NULL) {
+      lock_guard<mutex> l(*request_state->lock(), adopt_lock_t());
+      if (request_state->coord() == NULL) {
         const string& err = Substitute("Invalid query id: $0", PrintId(query_id));
         Value json_error(err.c_str(), document->GetAllocator());
         document->AddMember("error", json_error, document->GetAllocator());
         return;
       }
-      query_status = exec_state->query_status();
-      stmt = exec_state->sql_stmt();
-      plan = exec_state->exec_request().query_exec_request.query_plan;
+      query_status = request_state->query_status();
+      stmt = request_state->sql_stmt();
+      plan = request_state->exec_request().query_exec_request.query_plan;
       if (include_json_plan || include_summary) {
-        lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
-        summary = exec_state->coord()->exec_summary();
+        request_state->coord()->GetTExecSummary(&summary);
       }
       if (include_json_plan) {
         for (const TPlanExecInfo& plan_exec_info:
-            exec_state->exec_request().query_exec_request.plan_exec_info) {
+            request_state->exec_request().query_exec_request.plan_exec_info) {
           for (const TPlanFragment& fragment: plan_exec_info.fragments) {
             fragments.push_back(fragment);
           }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-internal-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
index a9f7f01..d8a2a4a 100644
--- a/be/src/service/impala-internal-service.cc
+++ b/be/src/service/impala-internal-service.cc
@@ -39,12 +39,15 @@ ImpalaInternalService::ImpalaInternalService() {
   DCHECK(query_exec_mgr_ != nullptr);
 }
 
-void ImpalaInternalService::ExecPlanFragment(TExecPlanFragmentResult& return_val,
-    const TExecPlanFragmentParams& params) {
-  VLOG_QUERY << "ExecPlanFragment():"
-            << " instance_id=" << params.fragment_instance_ctx.fragment_instance_id;
-  FAULT_INJECTION_RPC_DELAY(RPC_EXECPLANFRAGMENT);
-  query_exec_mgr_->StartFInstance(params).SetTStatus(&return_val);
+void ImpalaInternalService::ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
+    const TExecQueryFInstancesParams& params) {
+  VLOG_QUERY << "ExecQueryFInstances():" << " query_id=" << params.query_ctx.query_id;
+  FAULT_INJECTION_RPC_DELAY(RPC_EXECQUERYFINSTANCES);
+  DCHECK(params.__isset.coord_state_idx);
+  DCHECK(params.__isset.query_ctx);
+  DCHECK(params.__isset.fragment_ctxs);
+  DCHECK(params.__isset.fragment_instance_ctxs);
+  query_exec_mgr_->StartQuery(params).SetTStatus(&return_val);
 }
 
 template <typename T> void SetUnknownIdError(
@@ -54,53 +57,54 @@ template <typename T> void SetUnknownIdError(
   status.SetTStatus(status_container);
 }
 
-void ImpalaInternalService::CancelPlanFragment(TCancelPlanFragmentResult& return_val,
-    const TCancelPlanFragmentParams& params) {
-  VLOG_QUERY << "CancelPlanFragment(): instance_id=" << params.fragment_instance_id;
-  FAULT_INJECTION_RPC_DELAY(RPC_CANCELPLANFRAGMENT);
-  QueryState::ScopedRef qs(GetQueryId(params.fragment_instance_id));
+void ImpalaInternalService::CancelQueryFInstances(
+    TCancelQueryFInstancesResult& return_val,
+    const TCancelQueryFInstancesParams& params) {
+  VLOG_QUERY << "CancelQueryFInstances(): query_id=" << params.query_id;
+  FAULT_INJECTION_RPC_DELAY(RPC_CANCELQUERYFINSTANCES);
+  DCHECK(params.__isset.query_id);
+  QueryState::ScopedRef qs(params.query_id);
   if (qs.get() == nullptr) {
-    SetUnknownIdError("query", GetQueryId(params.fragment_instance_id), &return_val);
+    SetUnknownIdError("query", params.query_id, &return_val);
     return;
   }
-  FragmentInstanceState* fis = qs->GetFInstanceState(params.fragment_instance_id);
-  if (fis == nullptr) {
-    SetUnknownIdError("instance", params.fragment_instance_id, &return_val);
-    return;
-  }
-  Status status = fis->Cancel();
-  status.SetTStatus(&return_val);
+  qs->Cancel();
 }
 
 void ImpalaInternalService::ReportExecStatus(TReportExecStatusResult& return_val,
     const TReportExecStatusParams& params) {
-  VLOG_QUERY << "ReportExecStatus(): instance_id=" << params.fragment_instance_id;
   FAULT_INJECTION_RPC_DELAY(RPC_REPORTEXECSTATUS);
+  DCHECK(params.__isset.query_id);
+  DCHECK(params.__isset.coord_state_idx);
   impala_server_->ReportExecStatus(return_val, params);
 }
 
 void ImpalaInternalService::TransmitData(TTransmitDataResult& return_val,
     const TTransmitDataParams& params) {
   FAULT_INJECTION_RPC_DELAY(RPC_TRANSMITDATA);
+  DCHECK(params.__isset.dest_fragment_instance_id);
+  DCHECK(params.__isset.sender_id);
+  DCHECK(params.__isset.dest_node_id);
   impala_server_->TransmitData(return_val, params);
 }
 
 void ImpalaInternalService::UpdateFilter(TUpdateFilterResult& return_val,
     const TUpdateFilterParams& params) {
-  VLOG_QUERY << "UpdateFilter(): filter=" << params.filter_id
-            << " query_id=" << PrintId(params.query_id);
   FAULT_INJECTION_RPC_DELAY(RPC_UPDATEFILTER);
+  DCHECK(params.__isset.filter_id);
+  DCHECK(params.__isset.query_id);
+  DCHECK(params.__isset.bloom_filter);
   impala_server_->UpdateFilter(return_val, params);
 }
 
 void ImpalaInternalService::PublishFilter(TPublishFilterResult& return_val,
     const TPublishFilterParams& params) {
-  VLOG_QUERY << "PublishFilter(): filter=" << params.filter_id
-            << " instance_id=" << PrintId(params.dst_instance_id);
   FAULT_INJECTION_RPC_DELAY(RPC_PUBLISHFILTER);
-  QueryState::ScopedRef qs(GetQueryId(params.dst_instance_id));
+  DCHECK(params.__isset.filter_id);
+  DCHECK(params.__isset.dst_query_id);
+  DCHECK(params.__isset.dst_fragment_idx);
+  DCHECK(params.__isset.bloom_filter);
+  QueryState::ScopedRef qs(params.dst_query_id);
   if (qs.get() == nullptr) return;
-  FragmentInstanceState* fis = qs->GetFInstanceState(params.dst_instance_id);
-  if (fis == nullptr) return;
-  fis->PublishFilter(params.filter_id, params.bloom_filter);
+  qs->PublishFilter(params.filter_id, params.dst_fragment_idx, params.bloom_filter);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index 372d617..3285d9d 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -31,10 +31,10 @@ class QueryExecMgr;
 class ImpalaInternalService : public ImpalaInternalServiceIf {
  public:
   ImpalaInternalService();
-  virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val,
-      const TExecPlanFragmentParams& params);
-  virtual void CancelPlanFragment(TCancelPlanFragmentResult& return_val,
-      const TCancelPlanFragmentParams& params);
+  virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
+      const TExecQueryFInstancesParams& params);
+  virtual void CancelQueryFInstances(TCancelQueryFInstancesResult& return_val,
+      const TCancelQueryFInstancesParams& params);
   virtual void ReportExecStatus(TReportExecStatusResult& return_val,
       const TReportExecStatusParams& params);
   virtual void TransmitData(TTransmitDataResult& return_val,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6132918..1b2e8fe 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -57,7 +57,7 @@
 #include "scheduling/scheduler.h"
 #include "service/impala-http-handler.h"
 #include "service/impala-internal-service.h"
-#include "service/query-exec-state.h"
+#include "service/client-request-state.h"
 #include "util/bit-util.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"
@@ -381,8 +381,8 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
   exec_env_->SetImpalaServer(this);
 }
 
-Status ImpalaServer::LogLineageRecord(const QueryExecState& query_exec_state) {
-  const TExecRequest& request = query_exec_state.exec_request();
+Status ImpalaServer::LogLineageRecord(const ClientRequestState& client_request_state) {
+  const TExecRequest& request = client_request_state.exec_request();
   if (!request.__isset.query_exec_request && !request.__isset.catalog_op_request) {
     return Status::OK();
   }
@@ -397,7 +397,7 @@ Status ImpalaServer::LogLineageRecord(const QueryExecState& query_exec_state) {
     return Status::OK();
   }
   // Set the query end time in TLineageGraph. Must use UNIX time directly rather than
-  // e.g. converting from query_exec_state.end_time() (IMPALA-4440).
+  // e.g. converting from client_request_state.end_time() (IMPALA-4440).
   lineage_graph.__set_ended(UnixMillis() / 1000);
   string lineage_record;
   LineageUtil::TLineageToJSON(lineage_graph, &lineage_record);
@@ -436,7 +436,7 @@ Status ImpalaServer::InitLineageLogging() {
   return Status::OK();
 }
 
-Status ImpalaServer::LogAuditRecord(const ImpalaServer::QueryExecState& exec_state,
+Status ImpalaServer::LogAuditRecord(const ClientRequestState& request_state,
     const TExecRequest& request) {
   stringstream ss;
   rapidjson::StringBuffer buffer;
@@ -448,24 +448,24 @@ Status ImpalaServer::LogAuditRecord(const ImpalaServer::QueryExecState& exec_sta
   writer.String(ss.str().c_str());
   writer.StartObject();
   writer.String("query_id");
-  writer.String(PrintId(exec_state.query_id()).c_str());
+  writer.String(PrintId(request_state.query_id()).c_str());
   writer.String("session_id");
-  writer.String(PrintId(exec_state.session_id()).c_str());
+  writer.String(PrintId(request_state.session_id()).c_str());
   writer.String("start_time");
-  writer.String(exec_state.start_time().DebugString().c_str());
+  writer.String(request_state.start_time().DebugString().c_str());
   writer.String("authorization_failure");
-  writer.Bool(Frontend::IsAuthorizationError(exec_state.query_status()));
+  writer.Bool(Frontend::IsAuthorizationError(request_state.query_status()));
   writer.String("status");
-  writer.String(exec_state.query_status().GetDetail().c_str());
+  writer.String(request_state.query_status().GetDetail().c_str());
   writer.String("user");
-  writer.String(exec_state.effective_user().c_str());
+  writer.String(request_state.effective_user().c_str());
   writer.String("impersonator");
-  if (exec_state.do_as_user().empty()) {
+  if (request_state.do_as_user().empty()) {
     // If there is no do_as_user() is empty, the "impersonator" field should be Null.
     writer.Null();
   } else {
     // Otherwise, the delegator is the current connected user.
-    writer.String(exec_state.connected_user().c_str());
+    writer.String(request_state.connected_user().c_str());
   }
   writer.String("statement_type");
   if (request.stmt_type == TStmtType::DDL) {
@@ -480,9 +480,9 @@ Status ImpalaServer::LogAuditRecord(const ImpalaServer::QueryExecState& exec_sta
   }
   writer.String("network_address");
   writer.String(
-      lexical_cast<string>(exec_state.session()->network_address).c_str());
+      lexical_cast<string>(request_state.session()->network_address).c_str());
   writer.String("sql_statement");
-  string stmt = replace_all_copy(exec_state.sql_stmt(), "\n", " ");
+  string stmt = replace_all_copy(request_state.sql_stmt(), "\n", " ");
   Redact(&stmt);
   writer.String(stmt.c_str());
   writer.String("catalog_objects");
@@ -528,14 +528,14 @@ Status ImpalaServer::InitAuditEventLogging() {
   return Status::OK();
 }
 
-void ImpalaServer::LogQueryEvents(const QueryExecState& exec_state) {
-  Status status = exec_state.query_status();
+void ImpalaServer::LogQueryEvents(const ClientRequestState& request_state) {
+  Status status = request_state.query_status();
   bool log_events = true;
-  switch (exec_state.stmt_type()) {
+  switch (request_state.stmt_type()) {
     case TStmtType::QUERY: {
       // If the query didn't finish, log audit and lineage events only if
       // the client issued at least one fetch.
-      if (!status.ok() && !exec_state.fetched_rows()) log_events = false;
+      if (!status.ok() && !request_state.fetched_rows()) log_events = false;
       break;
     }
     case TStmtType::DML: {
@@ -543,13 +543,13 @@ void ImpalaServer::LogQueryEvents(const QueryExecState& exec_state) {
       break;
     }
     case TStmtType::DDL: {
-      if (exec_state.catalog_op_type() == TCatalogOpType::DDL) {
+      if (request_state.catalog_op_type() == TCatalogOpType::DDL) {
         // For a DDL operation, log audit and lineage events only if the
         // operation finished.
         if (!status.ok()) log_events = false;
       } else {
         // This case covers local catalog operations such as SHOW and DESCRIBE.
-        if (!status.ok() && !exec_state.fetched_rows()) log_events = false;
+        if (!status.ok() && !request_state.fetched_rows()) log_events = false;
       }
       break;
     }
@@ -561,11 +561,13 @@ void ImpalaServer::LogQueryEvents(const QueryExecState& exec_state) {
   }
   // Log audit events that are due to an AuthorizationException.
   if (IsAuditEventLoggingEnabled() &&
-      (Frontend::IsAuthorizationError(exec_state.query_status()) || log_events)) {
-    LogAuditRecord(exec_state, exec_state.exec_request());
+      (Frontend::IsAuthorizationError(request_state.query_status()) || log_events)) {
+    // TODO: deal with an error status
+    (void) LogAuditRecord(request_state, request_state.exec_request());
   }
   if (IsLineageLoggingEnabled() && log_events) {
-    LogLineageRecord(exec_state);
+    // TODO: deal with an error status
+    (void) LogLineageRecord(request_state);
   }
 }
 
@@ -592,13 +594,13 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
   DCHECK(output != nullptr);
   // Search for the query id in the active query map
   {
-    shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-    if (exec_state.get() != nullptr) {
-      lock_guard<mutex> l(*exec_state->lock());
+    shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+    if (request_state.get() != nullptr) {
+      lock_guard<mutex> l(*request_state->lock());
       if (base64_encoded) {
-        exec_state->profile().SerializeToArchiveString(output);
+        request_state->profile().SerializeToArchiveString(output);
       } else {
-        exec_state->profile().PrettyPrint(output);
+        request_state->profile().PrettyPrint(output);
       }
       return Status::OK();
     }
@@ -625,20 +627,17 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
 Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* result) {
   // Search for the query id in the active query map.
   {
-    shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
-    if (exec_state != nullptr) {
-      lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
-      if (exec_state->coord() != nullptr) {
+    shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true);
+    if (request_state != nullptr) {
+      lock_guard<mutex> l(*request_state->lock(), adopt_lock_t());
+      if (request_state->coord() != nullptr) {
+        request_state->coord()->GetTExecSummary(result);
+
         TExecProgress progress;
-        {
-          lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
-          *result = exec_state->coord()->exec_summary();
-
-          // Update the current scan range progress for the summary.
-          progress.__set_num_completed_scan_ranges(
-              exec_state->coord()->progress().num_complete());
-          progress.__set_total_scan_ranges(exec_state->coord()->progress().total());
-        }
+        progress.__set_num_completed_scan_ranges(
+            request_state->coord()->progress().num_complete());
+        progress.__set_total_scan_ranges(request_state->coord()->progress().total());
+        // TODO: does this not need to be synchronized?
         result->__set_progress(progress);
         return Status::OK();
       }
@@ -694,7 +693,7 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* res
   }
 }
 
-void ImpalaServer::ArchiveQuery(const QueryExecState& query) {
+void ImpalaServer::ArchiveQuery(const ClientRequestState& query) {
   const string& encoded_profile_str = query.profile().SerializeToArchiveString();
 
   // If there was an error initialising archival (e.g. directory is not writeable),
@@ -714,10 +713,7 @@ void ImpalaServer::ArchiveQuery(const QueryExecState& query) {
 
   if (FLAGS_query_log_size == 0) return;
   QueryStateRecord record(query, true, encoded_profile_str);
-  if (query.coord() != nullptr) {
-    lock_guard<SpinLock> lock(query.coord()->GetExecSummaryLock());
-    record.exec_summary = query.coord()->exec_summary();
-  }
+  if (query.coord() != nullptr) query.coord()->GetTExecSummary(&record.exec_summary);
   {
     lock_guard<mutex> l(query_log_lock_);
     // Add record to the beginning of the log, and to the lookup index.
@@ -775,7 +771,7 @@ void ImpalaServer::AddPoolQueryOptions(TQueryCtx* ctx,
 
 Status ImpalaServer::Execute(TQueryCtx* query_ctx,
     shared_ptr<SessionState> session_state,
-    shared_ptr<QueryExecState>* exec_state) {
+    shared_ptr<ClientRequestState>* request_state) {
   PrepareQueryContext(query_ctx);
   ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
 
@@ -784,11 +780,11 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx,
   Redact(&stmt);
   query_ctx->client_request.__set_redacted_stmt((const string) stmt);
 
-  bool registered_exec_state;
-  Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
-      exec_state);
-  if (!status.ok() && registered_exec_state) {
-    UnregisterQuery((*exec_state)->query_id(), false, &status);
+  bool registered_request_state;
+  Status status = ExecuteInternal(*query_ctx, session_state, &registered_request_state,
+      request_state);
+  if (!status.ok() && registered_request_state) {
+    (void) UnregisterQuery((*request_state)->query_id(), false, &status);
   }
   return status;
 }
@@ -796,50 +792,50 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx,
 Status ImpalaServer::ExecuteInternal(
     const TQueryCtx& query_ctx,
     shared_ptr<SessionState> session_state,
-    bool* registered_exec_state,
-    shared_ptr<QueryExecState>* exec_state) {
+    bool* registered_request_state,
+    shared_ptr<ClientRequestState>* request_state) {
   DCHECK(session_state != nullptr);
-  *registered_exec_state = false;
+  *registered_request_state = false;
 
-  exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
+  request_state->reset(new ClientRequestState(query_ctx, exec_env_, exec_env_->frontend(),
       this, session_state));
 
-  (*exec_state)->query_events()->MarkEvent("Query submitted");
+  (*request_state)->query_events()->MarkEvent("Query submitted");
 
   TExecRequest result;
   {
-    // Keep a lock on exec_state so that registration and setting
+    // Keep a lock on request_state so that registration and setting
     // result_metadata are atomic.
     //
-    // Note: this acquires the exec_state lock *before* the
-    // query_exec_state_map_ lock. This is the opposite of
-    // GetQueryExecState(..., true), and therefore looks like a
+    // Note: this acquires the request_state lock *before* the
+    // client_request_state_map_ lock. This is the opposite of
+    // GetClientRequestState(..., true), and therefore looks like a
     // candidate for deadlock. The reason this works here is that
-    // GetQueryExecState cannot find exec_state (under the exec state
+    // GetClientRequestState cannot find request_state (under the exec state
     // map lock) and take its lock until RegisterQuery has
     // finished. By that point, the exec state map lock will have been
     // given up, so the classic deadlock interleaving is not possible.
-    lock_guard<mutex> l(*(*exec_state)->lock());
+    lock_guard<mutex> l(*(*request_state)->lock());
 
     // register exec state as early as possible so that queries that
     // take a long time to plan show up, and to handle incoming status
     // reports before execution starts.
-    RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state));
-    *registered_exec_state = true;
+    RETURN_IF_ERROR(RegisterQuery(session_state, *request_state));
+    *registered_request_state = true;
 
-    RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus(
+    RETURN_IF_ERROR((*request_state)->UpdateQueryStatus(
         exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
-    (*exec_state)->query_events()->MarkEvent("Planning finished");
-    (*exec_state)->summary_profile()->AddEventSequence(
+    (*request_state)->query_events()->MarkEvent("Planning finished");
+    (*request_state)->summary_profile()->AddEventSequence(
         result.timeline.name, result.timeline);
     if (result.__isset.result_set_metadata) {
-      (*exec_state)->set_result_metadata(result.result_set_metadata);
+      (*request_state)->set_result_metadata(result.result_set_metadata);
     }
   }
   VLOG(2) << "Execution request: " << ThriftDebugString(result);
 
   // start execution of query; also starts fragment status reports
-  RETURN_IF_ERROR((*exec_state)->Exec(&result));
+  RETURN_IF_ERROR((*request_state)->Exec(&result));
   if (result.stmt_type == TStmtType::DDL) {
     Status status = UpdateCatalogMetrics();
     if (!status.ok()) {
@@ -847,13 +843,13 @@ Status ImpalaServer::ExecuteInternal(
     }
   }
 
-  if ((*exec_state)->coord() != nullptr) {
+  if ((*request_state)->coord() != nullptr) {
     const unordered_set<TNetworkAddress>& unique_hosts =
-        (*exec_state)->schedule()->unique_hosts();
+        (*request_state)->schedule()->unique_hosts();
     if (!unique_hosts.empty()) {
       lock_guard<mutex> l(query_locations_lock_);
       for (const TNetworkAddress& port: unique_hosts) {
-        query_locations_[port].insert((*exec_state)->query_id());
+        query_locations_[port].insert((*request_state)->query_id());
       }
     }
   }
@@ -874,32 +870,32 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
 }
 
 Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
-    const shared_ptr<QueryExecState>& exec_state) {
+    const shared_ptr<ClientRequestState>& request_state) {
   lock_guard<mutex> l2(session_state->lock);
   // The session wasn't expired at the time it was checked out and it isn't allowed to
   // expire while checked out, so it must not be expired.
   DCHECK(session_state->ref_count > 0 && !session_state->expired);
   // The session may have been closed after it was checked out.
   if (session_state->closed) return Status("Session has been closed, ignoring query.");
-  const TUniqueId& query_id = exec_state->query_id();
+  const TUniqueId& query_id = request_state->query_id();
   {
-    lock_guard<mutex> l(query_exec_state_map_lock_);
-    QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
-    if (entry != query_exec_state_map_.end()) {
+    lock_guard<mutex> l(client_request_state_map_lock_);
+    ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
+    if (entry != client_request_state_map_.end()) {
       // There shouldn't be an active query with that same id.
       // (query_id is globally unique)
       stringstream ss;
       ss << "query id " << PrintId(query_id) << " already exists";
       return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, ss.str()));
     }
-    query_exec_state_map_.insert(make_pair(query_id, exec_state));
+    client_request_state_map_.insert(make_pair(query_id, request_state));
   }
   return Status::OK();
 }
 
 Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
-    const shared_ptr<QueryExecState>& exec_state) {
-  const TUniqueId& query_id = exec_state->query_id();
+    const shared_ptr<ClientRequestState>& request_state) {
+  const TUniqueId& query_id = request_state->query_id();
   lock_guard<mutex> l(session_state->lock);
   // The session wasn't expired at the time it was checked out and it isn't allowed to
   // expire while checked out, so it must not be expired.
@@ -911,7 +907,7 @@ Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
   session_state->inflight_queries.insert(query_id);
   ++session_state->total_queries;
   // Set query expiration.
-  int32_t timeout_s = exec_state->query_options().query_timeout_s;
+  int32_t timeout_s = request_state->query_options().query_timeout_s;
   if (FLAGS_idle_query_timeout > 0 && timeout_s > 0) {
     timeout_s = min(FLAGS_idle_query_timeout, timeout_s);
   } else {
@@ -935,55 +931,52 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
 
   RETURN_IF_ERROR(CancelInternal(query_id, check_inflight, cause));
 
-  shared_ptr<QueryExecState> exec_state;
+  shared_ptr<ClientRequestState> request_state;
   {
-    lock_guard<mutex> l(query_exec_state_map_lock_);
-    QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
-    if (entry == query_exec_state_map_.end()) {
+    lock_guard<mutex> l(client_request_state_map_lock_);
+    ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
+    if (entry == client_request_state_map_.end()) {
       return Status("Invalid or unknown query handle");
     } else {
-      exec_state = entry->second;
+      request_state = entry->second;
     }
-    query_exec_state_map_.erase(entry);
+    client_request_state_map_.erase(entry);
   }
 
-  exec_state->Done();
+  request_state->Done();
 
   double ut_end_time, ut_start_time;
   double duration_ms = 0.0;
-  if (LIKELY(exec_state->end_time().ToSubsecondUnixTime(&ut_end_time))
-      && LIKELY(exec_state->start_time().ToSubsecondUnixTime(&ut_start_time))) {
+  if (LIKELY(request_state->end_time().ToSubsecondUnixTime(&ut_end_time))
+      && LIKELY(request_state->start_time().ToSubsecondUnixTime(&ut_start_time))) {
     duration_ms = 1000 * (ut_end_time - ut_start_time);
   }
 
   // duration_ms can be negative when the local timezone changes during query execution.
   if (duration_ms >= 0) {
-    if (exec_state->stmt_type() == TStmtType::DDL) {
+    if (request_state->stmt_type() == TStmtType::DDL) {
       ImpaladMetrics::DDL_DURATIONS->Update(duration_ms);
     } else {
       ImpaladMetrics::QUERY_DURATIONS->Update(duration_ms);
     }
   }
-  LogQueryEvents(*exec_state.get());
+  LogQueryEvents(*request_state.get());
 
   {
-    lock_guard<mutex> l(exec_state->session()->lock);
-    exec_state->session()->inflight_queries.erase(query_id);
+    lock_guard<mutex> l(request_state->session()->lock);
+    request_state->session()->inflight_queries.erase(query_id);
   }
 
-  if (exec_state->coord() != nullptr) {
-    string exec_summary;
-    {
-      lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
-      const TExecSummary& summary = exec_state->coord()->exec_summary();
-      exec_summary = PrintExecSummary(summary);
-    }
-    exec_state->summary_profile()->AddInfoString("ExecSummary", exec_summary);
-    exec_state->summary_profile()->AddInfoString("Errors",
-        exec_state->coord()->GetErrorLog());
+  if (request_state->coord() != nullptr) {
+    TExecSummary t_exec_summary;
+    request_state->coord()->GetTExecSummary(&t_exec_summary);
+    string exec_summary = PrintExecSummary(t_exec_summary);
+    request_state->summary_profile()->AddInfoString("ExecSummary", exec_summary);
+    request_state->summary_profile()->AddInfoString("Errors",
+        request_state->coord()->GetErrorLog());
 
     const unordered_set<TNetworkAddress>& unique_hosts =
-        exec_state->schedule()->unique_hosts();
+        request_state->schedule()->unique_hosts();
     if (!unique_hosts.empty()) {
       lock_guard<mutex> l(query_locations_lock_);
       for (const TNetworkAddress& hostport: unique_hosts) {
@@ -993,12 +986,12 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
         // thing. They will harmlessly race to remove the query from this map.
         QueryLocations::iterator it = query_locations_.find(hostport);
         if (it != query_locations_.end()) {
-          it->second.erase(exec_state->query_id());
+          it->second.erase(request_state->query_id());
         }
       }
     }
   }
-  ArchiveQuery(*exec_state);
+  ArchiveQuery(*request_state);
   return Status::OK();
 }
 
@@ -1020,9 +1013,9 @@ Status ImpalaServer::UpdateCatalogMetrics() {
 Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflight,
     const Status* cause) {
   VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (exec_state == nullptr) return Status("Invalid or unknown query handle");
-  exec_state->Cancel(check_inflight, cause);
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+  if (request_state == nullptr) return Status("Invalid or unknown query handle");
+  RETURN_IF_ERROR(request_state->Cancel(check_inflight, cause));
   return Status::OK();
 }
 
@@ -1061,7 +1054,8 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
   // Unregister all open queries from this session.
   Status status("Session closed");
   for (const TUniqueId& query_id: inflight_queries) {
-    UnregisterQuery(query_id, false, &status);
+    // TODO: deal with an error status
+    (void) UnregisterQuery(query_id, false, &status);
   }
   // Reconfigure the poll period of session_timeout_thread_ if necessary.
   int32_t session_timeout = session_state->session_timeout;
@@ -1102,25 +1096,23 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id,
 
 void ImpalaServer::ReportExecStatus(
     TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {
-  VLOG_FILE << "ReportExecStatus()"
-            << " instance_id=" << PrintId(params.fragment_instance_id)
-            << " done=" << (params.done ? "true" : "false");
+  VLOG_FILE << "ReportExecStatus() coord_state_idx=" << params.coord_state_idx;
   // TODO: implement something more efficient here, we're currently
   // acquiring/releasing the map lock and doing a map lookup for
   // every report (assign each query a local int32_t id and use that to index into a
-  // vector of QueryExecStates, w/o lookup or locking?)
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(params.query_id, false);
-  if (exec_state.get() == nullptr) {
+  // vector of ClientRequestStates, w/o lookup or locking?)
+  shared_ptr<ClientRequestState> request_state =
+      GetClientRequestState(params.query_id, false);
+  if (request_state.get() == nullptr) {
     // This is expected occasionally (since a report RPC might be in flight while
     // cancellation is happening). Return an error to the caller to get it to stop.
     const string& err = Substitute("ReportExecStatus(): Received report for unknown "
-        "query ID (probably closed or cancelled). (instance: $0 done: $1)",
-        PrintId(params.fragment_instance_id), params.done);
+        "query ID (probably closed or cancelled): $0", PrintId(params.query_id));
     Status(TErrorCode::INTERNAL_ERROR, err).SetTStatus(&return_val);
-    VLOG_QUERY << err;
+    //VLOG_QUERY << err;
     return;
   }
-  exec_state->coord()->UpdateFragmentExecStatus(params).SetTStatus(&return_val);
+  request_state->coord()->UpdateBackendExecStatus(params).SetTStatus(&return_val);
 }
 
 void ImpalaServer::TransmitData(
@@ -1365,7 +1357,8 @@ void ImpalaServer::CatalogUpdateCallback(
         catalog_update_info_.catalog_service_id = resp.catalog_service_id;
       }
       ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0);
-      UpdateCatalogMetrics();
+      // TODO: deal with an error status
+      (void) UpdateCatalogMetrics();
       // Remove all dropped objects from the library cache.
       // TODO: is this expensive? We'd like to process heartbeats promptly.
       for (TCatalogObject& object: dropped_objects) {
@@ -1629,39 +1622,39 @@ void ImpalaServer::AddLocalBackendToStatestore(
   }
 }
 
-ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_state,
+ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& request_state,
     bool copy_profile, const string& encoded_profile) {
-  id = exec_state.query_id();
-  const TExecRequest& request = exec_state.exec_request();
+  id = request_state.query_id();
+  const TExecRequest& request = request_state.exec_request();
 
-  const string* plan_str = exec_state.summary_profile().GetInfoString("Plan");
+  const string* plan_str = request_state.summary_profile().GetInfoString("Plan");
   if (plan_str != nullptr) plan = *plan_str;
-  stmt = exec_state.sql_stmt();
+  stmt = request_state.sql_stmt();
   stmt_type = request.stmt_type;
-  effective_user = exec_state.effective_user();
-  default_db = exec_state.default_db();
-  start_time = exec_state.start_time();
-  end_time = exec_state.end_time();
+  effective_user = request_state.effective_user();
+  default_db = request_state.default_db();
+  start_time = request_state.start_time();
+  end_time = request_state.end_time();
   has_coord = false;
 
-  Coordinator* coord = exec_state.coord();
+  Coordinator* coord = request_state.coord();
   if (coord != nullptr) {
     num_complete_fragments = coord->progress().num_complete();
     total_fragments = coord->progress().total();
     has_coord = true;
   }
-  query_state = exec_state.query_state();
-  num_rows_fetched = exec_state.num_rows_fetched();
-  query_status = exec_state.query_status();
+  query_state = request_state.query_state();
+  num_rows_fetched = request_state.num_rows_fetched();
+  query_status = request_state.query_status();
 
-  exec_state.query_events()->ToThrift(&event_sequence);
+  request_state.query_events()->ToThrift(&event_sequence);
 
   if (copy_profile) {
     stringstream ss;
-    exec_state.profile().PrettyPrint(&ss);
+    request_state.profile().PrettyPrint(&ss);
     profile_str = ss.str();
     if (encoded_profile.empty()) {
-      encoded_profile_str = exec_state.profile().SerializeToArchiveString();
+      encoded_profile_str = request_state.profile().SerializeToArchiveString();
     } else {
       encoded_profile_str = encoded_profile;
     }
@@ -1669,13 +1662,13 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat
 
   // Save the query fragments so that the plan can be visualised.
   for (const TPlanExecInfo& plan_exec_info:
-      exec_state.exec_request().query_exec_request.plan_exec_info) {
+      request_state.exec_request().query_exec_request.plan_exec_info) {
     fragments.insert(fragments.end(),
         plan_exec_info.fragments.begin(), plan_exec_info.fragments.end());
   }
-  all_rows_returned = exec_state.eos();
-  last_active_time_ms = exec_state.last_active_ms();
-  request_pool = exec_state.request_pool();
+  all_rows_returned = request_state.eos();
+  last_active_time_ms = request_state.last_active_ms();
+  request_pool = request_state.request_pool();
 }
 
 bool ImpalaServer::QueryStateRecordLessThan::operator() (
@@ -1816,7 +1809,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
     // The following block accomplishes three things:
     //
     // 1. Update the ordered list of queries by checking the 'idle_time' parameter in
-    // query_exec_state. We are able to avoid doing this for *every* query in flight
+    // client_request_state. We are able to avoid doing this for *every* query in flight
     // thanks to the observation that expiry times never move backwards, only
     // forwards. Therefore once we find a query that a) hasn't changed its idle time and
     // b) has not yet expired we can stop moving through the list. If the idle time has
@@ -1838,8 +1831,8 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
         // know that the true expiration time will be at least that far off. So we can
         // break here and sleep.
         if (expiration_event->first > now) break;
-        shared_ptr<QueryExecState> query_state =
-            GetQueryExecState(expiration_event->second, false);
+        shared_ptr<ClientRequestState> query_state =
+            GetClientRequestState(expiration_event->second, false);
         if (query_state.get() == nullptr) {
           // Query was deleted some other way.
           queries_by_timestamp_.erase(expiration_event++);
@@ -1989,9 +1982,9 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
 bool ImpalaServer::GetSessionIdForQuery(const TUniqueId& query_id,
     TUniqueId* session_id) {
   DCHECK(session_id != nullptr);
-  lock_guard<mutex> l(query_exec_state_map_lock_);
-  QueryExecStateMap::iterator i = query_exec_state_map_.find(query_id);
-  if (i == query_exec_state_map_.end()) {
+  lock_guard<mutex> l(client_request_state_map_lock_);
+  ClientRequestStateMap::iterator i = client_request_state_map_.find(query_id);
+  if (i == client_request_state_map_.end()) {
     return false;
   } else {
     *session_id = i->second->session_id();
@@ -1999,12 +1992,12 @@ bool ImpalaServer::GetSessionIdForQuery(const TUniqueId& query_id,
   }
 }
 
-shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState(
+shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState(
     const TUniqueId& query_id, bool lock) {
-  lock_guard<mutex> l(query_exec_state_map_lock_);
-  QueryExecStateMap::iterator i = query_exec_state_map_.find(query_id);
-  if (i == query_exec_state_map_.end()) {
-    return shared_ptr<QueryExecState>();
+  lock_guard<mutex> l(client_request_state_map_lock_);
+  ClientRequestStateMap::iterator i = client_request_state_map_.find(query_id);
+  if (i == client_request_state_map_.end()) {
+    return shared_ptr<ClientRequestState>();
   } else {
     if (lock) i->second->lock()->lock();
     return i->second;
@@ -2013,12 +2006,15 @@ shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState(
 
 void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
     const TUpdateFilterParams& params) {
-  shared_ptr<QueryExecState> query_exec_state = GetQueryExecState(params.query_id, false);
-  if (query_exec_state.get() == nullptr) {
-    LOG(INFO) << "Could not find query exec state: " << params.query_id;
+  DCHECK(params.__isset.query_id);
+  DCHECK(params.__isset.filter_id);
+  shared_ptr<ClientRequestState> client_request_state =
+      GetClientRequestState(params.query_id, false);
+  if (client_request_state.get() == nullptr) {
+    LOG(INFO) << "Could not find client request state: " << params.query_id;
     return;
   }
-  query_exec_state->coord()->UpdateFilter(params);
+  client_request_state->coord()->UpdateFilter(params);
 }
 
 }


Mime
View raw message