quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zu...@apache.org
Subject [20/50] [abbrv] incubator-quickstep git commit: Assign equal probability to all the active operators.
Date Fri, 27 May 2016 03:23:11 GMT
Assign equal probability to all the active operators.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/7805b33f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/7805b33f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/7805b33f

Branch: refs/heads/operator-selection-heuristics
Commit: 7805b33f34944941a465306a09bba70d7dd04c46
Parents: a2c8bb0
Author: Harshad Deshmukh <harshad@cs.wisc.edu>
Authored: Wed May 18 12:13:00 2016 -0500
Committer: Harshad Deshmukh <harshad@cs.wisc.edu>
Committed: Wed May 18 12:13:00 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt |   1 +
 query_execution/Foreman.cpp    | 158 +++++++++++++++++++++++++++++++++++-
 query_execution/Foreman.hpp    |  34 +++++++-
 3 files changed, 190 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7805b33f/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 04a0348..50fd14b 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -65,6 +65,7 @@ target_link_libraries(quickstep_queryexecution_Foreman
                       quickstep_threading_ThreadUtil
                       quickstep_utility_DAG
                       quickstep_utility_Macros
+                      quickstep_utility_StringUtil
                       tmb)
 target_link_libraries(quickstep_queryexecution_ForemanLite
                       glog

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7805b33f/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 304c429..35eceea 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -17,6 +17,7 @@
 
 #include "query_execution/Foreman.hpp"
 
+#include <algorithm>
 #include <cstddef>
 #include <memory>
 #include <utility>
@@ -317,6 +318,13 @@ void Foreman::initializeState() {
       }
     }
   }
+  // Loop again to populate the active_operators_ vector.
+  for (dag_node_index node_index = 0; node_index < dag_size; ++node_index) {
+    if (checkAllBlockingDependenciesMet(node_index)) {
+      // Add to active_operators_.
+      active_operators_.emplace_back(node_index);
+    }
+  }
 }
 
 // TODO(harshad) : The default policy may execute remote WorkOrders for an
@@ -326,8 +334,48 @@ void Foreman::initializeState() {
 WorkerMessage* Foreman::getNextWorkerMessage(
     const dag_node_index start_operator_index, const int numa_node) {
   // Default policy: Operator with lowest index first.
+  updateProbabilities();
+  // We try few times.
   WorkOrder *work_order = nullptr;
-  size_t num_operators_checked = 0;
+  //while (work_order == nullptr) {
+    // std::cout << "Look for workorder active operator count: " << operator_probabilities_.size() << "\n";
+    int next_operator_index = chooseOperator();
+    if (next_operator_index == -1) {
+      return nullptr;
+    }
+    if (numa_node != -1) {
+      work_order = workorders_container_->getNormalWorkOrderForNUMANode(next_operator_index, numa_node);
+      if (work_order != nullptr) {
+        // A WorkOrder found on the given NUMA node.
+        query_exec_state_->incrementNumQueuedWorkOrders(next_operator_index);
+        return WorkerMessage::WorkOrderMessage(work_order, next_operator_index);
+      } else {
+        // Normal workorder not found on this node. Look for a rebuild workorder
+        // on this NUMA node.
+        work_order = workorders_container_->getRebuildWorkOrderForNUMANode(next_operator_index, numa_node);
+        if (work_order != nullptr) {
+          return WorkerMessage::RebuildWorkOrderMessage(work_order, next_operator_index);
+        }
+      }
+    }
+    // Either no workorder found on the given NUMA node, or numa_node is -1.
+    // Try to get a normal WorkOrder from other NUMA nodes.
+    work_order = workorders_container_->getNormalWorkOrder(next_operator_index);
+    if (work_order != nullptr) {
+      /*if (!hasOperatorStarted(next_operator_index)) {
+        operator_start_timestamp_[next_operator_index] = std::chrono::steady_clock::now();
+      }*/
+      query_exec_state_->incrementNumQueuedWorkOrders(next_operator_index);
+      return WorkerMessage::WorkOrderMessage(work_order, next_operator_index);
+    } else {
+      // Normal WorkOrder not found, look for a RebuildWorkOrder.
+      work_order = workorders_container_->getRebuildWorkOrder(next_operator_index);
+      if (work_order != nullptr) {
+        return WorkerMessage::RebuildWorkOrderMessage(work_order, next_operator_index);
+      }
+    }
+  // }
+  /*size_t num_operators_checked = 0;
   for (dag_node_index index = start_operator_index;
        num_operators_checked < query_dag_->size();
        index = (index + 1) % query_dag_->size(), ++num_operators_checked) {
@@ -354,6 +402,9 @@ WorkerMessage* Foreman::getNextWorkerMessage(
     // Try to get a normal WorkOrder from other NUMA nodes.
     work_order = workorders_container_->getNormalWorkOrder(index);
     if (work_order != nullptr) {
+      if (!hasOperatorStarted(index)) {
+        operator_start_timestamp_[index] = std::chrono::steady_clock::now();
+      }
       query_exec_state_->incrementNumQueuedWorkOrders(index);
       return WorkerMessage::WorkOrderMessage(work_order, index);
     } else {
@@ -363,7 +414,7 @@ WorkerMessage* Foreman::getNextWorkerMessage(
         return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
       }
     }
-  }
+  }*/
   // No WorkOrders available right now.
   return nullptr;
 }
@@ -423,6 +474,13 @@ bool Foreman::fetchNormalWorkOrders(const dag_node_index index) {
         (num_pending_workorders_before <
          workorders_container_->getNumNormalWorkOrders(index));
   }
+  if (generated_new_workorders) {
+    if (std::find(active_operators_.begin(), active_operators_.end(), index) == active_operators_.end()) {
+      // std::cout << "Added operator " << index << " in processOperator()\n";
+      active_operators_.emplace_back(index);
+      updateProbabilities();
+    }
+  }
   return generated_new_workorders;
 }
 
