quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zu...@apache.org
Subject [09/10] incubator-quickstep git commit: Scheduled FinalizeAggr / DestroyAggr WorkOrder on the same Shiftboss of AggrWorkOrder.
Date Mon, 21 Nov 2016 04:11:05 GMT
Scheduled FinalizeAggr / DestroyAggr WorkOrder on the same Shiftboss of AggrWorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8f5e71f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8f5e71f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8f5e71f9

Branch: refs/heads/multi-shiftboss-test
Commit: 8f5e71f94de468084a4256a9ed3a39b4c79638ce
Parents: a62129f
Author: Zuyu Zhang <zuyuz@apache.org>
Authored: Thu Nov 17 12:01:26 2016 -0800
Committer: Zuyu Zhang <zuyuz@apache.org>
Committed: Sun Nov 20 20:10:49 2016 -0800

----------------------------------------------------------------------
 query_execution/ForemanDistributed.cpp        | 71 +++++++++++++++++-----
 query_execution/ForemanDistributed.hpp        |  9 ++-
 query_execution/PolicyEnforcerDistributed.cpp | 12 ++++
 query_execution/PolicyEnforcerDistributed.hpp | 18 ++++++
 query_execution/QueryManagerDistributed.hpp   | 23 +++++++
 5 files changed, 116 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8f5e71f9/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 900a71f..7ca246d 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,28 +243,55 @@ bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id messag
                                         kWorkOrderFeedbackMessage);
 }
 
-bool ForemanDistributed::isHashJoinRelatedWorkOrder(const unique_ptr<S::WorkOrderMessage> &message,
+bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto,
+                                                       const size_t next_shiftboss_index_to_schedule,
+                                                       size_t *shiftboss_index_for_aggregation) {
+  const S::WorkOrder &work_order_proto = proto.work_order();
+  QueryContext::aggregation_state_id aggr_state_index;
+
+  switch (work_order_proto.work_order_type()) {
+    case S::AGGREGATION:
+      aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+      break;
+    case S::FINALIZE_AGGREGATION:
+      aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
+      break;
+    case S::DESTROY_AGGREGATION_STATE:
+      aggr_state_index = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::aggr_state_index);
+      break;
+    default:
+      return false;
+  }
+
+  policy_enforcer_->getShiftbossIndexForAggregation(
+           proto.query_id(), aggr_state_index, next_shiftboss_index_to_schedule,
+           shiftboss_index_for_aggregation);
+
+  return true;
+}
+
+bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto,
                                                     const size_t next_shiftboss_index_to_schedule,
                                                     size_t *shiftboss_index_for_hash_join) {
-  const S::WorkOrder &work_order_proto = message->work_order();
+  const S::WorkOrder &work_order_proto = proto.work_order();
   QueryContext::join_hash_table_id join_hash_table_index;
 
   switch (work_order_proto.work_order_type()) {
     case S::BUILD_HASH:
-      join_hash_table_index = work_order_proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index);
-      break;
-    case S::DESTROY_HASH:
-      join_hash_table_index = work_order_proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index);
+      join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index);
       break;
     case S::HASH_JOIN:
-      join_hash_table_index = work_order_proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index);
+      join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index);
+      break;
+    case S::DESTROY_HASH:
+      join_hash_table_index = work_order_proto.GetExtension(S::DestroyHashWorkOrder::join_hash_table_index);
       break;
     default:
       return false;
   }
 
   policy_enforcer_->getShiftbossIndexForHashJoin(
-           message->query_id(), join_hash_table_index, next_shiftboss_index_to_schedule,
+           proto.query_id(), join_hash_table_index, next_shiftboss_index_to_schedule,
            shiftboss_index_for_hash_join);
 
   return true;
