quickstep-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jianq...@apache.org
Subject [10/10] incubator-quickstep git commit: Updates
Date Sat, 04 Feb 2017 03:28:04 GMT
Updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3bcb5c89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3bcb5c89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3bcb5c89

Branch: refs/heads/collision-free-agg
Commit: 3bcb5c89229329cbab07297b2130c08d69647fb1
Parents: 61f6903
Author: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Authored: Fri Feb 3 21:27:51 2017 -0600
Committer: Jianqiao Zhu <jianqiao@cs.wisc.edu>
Committed: Fri Feb 3 21:27:51 2017 -0600

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   |  17 ++--
 .../aggregation/AggregationConcreteHandle.hpp   |  66 ++++++------
 expressions/aggregation/AggregationHandle.hpp   |  21 ++--
 .../aggregation/AggregationHandleAvg.cpp        |  49 +++++----
 .../aggregation/AggregationHandleAvg.hpp        |  17 ++--
 .../aggregation/AggregationHandleCount.cpp      |  49 ++++-----
 .../aggregation/AggregationHandleCount.hpp      |  17 ++--
 .../aggregation/AggregationHandleMax.cpp        |  45 ++++----
 .../aggregation/AggregationHandleMax.hpp        |  17 ++--
 .../aggregation/AggregationHandleMin.cpp        |  45 ++++----
 .../aggregation/AggregationHandleMin.hpp        |  17 ++--
 .../aggregation/AggregationHandleSum.cpp        |  49 +++++----
 .../aggregation/AggregationHandleSum.hpp        |  17 ++--
 expressions/aggregation/CMakeLists.txt          |  14 ++-
 relational_operators/CMakeLists.txt             |   1 +
 storage/AggregationOperationState.cpp           | 102 ++++++++++---------
 storage/AggregationOperationState.hpp           |  23 ++---
 storage/CMakeLists.txt                          |  11 +-
 .../CollisionFreeAggregationStateHashTable.cpp  |  83 ++++++++-------
 .../CollisionFreeAggregationStateHashTable.hpp  |   8 +-
 storage/HashTableBase.hpp                       |   8 +-
 .../PackedPayloadAggregationStateHashTable.cpp  |  23 +++--
 .../PackedPayloadAggregationStateHashTable.hpp  |  94 ++++++++---------
 storage/ValueAccessorMultiplexer.hpp            |  94 +++++++++++++++++
 24 files changed, 491 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index d28aa6e..5fd7e0f 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -23,9 +23,9 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
 #include "storage/HashTableFactory.hpp"
 #include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 
 namespace quickstep {
 
@@ -48,16 +48,17 @@ AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashT
 }
 
 void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
-    const std::vector<attribute_id> &argument_ids,
-    const std::vector<attribute_id> &key_attr_ids,
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_ids,
     AggregationStateHashTableBase *distinctify_hash_table,
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor) const {
-  std::vector<attribute_id> combined_ids(key_attr_ids);
-  combined_ids.insert(combined_ids.end(), argument_ids.begin(), argument_ids.end());
+    ValueAccessorMultiplexer *accessor_mux) const {
+  std::vector<MultiSourceAttributeId> concatenated_ids(key_ids);
+  for (const MultiSourceAttributeId &arg_id : argument_ids) {
+    concatenated_ids.emplace_back(arg_id);
+  }
 
   static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(distinctify_hash_table)
-      ->upsertValueAccessor({}, combined_ids, accessor, aux_accessor);
+      ->upsertValueAccessor({}, concatenated_ids, accessor_mux);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 82aba66..5b49d0d 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -30,6 +30,7 @@
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -110,11 +111,10 @@ class AggregationConcreteHandle : public AggregationHandle {
       StorageManager *storage_manager) const override;
 
   void insertValueAccessorIntoDistinctifyHashTable(
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &key_attr_ids,
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_ids,
       AggregationStateHashTableBase *distinctify_hash_table,
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor = nullptr) const override;
+      ValueAccessorMultiplexer *accessor_mux) const override;
 
   void blockUpdate() override {
     block_update_ = true;
@@ -132,30 +132,30 @@ class AggregationConcreteHandle : public AggregationHandle {
   StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelper(
       const AggregationStateHashTableBase &distinctify_hash_table) const;
 
-  template <typename HandleT, typename HashTableT>
+  template <typename HandleT>
   void aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *hash_table,
-      std::size_t index) const;
+      const std::size_t index,
+      AggregationStateHashTableBase *hash_table) const;
 
-  template <typename HandleT, typename HashTableT>
+  template <typename HandleT>
   ColumnVector* finalizeHashTableHelper(
       const Type &result_type,
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const;
 
-  template <typename HandleT, typename HashTableT>
+  template <typename HandleT>
   inline TypedValue finalizeGroupInHashTable(
       const AggregationStateHashTableBase &hash_table,
-      const std::vector<TypedValue> &group_key,
-      int index) const {
+      const std::size_t index,
+      const std::vector<TypedValue> &group_key) const {
     const std::uint8_t *group_state =
-        static_cast<const HashTableT &>(hash_table).getSingleCompositeKey(group_key, index);
+        static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table)
+            .getSingleCompositeKey(group_key, index);
     DCHECK(group_state != nullptr)
         << "Could not find entry for specified group_key in HashTable";
-    return static_cast<const HandleT *>(this)->finalizeHashTableEntry(
-        group_state);
+    return static_cast<const HandleT *>(this)->finalizeHashTableEntry(group_state);
   }
 
   bool block_update_;
@@ -226,15 +226,16 @@ StateT* AggregationConcreteHandle::
   return state;
 }
 
-template <typename HandleT, typename HashTableT>
+template <typename HandleT>
 void AggregationConcreteHandle::
     aggregateOnDistinctifyHashTableForGroupByUnaryHelper(
         const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table,
-        std::size_t index) const {
+        const std::size_t index,
+        AggregationStateHashTableBase *aggregation_hash_table) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
-  HashTableT *target_hash_table =
-      static_cast<HashTableT *>(aggregation_hash_table);
+  PackedPayloadSeparateChainingAggregationStateHashTable *target_hash_table =
+      static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
+          aggregation_hash_table);
 
   // A lambda function which will be called on each key-value pair from the
   // distinctify hash table.
@@ -255,22 +256,23 @@ void AggregationConcreteHandle::
     target_hash_table->upsertCompositeKey(key, &upserter, index);
   };
 
