Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 94E41200BC3 for ; Fri, 18 Nov 2016 22:21:52 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 93A3F160B19; Fri, 18 Nov 2016 21:21:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8F137160B04 for ; Fri, 18 Nov 2016 22:21:51 +0100 (CET) Received: (qmail 26632 invoked by uid 500); 18 Nov 2016 21:21:50 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 26598 invoked by uid 99); 18 Nov 2016 21:21:50 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2016 21:21:50 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 557A4C0B75 for ; Fri, 18 Nov 2016 21:21:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id zj3wpy_dHIMX for ; Fri, 18 Nov 2016 21:21:49 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 01E575FCCF for ; Fri, 18 Nov 2016 21:21:47 +0000 (UTC) Received: (qmail 26478 invoked by uid 99); 18 Nov 2016 21:21:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Nov 2016 21:21:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D9D5DF12EF; Fri, 18 Nov 2016 21:21:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zuyuz@apache.org To: commits@quickstep.incubator.apache.org Date: Fri, 18 Nov 2016 21:21:51 -0000 Message-Id: <63b667bb08f54d33a9cb35e6784f40c0@git.apache.org> In-Reply-To: <3fbbed48a1de42ba913b3593f5a2d7f4@git.apache.org> References: <3fbbed48a1de42ba913b3593f5a2d7f4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/6] incubator-quickstep git commit: Unified WorkOrderCompletionMessage. archived-at: Fri, 18 Nov 2016 21:21:52 -0000 Unified WorkOrderCompletionMessage. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/178ed4b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/178ed4b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/178ed4b3 Branch: refs/heads/wo-msg Commit: 178ed4b3241d5c16736459f9e9073922a03d99b2 Parents: 7095987 Author: Zuyu Zhang Authored: Sat Nov 12 16:44:39 2016 -0800 Committer: Zuyu Zhang Committed: Fri Nov 18 13:21:38 2016 -0800 ---------------------------------------------------------------------- query_execution/PolicyEnforcerBase.cpp | 6 ++--- query_execution/PolicyEnforcerBase.hpp | 4 +-- query_execution/QueryExecutionMessages.proto | 32 ++++++++--------------- query_execution/Worker.cpp | 28 +++++++++----------- query_execution/Worker.hpp | 13 +++++---- 5 files changed, 35 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index 745ded6..4e8c782 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -45,7 +45,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) { switch (tagged_message.message_type()) { case kWorkOrderCompleteMessage: { - serialization::NormalWorkOrderCompletionMessage proto; + serialization::WorkOrderCompletionMessage proto; // Note: This proto message contains the time it took to execute the // WorkOrder. It can be accessed in this scope. CHECK(proto.ParseFromArray(tagged_message.message(), @@ -64,7 +64,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) { break; } case kRebuildWorkOrderCompleteMessage: { - serialization::RebuildWorkOrderCompletionMessage proto; + serialization::WorkOrderCompletionMessage proto; // Note: This proto message contains the time it took to execute the // rebuild WorkOrder. It can be accessed in this scope. CHECK(proto.ParseFromArray(tagged_message.message(), @@ -157,7 +157,7 @@ bool PolicyEnforcerBase::admitQueries( } void PolicyEnforcerBase::recordTimeForWorkOrder( - const serialization::NormalWorkOrderCompletionMessage &proto) { + const serialization::WorkOrderCompletionMessage &proto) { const std::size_t query_id = proto.query_id(); std::vector &workorder_time_entries = workorder_time_recorder_[query_id]; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/PolicyEnforcerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp index ea2c06f..7009a0a 100644 --- a/query_execution/PolicyEnforcerBase.hpp +++ b/query_execution/PolicyEnforcerBase.hpp @@ -38,7 +38,7 @@ namespace quickstep { class CatalogDatabaseLite; class QueryHandle; -namespace serialization { class NormalWorkOrderCompletionMessage; } +namespace serialization { class WorkOrderCompletionMessage; } /** \addtogroup QueryExecution * @{ @@ -165,7 +165,7 @@ class PolicyEnforcerBase { * execution. **/ void recordTimeForWorkOrder( - const serialization::NormalWorkOrderCompletionMessage &proto); + const serialization::WorkOrderCompletionMessage &proto); CatalogDatabaseLite *catalog_database_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 1a2cb78..165a194 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -31,31 +31,21 @@ import "relational_operators/WorkOrder.proto"; // order completion message, we may be interested in adding the compression // ratio or dictionary size of the rebuilt block. -// TODO(harshad) : If there are different fields in the two message types below, -// create a base message class called WorkOrderCompletionMessage and make the -// two classes below extend the base class. All the common fields in both the -// classes can be moved to the base class. +message WorkOrderCompletionMessage { + enum WorkOrderType { + NORMAL = 0; + REBUILD = 1; + } -// A message sent upon completion of a normal (not rebuild) WorkOrder execution. -message NormalWorkOrderCompletionMessage { - required uint64 operator_index = 1; - required uint64 worker_thread_index = 2; - required uint64 query_id = 3; - - // Epoch time in microseconds. - optional uint64 execution_start_time = 4; - optional uint64 execution_end_time = 5; -} + required WorkOrderType work_order_type = 1; -// A message sent upon completion of a rebuild WorkOrder execution. -message RebuildWorkOrderCompletionMessage { - required uint64 operator_index = 1; - required uint64 worker_thread_index = 2; - required uint64 query_id = 3; + required uint64 operator_index = 2; + required uint64 worker_thread_index = 3; + required uint64 query_id = 4; // Epoch time in microseconds. - optional uint64 execution_start_time = 4; - optional uint64 execution_end_time = 5; + optional uint64 execution_start_time = 5; + optional uint64 execution_end_time = 6; } message CatalogRelationNewBlockMessage { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/Worker.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp index 0b1efba..0db17b4 100644 --- a/query_execution/Worker.cpp +++ b/query_execution/Worker.cpp @@ -47,6 +47,8 @@ using tmb::TaggedMessage; namespace quickstep { +using serialization::WorkOrderCompletionMessage; + void Worker::run() { if (cpu_id_ >= 0) { ThreadUtil::BindToCPU(cpu_id_); @@ -61,21 +63,16 @@ void Worker::run() { const TaggedMessage &tagged_message = annotated_msg.tagged_message; switch (tagged_message.message_type()) { case kWorkOrderMessage: { - serialization::NormalWorkOrderCompletionMessage proto; - executeWorkOrderHelper( - tagged_message, &proto); - sendWorkOrderCompleteMessage< - serialization::NormalWorkOrderCompletionMessage>( + WorkOrderCompletionMessage proto; + executeWorkOrderHelper(tagged_message, &proto); + sendWorkOrderCompleteMessage( annotated_msg.sender, proto, kWorkOrderCompleteMessage); break; } case kRebuildWorkOrderMessage: { - serialization::RebuildWorkOrderCompletionMessage proto; - executeWorkOrderHelper< - serialization::RebuildWorkOrderCompletionMessage>(tagged_message, - &proto); - sendWorkOrderCompleteMessage< - serialization::RebuildWorkOrderCompletionMessage>( + WorkOrderCompletionMessage proto; + executeWorkOrderHelper(tagged_message, &proto, true /* is_rebuild */); + sendWorkOrderCompleteMessage( annotated_msg.sender, proto, kRebuildWorkOrderCompleteMessage); break; } @@ -88,9 +85,8 @@ void Worker::run() { } } -template void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver, - const CompletionMessageProtoT &proto, + const WorkOrderCompletionMessage &proto, const message_type_id message_type) { // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. const size_t proto_length = proto.ByteSize(); @@ -109,9 +105,9 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver, CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } -template void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message, - CompletionMessageProtoT *proto) { + WorkOrderCompletionMessage *proto, + const bool is_rebuild_work_order) { std::chrono::time_point start, end; WorkerMessage worker_message( *static_cast(tagged_message.message())); @@ -133,6 +129,8 @@ void Worker::executeWorkOrderHelper(const TaggedMessage &tagged_message, end.time_since_epoch()).count(); // Construct the proto message. + proto->set_work_order_type(is_rebuild_work_order ? WorkOrderCompletionMessage::REBUILD + : WorkOrderCompletionMessage::NORMAL); proto->set_operator_index(worker_message.getRelationalOpIndex()); proto->set_query_id(query_id_for_workorder); proto->set_worker_thread_index(worker_thread_index_); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/178ed4b3/query_execution/Worker.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Worker.hpp b/query_execution/Worker.hpp index aa39bb3..fec2242 100644 --- a/query_execution/Worker.hpp +++ b/query_execution/Worker.hpp @@ -34,6 +34,8 @@ namespace tmb { class TaggedMessge; } namespace quickstep { +namespace serialization { class WorkOrderCompletionMessage; } + /** \addtogroup QueryExecution * @{ */ @@ -100,7 +102,6 @@ class Worker : public Thread { * @brief A helper method to execute the WorkOrder and construct a * completion message. * - * @note CompletionMessageProtoT is the type of the completion message. * @note Right now a single helper method works for all message types. * If different message types need to collect different statistics for * the WorkOrder execution, we need to create different helper methods, @@ -108,23 +109,21 @@ class Worker : public Thread { * * @param tagged_message The TaggedMessage which consists of the WorkOrder. * @param proto The proto message to be sent. + * @param is_rebuild_work_order Whether it is used for a RebuildWorkOrder. **/ - template void executeWorkOrderHelper(const TaggedMessage &tagged_message, - CompletionMessageProtoT *proto); + serialization::WorkOrderCompletionMessage *proto, + const bool is_rebuild_work_order = false); /** * @brief A helper method to send the WorkOrder completion message. * - * @note CompletionMessageProtoT is the type of the completion message. - * * @param receiver The TMB client ID of the receiver. * @param proto The proto message to be sent. * @param message_type The ID of the type of the message being sent. **/ - template void sendWorkOrderCompleteMessage(const tmb::client_id receiver, - const CompletionMessageProtoT &proto, + const serialization::WorkOrderCompletionMessage &proto, const message_type_id message_type); const std::size_t worker_thread_index_;