tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [3/3] tajo git commit: TAJO-1343: Improve the memory usage of physical executors. (jihoon)
Date Wed, 22 Jul 2015 09:05:44 GMT
TAJO-1343: Improve the memory usage of physical executors. (jihoon)

Closes #634


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4820610f
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4820610f
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4820610f

Branch: refs/heads/master
Commit: 4820610f4a2a384372aaecf5212bd486c79a65b2
Parents: e5b30e5
Author: Jihoon Son <jihoonson@apache.org>
Authored: Wed Jul 22 18:04:42 2015 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Wed Jul 22 18:04:42 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../org/apache/tajo/storage/RowStoreUtil.java   |   1 -
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 -
 .../java/org/apache/tajo/storage/NullTuple.java |   4 +
 .../java/org/apache/tajo/storage/Tuple.java     |   2 +
 .../java/org/apache/tajo/storage/VTuple.java    |   7 +
 .../tajo/engine/planner/KeyProjector.java       |  44 +++++
 .../tajo/engine/planner/PhysicalPlanner.java    |   4 +-
 .../apache/tajo/engine/planner/Projector.java   |  29 +--
 .../planner/physical/AggregationExec.java       |  11 --
 .../engine/planner/physical/BNLJoinExec.java    |  31 ++--
 .../planner/physical/BSTIndexScanExec.java      |  19 +-
 .../planner/physical/CommonHashJoinExec.java    |  70 +++-----
 .../engine/planner/physical/CommonJoinExec.java |  13 +-
 .../planner/physical/ComparableVector.java      |   8 -
 .../DistinctGroupbyFirstAggregationExec.java    | 179 +++++++++----------
 .../DistinctGroupbyHashAggregationExec.java     | 177 +++++++++---------
 .../DistinctGroupbySecondAggregationExec.java   | 108 ++++++-----
 .../DistinctGroupbySortAggregationExec.java     |  27 ++-
 .../DistinctGroupbyThirdAggregationExec.java    |  54 +++---
 .../planner/physical/ExternalSortExec.java      |  34 ++--
 .../planner/physical/HashAggregateExec.java     |  21 +--
 .../planner/physical/HashFullOuterJoinExec.java |  35 ++--
 .../engine/planner/physical/HashJoinExec.java   |  19 +-
 .../planner/physical/HashLeftAntiJoinExec.java  |  11 +-
 .../planner/physical/HashLeftOuterJoinExec.java |  11 +-
 .../planner/physical/HashLeftSemiJoinExec.java  |  11 +-
 .../physical/HashShuffleFileWriteExec.java      |  25 ++-
 .../tajo/engine/planner/physical/KeyTuple.java  |  85 +++++++++
 .../engine/planner/physical/MemSortExec.java    |  11 +-
 .../physical/MergeFullOuterJoinExec.java        |  77 ++++----
 .../engine/planner/physical/MergeJoinExec.java  |  72 ++++----
 .../engine/planner/physical/NLJoinExec.java     |  12 +-
 .../engine/planner/physical/ProjectionExec.java |   6 +-
 .../physical/RangeShuffleFileWriteExec.java     |  42 ++---
 .../physical/RightOuterMergeJoinExec.java       | 102 +++++------
 .../engine/planner/physical/SeqScanExec.java    |   3 +-
 .../planner/physical/SortAggregateExec.java     |  40 +++--
 .../SortBasedColPartitionStoreExec.java         |   3 +-
 .../tajo/engine/planner/physical/SortExec.java  |   3 +-
 .../engine/planner/physical/StoreTableExec.java |   2 +-
 .../tajo/engine/planner/physical/TupleList.java |  44 +++++
 .../tajo/engine/planner/physical/TupleMap.java  |  74 ++++++++
 .../tajo/engine/planner/physical/TupleSet.java  |  34 ++++
 .../engine/planner/physical/TupleSorter.java    |   7 +-
 .../planner/physical/VectorizedSorter.java      |   3 +-
 .../engine/planner/physical/WindowAggExec.java  |  63 ++++---
 .../apache/tajo/engine/query/QueryContext.java  |   7 +-
 .../apache/tajo/engine/utils/CacheHolder.java   |  13 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java |  21 ---
 .../NonForwardQueryResultSystemScanner.java     |   8 +-
 .../org/apache/tajo/TajoTestingCluster.java     |   1 +
 .../tajo/engine/planner/TestLogicalPlanner.java |   4 +-
 .../planner/physical/TestTupleSorter.java       |  11 +-
 .../tajo/engine/query/TestGroupByQuery.java     |  28 +--
 .../TestGroupByQuery/testGroupbyWithJson.json   | 101 +++++++----
 .../TestGroupByQuery/testGroupbyWithLimit3.sql  |   2 +-
 .../testNestedFieldAsGroupbyKey1.sql            |   2 +
 .../TestGroupByQuery/testGroupbyWithJson.result |   4 +-
 .../testGroupbyWithLimit3.result                |   2 +-
 .../testGroupbyWithPythonFunc.result            |   2 +-
 .../testGroupbyWithPythonFunc2.result           |   4 +-
 .../testHavingWithAggFunction.result            |   2 +-
 .../TestGroupByQuery/testPythonUdaf3.result     |   2 +-
 .../testNestedFieldAsGroupbyKey1.result         |   4 +-
 .../testWindowWithAggregation4.result           |   4 +-
 .../testWindowWithAggregation6.result           |   4 +-
 .../org/apache/tajo/jdbc/MetaDataTuple.java     |   5 +
 .../org/apache/tajo/plan/LogicalPlanner.java    |   6 +-
 .../org/apache/tajo/plan/TablePropertyUtil.java |   2 -
 .../org/apache/tajo/storage/FrameTuple.java     |   5 +
 .../java/org/apache/tajo/storage/LazyTuple.java |   6 +
 .../org/apache/tajo/storage/RowStoreUtil.java   |   2 +-
 .../apache/tajo/tuple/offheap/HeapTuple.java    |   4 +
 .../apache/tajo/tuple/offheap/UnSafeTuple.java  |   4 +
 .../apache/tajo/storage/hbase/HBaseScanner.java |   9 +-
 .../java/org/apache/tajo/storage/CSVFile.java   |  64 +++----
 .../java/org/apache/tajo/storage/RawFile.java   |  42 ++---
 .../java/org/apache/tajo/storage/RowFile.java   |   3 +-
 .../apache/tajo/storage/avro/AvroScanner.java   |  26 +--
 .../apache/tajo/storage/index/bst/BSTIndex.java |  20 ++-
 .../org/apache/tajo/storage/rcfile/RCFile.java  |   9 +-
 .../sequencefile/SequenceFileScanner.java       |  50 ++++--
 .../tajo/storage/text/ByteBufLineReader.java    |   2 +-
 .../tajo/storage/text/CSVLineSerializer.java    |   1 +
 .../tajo/storage/text/DelimitedTextFile.java    |  13 +-
 .../apache/tajo/storage/text/TextLineSerDe.java |  14 +-
 .../apache/tajo/storage/TestMergeScanner.java   |   5 +-
 .../org/apache/tajo/storage/TestStorages.java   |   4 +-
 89 files changed, 1217 insertions(+), 948 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6001893..e5735f6 100644
--- a/CHANGES
+++ b/CHANGES
@@ -32,6 +32,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1343: Improve the memory usage of physical executors. (jihoon)
+
     TAJO-1696: Resource calculator should consider the requested disk resource 
     at the first stage. (jihoon)
 
@@ -2534,3 +2536,4 @@ Release 0.2.0
     TAJO-252: Add DISCLAIMER file. (hyunsik)
 
     TAJO-251: Rename the legacy name *.tql to *.sql. (hyunsik)
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 5b4a308..87282a0 100644
--- a/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -44,7 +44,6 @@ public class RowStoreUtil {
   }
 
   public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
