quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [10/17] incubator-quickstep git commit: Refactored RebuildStatus.
Date Wed, 05 Oct 2016 04:25:44 GMT
Refactored RebuildStatus.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6b377d5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6b377d5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6b377d5d

Branch: refs/heads/lip-refactor
Commit: 6b377d5d6c6f2e05ef74a4c1a022590af2db6571
Parents: ac3512c
Author: Zuyu Zhang <zuyuz@twitter.com>
Authored: Sat Sep 24 15:05:16 2016 -0700
Committer: Zuyu Zhang <zuyuz@twitter.com>
Committed: Sat Sep 24 15:05:16 2016 -0700

----------------------------------------------------------------------
 query_execution/QueryExecutionState.hpp | 72 ++++++++++++++++++++--------
 1 file changed, 53 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6b377d5d/query_execution/QueryExecutionState.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionState.hpp b/query_execution/QueryExecutionState.hpp
index 9ae9563..f5281d5 100644
--- a/query_execution/QueryExecutionState.hpp
+++ b/query_execution/QueryExecutionState.hpp
@@ -92,8 +92,15 @@ class QueryExecutionState {
                                const std::size_t num_rebuild_workorders,
                                const bool rebuild_initiated) {
     DCHECK(operator_index < num_operators_);
-    rebuild_status_[operator_index].second = num_rebuild_workorders;
-    rebuild_status_[operator_index].first = rebuild_initiated;
+    auto search_res = rebuild_status_.find(operator_index);
+    if (search_res != rebuild_status_.end()) {
+      search_res->second.has_initiated = rebuild_initiated;
+      search_res->second.num_pending_workorders = num_rebuild_workorders;
+    } else {
+      RebuildStatus rebuild_status(rebuild_initiated, num_rebuild_workorders);
+
+      rebuild_status_.emplace(operator_index, std::move(rebuild_status));
+    }
   }
 
   /**
@@ -107,7 +114,7 @@ class QueryExecutionState {
     DCHECK(operator_index < num_operators_);
     const auto search_res = rebuild_status_.find(operator_index);
     if (search_res != rebuild_status_.end()) {
-      return search_res->second.first;
+      return search_res->second.has_initiated;
     }
     return false;
   }
@@ -124,7 +131,7 @@ class QueryExecutionState {
     DCHECK(operator_index < num_operators_);
     const auto search_res = rebuild_status_.find(operator_index);
     if (search_res != rebuild_status_.end()) {
-      return search_res->second.second;
+      return search_res->second.num_pending_workorders;
     }
     LOG(WARNING) << "Called QueryExecutionState::getNumRebuildWorkOrders() "
                     "for an operator whose rebuild entry doesn't exist.";
@@ -132,22 +139,39 @@ class QueryExecutionState {
   }
 
   /**
+   * @brief Increment the number of rebuild WorkOrders for the given operator.
+   *
+   * @param operator_index The index of the given operator.
+   * @param num_rebuild_workorders The number of rebuild workorders to add to
+   *        the given operator's pending count.
+   **/
+  inline void incrementNumRebuildWorkOrders(const std::size_t operator_index,
+                                            const std::size_t num_rebuild_workorders) {
+    DCHECK_LT(operator_index, num_operators_);
+    auto search_res = rebuild_status_.find(operator_index);
+    DCHECK(search_res != rebuild_status_.end())
+        << "Called for an operator whose rebuild status does not exist.";
+    DCHECK(search_res->second.has_initiated);
+
+    search_res->second.num_pending_workorders += num_rebuild_workorders;
+  }
+
+  /**
    * @brief Decrement the number of rebuild WorkOrders for the given operator.
    *
    * @param operator_index The index of the given operator.
    **/
   inline void decrementNumRebuildWorkOrders(const std::size_t operator_index) {
     DCHECK(operator_index < num_operators_);
-    const auto search_res = rebuild_status_.find(operator_index);
-    if (search_res != rebuild_status_.end()) {
-      DCHECK(search_res->second.first);
-      DCHECK_GE(search_res->second.second, 1u);
-      --rebuild_status_[operator_index].second;
-    } else {
-      LOG(FATAL) <<
-          "Called QueryExecutionState::decrementNumRebuildWorkOrders() for an "
-          "operator whose rebuild entry doesn't exist.";
-    }
+    auto search_res = rebuild_status_.find(operator_index);
+    CHECK(search_res != rebuild_status_.end())
+        << "Called QueryExecutionState::decrementNumRebuildWorkOrders() for an "
+           "operator whose rebuild entry doesn't exist.";
+
+    DCHECK(search_res->second.has_initiated);
+    DCHECK_GE(search_res->second.num_pending_workorders, 1u);
+
+    --(search_res->second.num_pending_workorders);
   }
 
   /**
@@ -279,11 +303,21 @@ class QueryExecutionState {
   // The ith bit denotes if the operator with ID = i has finished its execution.
   std::vector<bool> execution_finished_;
 
-  // Key is dag_node_index for which rebuild is required. Value is a pair -
-  // first element is a bool (whether rebuild for operator at index i has been
-  // initiated) and if the boolean is true, the second element denotes the
-  // number of pending rebuild workorders for the operator.
-  std::unordered_map<std::size_t, std::pair<bool, std::size_t>> rebuild_status_;
+  struct RebuildStatus {
+    RebuildStatus(const bool initiated,
+                  const std::size_t num_workorders)
+        : has_initiated(initiated),
+          num_pending_workorders(num_workorders) {}
+
+    // Whether rebuild for the operator has been initiated.
+    bool has_initiated;
+    // The number of pending rebuild workorders for the operator.
+    // Valid if and only if 'has_initiated' is true.
+    std::size_t num_pending_workorders;
+  };
+
+  // Key is dag_node_index for which rebuild is required.
+  std::unordered_map<std::size_t, RebuildStatus> rebuild_status_;
 
   DISALLOW_COPY_AND_ASSIGN(QueryExecutionState);
 };


Mime
View raw message