From: zuyuz@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Wed, 08 Jun 2016 21:00:08 -0000
Message-Id: <692aaa07fb654cc1837ed38194e35c0e@git.apache.org>
Subject: [40/47] incubator-quickstep git commit: Added Async DataExchange Service.

Added Async DataExchange Service.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5ae50520
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5ae50520
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5ae50520

Branch: refs/heads/reorder-query-id-param
Commit: 5ae50520bdf701153c2390f601900493c7410d08
Parents: 3789da7
Author: Zuyu Zhang
Authored: Sat May 28 22:55:05 2016 -0700
Committer: Zuyu Zhang
Committed: Wed Jun 8 11:57:46 2016 -0700

----------------------------------------------------------------------
 CMakeLists.txt                          |  11 +-
 storage/CMakeLists.txt                  |  69 +++++++-
 storage/DataExchange.proto              |  31 ++++
 storage/DataExchangerAsync.cpp          | 165 ++++++++++++++++++
 storage/DataExchangerAsync.hpp          |  97 +++++++++++
 storage/StorageManager.cpp              | 113 ++++++++++++-
 storage/StorageManager.hpp              |  61 ++++++-
 storage/tests/DataExchange_unittest.cpp | 240 +++++++++++++++++++++++++++
 third_party/iwyu/iwyu_helper.py         |   3 +-
 validate_cmakelists.py                  |   5 +-
 10 files changed, 787 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index dc51ca6..ef7fd50 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -662,7 +662,12 @@ set(ENABLE_PUREMEMORY ON CACHE BOOL "Enable PureMemory TMB")
 set(ENABLE_LEVELDB OFF CACHE BOOL "Enable LevelDB TMB")
 set(ENABLE_MEMORYMIRROR OFF CACHE BOOL "Enable MemoryMirror TMB")
 set(ENABLE_NATIVELOG OFF CACHE BOOL "Enable NativeLog TMB")
-set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+
+# The distributed version requires to use the NativeNet implementation.
+if (NOT ENABLE_DISTRIBUTED)
+  set(ENABLE_NATIVENET OFF CACHE BOOL "Enable NativeNet TMB")
+endif()
+
 set(ENABLE ENABLE_SQLITE OFF CACHE BOOL "Enable SQLite TMB")
 set(ENABLE_VOLTDB OFF CACHE BOOL "Enable VoltDB TMB")
 set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
@@ -670,6 +675,10 @@ set(ENABLE_ZOOKEEPER OFF CACHE BOOL "Enable Zookeeper TMB")
 add_subdirectory("${THIRD_PARTY_SOURCE_DIR}/tmb" "${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb")
 include_directories(${TMB_INCLUDE_DIRS})

+if (ENABLE_DISTRIBUTED)
+  include_directories(${CMAKE_CURRENT_BINARY_DIR}/third_party/tmb/include)
+endif()
+
 # Add all of the module subdirectories. CMakeLists.txt in each of the subdirectories
 # defines how to build that module's libraries.
 add_subdirectory(catalog)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 4da16ea..a77976a 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -126,6 +126,13 @@ QS_PROTOBUF_GENERATE_CPP(storage_StorageBlockLayout_proto_srcs
                          storage_StorageBlockLayout_proto_hdrs
                          StorageBlockLayout.proto)

+if (ENABLE_DISTRIBUTED)
+  GRPC_GENERATE_CPP(storage_DataExchange_proto_srcs
+                    storage_DataExchange_proto_hdrs
+                    .
+                    DataExchange.proto)
+endif()
+
 # Declare micro-libs:
 add_library(quickstep_storage_AggregationOperationState
             AggregationOperationState.cpp
@@ -171,6 +178,14 @@ add_library(quickstep_storage_CompressedTupleStorageSubBlock
             CompressedTupleStorageSubBlock.hpp)
 add_library(quickstep_storage_CountedReference ../empty_src.cpp CountedReference.hpp)
 add_library(quickstep_storage_CSBTreeIndexSubBlock CSBTreeIndexSubBlock.cpp CSBTreeIndexSubBlock.hpp)
+
+if (ENABLE_DISTRIBUTED)
+  add_library(quickstep_storage_DataExchange_proto
+              ${storage_DataExchange_proto_srcs}
+              ${storage_DataExchange_proto_hdrs})
+  add_library(quickstep_storage_DataExchangerAsync DataExchangerAsync.cpp DataExchangerAsync.hpp)
+endif()
+
 add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp)
 add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp)
 if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS)
@@ -575,6 +590,19 @@ target_link_libraries(quickstep_storage_CSBTreeIndexSubBlock
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector
                       quickstep_utility_ScopedBuffer)
+
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_storage_DataExchange_proto
+                        ${PROTOBUF3_LIBRARY})
+  target_link_libraries(quickstep_storage_DataExchangerAsync
+                        glog
+                        quickstep_storage_DataExchange_proto
+                        quickstep_storage_StorageManager
+                        quickstep_threading_Thread
+                        quickstep_utility_Macros
+                        ${GRPCPLUSPLUS_LIBRARIES})
+endif()
+
 target_link_libraries(quickstep_storage_EvictionPolicy
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageConstants
@@ -925,6 +953,7 @@ target_link_libraries(quickstep_storage_StorageManager
                       gflags_nothreads-static
                       glog
                       gtest
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_storage_CountedReference
                       quickstep_storage_EvictionPolicy
                       quickstep_storage_FileManager
@@ -955,7 +984,9 @@ if (ENABLE_DISTRIBUTED)
   target_link_libraries(quickstep_storage_StorageManager
                         quickstep_queryexecution_QueryExecutionMessages_proto
                         quickstep_queryexecution_QueryExecutionTypedefs
-                        quickstep_queryexecution_QueryExecutionUtil)
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_DataExchange_proto
+                        ${GRPCPLUSPLUS_LIBRARIES})
 endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_storage_SubBlockTypeRegistry
                       glog
@@ -1071,6 +1102,11 @@ elseif (QUICKSTEP_HAVE_FILE_MANAGER_WINDOWS)
   target_link_libraries(quickstep_storage
                         quickstep_storage_FileManagerWindows)
 endif()
+if (ENABLE_DISTRIBUTED)
+  target_link_libraries(quickstep_storage
+                        quickstep_storage_DataExchange_proto
+                        quickstep_storage_DataExchangerAsync)
+endif()
 # CMAKE_VALIDATE_IGNORE_BEGIN
 if(QUICKSTEP_HAVE_BITWEAVING)
   target_link_libraries(quickstep_storage
@@ -1340,6 +1376,37 @@ target_link_libraries(CompressedPackedRowStoreTupleStorageSubBlock_unittest
                       ${LIBS})
 add_test(CompressedPackedRowStoreTupleStorageSubBlock_unittest CompressedPackedRowStoreTupleStorageSubBlock_unittest)

+if (ENABLE_DISTRIBUTED)
+  add_executable(DataExchange_unittest
+                 "${CMAKE_CURRENT_SOURCE_DIR}/tests/DataExchange_unittest.cpp")
+  target_link_libraries(DataExchange_unittest
+                        gflags_nothreads-static
+                        glog
+                        gtest
+                        quickstep_catalog_CatalogAttribute
+                        quickstep_catalog_CatalogRelation
+                        quickstep_catalog_CatalogTypedefs
+                        quickstep_queryexecution_BlockLocator
+                        quickstep_queryexecution_QueryExecutionMessages_proto
+                        quickstep_queryexecution_QueryExecutionTypedefs
+                        quickstep_queryexecution_QueryExecutionUtil
+                        quickstep_storage_CountedReference
+                        quickstep_storage_DataExchangerAsync
+                        quickstep_storage_StorageBlob
+                        quickstep_storage_StorageBlock
+                        quickstep_storage_StorageBlockInfo
+                        quickstep_storage_StorageConstants
+                        quickstep_storage_StorageManager
+                        quickstep_storage_TupleStorageSubBlock
+                        quickstep_types_TypeFactory
+                        quickstep_types_TypeID
+                        quickstep_types_TypedValue
+                        quickstep_types_containers_Tuple
+                        tmb
+                        ${LIBS})
+  add_test(DataExchange_unittest DataExchange_unittest)
+endif(ENABLE_DISTRIBUTED)
+
 add_executable(CSBTreeIndexSubBlock_unittest
                "${CMAKE_CURRENT_SOURCE_DIR}/tests/CSBTreeIndexSubBlock_unittest.cpp")
 target_link_libraries(CSBTreeIndexSubBlock_unittest
                       gtest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/storage/DataExchange.proto
----------------------------------------------------------------------
diff --git a/storage/DataExchange.proto b/storage/DataExchange.proto
new file mode 100644
index 0000000..a2636e5
--- /dev/null
+++ b/storage/DataExchange.proto
@@ -0,0 +1,31 @@
+// Copyright 2016 Pivotal Software, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package quickstep;
+
+service DataExchange {
+  rpc Pull (PullRequest) returns (PullResponse) {}
+}
+
+message PullRequest {
+  fixed64 block_id = 1;
+}
+
+message PullResponse {
+  bool is_valid = 1;
+  uint64 num_slots = 2;
+  bytes block = 3;
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/storage/DataExchangerAsync.cpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.cpp b/storage/DataExchangerAsync.cpp
new file mode 100644
index 0000000..78c6565
--- /dev/null
+++ b/storage/DataExchangerAsync.cpp
@@ -0,0 +1,165 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "storage/DataExchangerAsync.hpp"
+
+#include
+
+#include
+#include
+#include
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#include "storage/StorageManager.hpp"
+
+#include "glog/logging.h"
+
+using grpc::ServerCompletionQueue;
+
+namespace quickstep {
+namespace {
+
+/**
+ * @brief RPC Request Context Instance.
+ **/
+class CallContext {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param service The async service.
+   * @param queue The RPC request queue.
+   * @param storage_manager The StorageManager to use.
+   **/
+  CallContext(DataExchange::AsyncService *service,
+              ServerCompletionQueue *queue,
+              StorageManager *storage_manager)
+      : service_(service),
+        queue_(queue),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        responder_(&context_),
+        status_(CallStatus::CREATE) {
+    Proceed();
+  }
+
+  /**
+   * @brief Process the RPC request.
+   **/
+  void Proceed();
+
+ private:
+  DataExchange::AsyncService *service_;
+  ServerCompletionQueue *queue_;
+
+  StorageManager *storage_manager_;
+
+  grpc::ServerContext context_;
+
+  PullRequest request_;
+  PullResponse response_;
+
+  grpc::ServerAsyncResponseWriter<PullResponse> responder_;
+
+  enum class CallStatus {
+    CREATE = 0,
+    PROCESS,
+    FINISH,
+  };
+  CallStatus status_;
+};
+
+void CallContext::Proceed() {
+  switch (status_) {
+    case CallStatus::CREATE: {
+      // Change this instance progress to the PROCESS state.
+      status_ = CallStatus::PROCESS;
+
+      // As part of the initial CREATE state, we *request* that the system
+      // start processing Pull requests. In this request, "this" acts are
+      // the tag uniquely identifying the request (so that different CallContext
+      // instances can serve different requests concurrently), in this case
+      // the memory address of this CallContext instance.
+      service_->RequestPull(&context_, &request_, &responder_, queue_, queue_, this);
+      break;
+    }
+    case CallStatus::PROCESS: {
+      // Spawn a new CallContext instance to serve new clients while we process
+      // the one for this CallContext. The instance will deallocate itself as
+      // part of its FINISH state.
+      new CallContext(service_, queue_, storage_manager_);
+
+      // The actual processing.
+      storage_manager_->pullBlockOrBlob(request_.block_id(), &response_);
+
+      // And we are done! Let the gRPC runtime know we've finished, using the
+      // memory address of this instance as the uniquely identifying tag for
+      // the event.
+      status_ = CallStatus::FINISH;
+      responder_.Finish(response_, grpc::Status::OK, this);
+      break;
+    }
+    case CallStatus::FINISH: {
+      // Once in the FINISH state, deallocate ourselves (CallContext).
+      delete this;
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unknown call status.";
+  }
+}
+
+}  // namespace
+
+const char *DataExchangerAsync::kLocalNetworkAddress = "0.0.0.0:";
+
+DataExchangerAsync::DataExchangerAsync() {
+  grpc::ServerBuilder builder;
+  builder.AddListeningPort(kLocalNetworkAddress, grpc::InsecureServerCredentials(), &port_);
+  builder.RegisterService(&service_);
+
+  queue_ = builder.AddCompletionQueue();
+  server_ = builder.BuildAndStart();
+
+  DCHECK_GT(port_, 0);
+  server_address_ = kLocalNetworkAddress + std::to_string(port_);
+  LOG(INFO) << "DataExchangerAsync Service listening on " << server_address_;
+}
+
+void DataExchangerAsync::run() {
+  // Self-destruct upon success.
+  new CallContext(&service_, queue_.get(), storage_manager_);
+
+  void *tag = nullptr;  // Uniquely identify a request.
+  bool ok = false;
+
+  while (true) {
+    if (queue_->Next(&tag, &ok)) {
+      CallContext *call_context = static_cast<CallContext*>(tag);
+      if (ok) {
+        call_context->Proceed();
+      } else {
+        LOG(WARNING) << "Not ok\n";
+        delete call_context;
+      }
+    } else {
+      LOG(INFO) << "Shutdown\n";
+      return;
+    }
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/storage/DataExchangerAsync.hpp
----------------------------------------------------------------------
diff --git a/storage/DataExchangerAsync.hpp b/storage/DataExchangerAsync.hpp
new file mode 100644
index 0000000..75a4e4d
--- /dev/null
+++ b/storage/DataExchangerAsync.hpp
@@ -0,0 +1,97 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+#define QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_
+
+#include
+
+#include
+#include
+
+#include "storage/DataExchange.grpc.pb.h"
+#include "threading/Thread.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+/**
+ * @brief A class which exchanges data from a StorageManager to its peer.
+ **/
+class DataExchangerAsync final : public Thread {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  DataExchangerAsync();
+
+  ~DataExchangerAsync() override {}
+
+  /**
+   * @brief Set the local StorageManager.
+   *
+   * @param storage_manager The StorageManager to use.
+   **/
+  void set_storage_manager(StorageManager *storage_manager) {
+    storage_manager_ = storage_manager;
+  }
+
+  /**
+   * @brief Return its network address for peers to connect.
+   *
+   * @return Its network address.
+   **/
+  std::string network_address() const {
+    DCHECK(!server_address_.empty());
+    return server_address_;
+  }
+
+  /**
+   * @brief Shutdown itself.
+   **/
+  void shutdown() {
+    server_->Shutdown();
+    // Always shutdown the completion queue after the server.
+    queue_->Shutdown();
+  }
+
+ protected:
+  void run() override;
+
+ private:
+  static const char *kLocalNetworkAddress;
+
+  DataExchange::AsyncService service_;
+
+  int port_ = -1;
+  // Format IP:port, i.e., "0.0.0.0:0".
+  std::string server_address_;
+
+  std::unique_ptr<grpc::ServerCompletionQueue> queue_;
+  std::unique_ptr<grpc::Server> server_;
+
+  StorageManager *storage_manager_ = nullptr;
+
+  DISALLOW_COPY_AND_ASSIGN(DataExchangerAsync);
+};
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_DATA_EXCHANGER_ASYNC_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/storage/StorageManager.cpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index 15e2503..8fc1224 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -41,6 +41,10 @@
 #include
 #endif

+#ifdef QUICKSTEP_DISTRIBUTED
+#include
+#endif
+
 #include
 #include
 #include
@@ -53,6 +57,8 @@
 #include
 #include
+
+#include "catalog/CatalogTypedefs.hpp"
+
 #ifdef QUICKSTEP_DISTRIBUTED
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
@@ -60,6 +66,12 @@
 #endif

 #include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#include "storage/DataExchange.pb.h"
+#endif
+
 #include "storage/EvictionPolicy.hpp"
 #include "storage/FileManagerLocal.hpp"
 #include "storage/StorageBlob.hpp"
@@ -456,6 +468,80 @@ block_id StorageManager::allocateNewBlockOrBlob(const std::size_t num_slots,
 }

 #ifdef QUICKSTEP_DISTRIBUTED
+void StorageManager::pullBlockOrBlob(const block_id block,
+                                     PullResponse *response) const {
+  SpinSharedMutexSharedLock read_lock(blocks_shared_mutex_);
+  std::unordered_map<block_id, BlockHandle>::const_iterator cit = blocks_.find(block);
+  if (cit != blocks_.end()) {
+    response->set_is_valid(true);
+
+    const BlockHandle &block_handle = cit->second;
+    const std::size_t num_slots = block_handle.block_memory_size;
+
+    response->set_num_slots(num_slots);
+    response->set_block(block_handle.block_memory,
+                        num_slots * kSlotSizeBytes);
+  } else {
+    response->set_is_valid(false);
+  }
+}
+
+StorageManager::DataExchangerClientAsync::DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
+                                                                   StorageManager *storage_manager)
+    : stub_(DataExchange::NewStub(channel)),
+      storage_manager_(storage_manager) {
+}
+
+bool StorageManager::DataExchangerClientAsync::Pull(const block_id block,
+                                                    const numa_node_id numa_node,
+                                                    BlockHandle *block_handle) {
+  grpc::ClientContext context;
+
+  PullRequest request;
+  request.set_block_id(block);
+
+  grpc::CompletionQueue queue;
+
+  unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc(
+      stub_->AsyncPull(&context, request, &queue));
+
+  PullResponse response;
+  grpc::Status status;
+
+  rpc->Finish(&response, &status, reinterpret_cast<void*>(1));
+
+  void *got_tag;
+  bool ok = false;
+
+  queue.Next(&got_tag, &ok);
+  CHECK(got_tag == reinterpret_cast<void*>(1));
+  CHECK(ok);
+
+  if (!status.ok()) {
+    LOG(ERROR) << "DataExchangerClientAsync Pull error: RPC failed";
+    return false;
+  }
+
+  if (!response.is_valid()) {
+    LOG(INFO) << "The pulling block not found in all the peers";
+    return false;
+  }
+
+  const size_t num_slots = response.num_slots();
+  DCHECK_NE(num_slots, 0u);
+
+  const string &block_content = response.block();
+  DCHECK_EQ(kSlotSizeBytes * num_slots, block_content.size());
+
+  void *block_buffer = storage_manager_->allocateSlots(num_slots, numa_node);
+
+  block_handle->block_memory =
+      std::memcpy(block_buffer, block_content.c_str(), block_content.size());
+  block_handle->block_memory_size = num_slots;
+
+  return true;
+}
+
 vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
   serialization::BlockMessage proto;
   proto.set_block_id(block);
@@ -541,14 +627,37 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob(
   // The caller of this function holds an exclusive lock on this block/blob's
   // mutex in the lock manager. The caller has ensured that the block is not
   // already loaded before this function gets called.
-  size_t num_slots = file_manager_->numSlots(block);
+  BlockHandle loaded_handle;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+  // TODO(quickstep-team): Use a cost model to determine whether to load from
+  // a remote peer or the disk.
+  if (BlockIdUtil::Domain(block) != block_domain_) {
+    LOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
+    const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
+    for (const string &peer_domain_network_address : peer_domain_network_addresses) {
+      DataExchangerClientAsync client(
+          grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
+          this);
+
+      if (client.Pull(block, numa_node, &loaded_handle)) {
+        sendBlockLocationMessage(block, kAddBlockLocationMessage);
+        return loaded_handle;
+      }
+    }
+
+    LOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
+              << " from remote peers, so try to load from disk.";
+  }
+#endif
+
+  const size_t num_slots = file_manager_->numSlots(block);
   DEBUG_ASSERT(num_slots != 0);
   void *block_buffer = allocateSlots(num_slots, numa_node);

   const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
   CHECK(status) << "Failed to read block from persistent storage: " << block;

-  BlockHandle loaded_handle;
   loaded_handle.block_memory = block_buffer;
   loaded_handle.block_memory_size = num_slots;
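
The remote-pull path added to StorageManager.cpp above is spread across pullBlockOrBlob(), DataExchangerClientAsync::Pull(), and loadBlockOrBlob(). The following is a minimal, self-contained sketch (not Quickstep code) of the same round trip, with gRPC and protobuf replaced by plain structs: the serving side answers only from blocks resident in memory, the client rejects a miss and otherwise copies num_slots * slot-size bytes into new memory, and loading tries each peer in turn before falling back to disk. All names (SketchPullResponse, SketchBlockHandle, ServePull, AcceptPull, LoadWithFallback) are hypothetical, and kSketchSlotSizeBytes is an assumed illustrative value.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

// Assumed, illustrative slot size (not taken from Quickstep).
constexpr std::size_t kSketchSlotSizeBytes = 4096;
using sketch_block_id = std::uint64_t;

// Plain-struct mirror of the PullResponse fields in DataExchange.proto.
struct SketchPullResponse {
  bool is_valid = false;
  std::uint64_t num_slots = 0;
  std::string block;  // Raw block bytes.
};

// Hypothetical stand-in for StorageManager::BlockHandle.
struct SketchBlockHandle {
  std::vector<char> memory;
  std::size_t num_slots = 0;
};

using SketchBufferPool = std::unordered_map<sketch_block_id, SketchBlockHandle>;

// Serving side: answer only from blocks already resident in memory.
SketchPullResponse ServePull(const SketchBufferPool &pool, sketch_block_id block) {
  SketchPullResponse response;
  const auto it = pool.find(block);
  if (it != pool.end()) {
    response.is_valid = true;
    response.num_slots = it->second.num_slots;
    response.block.assign(it->second.memory.data(),
                          it->second.num_slots * kSketchSlotSizeBytes);
  }
  return response;
}

// Client side: reject a miss, otherwise copy the bytes into new memory.
bool AcceptPull(const SketchPullResponse &response, SketchBlockHandle *handle) {
  if (!response.is_valid) {
    return false;
  }
  // Mirrors the DCHECKs in DataExchangerClientAsync::Pull.
  assert(response.num_slots != 0);
  assert(response.block.size() == response.num_slots * kSketchSlotSizeBytes);

  handle->memory.resize(response.block.size());
  std::memcpy(handle->memory.data(), response.block.data(), response.block.size());
  handle->num_slots = response.num_slots;
  return true;
}

// Load path: try each peer in turn, then fall back to the disk reader.
SketchBlockHandle LoadWithFallback(
    sketch_block_id block,
    const std::vector<const SketchBufferPool*> &peers,
    const std::function<SketchBlockHandle(sketch_block_id)> &read_from_disk) {
  SketchBlockHandle handle;
  for (const SketchBufferPool *peer : peers) {
    if (AcceptPull(ServePull(*peer, block), &handle)) {
      return handle;
    }
  }
  return read_from_disk(block);
}
```

In the commit itself, each peer is reached through a gRPC channel to an address returned by getPeerDomainNetworkAddresses(), and a successful pull is followed by sendBlockLocationMessage(block, kAddBlockLocationMessage). Both steps are omitted from this sketch.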

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/storage/StorageManager.hpp
----------------------------------------------------------------------
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index 55a011e..50ddb0f 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -26,9 +26,14 @@
 #include
 #include

+#include "catalog/CatalogTypedefs.hpp"
 #include "query_optimizer/QueryOptimizerConfig.h"  // For QUICKSTEP_DISTRIBUTED
-
 #include "storage/CountedReference.hpp"
+
+#ifdef QUICKSTEP_DISTRIBUTED
+#include "storage/DataExchange.grpc.pb.h"
+#endif
+
 #include "storage/EvictionPolicy.hpp"
 #include "storage/FileManager.hpp"
 #include "storage/StorageBlob.hpp"
@@ -45,6 +50,10 @@

 #include "tmb/id_typedefs.h"

+#ifdef QUICKSTEP_DISTRIBUTED
+namespace grpc { class Channel; }
+#endif
+
 namespace tmb { class MessageBus; }

 namespace quickstep {
@@ -58,6 +67,10 @@ DECLARE_bool(use_hdfs);

 class CatalogRelationSchema;

+#ifdef QUICKSTEP_DISTRIBUTED
+class PullResponse;
+#endif
+
 class StorageBlockLayout;

 /** \addtogroup Storage
@@ -365,6 +378,16 @@ class StorageManager {
    **/
   bool blockOrBlobIsLoadedAndDirty(const block_id block);

+#ifdef QUICKSTEP_DISTRIBUTED
+  /**
+   * @brief Pull a block or a blob. Used by DataExchangerAsync.
+   *
+   * @param block The id of the block or blob.
+   * @param response Where to store the pulled block content.
+   **/
+  void pullBlockOrBlob(const block_id block, PullResponse *response) const;
+#endif
+
  private:
   struct BlockHandle {
     void *block_memory;
@@ -374,6 +397,42 @@ class StorageManager {

 #ifdef QUICKSTEP_DISTRIBUTED
   /**
+   * @brief A class which connects to DataExchangerAsync to exchange data from
+   *        remote peers.
+   **/
+  class DataExchangerClientAsync {
+   public:
+    /**
+     * @brief Constructor.
+     *
+     * @param channel The RPC channel to connect DataExchangerAsync.
+     * @param storage_manager The StorageManager to use.
+     */
+    DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
+                             StorageManager *storage_manager);
+
+    /**
+     * @brief Pull a block or blob from a remote StorageManager.
+     *
+     * @param block The block or blob to pull.
+     * @param numa_node The NUMA node for placing this block.
+     * @param block_handle Where the pulled block or blob stores.
+     *
+     * @return Whether the pull operation is successful or not.
+     */
+    bool Pull(const block_id block,
+              const numa_node_id numa_node,
+              BlockHandle *block_handle);
+
+   private:
+    std::unique_ptr<DataExchange::Stub> stub_;
+
+    StorageManager *storage_manager_;
+
+    DISALLOW_COPY_AND_ASSIGN(DataExchangerClientAsync);
+  };
+
+  /**
    * @brief Get the network info of all the remote StorageManagers which may
    *        load the given block in the buffer pool.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/storage/tests/DataExchange_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp
new file mode 100644
index 0000000..38d12f6
--- /dev/null
+++ b/storage/tests/DataExchange_unittest.cpp
@@ -0,0 +1,240 @@
+/**
+ * Copyright 2016 Pivotal Software, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include
+#include
+#include
+#include
+#include
+
+#include "catalog/CatalogAttribute.hpp"
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/BlockLocator.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/QueryExecutionUtil.hpp"
+#include "storage/CountedReference.hpp"
+#include "storage/DataExchangerAsync.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/Tuple.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
+#include "tmb/id_typedefs.h"
+#include "tmb/message_bus.h"
+#include "tmb/tagged_message.h"
+
+using std::free;
+using std::malloc;
+using std::move;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+using tmb::MessageBus;
+using tmb::TaggedMessage;
+
+namespace quickstep {
+
+class DataExchangeTest : public ::testing::Test {
+ protected:
+  static const char kStoragePath[];
+  static const char kCheckedDomainNetworkAddress[];
+
+  ~DataExchangeTest() {
+    data_exchanger_expected_.join();
+    locator_->join();
+  }
+
+  virtual void SetUp() {
+    bus_.Initialize();
+
+    locator_.reset(new BlockLocator(&bus_));
+    locator_client_id_ = locator_->getBusClientID();
+    locator_->start();
+
+    worker_client_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage);
+    bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage);
+
+    bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage);
+
+    storage_manager_expected_.reset(new StorageManager(
+        kStoragePath,
+        getBlockDomain(data_exchanger_expected_.network_address()),
+        locator_client_id_,
+        &bus_));
+
+    data_exchanger_expected_.set_storage_manager(storage_manager_expected_.get());
+    data_exchanger_expected_.start();
+
+    storage_manager_checked_.reset(new StorageManager(
+        kStoragePath,
+        getBlockDomain(kCheckedDomainNetworkAddress),
+        locator_client_id_,
+        &bus_));
+  }
+
+  virtual void TearDown() {
+    storage_manager_checked_.reset();
+
+    data_exchanger_expected_.shutdown();
+    storage_manager_expected_.reset();
+
+    serialization::EmptyMessage proto;
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kPoisonMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent PoisonMessage (typed '" << kPoisonMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+  }
+
+  unique_ptr<StorageManager> storage_manager_expected_, storage_manager_checked_;
+
+ private:
+  block_id_domain getBlockDomain(const string &network_address) {
+    serialization::BlockDomainRegistrationMessage proto;
+    proto.set_domain_network_address(network_address);
+
+    const int proto_length = proto.ByteSize();
+    char *proto_bytes = static_cast<char*>(malloc(proto_length));
+    CHECK(proto.SerializeToArray(proto_bytes, proto_length));
+
+    TaggedMessage message(static_cast<const void*>(proto_bytes),
+                          proto_length,
+                          kBlockDomainRegistrationMessage);
+    free(proto_bytes);
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage
+              << "') to BlockLocator (id '" << locator_client_id_ << "')";
+
+    CHECK(MessageBus::SendStatus::kOK ==
+        QueryExecutionUtil::SendTMBMessage(&bus_,
+                                           worker_client_id_,
+                                           locator_client_id_,
+                                           move(message)));
+
+    const tmb::AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true));
+    const TaggedMessage &tagged_message = annotated_message.tagged_message;
+    EXPECT_EQ(locator_client_id_, annotated_message.sender);
+    EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type());
+
+    LOG(INFO) << "Worker (id '" << worker_client_id_
+              << "') received BlockDomainRegistrationResponseMessage (typed '"
+              << kBlockDomainRegistrationResponseMessage
+              << "') from BlockLocator";
+
+    serialization::BlockDomainMessage response_proto;
+    CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+    return static_cast<block_id_domain>(response_proto.block_domain());
+  }
+
+  MessageBusImpl bus_;
+
+  unique_ptr<BlockLocator> locator_;
+  tmb::client_id locator_client_id_;
+
+  tmb::client_id worker_client_id_;
+
+  DataExchangerAsync data_exchanger_expected_;
+};
+
+const char DataExchangeTest::kStoragePath[] = "./data_exchange_test_data/";
+const char DataExchangeTest::kCheckedDomainNetworkAddress[] = "ip:port";
+
+TEST_F(DataExchangeTest, BlockPull) {
+  CatalogRelation relation(nullptr, "rel");
+  const attribute_id attr_id =
+      relation.addAttribute(new CatalogAttribute(nullptr, "attr_int", TypeFactory::GetType(kInt)));
+
+  const block_id block =
+      storage_manager_expected_->createBlock(relation, relation.getDefaultStorageBlockLayout());
+
+  {
+    MutableBlockReference block_expected = storage_manager_expected_->getBlockMutable(block, relation);
+
+    // Insert a tuple.
+    const int value_expected = -1;
+    {
+      vector<TypedValue> attrs(1, TypedValue(value_expected));
+      const Tuple tuple(move(attrs));
+
+      EXPECT_TRUE(block_expected->insertTuple(tuple));
+    }
+
+    const BlockReference block_checked = storage_manager_checked_->getBlock(block, relation);
+    EXPECT_FALSE(block_checked->isBlob());
+
+    const TupleStorageSubBlock &tuple_store_checked = block_checked->getTupleStorageSubBlock();
+
+    EXPECT_EQ(1, tuple_store_checked.numTuples());
+    EXPECT_EQ(value_expected, tuple_store_checked.getAttributeValueTyped(0 /* tuple_id */, attr_id).getLiteral<int>());
+  }
+
+  storage_manager_checked_->deleteBlockOrBlobFile(block);
+  storage_manager_expected_->deleteBlockOrBlobFile(block);
+}
+
+TEST_F(DataExchangeTest, BlobPull) {
+  const block_id blob = storage_manager_expected_->createBlob(kDefaultBlockSizeInSlots);
+  {
+    const BlobReference blob_expected = storage_manager_expected_->getBlob(blob);
+    const BlobReference blob_checked = storage_manager_checked_->getBlob(blob);
+    EXPECT_TRUE(blob_checked->isBlob());
+    EXPECT_EQ(blob, blob_checked->getID());
+  }
+
+  storage_manager_checked_->deleteBlockOrBlobFile(blob);
+  storage_manager_expected_->deleteBlockOrBlobFile(blob);
+}
+
+}  // namespace quickstep
+
+int main(int argc, char **argv) {
+  google::InitGoogleLogging(argv[0]);
+  // Honor FLAGS_buffer_pool_slots in StorageManager.
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+
+  ::testing::InitGoogleTest(&argc, argv);
+
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/third_party/iwyu/iwyu_helper.py
----------------------------------------------------------------------
diff --git a/third_party/iwyu/iwyu_helper.py b/third_party/iwyu/iwyu_helper.py
index a204c50..13697be 100755
--- a/third_party/iwyu/iwyu_helper.py
+++ b/third_party/iwyu/iwyu_helper.py
@@ -19,8 +19,9 @@ import sys
 QUICKSTEP_INCLUDES = [ '.',
                        './build',
                        './build/third_party',
-                       './build/third_party/protobuf/include',
                        './build/third_party/gflags/include',
+                       './build/third_party/protobuf/include',
+                       './build/third_party/tmb/include',
                        './third_party/benchmark/include',
                        './third_party/glog/src',
                        './third_party/googletest/googletest/include',

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ae50520/validate_cmakelists.py
----------------------------------------------------------------------
diff --git a/validate_cmakelists.py b/validate_cmakelists.py
index c7b5883..7dd6fc5 100755
--- a/validate_cmakelists.py
+++ b/validate_cmakelists.py
@@ -17,7 +17,7 @@ TODO List / Known Issues & Limitations:
 """
 
 # Copyright 2011-2015 Quickstep Technologies LLC.
-# Copyright 2015 Pivotal Software, Inc.
+# Copyright 2015-2016 Pivotal Software, Inc.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -40,7 +40,8 @@ EXCLUDED_TOP_LEVEL_DIRS = ["build", "third_party"]
 # Explicitly ignored dependencies (special headers with no other quickstep
 # dependencies).
 IGNORED_DEPENDENCIES = frozenset(
-    ["quickstep_threading_WinThreadsAPI",
+    ["quickstep_storage_DataExchange.grpc_proto",
+     "quickstep_threading_WinThreadsAPI",
      "quickstep_utility_textbasedtest_TextBasedTest",
      "quickstep_utility_textbasedtest_TextBasedTestDriver",
      "quickstep_storage_bitweaving_BitWeavingHIndexSubBlock",