Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 650F9200C24 for ; Thu, 9 Feb 2017 06:39:13 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 621AD160B67; Thu, 9 Feb 2017 05:39:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 86D1E160B49 for ; Thu, 9 Feb 2017 06:39:12 +0100 (CET) Received: (qmail 27917 invoked by uid 500); 9 Feb 2017 05:39:11 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 27908 invoked by uid 99); 9 Feb 2017 05:39:11 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Feb 2017 05:39:11 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 577A61A0C88 for ; Thu, 9 Feb 2017 05:39:11 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id pkrIQ7MlXYwZ for ; Thu, 9 Feb 2017 05:39:09 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 43F8D5F298 for ; Thu, 9 Feb 2017 05:39:09 +0000 (UTC) Received: (qmail 20915 invoked by uid 99); 9 Feb 2017 05:37:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 09 Feb 2017 05:37:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 14C89DFD9E; Thu, 9 Feb 2017 05:37:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianqiao@apache.org To: commits@quickstep.incubator.apache.org Date: Thu, 09 Feb 2017 05:37:41 -0000 Message-Id: <67b164b721034b01b3c627c6d00bdbf5@git.apache.org> In-Reply-To: <083438b8f69a4b1387506800e531e3ef@git.apache.org> References: <083438b8f69a4b1387506800e531e3ef@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/5] incubator-quickstep git commit: Minor refactored distributed query execution. archived-at: Thu, 09 Feb 2017 05:39:13 -0000 Minor refactored distributed query execution. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3011ddf6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3011ddf6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3011ddf6 Branch: refs/heads/aggregate-on-left-outer-join Commit: 3011ddf61ec92efcb833ef0a1168255ff97fb9f9 Parents: 5773027 Author: Zuyu Zhang Authored: Wed Feb 8 17:36:45 2017 -0800 Committer: Zuyu Zhang Committed: Wed Feb 8 17:42:42 2017 -0800 ---------------------------------------------------------------------- query_execution/ForemanDistributed.cpp | 1 - query_execution/PolicyEnforcerBase.cpp | 2 - query_execution/PolicyEnforcerBase.hpp | 14 ----- query_execution/PolicyEnforcerDistributed.cpp | 59 ++++++++++------------ 4 files changed, 27 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 4d95f16..8c20e65 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -175,7 +175,6 @@ void ForemanDistributed::run() { case kQueryInitiateResponseMessage: { S::QueryInitiateResponseMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - CHECK(policy_enforcer_->existQuery(proto.query_id())); break; } case kCatalogRelationNewBlockMessage: // Fall through http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index a26b84e..082f6e9 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -156,8 +156,6 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) { << " that hasn't finished its execution"; } admitted_queries_.erase(query_id); - - removed_query_ids_.insert(query_id); } bool PolicyEnforcerBase::admitQueries( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp index baf9c68..4107817 100644 --- a/query_execution/PolicyEnforcerBase.hpp +++ b/query_execution/PolicyEnforcerBase.hpp @@ -103,16 +103,6 @@ class PolicyEnforcerBase { void processMessage(const TaggedMessage &tagged_message); /** - * @brief Check if the given query id ever exists. - * - * @return True if the query ever exists, otherwise false. - **/ - inline bool existQuery(const std::size_t query_id) const { - return admitted_queries_.find(query_id) != admitted_queries_.end() || - removed_query_ids_.find(query_id) != removed_query_ids_.end(); - } - - /** * @brief Check if there are any queries to be executed. * * @return True if there is at least one active or waiting query, false if @@ -179,10 +169,6 @@ class PolicyEnforcerBase { // Key = query ID, value = QueryManagerBase* for the key query. std::unordered_map> admitted_queries_; - // TODO(quickstep-team): Delete a 'query_id' after receiving all - // 'QueryInitiateResponseMessage's for the 'query_id'. - std::unordered_set removed_query_ids_; - // The queries which haven't been admitted yet. std::queue waiting_queries_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index 49a1d9a..ef5abb0 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -68,8 +68,15 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages( // TODO(harshad) - Make this function generic enough so that it // works well when multiple queries are getting executed. if (admitted_queries_.empty()) { - LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running"; - return; + if (waiting_queries_.empty()) { + LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running"; + return; + } else { + // Admit the earliest waiting query. + QueryHandle *new_query = waiting_queries_.front(); + waiting_queries_.pop(); + admitQuery(new_query); + } } const std::size_t per_query_share = @@ -106,28 +113,28 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages( } bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) { - if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) { - // Ok to admit the query. - const std::size_t query_id = query_handle->query_id(); - if (admitted_queries_.find(query_id) == admitted_queries_.end()) { - // NOTE(zuyu): Should call before constructing a 'QueryManager'. - // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext' - // initializes. - initiateQueryInShiftboss(query_handle); - - // Query with the same ID not present, ok to admit. - admitted_queries_[query_id].reset( - new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_)); - return true; - } else { - LOG(ERROR) << "Query with the same ID " << query_id << " exists"; - return false; - } - } else { + if (admitted_queries_.size() >= PolicyEnforcerBase::kMaxConcurrentQueries) { // This query will have to wait. waiting_queries_.push(query_handle); return false; } + + const std::size_t query_id = query_handle->query_id(); + if (admitted_queries_.find(query_id) != admitted_queries_.end()) { + LOG(ERROR) << "Query with the same ID " << query_id << " exists"; + return false; + } + + // Ok to admit the query. + // NOTE(zuyu): Should call before constructing a 'QueryManager'. + // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext' + // initializes. + initiateQueryInShiftboss(query_handle); + + // Query with the same ID not present, ok to admit. + admitted_queries_[query_id].reset( + new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_)); + return true; } void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) { @@ -144,18 +151,6 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb: query_manager->processInitiateRebuildResponseMessage( proto.operator_index(), num_rebuild_work_orders, shiftboss_index); shiftboss_directory_->addNumQueuedWorkOrders(shiftboss_index, num_rebuild_work_orders); - - if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { - onQueryCompletion(query_manager); - - removeQuery(query_id); - if (!waiting_queries_.empty()) { - // Admit the earliest waiting query. - QueryHandle *new_query = waiting_queries_.front(); - waiting_queries_.pop(); - admitQuery(new_query); - } - } } void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(