-  const HashTableT &source_hash_table =
-      static_cast<const HashTableT &>(distinctify_hash_table);
+  const PackedPayloadSeparateChainingAggregationStateHashTable &source_hash_table =
+      static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
+          distinctify_hash_table);
   // Invoke the lambda function "aggregate_functor" on each composite key vector
   // from the distinctify hash table.
   source_hash_table.forEachCompositeKey(&aggregate_functor);
 }
 
-template <typename HandleT, typename HashTableT>
+template <typename HandleT>
 ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
     const Type &result_type,
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
   const HandleT &handle = static_cast<const HandleT &>(*this);
-  const HashTableT &hash_table_concrete =
-      static_cast<const HashTableT &>(hash_table);
+  const PackedPayloadSeparateChainingAggregationStateHashTable &hash_table_concrete =
+      static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(hash_table);
 
   if (group_by_keys->empty()) {
     if (NativeColumnVector::UsableForType(result_type)) {
@@ -294,8 +296,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
           new NativeColumnVector(result_type, group_by_keys->size());
       for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
         result->appendTypedValue(
-            finalizeGroupInHashTable<HandleT, HashTableT>(
-                hash_table, group_by_key, index));
+            finalizeGroupInHashTable<HandleT>(
+                hash_table, index, group_by_key));
       }
       return result;
     } else {
@@ -303,8 +305,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
           result_type, hash_table_concrete.numEntries());
       for (const std::vector<TypedValue> &group_by_key : *group_by_keys) {
         result->appendTypedValue(
-            finalizeGroupInHashTable<HandleT, HashTableT>(
-                hash_table, group_by_key, index));
+            finalizeGroupInHashTable<HandleT>(
+                hash_table, index, group_by_key));
       }
       return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index bd8cbaa..3ee4477 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -27,6 +27,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
 
