quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hbdeshm...@apache.org
Subject [4/6] incubator-quickstep git commit: Add active and inactive operators in QueryManager
Date Tue, 01 Nov 2016 19:44:12 GMT
Add active and inactive operators in QueryManager


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/485c7047
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/485c7047
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/485c7047

Branch: refs/heads/delay-hashtable-memory-alloc
Commit: 485c70478d2a27fe2f14ee419ef2361d581db0ea
Parents: fc898b1
Author: Harshad Deshmukh <hbdeshmukh@apache.org>
Authored: Mon Oct 31 16:23:03 2016 -0500
Committer: Harshad Deshmukh <hbdeshmukh@apache.org>
Committed: Tue Nov 1 14:43:38 2016 -0500

----------------------------------------------------------------------
 query_execution/QueryManagerSingleNode.cpp | 53 ++++++++++++++++++++-----
 query_execution/QueryManagerSingleNode.hpp | 16 ++++++++
 2 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/485c7047/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 237796f..e77600f 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -59,11 +59,13 @@ QueryManagerSingleNode::QueryManagerSingleNode(
                                       bus_)),
       workorders_container_(
           new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) {
-  // Collect all the workorders from all the relational operators in the DAG.
+  // Mark the active operators in the DAG.
   for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
     if (checkAllBlockingDependenciesMet(index)) {
-      query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
-      processOperator(index, false);
+      active_operators_.push_back(index);
+      activateOperator(index);
+    } else {
+      inactive_operators_.push_back(index);
     }
   }
 }
@@ -71,14 +73,44 @@ QueryManagerSingleNode::QueryManagerSingleNode(
 WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
     const dag_node_index start_operator_index, const numa_node_id numa_node) {
   // Default policy: Operator with lowest index first.
-  WorkOrder *work_order = nullptr;
-  size_t num_operators_checked = 0;
-  for (dag_node_index index = start_operator_index;
-       num_operators_checked < num_operators_in_dag_;
-       index = (index + 1) % num_operators_in_dag_, ++num_operators_checked) {
-    if (query_exec_state_->hasExecutionFinished(index)) {
-      continue;
+  WorkerMessage *worker_message = getNextWorkerMessageFromActiveOperators(numa_node);
+  if (worker_message != nullptr) {
+    return worker_message;
+  }
+  // Check the operators that have finished execution.
+  std::vector<dag_node_index> execution_finished_operator_indexes;
+  for (dag_node_index active_operator_index : active_operators_) {
+    if (query_exec_state_->hasExecutionFinished(active_operator_index)) {
+      execution_finished_operator_indexes.push_back(active_operator_index);
     }
+  }
+  // Remove the "done" operators from the list of active operators.
+  for (dag_node_index index : execution_finished_operator_indexes) {
+    auto iter =
+        std::find(active_operators_.begin(), active_operators_.end(), index);
+    DCHECK(iter != active_operators_.end());
+    active_operators_.erase(iter);
+  }
+  // Move some inactive operators to active operators.
+  if (!inactive_operators_.empty()) {
+    for (dag_node_index inactive_op_count = 0;
+         inactive_op_count < execution_finished_operator_indexes.size();
+         ++inactive_op_count) {
+      if (checkAllBlockingDependenciesMet(inactive_operators_.front())) {
+        active_operators_.push_back(inactive_operators_.front());
+        inactive_operators_.pop_front();
+        activateOperator(active_operators_.back());
+      }
+    }
+  }
+  worker_message = getNextWorkerMessageFromActiveOperators(numa_node);
+  return worker_message;
+}
+
+WorkerMessage* QueryManagerSingleNode::getNextWorkerMessageFromActiveOperators(
+    const numa_node_id numa_node) {
+  WorkOrder *work_order = nullptr;
+  for (dag_node_index index : active_operators_) {
     if (numa_node != kAnyNUMANodeID) {
       // First try to get a normal WorkOrder from the specified NUMA node.
       work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
@@ -110,7 +142,6 @@ WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
       }
     }
   }
-  // No WorkOrders available right now.
   return nullptr;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/485c7047/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index dd044a5..c00c31c 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -21,7 +21,9 @@
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
 
 #include <cstddef>
+#include <deque>
 #include <memory>
+#include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
@@ -123,6 +125,16 @@ class QueryManagerSingleNode final : public QueryManagerBase {
   void getRebuildWorkOrders(const dag_node_index index,
                             WorkOrdersContainer *container);
 
+  void activateOperator(const dag_node_index index) {
+    DCHECK(checkAllBlockingDependenciesMet(index));
+    // It is okay to call the line below multiple times.
+    query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
+    processOperator(index, false);
+  }
+
+  WorkerMessage *getNextWorkerMessageFromActiveOperators(
+      const numa_node_id numa_node);
+
   const tmb::client_id foreman_client_id_;
 
   StorageManager *storage_manager_;
@@ -132,6 +144,10 @@ class QueryManagerSingleNode final : public QueryManagerBase {
 
   std::unique_ptr<WorkOrdersContainer> workorders_container_;
 
+  std::vector<dag_node_index> active_operators_;
+
+  std::deque<dag_node_index> inactive_operators_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
 };
 


Mime
View raw message