Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 31FBB200B2A for ; Sat, 11 Jun 2016 00:30:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2EE53160A5A; Fri, 10 Jun 2016 22:30:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4FBAA160A38 for ; Sat, 11 Jun 2016 00:30:41 +0200 (CEST) Received: (qmail 1868 invoked by uid 500); 10 Jun 2016 22:30:40 -0000 Mailing-List: contact dev-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list dev@quickstep.incubator.apache.org Received: (qmail 1855 invoked by uid 99); 10 Jun 2016 22:30:40 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jun 2016 22:30:40 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id D70F8180592 for ; Fri, 10 Jun 2016 22:30:39 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -5.446 X-Spam-Level: X-Spam-Status: No, score=-5.446 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id FAR32bwu4WY2 for ; Fri, 10 Jun 2016 22:30:38 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 39F965F39A for ; Fri, 10 Jun 2016 22:30:37 +0000 (UTC) Received: (qmail 1834 invoked by uid 99); 10 Jun 2016 22:30:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jun 2016 22:30:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D835EDFC4F; Fri, 10 Jun 2016 22:30:35 +0000 (UTC) From: zuyu To: dev@quickstep.incubator.apache.org Reply-To: dev@quickstep.incubator.apache.org References: In-Reply-To: Subject: [GitHub] incubator-quickstep pull request #14: QUICKSTEP-8: Long running Foreman thre... Content-Type: text/plain Message-Id: <20160610223035.D835EDFC4F@git1-us-west.apache.org> Date: Fri, 10 Jun 2016 22:30:35 +0000 (UTC) archived-at: Fri, 10 Jun 2016 22:30:42 -0000 Github user zuyu commented on a diff in the pull request: https://github.com/apache/incubator-quickstep/pull/14#discussion_r66689849 --- Diff: query_execution/PolicyEnforcer.cpp --- @@ -0,0 +1,177 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin—Madison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "query_execution/PolicyEnforcer.hpp" + +#include +#include +#include +#include +#include +#include + +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryManager.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "relational_operators/WorkOrder.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) { + if (admitted_queries_.size() < kMaxConcurrentQueries) { + // Ok to admit the query. + const std::size_t query_id = query_handle->query_id(); + if (admitted_queries_.find(query_id) == admitted_queries_.end()) { + admitted_queries_[query_id].reset( + new QueryManager(foreman_client_id_, num_numa_nodes_, query_handle, + catalog_database_, storage_manager_, bus_)); + return true; + } else { + LOG(ERROR) << "Query with the same ID " << query_id << " exists"; + return false; + } + } else { + // This query will have to wait. + waiting_queries_.push(query_handle); + return false; + } +} + +void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) { + // TODO(harshad) : Provide processXMessage() public functions in + // QueryManager, so that we need to extract message from the + // TaggedMessage only once. + std::size_t query_id; + switch (tagged_message.message_type()) { + case kWorkOrderCompleteMessage: // Fall through. + case kRebuildWorkOrderCompleteMessage: { + serialization::WorkOrderCompletionMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kCatalogRelationNewBlockMessage: { + serialization::CatalogRelationNewBlockMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kDataPipelineMessage: { + serialization::DataPipelineMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kWorkOrdersAvailableMessage: { + serialization::WorkOrdersAvailableMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), + tagged_message.message_bytes())); + query_id = proto.query_id(); + break; + } + case kWorkOrderFeedbackMessage: { + // TODO(harshad) Add query ID to FeedbackMessage. + WorkOrder::FeedbackMessage msg(const_cast(tagged_message.message()), tagged_message.message_bytes()); + query_id = msg.header().query_id; + break; + } + default: + LOG(FATAL) << "Unknown message type found in PolicyEnforcer"; + } + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + const QueryManager::QueryStatusCode return_code = + admitted_queries_[query_id]->processMessage(tagged_message); + if (return_code == QueryManager::QueryStatusCode::kQueryExecuted) { + removeQuery(query_id); + if (!waiting_queries_.empty()) { + // Admit the earliest waiting query. + QueryHandle *new_query = waiting_queries_.front(); + waiting_queries_.pop(); + admitQuery(new_query); + } + } +} + +void PolicyEnforcer::getWorkerMessages( + std::vector> *worker_messages) { + // Iterate over admitted queries until either there are no more + // messages available, or the maximum number of messages have + // been collected. + DCHECK(worker_messages->empty()); + // TODO(harshad) - Make this function generic enough so that it + // works well when multiple queries are getting executed. + std::size_t per_query_share = 0; + if (!admitted_queries_.empty()) { + per_query_share = kMaxNumWorkerMessages / admitted_queries_.size(); + } else { + LOG(WARNING) << "Requesting WorkerMessages when no query is running"; + return; + } + DCHECK_GT(per_query_share, 0u); + std::vector finished_queries_ids; + + for (const auto &admitted_query_info : admitted_queries_) { + QueryManager *curr_query_manager = admitted_query_info.second.get(); + DCHECK(curr_query_manager != nullptr); + std::size_t messages_collected_curr_query = 0; + while (messages_collected_curr_query < per_query_share) { + WorkerMessage *next_worker_message = + curr_query_manager->getNextWorkerMessage(0, -1); + if (next_worker_message != nullptr) { + ++messages_collected_curr_query; + worker_messages->push_back(std::unique_ptr(next_worker_message)); + } else { + // No more work ordes from the current query at this time. + // Check if the query's execution is over. + if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { + // If the query has been executed, remove it. + finished_queries_ids.push_back(admitted_query_info.first); + } + break; + } + } + } + for (std::size_t finished_qid : finished_queries_ids) { --- End diff -- Please add `const` before `std::size_t`. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---