-    out.clear();
     for (int idx = 0; idx < targetIds.length; idx++) {
       out.put(idx, in.asDatum(targetIds[idx]));
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index b876737..910d6bc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -266,8 +266,6 @@ public class TajoConf extends Configuration {
     EXECUTOR_EXTERNAL_SORT_THREAD_NUM("tajo.executor.external-sort.thread-num", 1),
     EXECUTOR_EXTERNAL_SORT_FANOUT("tajo.executor.external-sort.fanout-num", 8),
 
-    EXECUTOR_INNER_JOIN_INMEMORY_HASH_TABLE_SIZE("tajo.executor.join.inner.in-memory-table-num", (long)1000000),
-
     // Metrics ----------------------------------------------------------------
     METRICS_PROPERTY_FILENAME("tajo.metrics.property.file", "tajo-metrics.properties"),
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
index a17ef01..967efce 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/NullTuple.java
@@ -87,6 +87,10 @@ public class NullTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public void clearOffset() {
+  }
+
+  @Override
   public void put(int fieldId, Tuple tuple) {
     throw new UnsupportedOperationException();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
index 7eb56bd..f4a61e6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -46,6 +46,8 @@ public interface Tuple extends Cloneable {
 
   int size(int fieldId);
 
+  void clearOffset();
+
   void setOffset(long offset);
 
   long getOffset();

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
index 2c81e54..d7b648d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -73,6 +73,7 @@ public class VTuple implements Tuple, Cloneable {
 
   @Override
   public void clear() {
+    clearOffset();
     for (int i=0; i < values.length; i++) {
       values[i] = null;
     }
@@ -100,7 +101,9 @@ public class VTuple implements Tuple, Cloneable {
     return values[fieldId].size();
   }
 
+  @Override
   public void put(Datum [] values) {
+    clearOffset();
     System.arraycopy(values, 0, this.values, 0, values.length);
   }
 
@@ -111,6 +114,10 @@ public class VTuple implements Tuple, Cloneable {
     return this.values[fieldId];
   }
 
+  public void clearOffset() {
+    this.offset = -1;
+  }
+
   public void setOffset(long offset) {
     this.offset = offset;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/KeyProjector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/KeyProjector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/KeyProjector.java
new file mode 100644
index 0000000..f6220a8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/KeyProjector.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.planner.physical.KeyTuple;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+
+public class KeyProjector {
+
+  private final KeyTuple keyTuple;
+  private final int projectIds[];
+
+  public KeyProjector(Schema inSchema, Column[] keyColumns) {
+    keyTuple = new KeyTuple(keyColumns.length);
+    projectIds = new int[keyColumns.length];
+    for (int i = 0; i < keyColumns.length; i++) {
+      projectIds[i] = inSchema.getColumnId(keyColumns[i].getQualifiedName());
+    }
+  }
+
+  public KeyTuple project(Tuple tuple) {
+    RowStoreUtil.project(tuple, keyTuple, projectIds);
+    return keyTuple;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
index d4c57db..0983dc6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
@@ -30,7 +30,7 @@ import org.apache.tajo.exception.InternalException;
  * This class generates a physical execution plan.
  */
 public interface PhysicalPlanner {
-  public PhysicalExec createPlan(TaskAttemptContext context,
-                                 LogicalNode logicalPlan)
+  PhysicalExec createPlan(TaskAttemptContext context,
+                          LogicalNode logicalPlan)
       throws InternalException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
index a73478f..ba8ec32 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -20,42 +20,44 @@ package org.apache.tajo.engine.planner;
 
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 public class Projector {
   private final TaskAttemptContext context;
   private final Schema inSchema;
-  private final Target[] targets;
 
   // for projection
-  private final int targetNum;
   private final EvalNode[] evals;
 
+  private final Tuple outTuple;
+
   public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets) {
     this.context = context;
     this.inSchema = inSchema;
+    Target[] realTargets;
     if (targets == null) {
-      this.targets = PlannerUtil.schemaToTargets(outSchema);
+      realTargets = PlannerUtil.schemaToTargets(outSchema);
     } else {
-      this.targets = targets;
+      realTargets = targets;
     }
 
-    this.targetNum = this.targets.length;
-    evals = new EvalNode[targetNum];
+    outTuple = new VTuple(realTargets.length);
+    evals = new EvalNode[realTargets.length];
 
     if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
       EvalNode eval;
-      for (int i = 0; i < targetNum; i++) {
-        eval = this.targets[i].getEvalTree();
+      for (int i = 0; i < realTargets.length; i++) {
+        eval = realTargets[i].getEvalTree();
         evals[i] = context.getPrecompiledEval(inSchema, eval);
       }
     } else {
-      for (int i = 0; i < targetNum; i++) {
-        evals[i] = this.targets[i].getEvalTree();
+      for (int i = 0; i < realTargets.length; i++) {
+        evals[i] = realTargets[i].getEvalTree();
       }
     }
     init();
@@ -67,9 +69,10 @@ public class Projector {
     }
   }
 
-  public void eval(Tuple in, Tuple out) {
+  public Tuple eval(Tuple in) {
     for (int i = 0; i < evals.length; i++) {
-      out.put(i, evals[i].eval(in));
+      outTuple.put(i, evals[i].eval(in));
     }
+    return outTuple;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index 4b53b39..fdb8fdd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -29,7 +29,6 @@ import java.io.IOException;
 public abstract class AggregationExec extends UnaryPhysicalExec {
 
   protected final int groupingKeyNum;
-  protected final int groupingKeyIds[];
   protected final int aggFunctionsNum;
   protected final AggregationFunctionCallEval aggFunctions[];
 
@@ -39,16 +38,6 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
 
     final Column [] keyColumns = plan.getGroupingColumns();
     groupingKeyNum = keyColumns.length;
-    groupingKeyIds = new int[groupingKeyNum];
-    Column col;
-    for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) {
-      col = keyColumns[idx];
-      if (col.hasQualifier()) {
-        groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName());
-      } else {
-        groupingKeyIds[idx] = inSchema.getColumnIdByName(col.getSimpleName());
-      }
-    }
 
     if (plan.hasAggFunctions()) {
       aggFunctions = plan.getAggFunctions();

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 6e1a553..d28b7f6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -18,22 +18,18 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 
 public class BNLJoinExec extends CommonJoinExec {
 
-  private List<Tuple> leftTupleSlots;
-  private List<Tuple> rightTupleSlots;
+  private TupleList leftTupleSlots;
+  private TupleList rightTupleSlots;
   private Iterator<Tuple> leftIterator;
   private Iterator<Tuple> rightIterator;
 
@@ -41,9 +37,7 @@ public class BNLJoinExec extends CommonJoinExec {
   private boolean rightEnd;
 
   // temporal tuples and states for nested loop join
-  private FrameTuple frameTuple;
   private Tuple leftTuple = null;
-  private Tuple outputTuple = null;
   private Tuple rightNext = null;
 
   private final static int TUPLE_SLOT_SIZE = 10000;
@@ -51,8 +45,8 @@ public class BNLJoinExec extends CommonJoinExec {
   public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
                      final PhysicalExec leftExec, PhysicalExec rightExec) {
     super(context, plan, leftExec, rightExec);
-    this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
-    this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
+    this.leftTupleSlots = new TupleList(TUPLE_SLOT_SIZE);
+    this.rightTupleSlots = new TupleList(TUPLE_SLOT_SIZE);
     this.leftIterator = leftTupleSlots.iterator();
     this.rightIterator = rightTupleSlots.iterator();
     this.rightEnd = false;
@@ -62,10 +56,6 @@ public class BNLJoinExec extends CommonJoinExec {
     if (!plan.hasTargets()) {
       plan.setTargets(PlannerUtil.schemaToTargets(outSchema));
     }
-
-    // for join
-    frameTuple = new FrameTuple();
-    outputTuple = new VTuple(outSchema.size());
   }
 
   public Tuple next() throws IOException {
@@ -108,7 +98,7 @@ public class BNLJoinExec extends CommonJoinExec {
           if (rightEnd) {
             rightChild.rescan();
             rightEnd = false;
-            
+
             if (leftEnd) {
               return null;
             }
@@ -126,12 +116,12 @@ public class BNLJoinExec extends CommonJoinExec {
             }
             leftIterator = leftTupleSlots.iterator();
             leftTuple = leftIterator.next();
-            
+
           } else {
             leftIterator = leftTupleSlots.iterator();
             leftTuple = leftIterator.next();
           }
-          
+
           rightTupleSlots.clear();
           if (rightNext != null) {
             rightTupleSlots.add(rightNext);
@@ -153,7 +143,7 @@ public class BNLJoinExec extends CommonJoinExec {
               rightTupleSlots.add(t);
             }
           }
-          
+
           if ((rightNext = rightChild.next()) == null) {
             rightEnd = true;
           }
@@ -163,8 +153,7 @@ public class BNLJoinExec extends CommonJoinExec {
 
       frameTuple.set(leftTuple, rightIterator.next());
       if (!hasJoinQual || joinQual.eval(frameTuple).isTrue()) {
-        projector.eval(frameTuple, outputTuple);
-        return outputTuple;
+        return projector.eval(frameTuple);
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 54abca8..28622d7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -41,20 +41,20 @@ public class BSTIndexScanExec extends PhysicalExec {
   private BSTIndex.BSTIndexReader reader;
   
   private Projector projector;
-  
-  private Datum[] datum = null;
-  
+
   private boolean initialize = true;
 
   private float progress;
 
+  private Tuple indexLookupKey;
+
   public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode ,
        FileFragment fragment, Path fileName , Schema keySchema,
        TupleComparator comparator , Datum[] datum) throws IOException {
     super(context, scanNode.getInSchema(), scanNode.getOutSchema());
     this.scanNode = scanNode;
     this.qual = scanNode.getQual();
-    this.datum = datum;
+    indexLookupKey = new VTuple(datum);
 
     this.fileScanner = OldStorageManager.getSeekableScanner(context.getConf(),
         scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
@@ -80,8 +80,7 @@ public class BSTIndexScanExec extends PhysicalExec {
   public Tuple next() throws IOException {
     if(initialize) {
       //TODO : more complicated condition
-      Tuple key = new VTuple(datum);
-      long offset = reader.find(key);
+      long offset = reader.find(indexLookupKey);
       if (offset == -1) {
         reader.close();
         fileScanner.close();
@@ -104,19 +103,16 @@ public class BSTIndexScanExec extends PhysicalExec {
       }
     }
     Tuple tuple;
-    Tuple outTuple = new VTuple(this.outSchema.size());
     if (!scanNode.hasQual()) {
       if ((tuple = fileScanner.next()) != null) {
-        projector.eval(tuple, outTuple);
-        return outTuple;
+        return projector.eval(tuple);
       } else {
         return null;
       }
     } else {
        while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
          if (qual.eval(tuple).isTrue()) {
-           projector.eval(tuple, outTuple);
-           return outTuple;
+           return projector.eval(tuple);
          } else {
            long offset = reader.next();
            if (offset == -1) return null;
@@ -140,6 +136,7 @@ public class BSTIndexScanExec extends PhysicalExec {
     scanNode = null;
     qual = null;
     projector = null;
+    indexLookupKey = null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
index a018fe1..0d64e65 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonHashJoinExec.java
@@ -20,21 +20,18 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.planner.KeyProjector;
 import org.apache.tajo.engine.utils.CacheHolder;
 import org.apache.tajo.engine.utils.TableCacheKey;
 import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.ExecutionBlockSharedResource;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 /**
  * common exec for all hash join execs
@@ -47,19 +44,18 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
 
   // temporal tuples and states for nested loop join
   protected boolean first = true;
-  protected Map<Tuple, T> tupleSlots;
+  protected TupleMap<T> tupleSlots;
 
   protected Iterator<Tuple> iterator;
 
-  protected final Tuple keyTuple;
-
   protected final int rightNumCols;
   protected final int leftNumCols;
 
-  protected final int[] leftKeyList;
-  protected final int[] rightKeyList;
+  protected final Column[] leftKeyList;
+  protected final Column[] rightKeyList;
 
   protected boolean finished;
+  protected final KeyProjector leftKeyExtractor;
 
   public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
     super(context, plan, outer, inner);
@@ -68,21 +64,18 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(),
         inner.getSchema(), false);
 
-    leftKeyList = new int[joinKeyPairs.size()];
-    rightKeyList = new int[joinKeyPairs.size()];
-
-    for (int i = 0; i < joinKeyPairs.size(); i++) {
-      leftKeyList[i] = outer.getSchema().getColumnId(joinKeyPairs.get(i)[0].getQualifiedName());
-    }
+    leftKeyList = new Column[joinKeyPairs.size()];
+    rightKeyList = new Column[joinKeyPairs.size()];
 
     for (int i = 0; i < joinKeyPairs.size(); i++) {
-      rightKeyList[i] = inner.getSchema().getColumnId(joinKeyPairs.get(i)[1].getQualifiedName());
+      leftKeyList[i] = outer.getSchema().getColumn(joinKeyPairs.get(i)[0].getQualifiedName());
+      rightKeyList[i] = inner.getSchema().getColumn(joinKeyPairs.get(i)[1].getQualifiedName());
     }
 
     leftNumCols = outer.getSchema().size();
     rightNumCols = inner.getSchema().size();
 
-    keyTuple = new VTuple(leftKeyList.length);
+    leftKeyExtractor = new KeyProjector(leftSchema, leftKeyList);
   }
 
   protected void loadRightToHashTable() throws IOException {
@@ -102,12 +95,12 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
   protected void loadRightFromCache(TableCacheKey key) throws IOException {
     ExecutionBlockSharedResource sharedResource = context.getSharedResource();
 
-    CacheHolder<Map<Tuple, List<Tuple>>> holder;
+    CacheHolder<TupleMap<TupleList>> holder;
     synchronized (sharedResource.getLock()) {
       if (sharedResource.hasBroadcastCache(key)) {
         holder = sharedResource.getBroadcastCache(key);
       } else {
-        Map<Tuple, List<Tuple>> built = buildRightToHashTable();
+        TupleMap<TupleList> built = buildRightToHashTable();
         holder = new CacheHolder.BroadcastCacheHolder(built, rightChild.getInputStats(), null);
         sharedResource.addBroadcastCache(key, holder);
       }
@@ -115,50 +108,27 @@ public abstract class CommonHashJoinExec<T> extends CommonJoinExec {
     this.tupleSlots = convert(holder.getData(), true);
   }
 
-  protected Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException {
+  protected TupleMap<TupleList> buildRightToHashTable() throws IOException {
     Tuple tuple;
-    Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000);
+    TupleMap<TupleList> map = new TupleMap<TupleList>(100000);
+    KeyProjector keyProjector = new KeyProjector(rightSchema, rightKeyList);
 
     while (!context.isStopped() && (tuple = rightChild.next()) != null) {
-      Tuple keyTuple = new VTuple(joinKeyPairs.size());
-      for (int i = 0; i < rightKeyList.length; i++) {
-        keyTuple.put(i, tuple.asDatum(rightKeyList[i]));
-      }
-
-      /*
-       * TODO
-       * Currently, some physical executors can return new instances of tuple, but others not.
-       * This sometimes causes wrong results due to the singleton Tuple instance.
-       * The below line is a temporal solution to fix this problem.
-       * This will be improved at https://issues.apache.org/jira/browse/TAJO-1343.
-       */
-      try {
-        tuple = tuple.clone();
-      } catch (CloneNotSupportedException e) {
-        throw new IOException(e);
-      }
-
-      List<Tuple> newValue = map.get(keyTuple);
+      KeyTuple keyTuple = keyProjector.project(tuple);
+      TupleList newValue = map.get(keyTuple);
       if (newValue == null) {
-        map.put(keyTuple, newValue = new ArrayList<Tuple>());
+        map.put(keyTuple, newValue = new TupleList());
       }
       // if source is scan or groupby, it needs not to be cloned
-      newValue.add(new VTuple(tuple));
+      newValue.add(tuple);
     }
     return map;
   }
 
   // todo: convert loaded data to cache condition
-  protected abstract Map<Tuple, T> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache)
+  protected abstract TupleMap<T> convert(TupleMap<TupleList> hashed, boolean fromCache)
       throws IOException;
 
-  protected Tuple toKey(final Tuple outerTuple) {
-    for (int i = 0; i < leftKeyList.length; i++) {
-      keyTuple.put(i, outerTuple.asDatum(leftKeyList[i]));
-    }
-    return keyTuple;
-  }
-
   @Override
   public void rescan() throws IOException {
     super.rescan();

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
index ec29085..4f819ad 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/CommonJoinExec.java
@@ -33,7 +33,6 @@ import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.NullTuple;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -59,7 +58,6 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
   protected final Schema rightSchema;
 
   protected final FrameTuple frameTuple;
-  protected final Tuple outTuple;
 
   // projection
   protected Projector projector;
@@ -83,7 +81,6 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
 
     // for join
     this.frameTuple = new FrameTuple();
-    this.outTuple = new VTuple(outSchema.size());
   }
 
   /**
@@ -183,13 +180,13 @@ public abstract class CommonJoinExec extends BinaryPhysicalExec {
   }
 
   /**
-   * Return an tuple iterator, containing a single NullTuple
+   * Create a list that contains a single null tuple.
    *
-   * @param width the width of tuple
-   * @return an tuple iterator, containing a single NullTuple
+   * @param width the width of the null tuple to be created
+   * @return a list containing a single null tuple
    */
-  protected Iterator<Tuple> nullIterator(int width) {
-    return Arrays.asList(NullTuple.create(width)).iterator();
+  protected List<Tuple> nullTupleList(int width) {
+    return Arrays.asList(NullTuple.create(width));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
index 2d836f4..a298564 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
@@ -302,14 +302,6 @@ public class ComparableVector {
       return new ComparableTuple(keyTypes, keyIndex);
     }
 
-    public VTuple toVTuple() {
-      VTuple vtuple = new VTuple(keyIndex.length);
-      for (int i = 0; i < keyIndex.length; i++) {
-        vtuple.put(i, toDatum(i));
-      }
-      return vtuple;
-    }
-
     public Datum toDatum(int i) {
       if (keys[i] == null) {
         return NullDatum.get();

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
index 7784817..7ac3e0b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
@@ -23,13 +23,15 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Int2Datum;
-import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.KeyProjector;
 import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
 import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.plan.logical.DistinctGroupbyNode;
 import org.apache.tajo.plan.logical.GroupbyNode;
+import org.apache.tajo.storage.NullTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -94,9 +96,11 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
   private long totalNumRows;
   private int fetchedRows;
 
-  private int[] groupingKeyIndexes;
   private NonDistinctHashAggregator nonDistinctHashAggregator;
-  private DistinctHashAggregator[] distinctAggregators;
+  private Map<Integer, DistinctHashAggregator> nodeSeqToDistinctAggregators = TUtil.newHashMap();
+
+  private KeyProjector nonDistinctGroupingKeyProjector;
+  private Map<Integer, KeyProjector> distinctGroupbyKeyProjectors = TUtil.newHashMap();
 
   private int resultTupleLength;
 
@@ -111,37 +115,23 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
     super.init();
 
     // finding grouping column index
-    Column[] groupingColumns = plan.getGroupingColumns();
-    groupingKeyIndexes = new int[groupingColumns.length];
-
-    int index = 0;
-    for (Column col: groupingColumns) {
-      int keyIndex;
-      if (col.hasQualifier()) {
-        keyIndex = inSchema.getColumnId(col.getQualifiedName());
-      } else {
-        keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
-      }
-      groupingKeyIndexes[index++] = keyIndex;
-    }
-    resultTupleLength = groupingKeyIndexes.length + 1;  //1 is Sequence Datum which indicates sequence of DistinctNode.
+    Column[] groupingKeyColumns = plan.getGroupingColumns();
+    nonDistinctGroupingKeyProjector = new KeyProjector(inSchema, plan.getGroupingColumns());
+    resultTupleLength = groupingKeyColumns.length + 1;  // +1 for the sequence datum that indicates the sequence of the DistinctNode.
 
     List<GroupbyNode> groupbyNodes = plan.getSubPlans();
 
-    List<DistinctHashAggregator> distinctAggrList = new ArrayList<DistinctHashAggregator>();
     int distinctSeq = 0;
     for (GroupbyNode eachGroupby: groupbyNodes) {
       if (eachGroupby.isDistinct()) {
-        DistinctHashAggregator aggregator = new DistinctHashAggregator(eachGroupby);
-        aggregator.setNodeSequence(distinctSeq++);
-        distinctAggrList.add(aggregator);
+        DistinctHashAggregator aggregator = new DistinctHashAggregator(eachGroupby, distinctSeq);
+        nodeSeqToDistinctAggregators.put(distinctSeq++, aggregator);
         resultTupleLength += aggregator.getTupleLength();
       } else {
         nonDistinctHashAggregator = new NonDistinctHashAggregator(eachGroupby);
         resultTupleLength += nonDistinctHashAggregator.getTupleLength();
       }
     }
-    distinctAggregators = distinctAggrList.toArray(new DistinctHashAggregator[]{});
   }
 
   private int currentAggregatorIndex = 0;
@@ -154,13 +144,13 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
 
     int prevIndex = currentAggregatorIndex;
     while (!context.isStopped()) {
-      DistinctHashAggregator aggregator = distinctAggregators[currentAggregatorIndex];
+      DistinctHashAggregator aggregator = nodeSeqToDistinctAggregators.get(currentAggregatorIndex);
       Tuple result = aggregator.next();
       if (result != null) {
         return result;
       }
       currentAggregatorIndex++;
-      currentAggregatorIndex = currentAggregatorIndex % distinctAggregators.length;
+      currentAggregatorIndex = currentAggregatorIndex % nodeSeqToDistinctAggregators.size();
       if (currentAggregatorIndex == prevIndex) {
         finished = true;
         return null;
@@ -171,29 +161,36 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
   }
 
   private void prepareInputData() throws IOException {
-    Tuple tuple = null;
+    Tuple tuple;
+
     while(!context.isStopped() && (tuple = child.next()) != null) {
-      Tuple groupingKey = new VTuple(groupingKeyIndexes.length);
-      for (int i = 0; i < groupingKeyIndexes.length; i++) {
-        groupingKey.put(i, tuple.asDatum(groupingKeyIndexes[i]));
-      }
-      for (int i = 0; i < distinctAggregators.length; i++) {
-        distinctAggregators[i].compute(groupingKey, tuple);
+
+      KeyTuple groupingKey = nonDistinctGroupingKeyProjector.project(tuple);
+      for (int i = 0; i < nodeSeqToDistinctAggregators.size(); i++) {
+        nodeSeqToDistinctAggregators.get(i).compute(groupingKey, tuple);
       }
       if (nonDistinctHashAggregator != null) {
         nonDistinctHashAggregator.compute(groupingKey, tuple);
       }
     }
-    for (int i = 0; i < distinctAggregators.length; i++) {
-      distinctAggregators[i].rescan();
+    for (int i = 0; i < nodeSeqToDistinctAggregators.size(); i++) {
+      nodeSeqToDistinctAggregators.get(i).rescan();
     }
 
-    totalNumRows = distinctAggregators[0].distinctAggrDatas.size();
+    totalNumRows = nodeSeqToDistinctAggregators.get(0).distinctAggrDatas.size();
     preparedData = true;
   }
 
   @Override
   public void close() throws IOException {
+    if (nonDistinctHashAggregator != null) {
+      nonDistinctHashAggregator.close();
+      nonDistinctHashAggregator = null;
+    }
+    for (DistinctHashAggregator aggregator : nodeSeqToDistinctAggregators.values()) {
+      aggregator.close();
+    }
+    nodeSeqToDistinctAggregators.clear();
     child.close();
   }
 
@@ -223,8 +220,8 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
   public void rescan() {
     finished = false;
     currentAggregatorIndex = 0;
-    for (int i = 0; i < distinctAggregators.length; i++) {
-      distinctAggregators[i].rescan();
+    for (int i = 0; i < nodeSeqToDistinctAggregators.size(); i++) {
+      nodeSeqToDistinctAggregators.get(i).rescan();
     }
   }
 
@@ -233,13 +230,14 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
     private final AggregationFunctionCallEval aggFunctions[];
 
     // GroupingKey -> FunctionContext[]
-    private Map<Tuple, FunctionContext[]> nonDistinctAggrDatas;
+    private TupleMap<FunctionContext[]> nonDistinctAggrDatas;
     private int tupleLength;
 
-    private Tuple dummyTuple;
+    private final Tuple dummyTuple;
+    private final Tuple outTuple;
     private NonDistinctHashAggregator(GroupbyNode groupbyNode) throws IOException {
 
-      nonDistinctAggrDatas = new HashMap<Tuple, FunctionContext[]>();
+      nonDistinctAggrDatas = new TupleMap<FunctionContext[]>();
 
       if (groupbyNode.hasAggFunctions()) {
         aggFunctions = groupbyNode.getAggFunctions();
@@ -254,14 +252,12 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
         eachFunction.setFirstPhase();
       }
 
-      dummyTuple = new VTuple(aggFunctionsNum);
-      for (int i = 0; i < aggFunctionsNum; i++) {
-        dummyTuple.put(i, NullDatum.get());
-      }
+      outTuple = new VTuple(aggFunctionsNum);
+      dummyTuple = NullTuple.create(aggFunctionsNum);
       tupleLength = aggFunctionsNum;
     }
 
-    public void compute(Tuple groupingKeyTuple, Tuple tuple) {
+    public void compute(KeyTuple groupingKeyTuple, Tuple tuple) {
       FunctionContext[] contexts = nonDistinctAggrDatas.get(groupingKeyTuple);
       if (contexts != null) {
         for (int i = 0; i < aggFunctions.length; i++) {
@@ -282,13 +278,12 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
       if (contexts == null) {
         return null;
       }
-      Tuple tuple = new VTuple(aggFunctionsNum);
 
       for (int i = 0; i < aggFunctionsNum; i++) {
-        tuple.put(i, aggFunctions[i].terminate(contexts[i]));
+        outTuple.put(i, aggFunctions[i].terminate(contexts[i]));
       }
 
-      return tuple;
+      return outTuple;
     }
 
     public int getTupleLength() {
@@ -298,57 +293,51 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
     public Tuple getDummyTuple() {
       return dummyTuple;
     }
+
+    public void close() {
+      nonDistinctAggrDatas.clear();
+      nonDistinctAggrDatas = null;
+    }
   }
 
   class DistinctHashAggregator {
 
     // GroupingKey -> DistinctKey
-    private Map<Tuple, Set<Tuple>> distinctAggrDatas;
-    private Iterator<Entry<Tuple, Set<Tuple>>> iterator = null;
+    private TupleMap<TupleSet> distinctAggrDatas;
+    private Iterator<Entry<KeyTuple, TupleSet>> iterator = null;
 
     private int nodeSequence;
     private Int2Datum nodeSequenceDatum;
 
-    private int[] distinctKeyIndexes;
-
     private int tupleLength;
-    private Tuple dummyTuple;
+    private final Tuple dummyTuple;
+    private Tuple outTuple;
     private boolean aggregatorFinished = false;
 
-    public DistinctHashAggregator(GroupbyNode groupbyNode) throws IOException {
+    public DistinctHashAggregator(GroupbyNode groupbyNode, int nodeSequence) throws IOException {
 
-      Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();
-      for (Integer eachIndex: groupingKeyIndexes) {
-        groupingKeyIndexSet.add(eachIndex);
-      }
+      Set<Column> groupingKeySet = TUtil.newHashSet(plan.getGroupingColumns());
 
-      List<Integer> distinctGroupingKeyIndexSet = new ArrayList<Integer>();
+      List<Column> distinctGroupingKeyIndexSet = new ArrayList<Column>();
       Column[] groupingColumns = groupbyNode.getGroupingColumns();
       for (int idx = 0; idx < groupingColumns.length; idx++) {
         Column col = groupingColumns[idx];
-        int keyIndex;
-        if (col.hasQualifier()) {
-          keyIndex = inSchema.getColumnId(col.getQualifiedName());
-        } else {
-          keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
-        }
-        if (!groupingKeyIndexSet.contains(keyIndex)) {
-          distinctGroupingKeyIndexSet.add(keyIndex);
+        if (!groupingKeySet.contains(col)) {
+          distinctGroupingKeyIndexSet.add(col);
         }
       }
-      int index = 0;
-      this.distinctKeyIndexes = new int[distinctGroupingKeyIndexSet.size()];
-      this.dummyTuple = new VTuple(distinctGroupingKeyIndexSet.size());
-      for (Integer eachId : distinctGroupingKeyIndexSet) {
-        this.dummyTuple.put(index, NullDatum.get());
-        this.distinctKeyIndexes[index++] = eachId;
-      }
+      Column[] distinctKeyColumns = new Column[distinctGroupingKeyIndexSet.size()];
+      distinctKeyColumns = distinctGroupingKeyIndexSet.toArray(distinctKeyColumns);
+      this.dummyTuple = NullTuple.create(distinctGroupingKeyIndexSet.size());
+
+      this.distinctAggrDatas = new TupleMap<TupleSet>();
+      distinctGroupbyKeyProjectors.put(nodeSequence, new KeyProjector(inSchema, distinctKeyColumns));
+      this.tupleLength = distinctKeyColumns.length;
 
-      this.distinctAggrDatas = new HashMap<Tuple, Set<Tuple>>();
-      this.tupleLength = distinctKeyIndexes.length;
+      setNodeSequence(nodeSequence);
     }
 
-    public void setNodeSequence(int nodeSequence) {
+    private void setNodeSequence(int nodeSequence) {
       this.nodeSequence = nodeSequence;
       this.nodeSequenceDatum = new Int2Datum((short)nodeSequence);
     }
@@ -357,15 +346,12 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
       return tupleLength;
     }
 
-    public void compute(Tuple groupingKey, Tuple tuple) throws IOException {
-      Tuple distinctKeyTuple = new VTuple(distinctKeyIndexes.length);
-      for (int i = 0; i < distinctKeyIndexes.length; i++) {
-        distinctKeyTuple.put(i, tuple.asDatum(distinctKeyIndexes[i]));
-      }
+    public void compute(KeyTuple groupingKey, Tuple tuple) throws IOException {
+      KeyTuple distinctKeyTuple = distinctGroupbyKeyProjectors.get(nodeSequence).project(tuple);
 
-      Set<Tuple> distinctEntry = distinctAggrDatas.get(groupingKey);
+      TupleSet distinctEntry = distinctAggrDatas.get(groupingKey);
       if (distinctEntry == null) {
-        distinctEntry = new HashSet<Tuple>();
+        distinctEntry = new TupleSet();
         distinctAggrDatas.put(groupingKey, distinctEntry);
       }
       distinctEntry.add(distinctKeyTuple);
@@ -379,14 +365,17 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
     }
 
     public void close() throws IOException {
+      for (TupleSet set : distinctAggrDatas.values()) {
+        set.clear();
+      }
       distinctAggrDatas.clear();
       distinctAggrDatas = null;
       currentGroupingTuples = null;
       iterator = null;
     }
 
-    Entry<Tuple, Set<Tuple>> currentGroupingTuples;
-    Iterator<Tuple> distinctKeyIterator;
+    Entry<KeyTuple, TupleSet> currentGroupingTuples;
+    Iterator<KeyTuple> distinctKeyIterator;
     boolean groupingKeyChanged = false;
 
     public Tuple next() {
@@ -415,30 +404,32 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
       }
       // node sequence, groupingKeys, 1st distinctKeys, 2nd distinctKeys, ...
       // If the n-th sequence equals this.nodeSequence, it is set with real data; otherwise it is set with NullDatum.
-      VTuple tuple = new VTuple(resultTupleLength);
       int tupleIndex = 0;
-      tuple.put(tupleIndex++, nodeSequenceDatum);
+      if (outTuple == null) {
+        outTuple = new VTuple(resultTupleLength);
+      }
+      outTuple.put(tupleIndex++, nodeSequenceDatum);
 
       // merge grouping key
       Tuple groupingKeyTuple = currentGroupingTuples.getKey();
       int groupingKeyLength = groupingKeyTuple.size();
       for (int i = 0; i < groupingKeyLength; i++, tupleIndex++) {
-        tuple.put(tupleIndex, groupingKeyTuple.asDatum(i));
+        outTuple.put(tupleIndex, groupingKeyTuple.asDatum(i));
       }
 
       // merge distinctKey
-      for (int i = 0; i < distinctAggregators.length; i++) {
+      for (int i = 0; i < nodeSeqToDistinctAggregators.size(); i++) {
         if (i == nodeSequence) {
           Tuple distinctKeyTuple = distinctKeyIterator.next();
           int distinctKeyLength = distinctKeyTuple.size();
           for (int j = 0; j < distinctKeyLength; j++, tupleIndex++) {
-            tuple.put(tupleIndex, distinctKeyTuple.asDatum(j));
+            outTuple.put(tupleIndex, distinctKeyTuple.asDatum(j));
           }
         } else {
-          Tuple dummyTuple = distinctAggregators[i].getDummyTuple();
+          Tuple dummyTuple = nodeSeqToDistinctAggregators.get(i).getDummyTuple();
           int dummyTupleSize = dummyTuple.size();
           for (int j = 0; j < dummyTupleSize; j++, tupleIndex++) {
-            tuple.put(tupleIndex, dummyTuple.asDatum(j));
+            outTuple.put(tupleIndex, dummyTuple.asDatum(j));
           }
         }
       }
@@ -457,10 +448,10 @@ public class DistinctGroupbyFirstAggregationExec extends UnaryPhysicalExec {
         }
         int tupleSize = nonDistinctTuple.size();
         for (int j = 0; j < tupleSize; j++, tupleIndex++) {
-          tuple.put(tupleIndex, nonDistinctTuple.asDatum(j));
+          outTuple.put(tupleIndex, nonDistinctTuple.asDatum(j));
         }
       }
-      return tuple;
+      return outTuple;
     }
 
     public Tuple getDummyTuple() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
index c8a6588..2d1fa4b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
@@ -21,27 +21,34 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.planner.KeyProjector;
 import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.plan.logical.DistinctGroupbyNode;
 import org.apache.tajo.plan.logical.GroupbyNode;
+import org.apache.tajo.storage.NullTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
+
   private boolean finished = false;
 
-  private DistinctGroupbyNode plan;
+  private final DistinctGroupbyNode plan;
   private HashAggregator[] hashAggregators;
-  private int distinctGroupingKeyIds[];
+
+  private List<Column> distinctGroupingKeyColumnSet;
   private boolean first = true;
   private int groupbyNodeNum;
   private int outputColumnNum;
@@ -50,6 +57,10 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
 
   private int[] resultColumnIdIndexes;
 
+  private Tuple outTuple;
+
+  private KeyProjector outerKeyProjector;
+
   public DistinctGroupbyHashAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, PhysicalExec subOp)
       throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), subOp);
@@ -60,23 +71,16 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
   public void init() throws IOException {
     super.init();
 
-    List<Integer> distinctGroupingKeyIdList = new ArrayList<Integer>();
-    for (Column col: plan.getGroupingColumns()) {
-      int keyIndex;
-      if (col.hasQualifier()) {
-        keyIndex = inSchema.getColumnId(col.getQualifiedName());
-      } else {
-        keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
-      }
-      if (!distinctGroupingKeyIdList.contains(keyIndex)) {
-        distinctGroupingKeyIdList.add(keyIndex);
+    distinctGroupingKeyColumnSet = TUtil.newList();
+    for (Column col : plan.getGroupingColumns()) {
+      if (!distinctGroupingKeyColumnSet.contains(col)) {
+        distinctGroupingKeyColumnSet.add(col);
       }
     }
-    int idx = 0;
-    distinctGroupingKeyIds = new int[distinctGroupingKeyIdList.size()];
-    for (Integer intVal: distinctGroupingKeyIdList) {
-      distinctGroupingKeyIds[idx++] = intVal;
-    }
+    Column[] distinctGroupingKeyColumns = new Column[distinctGroupingKeyColumnSet.size()];
+    distinctGroupingKeyColumns = distinctGroupingKeyColumnSet.toArray(distinctGroupingKeyColumns);
+
+    outerKeyProjector = new KeyProjector(inSchema, distinctGroupingKeyColumns);
 
     List<GroupbyNode> groupbyNodes = plan.getSubPlans();
     groupbyNodeNum = groupbyNodes.size();
@@ -88,6 +92,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
     }
 
     outputColumnNum = plan.getOutSchema().size();
+    outTuple = new VTuple(outputColumnNum);
 
     int allGroupbyOutColNum = 0;
     for (GroupbyNode eachGroupby: plan.getSubPlans()) {
@@ -105,7 +110,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
     }
   }
 
-  List<Tuple> currentAggregatedTuples = null;
+  TupleList currentAggregatedTuples = null;
   int currentAggregatedTupleIndex = 0;
   int currentAggregatedTupleSize = 0;
 
@@ -143,20 +148,21 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
     // Groupby_Key2 | Distinct1_Column_V3 |                     |                          |
     //--------------------------------------------------------------------------------------
 
-    List<List<Tuple>> tupleSlots = new ArrayList<List<Tuple>>();
+    List<TupleList> tupleSlots = new ArrayList<TupleList>();
 
     // aggregation with single grouping key
     for (int i = 0; i < hashAggregators.length; i++) {
-      if (!hashAggregators[i].iterator.hasNext()) {
+      HashAggregator hashAggregator = hashAggregators[i];
+      if (!hashAggregator.iterator.hasNext()) {
         nullCount++;
-        tupleSlots.add(new ArrayList<Tuple>());
+        tupleSlots.add(new TupleList());
         continue;
       }
-      Entry<Tuple, Map<Tuple, FunctionContext[]>> entry = hashAggregators[i].iterator.next();
+      Entry<KeyTuple, TupleMap<FunctionContext[]>> entry = hashAggregator.iterator.next();
       if (distinctGroupingKey == null) {
         distinctGroupingKey = entry.getKey();
       }
-      List<Tuple> aggregatedTuples = hashAggregators[i].aggregate(entry.getValue());
+      TupleList aggregatedTuples = hashAggregator.aggregate(entry.getValue());
       tupleSlots.add(aggregatedTuples);
     }
 
@@ -167,11 +173,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
       // If DistinctGroupbyHashAggregationExec does not have any rows,
       // it should return NullDatum.
       if (totalNumRows == 0 && groupbyNodeNum == 0) {
-        Tuple tuple = new VTuple(outputColumnNum);
-        for (int i = 0; i < tuple.size(); i++) {
-          tuple.put(i, DatumFactory.createNullDatum());
-        }
-        return tuple;
+        return NullTuple.create(outputColumnNum);
       } else {
         return null;
       }
@@ -206,20 +208,23 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
     */
 
     // currentAggregatedTuples has tuples which has same group key.
-    currentAggregatedTuples = new ArrayList<Tuple>();
+    if (currentAggregatedTuples == null) {
+      currentAggregatedTuples = new TupleList();
+    } else {
+      currentAggregatedTuples.clear();
+    }
     int listIndex = 0;
     while (true) {
-      // Each item in tuples is VTuple. So the tuples variable is two dimensions(tuple[aggregator][datum]).
       Tuple[] tuples = new Tuple[hashAggregators.length];
+      // Each item in tuples is a VTuple, so the tuples variable is two-dimensional (tuple[aggregator][datum]).
       for (int i = 0; i < hashAggregators.length; i++) {
-        List<Tuple> aggregatedTuples = tupleSlots.get(i);
+        TupleList aggregatedTuples = tupleSlots.get(i);
         if (aggregatedTuples.size() > listIndex) {
           tuples[i] = tupleSlots.get(i).get(listIndex);
         }
       }
 
       //merge
-      Tuple mergedTuple = new VTuple(outputColumnNum);
       int resultColumnIdx = 0;
 
       boolean allNull = true;
@@ -236,12 +241,12 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
               // set group key tuple
               // Because each hashAggregator has different number of tuples,
               // sometimes getting group key from each hashAggregator will be null value.
-              mergedTuple.put(mergeTupleIndex, distinctGroupingKey.asDatum(mergeTupleIndex));
+              outTuple.put(mergeTupleIndex, distinctGroupingKey.asDatum(mergeTupleIndex));
             } else {
               if (tuples[i] != null) {
-                mergedTuple.put(mergeTupleIndex, tuples[i].asDatum(j));
+                outTuple.put(mergeTupleIndex, tuples[i].asDatum(j));
               } else {
-                mergedTuple.put(mergeTupleIndex, NullDatum.get());
+                outTuple.put(mergeTupleIndex, NullDatum.get());
               }
             }
           }
@@ -253,10 +258,15 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
         break;
       }
 
-      currentAggregatedTuples.add(mergedTuple);
+      currentAggregatedTuples.add(outTuple);
       listIndex++;
     }
 
+    for (TupleList eachList : tupleSlots) {
+      eachList.clear();
+    }
+    tupleSlots.clear();
+
     currentAggregatedTupleIndex = 0;
     currentAggregatedTupleSize = currentAggregatedTuples.size();
 
@@ -267,13 +277,11 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
     }
 
     fetchedRows++;
-    Tuple tuple = currentAggregatedTuples.get(currentAggregatedTupleIndex++);
-
-    return tuple;
+    return currentAggregatedTuples.get(currentAggregatedTupleIndex++);
   }
 
   private void loadChildHashTable() throws IOException {
-    Tuple tuple = null;
+    Tuple tuple;
     while(!context.isStopped() && (tuple = child.next()) != null) {
       for (int i = 0; i < hashAggregators.length; i++) {
         hashAggregators[i].compute(tuple);
@@ -296,6 +304,13 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
     if (child != null) {
       child.close();
     }
+    if (currentAggregatedTuples != null) {
+      currentAggregatedTuples.clear();
+      currentAggregatedTuples = null;
+    }
+    if (distinctGroupingKeyColumnSet != null) {
+      distinctGroupingKeyColumnSet.clear();
+    }
   }
 
   public void rescan() throws IOException {
@@ -303,6 +318,9 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
     for (int i = 0; i < hashAggregators.length; i++) {
       hashAggregators[i].initFetch();
     }
+    if (currentAggregatedTuples != null) {
+      currentAggregatedTuples.clear();
+    }
   }
 
   public float getProgress() {
@@ -327,44 +345,33 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
 
   class HashAggregator {
     // Outer's GroupBy Key -> Each GroupByNode's Key -> FunctionContext
-    private Map<Tuple, Map<Tuple, FunctionContext[]>> hashTable;
-    private Iterator<Entry<Tuple, Map<Tuple, FunctionContext[]>>> iterator = null;
+    private TupleMap<TupleMap<FunctionContext[]>> hashTable;
+    private Iterator<Entry<KeyTuple, TupleMap<FunctionContext[]>>> iterator = null;
+
+    private final KeyProjector innerKeyProjector;
 
-    private int groupingKeyIds[];
     private final int aggFunctionsNum;
     private final AggregationFunctionCallEval aggFunctions[];
 
-    int tupleSize;
+    private final Tuple aggregatedTuple;
+
+    private final int tupleSize;
 
     public HashAggregator(GroupbyNode groupbyNode, Schema schema) throws IOException {
 
-      hashTable = new HashMap<Tuple, Map<Tuple, FunctionContext[]>>(10000);
+      hashTable = new TupleMap<TupleMap<FunctionContext[]>>(10000);
 
-      List<Integer> distinctGroupingKeyIdSet = new ArrayList<Integer>();
-      for (int i = 0; i < distinctGroupingKeyIds.length; i++) {
-        distinctGroupingKeyIdSet.add(distinctGroupingKeyIds[i]);
-      }
+      List<Column> groupingKeyColumnList = new ArrayList<Column>(distinctGroupingKeyColumnSet);
 
-      List<Integer> groupingKeyIdList = new ArrayList<Integer>(distinctGroupingKeyIdSet);
       Column[] keyColumns = groupbyNode.getGroupingColumns();
       Column col;
       for (int idx = 0; idx < keyColumns.length; idx++) {
         col = keyColumns[idx];
-        int keyIndex;
-        if (col.hasQualifier()) {
-          keyIndex = inSchema.getColumnId(col.getQualifiedName());
-        } else {
-          keyIndex = inSchema.getColumnIdByName(col.getSimpleName());
+        if (!distinctGroupingKeyColumnSet.contains(col)) {
+          groupingKeyColumnList.add(col);
         }
-        if (!distinctGroupingKeyIdSet.contains(keyIndex)) {
-          groupingKeyIdList.add(keyIndex);
-        }
-      }
-      int index = 0;
-      groupingKeyIds = new int[groupingKeyIdList.size()];
-      for (Integer eachId : groupingKeyIdList) {
-        groupingKeyIds[index++] = eachId;
       }
+      Column[] groupingKeyColumns = groupingKeyColumnList.toArray(new Column[groupingKeyColumnList.size()]);
 
       if (groupbyNode.hasAggFunctions()) {
         aggFunctions = groupbyNode.getAggFunctions();
@@ -378,7 +385,9 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
         aggFunction.bind(context.getEvalContext(), schema);
       }
 
-      tupleSize = groupingKeyIds.length + aggFunctionsNum;
+      tupleSize = groupingKeyColumns.length + aggFunctionsNum;
+      aggregatedTuple = new VTuple(groupingKeyColumns.length + aggFunctionsNum);
+      innerKeyProjector = new KeyProjector(inSchema, groupingKeyColumns);
     }
 
     public int getTupleSize() {
@@ -386,22 +395,16 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
     }
 
     public void compute(Tuple tuple) throws IOException {
-      Tuple outerKeyTuple = new VTuple(distinctGroupingKeyIds.length);
-      for (int i = 0; i < distinctGroupingKeyIds.length; i++) {
-        outerKeyTuple.put(i, tuple.asDatum(distinctGroupingKeyIds[i]));
-      }
+      KeyTuple outerKeyTuple = outerKeyProjector.project(tuple);
+      TupleMap<FunctionContext[]> distinctEntry = hashTable.get(outerKeyTuple);
 
-      Tuple keyTuple = new VTuple(groupingKeyIds.length);
-      for (int i = 0; i < groupingKeyIds.length; i++) {
-        keyTuple.put(i, tuple.asDatum(groupingKeyIds[i]));
-      }
-
-      Map<Tuple, FunctionContext[]> distinctEntry = hashTable.get(outerKeyTuple);
       if (distinctEntry == null) {
-        distinctEntry = new HashMap<Tuple, FunctionContext[]>();
+        distinctEntry = new TupleMap<FunctionContext[]>();
         hashTable.put(outerKeyTuple, distinctEntry);
       }
-      FunctionContext[] contexts = distinctEntry.get(keyTuple);
+
+      KeyTuple innerKeyTuple = innerKeyProjector.project(tuple);
+      FunctionContext[] contexts = distinctEntry.get(innerKeyTuple);
       if (contexts != null) {
         for (int i = 0; i < aggFunctions.length; i++) {
           aggFunctions[i].merge(contexts[i], tuple);
@@ -412,7 +415,7 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
           contexts[i] = aggFunctions[i].newContext();
           aggFunctions[i].merge(contexts[i], tuple);
         }
-        distinctEntry.put(keyTuple, contexts);
+        distinctEntry.put(innerKeyTuple, contexts);
       }
     }
 
@@ -420,27 +423,29 @@ public class DistinctGroupbyHashAggregationExec extends UnaryPhysicalExec {
       iterator = hashTable.entrySet().iterator();
     }
 
-    public List<Tuple> aggregate(Map<Tuple, FunctionContext[]> groupTuples) {
-      List<Tuple> aggregatedTuples = new ArrayList<Tuple>();
+    public TupleList aggregate(Map<KeyTuple, FunctionContext[]> groupTuples) {
+      TupleList aggregatedTuples = new TupleList();
 
-      for (Entry<Tuple, FunctionContext[]> entry : groupTuples.entrySet()) {
-        Tuple tuple = new VTuple(groupingKeyIds.length + aggFunctionsNum);
+      for (Entry<KeyTuple, FunctionContext[]> entry : groupTuples.entrySet()) {
         Tuple groupbyKey = entry.getKey();
         int index = 0;
         for (; index < groupbyKey.size(); index++) {
-          tuple.put(index, groupbyKey.asDatum(index));
+          aggregatedTuple.put(index, groupbyKey.asDatum(index));
         }
 
         FunctionContext[] contexts = entry.getValue();
         for (int i = 0; i < aggFunctionsNum; i++, index++) {
-          tuple.put(index, aggFunctions[i].terminate(contexts[i]));
+          aggregatedTuple.put(index, aggFunctions[i].terminate(contexts[i]));
         }
-        aggregatedTuples.add(tuple);
+        aggregatedTuples.add(aggregatedTuple);
       }
       return aggregatedTuples;
     }
 
     public void close() throws IOException {
+      for (TupleMap<FunctionContext[]> map : hashTable.values()) {
+        map.clear();
+      }
       hashTable.clear();
       hashTable = null;
       iterator = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
index 5a262a6..b3edab6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
@@ -1,4 +1,4 @@
-  /**
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,22 +18,20 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-  import org.apache.commons.logging.Log;
-  import org.apache.commons.logging.LogFactory;
-  import org.apache.tajo.catalog.Column;
-  import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
-  import org.apache.tajo.plan.function.FunctionContext;
-  import org.apache.tajo.plan.logical.DistinctGroupbyNode;
-  import org.apache.tajo.plan.logical.GroupbyNode;
-  import org.apache.tajo.storage.Tuple;
-  import org.apache.tajo.storage.VTuple;
-  import org.apache.tajo.worker.TaskAttemptContext;
-
-  import java.io.IOException;
-  import java.util.ArrayList;
-  import java.util.HashSet;
-  import java.util.List;
-  import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.plan.logical.DistinctGroupbyNode;
+import org.apache.tajo.plan.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
 
 /**
  * This class adjusts shuffle columns between DistinctGroupbyFirstAggregationExec and
@@ -86,10 +84,21 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
   private AggregationFunctionCallEval[] nonDistinctAggrFunctions;
   private int nonDistinctAggrTupleStartIndex = -1;
 
+  // Key tuples may have various lengths. The below two maps are used to cache key tuple instances.
+  // Each map is a mapping of key length to key tuple.
+  private Map<Integer, Tuple> keyTupleMap = new HashMap<Integer, Tuple>();
+  private Map<Integer, Tuple> prevKeyTupleMap = new HashMap<Integer, Tuple>();
+
+  private Tuple prevKeyTuple = null;
+  private Tuple prevTuple = null;
+  private final Tuple outTuple;
+  private int prevSeq = -1;
+
   public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
       throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
     this.plan = plan;
+    outTuple = new VTuple(outSchema.size());
   }
 
   @Override
@@ -158,20 +167,15 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
     }
   }
 
-  Tuple prevKeyTuple = null;
-  Tuple prevTuple = null;
-  int prevSeq = -1;
-
   @Override
   public Tuple next() throws IOException {
     if (finished) {
       return null;
     }
 
-    Tuple result = null;
     while (!context.isStopped()) {
-      Tuple childTuple = child.next();
-      if (childTuple == null) {
+      Tuple tuple = child.next();
+      if (tuple == null) {
         finished = true;
 
         if (prevTuple == null) {
@@ -181,15 +185,8 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
         if (prevSeq == 0 && nonDistinctAggrFunctions != null) {
           terminatedNonDistinctAggr(prevTuple);
         }
-        result = prevTuple;
-        break;
-      }
-
-      Tuple tuple = null;
-      try {
-        tuple = childTuple.clone();
-      } catch (CloneNotSupportedException e) {
-        throw new IOException(e.getMessage(), e);
+        outTuple.put(prevTuple.getValues());
+        return outTuple;
       }
 
       int distinctSeq = tuple.getInt2(0);
@@ -201,8 +198,8 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
           initNonDistinctAggrContext();
           mergeNonDistinctAggr(tuple);
         }
-        prevKeyTuple = keyTuple;
-        prevTuple = tuple;
+        prevKeyTuple = getKeyTuple(prevKeyTupleMap, keyTuple.getValues());
+        prevTuple = new VTuple(tuple.getValues());
         prevSeq = distinctSeq;
         continue;
       }
@@ -212,20 +209,20 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
         if (prevSeq == 0 && nonDistinctAggrFunctions != null) {
           terminatedNonDistinctAggr(prevTuple);
         }
-        result = prevTuple;
+        outTuple.put(prevTuple.getValues());
 
-        prevKeyTuple = keyTuple;
-        prevTuple = tuple;
+        prevKeyTuple = getKeyTuple(prevKeyTupleMap, keyTuple.getValues());
+        prevTuple.put(tuple.getValues());
         prevSeq = distinctSeq;
 
         if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
           initNonDistinctAggrContext();
           mergeNonDistinctAggr(tuple);
         }
-        break;
+        return outTuple;
       } else {
-        prevKeyTuple = keyTuple;
-        prevTuple = tuple;
+        prevKeyTuple = getKeyTuple(prevKeyTupleMap, keyTuple.getValues());
+        prevTuple.put(tuple.getValues());
         prevSeq = distinctSeq;
         if (distinctSeq == 0 && nonDistinctAggrFunctions != null) {
           mergeNonDistinctAggr(tuple);
@@ -233,7 +230,7 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
       }
     }
 
-    return result;
+    return null;
   }
 
   private void initNonDistinctAggrContext() {
@@ -265,8 +262,8 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
 
   private Tuple getKeyTuple(int distinctSeq, Tuple tuple) {
     int[] columnIndexes = distinctKeyIndexes[distinctSeq];
-
-    Tuple keyTuple = new VTuple(numGroupingColumns + columnIndexes.length + 1);
+    int keyLength = numGroupingColumns + columnIndexes.length + 1;
+    Tuple keyTuple = getKeyTuple(keyTupleMap, keyLength);
     keyTuple.put(0, tuple.asDatum(0));
     for (int i = 0; i < numGroupingColumns; i++) {
       keyTuple.put(i + 1, tuple.asDatum(i + 1));
@@ -278,16 +275,39 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
     return keyTuple;
   }
 
+  private static Tuple getKeyTuple(Map<Integer, Tuple> keyTupleMap, Datum[] values) {
+    Tuple keyTuple = getKeyTuple(keyTupleMap, values.length);
+    keyTuple.put(values);
+    return keyTuple;
+  }
+
+  private static Tuple getKeyTuple(Map<Integer, Tuple> keyTupleMap, int keyLength) {
+    Tuple keyTuple;
+    if (keyTupleMap.containsKey(keyLength)) {
+      keyTuple = keyTupleMap.get(keyLength);
+    } else {
+      keyTuple = new VTuple(keyLength);
+      keyTupleMap.put(keyLength, keyTuple);
+    }
+    return keyTuple;
+  }
+
   @Override
   public void rescan() throws IOException {
     super.rescan();
     prevKeyTuple = null;
     prevTuple = null;
     finished = false;
+    keyTupleMap.clear();
+    prevKeyTupleMap.clear();
   }
 
   @Override
   public void close() throws IOException {
     super.close();
+    keyTupleMap.clear();
+    prevKeyTupleMap.clear();
+    prevKeyTuple = null;
+    prevTuple = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
index c91dcca..58cfca4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
@@ -22,7 +22,6 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
 import org.apache.tajo.plan.logical.DistinctGroupbyNode;
 import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
@@ -42,6 +41,8 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec {
 
   private int[] resultColumnIdIndexes;
 
+  private final Tuple outTuple;
+
   public DistinctGroupbySortAggregationExec(final TaskAttemptContext context, DistinctGroupbyNode plan,
                                             SortAggregateExec[] aggregateExecs) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
@@ -50,6 +51,7 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec {
 
     currentTuples = new Tuple[groupbyNodeNum];
     outColumnNum = outSchema.size();
+    outTuple = new VTuple(outColumnNum);
 
     int allGroupbyOutColNum = 0;
     for (GroupbyNode eachGroupby: plan.getSubPlans()) {
@@ -110,23 +112,20 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec {
       return null;
     }
 
-    Tuple mergedTuple = new VTuple(outColumnNum);
-
     int mergeTupleIndex = 0;
     for (int i = 0; i < currentTuples.length; i++) {
       int tupleSize = currentTuples[i].size();
       for (int j = 0; j < tupleSize; j++) {
         if (resultColumnIdIndexes[mergeTupleIndex] >= 0) {
-          mergedTuple.put(resultColumnIdIndexes[mergeTupleIndex], currentTuples[i].asDatum(j));
+          outTuple.put(resultColumnIdIndexes[mergeTupleIndex], currentTuples[i].asDatum(j));
         }
         mergeTupleIndex++;
       }
     }
-    return mergedTuple;
+    return outTuple;
   }
 
   private Tuple getEmptyTuple() {
-    Tuple tuple = new VTuple(outSchema.size());
     NullDatum nullDatum = DatumFactory.createNullDatum();
 
     int tupleIndex = 0;
@@ -134,23 +133,23 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec {
       for (int i = 0; i < aggExec.aggFunctionsNum; i++, tupleIndex++) {
         String funcName = aggExec.aggFunctions[i].getName();
         if ("min".equals(funcName) || "max".equals(funcName) || "avg".equals(funcName) || "sum".equals(funcName)) {
-          tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createNullDatum());
+          outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createNullDatum());
         }
         else
         {
           TajoDataTypes.Type type = outSchema.getColumn(resultColumnIdIndexes[tupleIndex]).getDataType().getType();
           if (type == TajoDataTypes.Type.INT8) {
-            tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt8(nullDatum.asInt8()));
+            outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt8(nullDatum.asInt8()));
           } else if (type == TajoDataTypes.Type.INT4) {
-            tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt4(nullDatum.asInt4()));
+            outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt4(nullDatum.asInt4()));
           } else if (type == TajoDataTypes.Type.INT2) {
-            tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt2(nullDatum.asInt2()));
+            outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createInt2(nullDatum.asInt2()));
           } else if (type == TajoDataTypes.Type.FLOAT4) {
-            tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createFloat4(nullDatum.asFloat4()));
+            outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createFloat4(nullDatum.asFloat4()));
           } else if (type == TajoDataTypes.Type.FLOAT8) {
-            tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createFloat8(nullDatum.asFloat8()));
+            outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createFloat8(nullDatum.asFloat8()));
           } else {
-            tuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createNullDatum());
+            outTuple.put(resultColumnIdIndexes[tupleIndex], DatumFactory.createNullDatum());
           }
         }
       }
@@ -159,7 +158,7 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec {
     finished = true;
     first = false;
 
-    return tuple;
+    return outTuple;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
index 5791230..9e9e9b4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -51,6 +51,11 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
 
   private int[] resultTupleIndexes;
 
+  private Tuple outTuple;
+  private Tuple keyTuple;
+  private Tuple prevKeyTuple = null;
+  private Tuple prevTuple = null;
+
   public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
       throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
@@ -63,6 +68,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
 
     numGroupingColumns = plan.getGroupingColumns().length;
     resultTupleLength = numGroupingColumns;
+    keyTuple = new VTuple(numGroupingColumns);
 
     List<GroupbyNode> groupbyNodes = plan.getSubPlans();
 
@@ -86,6 +92,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
       resultTupleLength += eachGroupby.getAggFunctions().length;
     }
     aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{});
+    outTuple = new VTuple(resultTupleLength);
 
     // make output schema mapping index
     resultTupleIndexes = new int[outSchema.size()];
@@ -128,21 +135,16 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
     }
   }
 
-  Tuple prevKeyTuple = null;
-  Tuple prevTuple = null;
-
   @Override
   public Tuple next() throws IOException {
     if (finished) {
       return null;
     }
 
-    Tuple resultTuple = new VTuple(resultTupleLength);
-
     while (!context.isStopped()) {
-      Tuple childTuple = child.next();
+      Tuple tuple = child.next();
       // Last tuple
-      if (childTuple == null) {
+      if (tuple == null) {
         finished = true;
 
         if (prevTuple == null) {
@@ -156,19 +158,13 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
         }
 
         for (int i = 0; i < numGroupingColumns; i++) {
-          resultTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1));
+          outTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1));
         }
         for (DistinctFinalAggregator eachAggr: aggregators) {
-          eachAggr.terminate(resultTuple);
+          eachAggr.terminate(outTuple);
         }
-        break;
-      }
 
-      Tuple tuple = null;
-      try {
-        tuple = childTuple.clone();
-      } catch (CloneNotSupportedException e) {
-        throw new IOException(e.getMessage(), e);
+        return outTuple;
       }
 
       int distinctSeq = tuple.getInt2(0);
@@ -176,8 +172,8 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
 
       // First tuple
       if (prevKeyTuple == null) {
-        prevKeyTuple = keyTuple;
-        prevTuple = tuple;
+        prevKeyTuple = new VTuple(keyTuple.getValues());
+        prevTuple = new VTuple(tuple.getValues());
 
         aggregators[distinctSeq].merge(tuple);
         continue;
@@ -186,38 +182,36 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
       if (!prevKeyTuple.equals(keyTuple)) {
         // new grouping key
         for (int i = 0; i < numGroupingColumns; i++) {
-          resultTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1));
+          outTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1));
         }
         for (DistinctFinalAggregator eachAggr: aggregators) {
-          eachAggr.terminate(resultTuple);
+          eachAggr.terminate(outTuple);
         }
 
-        prevKeyTuple = keyTuple;
-        prevTuple = tuple;
+        prevKeyTuple.put(keyTuple.getValues());
+        prevTuple.put(tuple.getValues());
 
         aggregators[distinctSeq].merge(tuple);
-        break;
+        return outTuple;
       } else {
-        prevKeyTuple = keyTuple;
-        prevTuple = tuple;
+        prevKeyTuple.put(keyTuple.getValues());
+        prevTuple.put(tuple.getValues());
         aggregators[distinctSeq].merge(tuple);
       }
     }
 
-    return resultTuple;
+    return null;
   }
 
   private Tuple makeEmptyTuple() {
-    Tuple resultTuple = new VTuple(resultTupleLength);
     for (DistinctFinalAggregator eachAggr: aggregators) {
-      eachAggr.terminateEmpty(resultTuple);
+      eachAggr.terminateEmpty(outTuple);
     }
 
-    return resultTuple;
+    return outTuple;
   }
 
   private Tuple getGroupingKeyTuple(Tuple tuple) {
-    Tuple keyTuple = new VTuple(numGroupingColumns);
     for (int i = 0; i < numGroupingColumns; i++) {
       keyTuple.put(i, tuple.asDatum(i + 1));
     }


Mime
View raw message