@@ -162,9 +163,8 @@ class AggregationHandle {
    *         for deleting the returned AggregationState.
    **/
   virtual AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor,
-      const std::vector<attribute_id> &argument_ids) const = 0;
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      ValueAccessorMultiplexer *accessor_mux) const = 0;
 
   /**
    * @brief Merge two AggregationStates, updating one in-place. This computes a
@@ -220,8 +220,8 @@ class AggregationHandle {
    **/
   virtual ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const = 0;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const = 0;
 
   /**
    * @brief Create a new HashTable for the distinctify step for DISTINCT
@@ -271,11 +271,10 @@ class AggregationHandle {
    *        by calling createDistinctifyHashTable();
    */
   virtual void insertValueAccessorIntoDistinctifyHashTable(
-      const std::vector<attribute_id> &argument_ids,
-      const std::vector<attribute_id> &key_attr_ids,
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_ids,
       AggregationStateHashTableBase *distinctify_hash_table,
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor = nullptr) const = 0;
+      ValueAccessorMultiplexer *accessor_mux) const = 0;
 
   /**
    * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from
@@ -306,8 +305,8 @@ class AggregationHandle {
    */
   virtual void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const = 0;
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const = 0;
 
   /**
    * @brief Get the number of bytes needed to store the aggregation handle's

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp
index e9cc836..f9ad688 100644
--- a/expressions/aggregation/AggregationHandleAvg.cpp
+++ b/expressions/aggregation/AggregationHandleAvg.cpp
@@ -25,7 +25,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
@@ -89,24 +89,25 @@ AggregationHandleAvg::AggregationHandleAvg(const Type &type)
 }
 
 AggregationState* AggregationHandleAvg::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor,
-    const std::vector<attribute_id> &argument_ids) const {
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    ValueAccessorMultiplexer *accessor_mux) const {
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of attributes for AVG: " << argument_ids.size();
 
-  const attribute_id argument_id = argument_ids.front();
-  DCHECK_NE(argument_id, kInvalidAttributeID);
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-  ValueAccessor *target_accessor =
-      argument_id >= 0 ? accessor : aux_accessor;
-  const attribute_id target_argument_id =
-      argument_id >= 0 ? argument_id : -(argument_id+2);
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
   AggregationStateAvg *state = new AggregationStateAvg(blank_state_);
   std::size_t count = 0;
-  state->sum_ = fast_add_operator_->accumulateValueAccessor(
-      state->sum_, target_accessor, target_argument_id, &count);
+  state->sum_ =
+      fast_add_operator_->accumulateValueAccessor(
+          state->sum_,
+          accessor_mux->getValueAccessorBySource(argument_source),
+          argument_id,
+          &count);
   state->count_ = count;
   return state;
 }
@@ -154,29 +155,25 @@ TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const {
 
 ColumnVector* AggregationHandleAvg::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelper<
-      AggregationHandleAvg,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          *result_type_, hash_table, group_by_keys, index);
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+  return finalizeHashTableHelper<AggregationHandleAvg>(
+      *result_type_, hash_table, index, group_by_keys);
 }
 
 AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
   return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
-      AggregationHandleAvg,
-      AggregationStateAvg>(distinctify_hash_table);
+      AggregationHandleAvg, AggregationStateAvg>(
+          distinctify_hash_table);
 }
 
 void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
-      AggregationHandleAvg,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          distinctify_hash_table, aggregation_hash_table, index);
+    const std::size_t index,
+    AggregationStateHashTableBase *aggregation_hash_table) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleAvg>(
+      distinctify_hash_table, index, aggregation_hash_table);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp
index fb619f9..90f4aba 100644
--- a/expressions/aggregation/AggregationHandleAvg.hpp
+++ b/expressions/aggregation/AggregationHandleAvg.hpp
@@ -29,6 +29,7 @@
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -143,9 +144,8 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
   }
 
   AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor,
-      const std::vector<attribute_id> &argument_ids) const override;
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      ValueAccessorMultiplexer *accessor_mux) const override;
 
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
@@ -201,17 +201,16 @@ class AggregationHandleAvg : public AggregationConcreteHandle {
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionAvg;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp
index 93321f7..bf2188a 100644
--- a/expressions/aggregation/AggregationHandleCount.cpp
+++ b/expressions/aggregation/AggregationHandleCount.cpp
@@ -25,7 +25,6 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/TypeFactory.hpp"
@@ -45,9 +44,8 @@ class ValueAccessor;
 template <bool count_star, bool nullable_type>
 AggregationState* AggregationHandleCount<count_star, nullable_type>
     ::accumulateValueAccessor(
-        ValueAccessor *accessor,
-        ColumnVectorsValueAccessor *aux_accessor,
-        const std::vector<attribute_id> &argument_ids) const {
+        const std::vector<MultiSourceAttributeId> &argument_ids,
+        ValueAccessorMultiplexer *accessor_mux) const {
   DCHECK(!count_star)
       << "Called non-nullary accumulation method on an AggregationHandleCount "
       << "set up for nullary COUNT(*)";
@@ -55,21 +53,19 @@ AggregationState* AggregationHandleCount<count_star, nullable_type>
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of attributes for COUNT: " << argument_ids.size();
 
-  const attribute_id argument_id = argument_ids.front();
-  DCHECK_NE(argument_id, kInvalidAttributeID);
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-  ValueAccessor *target_accessor =
-      argument_id >= 0 ? accessor : aux_accessor;
-  const attribute_id target_argument_id =
-      argument_id >= 0 ? argument_id : -(argument_id+2);
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
   std::size_t count = 0;
   InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-      target_accessor,
-      [&target_argument_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
+      accessor_mux->getValueAccessorBySource(argument_source),
+      [&argument_id, &count](auto *accessor) -> void {  // NOLINT(build/c++11)
         if (nullable_type) {
           while (accessor->next()) {
-            count += !accessor->getTypedValue(target_argument_id).isNull();
+            count += !accessor->getTypedValue(argument_id).isNull();
           }
         } else {
           count = accessor->getNumTuples();
@@ -102,15 +98,14 @@ void AggregationHandleCount<count_star, nullable_type>::mergeStates(
 }
 
 template <bool count_star, bool nullable_type>
-ColumnVector*
-AggregationHandleCount<count_star, nullable_type>::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
+ColumnVector* AggregationHandleCount<count_star, nullable_type>
+    ::finalizeHashTable(
+        const AggregationStateHashTableBase &hash_table,
+        const std::size_t index,
+        std::vector<std::vector<TypedValue>> *group_by_keys) const {
   return finalizeHashTableHelper<
-      AggregationHandleCount<count_star, nullable_type>,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          TypeFactory::GetType(kLong), hash_table, group_by_keys, index);
+      AggregationHandleCount<count_star, nullable_type>>(
+          TypeFactory::GetType(kLong), hash_table, index, group_by_keys);
 }
 
 template <bool count_star, bool nullable_type>
@@ -119,19 +114,19 @@ AggregationState* AggregationHandleCount<count_star, nullable_type>
         const AggregationStateHashTableBase &distinctify_hash_table) const {
   return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
       AggregationHandleCount<count_star, nullable_type>,
-      AggregationStateCount>(distinctify_hash_table);
+      AggregationStateCount>(
+          distinctify_hash_table);
 }
 
 template <bool count_star, bool nullable_type>
 void AggregationHandleCount<count_star, nullable_type>
     ::aggregateOnDistinctifyHashTableForGroupBy(
         const AggregationStateHashTableBase &distinctify_hash_table,
-        AggregationStateHashTableBase *aggregation_hash_table,
-        std::size_t index) const {
+        const std::size_t index,
+        AggregationStateHashTableBase *aggregation_hash_table) const {
   aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
-      AggregationHandleCount<count_star, nullable_type>,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          distinctify_hash_table, aggregation_hash_table, index);
+      AggregationHandleCount<count_star, nullable_type>>(
+          distinctify_hash_table, index, aggregation_hash_table);
 }
 
 // Explicitly instantiate and compile in the different versions of

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp
index 70c9c8c..429dd20 100644
--- a/expressions/aggregation/AggregationHandleCount.hpp
+++ b/expressions/aggregation/AggregationHandleCount.hpp
@@ -31,6 +31,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/LongType.hpp"
 #include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
@@ -142,9 +143,8 @@ class AggregationHandleCount : public AggregationConcreteHandle {
   }
 
   AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor,
-      const std::vector<attribute_id> &argument_ids) const override;
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      ValueAccessorMultiplexer *accessor_mux) const override;
 
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
@@ -184,17 +184,16 @@ class AggregationHandleCount : public AggregationConcreteHandle {
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionCount;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp
index 35aa1a9..791f70f 100644
--- a/expressions/aggregation/AggregationHandleMax.cpp
+++ b/expressions/aggregation/AggregationHandleMax.cpp
@@ -24,7 +24,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -47,24 +47,21 @@ AggregationHandleMax::AggregationHandleMax(const Type &type)
 }
 
 AggregationState* AggregationHandleMax::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor,
-    const std::vector<attribute_id> &argument_ids) const {
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    ValueAccessorMultiplexer *accessor_mux) const {
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of attributes for MAX: " << argument_ids.size();
 
-  const attribute_id argument_id = argument_ids.front();
-  DCHECK_NE(argument_id, kInvalidAttributeID);
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-  ValueAccessor *target_accessor =
-      argument_id >= 0 ? accessor : aux_accessor;
-  const attribute_id target_argument_id =
-      argument_id >= 0 ? argument_id : -(argument_id+2);
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
   return new AggregationStateMax(fast_comparator_->accumulateValueAccessor(
       type_.getNullableVersion().makeNullValue(),
-      target_accessor,
-      target_argument_id));
+      accessor_mux->getValueAccessorBySource(argument_source),
+      argument_id));
 }
 
 void AggregationHandleMax::mergeStates(const AggregationState &source,
@@ -90,29 +87,25 @@ void AggregationHandleMax::mergeStates(const std::uint8_t *source,
 
 ColumnVector* AggregationHandleMax::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelper<
-      AggregationHandleMax,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          type_.getNullableVersion(), hash_table, group_by_keys, index);
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+  return finalizeHashTableHelper<AggregationHandleMax>(
+      type_, hash_table, index, group_by_keys);
 }
 
 AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
   return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
-      AggregationHandleMax,
-      AggregationStateMax>(distinctify_hash_table);
+      AggregationHandleMax, AggregationStateMax>(
+          distinctify_hash_table);
 }
 
 void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
-      AggregationHandleMax,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          distinctify_hash_table, aggregation_hash_table, index);
+    const std::size_t index,
+    AggregationStateHashTableBase *aggregation_hash_table) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleMax>(
+      distinctify_hash_table, index, aggregation_hash_table);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp
index 4f6f8b9..f2e4d33 100644
--- a/expressions/aggregation/AggregationHandleMax.hpp
+++ b/expressions/aggregation/AggregationHandleMax.hpp
@@ -29,6 +29,7 @@
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -114,9 +115,8 @@ class AggregationHandleMax : public AggregationConcreteHandle {
   }
 
   AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor,
-      const std::vector<attribute_id> &argument_ids) const override;
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      ValueAccessorMultiplexer *accessor_mux) const override;
 
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
@@ -160,17 +160,16 @@ class AggregationHandleMax : public AggregationConcreteHandle {
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionMax;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp
index d542c5f..971c49e 100644
--- a/expressions/aggregation/AggregationHandleMin.cpp
+++ b/expressions/aggregation/AggregationHandleMin.cpp
@@ -24,7 +24,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -47,24 +47,21 @@ AggregationHandleMin::AggregationHandleMin(const Type &type)
 }
 
 AggregationState* AggregationHandleMin::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor,
-    const std::vector<attribute_id> &argument_ids) const {
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    ValueAccessorMultiplexer *accessor_mux) const {
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of attributes for MIN: " << argument_ids.size();
 
-  const attribute_id argument_id = argument_ids.front();
-  DCHECK_NE(argument_id, kInvalidAttributeID);
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-  ValueAccessor *target_accessor =
-      argument_id >= 0 ? accessor : aux_accessor;
-  const attribute_id target_argument_id =
-      argument_id >= 0 ? argument_id : -(argument_id+2);
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
   return new AggregationStateMin(fast_comparator_->accumulateValueAccessor(
       type_.getNullableVersion().makeNullValue(),
-      target_accessor,
-      target_argument_id));
+      accessor_mux->getValueAccessorBySource(argument_source),
+      argument_id));
 }
 
 void AggregationHandleMin::mergeStates(const AggregationState &source,
@@ -91,29 +88,25 @@ void AggregationHandleMin::mergeStates(const std::uint8_t *source,
 
 ColumnVector* AggregationHandleMin::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelper<
-      AggregationHandleMin,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          type_.getNonNullableVersion(), hash_table, group_by_keys, index);
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+  return finalizeHashTableHelper<AggregationHandleMin>(
+      type_, hash_table, index, group_by_keys);
 }
 
 AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
   return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
-      AggregationHandleMin,
-      AggregationStateMin>(distinctify_hash_table);
+      AggregationHandleMin, AggregationStateMin>(
+          distinctify_hash_table);
 }
 
 void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
-      AggregationHandleMin,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          distinctify_hash_table, aggregation_hash_table, index);
+    const std::size_t index,
+    AggregationStateHashTableBase *aggregation_hash_table) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleMin>(
+      distinctify_hash_table, index, aggregation_hash_table);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp
index 6eaa8f6..ff808ea 100644
--- a/expressions/aggregation/AggregationHandleMin.hpp
+++ b/expressions/aggregation/AggregationHandleMin.hpp
@@ -29,6 +29,7 @@
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -113,9 +114,8 @@ class AggregationHandleMin : public AggregationConcreteHandle {
   }
 
   AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor,
-      const std::vector<attribute_id> &argument_ids) const override;
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      ValueAccessorMultiplexer *accessor_mux) const override;
 
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
@@ -158,17 +158,16 @@ class AggregationHandleMin : public AggregationConcreteHandle {
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionMin;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 7caace0..3a55545 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -26,7 +26,7 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypeFactory.hpp"
@@ -81,23 +81,24 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
 }
 
 AggregationState* AggregationHandleSum::accumulateValueAccessor(
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor,
-    const std::vector<attribute_id> &argument_ids) const {
+    const std::vector<MultiSourceAttributeId> &argument_ids,
+    ValueAccessorMultiplexer *accessor_mux) const {
   DCHECK_EQ(1u, argument_ids.size())
       << "Got wrong number of attributes for SUM: " << argument_ids.size();
 
-  const attribute_id argument_id = argument_ids.front();
-  DCHECK_NE(argument_id, kInvalidAttributeID);
+  const ValueAccessorSource argument_source = argument_ids.front().source;
+  const attribute_id argument_id = argument_ids.front().attr_id;
 
-  ValueAccessor *target_accessor =
-      argument_id >= 0 ? accessor : aux_accessor;
-  const attribute_id target_argument_id =
-      argument_id >= 0 ? argument_id : -(argument_id+2);
+  DCHECK(argument_source != ValueAccessorSource::kInvalid);
+  DCHECK_NE(argument_id, kInvalidAttributeID);
 
   std::size_t num_tuples = 0;
-  TypedValue va_sum = fast_operator_->accumulateValueAccessor(
-        blank_state_.sum_, target_accessor, target_argument_id, &num_tuples);
+  TypedValue va_sum =
+      fast_operator_->accumulateValueAccessor(
+          blank_state_.sum_,
+          accessor_mux->getValueAccessorBySource(argument_source),
+          argument_id,
+          &num_tuples);
   return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
 }
 
@@ -142,29 +143,25 @@ TypedValue AggregationHandleSum::finalize(const AggregationState &state) const {
 
 ColumnVector* AggregationHandleSum::finalizeHashTable(
     const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  return finalizeHashTableHelper<
-      AggregationHandleSum,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          *result_type_, hash_table, group_by_keys, index);
+    const std::size_t index,
+    std::vector<std::vector<TypedValue>> *group_by_keys) const {
+  return finalizeHashTableHelper<AggregationHandleSum>(
+      *result_type_, hash_table, index, group_by_keys);
 }
 
 AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
     const AggregationStateHashTableBase &distinctify_hash_table) const {
   return aggregateOnDistinctifyHashTableForSingleUnaryHelper<
-      AggregationHandleSum,
-      AggregationStateSum>(distinctify_hash_table);
+      AggregationHandleSum, AggregationStateSum>(
+          distinctify_hash_table);
 }
 
 void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
     const AggregationStateHashTableBase &distinctify_hash_table,
-    AggregationStateHashTableBase *aggregation_hash_table,
-    std::size_t index) const {
-  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<
-      AggregationHandleSum,
-      PackedPayloadSeparateChainingAggregationStateHashTable>(
-          distinctify_hash_table, aggregation_hash_table, index);
+    const std::size_t index,
+    AggregationStateHashTableBase *aggregation_hash_table) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleSum>(
+      distinctify_hash_table, index, aggregation_hash_table);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index df6532d..5b04d0c 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -29,6 +29,7 @@
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
@@ -136,9 +137,8 @@ class AggregationHandleSum : public AggregationConcreteHandle {
   }
 
   AggregationState* accumulateValueAccessor(
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor,
-      const std::vector<attribute_id> &argument_ids) const override;
+      const std::vector<MultiSourceAttributeId> &argument_ids,
+      ValueAccessorMultiplexer *accessor_mux) const override;
 
   void mergeStates(const AggregationState &source,
                    AggregationState *destination) const override;
@@ -187,17 +187,16 @@ class AggregationHandleSum : public AggregationConcreteHandle {
 
   ColumnVector* finalizeHashTable(
       const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
+      const std::size_t index,
+      std::vector<std::vector<TypedValue>> *group_by_keys) const override;
 
   AggregationState* aggregateOnDistinctifyHashTableForSingle(
-      const AggregationStateHashTableBase &distinctify_hash_table)
-      const override;
+      const AggregationStateHashTableBase &distinctify_hash_table) const override;
 
   void aggregateOnDistinctifyHashTableForGroupBy(
       const AggregationStateHashTableBase &distinctify_hash_table,
-      AggregationStateHashTableBase *aggregation_hash_table,
-      std::size_t index) const override;
+      const std::size_t index,
+      AggregationStateHashTableBase *aggregation_hash_table) const override;
 
  private:
   friend class AggregateFunctionSum;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index 432da09..0816db3 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -145,6 +145,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
+                      quickstep_storage_HashTableFactory
+                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
@@ -154,6 +157,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandle
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_types_TypedValue
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
@@ -163,7 +167,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeFactory
@@ -180,8 +184,8 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_LongType
                       quickstep_types_TypeFactory
@@ -197,7 +201,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
@@ -213,7 +217,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypedValue
@@ -229,7 +233,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_storage_HashTableBase
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_threading_SpinMutex
                       quickstep_types_Type
                       quickstep_types_TypeFactory

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index df4114d..5e67c64 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -526,6 +526,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_DropTableOperator
                       quickstep_relationaloperators_FinalizeAggregationOperator
                       quickstep_relationaloperators_HashJoinOperator
+                      quickstep_relationaloperators_InitializeAggregationOperator
                       quickstep_relationaloperators_InsertOperator
                       quickstep_relationaloperators_NestedLoopsJoinOperator
                       quickstep_relationaloperators_SampleOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index c99f032..a393185 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -48,6 +48,7 @@
 #include "storage/SubBlocksReference.hpp"
 #include "storage/TupleIdSequence.hpp"
 #include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -103,17 +104,16 @@ AggregationOperationState::AggregationOperationState(
     group_by_types_.emplace_back(&group_by_element->getType());
   }
 
-  // Prepare group-by element attribute ids and non-trivial expressions.
+  // Prepare group-by key ids and non-trivial expressions.
   for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
     const attribute_id attr_id =
         group_by_element->getAttributeIdForValueAccessor();
-    if (attr_id == kInvalidAttributeID) {
-      const attribute_id non_trivial_attr_id =
-          -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
-      non_trivial_expressions_.emplace_back(group_by_element.release());
-      group_by_key_ids_.emplace_back(non_trivial_attr_id);
+    if (attr_id != kInvalidAttributeID) {
+      group_by_key_ids_.emplace_back(ValueAccessorSource::kBase, attr_id);
     } else {
-      group_by_key_ids_.emplace_back(attr_id);
+      group_by_key_ids_.emplace_back(ValueAccessorSource::kDerived,
+                                     non_trivial_expressions_.size());
+      non_trivial_expressions_.emplace_back(group_by_element.release());
     }
   }
 
@@ -138,17 +138,16 @@ AggregationOperationState::AggregationOperationState(
     }
 
     // Prepare argument attribute ids and non-trivial expressions.
-    std::vector<attribute_id> argument_ids;
+    std::vector<MultiSourceAttributeId> argument_ids;
     for (std::unique_ptr<const Scalar> &argument : *args_it) {
       const attribute_id attr_id =
           argument->getAttributeIdForValueAccessor();
-      if (attr_id == kInvalidAttributeID) {
-        const attribute_id non_trivial_attr_id =
-            -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2);
-        non_trivial_expressions_.emplace_back(argument.release());
-        argument_ids.emplace_back(non_trivial_attr_id);
+      if (attr_id != kInvalidAttributeID) {
+        argument_ids.emplace_back(ValueAccessorSource::kBase, attr_id);
       } else {
-        argument_ids.emplace_back(attr_id);
+        argument_ids.emplace_back(ValueAccessorSource::kDerived,
+                                  non_trivial_expressions_.size());
+        non_trivial_expressions_.emplace_back(argument.release());
       }
     }
     argument_ids_.emplace_back(std::move(argument_ids));
@@ -403,6 +402,15 @@ bool AggregationOperationState::checkAggregatePartitioned(
   if (group_by.empty()) {
     return false;
   }
+
+  // Currently we require that all the group-by keys are ScalarAttributes.
+  // TODO(jianqiao): relax this requirement.
+  for (const auto &group_by_element : group_by) {
+    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
+      return false;
+    }
+  }
+
   // There are GROUP BYs without DISTINCT. Check if the estimated number of
   // groups is large enough to warrant a partitioned aggregation.
   return estimated_num_groups >
@@ -450,16 +458,16 @@ void AggregationOperationState::aggregateBlock(const block_id input_block,
 
   accessor->beginIterationVirtual();
 
+  ValueAccessorMultiplexer accessor_mux(accessor, non_trivial_results.get());
   if (group_by_key_ids_.empty()) {
-    aggregateBlockSingleState(accessor, non_trivial_results.get());
+    aggregateBlockSingleState(&accessor_mux);
   } else {
-    aggregateBlockHashTable(accessor, non_trivial_results.get());
+    aggregateBlockHashTable(&accessor_mux);
   }
 }
 
 void AggregationOperationState::aggregateBlockSingleState(
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor) {
+    ValueAccessorMultiplexer *accessor_mux) {
   // Aggregate per-block state for each aggregate.
   std::vector<std::unique_ptr<AggregationState>> local_state;
 
@@ -473,17 +481,16 @@ void AggregationOperationState::aggregateBlockSingleState(
           argument_ids,
           {},
           distinctify_hashtables_[agg_idx].get(),
-          accessor,
-          aux_accessor);
+          accessor_mux);
     } else {
       if (argument_ids.empty()) {
         // Special case. This is a nullary aggregate (i.e. COUNT(*)).
-        state = handle->accumulateNullary(accessor->getNumTuplesVirtual());
+        ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
+        DCHECK(base_accessor != nullptr);
+        state = handle->accumulateNullary(base_accessor->getNumTuplesVirtual());
       } else {
         // Have the AggregationHandle actually do the aggregation.
-        state = handle->accumulateValueAccessor(accessor,
-                                                aux_accessor,
-                                                argument_ids);
+        state = handle->accumulateValueAccessor(argument_ids, accessor_mux);
       }
     }
     local_state.emplace_back(state);
@@ -512,36 +519,38 @@ void AggregationOperationState::mergeGroupByHashTables(
 }
 
 void AggregationOperationState::aggregateBlockHashTable(
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor) {
+    ValueAccessorMultiplexer *accessor_mux) {
   // TODO
   if (is_aggregate_collision_free_) {
-    aggregateBlockHashTableImplCollisionFree(accessor, aux_accessor);
+    aggregateBlockHashTableImplCollisionFree(accessor_mux);
   } else if (is_aggregate_partitioned_) {
-    aggregateBlockHashTableImplPartitioned(accessor, aux_accessor);
+    aggregateBlockHashTableImplPartitioned(accessor_mux);
   } else {
-    aggregateBlockHashTableImplThreadPrivate(accessor, aux_accessor);
+    aggregateBlockHashTableImplThreadPrivate(accessor_mux);
   }
 }
 
 void AggregationOperationState::aggregateBlockHashTableImplCollisionFree(
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor) {
+    ValueAccessorMultiplexer *accessor_mux) {
   DCHECK(collision_free_hashtable_ != nullptr);
 
   collision_free_hashtable_->upsertValueAccessor(argument_ids_,
                                                  group_by_key_ids_,
-                                                 accessor,
-                                                 aux_accessor);
+                                                 accessor_mux);
 }
 
 void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor) {
+    ValueAccessorMultiplexer *accessor_mux) {
   DCHECK(partitioned_group_by_hashtable_pool_ != nullptr);
 
+  std::vector<attribute_id> group_by_key_ids;
+  for (const MultiSourceAttributeId &key_id : group_by_key_ids_) {
+    DCHECK(key_id.source == ValueAccessorSource::kBase);
+    group_by_key_ids.emplace_back(key_id.attr_id);
+  }
+
   InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-      accessor,
+      accessor_mux->getBaseAccessor(),
       [&](auto *accessor) -> void {  // NOLINT(build/c++11)
     // TODO(jianqiao): handle the situation when keys in non_trivial_results
     const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions();
@@ -561,7 +570,7 @@ void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
     while (accessor->next()) {
       // We need a unique_ptr because getTupleWithAttributes() uses "new".
       std::unique_ptr<Tuple> curr_tuple(
-          accessor->getTupleWithAttributes(group_by_key_ids_));
+          accessor->getTupleWithAttributes(group_by_key_ids));
       const std::size_t curr_tuple_partition_id =
           curr_tuple->getTupleHash() % num_partitions;
       partition_membership[curr_tuple_partition_id]->set(
@@ -572,18 +581,17 @@ void AggregationOperationState::aggregateBlockHashTableImplPartitioned(
       std::unique_ptr<ValueAccessor> adapter(
           accessor->createSharedTupleIdSequenceAdapter(
               *(partition_membership)[partition]));
+      ValueAccessorMultiplexer local_mux(adapter.get(), nullptr);
       partitioned_group_by_hashtable_pool_->getHashTable(partition)
           ->upsertValueAccessor(argument_ids_,
                                 group_by_key_ids_,
-                                adapter.get(),
-                                aux_accessor);
+                                &local_mux);
     }
   });
 }
 
 void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor) {
+    ValueAccessorMultiplexer *accessor_mux) {
   DCHECK(group_by_hashtable_pool_ != nullptr);
 
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
@@ -592,8 +600,7 @@ void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
           argument_ids_[agg_idx],
           group_by_key_ids_,
           distinctify_hashtables_[agg_idx].get(),
-          accessor,
-          aux_accessor);
+          accessor_mux);
     }
   }
 
@@ -602,8 +609,7 @@ void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate(
 
   agg_hash_table->upsertValueAccessor(argument_ids_,
                                       group_by_key_ids_,
-                                      accessor,
-                                      aux_accessor);
+                                      accessor_mux);
   group_by_hashtable_pool_->returnHashTable(agg_hash_table);
 }
 
@@ -711,7 +717,7 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
   std::vector<std::unique_ptr<ColumnVector>> final_values;
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *hash_table, &group_by_keys, agg_idx);
+        *hash_table, agg_idx, &group_by_keys);
     if (agg_result_col != nullptr) {
       final_values.emplace_back(agg_result_col);
     }
@@ -808,12 +814,12 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
     if (is_distinct_[agg_idx]) {
       handles_[agg_idx]->allowUpdate();
       handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
-          *distinctify_hashtables_[agg_idx], final_hash_table, agg_idx);
+          *distinctify_hashtables_[agg_idx], agg_idx, final_hash_table);
     }
 
     ColumnVector *agg_result_col =
         handles_[agg_idx]->finalizeHashTable(
-            *final_hash_table, &group_by_keys, agg_idx);
+            *final_hash_table, agg_idx, &group_by_keys);
     DCHECK(agg_result_col != nullptr);
 
     final_values.emplace_back(agg_result_col);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 0c61f57..212fe88 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -35,6 +35,7 @@
 #include "storage/PartitionedHashTablePool.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "utility/Macros.hpp"
 
 #include "gflags/gflags.h"
@@ -200,10 +201,8 @@ class AggregationOperationState {
       const std::vector<const AggregateFunction *> &aggregate_functions) const;
 
   // Aggregate on input block.
-  void aggregateBlockSingleState(ValueAccessor *accessor,
-                                 ColumnVectorsValueAccessor *aux_accessor);
-  void aggregateBlockHashTable(ValueAccessor *accessor,
-                               ColumnVectorsValueAccessor *aux_accessor);
+  void aggregateBlockSingleState(ValueAccessorMultiplexer *accessor_mux);
+  void aggregateBlockHashTable(ValueAccessorMultiplexer *accessor_mux);
 
   // Merge locally (per storage block) aggregated states with global aggregation
   // states.
@@ -218,12 +217,9 @@ class AggregationOperationState {
                          InsertDestination *output_destination);
 
   // Specialized implementations for aggregateBlockHashTable.
-  void aggregateBlockHashTableImplCollisionFree(ValueAccessor *accessor,
-                                                ColumnVectorsValueAccessor *aux_accessor);
-  void aggregateBlockHashTableImplPartitioned(ValueAccessor *accessor,
-                                              ColumnVectorsValueAccessor *aux_accessor);
-  void aggregateBlockHashTableImplThreadPrivate(ValueAccessor *accessor,
-                                                ColumnVectorsValueAccessor *aux_accessor);
+  void aggregateBlockHashTableImplCollisionFree(ValueAccessorMultiplexer *accessor_mux);
+  void aggregateBlockHashTableImplPartitioned(ValueAccessorMultiplexer *accessor_mux);
+  void aggregateBlockHashTableImplThreadPrivate(ValueAccessorMultiplexer *accessor_mux);
 
   // Specialized implementations for finalizeHashTable.
   void finalizeHashTableImplCollisionFree(const std::size_t partition_id,
@@ -255,8 +251,8 @@ class AggregationOperationState {
   // Non-trivial group-by/argument expressions that need to be evaluated.
   std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_;
 
-  std::vector<attribute_id> group_by_key_ids_;
-  std::vector<std::vector<attribute_id>> argument_ids_;
+  std::vector<MultiSourceAttributeId> group_by_key_ids_;
+  std::vector<std::vector<MultiSourceAttributeId>> argument_ids_;
 
   std::vector<const Type *> group_by_types_;
 
@@ -270,8 +266,7 @@ class AggregationOperationState {
   //
   // TODO(shoban): We should ideally store the aggregation state together in one
   // hash table to prevent multiple lookups.
-  std::vector<std::unique_ptr<AggregationStateHashTableBase>>
-      group_by_hashtables_;
+  std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
 
   // A vector of group by hash table pools.
   std::unique_ptr<HashTablePool> group_by_hashtable_pool_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 3af14a9..d43d7a2 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -255,6 +255,7 @@ add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.h
 add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp)
 add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
 add_library(quickstep_storage_ValueAccessor ../empty_src.cpp ValueAccessor.hpp)
+add_library(quickstep_storage_ValueAccessorMultiplexer ../empty_src.cpp ValueAccessorMultiplexer.hpp)
 add_library(quickstep_storage_ValueAccessorUtil ../empty_src.cpp ValueAccessorUtil.hpp)
 add_library(quickstep_storage_WindowAggregationOperationState
             WindowAggregationOperationState.hpp
@@ -290,13 +291,13 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_storage_SubBlocksReference
                       quickstep_storage_TupleIdSequence
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros
-                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber)
 target_link_libraries(quickstep_storage_AggregationOperationState_proto
                       quickstep_expressions_Expressions_proto
@@ -444,6 +445,7 @@ target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable
                       quickstep_storage_StorageConstants
                       quickstep_storage_StorageManager
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_storage_ValueAccessorUtil
                       quickstep_types_Type
                       quickstep_types_TypeID
@@ -705,6 +707,7 @@ target_link_libraries(quickstep_storage_HashTable
                       quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_HashTableBase
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_HashTable_proto
                       quickstep_types_Type_proto
@@ -806,6 +809,7 @@ target_link_libraries(quickstep_storage_PackedPayloadAggregationStateHashTable
                       quickstep_storage_StorageManager
                       quickstep_storage_TupleReference
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_storage_ValueAccessorUtil
                       quickstep_threading_SpinMutex
                       quickstep_threading_SpinSharedMutex
@@ -1060,6 +1064,10 @@ target_link_libraries(quickstep_storage_ValueAccessor
                       quickstep_types_TypedValue
                       quickstep_types_containers_Tuple
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_storage_ValueAccessorMultiplexer
+                      glog
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_ValueAccessorUtil
                       glog
                       quickstep_storage_BasicColumnStoreValueAccessor
@@ -1156,6 +1164,7 @@ target_link_libraries(quickstep_storage
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
                       quickstep_storage_ValueAccessorUtil
                       quickstep_storage_WindowAggregationOperationState
                       quickstep_storage_WindowAggregationOperationState_proto)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/storage/CollisionFreeAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp
index e9b7711..2f3b336 100644
--- a/storage/CollisionFreeAggregationStateHashTable.cpp
+++ b/storage/CollisionFreeAggregationStateHashTable.cpp
@@ -31,6 +31,7 @@
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
 #include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
@@ -126,50 +127,58 @@ void CollisionFreeAggregationStateHashTable::destroyPayload() {
 }
 
 bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
-    const std::vector<std::vector<attribute_id>> &argument_ids,
-    const std::vector<attribute_id> &key_attr_ids,
-    ValueAccessor *base_accessor,
-    ColumnVectorsValueAccessor *aux_accessor) {
-  DCHECK_EQ(1u, key_attr_ids.size());
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_ids,
+    ValueAccessorMultiplexer *accessor_mux) {
+  DCHECK_EQ(1u, key_ids.size());
 
-  const attribute_id key_attr_id = key_attr_ids.front();
+  const ValueAccessorSource key_source = key_ids.front().source;
+  const attribute_id key_id = key_ids.front().attr_id;
   const bool is_key_nullable = key_type_->isNullable();
 
   if (handles_.empty()) {
-    if (key_attr_id >= 0) {
-      InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
-          base_accessor,
-          [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-        upsertValueAccessorKeyOnlyHelper(is_key_nullable,
-                                         key_type_,
-                                         key_attr_id,
-                                         accessor);
-      });
-    } else {
+    InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
+        accessor_mux->getValueAccessorBySource(key_source),
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
       upsertValueAccessorKeyOnlyHelper(is_key_nullable,
                                        key_type_,
-                                       -(key_attr_id+2),
-                                       aux_accessor);
-    }
-
+                                       key_id,
+                                       accessor);
+    });
     return true;
   }
 
+  DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
+             == ValueAccessor::Implementation::kColumnVectors);
+  ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
+  ColumnVectorsValueAccessor *derived_accesor =
+      static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
+
   for (std::size_t i = 0; i < num_handles_; ++i) {
     DCHECK_LE(argument_ids[i].size(), 1u);
 
-    const attribute_id argument_id =
-        argument_ids[i].empty() ? kInvalidAttributeID : argument_ids[i].front();
-
     const AggregationHandle *handle = handles_[i];
     const auto &argument_types = handle->getArgumentTypes();
+    const auto &argument_ids_i = argument_ids[i];
 
+    ValueAccessorSource argument_source;
+    attribute_id argument_id;
     const Type *argument_type;
     bool is_argument_nullable;
-    if (argument_types.empty()) {
+
+    if (argument_ids_i.empty()) {
+      argument_source = ValueAccessorSource::kInvalid;
+      argument_id = kInvalidAttributeID;
+
+      DCHECK(argument_types.empty());
       argument_type = nullptr;
       is_argument_nullable = false;
     } else {
+      DCHECK_EQ(1u, argument_ids_i.size());
+      argument_source = argument_ids_i.front().source;
+      argument_id = argument_ids_i.front().attr_id;
+
+      DCHECK_EQ(1u, argument_types.size());
       argument_type = argument_types.front();
       is_argument_nullable = argument_type->isNullable();
     }
@@ -177,14 +186,14 @@ bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
     InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
         base_accessor,
         [&](auto *accessor) -> void {  // NOLINT(build/c++11)
-      if (key_attr_id >= 0) {
-        if (argument_id >= 0) {
+      if (key_source == ValueAccessorSource::kBase) {
+        if (argument_source == ValueAccessorSource::kBase) {
           upsertValueAccessorDispatchHelper<false>(is_key_nullable,
                                                    is_argument_nullable,
                                                    key_type_,
                                                    argument_type,
                                                    handle->getAggregationID(),
-                                                   key_attr_id,
+                                                   key_id,
                                                    argument_id,
                                                    vec_tables_[i],
                                                    accessor,
@@ -195,23 +204,23 @@ bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
                                                   key_type_,
                                                   argument_type,
                                                   handle->getAggregationID(),
-                                                  key_attr_id,
-                                                  -(argument_id+2),
+                                                  key_id,
+                                                  argument_id,
                                                   vec_tables_[i],
                                                   accessor,
-                                                  aux_accessor);
+                                                  derived_accesor);
         }
       } else {
-        if (argument_id >= 0) {
+        if (argument_source == ValueAccessorSource::kBase) {
           upsertValueAccessorDispatchHelper<true>(is_key_nullable,
                                                   is_argument_nullable,
                                                   key_type_,
                                                   argument_type,
                                                   handle->getAggregationID(),
-                                                  -(key_attr_id+2),
+                                                  key_id,
                                                   argument_id,
                                                   vec_tables_[i],
-                                                  aux_accessor,
+                                                  derived_accesor,
                                                   accessor);
         } else {
           upsertValueAccessorDispatchHelper<false>(is_key_nullable,
@@ -219,11 +228,11 @@ bool CollisionFreeAggregationStateHashTable::upsertValueAccessor(
                                                    key_type_,
                                                    argument_type,
                                                    handle->getAggregationID(),
-                                                   -(key_attr_id+2),
-                                                   -(argument_id+2),
+                                                   key_id,
+                                                   argument_id,
                                                    vec_tables_[i],
-                                                   aux_accessor,
-                                                   aux_accessor);
+                                                   derived_accesor,
+                                                   derived_accesor);
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/storage/CollisionFreeAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeAggregationStateHashTable.hpp b/storage/CollisionFreeAggregationStateHashTable.hpp
index 7ed5072..d738e4e 100644
--- a/storage/CollisionFreeAggregationStateHashTable.hpp
+++ b/storage/CollisionFreeAggregationStateHashTable.hpp
@@ -36,6 +36,7 @@
 #include "storage/StorageBlob.hpp"
 #include "storage/StorageConstants.hpp"
 #include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "types/Type.hpp"
 #include "types/TypeID.hpp"
 #include "types/TypedValue.hpp"
@@ -93,10 +94,9 @@ class CollisionFreeAggregationStateHashTable : public AggregationStateHashTableB
   }
 
   bool upsertValueAccessor(
-      const std::vector<std::vector<attribute_id>> &argument_ids,
-      const std::vector<attribute_id> &key_attr_ids,
-      ValueAccessor *base_accessor,
-      ColumnVectorsValueAccessor *aux_accessor = nullptr) override;
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_ids,
+      ValueAccessorMultiplexer *accessor_mux) override;
 
   void finalizeKey(const std::size_t partition_id,
                    NativeColumnVector *output_cv) const;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index 3d34600..2a6498c 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -23,6 +23,7 @@
 #include <cstddef>
 #include <vector>
 
+#include "storage/ValueAccessorMultiplexer.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -112,10 +113,9 @@ class AggregationStateHashTableBase {
    * specialization from this file.
    **/
   virtual bool upsertValueAccessor(
-      const std::vector<std::vector<attribute_id>> &argument_ids,
-      const std::vector<attribute_id> &key_attr_ids,
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor = nullptr) = 0;
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_attr_ids,
+      ValueAccessorMultiplexer *accessor_mux) = 0;
 
   virtual void destroyPayload() = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3bcb5c89/storage/PackedPayloadAggregationStateHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadAggregationStateHashTable.cpp b/storage/PackedPayloadAggregationStateHashTable.cpp
