quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zu...@apache.org
Subject [2/2] incubator-quickstep git commit: Added PolicyEnforcer implementation for the distributed version.
Date Fri, 05 Aug 2016 18:18:23 GMT
Added PolicyEnforcer implementation for the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c60583a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c60583a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c60583a8

Branch: refs/heads/dist-policy-enforcer
Commit: c60583a8008a2e079a25f7d9b473f57818ca8517
Parents: bf7646f
Author: Zuyu Zhang <zuyuz@twitter.com>
Authored: Thu Aug 4 11:45:51 2016 -0700
Committer: Zuyu Zhang <zuyuz@twitter.com>
Committed: Fri Aug 5 11:18:09 2016 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  24 ++
 query_execution/PolicyEnforcerBase.cpp        |   2 +
 query_execution/PolicyEnforcerBase.hpp        |   7 +
 query_execution/PolicyEnforcerDistributed.cpp | 279 +++++++++++++++++++++
 query_execution/PolicyEnforcerDistributed.hpp | 113 +++++++++
 query_execution/QueryExecutionMessages.proto  |  20 +-
 query_execution/QueryExecutionTypedefs.hpp    |   5 +
 query_execution/QueryManagerBase.cpp          |   3 +-
 query_execution/QueryManagerBase.hpp          |  11 +-
 9 files changed, 458 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c60583a8/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index edbe5d0..108c3f9 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,9 @@ endif()
 add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp)
 add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp)
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp
PolicyEnforcerDistributed.hpp)
+endif(ENABLE_DISTRIBUTED)
 add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp
PolicyEnforcerSingleNode.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
@@ -110,6 +113,26 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed
+                        glog
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_Catalog_proto
+                        quickstep_queryexecution_PolicyEnforcerBase
+                        quickstep_queryexecution_QueryContext_proto
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionState
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_queryexecution_QueryManagerBase
+                        quickstep_queryexecution_QueryManagerDistributed
+                        quickstep_queryexecution_ShiftbossDirectory
+                        quickstep_queryoptimizer_QueryHandle
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_utility_Macros
+                        tmb
+                        ${GFLAGS_LIB_NAME})
+endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode
                       glog
                       quickstep_catalog_CatalogTypedefs