@@ -467,6 +525,14 @@ void Foreman::processOperator(const dag_node_index index,
 }
 
 void Foreman::markOperatorFinished(const dag_node_index index) {
+  // std::cout << "Operator " << index << "finished\n";
+  // operator_duration_[index] = std::chrono::steady_clock::now() - operator_start_timestamp_[index];
+  // Remove this operator.
+  active_operators_.erase(std::remove(active_operators_.begin(), active_operators_.end(), index), active_operators_.end());
+  // Add to finished operators.
+  // finished_operators_.emplace_back(index);
+  updateProbabilities();
+
   query_exec_state_->setExecutionFinished(index);
 
   RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
@@ -496,6 +562,11 @@ bool Foreman::initiateRebuild(const dag_node_index index) {
   query_exec_state_->setRebuildStatus(
       index, workorders_container_->getNumRebuildWorkOrders(index), true);
 
+  if (query_exec_state_->getNumRebuildWorkOrders(index) > 0) {
+    if (std::find(active_operators_.begin(), active_operators_.end(), index) == active_operators_.end()) {
+      active_operators_.emplace_back(index);
+    }
+  }
   return (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
 }
 
@@ -528,4 +599,87 @@ void Foreman::getRebuildWorkOrders(const dag_node_index index, WorkOrdersContain
   }
 }
 
+void Foreman::updateProbabilities() {
+  // std::unordered_map<dag_node_index, std::size_t> num_workorders;
+  std::vector<std::pair<double, dag_node_index>> operator_probabilities;
+  /*std::size_t total_num_workorders = 0;
+  for (dag_node_index active_op_index : active_operators_) {
+    if (checkAllBlockingDependenciesMet(active_op_index)) {
+      std::size_t num_workorders_for_op = workorders_container_->getNumNormalWorkOrders(active_op_index);
+      num_workorders_for_op += workorders_container_->getNumRebuildWorkOrders(active_op_index);
+      if (num_workorders_for_op > 0) {
+        num_workorders[active_op_index] = num_workorders_for_op;
+        total_num_workorders += num_workorders_for_op;
+      }
+    }
+  }*/
+  std::unordered_map<dag_node_index, bool> schedulable_operators;
+  for (dag_node_index active_op_index : active_operators_) {
+    if (checkAllBlockingDependenciesMet(active_op_index)) {
+      schedulable_operators[active_op_index] = true;
+    }
+  }
+  /*if (total_num_workorders == 0) {
+    operator_probabilities_.swap(operator_probabilities);
+    return;
+  }*/
+  if (schedulable_operators.empty()) {
+    operator_probabilities_.swap(operator_probabilities);
+    return;
+  }
+  std::size_t last_operator_index = 0;
+  // if (num_workorders.size() == 1u) {
+  if (schedulable_operators.size() == 1u) {
+    // Only one operator is active.
+    // last_operator_index = num_workorders.begin()->first;
+    last_operator_index = schedulable_operators.begin()->first;
+    operator_probabilities.emplace_back(1.0, last_operator_index);
+  } else {
+    // More than one active operators.
+    double cumulative_probability = 0.0;
+    const double individual_probability = 1 / static_cast<double>(schedulable_operators.size());
+    /*for (auto it = num_workorders.begin(); it != num_workorders.end(); ++it) {
+      const double individual_probability = it->second / static_cast<double>(total_num_workorders);
+      cumulative_probability += individual_probability;
+      operator_probabilities.emplace_back(cumulative_probability, it->first);
+      last_operator_index = it->first;
+    }*/
+    for (auto it = schedulable_operators.begin(); it != schedulable_operators.end(); ++it) {
+      cumulative_probability += individual_probability;
+      operator_probabilities.emplace_back(cumulative_probability, it->first);
+      last_operator_index = it->first;
+    }
+    DCHECK(!operator_probabilities.empty());
+    operator_probabilities.back().first = 1.0;
+    operator_probabilities.back().second = last_operator_index;
+  }
+  // Round off the cumulative probability for the last element.
+  operator_probabilities_.swap(operator_probabilities);
+}
+
+int Foreman::chooseOperator() {
+  if (operator_probabilities_.empty()) {
+    // std::cout << "No operator right now\n";
+    return -1;
+  } else if (operator_probabilities_.size() == 1u) {
+    int operator_index = static_cast<int>(operator_probabilities_.front().second);
+    if (workorders_container_->hasNormalWorkOrder(operator_index) || workorders_container_->hasRebuildWorkOrder(operator_index)) {
+      // std::cout << "Single operator: " << operator_index << "\n";
+      return operator_index;
+    } else {
+      // std::cout << "Single operator: " << operator_index << " but no workorder\n";
+      return -1;
+    }
+  } else {
+    std::uniform_real_distribution<double> dist(0.0, 1.0);
+    double chosen_probability = dist(mt_);
+    std::pair<double, std::size_t> search_key = std::make_pair(chosen_probability, 0);
+    auto chosen_operator_it = std::upper_bound(
+          operator_probabilities_.begin(), operator_probabilities_.end(), search_key);
+    DCHECK(chosen_operator_it != operator_probabilities_.end());
+    // std::cout << "Operator: " << chosen_operator_it->second << "\n";
+    return static_cast<int>(chosen_operator_it->second);
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7805b33f/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 2d6e0d3..39852f8 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -18,8 +18,11 @@
 #ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
 #define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
 
+#include <chrono>
 #include <cstddef>
 #include <memory>
+#include <random>
+#include <unordered_map>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -34,6 +37,7 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/DAG.hpp"
 #include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
 
 #include "glog/logging.h"
 #include "gtest/gtest_prod.h"
@@ -80,7 +84,8 @@ class Foreman final : public ForemanLite {
         catalog_database_(DCHECK_NOTNULL(catalog_database)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)),
         max_msgs_per_worker_(1),
-        num_numa_nodes_(num_numa_nodes) {
+        num_numa_nodes_(num_numa_nodes),
+        mt_(std::random_device()()) {
     bus_->RegisterClientAsSender(foreman_client_id_, kWorkOrderMessage);
     bus_->RegisterClientAsSender(foreman_client_id_, kRebuildWorkOrderMessage);
     // NOTE : Foreman thread sends poison messages in the optimizer's
@@ -271,6 +276,9 @@ class Foreman final : public ForemanLite {
   void cleanUp() {
     output_consumers_.clear();
     blocking_dependencies_.clear();
+    /*for (auto it = operator_duration_.begin(); it != operator_duration_.end(); ++it) {
+      std::cout << "Op: " << it->first << " Time: " << DoubleToStringWithSignificantDigits(it->second.count(), 3) << "\n";
+    }*/
   }
 
   /**
@@ -429,6 +437,17 @@ class Foreman final : public ForemanLite {
    **/
   void getRebuildWorkOrders(const dag_node_index index, WorkOrdersContainer *container);
 
+  /*bool hasOperatorStarted(const dag_node_index index) const {
+    if (operator_start_timestamp_.find(index) != operator_start_timestamp_.end()) {
+      return true;
+    }
+    return false;
+  }*/
+
+  void updateProbabilities();
+
+  int chooseOperator();
+
   CatalogDatabaseLite *catalog_database_;
   StorageManager *storage_manager_;
 
@@ -454,6 +473,19 @@ class Foreman final : public ForemanLite {
 
   WorkerDirectory *workers_;
 
+  // A vector of IDs of the schedulable operators.
+  // Note, for simplicity, in this list there could be operators for which no
+  // work order has been scheduled yet.
+  std::vector<dag_node_index> active_operators_;
+  // Operators which have finished the execution.
+  // std::vector<dag_node_index> finished_operators_;
+
+  std::vector<std::pair<double, dag_node_index>> operator_probabilities_;
+
+  /*std::unordered_map<dag_node_index, std::chrono::duration<double, std::milli>> operator_duration_;
+  std::unordered_map<dag_node_index, std::chrono::time_point<std::chrono::steady_clock>> operator_start_timestamp_;*/
+
+  std::mt19937_64 mt_;
   friend class ForemanTest;
   FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);
 


Mime
View raw message