index 34c4177..0292092 100644
--- a/storage/PackedPayloadAggregationStateHashTable.cpp
+++ b/storage/PackedPayloadAggregationStateHashTable.cpp
@@ -206,20 +206,25 @@ void PackedPayloadSeparateChainingAggregationStateHashTable::destroyPayload() {
 }
 
 bool PackedPayloadSeparateChainingAggregationStateHashTable::upsertValueAccessor(
-    const std::vector<std::vector<attribute_id>> &argument_ids,
-    const std::vector<attribute_id> &key_attr_ids,
-    ValueAccessor *accessor,
-    ColumnVectorsValueAccessor *aux_accessor) {
-  if (aux_accessor == nullptr) {
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_attr_ids,
+    ValueAccessorMultiplexer *accessor_mux) {
+  DCHECK(accessor_mux->getDerivedAccessor()->getImplementationType()
+             == ValueAccessor::Implementation::kColumnVectors);
+  ValueAccessor *base_accessor = accessor_mux->getBaseAccessor();
+  ColumnVectorsValueAccessor *derived_accessor =
+      static_cast<ColumnVectorsValueAccessor *>(accessor_mux->getDerivedAccessor());
+
+  if (derived_accessor == nullptr) {
     return upsertValueAccessorCompositeKeyInternal<false>(argument_ids,
                                                           key_attr_ids,
-                                                          accessor,
-                                                          aux_accessor);
+                                                          base_accessor,
+                                                          derived_accessor);
   } else {
     return upsertValueAccessorCompositeKeyInternal<true>(argument_ids,
                                                          key_attr_ids,
-                                                         accessor,
-                                                         aux_accessor);
+                                                         base_accessor,
+                                                         derived_accessor);
   }
 }
 


Mime
View raw message