quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [1/2] incubator-quickstep git commit: Dispatch BuildHashJoin and Aggregation based on block locality. [Forced Update!]
Date Thu, 04 May 2017 04:13:29 GMT
Repository: incubator-quickstep
Updated Branches:
  refs/heads/fix-copy-group 21559eb61 -> 4677e7d48 (forced update)


Dispatch BuildHashJoin and Aggregation based on block locality.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ece7e424
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ece7e424
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ece7e424

Branch: refs/heads/fix-copy-group
Commit: ece7e424b5c43e91dbe6e52b1ca95312696e57d2
Parents: 0572f40
Author: Zuyu Zhang <zuyuz@apache.org>
Authored: Tue Apr 11 00:26:40 2017 -0700
Committer: Zuyu Zhang <zuyuz@apache.org>
Committed: Wed May 3 12:50:03 2017 -0700

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                |  2 ++
 query_execution/ForemanDistributed.cpp        |  9 +++++++--
 query_execution/PolicyEnforcerDistributed.cpp |  8 ++++++++
 query_execution/PolicyEnforcerDistributed.hpp | 15 ++++++++++++--
 query_execution/QueryManagerDistributed.hpp   | 23 ++++++++++++++++++----
 5 files changed, 49 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index eeed791..c74fa36 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -267,6 +267,7 @@ target_link_libraries(quickstep_queryexecution_QueryManagerBase
 if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_queryexecution_QueryManagerDistributed
                         quickstep_catalog_CatalogTypedefs
+                        quickstep_queryexecution_BlockLocator
                         quickstep_queryexecution_QueryContext
                         quickstep_queryexecution_QueryContext_proto
                         quickstep_queryexecution_QueryExecutionMessages_proto
@@ -277,6 +278,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryexecution_WorkOrderProtosContainer
                         quickstep_relationaloperators_RelationalOperator
                         quickstep_relationaloperators_WorkOrder_proto
+                        quickstep_storage_StorageBlockInfo
                         quickstep_utility_DAG
                         quickstep_utility_Macros
                         tmb)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 81b5ec1..e5e0eee 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,10 +243,12 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
                                                        size_t *shiftboss_index_for_aggregation)
{
   const S::WorkOrder &work_order_proto = proto.work_order();
   QueryContext::aggregation_state_id aggr_state_index;
+  block_id block = kInvalidBlockId;
 
   switch (work_order_proto.work_order_type()) {
     case S::AGGREGATION:
       aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+      block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
       break;
     case S::FINALIZE_AGGREGATION:
       aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index);
@@ -259,7 +261,8 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
   }
 
   static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
-      proto.query_id(), aggr_state_index, next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
+      proto.query_id(), aggr_state_index, block_locator_, block, next_shiftboss_index_to_schedule,
+      shiftboss_index_for_aggregation);
 
   return true;
 }
@@ -270,11 +273,13 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   const S::WorkOrder &work_order_proto = proto.work_order();
   QueryContext::join_hash_table_id join_hash_table_index;
   partition_id part_id;
+  block_id block = kInvalidBlockId;
 
   switch (work_order_proto.work_order_type()) {
     case S::BUILD_HASH:
       join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index);
       part_id = work_order_proto.GetExtension(S::BuildHashWorkOrder::partition_id);
+      block = work_order_proto.GetExtension(S::BuildHashWorkOrder::block_id);
       break;
     case S::HASH_JOIN:
       join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index);
@@ -289,7 +294,7 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   }
 
   static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin(
-      proto.query_id(), join_hash_table_index, part_id, next_shiftboss_index_to_schedule,
+      proto.query_id(), join_hash_table_index, part_id, block_locator_, block, next_shiftboss_index_to_schedule,
       shiftboss_index_for_hash_join);
 
   return true;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 46a0972..36becf2 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -192,11 +192,15 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
 void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
     const std::size_t query_id,
     const QueryContext::aggregation_state_id aggr_state_index,
+    const BlockLocator &block_locator,
+    const block_id block,
     const std::size_t next_shiftboss_index_to_schedule,
     std::size_t *shiftboss_index) {
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
   query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+                                                 block_locator,
+                                                 block,
                                                  next_shiftboss_index_to_schedule,
                                                  shiftboss_index);
 }
