quickstep-dev mailing list archives

From zuyu <...@git.apache.org>
Subject [GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...
Date Mon, 06 Feb 2017 05:12:27 GMT
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505589
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate; if so,
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved. Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenience of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
       }
     }
     
    -void AggregationOperationState::mergeSingleState(
    -    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    -  DEBUG_ASSERT(local_state.size() == single_states_.size());
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    if (!is_distinct_[agg_idx]) {
    -      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
    -                                     single_states_[agg_idx].get());
    -    }
    +std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumFinalizationPartitions();
    +  } else if (is_aggregate_partitioned_) {
    +    return partitioned_group_by_hashtable_pool_->getNumPartitions();
    +  } else {
    +    return 1u;
       }
     }
     
    -void AggregationOperationState::aggregateBlockSingleState(
    -    const block_id input_block) {
    -  // Aggregate per-block state for each aggregate.
    -  std::vector<std::unique_ptr<AggregationState>> local_state;
    +void AggregationOperationState::initialize(const std::size_t partition_id) {
    +  if (is_aggregate_collision_free_) {
    +    static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->initialize(partition_id);
    +  } else {
    +    LOG(FATAL) << "AggregationOperationState::initialize() "
    +               << "is not supported by this aggregation";
    +  }
    +}
     
    +void AggregationOperationState::aggregateBlock(const block_id input_block,
    +                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
       BlockReference block(
           storage_manager_->getBlock(input_block, input_relation_));
    +  const auto &tuple_store = block->getTupleStorageSubBlock();
    +  std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
    +  std::unique_ptr<ValueAccessor> shared_accessor;
    +  ValueAccessor *accessor = base_accessor.get();
     
    +  // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
    +  // as the existence map for the tuples.
       std::unique_ptr<TupleIdSequence> matches;
       if (predicate_ != nullptr) {
    -    std::unique_ptr<ValueAccessor> accessor(
    -        block->getTupleStorageSubBlock().createValueAccessor());
    -    matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
    +    matches.reset(block->getMatchesForPredicate(predicate_.get()));
    +    shared_accessor.reset(
    +        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
    +    accessor = shared_accessor.get();
    +  }
    +  if (lip_filter_adaptive_prober != nullptr) {
    +    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
    +    shared_accessor.reset(
    +        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
    +    accessor = shared_accessor.get();
       }
     
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
    -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
    -    // If all arguments are attributes of the input relation, elide a copy.
    -    if (!arguments_as_attributes_[agg_idx].empty()) {
    -      local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
    +  std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results;
    +  if (!non_trivial_expressions_.empty()) {
    +    non_trivial_results.reset(new ColumnVectorsValueAccessor());
    +    SubBlocksReference sub_blocks_ref(tuple_store,
    +                                      block->getIndices(),
    +                                      block->getIndicesConsistent());
    +    for (const auto &expression : non_trivial_expressions_) {
    +      non_trivial_results->addColumn(
    +          expression->getAllValues(accessor, &sub_blocks_ref));
         }
    -#endif
    +  }
    +
    +  accessor->beginIterationVirtual();
    +
    +  ValueAccessorMultiplexer accessor_mux(accessor, non_trivial_results.get());
    +  if (group_by_key_ids_.empty()) {
    +    aggregateBlockSingleState(accessor_mux);
    +  } else {
    +    aggregateBlockHashTable(accessor_mux);
    +  }
    +}
    +
    +void AggregationOperationState::aggregateBlockSingleState(
    +    const ValueAccessorMultiplexer &accessor_mux) {
    +  // Aggregate per-block state for each aggregate.
    +  std::vector<std::unique_ptr<AggregationState>> local_state;
    +
    +  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    +    const auto &argument_ids = argument_ids_[agg_idx];
    +    const auto &handle = handles_[agg_idx];
    +
    +    AggregationState *state = nullptr;
         if (is_distinct_[agg_idx]) {
    -      // Call StorageBlock::aggregateDistinct() to put the arguments as keys
    -      // directly into the (threadsafe) shared global distinctify HashTable
    -      // for this aggregate.
    -      block->aggregateDistinct(*handles_[agg_idx],
    -                               arguments_[agg_idx],
    -                               local_arguments_as_attributes,
    -                               {}, /* group_by */
    -                               matches.get(),
    -                               distinctify_hashtables_[agg_idx].get(),
    -                               nullptr /* reuse_group_by_vectors */);
    -      local_state.emplace_back(nullptr);
    +      handle->insertValueAccessorIntoDistinctifyHashTable(
    +          argument_ids,
    +          {},
    +          accessor_mux,
    +          distinctify_hashtables_[agg_idx].get());
         } else {
    -      // Call StorageBlock::aggregate() to actually do the aggregation.
    -      local_state.emplace_back(block->aggregate(*handles_[agg_idx],
    -                                                arguments_[agg_idx],
    -                                                local_arguments_as_attributes,
    -                                                matches.get()));
    +      if (argument_ids.empty()) {
    +        // Special case. This is a nullary aggregate (i.e. COUNT(*)).
    +        ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
    +        DCHECK(base_accessor != nullptr);
    +        state = handle->accumulateNullary(base_accessor->getNumTuplesVirtual());
    +      } else {
    +        // Have the AggregationHandle actually do the aggregation.
    +        state = handle->accumulateValueAccessor(argument_ids, accessor_mux);
    +      }
         }
    +    local_state.emplace_back(state);
       }
     
       // Merge per-block aggregation states back with global state.
       mergeSingleState(local_state);
     }
     
    -void AggregationOperationState::aggregateBlockHashTable(
    -    const block_id input_block,
    -    LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  BlockReference block(
    -      storage_manager_->getBlock(input_block, input_relation_));
    -
    -  // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
    -  // as the existence map for the tuples.
    -  std::unique_ptr<TupleIdSequence> matches;
    -  if (predicate_ != nullptr) {
    -    matches.reset(block->getMatchesForPredicate(predicate_.get()));
    -  }
    -  if (lip_filter_adaptive_prober != nullptr) {
    -    std::unique_ptr<ValueAccessor> accessor(
    -        block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
    -    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
    -  }
    -
    -  // This holds values of all the GROUP BY attributes so that they can be reused
    -  // across multiple aggregates (i.e. we only pay the cost of evaluating the
    -  // GROUP BY expressions once).
    -  std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
    -
    +void AggregationOperationState::mergeSingleState(
    +    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    +  DEBUG_ASSERT(local_state.size() == single_states_.size());
    --- End diff ---
    
    Use `DCHECK_EQ` instead.
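    
    For example, a minimal sketch of the suggested replacement (assuming glog's `DCHECK_EQ` from `<glog/logging.h>` is already available in this file; unlike a plain assert, it also logs both operand values when the check fails):
    
        DCHECK_EQ(local_state.size(), single_states_.size());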