@@ -293,6 +316,7 @@ target_link_libraries(quickstep_queryexecution
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution
                         quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_PolicyEnforcerDistributed
                         quickstep_queryexecution_QueryManagerDistributed
                         quickstep_queryexecution_Shiftboss
                         quickstep_queryexecution_ShiftbossDirectory)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c60583a8/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 3371d6d..d50a173 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -134,6 +134,8 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message)
{
   }
   if (admitted_queries_[query_id]->queryStatus(op_index) ==
           QueryManagerBase::QueryStatusCode::kQueryExecuted) {
+    onQueryCompletion(admitted_queries_[query_id].get());
+
     removeQuery(query_id);
     if (!waiting_queries_.empty()) {
       // Admit the earliest waiting query.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c60583a8/query_execution/PolicyEnforcerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp
index 15bc118..4293f0f 100644
--- a/query_execution/PolicyEnforcerBase.hpp
+++ b/query_execution/PolicyEnforcerBase.hpp
@@ -138,6 +138,13 @@ class PolicyEnforcerBase {
   static constexpr std::size_t kMaxConcurrentQueries = 1;
 
   /**
+   * @brief Add custom actions upon the completion of a query.
+   *
+   * @param query_manager The query manager.
+   **/
+  virtual void onQueryCompletion(QueryManagerBase *query_manager) {}
+
+  /**
    * @brief Record the execution time for a finished WorkOrder.
    *
    * TODO(harshad) - Extend the functionality to rebuild work orders.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c60583a8/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
new file mode 100644
index 0000000..8ed4bc9
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -0,0 +1,279 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/PolicyEnforcerDistributed.hpp"
+
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <queue>
+#include <utility>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/Catalog.pb.h"
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.pb.h"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionState.hpp"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/QueryManagerBase.hpp"
+#include "query_execution/QueryManagerDistributed.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::size_t;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+namespace S = serialization;
+
+DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
+              " can be allocated in a single round of dispatch of messages to"
+              " the workers.");
+
+void PolicyEnforcerDistributed::getWorkOrderMessages(
+    vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) {
+  // Iterate over admitted queries until either there are no more
+  // messages available, or the maximum number of messages have
+  // been collected.
+  DCHECK(work_order_messages->empty());
+  // TODO(harshad) - Make this function generic enough so that it
+  // works well when multiple queries are getting executed.
+  if (admitted_queries_.empty()) {
+    LOG(WARNING) << "Requesting WorkerMessages when no query is running";
+    return;
+  }
+
+  const std::size_t per_query_share =
+      FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size();
+  DCHECK_GT(per_query_share, 0u);
+
+  vector<std::size_t> finished_queries_ids;
+
+  for (const auto &admitted_query_info : admitted_queries_) {
+    QueryManagerBase *curr_query_manager = admitted_query_info.second.get();
+    DCHECK(curr_query_manager != nullptr);
+    std::size_t messages_collected_curr_query = 0;
+    while (messages_collected_curr_query < per_query_share) {
+      S::WorkOrderMessage *next_work_order_message =
+          static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0);
+      if (next_work_order_message != nullptr) {
+        ++messages_collected_curr_query;
+        work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message));
+      } else {
+        // No more work ordes from the current query at this time.
+        // Check if the query's execution is over.
+        if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished())
{
+          // If the query has been executed, remove it.
+          finished_queries_ids.push_back(admitted_query_info.first);
+        }
+        break;
+      }
+    }
+  }
+  for (const std::size_t finished_qid : finished_queries_ids) {
+    onQueryCompletion(admitted_queries_[finished_qid].get());
+    removeQuery(finished_qid);
+  }
+}
+
+bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) {
+  if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) {
+    // Ok to admit the query.
+    const std::size_t query_id = query_handle->query_id();
+    if (admitted_queries_.find(query_id) == admitted_queries_.end()) {
+      // NOTE(zuyu): Should call before constructing a 'QueryManager'.
+      // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext'
+      // initializes.
+      initiateQueryInShiftboss(query_handle);
+
+      // Query with the same ID not present, ok to admit.
+      admitted_queries_[query_id].reset(
+          new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_,
bus_));
+      return true;
+    } else {
+      LOG(ERROR) << "Query with the same ID " << query_id << " exists";
+      return false;
+    }
+  } else {
+    // This query will have to wait.
+    waiting_queries_.push(query_handle);
+    return false;
+  }
+}
+
+void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage
&tagged_message) {
+  S::InitiateRebuildResponseMessage proto;
+  CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  const std::size_t query_id = proto.query_id();
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+
+  QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
+
+  const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders();
+  query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders);
+  shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders);
+
+  if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) {
+    onQueryCompletion(query_manager);
+
+    removeQuery(query_id);
+    if (!waiting_queries_.empty()) {
+      // Admit the earliest waiting query.
+      QueryHandle *new_query = waiting_queries_.front();
+      waiting_queries_.pop();
+      admitQuery(new_query);
+    }
+  }
+}
+
+void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
+  S::QueryInitiateMessage proto;
+  proto.set_query_id(query_handle->query_id());
+  proto.mutable_catalog_database_cache()->MergeFrom(query_handle->getCatalogDatabaseCacheProto());
+  proto.mutable_query_context()->MergeFrom(query_handle->getQueryContextProto());
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kQueryInitiateMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" <<
kQueryInitiateMessage
+            << "') to Shiftboss 0";
+
+  // TODO(zuyu): Multiple Shiftbosses support.
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_directory_->getClientId(0),
+                                         move(message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+
+  // Wait Shiftboss for QueryInitiateResponseMessage.
+  const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0,
true);
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;
+  DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type());
+  LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type()
+            << "' message from client " << annotated_message.sender;
+
+  S::QueryInitiateResponseMessage proto_response;
+  CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+}
+
+void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) {
+  const QueryHandle *query_handle = query_manager->query_handle();
+
+  const CatalogRelation *query_result = query_handle->getQueryResultRelation();
+  const tmb::client_id cli_id = query_handle->getClientId();
+  const std::size_t query_id = query_handle->query_id();
+
+  if (query_result == nullptr) {
+    // Clean up query execution states, i.e., QueryContext, in Shiftboss.
+    serialization::QueryTeardownMessage proto;
+    proto.set_query_id(query_id);
+
+    const size_t proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kQueryTeardownMessage);
+
+    // TODO(zuyu): Support multiple shiftbosses.
+    LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" <<
kQueryTeardownMessage
+              << "') to Shiftboss 0";
+    tmb::MessageBus::SendStatus send_status =
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           foreman_client_id_,
+                                           shiftboss_directory_->getClientId(0),
+                                           move(message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+        << " to Shiftboss";
+
+    TaggedMessage cli_message(kQueryExecutionSuccessMessage);
+
+    // Notify the CLI query execution successfully.
+    LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed
'" << kQueryExecutionSuccessMessage
+              << "') to CLI with TMB client id " << cli_id;
+    send_status =
+        QueryExecutionUtil::SendTMBMessage(bus_,
+                                           foreman_client_id_,
+                                           cli_id,
+                                           move(cli_message));
+    CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+        << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+        << " to CLI with TMB client ID " << cli_id;
+    return;
+  }
+
+  // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss.
+  S::SaveQueryResultMessage proto;
+  proto.set_query_id(query_id);
+  proto.set_relation_id(query_result->getID());
+
+  const vector<block_id> blocks(query_result->getBlocksSnapshot());
+  for (const block_id block : blocks) {
+    proto.add_blocks(block);
+  }
+
+  proto.set_cli_id(cli_id);
+
+  const size_t proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast<char*>(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast<const void*>(proto_bytes),
+                        proto_length,
+                        kSaveQueryResultMessage);
+  free(proto_bytes);
+
+  LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" <<
kSaveQueryResultMessage
+            << "') to Shiftboss 0";
+  // TODO(zuyu): Support multiple shiftbosses.
+  const tmb::MessageBus::SendStatus send_status =
+      QueryExecutionUtil::SendTMBMessage(bus_,
+                                         foreman_client_id_,
+                                         shiftboss_directory_->getClientId(0),
+                                         move(message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_
+      << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c60583a8/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
new file mode 100644
index 0000000..807036c
--- /dev/null
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -0,0 +1,113 @@
+/**
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "query_execution/PolicyEnforcerBase.hpp"
+#include "query_execution/ShiftbossDirectory.hpp"
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb {
+class MessageBus;
+class TaggedMessage;
+}
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryHandle;
+class QueryManagerBase;
+
+namespace serialization { class WorkOrderMessage; }
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief A class that ensures that a high level policy is maintained
+ *        in sharing resources among concurrent queries.
+ **/
+class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param foreman_client_id The TMB client ID of the Foreman.
+   * @param catalog_database The CatalogDatabase used.
+   * @param bus The TMB.
+   **/
+  PolicyEnforcerDistributed(const tmb::client_id foreman_client_id,
+                            CatalogDatabaseLite *catalog_database,
+                            ShiftbossDirectory *shiftboss_directory,
+                            tmb::MessageBus *bus,
+                            const bool profile_individual_workorders = false)
+      : PolicyEnforcerBase(catalog_database, profile_individual_workorders),
+        foreman_client_id_(foreman_client_id),
+        shiftboss_directory_(shiftboss_directory),
+        bus_(bus) {}
+
+  /**
+   * @brief Destructor.
+   **/
+  ~PolicyEnforcerDistributed() override {}
+
+  bool admitQuery(QueryHandle *query_handle) override;
+
+  /**
+   * @brief Get work order messages to be dispatched. These messages come from
+   *        the active queries.
+   *
+   * @param work_order_messages The work order messages to be dispatched.
+   **/
+  void getWorkOrderMessages(
+      std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages);
+
+  /**
+   * @brief Process the initiate rebuild work order response message.
+   *
+   * @param tagged_message The message.
+   **/
+  void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
+
+ private:
+  void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override {
+    shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index);
+  }
+
+  void onQueryCompletion(QueryManagerBase *query_manager) override;
+
+  void initiateQueryInShiftboss(QueryHandle *query_handle);
+
+  const tmb::client_id foreman_client_id_;
+
+  ShiftbossDirectory *shiftboss_directory_;
+
+  tmb::MessageBus *bus_;
+
+  DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c60583a8/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 4922042..bc8ebcf 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -38,8 +38,8 @@ message NormalWorkOrderCompletionMessage {
   required uint64 operator_index = 1;
   required uint64 worker_thread_index = 2;
   required uint64 query_id = 3;
-  
-  // Epoch time in microseconds.  
+
+  // Epoch time in microseconds.
   optional uint64 execution_start_time = 4;
   optional uint64 execution_end_time = 5;
 }
@@ -116,13 +116,25 @@ message InitiateRebuildResponseMessage {
   required uint64 shiftboss_index = 4;
 }
 
+message QueryTeardownMessage {
+  required uint64 query_id = 1;
+}
+
 message SaveQueryResultMessage {
-  required int32 relation_id = 1;
-  repeated fixed64 blocks = 2 [packed=true];
+  required uint64 query_id = 1;
+  required int32 relation_id = 2;
+  repeated fixed64 blocks = 3 [packed=true];
+
+  required uint32 cli_id = 4;  // tmb::client_id.
 }
 
 message SaveQueryResultResponseMessage {
   required int32 relation_id = 1;
+  required uint32 cli_id = 2;  // tmb::client_id.
+}
+
+message QueryExecutionSuccessMessage {
+  optional CatalogRelationSchema result_relation = 1;
 }
 
 // BlockLocator related messages.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c60583a8/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 4bbab59..b535d3d 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -84,9 +84,14 @@ enum QueryExecutionMessageType : message_type_id {
   kInitiateRebuildMessage,  // From Foreman to Shiftboss.
   kInitiateRebuildResponseMessage,  // From Shiftboss to Foreman.
 
+  kQueryTeardownMessage,  // From Foreman to Shiftboss.
+
   kSaveQueryResultMessage,  // From Foreman to Shiftboss.
   kSaveQueryResultResponseMessage,  // From Shiftboss to Foreman.
 
+  // From Foreman to CLI.
+  kQueryExecutionSuccessMessage,
+
   // BlockLocator related messages, sorted in a life cycle of StorageManager
   // with a unique block domain.
   kBlockDomainRegistrationMessage,  // From Worker to BlockLocator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c60583a8/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index d2a3341..5b94ee8 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -35,7 +35,8 @@ using std::pair;
 namespace quickstep {
 
 QueryManagerBase::QueryManagerBase(QueryHandle *query_handle)
-    : query_id_(DCHECK_NOTNULL(query_handle)->query_id()),
+    : query_handle_(DCHECK_NOTNULL(query_handle)),
+      query_id_(query_handle->query_id()),
       query_dag_(DCHECK_NOTNULL(
           DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())),
       num_operators_in_dag_(query_dag_->size()),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c60583a8/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index 6edfd5c..54cd3d1 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -72,6 +72,13 @@ class QueryManagerBase {
   virtual ~QueryManagerBase() {}
 
   /**
+   * @brief Get the query handle.
+   **/
+  const QueryHandle* query_handle() const {
+    return query_handle_;
+  }
+
+  /**
    * @brief Get the QueryExecutionState for this query.
    **/
   inline const QueryExecutionState& getQueryExecutionState() const {
@@ -250,9 +257,11 @@ class QueryManagerBase {
     return query_exec_state_->hasRebuildInitiated(index);
   }
 
+  const QueryHandle *query_handle_;
+
   const std::size_t query_id_;
 
-  DAG<RelationalOperator, bool> *query_dag_;
+  DAG<RelationalOperator, bool> *query_dag_;  // Owned by 'query_handle_'.
   const dag_node_index num_operators_in_dag_;
 
   // For all nodes, store their receiving dependents.


Mime
View raw message