From: hbdeshmukh@apache.org
To: commits@quickstep.incubator.apache.org
Reply-To: dev@quickstep.incubator.apache.org
Date: Mon, 24 Oct 2016 23:31:12 -0000
Subject: [2/2] incubator-quickstep git commit: Add backend support for LIPFilters.
archived-at: Mon, 24 Oct 2016 23:31:27 -0000

Add backend support for LIPFilters.

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/393eba55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/393eba55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/393eba55

Branch: refs/heads/master
Commit: 393eba550efdabbdccb2bf166fe8deae8bd3ed77
Parents: 539a95a
Author: Jianqiao Zhu
Authored: Wed Sep 7 13:20:43 2016 -0500
Committer: Jianqiao Zhu
Committed: Mon Oct 24 17:00:55 2016 -0500

----------------------------------------------------------------------
 expressions/scalar/ScalarAttribute.cpp       |   2 +-
 query_execution/QueryContext.hpp             |   8 +-
 query_optimizer/PhysicalGenerator.cpp        |   2 +-
 query_optimizer/rules/AttachLIPFilters.cpp   |  17 ++-
 query_optimizer/rules/CMakeLists.txt         |   1 +
 query_optimizer/tests/OptimizerTextTest.cpp  |   6 +-
 relational_operators/AggregationOperator.cpp |  12 +-
 relational_operators/AggregationOperator.hpp |  10 +-
 relational_operators/BuildHashOperator.cpp   |  17 ++-
 relational_operators/BuildHashOperator.hpp   |  18 ++-
 relational_operators/CMakeLists.txt          |  11 ++
 relational_operators/HashJoinOperator.cpp    | 161 ++++++++++++++++------
 relational_operators/HashJoinOperator.hpp    |  64 ++++++---
 relational_operators/SelectOperator.cpp      |  87 +++++++++---
 relational_operators/SelectOperator.hpp      |  16 ++-
 relational_operators/WorkOrder.proto         |   7 +-
 relational_operators/WorkOrderFactory.cpp    |  73 +++++++++-
 storage/AggregationOperationState.cpp        |  49 ++++---
 storage/AggregationOperationState.hpp        |   9 +-
 storage/CMakeLists.txt                       |   5 +-
 storage/StorageBlock.cpp                     | 112 +++++----------
 storage/StorageBlock.hpp                     |  82 ++++-------
 utility/PlanVisualizer.cpp                   |   4 +-
 utility/lip_filter/CMakeLists.txt            |   4 +
 utility/lip_filter/LIPFilterBuilder.hpp      |   3 -
 utility/lip_filter/LIPFilterUtil.hpp         |  79 +++++++++++
 26 files changed, 588 insertions(+), 271 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/expressions/scalar/ScalarAttribute.cpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/ScalarAttribute.cpp b/expressions/scalar/ScalarAttribute.cpp
