Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 449D1200BB9 for ; Mon, 7 Nov 2016 22:56:35 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 43035160AF9; Mon, 7 Nov 2016 21:56:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B451A160AE0 for ; Mon, 7 Nov 2016 22:56:32 +0100 (CET) Received: (qmail 19256 invoked by uid 500); 7 Nov 2016 21:56:31 -0000 Mailing-List: contact commits-help@quickstep.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@quickstep.incubator.apache.org Delivered-To: mailing list commits@quickstep.incubator.apache.org Received: (qmail 19247 invoked by uid 99); 7 Nov 2016 21:56:31 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Nov 2016 21:56:31 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id E4D3DC0AD8 for ; Mon, 7 Nov 2016 21:56:30 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id Z-97gWeTsDlw for ; Mon, 7 Nov 2016 21:56:27 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 556835F254 for ; Mon, 7 Nov 2016 21:56:25 +0000 (UTC) Received: (qmail 18795 invoked by uid 99); 7 Nov 2016 21:56:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Nov 2016 21:56:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 157F4F16B8; Mon, 7 Nov 2016 21:56:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianqiao@apache.org To: commits@quickstep.incubator.apache.org Date: Mon, 07 Nov 2016 21:56:30 -0000 Message-Id: <60d281d512a94483b143bde787994910@git.apache.org> In-Reply-To: <2c6c242189d648ff8620530b38031137@git.apache.org> References: <2c6c242189d648ff8620530b38031137@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [8/9] incubator-quickstep git commit: Support for BitVectorExactFilter. archived-at: Mon, 07 Nov 2016 21:56:35 -0000 http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/query_optimizer/rules/InjectJoinFilters.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/InjectJoinFilters.cpp b/query_optimizer/rules/InjectJoinFilters.cpp new file mode 100644 index 0000000..2fcff3e --- /dev/null +++ b/query_optimizer/rules/InjectJoinFilters.cpp @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "query_optimizer/rules/InjectJoinFilters.hpp" + +#include +#include +#include + +#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" +#include "query_optimizer/expressions/AttributeReference.hpp" +#include "query_optimizer/expressions/ExpressionUtil.hpp" +#include "query_optimizer/physical/LIPFilterConfiguration.hpp" +#include "query_optimizer/physical/Aggregate.hpp" +#include "query_optimizer/physical/FilterInjection.hpp" +#include "query_optimizer/physical/HashJoin.hpp" +#include "query_optimizer/physical/PatternMatcher.hpp" +#include "query_optimizer/physical/Physical.hpp" +#include "query_optimizer/physical/PhysicalType.hpp" +#include "query_optimizer/physical/Selection.hpp" +#include "query_optimizer/physical/TopLevelPlan.hpp" +#include "query_optimizer/rules/PruneColumns.hpp" +#include "types/TypeID.hpp" +#include "types/TypedValue.hpp" +#include "utility/lip_filter/LIPFilter.hpp" + +#include "glog/logging.h" + +namespace quickstep { +namespace optimizer { + +namespace E = ::quickstep::optimizer::expressions; +namespace P = ::quickstep::optimizer::physical; + +P::PhysicalPtr InjectJoinFilters::apply(const P::PhysicalPtr &input) { + DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan); + + const P::TopLevelPlanPtr top_level_plan = + std::static_pointer_cast(input); + cost_model_.reset( + new cost::StarSchemaSimpleCostModel( + top_level_plan->shared_subplans())); + lip_filter_configuration_.reset(new P::LIPFilterConfiguration()); + + P::PhysicalPtr output = transformHashJoinToFilters(input); + output = pushDownFilters(output); + output = addFilterAnchors(output, false); + output = PruneColumns().apply(output); + + concretizeAsLIPFilters(output, nullptr); + + if (!lip_filter_configuration_->getBuildInfoMap().empty() || + !lip_filter_configuration_->getProbeInfoMap().empty()) { + output = std::static_pointer_cast(output) + ->copyWithLIPFilterConfiguration( + P::LIPFilterConfigurationPtr(lip_filter_configuration_.release())); + } + + return output; +} + +bool InjectJoinFilters::isTransformable( + const physical::HashJoinPtr &hash_join) const { + if (hash_join->residual_predicate() != nullptr) { + return false; + } + if (hash_join->right_join_attributes().size() > 1) { + return false; + } + if (!E::SubsetOfExpressions(hash_join->getOutputAttributes(), + hash_join->left()->getOutputAttributes())) { + return false; + } + switch (hash_join->join_type()) { + case P::HashJoin::JoinType::kInnerJoin: { + if (!cost_model_->impliesUniqueAttributes(hash_join->right(), + hash_join->right_join_attributes())) { + return false; + } + break; + } + case P::HashJoin::JoinType::kLeftSemiJoin: // Fall through + case P::HashJoin::JoinType::kLeftAntiJoin: + break; + default: + return false; + } + + std::int64_t min_cpp_value; + std::int64_t max_cpp_value; + const bool has_min_max_stats = + findMinMaxValuesForAttributeHelper(hash_join->right(), + hash_join->right_join_attributes().front(), + &min_cpp_value, + &max_cpp_value); + if (!has_min_max_stats) { + return false; + } + + // TODO(jianqiao): implement SimpleHashSetExactFilter to relax this requirement. + // 1G bits = 128MB + if (min_cpp_value < 0 || max_cpp_value > kMaxFilterSize) { + return false; + } + + return true; +} + +P::PhysicalPtr InjectJoinFilters::transformHashJoinToFilters( + const P::PhysicalPtr &input) const { + std::vector new_children; + bool has_changed_children = false; + for (const P::PhysicalPtr &child : input->children()) { + const P::PhysicalPtr new_child = transformHashJoinToFilters(child); + if (child != new_child && !has_changed_children) { + has_changed_children = true; + } + new_children.push_back(new_child); + } + + P::HashJoinPtr hash_join; + if (P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join) && + isTransformable(hash_join)) { + const bool is_anti_join = + hash_join->join_type() == P::HashJoin::JoinType::kLeftAntiJoin; + + P::PhysicalPtr build_child = new_children[1]; + E::PredicatePtr build_side_filter_predicate = nullptr; + P::SelectionPtr selection; + if (P::SomeSelection::MatchesWithConditionalCast(build_child, &selection) && + E::SubsetOfExpressions(hash_join->right_join_attributes(), + selection->input()->getOutputAttributes())) { + build_child = selection->input(); + build_side_filter_predicate = selection->filter_predicate(); + } + + return P::FilterInjection::Create(new_children[0], + build_child, + hash_join->left_join_attributes(), + hash_join->right_join_attributes(), + hash_join->project_expressions(), + build_side_filter_predicate, + is_anti_join); + } + + if (has_changed_children) { + return input->copyWithNewChildren(new_children); + } else { + return input; + } +} + +physical::PhysicalPtr InjectJoinFilters::pushDownFilters( + const physical::PhysicalPtr &input) const { + std::vector new_children; + bool has_changed_children = false; + for (const P::PhysicalPtr &child : input->children()) { + const P::PhysicalPtr new_child = pushDownFilters(child); + if (child != new_child && !has_changed_children) { + has_changed_children = true; + } + new_children.push_back(new_child); + } + + P::FilterInjectionPtr filter_injection; + if (P::SomeFilterInjection::MatchesWithConditionalCast(input, &filter_injection)) { + DCHECK_EQ(2u, new_children.size()); + return pushDownFiltersInternal( + new_children[0], new_children[1], filter_injection); + } + + if (has_changed_children) { + return input->copyWithNewChildren(new_children); + } else { + return input; + } +} + +physical::PhysicalPtr InjectJoinFilters::pushDownFiltersInternal( + const physical::PhysicalPtr &probe_child, + const physical::PhysicalPtr &build_child, + const physical::FilterInjectionPtr &filter_injection) const { + switch (probe_child->getPhysicalType()) { + case P::PhysicalType::kAggregate: // Fall through + case P::PhysicalType::kHashJoin: + case P::PhysicalType::kSample: + case P::PhysicalType::kSelection: + case P::PhysicalType::kSort: + case P::PhysicalType::kWindowAggregate: { + DCHECK_GE(probe_child->getNumChildren(), 1u); + const P::PhysicalPtr child = probe_child->children()[0]; + if (E::SubsetOfExpressions(filter_injection->probe_attributes(), + child->getOutputAttributes())) { + const P::PhysicalPtr new_child = + pushDownFiltersInternal(child, build_child, filter_injection); + if (new_child != child) { + std::vector new_children = probe_child->children(); + new_children[0] = new_child; + return probe_child->copyWithNewChildren(new_children); + } + } + } + default: + break; + } + + if (probe_child != filter_injection->left()) { + // TODO(jianqiao): may need to update probe_attributes. + return P::FilterInjection::Create(probe_child, + build_child, + filter_injection->probe_attributes(), + filter_injection->build_attributes(), + E::ToNamedExpressions(probe_child->getOutputAttributes()), + filter_injection->build_side_filter_predicate(), + filter_injection->is_anti_filter()); + } else { + return filter_injection; + } +} + + +physical::PhysicalPtr InjectJoinFilters::addFilterAnchors( + const physical::PhysicalPtr &input, + const bool ancestor_can_anchor_filter) const { + std::vector new_children; + + switch (input->getPhysicalType()) { + case P::PhysicalType::kAggregate: { + const P::AggregatePtr &aggregate = + std::static_pointer_cast(input); + new_children.emplace_back( + addFilterAnchors(aggregate->input(), true)); + break; + } + case P::PhysicalType::kSelection: { + const P::SelectionPtr &selection = + std::static_pointer_cast(input); + new_children.emplace_back( + addFilterAnchors(selection->input(), true)); + break; + } +// case P::PhysicalType::kHashJoin: { +// const P::HashJoinPtr &hash_join = +// std::static_pointer_cast(input); +// new_children.emplace_back( +// addFilterAnchors(hash_join->left(), true)); +// new_children.emplace_back( +// addFilterAnchors(hash_join->right(), false)); +// break; +// } + case P::PhysicalType::kFilterInjection: { + const P::FilterInjectionPtr &filter_injection = + std::static_pointer_cast(input); + new_children.emplace_back( + addFilterAnchors(filter_injection->left(), true)); + new_children.emplace_back( + addFilterAnchors(filter_injection->right(), true)); + break; + } + default: { + for (const P::PhysicalPtr &child : input->children()) { + new_children.emplace_back(addFilterAnchors(child, false)); + } + } + } + + DCHECK_EQ(new_children.size(), input->children().size()); + const P::PhysicalPtr output_with_new_children = + new_children == input->children() + ? input + : input->copyWithNewChildren(new_children); + + if (input->getPhysicalType() == P::PhysicalType::kFilterInjection && + !ancestor_can_anchor_filter) { + const P::FilterInjectionPtr &filter_injection = + std::static_pointer_cast(output_with_new_children); + return P::Selection::Create(filter_injection, + filter_injection->project_expressions(), + nullptr); + } else { + return output_with_new_children; + } +} + +void InjectJoinFilters::concretizeAsLIPFilters( + const P::PhysicalPtr &input, + const P::PhysicalPtr &anchor_node) const { + switch (input->getPhysicalType()) { + case P::PhysicalType::kAggregate: { + const P::AggregatePtr &aggregate = + std::static_pointer_cast(input); + concretizeAsLIPFilters(aggregate->input(), aggregate); + break; + } + case P::PhysicalType::kSelection: { + const P::SelectionPtr &selection = + std::static_pointer_cast(input); + concretizeAsLIPFilters(selection->input(), selection); + break; + } +// case P::PhysicalType::kHashJoin: { +// const P::HashJoinPtr &hash_join = +// std::static_pointer_cast(input); +// concretizeAsLIPFilters(hash_join->left(), hash_join); +// concretizeAsLIPFilters(hash_join->right(), nullptr); +// break; +// } + case P::PhysicalType::kFilterInjection: { + const P::FilterInjectionPtr &filter_injection = + std::static_pointer_cast(input); + DCHECK_EQ(1u, filter_injection->build_attributes().size()); + const E::AttributeReferencePtr &build_attr = + filter_injection->build_attributes().front(); + + std::int64_t min_cpp_value; + std::int64_t max_cpp_value; + const bool has_min_max_stats = + findMinMaxValuesForAttributeHelper(filter_injection, + build_attr, + &min_cpp_value, + &max_cpp_value); + DCHECK(has_min_max_stats); + DCHECK_GE(min_cpp_value, 0); + DCHECK_GE(max_cpp_value, 0); + DCHECK_LE(max_cpp_value, kMaxFilterSize); + CHECK(anchor_node != nullptr); + + lip_filter_configuration_->addBuildInfo( + build_attr, + filter_injection, + static_cast(max_cpp_value), + LIPFilterType::kBitVectorExactFilter, + filter_injection->is_anti_filter()); + lip_filter_configuration_->addProbeInfo( + filter_injection->probe_attributes().front(), + anchor_node, + build_attr, + filter_injection); + + concretizeAsLIPFilters(filter_injection->left(), anchor_node); + concretizeAsLIPFilters(filter_injection->right(), filter_injection); + break; + } + default: { + for (const P::PhysicalPtr &child : input->children()) { + concretizeAsLIPFilters(child, nullptr); + } + } + } +} + +bool InjectJoinFilters::findMinMaxValuesForAttributeHelper( + const physical::PhysicalPtr &physical_plan, + const expressions::AttributeReferencePtr &attribute, + std::int64_t *min_cpp_value, + std::int64_t *max_cpp_value) const { + const TypedValue min_value = + cost_model_->findMinValueStat(physical_plan, attribute); + const TypedValue max_value = + cost_model_->findMaxValueStat(physical_plan, attribute); + if (min_value.isNull() || max_value.isNull()) { + return false; + } + + switch (attribute->getValueType().getTypeID()) { + case TypeID::kInt: { + *min_cpp_value = min_value.getLiteral(); + *max_cpp_value = max_value.getLiteral(); + return true; + } + case TypeID::kLong: { + *min_cpp_value = min_value.getLiteral(); + *max_cpp_value = max_value.getLiteral(); + return true; + } + default: + return false; + } +} + + +} // namespace optimizer +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/query_optimizer/rules/InjectJoinFilters.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/InjectJoinFilters.hpp b/query_optimizer/rules/InjectJoinFilters.hpp new file mode 100644 index 0000000..6c413d9 --- /dev/null +++ b/query_optimizer/rules/InjectJoinFilters.hpp @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_ +#define QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_ + +#include +#include +#include +#include + +#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" +#include "query_optimizer/expressions/AttributeReference.hpp" +#include "query_optimizer/expressions/ExprId.hpp" +#include "query_optimizer/physical/LIPFilterConfiguration.hpp" +#include "query_optimizer/physical/FilterInjection.hpp" +#include "query_optimizer/physical/HashJoin.hpp" +#include "query_optimizer/physical/Physical.hpp" +#include "query_optimizer/rules/Rule.hpp" +#include "utility/Macros.hpp" + +namespace quickstep { +namespace optimizer { + +/** \addtogroup OptimizerRules + * @{ + */ + +class InjectJoinFilters : public Rule { + public: + /** + * @brief Constructor. + */ + InjectJoinFilters() {} + + ~InjectJoinFilters() override {} + + std::string getName() const override { + return "TransformFilterJoins"; + } + + physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override; + + private: + bool isTransformable(const physical::HashJoinPtr &hash_join) const; + + physical::PhysicalPtr transformHashJoinToFilters( + const physical::PhysicalPtr &input) const; + + physical::PhysicalPtr pushDownFilters(const physical::PhysicalPtr &input) const; + + physical::PhysicalPtr addFilterAnchors(const physical::PhysicalPtr &input, + const bool ancestor_can_anchor_filter) const; + + void concretizeAsLIPFilters(const physical::PhysicalPtr &input, + const physical::PhysicalPtr &anchor_node) const; + + physical::PhysicalPtr pushDownFiltersInternal( + const physical::PhysicalPtr &probe_child, + const physical::PhysicalPtr &build_child, + const physical::FilterInjectionPtr &filter_injection) const; + + bool findMinMaxValuesForAttributeHelper( + const physical::PhysicalPtr &physical_plan, + const expressions::AttributeReferencePtr &attribute, + std::int64_t *min_cpp_value, + std::int64_t *max_cpp_value) const; + + std::unique_ptr cost_model_; + std::unique_ptr lip_filter_configuration_; + + // 1G bits = 128MB + static constexpr std::int64_t kMaxFilterSize = 1000000000; + + DISALLOW_COPY_AND_ASSIGN(InjectJoinFilters); +}; + +/** @} */ + +} // namespace optimizer +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/query_optimizer/rules/TransformFilterJoins.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/TransformFilterJoins.cpp b/query_optimizer/rules/TransformFilterJoins.cpp deleted file mode 100644 index 706df69..0000000 --- a/query_optimizer/rules/TransformFilterJoins.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "query_optimizer/rules/TransformFilterJoins.hpp" - -#include -#include -#include -#include -#include -#include - -#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" -#include "query_optimizer/expressions/AttributeReference.hpp" -#include "query_optimizer/physical/LIPFilterConfiguration.hpp" -#include "query_optimizer/physical/Aggregate.hpp" -#include "query_optimizer/physical/HashJoin.hpp" -#include "query_optimizer/physical/PatternMatcher.hpp" -#include "query_optimizer/physical/Physical.hpp" -#include "query_optimizer/physical/PhysicalType.hpp" -#include "query_optimizer/physical/Selection.hpp" -#include "query_optimizer/physical/TopLevelPlan.hpp" -#include "types/TypedValue.hpp" -#include "utility/lip_filter/LIPFilter.hpp" - -#include "glog/logging.h" - -namespace quickstep { -namespace optimizer { - -namespace E = ::quickstep::optimizer::expressions; -namespace P = ::quickstep::optimizer::physical; - -P::PhysicalPtr TransformFilterJoins::apply(const P::PhysicalPtr &input) { - DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan); - - const P::TopLevelPlanPtr top_level_plan = - std::static_pointer_cast(input); - cost_model_.reset( - new cost::StarSchemaSimpleCostModel( - top_level_plan->shared_subplans())); - - P::PhysicalPtr output = applyTransform(input); - return output; -} - -P::PhysicalPtr TransformFilterJoins::applyTransform(const P::PhysicalPtr &input) { -// std::vector new_children; -// bool has_changed_children = false; -// for (const P::PhysicalPtr &child : input->children()) { -// P::PhysicalPtr new_child = applyTransform(child); -// if (child != new_child && !has_changed_children) { -// has_changed_children = true; -// } -// new_children.push_back(new_child); -// } -// -// P::HashJoinPtr hash_join; -// if (P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join)) { -// // TODO(jianqiao): check for other cases -// if (hash_join->join_type() == P::HashJoin::JoinType::kLeftSemiJoin) { -// -// -// } -// } -// -// if (has_changed_children) { -// return applyToNode(tree->copyWithNewChildren(new_children)); -// } else { -// return applyToNode(tree); -// } - return input; -} - -} // namespace optimizer -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/query_optimizer/rules/TransformFilterJoins.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/TransformFilterJoins.hpp b/query_optimizer/rules/TransformFilterJoins.hpp deleted file mode 100644 index 2a19203..0000000 --- a/query_optimizer/rules/TransformFilterJoins.hpp +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_TRANSFORM_FILTER_JOINS_HPP_ -#define QUICKSTEP_QUERY_OPTIMIZER_RULES_TRANSFORM_FILTER_JOINS_HPP_ - -#include -#include -#include -#include -#include -#include - -#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" -#include "query_optimizer/expressions/AttributeReference.hpp" -#include "query_optimizer/expressions/ExprId.hpp" -#include "query_optimizer/physical/LIPFilterConfiguration.hpp" -#include "query_optimizer/physical/Physical.hpp" -#include "query_optimizer/rules/Rule.hpp" -#include "utility/Macros.hpp" - -namespace quickstep { -namespace optimizer { - -/** \addtogroup OptimizerRules - * @{ - */ - -class TransformFilterJoins : public Rule { - public: - /** - * @brief Constructor. - */ - TransformFilterJoins() {} - - ~TransformFilterJoins() override {} - - std::string getName() const override { - return "TransformFilterJoins"; - } - - physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override; - - private: - physical::PhysicalPtr applyTransform(const physical::PhysicalPtr &input); - - std::unique_ptr cost_model_; - - DISALLOW_COPY_AND_ASSIGN(TransformFilterJoins); -}; - -/** @} */ - -} // namespace optimizer -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_OPTIMIZER_RULES_TRANSFORM_FILTER_JOINS_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/relational_operators/BuildLIPFilterOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp new file mode 100644 index 0000000..34df385 --- /dev/null +++ b/relational_operators/BuildLIPFilterOperator.cpp @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "relational_operators/BuildLIPFilterOperator.hpp" + +#include +#include + +#include "catalog/CatalogRelation.hpp" +#include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" +#include "storage/StorageBlock.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" +#include "storage/TupleIdSequence.hpp" +#include "storage/TupleStorageSubBlock.hpp" +#include "storage/ValueAccessor.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" +#include "utility/lip_filter/LIPFilterBuilder.hpp" +#include "utility/lip_filter/LIPFilterUtil.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace quickstep { + +bool BuildLIPFilterOperator::getAllWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) { + DCHECK(query_context != nullptr); + + const Predicate *build_side_predicate = + query_context->getPredicate(build_side_predicate_index_); + + if (input_relation_is_stored_) { + if (!started_) { + for (const block_id input_block_id : input_relation_block_ids_) { + container->addNormalWorkOrder( + new BuildLIPFilterWorkOrder(query_id_, + input_relation_, + input_block_id, + build_side_predicate, + storage_manager, + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), + CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)), + op_index_); + } + started_ = true; + } + return started_; + } else { + while (num_workorders_generated_ < input_relation_block_ids_.size()) { + container->addNormalWorkOrder( + new BuildLIPFilterWorkOrder( + query_id_, + input_relation_, + input_relation_block_ids_[num_workorders_generated_], + build_side_predicate, + storage_manager, + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), + CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)), + op_index_); + ++num_workorders_generated_; + } + return done_feeding_input_relation_; + } +} + +bool BuildLIPFilterOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + // TODO + return true; +} + +serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const block_id block) { + // TODO + return nullptr; +} + +void BuildLIPFilterWorkOrder::execute() { + BlockReference block( + storage_manager_->getBlock(build_block_id_, input_relation_)); + + std::unique_ptr predicate_matches; + if (build_side_predicate_ != nullptr) { + predicate_matches.reset(block->getMatchesForPredicate(build_side_predicate_)); + } + + std::unique_ptr accessor( + block->getTupleStorageSubBlock().createValueAccessor(predicate_matches.get())); + + if (lip_filter_adaptive_prober_ != nullptr) { + std::unique_ptr matches( + lip_filter_adaptive_prober_->filterValueAccessor(accessor.get())); + std::unique_ptr filtered_accessor( + accessor->createSharedTupleIdSequenceAdapterVirtual(*matches)); + + lip_filter_builder_->insertValueAccessor(filtered_accessor.get()); + } else { + lip_filter_builder_->insertValueAccessor(accessor.get()); + } +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/relational_operators/BuildLIPFilterOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp new file mode 100644 index 0000000..96cad84 --- /dev/null +++ b/relational_operators/BuildLIPFilterOperator.hpp @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_ +#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_ + +#include +#include +#include +#include + +#include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/QueryContext.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "utility/Macros.hpp" +#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp" +#include "utility/lip_filter/LIPFilterBuilder.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { + +class CatalogRelationSchema; +class Predicate; +class StorageManager; +class WorkOrderProtosContainer; +class WorkOrdersContainer; + +namespace serialization { class WorkOrder; } + +/** \addtogroup RelationalOperators + * @{ + */ + +/** + * @brief An operator which builds a LIPFilter on one relation. + **/ +class BuildLIPFilterOperator : public RelationalOperator { + public: + BuildLIPFilterOperator(const std::size_t query_id, + const CatalogRelation &input_relation, + const QueryContext::predicate_id build_side_predicate_index, + const bool input_relation_is_stored) + : RelationalOperator(query_id), + input_relation_(input_relation), + build_side_predicate_index_(build_side_predicate_index), + input_relation_is_stored_(input_relation_is_stored), + input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot() + : std::vector()), + num_workorders_generated_(0), + started_(false) {} + + ~BuildLIPFilterOperator() override {} + + const CatalogRelation& input_relation() const { + return input_relation_; + } + + std::string getName() const override { + return "BuildLIPFilterOperator"; + } + + bool getAllWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) override; + + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + + void feedInputBlock(const block_id input_block_id, + const relation_id input_relation_id) override { + input_relation_block_ids_.push_back(input_block_id); + } + + void feedInputBlocks(const relation_id rel_id, + std::vector *partially_filled_blocks) override { + input_relation_block_ids_.insert(input_relation_block_ids_.end(), + partially_filled_blocks->begin(), + partially_filled_blocks->end()); + } + + private: + /** + * @brief Create Work Order proto. + * + * @param block The block id used in the Work Order. + **/ + serialization::WorkOrder* createWorkOrderProto(const block_id block); + + const CatalogRelation &input_relation_; + const QueryContext::predicate_id build_side_predicate_index_; + const bool input_relation_is_stored_; + + std::vector input_relation_block_ids_; + std::vector::size_type num_workorders_generated_; + + bool started_; + + DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterOperator); +}; + +/** + * @brief A WorkOrder produced by BuildLIPFilterOperator + **/ +class BuildLIPFilterWorkOrder : public WorkOrder { + public: + BuildLIPFilterWorkOrder(const std::size_t query_id, + const CatalogRelationSchema &input_relation, + const block_id build_block_id, + const Predicate *build_side_predicate, + StorageManager *storage_manager, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober, + LIPFilterBuilder *lip_filter_builder) + : WorkOrder(query_id), + input_relation_(input_relation), + build_block_id_(build_block_id), + build_side_predicate_(build_side_predicate), + storage_manager_(DCHECK_NOTNULL(storage_manager)), + lip_filter_adaptive_prober_(lip_filter_adaptive_prober), + lip_filter_builder_(DCHECK_NOTNULL(lip_filter_builder)) {} + + ~BuildLIPFilterWorkOrder() override {} + + const CatalogRelationSchema& input_relation() const { + return input_relation_; + } + + void execute() override; + + private: + const CatalogRelationSchema &input_relation_; + const block_id build_block_id_; + const Predicate *build_side_predicate_; + + StorageManager *storage_manager_; + + std::unique_ptr lip_filter_adaptive_prober_; + std::unique_ptr lip_filter_builder_; + + DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterWorkOrder); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index 0735bce..225f093 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -38,6 +38,7 @@ endif() # Declare micro-libs: add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp) add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp) +add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp) add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp) add_library(quickstep_relationaloperators_CreateTableOperator CreateTableOperator.cpp CreateTableOperator.hpp) add_library(quickstep_relationaloperators_DestroyAggregationStateOperator @@ -116,6 +117,27 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator quickstep_utility_lipfilter_LIPFilterBuilder quickstep_utility_lipfilter_LIPFilterUtil tmb) +target_link_libraries(quickstep_relationaloperators_BuildLIPFilterOperator + glog + quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogTypedefs + quickstep_queryexecution_QueryContext + quickstep_queryexecution_WorkOrderProtosContainer + quickstep_queryexecution_WorkOrdersContainer + quickstep_relationaloperators_RelationalOperator + quickstep_relationaloperators_WorkOrder + quickstep_relationaloperators_WorkOrder_proto + quickstep_storage_StorageBlock + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageManager + quickstep_storage_TupleIdSequence + quickstep_storage_TupleStorageSubBlock + quickstep_storage_ValueAccessor + quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilterAdaptiveProber + quickstep_utility_lipfilter_LIPFilterBuilder + quickstep_utility_lipfilter_LIPFilterUtil + tmb) target_link_libraries(quickstep_relationaloperators_CreateIndexOperator glog quickstep_catalog_CatalogRelation @@ -512,6 +534,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp) target_link_libraries(quickstep_relationaloperators quickstep_relationaloperators_AggregationOperator + quickstep_relationaloperators_BuildLIPFilterOperator quickstep_relationaloperators_BuildHashOperator quickstep_relationaloperators_CreateIndexOperator quickstep_relationaloperators_CreateTableOperator http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/utility/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt index e9be2ec..872225e 100644 --- a/utility/CMakeLists.txt +++ b/utility/CMakeLists.txt @@ -264,6 +264,7 @@ target_link_libraries(quickstep_utility_PlanVisualizer quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel quickstep_queryoptimizer_expressions_AttributeReference quickstep_queryoptimizer_expressions_ExprId + quickstep_queryoptimizer_physical_FilterInjection quickstep_queryoptimizer_physical_HashJoin quickstep_queryoptimizer_physical_LIPFilterConfiguration quickstep_queryoptimizer_physical_Physical http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/utility/PlanVisualizer.cpp ---------------------------------------------------------------------- diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp index 5d70c86..1124f6a 100644 --- a/utility/PlanVisualizer.cpp +++ b/utility/PlanVisualizer.cpp @@ -31,6 +31,7 @@ #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" #include "query_optimizer/expressions/AttributeReference.hpp" #include "query_optimizer/expressions/ExprId.hpp" +#include "query_optimizer/physical/FilterInjection.hpp" #include "query_optimizer/physical/HashJoin.hpp" #include "query_optimizer/physical/Physical.hpp" #include "query_optimizer/physical/PhysicalType.hpp" @@ -57,6 +58,8 @@ std::string PlanVisualizer::visualize(const P::PhysicalPtr &input) { color_map_["TableReference"] = "skyblue"; color_map_["Selection"] = "#90EE90"; + color_map_["FilterInjection"] = "pink"; + color_map_["FilterInjection(Anti)"] = "pink"; color_map_["HashJoin"] = "red"; color_map_["HashLeftOuterJoin"] = "orange"; color_map_["HashLeftSemiJoin"] = "orange"; @@ -125,7 +128,8 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { edge_info.dst_node_id = node_id; edge_info.dashed = false; - if (input->getPhysicalType() == P::PhysicalType::kHashJoin && + if ((input->getPhysicalType() == P::PhysicalType::kHashJoin || + input->getPhysicalType() == P::PhysicalType::kFilterInjection) && child == input->children()[1]) { edge_info.dashed = true; } @@ -164,6 +168,20 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) { } break; } + case P::PhysicalType::kFilterInjection: { + const P::FilterInjectionPtr filter_injection = + std::static_pointer_cast(input); + node_info.labels.emplace_back(input->getName()); + + const auto &probe_attributes = filter_injection->probe_attributes(); + const auto &build_attributes = filter_injection->build_attributes(); + for (std::size_t i = 0; i < probe_attributes.size(); ++i) { + node_info.labels.emplace_back( + probe_attributes[i]->attribute_alias() + " = " + + build_attributes[i]->attribute_alias()); + } + break; + } default: { node_info.labels.emplace_back(input->getName()); break; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/utility/lip_filter/BitVectorExactFilter.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp index cb77d70..15c8f0b 100644 --- a/utility/lip_filter/BitVectorExactFilter.hpp +++ b/utility/lip_filter/BitVectorExactFilter.hpp @@ -43,7 +43,7 @@ namespace quickstep { * @{ */ -template +template class BitVectorExactFilter : public LIPFilter { public: /** @@ -158,7 +158,13 @@ class BitVectorExactFilter : public LIPFilter { inline bool contains(const void *key_begin) const { const CppType loc = *reinterpret_cast(key_begin); DCHECK_LE(loc, filter_cardinality_); - return (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))); + const bool is_bit_set = + (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0; + if (is_anti_filter) { + return !is_bit_set; + } else { + return is_bit_set; + } } std::size_t filter_cardinality_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/utility/lip_filter/LIPFilter.proto ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto index 7fb748c..fcbaa7b 100644 --- a/utility/lip_filter/LIPFilter.proto +++ b/utility/lip_filter/LIPFilter.proto @@ -22,8 +22,8 @@ package quickstep.serialization; import "types/Type.proto"; enum LIPFilterType { - BIT_VECTOR_EXACT_FILTER = 2; - BLOOM_FILTER = 1; + BIT_VECTOR_EXACT_FILTER = 1; + BLOOM_FILTER = 2; SINGLE_IDENTITY_HASH_FILTER = 3; } @@ -38,6 +38,7 @@ message BitVectorExactFilter { // All required optional uint64 filter_cardinality = 16; optional uint64 attribute_size = 17; + optional bool is_anti_filter = 18; } } @@ -49,11 +50,6 @@ message SingleIdentityHashFilter { } } -enum LIPFilterActionType { - BUILD = 1; - PROBE = 2; -} - message LIPFilterDeployment { message Entry { required uint32 lip_filter_id = 1; @@ -61,6 +57,6 @@ message LIPFilterDeployment { required Type attribute_type = 3; } - required LIPFilterActionType action_type = 1; - repeated Entry entries = 2; + repeated Entry build_entries = 1; + repeated Entry probe_entries = 2; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/utility/lip_filter/LIPFilterDeployment.cpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp index cd4d90f..bbb6dd6 100644 --- a/utility/lip_filter/LIPFilterDeployment.cpp +++ b/utility/lip_filter/LIPFilterDeployment.cpp @@ -35,38 +35,44 @@ namespace quickstep { LIPFilterDeployment::LIPFilterDeployment( const serialization::LIPFilterDeployment &proto, const std::vector> &lip_filters) { - switch (proto.action_type()) { - case serialization::LIPFilterActionType::BUILD: - action_type_ = LIPFilterActionType::kBuild; - break; - case serialization::LIPFilterActionType::PROBE: - action_type_ = LIPFilterActionType::kProbe; - break; - default: - LOG(FATAL) << "Unsupported LIPFilterActionType: " - << serialization::LIPFilterActionType_Name(proto.action_type()); + if (proto.build_entries_size() > 0) { + build_.reset(new LIPFilterDeploymentInfo()); + for (int i = 0; i < proto.build_entries_size(); ++i) { + const auto &entry_proto = proto.build_entries(i); + build_->lip_filters_.emplace_back( + lip_filters.at(entry_proto.lip_filter_id()).get()); + build_->attr_ids_.emplace_back(entry_proto.attribute_id()); + build_->attr_types_.emplace_back( + &TypeFactory::ReconstructFromProto(entry_proto.attribute_type())); + } } - for (int i = 0; i < proto.entries_size(); ++i) { - const auto &entry_proto = proto.entries(i); - lip_filters_.emplace_back(lip_filters.at(entry_proto.lip_filter_id()).get()); - attr_ids_.emplace_back(entry_proto.attribute_id()); - attr_types_.emplace_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type())); + if (proto.probe_entries_size() > 0) { + probe_.reset(new LIPFilterDeploymentInfo()); + for (int i = 0; i < proto.probe_entries_size(); ++i) { + const auto &entry_proto = proto.probe_entries(i); + probe_->lip_filters_.emplace_back( + lip_filters.at(entry_proto.lip_filter_id()).get()); + probe_->attr_ids_.emplace_back(entry_proto.attribute_id()); + probe_->attr_types_.emplace_back( + &TypeFactory::ReconstructFromProto(entry_proto.attribute_type())); + } } } bool LIPFilterDeployment::ProtoIsValid( const serialization::LIPFilterDeployment &proto) { - if (proto.action_type() != serialization::LIPFilterActionType::BUILD && - proto.action_type() != serialization::LIPFilterActionType::PROBE) { - LOG(FATAL) << "Unsupported LIPFilterActionType: " - << serialization::LIPFilterActionType_Name(proto.action_type()); - } - if (proto.entries_size() == 0) { + if (proto.build_entries_size() == 0 && proto.probe_entries_size() == 0) { return false; } - for (int i = 0; i < proto.entries_size(); ++i) { - const auto &entry_proto = proto.entries(i); + for (int i = 0; i < proto.build_entries_size(); ++i) { + const auto &entry_proto = proto.build_entries(i); + if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) { + return false; + } + } + for (int i = 0; i < proto.probe_entries_size(); ++i) { + const auto &entry_proto = proto.probe_entries(i); if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) { return false; } @@ -75,13 +81,23 @@ bool LIPFilterDeployment::ProtoIsValid( } LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const { - DCHECK(action_type_ == LIPFilterActionType::kBuild); - return new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_); + if (build_ == nullptr) { + return nullptr; + } + + return new LIPFilterBuilder(build_->lip_filters_, + build_->attr_ids_, + build_->attr_types_); } LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const { - DCHECK(action_type_ == LIPFilterActionType::kProbe); - return new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_); + if (probe_ == nullptr) { + return nullptr; + } + + return new LIPFilterAdaptiveProber(probe_->lip_filters_, + probe_->attr_ids_, + probe_->attr_types_); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/utility/lip_filter/LIPFilterDeployment.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp index 9b37f88..ab1259b 100644 --- a/utility/lip_filter/LIPFilterDeployment.hpp +++ b/utility/lip_filter/LIPFilterDeployment.hpp @@ -39,11 +39,6 @@ class Type; * @{ */ -enum class LIPFilterActionType { - kBuild = 0, - kProbe -}; - /** * @brief Helper class for organizing a group of LIPFilters in the backend. * Each LIPFilterDeployment object is attached to a RelationalOperator. @@ -69,16 +64,6 @@ class LIPFilterDeployment { static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto); /** - * @brief Get the action type for this group of LIPFilters (i.e. whether - * to build or probe the filters). - * - * @return The action type. - */ - LIPFilterActionType getActionType() const { - return action_type_; - } - - /** * @brief Create a LIPFilterBuilder for this group of LIPFilters. * * @return A new LIPFilterBuilder object for this group of LIPFilters. @@ -95,11 +80,14 @@ class LIPFilterDeployment { LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const; private: - LIPFilterActionType action_type_; - - std::vector lip_filters_; - std::vector attr_ids_; - std::vector attr_types_; + struct LIPFilterDeploymentInfo { + std::vector lip_filters_; + std::vector attr_ids_; + std::vector attr_types_; + }; + + std::unique_ptr build_; + std::unique_ptr probe_; DISALLOW_COPY_AND_ASSIGN(LIPFilterDeployment); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4510d5f9/utility/lip_filter/LIPFilterFactory.cpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp index 9cec4c5..a96151f 100644 --- a/utility/lip_filter/LIPFilterFactory.cpp +++ b/utility/lip_filter/LIPFilterFactory.cpp @@ -37,16 +37,34 @@ LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter proto.GetExtension(serialization::BitVectorExactFilter::attribute_size); const std::size_t filter_cardinality = proto.GetExtension(serialization::BitVectorExactFilter::filter_cardinality); + const bool is_anti_filter = + proto.GetExtension(serialization::BitVectorExactFilter::is_anti_filter); switch (attr_size) { case 1: - return new BitVectorExactFilter(filter_cardinality); + if (is_anti_filter) { + return new BitVectorExactFilter(filter_cardinality); + } else { + return new BitVectorExactFilter(filter_cardinality); + } case 2: - return new BitVectorExactFilter(filter_cardinality); + if (is_anti_filter) { + return new BitVectorExactFilter(filter_cardinality); + } else { + return new BitVectorExactFilter(filter_cardinality); + } case 4: - return new BitVectorExactFilter(filter_cardinality); + if (is_anti_filter) { + return new BitVectorExactFilter(filter_cardinality); + } else { + return new BitVectorExactFilter(filter_cardinality); + } case 8: - return new BitVectorExactFilter(filter_cardinality); + if (is_anti_filter) { + return new BitVectorExactFilter(filter_cardinality); + } else { + return new BitVectorExactFilter(filter_cardinality); + } default: LOG(FATAL) << "Invalid attribute size for BitVectorExactFilter: " << attr_size;