quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hbdeshm...@apache.org
Subject [6/6] incubator-quickstep git commit: Configurable initial number of active operators.
Date Tue, 01 Nov 2016 21:57:03 GMT
Configurable initial number of active operators.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1d486aef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1d486aef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1d486aef

Branch: refs/heads/delay-hashtable-memory-alloc
Commit: 1d486aefe1f9a5ffbaf23919923faea010174797
Parents: 13cf395
Author: Harshad Deshmukh <hbdeshmukh@apache.org>
Authored: Mon Oct 31 23:24:40 2016 -0500
Committer: Harshad Deshmukh <hbdeshmukh@apache.org>
Committed: Tue Nov 1 16:56:38 2016 -0500

----------------------------------------------------------------------
 query_execution/QueryManagerSingleNode.cpp | 41 +++++++++++++++++--------
 query_execution/QueryManagerSingleNode.hpp |  3 +-
 2 files changed, 30 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1d486aef/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index e77600f..1b211a9 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -60,8 +60,9 @@ QueryManagerSingleNode::QueryManagerSingleNode(
       workorders_container_(
           new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) {
   // Mark the active operators in the DAG.
+  const std::size_t kNumMaxInitialActiveOperators = 2;
   for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
-    if (checkAllBlockingDependenciesMet(index)) {
+    if (checkAllBlockingDependenciesMet(index) && active_operators_.size() < kNumMaxInitialActiveOperators) {
       active_operators_.push_back(index);
       activateOperator(index);
     } else {
@@ -72,7 +73,6 @@ QueryManagerSingleNode::QueryManagerSingleNode(
 
 WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
     const dag_node_index start_operator_index, const numa_node_id numa_node) {
-  // Default policy: Operator with lowest index first.
   WorkerMessage *worker_message = getNextWorkerMessageFromActiveOperators(numa_node);
   if (worker_message != nullptr) {
     return worker_message;
@@ -91,18 +91,35 @@ WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
     DCHECK(iter != active_operators_.end());
     active_operators_.erase(iter);
   }
-  // Move some inactive operators to active operators.
-  if (!inactive_operators_.empty()) {
-    for (dag_node_index inactive_op_count = 0;
-         inactive_op_count < execution_finished_operator_indexes.size();
-         ++inactive_op_count) {
-      if (checkAllBlockingDependenciesMet(inactive_operators_.front())) {
-        active_operators_.push_back(inactive_operators_.front());
-        inactive_operators_.pop_front();
-        activateOperator(active_operators_.back());
-      }
+  // Collect the list of "ready to be active" operators.
+  std::vector<dag_node_index> ready_to_be_active_operators;
+  for (dag_node_index index : inactive_operators_) {
+    if (checkAllBlockingDependenciesMet(index)) {
+      ready_to_be_active_operators.push_back(index);
     }
   }
+  // Move as many inactive operators to active operators, as the size of
+  // execution_finished_operator_indexes.
+  std::size_t num_operators_activated = 0;
+  if (!inactive_operators_.empty() &&
+      !execution_finished_operator_indexes.empty() &&
+      !ready_to_be_active_operators.empty()) {
+    for (; num_operators_activated <
+           std::min(execution_finished_operator_indexes.size(),
+                    inactive_operators_.size());
+         ++num_operators_activated) {
+      active_operators_.push_back(ready_to_be_active_operators[num_operators_activated]);
+      activateOperator(active_operators_.back());
+    }
+  }
+  // Remove the newly activated operators.
+  for (std::size_t i = 0; i < num_operators_activated; ++i) {
+    auto iter = std::find(inactive_operators_.begin(),
+                          inactive_operators_.end(),
+                          ready_to_be_active_operators[i]);
+    DCHECK(iter != inactive_operators_.end());
+    inactive_operators_.erase(iter);
+  }
   worker_message = getNextWorkerMessageFromActiveOperators(numa_node);
   return worker_message;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1d486aef/query_execution/QueryManagerSingleNode.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index c00c31c..693684a 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -21,7 +21,6 @@
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_SINGLE_NODE_HPP_
 
 #include <cstddef>
-#include <deque>
 #include <memory>
 #include <vector>
 
@@ -146,7 +145,7 @@ class QueryManagerSingleNode final : public QueryManagerBase {
 
   std::vector<dag_node_index> active_operators_;
 
-  std::deque<dag_node_index> inactive_operators_;
+  std::vector<dag_node_index> inactive_operators_;
 
   DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
 };


Mime
View raw message