@@ -275,14 +302,11 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
   size_t shiftboss_index = 0u;
   for (const auto &message : messages) {
     DCHECK(message != nullptr);
-    size_t shiftboss_index_for_hash_join;
-    if (isHashJoinRelatedWorkOrder(message, shiftboss_index, &shiftboss_index_for_hash_join)) {
-      sendWorkOrderMessage(shiftboss_index_for_hash_join, *message);
-      shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_hash_join);
-
-      if (shiftboss_index == shiftboss_index_for_hash_join) {
-        shiftboss_index = (shiftboss_index + 1) % num_shiftbosses;
-      }
+    size_t shiftboss_index_for_particular_work_order_type;
+    if (isAggregationRelatedWorkOrder(*message, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+      dispatchWorkOrderMessagesHelper(*message, shiftboss_index_for_particular_work_order_type, &shiftboss_index);
+    } else if (isHashJoinRelatedWorkOrder(*message, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
+      dispatchWorkOrderMessagesHelper(*message, shiftboss_index_for_particular_work_order_type, &shiftboss_index);
     } else {
       sendWorkOrderMessage(shiftboss_index, *message);
       shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index);
@@ -293,6 +317,21 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
   }
 }
 
+void ForemanDistributed::dispatchWorkOrderMessagesHelper(const S::WorkOrderMessage &proto,
+                                                         const size_t shiftboss_index_for_particular_work_order_type,
+                                                         size_t *shiftboss_index) {
+  sendWorkOrderMessage(shiftboss_index_for_particular_work_order_type, proto);
+  shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type);
+
+  if (*shiftboss_index == shiftboss_index_for_particular_work_order_type) {
+    *shiftboss_index = (*shiftboss_index + 1) % shiftboss_directory_.size();
+  } else {
+    // NOTE(zuyu): This is not exact round-robin scheduling: in this case,
+    // <shiftboss_index_for_particular_work_order_type> is assigned one extra
+    // WorkOrder, while <shiftboss_index> is not advanced.
+  }
+}
+
 void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index,
                                               const S::WorkOrderMessage &proto) {
   const size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8f5e71f9/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index 55e747f..f6d8597 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -71,7 +71,11 @@ class ForemanDistributed final : public ForemanBase {
   void run() override;
 
  private:
-  bool isHashJoinRelatedWorkOrder(const std::unique_ptr<serialization::WorkOrderMessage> &message,
+  bool isAggregationRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
+                                     const std::size_t next_shiftboss_index_to_schedule,
+                                     std::size_t *shiftboss_index_for_aggregation);
+
+  bool isHashJoinRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
                                   const std::size_t next_shiftboss_index_to_schedule,
                                   std::size_t *shiftboss_index_for_hash_join);
 
@@ -84,6 +88,9 @@ class ForemanDistributed final : public ForemanBase {
   void dispatchWorkOrderMessages(
       const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages);
 
+  void dispatchWorkOrderMessagesHelper(const serialization::WorkOrderMessage &proto,
+                                       const std::size_t shiftboss_index_for_particular_work_order_type,
+                                       std::size_t *shiftboss_index);
   /**
    * @brief Send the given message to the specified worker.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8f5e71f9/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 86b36c8..c5642bc 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -158,6 +158,18 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
   }
 }
 
+void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
+    const std::size_t query_id,
+    const QueryContext::aggregation_state_id aggr_state_index,
+    const std::size_t next_shiftboss_index_to_schedule,
+    std::size_t *shiftboss_index) {
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
+  query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+                                                 next_shiftboss_index_to_schedule,
+                                                 shiftboss_index);
+}
+
 void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
     const std::size_t query_id,
     const QueryContext::join_hash_table_id join_hash_table_index,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8f5e71f9/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 37326bd..e8bc394 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -90,6 +90,24 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message);
 
   /**
+   * @brief Get or set the index of Shiftboss for an Aggregation related
+   * WorkOrder. If it is the first Aggregation on <aggr_state_index>,
+   * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
+   * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
+   * has executed the first Aggregation.
+   *
+   * @param query_id The query id.
+   * @param aggr_state_index The index of the aggregation state for the Aggregation.
+   * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+   * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+   **/
+  void getShiftbossIndexForAggregation(
+      const std::size_t query_id,
+      const QueryContext::aggregation_state_id aggr_state_index,
+      const std::size_t next_shiftboss_index_to_schedule,
+      std::size_t *shiftboss_index);
+
+  /**
    * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
    * If it is the first BuildHash on <join_hash_table_index>, <shiftboss_index>
    * will be set to <next_shiftboss_index_to_schedule>. Otherwise,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8f5e71f9/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 2b21303..6455bf7 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -95,6 +95,26 @@ class QueryManagerDistributed final : public QueryManagerBase {
       const dag_node_index start_operator_index);
 
   /**
+   * @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If
+   * the Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
+   *
+   * @param aggr_state_index The index of the aggregation state for the Aggregation.
+   * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+   * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+   **/
+  void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+                                       const std::size_t next_shiftboss_index_to_schedule,
+                                       std::size_t *shiftboss_index) {
+    const auto cit = shiftboss_indexes_for_aggrs_.find(aggr_state_index);
+    if (cit != shiftboss_indexes_for_aggrs_.end()) {
+      *shiftboss_index = cit->second;
+    } else {
+      shiftboss_indexes_for_aggrs_.emplace(aggr_state_index, next_shiftboss_index_to_schedule);
+      *shiftboss_index = next_shiftboss_index_to_schedule;
+    }
+  }
+
+  /**
    * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the
    * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
    *
@@ -136,6 +156,9 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_;
 
+  // A map from an aggregation state to its scheduled Shiftboss index.
+  std::unordered_map<QueryContext::aggregation_state_id, std::size_t> shiftboss_indexes_for_aggrs_;
+
   // A map from a join hash table to its scheduled Shiftboss index.
   std::unordered_map<QueryContext::join_hash_table_id, std::size_t> shiftboss_indexes_for_hash_joins_;
 


Mime
View raw message