@@ -205,12 +209,16 @@ void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
     const std::size_t query_id,
     const QueryContext::join_hash_table_id join_hash_table_index,
     const partition_id part_id,
+    const BlockLocator &block_locator,
+    const block_id block,
     const std::size_t next_shiftboss_index_to_schedule,
     std::size_t *shiftboss_index) {
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
   query_manager->getShiftbossIndexForHashJoin(join_hash_table_index,
                                               part_id,
+                                              block_locator,
+                                              block,
                                               next_shiftboss_index_to_schedule,
                                               shiftboss_index);
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index fb46b39..cd3a434 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -47,6 +47,7 @@ class TaggedMessage;
 
 namespace quickstep {
 
+class BlockLocator;
 class CatalogDatabaseLite;
 class CatalogRelation;
 class QueryProcessor;
@@ -125,31 +126,39 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   /**
    * @brief Get or set the index of Shiftboss for an Aggregation related
    * WorkOrder. If it is the first Aggregation on <aggr_state_index>,
-   * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
+   * <shiftboss_index> will be set based on block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
    * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
    * has executed the first Aggregation.
    *
    * @param query_id The query id.
    * @param aggr_state_index The Hash Table for the Aggregation.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
   void getShiftbossIndexForAggregation(
       const std::size_t query_id,
       const QueryContext::aggregation_state_id aggr_state_index,
+      const BlockLocator &block_locator,
+      const block_id block,
       const std::size_t next_shiftboss_index_to_schedule,
       std::size_t *shiftboss_index);
 
   /**
    * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
    * If it is the first BuildHash on <join_hash_table_index, part_id>,
-   * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>.
+   * <shiftboss_index> will be set to block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
    * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
    * has executed the first BuildHash.
    *
    * @param query_id The query id.
    * @param join_hash_table_index The Hash Table for the Join.
    * @param part_id The partition ID.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
@@ -157,6 +166,8 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
       const std::size_t query_id,
       const QueryContext::join_hash_table_id join_hash_table_index,
       const partition_id part_id,
+      const BlockLocator &block_locator,
+      const block_id block,
       const std::size_t next_shiftboss_index_to_schedule,
       std::size_t *shiftboss_index);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ece7e424/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 3ebc434..6a47ce8 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -25,11 +25,13 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionState.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryManagerBase.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 
 #include "tmb/address.h"
@@ -100,17 +102,23 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   /**
    * @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If
-   * the Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
+   * the Shiftboss index is not found, set using the block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
    *
    * @param aggr_state_index The Hash Table for the Aggregation.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
   void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+                                       const BlockLocator &block_locator,
+                                       const block_id block,
                                        const std::size_t next_shiftboss_index_to_schedule,
                                        std::size_t *shiftboss_index) {
     DCHECK_LT(aggr_state_index, shiftboss_indexes_for_aggrs_.size());
-    if (shiftboss_indexes_for_aggrs_[aggr_state_index] == kInvalidShiftbossIndex) {
+    if (shiftboss_indexes_for_aggrs_[aggr_state_index] == kInvalidShiftbossIndex &&
+        !block_locator.getBlockLocalityInfo(block, &shiftboss_indexes_for_aggrs_[aggr_state_index])) {
       shiftboss_indexes_for_aggrs_[aggr_state_index] = next_shiftboss_index_to_schedule;
     }
 
@@ -119,21 +127,28 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   /**
    * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the
-   * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>.
+   * Shiftboss index is not found, set using the block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
    *
    * @param join_hash_table_index The Hash Table for the Join.
    * @param part_id The partition ID.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
   void getShiftbossIndexForHashJoin(const QueryContext::join_hash_table_id join_hash_table_index,
                                     const partition_id part_id,
+                                    const BlockLocator &block_locator,
+                                    const block_id block,
                                     const std::size_t next_shiftboss_index_to_schedule,
                                     std::size_t *shiftboss_index) {
     DCHECK_LT(join_hash_table_index, shiftboss_indexes_for_hash_joins_.size());
     DCHECK_LT(part_id, shiftboss_indexes_for_hash_joins_[join_hash_table_index].size());
 
-    if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] == kInvalidShiftbossIndex) {
+    if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] == kInvalidShiftbossIndex &&
+        !block_locator.getBlockLocalityInfo(block,
+                                            &shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id])) {
       shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = next_shiftboss_index_to_schedule;
     }
 


Mime
View raw message