quickstep-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zuyu <...@git.apache.org>
Subject [GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...
Date Mon, 06 Feb 2017 05:12:48 GMT
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99510612
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -80,148 +83,145 @@ AggregationOperationState::AggregationOperationState(
         const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
         StorageManager *storage_manager)
         : input_relation_(input_relation),
    -      is_aggregate_partitioned_(checkAggregatePartitioned(
    -          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
    +      is_aggregate_collision_free_(false),
    +      is_aggregate_partitioned_(false),
           predicate_(predicate),
    -      group_by_list_(std::move(group_by)),
    -      arguments_(std::move(arguments)),
           is_distinct_(std::move(is_distinct)),
           storage_manager_(storage_manager) {
    +  if (!group_by.empty()) {
    +    if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
    +      is_aggregate_collision_free_ = true;
    +    } else {
    +      is_aggregate_partitioned_ = checkAggregatePartitioned(
    +          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
    +    }
    +  }
    +
       // Sanity checks: each aggregate has a corresponding list of arguments.
    -  DCHECK(aggregate_functions.size() == arguments_.size());
    +  DCHECK(aggregate_functions.size() == arguments.size());
     
       // Get the types of GROUP BY expressions for creating HashTables below.
    -  std::vector<const Type *> group_by_types;
    -  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_)
{
    -    group_by_types.emplace_back(&group_by_element->getType());
    +  for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
    +    group_by_types_.emplace_back(&group_by_element->getType());
    +  }
    +
    +  // Prepare group-by key ids and non-trivial expressions.
    +  for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
    +    const attribute_id attr_id =
    +        group_by_element->getAttributeIdForValueAccessor();
    +    if (attr_id != kInvalidAttributeID) {
    +      group_by_key_ids_.emplace_back(ValueAccessorSource::kBase, attr_id);
    +    } else {
    +      group_by_key_ids_.emplace_back(ValueAccessorSource::kDerived,
    +                                     non_trivial_expressions_.size());
    +      non_trivial_expressions_.emplace_back(group_by_element.release());
    +    }
       }
     
       std::vector<AggregationHandle *> group_by_handles;
    -  group_by_handles.clear();
    -
    -  if (aggregate_functions.size() == 0) {
    -    // If there is no aggregation function, then it is a distinctify operation
    -    // on the group-by expressions.
    -    DCHECK_GT(group_by_list_.size(), 0u);
    -
    -    handles_.emplace_back(new AggregationHandleDistinct());
    -    arguments_.push_back({});
    -    is_distinct_.emplace_back(false);
    -    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
    -                                                     hash_table_impl_type,
    -                                                     group_by_types,
    -                                                     {1},
    -                                                     handles_,
    -                                                     storage_manager));
    -  } else {
    -    // Set up each individual aggregate in this operation.
    -    std::vector<const AggregateFunction *>::const_iterator agg_func_it =
    -        aggregate_functions.begin();
    -    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
    -        args_it = arguments_.begin();
    -    std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
    -    std::vector<HashTableImplType>::const_iterator
    -        distinctify_hash_table_impl_types_it =
    -            distinctify_hash_table_impl_types.begin();
    -    std::vector<std::size_t> payload_sizes;
    -    for (; agg_func_it != aggregate_functions.end();
    -         ++agg_func_it, ++args_it, ++is_distinct_it) {
    -      // Get the Types of this aggregate's arguments so that we can create an
    -      // AggregationHandle.
    -      std::vector<const Type *> argument_types;
    -      for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    -        argument_types.emplace_back(&argument->getType());
    -      }
     
    -      // Sanity checks: aggregate function exists and can apply to the specified
    -      // arguments.
    -      DCHECK(*agg_func_it != nullptr);
    -      DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
    -
    -      // Have the AggregateFunction create an AggregationHandle that we can use
    -      // to do actual aggregate computation.
    -      handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
    -
    -      if (!group_by_list_.empty()) {
    -        // Aggregation with GROUP BY: combined payload is partially updated in
    -        // the presence of DISTINCT.
    -        if (*is_distinct_it) {
    -          handles_.back()->blockUpdate();
    -        }
    -        group_by_handles.emplace_back(handles_.back());
    -        payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
    +  // Set up each individual aggregate in this operation.
    +  std::vector<const AggregateFunction *>::const_iterator agg_func_it =
    +      aggregate_functions.begin();
    +  std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
    +      args_it = arguments.begin();
    +  std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
    +  std::vector<HashTableImplType>::const_iterator
    +      distinctify_hash_table_impl_types_it =
    +          distinctify_hash_table_impl_types.begin();
    +  for (; agg_func_it != aggregate_functions.end();
    +       ++agg_func_it, ++args_it, ++is_distinct_it) {
    +    // Get the Types of this aggregate's arguments so that we can create an
    +    // AggregationHandle.
    +    std::vector<const Type *> argument_types;
    +    for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    +      argument_types.emplace_back(&argument->getType());
    +    }
    +
    +    // Prepare argument attribute ids and non-trivial expressions.
    +    std::vector<MultiSourceAttributeId> argument_ids;
    +    for (std::unique_ptr<const Scalar> &argument : *args_it) {
    +      const attribute_id attr_id =
    +          argument->getAttributeIdForValueAccessor();
    +      if (attr_id != kInvalidAttributeID) {
    +        argument_ids.emplace_back(ValueAccessorSource::kBase, attr_id);
           } else {
    -        // Aggregation without GROUP BY: create a single global state.
    -        single_states_.emplace_back(handles_.back()->createInitialState());
    -
    -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
    -        // See if all of this aggregate's arguments are attributes in the input
    -        // relation. If so, remember the attribute IDs so that we can do copy
    -        // elision when actually performing the aggregation.
    -        std::vector<attribute_id> local_arguments_as_attributes;
    -        local_arguments_as_attributes.reserve(args_it->size());
    -        for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    -          const attribute_id argument_id =
    -              argument->getAttributeIdForValueAccessor();
    -          if (argument_id == -1) {
    -            local_arguments_as_attributes.clear();
    -            break;
    -          } else {
    -            DCHECK_EQ(input_relation_.getID(),
    -                      argument->getRelationIdForValueAccessor());
    -            local_arguments_as_attributes.push_back(argument_id);
    -          }
    -        }
    -
    -        arguments_as_attributes_.emplace_back(
    -            std::move(local_arguments_as_attributes));
    -#endif
    +        argument_ids.emplace_back(ValueAccessorSource::kDerived,
    +                                  non_trivial_expressions_.size());
    +        non_trivial_expressions_.emplace_back(argument.release());
           }
    +    }
    +    argument_ids_.emplace_back(std::move(argument_ids));
    +
    +    // Sanity checks: aggregate function exists and can apply to the specified
    +    // arguments.
    +    DCHECK(*agg_func_it != nullptr);
    +    DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
     
    -      // Initialize the corresponding distinctify hash table if this is a
    -      // DISTINCT aggregation.
    +    // Have the AggregateFunction create an AggregationHandle that we can use
    +    // to do actual aggregate computation.
    +    handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
    +
    +    if (!group_by_key_ids_.empty()) {
    +      // Aggregation with GROUP BY: combined payload is partially updated in
    +      // the presence of DISTINCT.
           if (*is_distinct_it) {
    -        std::vector<const Type *> key_types(group_by_types);
    -        key_types.insert(
    -            key_types.end(), argument_types.begin(), argument_types.end());
    -        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
    -        // estimating the number of entries in the distinctify hash table.
    -        // We may estimate for each distinct aggregation an
    -        // estimated_num_distinct_keys value during query optimization, if it's
    -        // worth.
    -        distinctify_hashtables_.emplace_back(
    -            AggregationStateFastHashTableFactory::CreateResizable(
    -                *distinctify_hash_table_impl_types_it,
    -                key_types,
    -                estimated_num_entries,
    -                {0},
    -                {},
    -                storage_manager));
    -        ++distinctify_hash_table_impl_types_it;
    -      } else {
    -        distinctify_hashtables_.emplace_back(nullptr);
    +        handles_.back()->blockUpdate();
           }
    +      group_by_handles.emplace_back(handles_.back().get());
    +    } else {
    +      // Aggregation without GROUP BY: create a single global state.
    +      single_states_.emplace_back(handles_.back()->createInitialState());
         }
     
    -    if (!group_by_handles.empty()) {
    -      // Aggregation with GROUP BY: create a HashTable pool.
    -      if (!is_aggregate_partitioned_) {
    -        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
    -                                                         hash_table_impl_type,
    -                                                         group_by_types,
    -                                                         payload_sizes,
    -                                                         group_by_handles,
    -                                                         storage_manager));
    -      } else {
    -        partitioned_group_by_hashtable_pool_.reset(
    -            new PartitionedHashTablePool(estimated_num_entries,
    -                                         FLAGS_num_aggregation_partitions,
    -                                         hash_table_impl_type,
    -                                         group_by_types,
    -                                         payload_sizes,
    -                                         group_by_handles,
    -                                         storage_manager));
    -      }
    +    // Initialize the corresponding distinctify hash table if this is a
    +    // DISTINCT aggregation.
    +    if (*is_distinct_it) {
    +      std::vector<const Type *> key_types(group_by_types_);
    +      key_types.insert(
    +          key_types.end(), argument_types.begin(), argument_types.end());
    +      // TODO(jianqiao): estimated_num_entries is quite inaccurate for
    +      // estimating the number of entries in the distinctify hash table.
    +      // We need to estimate for each distinct aggregation an
    +      // estimated_num_distinct_keys value during query optimization.
    +      distinctify_hashtables_.emplace_back(
    +          AggregationStateHashTableFactory::CreateResizable(
    +              *distinctify_hash_table_impl_types_it,
    +              key_types,
    +              estimated_num_entries,
    +              {},
    +              storage_manager));
    +      ++distinctify_hash_table_impl_types_it;
    +    } else {
    +      distinctify_hashtables_.emplace_back(nullptr);
    +    }
    +  }
    +
    +  if (!group_by_key_ids_.empty()) {
    +    // Aggregation with GROUP BY: create the hash table (pool).
    +    if (is_aggregate_collision_free_) {
    +      collision_free_hashtable_.reset(
    +          AggregationStateHashTableFactory::CreateResizable(
    +              hash_table_impl_type,
    +              group_by_types_,
    +              estimated_num_entries,
    +              group_by_handles,
    +              storage_manager));
    +    } else if (is_aggregate_partitioned_) {
    +      partitioned_group_by_hashtable_pool_.reset(
    +          new PartitionedHashTablePool(estimated_num_entries,
    +                                       FLAGS_num_aggregation_partitions,
    +                                       hash_table_impl_type,
    +                                       group_by_types_,
    +                                       group_by_handles,
    +                                       storage_manager));
    +    } else {
    +      group_by_hashtable_pool_.reset(
    --- End diff --
    
    I think `group_by_hashtable_pool_` could be treated as a special case of `partitioned_group_by_hashtable_pool_`
where the number of partitions is one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message