quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [21/22] incubator-quickstep git commit: Add BitVectorExactFilter as a LIP filter and supports Join-to-Semijoin transformation.
Date Thu, 26 Jan 2017 02:34:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/query_optimizer/rules/InjectJoinFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/InjectJoinFilters.hpp b/query_optimizer/rules/InjectJoinFilters.hpp
new file mode 100644
index 0000000..0eaebdc
--- /dev/null
+++ b/query_optimizer/rules/InjectJoinFilters.hpp
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Optimizer rule that operates on physical plans.
+ *
+ * NOTE(review): only the declarations are visible here. Judging from the
+ * helper names, the rule converts eligible hash joins into filter joins,
+ * pushes the resulting filters down the plan, and finally attaches them as
+ * LIP filters -- confirm against InjectJoinFilters.cpp.
+ */
+class InjectJoinFilters : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  InjectJoinFilters() {}
+
+  ~InjectJoinFilters() override {}
+
+  /**
+   * @return The name of this rule, which matches the class name.
+   */
+  std::string getName() const override {
+    return "InjectJoinFilters";
+  }
+
+  /**
+   * @brief Apply this rule to a physical plan.
+   *
+   * @param input The physical plan the rule is applied to.
+   * @return The physical plan produced by the rule.
+   */
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  // NOTE(review): the helpers below are only declared in this header; the
+  // one-line summaries are inferred from their names and signatures and
+  // should be confirmed against the definitions.
+
+  // Presumably decides whether hash_join may be turned into a filter join.
+  bool isTransformable(const physical::HashJoinPtr &hash_join) const;
+
+  // Presumably rewrites transformable hash joins in the plan rooted at input.
+  physical::PhysicalPtr transformHashJoinToFilters(
+      const physical::PhysicalPtr &input) const;
+
+  // Presumably moves filter joins closer to the leaves of the plan.
+  physical::PhysicalPtr pushDownFilters(const physical::PhysicalPtr &input) const;
+
+  // Presumably inserts the nodes that filters can be attached to.
+  physical::PhysicalPtr addFilterAnchors(const physical::PhysicalPtr &input,
+                                         const bool ancestor_can_anchor_filter) const;
+
+  // Presumably records the filters as LIP filters relative to anchor_node.
+  void concretizeAsLIPFilters(const physical::PhysicalPtr &input,
+                              const physical::PhysicalPtr &anchor_node) const;
+
+  // Presumably the recursive step of pushDownFilters() for one filter join.
+  physical::PhysicalPtr pushDownFiltersInternal(
+      const physical::PhysicalPtr &probe_child,
+      const physical::PhysicalPtr &build_child,
+      const physical::FilterJoinPtr &filter_join) const;
+
+  // Writes results through min_cpp_value / max_cpp_value; the bool return
+  // presumably reports whether min/max values were found for the attribute.
+  bool findMinMaxValuesForAttributeHelper(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      std::int64_t *min_cpp_value,
+      std::int64_t *max_cpp_value) const;
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+  std::unique_ptr<physical::LIPFilterConfiguration> lip_filter_configuration_;
+
+  // Size limit expressed in bits: 10^9 bits = 125 MB.
+  static constexpr std::int64_t kMaxFilterSize = 1000000000;
+
+  DISALLOW_COPY_AND_ASSIGN(InjectJoinFilters);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/relational_operators/BuildLIPFilterOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp
new file mode 100644
index 0000000..34df385
--- /dev/null
+++ b/relational_operators/BuildLIPFilterOperator.cpp
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildLIPFilterOperator.hpp"
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+// Emits one BuildLIPFilterWorkOrder into 'container' for every input block
+// that does not have one yet.
+//
+// For a stored input relation all work orders are produced on the first call
+// and the method always returns true. Otherwise work orders are produced for
+// the block ids accumulated so far and the method returns
+// done_feeding_input_relation_.
+bool BuildLIPFilterOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  DCHECK(query_context != nullptr);
+
+  const Predicate *build_side_predicate =
+      query_context->getPredicate(build_side_predicate_index_);
+
+  if (!input_relation_is_stored_) {
+    // Catch up with the block ids that have been appended since the last call.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size();
+         ++num_workorders_generated_) {
+      container->addNormalWorkOrder(
+          new BuildLIPFilterWorkOrder(
+              query_id_,
+              input_relation_,
+              input_relation_block_ids_[num_workorders_generated_],
+              build_side_predicate,
+              storage_manager,
+              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
+              CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
+          op_index_);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  // Stored relation: the work orders are generated exactly once.
+  if (started_) {
+    return true;
+  }
+
+  for (const block_id stored_block_id : input_relation_block_ids_) {
+    container->addNormalWorkOrder(
+        new BuildLIPFilterWorkOrder(
+            query_id_,
+            input_relation_,
+            stored_block_id,
+            build_side_predicate,
+            storage_manager,
+            CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
+            CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
+        op_index_);
+  }
+  started_ = true;
+  return true;
+}
+
+bool BuildLIPFilterOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  // TODO: Not implemented; adds nothing to 'container' and unconditionally returns true.
+  return true;
+}
+
+// Placeholder for creating the work order proto of a single block.
+serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const block_id block) {
+  // TODO: Not implemented; 'block' is ignored and no proto is created.
+  return nullptr;
+}
+
+// Loads the build block, narrows it to the tuples selected by the optional
+// build-side predicate and the optional LIP filter prober, and inserts the
+// remaining tuples into the LIP filter builder.
+void BuildLIPFilterWorkOrder::execute() {
+  BlockReference block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+
+  // Stays empty (null) when there is no build-side predicate.
+  std::unique_ptr<TupleIdSequence> selected_tuples;
+  if (build_side_predicate_ != nullptr) {
+    selected_tuples.reset(block->getMatchesForPredicate(build_side_predicate_));
+  }
+
+  std::unique_ptr<ValueAccessor> base_accessor(
+      block->getTupleStorageSubBlock().createValueAccessor(selected_tuples.get()));
+
+  // Without a prober the accessor is fed to the builder as is.
+  if (lip_filter_adaptive_prober_ == nullptr) {
+    lip_filter_builder_->insertValueAccessor(base_accessor.get());
+    return;
+  }
+
+  // Otherwise only the tuples that pass the prober are inserted.
+  std::unique_ptr<TupleIdSequence> surviving_tuples(
+      lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
+  std::unique_ptr<ValueAccessor> surviving_accessor(
+      base_accessor->createSharedTupleIdSequenceAdapterVirtual(*surviving_tuples));
+
+  lip_filter_builder_->insertValueAccessor(surviving_accessor.get());
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/relational_operators/BuildLIPFilterOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp
new file mode 100644
index 0000000..fe8a0fb
--- /dev/null
+++ b/relational_operators/BuildLIPFilterOperator.hpp
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogRelationSchema;
+class Predicate;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator which builds a LIPFilter on one relation.
+ *
+ * The constructor takes the query id, the input relation, the QueryContext
+ * index of the build-side predicate, and a flag telling whether the input
+ * relation is stored. For a stored relation the block ids are captured once
+ * at construction time via getBlocksSnapshot(); otherwise the block id list
+ * starts out empty and grows by one entry per feedInputBlock() call.
+ **/
+class BuildLIPFilterOperator : public RelationalOperator {
+ public:
+  BuildLIPFilterOperator(const std::size_t query_id,
+                         const CatalogRelation &input_relation,
+                         const QueryContext::predicate_id build_side_predicate_index,
+                         const bool input_relation_is_stored)
+    : RelationalOperator(query_id),
+      input_relation_(input_relation),
+      build_side_predicate_index_(build_side_predicate_index),
+      input_relation_is_stored_(input_relation_is_stored),
+      input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
+                                                         : std::vector<block_id>()),
+      num_workorders_generated_(0),
+      started_(false) {}
+
+  ~BuildLIPFilterOperator() override {}
+
+  const CatalogRelation& input_relation() const {  // The relation passed to the constructor.
+    return input_relation_;
+  }
+
+  std::string getName() const override {
+    return "BuildLIPFilterOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    input_relation_block_ids_.push_back(input_block_id);  // Relation and partition ids are unused.
+  }
+
+ private:
+  /**
+   * @brief Create Work Order proto.
+   *
+   * @param block The block id used in the Work Order.
+   * @return A pointer to a serialization::WorkOrder; ownership is not
+   *         expressed by the signature.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
+  const CatalogRelation &input_relation_;
+  const QueryContext::predicate_id build_side_predicate_index_;
+  const bool input_relation_is_stored_;
+
+  std::vector<block_id> input_relation_block_ids_;  // Snapshot, or ids appended by feedInputBlock().
+  std::vector<block_id>::size_type num_workorders_generated_;  // Starts at 0.
+
+  bool started_;  // Starts as false.
+
+  DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by BuildLIPFilterOperator
+ *
+ * The constructor takes the query id, the schema of the input relation, the
+ * id of the block to process, the build-side predicate, the storage manager,
+ * a LIPFilterAdaptiveProber and a LIPFilterBuilder. The storage manager and
+ * the builder are checked with DCHECK_NOTNULL. The prober and builder raw
+ * pointers are stored in std::unique_ptr members, so this work order takes
+ * ownership of both and deletes them when it is destroyed.
+ **/
+class BuildLIPFilterWorkOrder : public WorkOrder {
+ public:
+  BuildLIPFilterWorkOrder(const std::size_t query_id,
+                          const CatalogRelationSchema &input_relation,
+                          const block_id build_block_id,
+                          const Predicate *build_side_predicate,
+                          StorageManager *storage_manager,
+                          LIPFilterAdaptiveProber *lip_filter_adaptive_prober,
+                          LIPFilterBuilder *lip_filter_builder)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        build_block_id_(build_block_id),
+        build_side_predicate_(build_side_predicate),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober),
+        lip_filter_builder_(DCHECK_NOTNULL(lip_filter_builder)) {}
+
+  ~BuildLIPFilterWorkOrder() override {}
+
+  const CatalogRelationSchema& input_relation() const {  // The schema passed to the constructor.
+    return input_relation_;
+  }
+
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+  const block_id build_block_id_;
+  const Predicate *build_side_predicate_;  // Stored as a raw pointer; not deleted by this class.
+
+  StorageManager *storage_manager_;  // Stored as a raw pointer; not deleted by this class.
+
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;  // Owned.
+  std::unique_ptr<LIPFilterBuilder> lip_filter_builder_;  // Owned.
+
+  DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 78da7b8..85887b1 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -38,6 +38,7 @@ endif()
 # Declare micro-libs:
 add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
 add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
+add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
 add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
 add_library(quickstep_relationaloperators_CreateTableOperator CreateTableOperator.cpp CreateTableOperator.hpp)
 add_library(quickstep_relationaloperators_DestroyAggregationStateOperator
@@ -117,6 +118,27 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       quickstep_utility_lipfilter_LIPFilterBuilder
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
+target_link_libraries(quickstep_relationaloperators_BuildLIPFilterOperator
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_StorageBlock
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_TupleIdSequence
+                      quickstep_storage_TupleStorageSubBlock
+                      quickstep_storage_ValueAccessor
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterBuilder
+                      quickstep_utility_lipfilter_LIPFilterUtil
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_CreateIndexOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -517,6 +539,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
 add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
 target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_CreateIndexOperator
                       quickstep_relationaloperators_CreateTableOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 6a656fb..4990083 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -274,6 +274,7 @@ target_link_libraries(quickstep_utility_PlanVisualizer
                       quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_Physical

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index df7a20c..e533e15 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -32,6 +32,7 @@
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
@@ -58,6 +59,8 @@ std::string PlanVisualizer::visualize(const P::PhysicalPtr &input) {
 
   color_map_["TableReference"] = "skyblue";
   color_map_["Selection"] = "#90EE90";
+  color_map_["FilterJoin"] = "pink";
+  color_map_["FilterJoin(Anti)"] = "pink";
   color_map_["HashJoin"] = "red";
   color_map_["HashLeftOuterJoin"] = "orange";
   color_map_["HashLeftSemiJoin"] = "orange";
@@ -126,7 +129,8 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
     edge_info.dst_node_id = node_id;
     edge_info.dashed = false;
 
-    if (input->getPhysicalType() == P::PhysicalType::kHashJoin &&
+    if ((input->getPhysicalType() == P::PhysicalType::kHashJoin ||
+         input->getPhysicalType() == P::PhysicalType::kFilterJoin) &&
         child == input->children()[1]) {
       edge_info.dashed = true;
     }
@@ -165,6 +169,20 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
       }
       break;
     }
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr filter_join =
+        std::static_pointer_cast<const P::FilterJoin>(input);
+      node_info.labels.emplace_back(input->getName());
+
+      const auto &probe_attributes = filter_join->probe_attributes();
+      const auto &build_attributes = filter_join->build_attributes();
+      for (std::size_t i = 0; i < probe_attributes.size(); ++i) {
+        node_info.labels.emplace_back(
+            probe_attributes[i]->attribute_alias() + " = " +
+                build_attributes[i]->attribute_alias());
+      }
+      break;
+    }
     default: {
       node_info.labels.emplace_back(input->getName());
       break;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
new file mode 100644
index 0000000..15c8f0b
--- /dev/null
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief A LIP filter backed by a bit array in which the bit at position v
+ *        records whether the key value v has been inserted.
+ *
+ * @tparam CppType The integral C++ type that raw key bytes are read as.
+ * @tparam is_anti_filter If true, membership tests report the negation of
+ *         the stored bit, i.e. only values that were NOT inserted pass.
+ */
+template <typename CppType, bool is_anti_filter>
+class BitVectorExactFilter : public LIPFilter {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param filter_cardinality The largest bit position (key value) accepted
+   *        by this filter. Storage is allocated for every position in
+   *        [0, filter_cardinality], so the inclusive bound that insert() and
+   *        contains() check always falls inside the bit array.
+   */
+  explicit BitVectorExactFilter(const std::size_t filter_cardinality)
+      : LIPFilter(LIPFilterType::kBitVectorExactFilter),
+        filter_cardinality_(filter_cardinality),
+        bit_array_(GetByteSize(filter_cardinality + 1)) {
+    // Start with every bit cleared.
+    std::memset(bit_array_.data(),
+                0x0,
+                sizeof(std::atomic<std::uint8_t>) * bit_array_.size());
+  }
+
+  /**
+   * @brief Insert the non-null values of one attribute into the filter.
+   *
+   * @param accessor The ValueAccessor to read the values from.
+   * @param attr_id The attribute whose values are inserted.
+   * @param attr_type The attribute's type; only its nullability is consulted.
+   */
+  void insertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id attr_id,
+                           const Type *attr_type) override {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (attr_type->isNullable()) {
+        this->insertValueAccessorInternal<true>(accessor, attr_id);
+      } else {
+        this->insertValueAccessorInternal<false>(accessor, attr_id);
+      }
+    });
+  }
+
+  /**
+   * @brief Filter the first batch_size tuple ids of a batch in place.
+   *
+   * @param accessor The ValueAccessor the tuple ids refer to.
+   * @param attr_id The attribute to test against the filter.
+   * @param is_attr_nullable Whether the attribute may be null; null values
+   *        never pass the filter.
+   * @param batch In: the candidate tuple ids. Out: the surviving tuple ids,
+   *        compacted to the front.
+   * @param batch_size The number of candidates in batch.
+   * @return The number of surviving tuple ids.
+   */
+  std::size_t filterBatch(ValueAccessor *accessor,
+                          const attribute_id attr_id,
+                          const bool is_attr_nullable,
+                          std::vector<tuple_id> *batch,
+                          const std::size_t batch_size) const override {
+    DCHECK(batch != nullptr);
+    DCHECK_LE(batch_size, batch->size());
+
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+      if (is_attr_nullable) {
+        return this->filterBatchInternal<true>(accessor, attr_id, batch, batch_size);
+      } else {
+        return this->filterBatchInternal<false>(accessor, attr_id, batch, batch_size);
+      }
+    });
+  }
+
+ private:
+  /**
+   * @brief Number of bytes needed to hold bit_size bits (rounds up).
+   */
+  inline static std::size_t GetByteSize(const std::size_t bit_size) {
+    return (bit_size + 7) / 8;
+  }
+
+  /**
+   * @brief Read a key as CppType and convert it to its bit position.
+   *
+   * A negative key converts to a very large position and is therefore
+   * rejected by the range DCHECKs in insert() and contains().
+   */
+  inline static std::size_t GetBitPosition(const void *key_begin) {
+    return static_cast<std::size_t>(*reinterpret_cast<const CppType *>(key_begin));
+  }
+
+  /**
+   * @brief Iterate through the accessor and set the bit of every non-null
+   *        value in the internal bit array.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline void insertValueAccessorInternal(ValueAccessorT *accessor,
+                                          const attribute_id attr_id) {
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        insert(value);
+      }
+    }
+  }
+
+  /**
+   * @brief Filter the given batch of tuples from a ValueAccessor. Write the
+   *        tuple ids which survive in the filtering back to \p batch.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline std::size_t filterBatchInternal(const ValueAccessorT *accessor,
+                                         const attribute_id attr_id,
+                                         std::vector<tuple_id> *batch,
+                                         const std::size_t batch_size) const {
+    std::size_t out_size = 0;
+    for (std::size_t i = 0; i < batch_size; ++i) {
+      const tuple_id tid = batch->at(i);
+      const void *value =
+          accessor->template getUntypedValueAtAbsolutePosition(attr_id, tid);
+      if (is_attr_nullable && value == nullptr) {
+        continue;
+      }
+      if (contains(value)) {
+        // out_size <= i, so this never overwrites an unread candidate.
+        batch->at(out_size) = tid;
+        ++out_size;
+      }
+    }
+    return out_size;
+  }
+
+  /**
+   * @brief Inserts a given value into the exact filter.
+   */
+  inline void insert(const void *key_begin) {
+    const std::size_t loc = GetBitPosition(key_begin);
+    DCHECK_LE(loc, filter_cardinality_);
+    bit_array_[loc >> 3u].fetch_or(static_cast<std::uint8_t>(1u << (loc & 7u)),
+                                   std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Test membership of a given value in the exact filter. For an anti
+   *        filter the result is negated.
+   */
+  inline bool contains(const void *key_begin) const {
+    const std::size_t loc = GetBitPosition(key_begin);
+    DCHECK_LE(loc, filter_cardinality_);
+    const bool is_bit_set =
+        (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+    return is_anti_filter ? !is_bit_set : is_bit_set;
+  }
+
+  // Largest bit position accepted by insert() / contains().
+  std::size_t filter_cardinality_;
+  // One bit per position in [0, filter_cardinality_], packed into bytes.
+  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index 23b3763..edd0d24 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -20,6 +20,7 @@ QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs
                          LIPFilter.proto)
 
 # Declare micro-libs:
+add_library(quickstep_utility_lipfilter_BitVectorExactFilter ../../empty_src.cpp BitVectorExactFilter.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
@@ -31,6 +32,15 @@ add_library(quickstep_utility_lipfilter_LIPFilter_proto
 add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)
 
 # Link dependencies:
+target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilter
                       quickstep_catalog_CatalogTypedefs
                       quickstep_storage_StorageBlockInfo
@@ -56,6 +66,7 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment
                       quickstep_utility_lipfilter_LIPFilterBuilder
                       quickstep_utility_lipfilter_LIPFilter_proto)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
+                      quickstep_utility_lipfilter_BitVectorExactFilter
                       quickstep_utility_lipfilter_LIPFilter_proto
                       quickstep_utility_lipfilter_SingleIdentityHashFilter
                       quickstep_utility_Macros)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/utility/lip_filter/LIPFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp
index 682d69f..ba38264 100644
--- a/utility/lip_filter/LIPFilter.hpp
+++ b/utility/lip_filter/LIPFilter.hpp
@@ -37,8 +37,8 @@ class ValueAccessor;
  */
 
 enum class LIPFilterType {
+  kBitVectorExactFilter,
   kBloomFilter,
-  kExactFilter,
   kSingleIdentityHashFilter
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/utility/lip_filter/LIPFilter.proto
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto
index def13dd..fcbaa7b 100644
--- a/utility/lip_filter/LIPFilter.proto
+++ b/utility/lip_filter/LIPFilter.proto
@@ -22,8 +22,8 @@ package quickstep.serialization;
 import "types/Type.proto";
 
 enum LIPFilterType {
-  BLOOM_FILTER = 1;
-  EXACT_FILTER = 2;
+  BIT_VECTOR_EXACT_FILTER = 1;
+  BLOOM_FILTER = 2;
   SINGLE_IDENTITY_HASH_FILTER = 3;
 }
 
@@ -33,17 +33,21 @@ message LIPFilter {
   extensions 16 to max;
 }
 
-message SingleIdentityHashFilter {
+message BitVectorExactFilter {
   extend LIPFilter {
     // All required
     optional uint64 filter_cardinality = 16;
     optional uint64 attribute_size = 17;
+    optional bool is_anti_filter = 18;
   }
 }
 
-enum LIPFilterActionType {
-  BUILD = 1;
-  PROBE = 2;
+message SingleIdentityHashFilter {
+  extend LIPFilter {
+    // All required
+    optional uint64 filter_cardinality = 24;
+    optional uint64 attribute_size = 25;
+  }
 }
 
 message LIPFilterDeployment {
@@ -53,6 +57,6 @@ message LIPFilterDeployment {
     required Type attribute_type = 3;
   }
 
-  required LIPFilterActionType action_type = 1;
-  repeated Entry entries = 2;
+  repeated Entry build_entries = 1;
+  repeated Entry probe_entries = 2;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/utility/lip_filter/LIPFilterDeployment.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp
index cd4d90f..bbb6dd6 100644
--- a/utility/lip_filter/LIPFilterDeployment.cpp
+++ b/utility/lip_filter/LIPFilterDeployment.cpp
@@ -35,38 +35,44 @@ namespace quickstep {
 LIPFilterDeployment::LIPFilterDeployment(
     const serialization::LIPFilterDeployment &proto,
     const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) {
-  switch (proto.action_type()) {
-    case serialization::LIPFilterActionType::BUILD:
-      action_type_ = LIPFilterActionType::kBuild;
-      break;
-    case serialization::LIPFilterActionType::PROBE:
-      action_type_ = LIPFilterActionType::kProbe;
-      break;
-    default:
-      LOG(FATAL) << "Unsupported LIPFilterActionType: "
-                 << serialization::LIPFilterActionType_Name(proto.action_type());
+  if (proto.build_entries_size() > 0) {
+    build_.reset(new LIPFilterDeploymentInfo());
+    for (int i = 0; i < proto.build_entries_size(); ++i) {
+      const auto &entry_proto = proto.build_entries(i);
+      build_->lip_filters_.emplace_back(
+          lip_filters.at(entry_proto.lip_filter_id()).get());
+      build_->attr_ids_.emplace_back(entry_proto.attribute_id());
+      build_->attr_types_.emplace_back(
+          &TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+    }
   }
 
-  for (int i = 0; i < proto.entries_size(); ++i) {
-    const auto &entry_proto = proto.entries(i);
-    lip_filters_.emplace_back(lip_filters.at(entry_proto.lip_filter_id()).get());
-    attr_ids_.emplace_back(entry_proto.attribute_id());
-    attr_types_.emplace_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+  if (proto.probe_entries_size() > 0) {
+    probe_.reset(new LIPFilterDeploymentInfo());
+    for (int i = 0; i < proto.probe_entries_size(); ++i) {
+      const auto &entry_proto = proto.probe_entries(i);
+      probe_->lip_filters_.emplace_back(
+          lip_filters.at(entry_proto.lip_filter_id()).get());
+      probe_->attr_ids_.emplace_back(entry_proto.attribute_id());
+      probe_->attr_types_.emplace_back(
+          &TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+    }
   }
 }
 
 bool LIPFilterDeployment::ProtoIsValid(
     const serialization::LIPFilterDeployment &proto) {
-  if (proto.action_type() != serialization::LIPFilterActionType::BUILD &&
-      proto.action_type() != serialization::LIPFilterActionType::PROBE) {
-    LOG(FATAL) << "Unsupported LIPFilterActionType: "
-               << serialization::LIPFilterActionType_Name(proto.action_type());
-  }
-  if (proto.entries_size() == 0) {
+  if (proto.build_entries_size() == 0 && proto.probe_entries_size() == 0) {
     return false;
   }
-  for (int i = 0; i < proto.entries_size(); ++i) {
-    const auto &entry_proto = proto.entries(i);
+  for (int i = 0; i < proto.build_entries_size(); ++i) {
+    const auto &entry_proto = proto.build_entries(i);
+    if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) {
+      return false;
+    }
+  }
+  for (int i = 0; i < proto.probe_entries_size(); ++i) {
+    const auto &entry_proto = proto.probe_entries(i);
     if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) {
       return false;
     }
@@ -75,13 +81,23 @@ bool LIPFilterDeployment::ProtoIsValid(
 }
 
 LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const {
-  DCHECK(action_type_ == LIPFilterActionType::kBuild);
-  return new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_);
+  if (build_ == nullptr) {
+    return nullptr;
+  }
+
+  return new LIPFilterBuilder(build_->lip_filters_,
+                              build_->attr_ids_,
+                              build_->attr_types_);
 }
 
 LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const {
-  DCHECK(action_type_ == LIPFilterActionType::kProbe);
-  return new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_);
+  if (probe_ == nullptr) {
+    return nullptr;
+  }
+
+  return new LIPFilterAdaptiveProber(probe_->lip_filters_,
+                                     probe_->attr_ids_,
+                                     probe_->attr_types_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/utility/lip_filter/LIPFilterDeployment.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp
index 9b37f88..ab1259b 100644
--- a/utility/lip_filter/LIPFilterDeployment.hpp
+++ b/utility/lip_filter/LIPFilterDeployment.hpp
@@ -39,11 +39,6 @@ class Type;
  *  @{
  */
 
-enum class LIPFilterActionType {
-  kBuild = 0,
-  kProbe
-};
-
 /**
  * @brief Helper class for organizing a group of LIPFilters in the backend.
  *        Each LIPFilterDeployment object is attached to a RelationalOperator.
@@ -69,16 +64,6 @@ class LIPFilterDeployment {
   static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto);
 
   /**
-   * @brief Get the action type for this group of LIPFilters (i.e. whether
-   *        to build or probe the filters).
-   *
-   * @return The action type.
-   */
-  LIPFilterActionType getActionType() const {
-    return action_type_;
-  }
-
-  /**
    * @brief Create a LIPFilterBuilder for this group of LIPFilters.
    *
    * @return A new LIPFilterBuilder object for this group of LIPFilters.
@@ -95,11 +80,14 @@ class LIPFilterDeployment {
   LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const;
 
  private:
-  LIPFilterActionType action_type_;
-
-  std::vector<LIPFilter *> lip_filters_;
-  std::vector<attribute_id> attr_ids_;
-  std::vector<const Type *> attr_types_;
+  struct LIPFilterDeploymentInfo {
+    std::vector<LIPFilter *> lip_filters_;
+    std::vector<attribute_id> attr_ids_;
+    std::vector<const Type *> attr_types_;
+  };
+
+  std::unique_ptr<LIPFilterDeploymentInfo> build_;
+  std::unique_ptr<LIPFilterDeploymentInfo> probe_;
 
   DISALLOW_COPY_AND_ASSIGN(LIPFilterDeployment);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d52b9126/utility/lip_filter/LIPFilterFactory.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp
index ebc4a0e..a96151f 100644
--- a/utility/lip_filter/LIPFilterFactory.cpp
+++ b/utility/lip_filter/LIPFilterFactory.cpp
@@ -23,6 +23,7 @@
 #include <cstdint>
 
 #include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/BitVectorExactFilter.hpp"
 #include "utility/lip_filter/SingleIdentityHashFilter.hpp"
 
 #include "glog/logging.h"
@@ -31,6 +32,44 @@ namespace quickstep {
 
 LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) {
   switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::BitVectorExactFilter::attribute_size);
+      const std::size_t filter_cardinality =
+          proto.GetExtension(serialization::BitVectorExactFilter::filter_cardinality);
+      const bool is_anti_filter =
+          proto.GetExtension(serialization::BitVectorExactFilter::is_anti_filter);
+
+      switch (attr_size) {
+        case 1:
+          if (is_anti_filter) {
+            return new BitVectorExactFilter<std::uint8_t, true>(filter_cardinality);
+          } else {
+            return new BitVectorExactFilter<std::uint8_t, false>(filter_cardinality);
+          }
+        case 2:
+          if (is_anti_filter) {
+            return new BitVectorExactFilter<std::uint16_t, true>(filter_cardinality);
+          } else {
+            return new BitVectorExactFilter<std::uint16_t, false>(filter_cardinality);
+          }
+        case 4:
+          if (is_anti_filter) {
+            return new BitVectorExactFilter<std::uint32_t, true>(filter_cardinality);
+          } else {
+            return new BitVectorExactFilter<std::uint32_t, false>(filter_cardinality);
+          }
+        case 8:
+          if (is_anti_filter) {
+            return new BitVectorExactFilter<std::uint64_t, true>(filter_cardinality);
+          } else {
+            return new BitVectorExactFilter<std::uint64_t, false>(filter_cardinality);
+          }
+        default:
+          LOG(FATAL) << "Invalid attribute size for BitVectorExactFilter: "
+                     << attr_size;
+      }
+    }
     case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
       const std::size_t attr_size =
           proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
@@ -57,6 +96,13 @@ LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter
 
 bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) {
   switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::BitVectorExactFilter::attribute_size);
+      const std::size_t filter_cardinality =
+          proto.GetExtension(serialization::BitVectorExactFilter::filter_cardinality);
+      return (attr_size != 0 && filter_cardinality != 0);
+    }
     case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
       const std::size_t attr_size =
           proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);



Mime
View raw message