quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hbdeshm...@apache.org
Subject [2/2] incubator-quickstep git commit: Measure execution time of normal WorkOrders.
Date Tue, 14 Jun 2016 19:06:28 GMT
Measure execution time of normal WorkOrders.

- Measure normal work order execution time
- Split the WorkOrderCompletion proto message in two: Normal and Rebuild
  work orders.
- Add execution time field in the NormalWorkOrderComplete proto message.
- Include the recorded time in NormalWorkOrderComplete proto message
  that is sent back to Foreman.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ba57e683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ba57e683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ba57e683

Branch: refs/heads/record-wo-execution-time
Commit: ba57e68333527e2a9c4376ab784beb42630887e2
Parents: dd44958
Author: Harshad Deshmukh <hbdeshmukh@apache.org>
Authored: Tue Jun 14 14:02:45 2016 -0500
Committer: Harshad Deshmukh <hbdeshmukh@apache.org>
Committed: Tue Jun 14 14:05:57 2016 -0500

----------------------------------------------------------------------
 query_execution/PolicyEnforcer.cpp              | 10 ++-
 query_execution/QueryExecutionMessages.proto    | 15 ++--
 query_execution/QueryManager.cpp                |  4 +-
 query_execution/Worker.cpp                      | 93 ++++++++++++++------
 query_execution/Worker.hpp                      | 40 +++++++--
 query_execution/tests/QueryManager_unittest.cpp |  9 +-
 6 files changed, 123 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ba57e683/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 2145429..4309cb8 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -66,9 +66,15 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message)
{
   // TaggedMessage only once.
   std::size_t query_id;
   switch (tagged_message.message_type()) {
-    case kWorkOrderCompleteMessage:  // Fall through.
+    case kWorkOrderCompleteMessage: {
+      serialization::NormalWorkOrderCompletionMessage proto;
+      CHECK(proto.ParseFromArray(tagged_message.message(),
+                                 tagged_message.message_bytes()));
+      query_id = proto.query_id();
+      break;
+    }
     case kRebuildWorkOrderCompleteMessage: {
-      serialization::WorkOrderCompletionMessage proto;
+      serialization::RebuildWorkOrderCompletionMessage proto;
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
       query_id = proto.query_id();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ba57e683/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 9d9a9e5..3523694 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -20,11 +20,16 @@ package quickstep.serialization;
 message EmptyMessage {
 }
 
-// Used for both Normal WorkOrders and RebuildWorkOrders.
-// NOTE(zuyu): we might need to seperate the completion messages to contain
-// run-time information for Foreman to make better decisions on scheduling
-// WorkOrders.
-message WorkOrderCompletionMessage {
+// A message sent upon completion of a normal (not rebuild) WorkOrder execution.
+message NormalWorkOrderCompletionMessage {
+  required uint64 operator_index = 1;
+  required uint64 worker_thread_index = 2;
+  required uint64 query_id = 3;  
+  optional uint64 execution_time_us = 4;  // Execution time in micro seconds.
+}
+
+// A message sent upon completion of a rebuild WorkOrder execution.
+message RebuildWorkOrderCompletionMessage {
   required uint64 operator_index = 1;
   required uint64 worker_thread_index = 2;
   required uint64 query_id = 3;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ba57e683/query_execution/QueryManager.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManager.cpp b/query_execution/QueryManager.cpp
index e4e4c9d..d20b592 100644
--- a/query_execution/QueryManager.cpp
+++ b/query_execution/QueryManager.cpp
@@ -161,7 +161,7 @@ QueryManager::QueryStatusCode QueryManager::processMessage(
   dag_node_index op_index;
   switch (tagged_message.message_type()) {
     case kWorkOrderCompleteMessage: {
-      serialization::WorkOrderCompletionMessage proto;
+      serialization::NormalWorkOrderCompletionMessage proto;
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
 
@@ -170,7 +170,7 @@ QueryManager::QueryStatusCode QueryManager::processMessage(
       break;
     }
     case kRebuildWorkOrderCompleteMessage: {
-      serialization::WorkOrderCompletionMessage proto;
+      serialization::RebuildWorkOrderCompletionMessage proto;
       CHECK(proto.ParseFromArray(tagged_message.message(),
                                  tagged_message.message_bytes()));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ba57e683/query_execution/Worker.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp
index ef596e1..d327cd2 100644
--- a/query_execution/Worker.cpp
+++ b/query_execution/Worker.cpp
@@ -16,7 +16,9 @@
 
 #include "query_execution/Worker.hpp"
 
+#include <chrono>
 #include <cstddef>
+#include <cstdint>
 #include <cstdlib>
 #include <utility>
 
@@ -47,13 +49,32 @@ void Worker::run() {
   }
   ClientIDMap *thread_id_map = ClientIDMap::Instance();
   thread_id_map->addValue(worker_client_id_);
+  std::chrono::time_point<std::chrono::steady_clock> start, end;
   for (;;) {
     // Receive() is a blocking call, causing this thread to sleep until next
     // message is received.
     const AnnotatedMessage annotated_msg = bus_->Receive(worker_client_id_, 0, true);
     const TaggedMessage &tagged_message = annotated_msg.tagged_message;
     switch (tagged_message.message_type()) {
-      case kWorkOrderMessage:  // Fall through.
+      case kWorkOrderMessage: {
+        WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
+        DCHECK(message.getWorkOrder() != nullptr);
+        // Start measuring the execution time.
+        start = std::chrono::steady_clock::now();
+        message.getWorkOrder()->execute();
+        end = std::chrono::steady_clock::now();
+        const std::size_t query_id_for_workorder =
+            message.getWorkOrder()->getQueryID();
+        delete message.getWorkOrder();
+        const uint64_t time_us =
+            std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
+                .count();
+        sendNormalWorkOrderCompleteMessage(annotated_msg.sender,
+                                           message.getRelationalOpIndex(),
+                                           query_id_for_workorder,
+                                           time_us);
+        break;
+      }
       case kRebuildWorkOrderMessage: {
         WorkerMessage message(*static_cast<const WorkerMessage*>(tagged_message.message()));
         DCHECK(message.getWorkOrder() != nullptr);
@@ -62,10 +83,9 @@ void Worker::run() {
             message.getWorkOrder()->getQueryID();
         delete message.getWorkOrder();
 
-        sendWorkOrderCompleteMessage(
-            annotated_msg.sender, message.getRelationalOpIndex(),
-            query_id_for_workorder,
-            tagged_message.message_type() == kRebuildWorkOrderMessage);
+        sendRebuildWorkOrderCompleteMessage(annotated_msg.sender,
+                                            message.getRelationalOpIndex(),
+                                            query_id_for_workorder);
         break;
       }
       case kPoisonMessage: {
@@ -77,34 +97,57 @@ void Worker::run() {
   }
 }
 
-void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver,
-                                          const size_t op_index,
-                                          const size_t query_id,
-                                          const bool is_rebuild_work_order) {
-  serialization::WorkOrderCompletionMessage proto;
-  proto.set_operator_index(op_index);
-  proto.set_worker_thread_index(worker_thread_index_);
-  proto.set_query_id(query_id);
-
+template <typename CompletionMessageProtoT>
+void Worker::sendWorkOrderCompleteMessageHelper(
+    const CompletionMessageProtoT &proto,
+    const tmb::client_id receiver,
+    const message_type_id message_type) {
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const size_t proto_length = proto.ByteSize();
-  char *proto_bytes = static_cast<char*>(std::malloc(proto_length));
+  char *proto_bytes = static_cast<char *>(std::malloc(proto_length));
   CHECK(proto.SerializeToArray(proto_bytes, proto_length));
 
-  TaggedMessage message(static_cast<const void*>(proto_bytes),
+  TaggedMessage message(static_cast<const void *>(proto_bytes),
                         proto_length,
-                        is_rebuild_work_order ? kRebuildWorkOrderCompleteMessage
-                                              : kWorkOrderCompleteMessage);
+                        message_type);
   std::free(proto_bytes);
 
   const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(bus_,
-                                         worker_client_id_,
-                                         receiver,
-                                         std::move(message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not "
-      "be sent from worker with TMB client ID " << worker_client_id_ << " to
"
-      "Foreman with TMB client ID " << receiver;
+      QueryExecutionUtil::SendTMBMessage(
+          bus_, worker_client_id_, receiver, std::move(message));
+  CHECK(send_status == tmb::MessageBus::SendStatus::kOK)
+      << "Message could not be sent from worker with TMB client ID "
+      << worker_client_id_ << " to Foreman with TMB client ID " << receiver;
+}
+
+void Worker::sendNormalWorkOrderCompleteMessage(
+    const tmb::client_id receiver,
+    const size_t op_index,
+    const size_t query_id,
+    const uint64_t execution_time_us) {
+  serialization::NormalWorkOrderCompletionMessage proto;
+  proto.set_operator_index(op_index);
+  proto.set_worker_thread_index(worker_thread_index_);
+  proto.set_query_id(query_id);
+  proto.set_execution_time_us(execution_time_us);
+
+  sendWorkOrderCompleteMessageHelper<
+      serialization::NormalWorkOrderCompletionMessage>(
+      proto, receiver, kWorkOrderCompleteMessage);
+}
+
+void Worker::sendRebuildWorkOrderCompleteMessage(
+    const tmb::client_id receiver,
+    const size_t op_index,
+    const size_t query_id) {
+  serialization::RebuildWorkOrderCompletionMessage proto;
+  proto.set_operator_index(op_index);
+  proto.set_worker_thread_index(worker_thread_index_);
+  proto.set_query_id(query_id);
+
+  sendWorkOrderCompleteMessageHelper<
+      serialization::RebuildWorkOrderCompletionMessage>(
+      proto, receiver, kRebuildWorkOrderCompleteMessage);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ba57e683/query_execution/Worker.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp
index c0bafdc..1fea2ea 100644
--- a/query_execution/Worker.hpp
+++ b/query_execution/Worker.hpp
@@ -19,6 +19,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_WORKER_HPP_
 
 #include <cstddef>
+#include <cstdint>
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "threading/Thread.hpp"
@@ -93,18 +94,43 @@ class Worker : public Thread {
 
  private:
   /**
-   * @brief Send the response WorkOrder completion message.
+   * @brief Construct a normal WorkOrder completion response message.
    *
    * @param receiver The id of the TMB client which should receive the response.
    * @param op_index The index of the operator to which the WorkOrder belongs.
    * @param query_id The ID of the query which the WorkOrder belongs to.
-   * @param is_rebuild_work_order True if it is a RebuildWorkOrder. Otherwise
-   *        false.
+   * @param execution_time_us The execution time of the WorkOrder in
+   *        microseconds.
    **/
-  void sendWorkOrderCompleteMessage(const tmb::client_id receiver,
-                                    const std::size_t op_index,
-                                    const std::size_t query_id,
-                                    const bool is_rebuild_work_order);
+  void sendNormalWorkOrderCompleteMessage(const tmb::client_id receiver,
+                                          const size_t op_index,
+                                          const size_t query_id,
+                                          const uint64_t execution_time_us);
+
+  /**
+   * @brief Construct a rebuild WorkOrder completion response message.
+   *
+   * @param receiver The id of the TMB client which should receive the response.
+   * @param op_index The index of the operator to which the WorkOrder belongs.
+   * @param query_id The ID of the query which the WorkOrder belongs to.
+   **/
+  void sendRebuildWorkOrderCompleteMessage(const tmb::client_id receiver,
+                                           const size_t op_index,
+                                           const size_t query_id);
+
+  /**
+   * @brief A helper method to send the WorkOrder completion message.
+   *
+   * @note CompletionMessageProtoT is the type of the completion message.
+   *
+   * @param proto The proto message to be sent.
+   * @param receiver The TMB client ID of the receiver.
+   * @param message_type The ID of the type of the message being sent.
+   **/
+  template <typename CompletionMessageProtoT>
+  void sendWorkOrderCompleteMessageHelper(const CompletionMessageProtoT &proto,
+                                          const tmb::client_id receiver,
+                                          const message_type_id message_type);
 
   const std::size_t worker_thread_index_;
   MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ba57e683/query_execution/tests/QueryManager_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManager_unittest.cpp b/query_execution/tests/QueryManager_unittest.cpp
index 4f98748..37e2cdd 100644
--- a/query_execution/tests/QueryManager_unittest.cpp
+++ b/query_execution/tests/QueryManager_unittest.cpp
@@ -275,7 +275,7 @@ class QueryManagerTest : public ::testing::Test {
   inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
     VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";
     TaggedMessage tagged_message;
-    serialization::WorkOrderCompletionMessage proto;
+    serialization::NormalWorkOrderCompletionMessage proto;
     proto.set_operator_index(index);
     proto.set_worker_thread_index(1);  // dummy worker ID.
     proto.set_query_id(0);  // dummy query ID.
@@ -296,8 +296,7 @@ class QueryManagerTest : public ::testing::Test {
 
   inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
     VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index <<
"]";
-    // foreman_->processRebuildWorkOrderCompleteMessage(index, 0 /* worker id */);
-    serialization::WorkOrderCompletionMessage proto;
+    serialization::RebuildWorkOrderCompletionMessage proto;
     proto.set_operator_index(index);
     proto.set_worker_thread_index(1);  // dummy worker thread ID.
     proto.set_query_id(0);  // dummy query ID.
@@ -346,7 +345,6 @@ class QueryManagerTest : public ::testing::Test {
   unique_ptr<QueryHandle> query_handle_;
   unique_ptr<QueryManager> query_manager_;
 
-  // unique_ptr<Foreman> foreman_;
   MessageBusImpl bus_;
 
   client_id worker_client_id_;
@@ -357,7 +355,6 @@ class QueryManagerTest : public ::testing::Test {
 TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
   // This test creates a DAG of a single node. No workorders are generated.
   query_plan_->addRelationalOperator(new MockOperator(false, false));
-  // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
 
   const MockOperator &op = static_cast<const MockOperator &>(
       query_plan_->getQueryPlanDAG().getNodePayload(0));
@@ -377,7 +374,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
   // This test creates a DAG of a single node. Static workorders are generated.
   const QueryPlan::DAGNodeIndex id =
       query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
-  // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
 
   const MockOperator &op = static_cast<const MockOperator &>(
       query_plan_->getQueryPlanDAG().getNodePayload(id));
@@ -429,7 +425,6 @@ TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
   // scaffolding of mocking. If we use gMock, we can do much better.
   const QueryPlan::DAGNodeIndex id =
       query_plan_->addRelationalOperator(new MockOperator(true, false, 4, 3));
-  // foreman_->setQueryPlan(query_plan_->getQueryPlanDAGMutable());
 
   const MockOperator &op = static_cast<const MockOperator &>(
       query_plan_->getQueryPlanDAG().getNodePayload(id));


Mime
View raw message