index b29286b..cc42084 100644
--- a/expressions/scalar/ScalarAttribute.cpp
+++ b/expressions/scalar/ScalarAttribute.cpp
@@ -168,7 +168,7 @@ ColumnVector* ScalarAttribute::getAllValuesForJoin(
   ValueAccessor *accessor = using_left_relation ? left_accessor : right_accessor;
-  return InvokeOnValueAccessorNotAdapter(
+  return InvokeOnAnyValueAccessor(
       accessor,
       [&joined_tuple_ids,
        &attr_id,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 4ebb042..7ad8fa1 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -88,7 +88,7 @@ class QueryContext {
   /**
    * @brief A unique identifier for a LIPFilterDeployment per query.
    **/
-  typedef std::uint32_t lip_deployment_id;
+  typedef std::int32_t lip_deployment_id;
   static constexpr lip_deployment_id kInvalidLIPDeploymentId = static_cast(-1);
   /**
@@ -315,7 +315,7 @@ class QueryContext {
    * @return True if valid, otherwise false.
    **/
   bool isValidLIPDeploymentId(const lip_deployment_id id) const {
-    return id < lip_deployments_.size();
+    return static_cast(id) < lip_deployments_.size();
   }
   /**
@@ -328,7 +328,7 @@ class QueryContext {
    **/
   inline const LIPFilterDeployment* getLIPDeployment(
       const lip_deployment_id id) const {
-    DCHECK_LT(id, lip_deployments_.size());
+    DCHECK_LT(static_cast(id), lip_deployments_.size());
     return lip_deployments_[id].get();
   }
@@ -338,7 +338,7 @@ class QueryContext {
    * @param id The id of the LIPFilterDeployment to destroy.
    **/
   inline void destroyLIPDeployment(const lip_deployment_id id) {
-    DCHECK_LT(id, lip_deployments_.size());
+    DCHECK_LT(static_cast(id), lip_deployments_.size());
     lip_deployments_[id].reset();
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 9db4037..7cb97dc 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -50,7 +50,7 @@ DEFINE_bool(reorder_hash_joins, true,
             "cardinality and selective tables to be joined first, which is suitable "
             "for queries on star-schema tables.");
-DEFINE_bool(use_lip_filters, false,
+DEFINE_bool(use_lip_filters, true,
             "If true, use LIP (Lookahead Information Passing) filters to accelerate "
             "query processing. LIP filters are effective for queries on star schema "
             "tables (e.g. the SSB benchmark) and snowflake schema tables (e.g. the "

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_optimizer/rules/AttachLIPFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.cpp b/query_optimizer/rules/AttachLIPFilters.cpp
index 090fb8c..b3c57ab 100644
--- a/query_optimizer/rules/AttachLIPFilters.cpp
+++ b/query_optimizer/rules/AttachLIPFilters.cpp
@@ -36,6 +36,7 @@
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/Selection.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/TypedValue.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
 #include "glog/logging.h"
@@ -174,12 +175,16 @@ const std::vector& AttachLIPFilters
       if (selectivity < 1.0) {
         std::size_t cardinality = cost_model_->estimateCardinality(build_node);
         for (const auto &attr : hash_join->right_join_attributes()) {
-          lip_filters.emplace_back(
-              std::make_shared(attr,
-                               path.cdr()->node,
-                               path.depth,
-                               selectivity,
-                               cardinality));
+          // NOTE(jianqiao): currently we only consider attributes of primitive
+          // fixed-length types.
+          if (TypedValue::HashIsReversible(attr->getValueType().getTypeID())) {
+            lip_filters.emplace_back(
+                std::make_shared(attr,
+                                 path.cdr()->node,
+                                 path.depth,
+                                 selectivity,
+                                 cardinality));
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 29875f6..7fffadc 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -50,6 +50,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_physical_Selection
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_Rule
+                      quickstep_types_TypedValue
                       quickstep_utility_Macros
                       quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_queryoptimizer_rules_BottomUpRule

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/query_optimizer/tests/OptimizerTextTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/OptimizerTextTest.cpp b/query_optimizer/tests/OptimizerTextTest.cpp
index b6be739..759c173 100644
--- a/query_optimizer/tests/OptimizerTextTest.cpp
+++ b/query_optimizer/tests/OptimizerTextTest.cpp
@@ -32,6 +32,7 @@ namespace quickstep {
 namespace optimizer {
 DECLARE_bool(reorder_hash_joins);
+DECLARE_bool(use_lip_filters);
 }
 }
@@ -57,9 +58,10 @@ int main(int argc, char** argv) {
   test_driver->registerOptions(
       quickstep::optimizer::OptimizerTextTestRunner::kTestOptions);
-  // Turn off join order optimization for optimizer test since it is up to change
-  // and affects a large number of test cases.
+  // Turn off join order optimization and LIPFilter for optimizer test since
+  // it is up to change and affects a large number of test cases.
   quickstep::optimizer::FLAGS_reorder_hash_joins = false;
+  quickstep::optimizer::FLAGS_use_lip_filters = false;
   ::testing::InitGoogleTest(&argc, argv);
   int success = RUN_ALL_TESTS();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index 056e76d..e111f5b 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -27,6 +27,8 @@
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/AggregationOperationState.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
 #include "tmb/id_typedefs.h"
@@ -45,7 +47,8 @@ bool AggregationOperator::getAllWorkOrders(
             new AggregationWorkOrder(
                 query_id_,
                 input_block_id,
-                query_context->getAggregationState(aggr_state_index_)),
+                query_context->getAggregationState(aggr_state_index_),
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
             op_index_);
       }
       started_ = true;
@@ -57,7 +60,8 @@ bool AggregationOperator::getAllWorkOrders(
           new AggregationWorkOrder(
               query_id_,
               input_relation_block_ids_[num_workorders_generated_],
-              query_context->getAggregationState(aggr_state_index_)),
+              query_context->getAggregationState(aggr_state_index_),
+              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
           op_index_);
       ++num_workorders_generated_;
     }
@@ -92,13 +96,13 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_
   proto->SetExtension(serialization::AggregationWorkOrder::block_id, block);
   proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
+  proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_);
   return proto;
 }
-
 void AggregationWorkOrder::execute() {
-  state_->aggregateBlock(input_block_id_);
+  state_->aggregateBlock(input_block_id_, lip_filter_adaptive_prober_.get());
 }
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 31c1da4..b5ed977 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -30,6 +30,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 #include "glog/logging.h"
@@ -137,13 +138,16 @@ class AggregationWorkOrder : public WorkOrder {
    * @param query_id The ID of this query.
    * @param input_block_id The block id.
    * @param state The AggregationState to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
    **/
   AggregationWorkOrder(const std::size_t query_id,
                        const block_id input_block_id,
-                       AggregationOperationState *state)
+                       AggregationOperationState *state,
+                       LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr)
       : WorkOrder(query_id),
         input_block_id_(input_block_id),
-        state_(DCHECK_NOTNULL(state)) {}
+        state_(DCHECK_NOTNULL(state)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {}
   ~AggregationWorkOrder() override {}
@@ -153,6 +157,8 @@ class AggregationWorkOrder : public WorkOrder {
   const block_id input_block_id_;
   AggregationOperationState *state_;
+  std::unique_ptr lip_filter_adaptive_prober_;
+
   DISALLOW_COPY_AND_ASSIGN(AggregationWorkOrder);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 465621c..60e091f 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -34,6 +34,8 @@
 #include "storage/TupleReference.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
 #include "glog/logging.h"
@@ -79,7 +81,8 @@ bool BuildHashOperator::getAllWorkOrders(
                 any_join_key_attributes_nullable_,
                 input_block_id,
                 hash_table,
-                storage_manager),
+                storage_manager,
+                CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
             op_index_);
       }
       started_ = true;
@@ -95,7 +98,8 @@ bool BuildHashOperator::getAllWorkOrders(
               any_join_key_attributes_nullable_,
               input_relation_block_ids_[num_workorders_generated_],
               hash_table,
-              storage_manager),
+              storage_manager,
+              CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
           op_index_);
       ++num_workorders_generated_;
     }
@@ -136,17 +140,24 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
                       any_join_key_attributes_nullable_);
   proto->SetExtension(serialization::BuildHashWorkOrder::join_hash_table_index, hash_table_index_);
   proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block);
+  proto->SetExtension(serialization::BuildHashWorkOrder::lip_deployment_index, lip_deployment_index_);
   return proto;
 }
-
 void BuildHashWorkOrder::execute() {
   BlockReference block(
       storage_manager_->getBlock(build_block_id_, input_relation_));
   TupleReferenceGenerator generator(build_block_id_);
   std::unique_ptr accessor(block->getTupleStorageSubBlock().createValueAccessor());
+
+  // Build LIPFilters if enabled.
+  if (lip_filter_builder_ != nullptr) {
+    lip_filter_builder_->insertValueAccessor(accessor.get());
+    accessor->beginIterationVirtual();
+  }
+
   HashTablePutResult result;
   if (join_key_attributes_.size() == 1) {
     result = hash_table_->putValueAccessor(accessor.get(),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 4a80a8a..0f96ef2 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
 #define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
+#include
 #include
 #include
 #include
@@ -31,6 +32,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
 #include "glog/logging.h"
@@ -162,6 +164,7 @@ class BuildHashWorkOrder : public WorkOrder {
    * @param build_block_id The block id.
    * @param hash_table The JoinHashTable to use.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_builder The attached LIP filter builer.
    **/
   BuildHashWorkOrder(const std::size_t query_id,
                      const CatalogRelationSchema &input_relation,
@@ -169,14 +172,16 @@ class BuildHashWorkOrder : public WorkOrder {
                      const bool any_join_key_attributes_nullable,
                      const block_id build_block_id,
                      JoinHashTable *hash_table,
-                     StorageManager *storage_manager)
+                     StorageManager *storage_manager,
+                     LIPFilterBuilder *lip_filter_builder = nullptr)
       : WorkOrder(query_id),
         input_relation_(input_relation),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         build_block_id_(build_block_id),
         hash_table_(DCHECK_NOTNULL(hash_table)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_builder_(lip_filter_builder) {}
   /**
    * @brief Constructor for the distributed version.
@@ -189,6 +194,7 @@ class BuildHashWorkOrder : public WorkOrder {
    * @param build_block_id The block id.
    * @param hash_table The JoinHashTable to use.
    * @param storage_manager The StorageManager to use.
+   * @param lip_filter_builder The attached LIP filter builer.
    **/
   BuildHashWorkOrder(const std::size_t query_id,
                      const CatalogRelationSchema &input_relation,
@@ -196,14 +202,16 @@ class BuildHashWorkOrder : public WorkOrder {
                      const bool any_join_key_attributes_nullable,
                      const block_id build_block_id,
                      JoinHashTable *hash_table,
-                     StorageManager *storage_manager)
+                     StorageManager *storage_manager,
+                     LIPFilterBuilder *lip_filter_builder = nullptr)
       : WorkOrder(query_id),
         input_relation_(input_relation),
         join_key_attributes_(std::move(join_key_attributes)),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         build_block_id_(build_block_id),
         hash_table_(DCHECK_NOTNULL(hash_table)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_builder_(lip_filter_builder) {}
   ~BuildHashWorkOrder() override {}
@@ -222,6 +230,8 @@ class BuildHashWorkOrder : public WorkOrder {
   JoinHashTable *hash_table_;
   StorageManager *storage_manager_;
+  std::unique_ptr lip_filter_builder_;
+
   DISALLOW_COPY_AND_ASSIGN(BuildHashWorkOrder);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index a9645b4..8dd65d0 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -92,6 +92,8 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator
                       quickstep_storage_AggregationOperationState
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       glog
@@ -111,6 +113,8 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterBuilder
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 target_link_libraries(quickstep_relationaloperators_CreateIndexOperator
                       glog
@@ -223,6 +227,8 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 target_link_libraries(quickstep_relationaloperators_InsertOperator
                       glog
@@ -322,7 +328,11 @@ target_link_libraries(quickstep_relationaloperators_SelectOperator
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_TupleIdSequence
+                      quickstep_storage_ValueAccessor
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 if(QUICKSTEP_HAVE_LIBNUMA)
 target_link_libraries(quickstep_relationaloperators_SelectOperator
@@ -492,6 +502,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
 target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
                       quickstep_relationaloperators_SortMergeRunOperator_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 779c0fe..4a91f86 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -48,6 +48,8 @@
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
 #include "gflags/gflags.h"
 #include "glog/logging.h"
@@ -95,28 +97,22 @@ class MapBasedJoinedTupleCollector {
 class SemiAntiJoinTupleCollector {
  public:
-  explicit SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
-    filter_.reset(tuple_store.getExistenceMap());
-  }
+  explicit SemiAntiJoinTupleCollector(TupleIdSequence *filter)
+      : filter_(filter) {}
   template
   inline void operator()(const ValueAccessorT &accessor) {
     filter_->set(accessor.getCurrentPosition(), false);
   }
-  const TupleIdSequence* filter() const {
-    return filter_.get();
-  }
-
  private:
-  std::unique_ptr filter_;
+  TupleIdSequence *filter_;
 };
 class OuterJoinTupleCollector {
  public:
-  explicit OuterJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
-    filter_.reset(tuple_store.getExistenceMap());
-  }
+  explicit OuterJoinTupleCollector(TupleIdSequence *filter)
+      : filter_(filter) {}
   template
   inline void operator()(const ValueAccessorT &accessor,
@@ -134,14 +130,10 @@ class OuterJoinTupleCollector {
     return &joined_tuples_;
   }
-  const TupleIdSequence* filter() const {
-    return filter_.get();
-  }
-
  private:
   std::unordered_map>> joined_tuples_;
   // BitVector on the probe relation. 1 if the corresponding tuple has no match.
-  std::unique_ptr filter_;
+  TupleIdSequence *filter_;
 };
 }  // namespace
@@ -203,7 +195,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                      selection,
                      hash_table,
                      output_destination,
-                     storage_manager),
+                     storage_manager,
+                     CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
             op_index_);
       }
       started_ = true;
@@ -223,7 +216,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
               selection,
               hash_table,
               output_destination,
-              storage_manager),
+              storage_manager,
+              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
           op_index_);
       ++num_workorders_generated_;
     }  // end while
@@ -264,7 +258,8 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
                 is_selection_on_build_,
                 hash_table,
                 output_destination,
-                storage_manager),
+                storage_manager,
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
             op_index_);
       }
       started_ = true;
@@ -284,7 +279,8 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
               is_selection_on_build_,
               hash_table,
               output_destination,
-              storage_manager),
+              storage_manager,
+              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
           op_index_);
       ++num_workorders_generated_;
     }
@@ -360,6 +356,7 @@ serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto(
   proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_);
   proto->SetExtension(serialization::HashJoinWorkOrder::block_id, block);
   proto->SetExtension(serialization::HashJoinWorkOrder::residual_predicate_index, residual_predicate_index_);
+  proto->SetExtension(serialization::HashJoinWorkOrder::lip_deployment_index, lip_deployment_index_);
   return proto;
 }
@@ -408,6 +405,7 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const
   proto->SetExtension(serialization::HashJoinWorkOrder::join_hash_table_index, hash_table_index_);
   proto->SetExtension(serialization::HashJoinWorkOrder::selection_index, selection_index_);
   proto->SetExtension(serialization::HashJoinWorkOrder::block_id, block);
+  proto->SetExtension(serialization::HashJoinWorkOrder::lip_deployment_index, lip_deployment_index_);
   for (const bool is_attribute_on_build : is_selection_on_build_) {
     proto->AddExtension(serialization::HashJoinWorkOrder::is_selection_on_build, is_attribute_on_build);
@@ -421,8 +419,19 @@ void HashInnerJoinWorkOrder::execute() {
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr probe_accessor(probe_store.createValueAccessor());
+
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr existence_map;
+  std::unique_ptr base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
   MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
@@ -526,9 +535,19 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
   BlockReference probe_block =
       storage_manager_->getBlock(block_id_, probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr probe_accessor(probe_store.createValueAccessor());
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr existence_map;
+  std::unique_ptr base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
   // We collect all the matching probe relation tuples, as there's a residual
   // preidcate that needs to be applied after collecting these matches.
   MapBasedJoinedTupleCollector collector;
@@ -548,7 +567,6 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
   // Get a filter for tuples in the given probe block.
   TupleIdSequence filter(probe_store.getMaxTupleID() + 1);
-  filter.setRange(0, filter.length(), false);
   for (const std::pair>>
            &build_block_entry : *collector.getJoinedTuples()) {
@@ -607,9 +625,24 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
   BlockReference probe_block =
       storage_manager_->getBlock(block_id_, probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr probe_accessor(probe_store.createValueAccessor());
-  SemiAntiJoinTupleCollector collector(probe_store);
+
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr existence_map;
+  std::unique_ptr base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
+  if (existence_map == nullptr) {
+    existence_map.reset(probe_store.getExistenceMap());
+  }
+
+  SemiAntiJoinTupleCollector collector(existence_map.get());
   // We collect all the probe relation tuples which have at least one matching
   // tuple in the build relation. As a performance optimization, the hash table
   // just looks for the existence of the probing key in the hash table and sets
@@ -637,7 +670,7 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
                                     probe_block->getIndicesConsistent());
   std::unique_ptr probe_accessor_with_filter(
-      probe_store.createValueAccessor(collector.filter()));
+      probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   ColumnVectorsValueAccessor temp_result;
   for (vector>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end();
        ++selection_it) {
@@ -654,9 +687,24 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
   BlockReference probe_block =
       storage_manager_->getBlock(block_id_, probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr probe_accessor(probe_store.createValueAccessor());
-  SemiAntiJoinTupleCollector collector(probe_store);
+
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr existence_map;
+  std::unique_ptr base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
+  if (existence_map == nullptr) {
+    existence_map.reset(probe_store.getExistenceMap());
+  }
+
+  SemiAntiJoinTupleCollector collector(existence_map.get());
   // We probe the hash table to find the keys which have an entry in the
   // hash table.
   if (join_key_attributes_.size() == 1) {
@@ -680,7 +728,7 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
                                     probe_block->getIndicesConsistent());
   std::unique_ptr probe_accessor_with_filter(
-      probe_store.createValueAccessor(collector.filter()));
+      probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
   ColumnVectorsValueAccessor temp_result;
   for (vector>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end();
        ++selection_it) {
@@ -698,8 +746,19 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
   BlockReference probe_block =
       storage_manager_->getBlock(block_id_, probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-
   std::unique_ptr probe_accessor(probe_store.createValueAccessor());
+
+  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr existence_map;
+  std::unique_ptr base_accessor;
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    base_accessor.reset(probe_accessor.release());
+    existence_map.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+    probe_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+  }
+
   MapBasedJoinedTupleCollector collector;
   // We probe the hash table and get all the matches. Unlike
   // executeWithoutResidualPredicate(), we have to collect all the matching
@@ -719,8 +778,12 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
         &collector);
   }
-  // Create a filter for all the tuples from the given probe block.
-  std::unique_ptr filter(probe_store.getExistenceMap());
+  // If the existence map has not been initialized by the pre-filtering LIPFilters.
+  // Then create it for all the tuples from the given probe block.
+ if (existence_map == nullptr) { + existence_map.reset(probe_store.getExistenceMap()); + } + for (const std::pair>> &build_block_entry : *collector.getJoinedTuples()) { // First element of the pair build_block_entry is the build block ID @@ -733,7 +796,7 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() { std::unique_ptr build_accessor(build_store.createValueAccessor()); for (const std::pair &hash_match : build_block_entry.second) { - if (!filter->get(hash_match.second)) { + if (!existence_map->get(hash_match.second)) { // We have already seen this tuple, skip it. continue; } @@ -743,9 +806,9 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() { *probe_accessor, probe_relation_id, hash_match.second)) { - // Note that the filter marks a match as false, as needed by the anti - // join definition. - filter->set(hash_match.second, false); + // Note that the existence map marks a match as false, as needed by the + // anti join definition. + existence_map->set(hash_match.second, false); } } } @@ -755,7 +818,7 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() { probe_block->getIndicesConsistent()); std::unique_ptr probe_accessor_with_filter( - probe_store.createValueAccessor(filter.get())); + probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map)); ColumnVectorsValueAccessor temp_result; for (vector>::const_iterator selection_it = selection_.begin(); selection_it != selection_.end(); @@ -775,9 +838,24 @@ void HashOuterJoinWorkOrder::execute() { const BlockReference probe_block = storage_manager_->getBlock(block_id_, probe_relation_); const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock(); - std::unique_ptr probe_accessor(probe_store.createValueAccessor()); - OuterJoinTupleCollector collector(probe_store); + + // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled. 
+ std::unique_ptr existence_map; + std::unique_ptr base_accessor; + if (lip_filter_adaptive_prober_ != nullptr) { + base_accessor.reset(probe_accessor.release()); + existence_map.reset( + lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get())); + probe_accessor.reset( + base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map)); + } + + if (existence_map == nullptr) { + existence_map.reset(probe_store.getExistenceMap()); + } + + OuterJoinTupleCollector collector(existence_map.get()); if (join_key_attributes_.size() == 1) { hash_table_.getAllFromValueAccessorWithExtraWorkForFirstMatch( probe_accessor.get(), @@ -822,11 +900,10 @@ void HashOuterJoinWorkOrder::execute() { probe_block->getIndicesConsistent()); // Populate the output tuples for non-matches. - const TupleIdSequence *filter = collector.filter(); - const TupleIdSequence::size_type num_tuples_without_matches = filter->size(); + const TupleIdSequence::size_type num_tuples_without_matches = existence_map->size(); if (num_tuples_without_matches > 0) { std::unique_ptr probe_accessor_with_filter( - probe_store.createValueAccessor(filter)); + probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map)); ColumnVectorsValueAccessor temp_result; for (std::size_t i = 0; i < selection_.size(); ++i) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/HashJoinOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp index fa393b6..0ed1eeb 100644 --- a/relational_operators/HashJoinOperator.hpp +++ b/relational_operators/HashJoinOperator.hpp @@ -35,6 +35,7 @@ #include "storage/HashTable.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" #include "glog/logging.h" @@ -295,6 +296,7 @@ class HashInnerJoinWorkOrder : public WorkOrder { * 
@param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. **/ HashInnerJoinWorkOrder( const std::size_t query_id, @@ -307,7 +309,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { const std::vector> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -318,7 +321,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} /** * @brief Constructor for the distributed version. @@ -342,6 +346,7 @@ class HashInnerJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashInnerJoinWorkOrder( const std::size_t query_id, @@ -354,7 +359,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { const std::vector> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -365,7 +371,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} ~HashInnerJoinWorkOrder() override {} @@ -392,6 +399,8 @@ class HashInnerJoinWorkOrder : public WorkOrder { InsertDestination *output_destination_; StorageManager *storage_manager_; + std::unique_ptr lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder); }; @@ -423,6 +432,7 @@ class HashSemiJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashSemiJoinWorkOrder( const std::size_t query_id, @@ -435,7 +445,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { const std::vector> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -446,7 +457,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} /** * @brief Constructor for the distributed version. @@ -470,6 +482,7 @@ class HashSemiJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashSemiJoinWorkOrder( const std::size_t query_id, @@ -482,7 +495,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { const std::vector> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -493,7 +507,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} ~HashSemiJoinWorkOrder() override {} @@ -516,6 +531,8 @@ class HashSemiJoinWorkOrder : public WorkOrder { InsertDestination *output_destination_; StorageManager *storage_manager_; + std::unique_ptr lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder); }; @@ -547,6 +564,7 @@ class HashAntiJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashAntiJoinWorkOrder( const std::size_t query_id, @@ -559,7 +577,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { const std::vector> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -570,7 +589,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} /** * @brief Constructor for the distributed version. @@ -594,6 +614,7 @@ class HashAntiJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashAntiJoinWorkOrder( const std::size_t query_id, @@ -606,7 +627,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { const std::vector> &selection, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -617,7 +639,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { selection_(selection), hash_table_(hash_table), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} ~HashAntiJoinWorkOrder() override {} @@ -646,6 +669,8 @@ class HashAntiJoinWorkOrder : public WorkOrder { InsertDestination *output_destination_; StorageManager *storage_manager_; + std::unique_ptr lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder); }; @@ -675,6 +700,7 @@ class HashOuterJoinWorkOrder : public WorkOrder { * @param hash_table The JoinHashTable to use. * @param output_destination The InsertDestination to insert the join results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ HashOuterJoinWorkOrder( const std::size_t query_id, @@ -687,7 +713,8 @@ class HashOuterJoinWorkOrder : public WorkOrder { const std::vector &is_selection_on_build, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -698,7 +725,8 @@ class HashOuterJoinWorkOrder : public WorkOrder { is_selection_on_build_(is_selection_on_build), hash_table_(hash_table), output_destination_(output_destination), - storage_manager_(storage_manager) {} + storage_manager_(storage_manager), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} /** * @brief Constructor for the distributed version. @@ -733,7 +761,8 @@ class HashOuterJoinWorkOrder : public WorkOrder { std::vector &&is_selection_on_build, const JoinHashTable &hash_table, InsertDestination *output_destination, - StorageManager *storage_manager) + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober) : WorkOrder(query_id), build_relation_(build_relation), probe_relation_(probe_relation), @@ -744,7 +773,8 @@ class HashOuterJoinWorkOrder : public WorkOrder { is_selection_on_build_(std::move(is_selection_on_build)), hash_table_(hash_table), output_destination_(output_destination), - storage_manager_(storage_manager) {} + storage_manager_(storage_manager), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) {} ~HashOuterJoinWorkOrder() override {} @@ -763,6 +793,8 @@ class HashOuterJoinWorkOrder : public WorkOrder { InsertDestination *output_destination_; StorageManager *storage_manager_; + std::unique_ptr lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(HashOuterJoinWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/SelectOperator.cpp 
---------------------------------------------------------------------- diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp index d56326e..236ee7c 100644 --- a/relational_operators/SelectOperator.cpp +++ b/relational_operators/SelectOperator.cpp @@ -30,6 +30,10 @@ #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" +#include "storage/TupleIdSequence.hpp" +#include "storage/ValueAccessor.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" +#include "utility/lip_filter/LIPFilterUtil.hpp" #include "glog/logging.h" @@ -40,22 +44,26 @@ namespace quickstep { class Predicate; void SelectOperator::addWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, StorageManager *storage_manager, const Predicate *predicate, const std::vector> *selection, InsertDestination *output_destination) { if (input_relation_is_stored_) { for (const block_id input_block_id : input_relation_block_ids_) { - container->addNormalWorkOrder(new SelectWorkOrder(query_id_, - input_relation_, - input_block_id, - predicate, - simple_projection_, - simple_selection_, - selection, - output_destination, - storage_manager), - op_index_); + container->addNormalWorkOrder( + new SelectWorkOrder( + query_id_, + input_relation_, + input_block_id, + predicate, + simple_projection_, + simple_selection_, + selection, + output_destination, + storage_manager, + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)), + op_index_); } } else { while (num_workorders_generated_ < input_relation_block_ids_.size()) { @@ -69,7 +77,8 @@ void SelectOperator::addWorkOrders(WorkOrdersContainer *container, simple_selection_, selection, output_destination, - storage_manager), + storage_manager, + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)), op_index_); ++num_workorders_generated_; } @@ -78,6 +87,7 @@ void 
SelectOperator::addWorkOrders(WorkOrdersContainer *container, #ifdef QUICKSTEP_HAVE_LIBNUMA void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, StorageManager *storage_manager, const Predicate *predicate, const std::vector> *selection, @@ -99,6 +109,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container, selection, output_destination, storage_manager, + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), placement_scheme_->getNUMANodeForBlock(input_block_id)), op_index_); } @@ -120,6 +131,7 @@ void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container, selection, output_destination, storage_manager, + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), placement_scheme_->getNUMANodeForBlock(block_in_partition)), op_index_); ++num_workorders_generated_in_partition_[part_id]; @@ -151,11 +163,21 @@ bool SelectOperator::getAllWorkOrders( if (input_relation_.hasPartitionScheme()) { #ifdef QUICKSTEP_HAVE_LIBNUMA if (input_relation_.hasNUMAPlacementScheme()) { - addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination); + addPartitionAwareWorkOrders(container, + query_context, + storage_manager, + predicate, + selection, + output_destination); } #endif } else { - addWorkOrders(container, storage_manager, predicate, selection, output_destination); + addWorkOrders(container, + query_context, + storage_manager, + predicate, + selection, + output_destination); } started_ = true; } @@ -164,11 +186,21 @@ bool SelectOperator::getAllWorkOrders( if (input_relation_.hasPartitionScheme()) { #ifdef QUICKSTEP_HAVE_LIBNUMA if (input_relation_.hasNUMAPlacementScheme()) { - addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination); + addPartitionAwareWorkOrders(container, + query_context, + storage_manager, + predicate, + selection, + output_destination); 
} #endif } else { - addWorkOrders(container, storage_manager, predicate, selection, output_destination); + addWorkOrders(container, + query_context, + storage_manager, + predicate, + selection, + output_destination); } return done_feeding_input_relation_; } @@ -210,22 +242,41 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl } } proto->SetExtension(serialization::SelectWorkOrder::selection_index, selection_index_); + proto->SetExtension(serialization::SelectWorkOrder::lip_deployment_index, lip_deployment_index_); return proto; } - void SelectWorkOrder::execute() { BlockReference block( storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0])); + // NOTE(jianqiao): For SSB and TPCH queries, experiments show that it is almost + // always better to apply the predicate BEFORE the LIPFilters. But as a future + // work this ordering may even be adaptive. + std::unique_ptr predicate_matches; + if (predicate_ != nullptr) { + predicate_matches.reset( + block->getMatchesForPredicate(predicate_)); + } + + std::unique_ptr matches; + if (lip_filter_adaptive_prober_ != nullptr) { + std::unique_ptr accessor( + block->getTupleStorageSubBlock().createValueAccessor(predicate_matches.get())); + matches.reset( + lip_filter_adaptive_prober_->filterValueAccessor(accessor.get())); + } else { + matches.reset(predicate_matches.release()); + } + if (simple_projection_) { block->selectSimple(simple_selection_, - predicate_, + matches.get(), output_destination_); } else { block->select(*DCHECK_NOTNULL(selection_), - predicate_, + matches.get(), output_destination_); } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/SelectOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp index 0f5c712..2ace458 100644 --- a/relational_operators/SelectOperator.hpp +++ 
b/relational_operators/SelectOperator.hpp @@ -38,6 +38,7 @@ #include "relational_operators/WorkOrder.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" #include "glog/logging.h" @@ -49,6 +50,7 @@ namespace quickstep { class CatalogRelationSchema; class InsertDestination; +class LIPFilterDeployment; class Predicate; class Scalar; class StorageManager; @@ -247,12 +249,14 @@ class SelectOperator : public RelationalOperator { } void addWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, StorageManager *storage_manager, const Predicate *predicate, const std::vector> *selection, InsertDestination *output_destination); void addPartitionAwareWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, StorageManager *storage_manager, const Predicate *predicate, const std::vector> *selection, @@ -318,6 +322,7 @@ class SelectWorkOrder : public WorkOrder { * @param output_destination The InsertDestination to insert the selection * results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ SelectWorkOrder(const std::size_t query_id, const CatalogRelationSchema &input_relation, @@ -328,6 +333,7 @@ class SelectWorkOrder : public WorkOrder { const std::vector> *selection, InsertDestination *output_destination, StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr, const numa_node_id numa_node = 0) : WorkOrder(query_id), input_relation_(input_relation), @@ -337,7 +343,8 @@ class SelectWorkOrder : public WorkOrder { simple_selection_(simple_selection), selection_(selection), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) { + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) { preferred_numa_nodes_.push_back(numa_node); } @@ -360,6 +367,7 @@ class SelectWorkOrder : public WorkOrder { * @param output_destination The InsertDestination to insert the selection * results. * @param storage_manager The StorageManager to use. + * @param lip_filter_adaptive_prober The attached LIP filter prober. 
**/ SelectWorkOrder(const std::size_t query_id, const CatalogRelationSchema &input_relation, @@ -370,6 +378,7 @@ class SelectWorkOrder : public WorkOrder { const std::vector> *selection, InsertDestination *output_destination, StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr, const numa_node_id numa_node = 0) : WorkOrder(query_id), input_relation_(input_relation), @@ -379,7 +388,8 @@ class SelectWorkOrder : public WorkOrder { simple_selection_(std::move(simple_selection)), selection_(selection), output_destination_(DCHECK_NOTNULL(output_destination)), - storage_manager_(DCHECK_NOTNULL(storage_manager)) { + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober) { preferred_numa_nodes_.push_back(numa_node); } @@ -407,6 +417,8 @@ class SelectWorkOrder : public WorkOrder { InsertDestination *output_destination_; StorageManager *storage_manager_; + std::unique_ptr lip_filter_adaptive_prober_; + DISALLOW_COPY_AND_ASSIGN(SelectWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index 3eed379..86f34b8 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -59,6 +59,7 @@ message AggregationWorkOrder { // All required. optional uint32 aggr_state_index = 16; optional fixed64 block_id = 17; + optional int32 lip_deployment_index = 18; } } @@ -70,6 +71,7 @@ message BuildHashWorkOrder { optional bool any_join_key_attributes_nullable = 34; optional uint32 join_hash_table_index = 35; optional fixed64 block_id = 36; + optional int32 lip_deployment_index = 37; } } @@ -131,6 +133,8 @@ message HashJoinWorkOrder { optional int32 residual_predicate_index = 169; // Used by HashOuterJoinWorkOrder only. 
repeated bool is_selection_on_build = 170; + + optional int32 lip_deployment_index = 171; } } @@ -185,9 +189,10 @@ message SelectWorkOrder { // When 'simple_projection' is true. repeated int32 simple_selection = 245; - // Otherwise. optional int32 selection_index = 246; + + optional int32 lip_deployment_index = 247; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index 2356bab..91a717f 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -50,6 +50,7 @@ #include "relational_operators/WindowAggregationOperator.hpp" #include "relational_operators/WorkOrder.pb.h" #include "storage/StorageBlockInfo.hpp" +#include "utility/lip_filter/LIPFilterUtil.hpp" #include "glog/logging.h" @@ -61,6 +62,7 @@ using std::vector; namespace quickstep { class InsertDestination; +class LIPFilterAdaptiveProber; class Predicate; class Scalar; @@ -82,7 +84,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.query_id(), proto.GetExtension(serialization::AggregationWorkOrder::block_id), query_context->getAggregationState( - proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index))); + proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)), + CreateLIPFilterAdaptiveProberHelper( + proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context)); } case serialization::BUILD_HASH: { LOG(INFO) << "Creating BuildHashWorkOrder"; @@ -101,7 +105,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::BuildHashWorkOrder::block_id), query_context->getJoinHashTable( proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index)), - 
storage_manager); + storage_manager, + CreateLIPFilterBuilderHelper( + proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context)); } case serialization::DELETE: { LOG(INFO) << "Creating DeleteWorkOrder"; @@ -200,6 +206,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder InsertDestination *output_destination = query_context->getInsertDestination( proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)); + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = + CreateLIPFilterAdaptiveProberHelper( + proto.GetExtension(serialization::HashJoinWorkOrder::lip_deployment_index), query_context); switch (hash_join_work_order_type) { case serialization::HashJoinWorkOrder::HASH_ANTI_JOIN: { @@ -215,7 +224,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder selection, hash_table, output_destination, - storage_manager); + storage_manager, + lip_filter_adaptive_prober); } case serialization::HashJoinWorkOrder::HASH_INNER_JOIN: { LOG(INFO) << "Creating HashInnerJoinWorkOrder"; @@ -230,7 +240,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder selection, hash_table, output_destination, - storage_manager); + storage_manager, + lip_filter_adaptive_prober); } case serialization::HashJoinWorkOrder::HASH_OUTER_JOIN: { vector is_selection_on_build; @@ -253,7 +264,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder move(is_selection_on_build), hash_table, output_destination, - storage_manager); + storage_manager, + lip_filter_adaptive_prober); } case serialization::HashJoinWorkOrder::HASH_SEMI_JOIN: { LOG(INFO) << "Creating HashSemiJoinWorkOrder"; @@ -268,7 +280,8 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder selection, hash_table, output_destination, - storage_manager); + storage_manager, + lip_filter_adaptive_prober); } default: LOG(FATAL) << "Unknown 
HashJoinWorkOrder Type in WorkOrderFactory::ReconstructFromProto"; @@ -346,7 +359,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder proto.GetExtension(serialization::SelectWorkOrder::selection_index)), query_context->getInsertDestination( proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)), - storage_manager); + storage_manager, + CreateLIPFilterAdaptiveProberHelper( + proto.GetExtension(serialization::HashJoinWorkOrder::lip_deployment_index), query_context)); } case serialization::SORT_MERGE_RUN: { LOG(INFO) << "Creating SortMergeRunWorkOrder"; @@ -459,6 +474,17 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, switch (proto.work_order_type()) { case serialization::AGGREGATION: { + if (!proto.HasExtension(serialization::AggregationWorkOrder::lip_deployment_index)) { + return false; + } else { + const QueryContext::lip_deployment_id lip_deployment_index = + proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index); + if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId && + !query_context.isValidLIPDeploymentId(lip_deployment_index)) { + return false; + } + } + return proto.HasExtension(serialization::AggregationWorkOrder::block_id) && proto.HasExtension(serialization::AggregationWorkOrder::aggr_state_index) && query_context.isValidAggregationStateId( @@ -482,6 +508,17 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, } } + if (!proto.HasExtension(serialization::BuildHashWorkOrder::lip_deployment_index)) { + return false; + } else { + const QueryContext::lip_deployment_id lip_deployment_index = + proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index); + if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId && + !query_context.isValidLIPDeploymentId(lip_deployment_index)) { + return false; + } + } + return 
proto.HasExtension(serialization::BuildHashWorkOrder::any_join_key_attributes_nullable) && proto.HasExtension(serialization::BuildHashWorkOrder::block_id) && proto.HasExtension(serialization::BuildHashWorkOrder::join_hash_table_index) && @@ -556,6 +593,17 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, } } + if (!proto.HasExtension(serialization::HashJoinWorkOrder::lip_deployment_index)) { + return false; + } else { + const QueryContext::lip_deployment_id lip_deployment_index = + proto.GetExtension(serialization::HashJoinWorkOrder::lip_deployment_index); + if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId && + !query_context.isValidLIPDeploymentId(lip_deployment_index)) { + return false; + } + } + if (hash_join_work_order_type == serialization::HashJoinWorkOrder::HASH_OUTER_JOIN) { if (!proto.HasExtension(serialization::HashJoinWorkOrder::is_selection_on_build)) { return false; @@ -655,6 +703,17 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, return false; } + if (!proto.HasExtension(serialization::SelectWorkOrder::lip_deployment_index)) { + return false; + } else { + const QueryContext::lip_deployment_id lip_deployment_index = + proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index); + if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId && + !query_context.isValidLIPDeploymentId(lip_deployment_index)) { + return false; + } + } + return proto.HasExtension(serialization::SelectWorkOrder::insert_destination_index) && query_context.isValidInsertDestinationId( proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)) && http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 249026d..eb7ca79 100644 --- 
a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -46,10 +46,13 @@ #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" +#include "storage/TupleIdSequence.hpp" +#include "storage/ValueAccessor.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" #include "types/containers/Tuple.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" #include "glog/logging.h" @@ -331,11 +334,12 @@ bool AggregationOperationState::ProtoIsValid( return true; } -void AggregationOperationState::aggregateBlock(const block_id input_block) { +void AggregationOperationState::aggregateBlock(const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { if (group_by_list_.empty()) { aggregateBlockSingleState(input_block); } else { - aggregateBlockHashTable(input_block); + aggregateBlockHashTable(input_block, lip_filter_adaptive_prober); } } @@ -367,10 +371,13 @@ void AggregationOperationState::aggregateBlockSingleState( BlockReference block( storage_manager_->getBlock(input_block, input_relation_)); - // If there is a filter predicate, 'reuse_matches' holds the set of matching - // tuples so that it can be reused across multiple aggregates (i.e. we only - // pay the cost of evaluating the predicate once). 
- std::unique_ptr<TupleIdSequence> reuse_matches; + std::unique_ptr<TupleIdSequence> matches; + if (predicate_ != nullptr) { + std::unique_ptr<ValueAccessor> accessor( + block->getTupleStorageSubBlock().createValueAccessor()); + matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get())); + } + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { const std::vector<attribute_id> *local_arguments_as_attributes = nullptr; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -387,9 +394,8 @@ void AggregationOperationState::aggregateBlockSingleState( arguments_[agg_idx], local_arguments_as_attributes, {}, /* group_by */ - predicate_.get(), + matches.get(), distinctify_hashtables_[agg_idx].get(), - &reuse_matches, nullptr /* reuse_group_by_vectors */); local_state.emplace_back(nullptr); } else { @@ -397,8 +403,7 @@ void AggregationOperationState::aggregateBlockSingleState( local_state.emplace_back(block->aggregate(*handles_[agg_idx], arguments_[agg_idx], local_arguments_as_attributes, - predicate_.get(), - &reuse_matches)); + matches.get())); } } @@ -407,14 +412,22 @@ void AggregationOperationState::aggregateBlockSingleState( } void AggregationOperationState::aggregateBlockHashTable( - const block_id input_block) { + const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { BlockReference block( storage_manager_->getBlock(input_block, input_relation_)); - // If there is a filter predicate, 'reuse_matches' holds the set of matching - // tuples so that it can be reused across multiple aggregates (i.e. we only - // pay the cost of evaluating the predicate once). - std::unique_ptr<TupleIdSequence> reuse_matches; + // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence + // as the existence map for the tuples. 
+ std::unique_ptr<TupleIdSequence> matches; + if (predicate_ != nullptr) { + matches.reset(block->getMatchesForPredicate(predicate_.get())); + } + if (lip_filter_adaptive_prober != nullptr) { + std::unique_ptr<ValueAccessor> accessor( + block->getTupleStorageSubBlock().createValueAccessor(matches.get())); + matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get())); + } // This holds values of all the GROUP BY attributes so that the can be reused // across multiple aggregates (i.e. we only pay the cost of evaluatin the @@ -431,9 +444,8 @@ void AggregationOperationState::aggregateBlockHashTable( arguments_[agg_idx], nullptr, /* arguments_as_attributes */ group_by_list_, - predicate_.get(), + matches.get(), distinctify_hashtables_[agg_idx].get(), - &reuse_matches, &reuse_group_by_vectors); } } @@ -447,9 +459,8 @@ void AggregationOperationState::aggregateBlockHashTable( DCHECK(agg_hash_table != nullptr); block->aggregateGroupBy(arguments_, group_by_list_, - predicate_.get(), + matches.get(), agg_hash_table, - &reuse_matches, &reuse_group_by_vectors); group_by_hashtable_pool_->returnHashTable(agg_hash_table); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 3b0f286..c251983 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -41,6 +41,7 @@ class AggregateFunction; class CatalogDatabaseLite; class CatalogRelationSchema; class InsertDestination; +class LIPFilterAdaptiveProber; class StorageManager; /** \addtogroup Storage @@ -155,8 +156,11 @@ class AggregationOperationState { * * @param input_block The block ID of the storage block where the aggreates * are going to be computed. + * @param lip_filter_adaptive_prober The LIPFilter prober for pre-filtering + * the block. 
**/ - void aggregateBlock(const block_id input_block); + void aggregateBlock(const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr); /** * @brief Generate the final results for the aggregates managed by this @@ -185,7 +189,8 @@ class AggregationOperationState { // Aggregate on input block. void aggregateBlockSingleState(const block_id input_block); - void aggregateBlockHashTable(const block_id input_block); + void aggregateBlockHashTable(const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober); void finalizeSingleState(InsertDestination *output_destination); void finalizeHashTable(InsertDestination *output_destination); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/393eba55/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 325a7cb..0e32cc1 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -292,11 +292,14 @@ target_link_libraries(quickstep_storage_AggregationOperationState quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager + quickstep_storage_TupleIdSequence + quickstep_storage_ValueAccessor quickstep_types_TypedValue quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorsValueAccessor quickstep_types_containers_Tuple - quickstep_utility_Macros) + quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilterAdaptiveProber) target_link_libraries(quickstep_storage_AggregationOperationState_proto quickstep_expressions_Expressions_proto quickstep_expressions_aggregation_AggregateFunction_proto