From: jignesh@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Message-Id: <3c561530893e40c6be245faeebf0989d@git.apache.org>
Subject: incubator-quickstep git commit: Basic support to report individual work order profiling results
Date: Mon, 20 Jun 2016 15:02:31 +0000 (UTC)

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master c1476d1e7 -> 07435a430


Basic support to report individual work order profiling results

- A flag to enable work order profiling report generation.
- At the end of each query, a report is generated which includes the worker
  ID, its NUMA socket, the operator that produced the WorkOrder and the
  execution time in microseconds for the latest query.
- The output is printed on stdout in CSV format as of now.
- As this is rudimentary support for the functionality, there is a lot of
  future work in this regard, including printing CPU core information,
  printing the operator name, allowing the user to specify a file where the
  output can be written, etc.
- Fixed a bug in constructing the Foreman thread.

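The CSV report described in the commit message could be post-processed along the following lines. This is only a minimal sketch: it assumes data rows of the form "worker, socket, operator, time" under the header that the commit prints, and `SumTimePerOperator` is a hypothetical helper that is not part of Quickstep.

```cpp
#include <cstddef>
#include <cstdint>
#include <istream>
#include <map>
#include <sstream>
#include <string>

// Hypothetical helper (not part of Quickstep): sums the reported execution
// time in microseconds per operator ID from the CSV profiling report.
std::map<std::size_t, std::uint64_t> SumTimePerOperator(std::istream &in) {
  std::map<std::size_t, std::uint64_t> totals;
  std::string line;
  while (std::getline(in, line)) {
    std::istringstream fields(line);
    std::size_t worker_id = 0;
    int numa_socket = 0;
    std::size_t operator_id = 0;
    std::uint64_t time_us = 0;
    char comma1 = 0, comma2 = 0, comma3 = 0;
    // Assumed row shape: "0, 0, 3, 1500". The header row and any other
    // non-numeric line (e.g. the "Time: ... ms" line) fail to parse and
    // are skipped.
    if (fields >> worker_id >> comma1 >> numa_socket >> comma2 >>
            operator_id >> comma3 >> time_us &&
        comma1 == ',' && comma2 == ',' && comma3 == ',') {
      totals[operator_id] += time_us;
    }
  }
  return totals;
}
```

Since the report currently goes to stdout together with the query results, skipping lines that do not parse is a deliberately lenient choice; a stricter reader might instead look for the header row first.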
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/07435a43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/07435a43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/07435a43

Branch: refs/heads/master
Commit: 07435a430776c0b8b6381a4c0f0470250814c14b
Parents: c1476d1
Author: Harshad Deshmukh
Authored: Thu Jun 16 14:03:34 2016 -0500
Committer: Harshad Deshmukh
Committed: Mon Jun 20 09:56:52 2016 -0500

----------------------------------------------------------------------
 cli/QuickstepCli.cpp               | 12 +++++++-
 query_execution/CMakeLists.txt     |  2 ++
 query_execution/Foreman.cpp        | 26 +++++++++++++++--
 query_execution/Foreman.hpp        | 22 ++++++++++++++-
 query_execution/PolicyEnforcer.cpp | 15 ++++++++++
 query_execution/PolicyEnforcer.hpp | 50 +++++++++++++++++++++++++++++++--
 6 files changed, 121 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/cli/QuickstepCli.cpp
----------------------------------------------------------------------
diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp
index 35bd16e..3f99130 100644
--- a/cli/QuickstepCli.cpp
+++ b/cli/QuickstepCli.cpp
@@ -137,6 +137,9 @@ static constexpr char kPathSeparator = '/';
 static constexpr char kDefaultStoragePath[] = "qsstor/";
 #endif
 
+DEFINE_bool(profile_and_report_workorder_perf, false,
+    "If true, Quickstep will record the execution time of all the individual "
+    "normal work orders and report it at the end of query execution.");
 DEFINE_int32(num_workers, 0, "Number of worker threads. If this value is "
                              "specified and is greater than 0, then this "
                              "user-supplied value is used. Else (i.e. the"
@@ -356,7 +359,9 @@ int main(int argc, char* argv[]) {
       &bus,
       query_processor->getDefaultDatabase(),
       query_processor->getStorageManager(),
-      num_numa_nodes_system);
+      -1,  // Don't pin the Foreman thread.
+      num_numa_nodes_system,
+      quickstep::FLAGS_profile_and_report_workorder_perf);
 
   // Start the worker threads.
   for (Worker &worker : workers) {
@@ -461,6 +466,11 @@ int main(int argc, char* argv[]) {
         printf("Time: %s ms\n",
                quickstep::DoubleToStringWithSignificantDigits(
                    time_ms.count(), 3).c_str());
+        if (quickstep::FLAGS_profile_and_report_workorder_perf) {
+          // TODO(harshad) - Allow user specified file instead of stdout.
+          foreman.printWorkOrderProfilingResults(query_handle->query_id(),
+                                                 stdout);
+        }
       } catch (const std::exception &e) {
         fprintf(stderr, "QUERY EXECUTION ERROR: %s\n", e.what());
         break;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 501166e..b031a44 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015-2016 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/Foreman.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 828834d..f9f2e7a 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -18,7 +18,9 @@
 #include "query_execution/Foreman.hpp"
 
 #include
+#include
 #include
+#include
 #include
 #include
 
@@ -54,7 +56,8 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
                  CatalogDatabaseLite *catalog_database,
                  StorageManager *storage_manager,
                  const int cpu_id,
-                 const size_t num_numa_nodes)
+                 const size_t num_numa_nodes,
+                 const bool profile_individual_workorders)
     : ForemanLite(bus, cpu_id),
       main_thread_client_id_(main_thread_client_id),
       worker_directory_(DCHECK_NOTNULL(worker_directory)),
@@ -90,7 +93,8 @@ Foreman::Foreman(const tmb::client_id main_thread_client_id,
       catalog_database_,
       storage_manager_,
       worker_directory_,
-      bus_));
+      bus_,
+      profile_individual_workorders));
 }
 
 void Foreman::run() {
@@ -229,4 +233,22 @@ void Foreman::sendWorkerMessage(const size_t worker_thread_index,
       << worker_directory_->getClientID(worker_thread_index);
 }
 
+void Foreman::printWorkOrderProfilingResults(const std::size_t query_id,
+                                             std::FILE *out) const {
+  const std::vector<
+      std::tuple>
+      &recorded_times = policy_enforcer_->getProfilingResults(query_id);
+  fputs("Worker ID, NUMA Socket, Operator ID, Time (microseconds)\n", out);
+  for (auto workorder_entry : recorded_times) {
+    // Note: Index of the "worker thread index" in the tuple is 0.
+    const std::size_t worker_id = std::get<0>(workorder_entry);
+    fprintf(out,
+            "%lu, %d, %lu, %lu\n",
+            worker_id,
+            worker_directory_->getNUMANode(worker_id),
+            std::get<1>(workorder_entry),
+            std::get<2>(workorder_entry));
+  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/Foreman.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 94cb9fc..7be57e7 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -19,6 +19,7 @@
 #define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
 
 #include
+#include
 #include
 #include
 
@@ -57,6 +58,8 @@ class Foreman final : public ForemanLite {
    * @param storage_manager The StorageManager to use.
    * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned.
    * @param num_numa_nodes The number of NUMA nodes in the system.
+   * @param profile_individual_workorders Whether every workorder's execution
+   *        should be profiled or not.
    *
    * @note If cpu_id is not specified, Foreman thread can be possibly moved
    *       around on different CPUs by the OS.
@@ -67,10 +70,27 @@ class Foreman final : public ForemanLite {
           CatalogDatabaseLite *catalog_database,
           StorageManager *storage_manager,
           const int cpu_id = -1,
-          const std::size_t num_numa_nodes = 1);
+          const std::size_t num_numa_nodes = 1,
+          const bool profile_individual_workorders = false);
 
   ~Foreman() override {}
 
+  /**
+   * @brief Print the results of profiling individual work orders for a given
+   *        query.
+   *
+   * TODO(harshad) - Add the name of the operator to the output.
+   * TODO(harshad) - Add the CPU core ID of the operator to the output. This
+   * will require modifying the WorkerDirectory to remember worker affinities.
+   * Until then, the users can refer to the worker_affinities provided to the
+   * cli to infer the CPU core ID where a given worker is pinned.
+   *
+   * @param query_id The ID of the query for which the results are to be printed.
+   * @param out The file stream.
+   **/
+  void printWorkOrderProfilingResults(const std::size_t query_id,
+                                      std::FILE *out) const;
+
  protected:
   void run() override;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 9f0502d..84aa86a 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -76,6 +76,9 @@ void PolicyEnforcer::processMessage(const TaggedMessage &tagged_message) {
       query_id = proto.query_id();
       worker_directory_->decrementNumQueuedWorkOrders(
           proto.worker_thread_index());
+      if (profile_individual_workorders_) {
+        recordTimeForWorkOrder(proto);
+      }
       break;
     }
     case kRebuildWorkOrderCompleteMessage: {
@@ -197,4 +200,16 @@ bool PolicyEnforcer::admitQueries(
   return true;
 }
 
+void PolicyEnforcer::recordTimeForWorkOrder(
+    const serialization::NormalWorkOrderCompletionMessage &proto) {
+  const std::size_t query_id = proto.query_id();
+  if (workorder_time_recorder_.find(query_id) == workorder_time_recorder_.end()) {
+    workorder_time_recorder_[query_id];
+  }
+  workorder_time_recorder_[query_id].emplace_back(
+      proto.worker_thread_index(),
+      proto.operator_index(),
+      proto.execution_time_in_microseconds());
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/07435a43/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 9f87056..470ff2a 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -62,13 +63,15 @@ class PolicyEnforcer {
                  CatalogDatabaseLite *catalog_database,
                  StorageManager *storage_manager,
                  WorkerDirectory *worker_directory,
-                 tmb::MessageBus *bus)
+                 tmb::MessageBus *bus,
+                 const bool profile_individual_workorders = false)
       : foreman_client_id_(foreman_client_id),
         num_numa_nodes_(num_numa_nodes),
         catalog_database_(catalog_database),
         storage_manager_(storage_manager),
         worker_directory_(worker_directory),
-        bus_(bus) {}
+        bus_(bus),
+        profile_individual_workorders_(profile_individual_workorders) {}
 
   /**
    * @brief Destructor.
@@ -143,9 +146,40 @@ class PolicyEnforcer {
     return !(admitted_queries_.empty() && waiting_queries_.empty());
   }
 
+  /**
+   * @brief Get the profiling results for individual work order execution for a
+   *        given query.
+   *
+   * @note This function should only be called if profiling individual work
+   *       orders option is enabled.
+   *
+   * @param query_id The ID of the query for which the profiling results are
+   *        requested.
+   *
+   * @return A vector of tuples, each being a single profiling entry.
+   **/
+  inline const std::vector>&
+      getProfilingResults(const std::size_t query_id) const {
+    DCHECK(profile_individual_workorders_);
+    DCHECK(workorder_time_recorder_.find(query_id) !=
+           workorder_time_recorder_.end());
+    return workorder_time_recorder_.at(query_id);
+  }
+
  private:
   static constexpr std::size_t kMaxConcurrentQueries = 1;
 
+  /**
+   * @brief Record the execution time for a finished WorkOrder.
+   *
+   * TODO(harshad) - Extend the functionality to rebuild work orders.
+   *
+   * @param proto The completion message proto sent after the WorkOrder
+   *        execution.
+   **/
+  void recordTimeForWorkOrder(
+      const serialization::NormalWorkOrderCompletionMessage &proto);
+
   const tmb::client_id foreman_client_id_;
   const std::size_t num_numa_nodes_;
 
@@ -154,6 +188,7 @@ class PolicyEnforcer {
   WorkerDirectory *worker_directory_;
 
   tmb::MessageBus *bus_;
+  const bool profile_individual_workorders_;
 
   // Key = query ID, value = QueryManager* for the key query.
   std::unordered_map> admitted_queries_;
@@ -161,6 +196,17 @@ class PolicyEnforcer {
   // The queries which haven't been admitted yet.
   std::queue waiting_queries_;
 
+  // Key = Query ID.
+  // Value = A vector of tuples, each recording the execution of a work order.
+  // Within a tuple ...
+  // 1st element: Logical worker ID.
+  // 2nd element: Operator ID.
+  // 3rd element: Time in microseconds to execute the work order.
+  std::unordered_map<
+      std::size_t,
+      std::vector>>
+      workorder_time_recorder_;
+
   DISALLOW_COPY_AND_ASSIGN(PolicyEnforcer);
 };
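
As a rough standalone illustration of the per-query recorder in the last hunk, the sketch below keeps one vector of (worker ID, operator ID, time) tuples per query ID. The class name `WorkOrderTimeRecorder` and its methods are hypothetical, and the tuple element types are assumed to be `std::size_t` because the archive stripped the template arguments from the diff.

```cpp
#include <cstddef>
#include <tuple>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for PolicyEnforcer's workorder_time_recorder_.
class WorkOrderTimeRecorder {
 public:
  // Assumed layout: (logical worker ID, operator ID, time in microseconds).
  using Entry = std::tuple<std::size_t, std::size_t, std::size_t>;

  // std::unordered_map::operator[] default-constructs the vector on first
  // use, so no separate find-then-insert step is needed here.
  void record(const std::size_t query_id,
              const std::size_t worker_id,
              const std::size_t operator_id,
              const std::size_t time_us) {
    records_[query_id].emplace_back(worker_id, operator_id, time_us);
  }

  // Returns nullptr when nothing was recorded for the query, rather than
  // relying on at(), which throws std::out_of_range for a missing key.
  const std::vector<Entry>* find(const std::size_t query_id) const {
    const auto it = records_.find(query_id);
    return it == records_.end() ? nullptr : &it->second;
  }

 private:
  std::unordered_map<std::size_t, std::vector<Entry>> records_;
};
```

Returning a pointer from `find` is one possible way to handle a query that completed no normal work orders; the committed code instead guards the lookup with a DCHECK.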