From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Wed, 16 Nov 2016 01:55:53 -0000
Message-Id: <7a54f319a4594034a2722d30b43511ab@git.apache.org>
Subject: [1/2] incubator-quickstep git commit: Use BlockLocator and DataExchangerAsync in the distributed tests. [Forced Update!]
archived-at: Wed, 16 Nov 2016 01:56:01 -0000

Repository: incubator-quickstep
Updated Branches:
  refs/heads/storage-path e3999b210 -> 6fa7be6ba (forced update)


Use BlockLocator and DataExchangerAsync in the distributed tests.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/787a3251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/787a3251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/787a3251

Branch: refs/heads/storage-path
Commit: 787a3251019162610ebe13efbd341b3f9ac7a268
Parents: 3093e74
Author: Zuyu Zhang
Authored: Fri Nov 4 23:12:09 2016 -0700
Committer: Zuyu Zhang
Committed: Mon Nov 14 20:38:04 2016 -0800

----------------------------------------------------------------------
 query_optimizer/tests/CMakeLists.txt            |   8 ++
 .../DistributedExecutionGeneratorTestRunner.cpp | 104 ++++++++++++++++---
 .../DistributedExecutionGeneratorTestRunner.hpp |  32 +++++-
 query_optimizer/tests/TestDatabaseLoader.hpp    |  59 +++++++++--
 storage/DataExchangerAsync.cpp                  |   4 +-
 5 files changed, 178 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt
index ac4548a..9c764e4 100644
--- a/query_optimizer/tests/CMakeLists.txt
+++ b/query_optimizer/tests/CMakeLists.txt
@@ -79,6 +79,10 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros
                       tmb)
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_storage_StorageBlockInfo)
+endif(ENABLE_DISTRIBUTED)

 if (ENABLE_DISTRIBUTED)
   add_executable(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest
@@ -110,7 +114,9 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_cli_PrintToScreen
                         quickstep_parser_ParseStatement
                         quickstep_parser_SqlParserWrapper
+                        quickstep_queryexecution_BlockLocator
                         quickstep_queryexecution_ForemanDistributed
+                        quickstep_queryexecution_QueryExecutionMessages_proto
                         quickstep_queryexecution_QueryExecutionTypedefs
                         quickstep_queryexecution_QueryExecutionUtil
                         quickstep_queryexecution_Shiftboss
@@ -120,6 +126,8 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_queryoptimizer_OptimizerContext
                         quickstep_queryoptimizer_QueryHandle
                         quickstep_queryoptimizer_tests_TestDatabaseLoader
+                        quickstep_storage_DataExchangerAsync
+                        quickstep_storage_StorageBlockInfo
                         quickstep_utility_Macros
                         quickstep_utility_MemStream
                         quickstep_utility_SqlError

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
index 5cccc21..0403e77 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp
@@ -20,18 +20,26 @@
 #include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp"

 #include
+#include
+#include
 #include
 #include
+#include
 #include

 #include "catalog/CatalogTypedefs.hpp"
 #include "cli/DropRelation.hpp"
 #include "cli/PrintToScreen.hpp"
 #include "parser/ParseStatement.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
 #include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/QueryHandle.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/MemStream.hpp"
 #include "utility/SqlError.hpp"

@@ -41,10 +49,15 @@
 #include "tmb/message_bus.h"
 #include "tmb/tagged_message.h"

-using std::string;
+using std::free;
 using std::make_unique;
+using std::malloc;
+using std::move;
+using std::string;
 using std::vector;

+using tmb::TaggedMessage;
+
 namespace quickstep {

 class CatalogRelation;
@@ -56,10 +69,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption =

 DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path)
     : query_id_(0),
-      test_database_loader_(storage_path) {
-  test_database_loader_.createTestRelation(false /* allow_vchar */);
-  test_database_loader_.loadTestRelation();
-
+      data_exchangers_(kNumInstances) {
   bus_.Initialize();

   cli_id_ = bus_.Connect();
@@ -67,9 +77,27 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
   bus_.RegisterClientAsSender(cli_id_, kPoisonMessage);
   bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage);

+  bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage);
+  bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage);
+
+  block_locator_ = make_unique(&bus_);
+  locator_client_id_ = block_locator_->getBusClientID();
+  block_locator_->start();
+
+  test_database_loader_ = make_unique(
+      storage_path,
+      getBlockDomain(test_database_loader_data_exchanger_.network_address()),
+      locator_client_id_,
+      &bus_);
+  test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager());
+  test_database_loader_data_exchanger_.start();
+
+  test_database_loader_->createTestRelation(false /* allow_vchar */);
+  test_database_loader_->loadTestRelation();
+
   // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former
   // could receive a registration message from the latter.
-  foreman_ = make_unique(&bus_, test_database_loader_.catalog_database());
+  foreman_ = make_unique(&bus_, test_database_loader_->catalog_database());

   // We don't use the NUMA aware version of worker code.
   const vector numa_nodes(1 /* Number of worker threads per instance */,
@@ -78,17 +106,24 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner
   for (int i = 0; i < kNumInstances; ++i) {
     workers_.push_back(make_unique(0 /* worker_thread_index */, &bus_));

-    const vector worker_client_ids(1, workers_[i]->getBusClientID());
+    const vector worker_client_ids(1, workers_.back()->getBusClientID());
     worker_directories_.push_back(
         make_unique(worker_client_ids.size(), worker_client_ids, numa_nodes));

+    auto storage_manager = make_unique(
+        storage_path, getBlockDomain(data_exchangers_[i].network_address()), locator_client_id_, &bus_);
+
+    data_exchangers_[i].set_storage_manager(storage_manager.get());
     shiftbosses_.push_back(
-        make_unique(&bus_, test_database_loader_.storage_manager(), worker_directories_[i].get()));
+        make_unique(&bus_, storage_manager.get(), worker_directories_.back().get()));
+
+    storage_managers_.push_back(move(storage_manager));
   }

   foreman_->start();

   for (int i = 0; i < kNumInstances; ++i) {
+    data_exchangers_[i].start();
     shiftbosses_[i]->start();
     workers_[i]->start();
   }
@@ -101,9 +136,9 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
   VLOG(4) << "Test SQL(s): " << input;

   if (options.find(kResetOption) != options.end()) {
-    test_database_loader_.clear();
-    test_database_loader_.createTestRelation(false /* allow_vchar */);
-    test_database_loader_.loadTestRelation();
+    test_database_loader_->clear();
+    test_database_loader_->createTestRelation(false /* allow_vchar */);
+    test_database_loader_->loadTestRelation();
   }

   MemStream output_stream;
@@ -125,7 +160,7 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
       QueryHandle query_handle(query_id_++, cli_id_);

       optimizer_.generateQueryHandle(parse_statement,
-                                     test_database_loader_.catalog_database(),
+                                     test_database_loader_->catalog_database(),
                                      &optimizer_context,
                                      &query_handle);

@@ -141,11 +176,11 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
       const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation();
       if (query_result_relation) {
         PrintToScreen::PrintRelation(*query_result_relation,
-                                     test_database_loader_.storage_manager(),
+                                     test_database_loader_->storage_manager(),
                                      output_stream.file());
         DropRelation::Drop(*query_result_relation,
-                           test_database_loader_.catalog_database(),
-                           test_database_loader_.storage_manager());
+                           test_database_loader_->catalog_database(),
+                           test_database_loader_->storage_manager());
       }
     } catch (const SqlError &error) {
       *output = error.formatMessage(input);
@@ -158,5 +193,44 @@ void DistributedExecutionGeneratorTestRunner::runTestCase(
   }
 }

+block_id_domain DistributedExecutionGeneratorTestRunner::getBlockDomain(
+    const string &network_address) {
+  serialization::BlockDomainRegistrationMessage proto;
+  proto.set_domain_network_address(network_address);
+
+  const int proto_length = proto.ByteSize();
+  char *proto_bytes = static_cast(malloc(proto_length));
+  CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+  TaggedMessage message(static_cast(proto_bytes),
+                        proto_length,
+                        kBlockDomainRegistrationMessage);
+  free(proto_bytes);
+
+  DLOG(INFO) << "Client (id '" << cli_id_
+             << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+             << "') to BlockLocator (id '" << locator_client_id_ << "')";
+
+  CHECK(MessageBus::SendStatus::kOK ==
+      QueryExecutionUtil::SendTMBMessage(&bus_,
+                                         cli_id_,
+                                         locator_client_id_,
+                                         move(message)));
+
+  const tmb::AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true));
+  const TaggedMessage &tagged_message = annotated_message.tagged_message;
+  CHECK_EQ(locator_client_id_, annotated_message.sender);
+  CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+  DLOG(INFO) << "Client (id '" << cli_id_
+             << "') received BlockDomainRegistrationResponseMessage (typed '"
+             << kBlockDomainRegistrationResponseMessage
+             << "') from BlockLocator";
+
+  serialization::BlockDomainMessage response_proto;
+  CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+  return static_cast(response_proto.block_domain());
+}
+
 }  // namespace optimizer
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
index ab10841..d2b13e4 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp
@@ -28,6 +28,7 @@
 #include

 #include "parser/SqlParserWrapper.hpp"
+#include "query_execution/BlockLocator.hpp"
 #include "query_execution/ForemanDistributed.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_execution/QueryExecutionUtil.hpp"
@@ -36,6 +37,8 @@
 #include "query_execution/WorkerDirectory.hpp"
 #include "query_optimizer/Optimizer.hpp"
 #include "query_optimizer/tests/TestDatabaseLoader.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 #include "utility/textbased_test/TextBasedTestRunner.hpp"

@@ -86,6 +89,25 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
     }

     foreman_->join();
+
+    test_database_loader_data_exchanger_.shutdown();
+    test_database_loader_.reset();
+    for (int i = 0; i < kNumInstances; ++i) {
+      data_exchangers_[i].shutdown();
+      storage_managers_[i].reset();
+    }
+
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           cli_id_,
+                                           locator_client_id_,
+                                           tmb::TaggedMessage(quickstep::kPoisonMessage)));
+
+    test_database_loader_data_exchanger_.join();
+    for (int i = 0; i < kNumInstances; ++i) {
+      data_exchangers_[i].join();
+    }
+    block_locator_->join();
   }

   void runTestCase(const std::string &input,
@@ -93,20 +115,26 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner {
                    std::string *output) override;

  private:
+  block_id_domain getBlockDomain(const std::string &network_address);
+
   std::size_t query_id_;

   SqlParserWrapper sql_parser_;
-  TestDatabaseLoader test_database_loader_;
+  std::unique_ptr test_database_loader_;
+  DataExchangerAsync test_database_loader_data_exchanger_;
   Optimizer optimizer_;

   MessageBusImpl bus_;
+  tmb::client_id cli_id_, locator_client_id_;

-  tmb::client_id cli_id_;
+  std::unique_ptr block_locator_;

   std::unique_ptr foreman_;

   std::vector> workers_;
   std::vector> worker_directories_;
+  std::vector data_exchangers_;
+  std::vector> storage_managers_;
   std::vector> shiftbosses_;

   DISALLOW_COPY_AND_ASSIGN(DistributedExecutionGeneratorTestRunner);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/TestDatabaseLoader.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/TestDatabaseLoader.hpp b/query_optimizer/tests/TestDatabaseLoader.hpp
index d49719d..87c19c6 100644
--- a/query_optimizer/tests/TestDatabaseLoader.hpp
+++ b/query_optimizer/tests/TestDatabaseLoader.hpp
@@ -24,12 +24,21 @@

 #include "catalog/CatalogDatabase.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/StorageBlockInfo.hpp"
+#endif  // QUICKSTEP_DISTRIBUTED
+
 #include "storage/StorageManager.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
 #include "utility/Macros.hpp"

 #include "tmb/id_typedefs.h"

+#ifdef QUICKSTEP_DISTRIBUTED
+namespace tmb { class MessageBus; }
+#endif  // QUICKSTEP_DISTRIBUTED
+
 namespace quickstep {

 class CatalogRelation;
@@ -60,18 +69,34 @@ class TestDatabaseLoader {
                           0 /* id */),
         storage_manager_(storage_path),
         test_relation_(nullptr) {
-    bus_.Initialize();
-
-    const tmb::client_id worker_thread_client_id = bus_.Connect();
-    bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
-
-    // Refer to InsertDestination::sendBlockFilledMessage for the rationale
-    // behind using ClientIDMap.
-    thread_id_map_->addValue(worker_thread_client_id);
+    init();
+  }

-    scheduler_client_id_ = bus_.Connect();
-    bus_.RegisterClientAsReceiver(scheduler_client_id_, kCatalogRelationNewBlockMessage);
+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Constructor for the distributed version.
+   *
+   * @param storage_path A filesystem directory where the blocks may be
+   *                     evicted to during the execution of a test query.
+   *                     Can be empty if the test query is not executed
+   *                     in the query engine.
+   * @param block_domain The block_domain for StorageManager.
+   * @param locator_client_id The client id of BlockLocator for StorageManager.
+   * @param bus_global The Bus for StorageManager.
+   */
+  TestDatabaseLoader(const std::string &storage_path,
+                     const block_id_domain block_domain,
+                     const tmb::client_id locator_client_id,
+                     tmb::MessageBus *bus_global)
+      : thread_id_map_(ClientIDMap::Instance()),
+        catalog_database_(nullptr /* parent */,
+                          "TestDatabase" /* name */,
+                          0 /* id */),
+        storage_manager_(storage_path, block_domain, locator_client_id, bus_global),
+        test_relation_(nullptr) {
+    init();
   }
+#endif  // QUICKSTEP_DISTRIBUTED

   ~TestDatabaseLoader() {
     clear();
@@ -139,6 +164,20 @@ class TestDatabaseLoader {
   void clear();

  private:
+  void init() {
+    bus_.Initialize();
+
+    const tmb::client_id worker_thread_client_id = bus_.Connect();
+    bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage);
+
+    // Refer to InsertDestination::sendBlockFilledMessage for the rationale
+    // behind using ClientIDMap.
+    thread_id_map_->addValue(worker_thread_client_id);
+
+    scheduler_client_id_ = bus_.Connect();
+    bus_.RegisterClientAsReceiver(scheduler_client_id_, kCatalogRelationNewBlockMessage);
+  }
+
   /**
    * @brief Simulate Foreman to add all new blocks to the relation.
    */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/storage/DataExchangerAsync.cpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.cpp b/storage/DataExchangerAsync.cpp
index 59f5ebf..1d2f7db 100644
--- a/storage/DataExchangerAsync.cpp
+++ b/storage/DataExchangerAsync.cpp
@@ -155,11 +155,11 @@ void DataExchangerAsync::run() {
       if (ok) {
         call_context->Proceed();
       } else {
-        LOG(WARNING) << "Not ok\n";
+        LOG(WARNING) << "DataExchangerAsync " << server_address_ << " is not ok";
         delete call_context;
       }
     } else {
-      LOG(INFO) << "Shutdown\n";
+      LOG(INFO) << "DataExchangerAsync " << server_address_ << " shuts down";
       return;
     }
   }
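
Note on the buffer handling in getBlockDomain(): the new function serializes into a malloc'd buffer, constructs the message from it, and calls free() right away. That is only safe if the message keeps its own copy of the bytes. The sketch below illustrates that pattern in isolation. OwnedMessage and makeRegistrationMessage are hypothetical stand-ins invented for this note, not TMB or Quickstep types, and the copy-on-construct behaviour is an assumption inferred from the placement of free() in the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical message type that copies its payload on construction
// (assumed behaviour; not the real tmb::TaggedMessage).
class OwnedMessage {
 public:
  OwnedMessage(const void *bytes, std::size_t size, int type)
      : payload_(static_cast<const char *>(bytes),
                 static_cast<const char *>(bytes) + size),
        type_(type) {}

  const char *data() const { return payload_.data(); }
  std::size_t size() const { return payload_.size(); }
  int type() const { return type_; }

 private:
  std::vector<char> payload_;  // Private copy of the caller's bytes.
  int type_;
};

// Hypothetical helper: stage the address in a temporary heap buffer, build
// the message from it, then release the buffer before returning.
OwnedMessage makeRegistrationMessage(const std::string &network_address,
                                     int type) {
  const std::size_t length = network_address.size();
  char *bytes = static_cast<char *>(std::malloc(length));
  // malloc(0) is allowed to return nullptr, so only insist on a valid
  // pointer when there is something to copy.
  assert(bytes != nullptr || length == 0);
  if (length > 0) {
    std::memcpy(bytes, network_address.data(), length);
  }

  OwnedMessage message(bytes, length, type);
  std::free(bytes);  // Safe only because 'message' holds its own copy.
  return message;
}
```

Calling makeRegistrationMessage("127.0.0.1:4575", 7) would return a message whose payload can still be read after the staging buffer has been freed. If the message type merely stored the pointer instead of copying, the same free() would leave it dangling.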