tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [43/52] [abbrv] tajo git commit: TAJO-1343: Improve the memory usage of physical executors. (jihoon)
Date Wed, 22 Jul 2015 13:01:16 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 13b5a3a..adbafd9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -31,13 +30,11 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.PhysicalPlanningException;
 import org.apache.tajo.plan.logical.SortNode;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.Scanner;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.unit.StorageUnit;
@@ -47,7 +44,9 @@ import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.*;
 
 import static org.apache.tajo.storage.RawFile.RawFileAppender;
@@ -81,7 +80,7 @@ public class ExternalSortExec extends SortExec {
   /** If there are available multiple cores, it tries parallel merge. */
   private ExecutorService executorService;
   /** used for in-memory sort of each chunk. */
-  private List<Tuple> inMemoryTable;
+  private TupleList inMemoryTable;
   /** temporal dir */
   private final Path sortTmpDir;
   /** It enables round-robin disks allocation */
@@ -120,7 +119,7 @@ public class ExternalSortExec extends SortExec {
     this.sortBufferBytesNum = context.getQueryContext().getLong(SessionVars.EXTSORT_BUFFER_SIZE) * StorageUnit.MB;
     this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM);
     this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum);
-    this.inMemoryTable = new ArrayList<Tuple>(100000);
+    this.inMemoryTable = new TupleList(100000);
 
     this.sortTmpDir = getExecutorTmpDir();
     localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
@@ -161,7 +160,7 @@ public class ExternalSortExec extends SortExec {
   /**
    * Sort a tuple block and store them into a chunk file
    */
-  private Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
+  private Path sortAndStoreChunk(int chunkId, TupleList tupleBlock)
       throws IOException {
     TableMeta meta = CatalogUtil.newTableMeta("RAW");
     int rowNum = tupleBlock.size();
@@ -203,9 +202,8 @@ public class ExternalSortExec extends SortExec {
     int chunkId = 0;
     long runStartTime = System.currentTimeMillis();
     while (!context.isStopped() && (tuple = child.next()) != null) { // partition sort start
-      Tuple vtuple = new VTuple(tuple);
-      inMemoryTable.add(vtuple);
-      memoryConsumption += MemoryUtil.calculateMemorySize(vtuple);
+      inMemoryTable.add(tuple);
+      memoryConsumption += MemoryUtil.calculateMemorySize(tuple);
 
       if (memoryConsumption > sortBufferBytesNum) {
         long runEndTime = System.currentTimeMillis();
@@ -645,6 +643,8 @@ public class ExternalSortExec extends SortExec {
     private Tuple leftTuple;
     private Tuple rightTuple;
 
+    private final Tuple outTuple;
+
     private float mergerProgress;
     private TableStats mergerInputStats;
 
@@ -656,6 +656,7 @@ public class ExternalSortExec extends SortExec {
       this.leftScan = leftScanner;
       this.rightScan = rightScanner;
       this.comparator = comparator;
+      this.outTuple = new VTuple(schema.size());
     }
 
     private void setState(State state) {
@@ -685,25 +686,26 @@ public class ExternalSortExec extends SortExec {
     }
 
     protected Tuple prepare(int index, Tuple tuple) {
-      return tuple == null ? null : new VTuple(tuple);
+      return tuple;
     }
 
     protected int compare() {
       return comparator.compare(leftTuple, rightTuple);
     }
 
+    @Override
     public Tuple next() throws IOException {
       if (leftTuple == null && rightTuple == null) {
         return null;
       }
       if (rightTuple == null || (leftTuple != null && compare() < 0)) {
-        Tuple tuple = leftTuple;
+        outTuple.put(leftTuple.getValues());
         leftTuple = prepare(0, leftScan.next());
-        return tuple;
+        return outTuple;
       }
-      Tuple tuple = rightTuple;
+      outTuple.put(rightTuple.getValues());
       rightTuple = prepare(1, rightScan.next());
-      return tuple;
+      return outTuple;
     }
 
     @Override
@@ -726,6 +728,8 @@ public class ExternalSortExec extends SortExec {
       IOUtils.cleanup(LOG, leftScan, rightScan);
       getInputStats();
       mergerProgress = 1.0f;
+      leftTuple = null;
+      rightTuple = null;
       setState(State.CLOSED);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index e6d1a96..b657622 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.engine.planner.KeyProjector;
 import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
@@ -25,9 +26,7 @@ import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Map.Entry;
 
 /**
@@ -35,25 +34,23 @@ import java.util.Map.Entry;
  */
 public class HashAggregateExec extends AggregationExec {
   private Tuple tuple = null;
-  private Map<Tuple, FunctionContext[]> hashTable;
+  private TupleMap<FunctionContext[]> hashTable;
+  private KeyProjector hashKeyProjector;
   private boolean computed = false;
-  private Iterator<Entry<Tuple, FunctionContext []>> iterator = null;
+  private Iterator<Entry<KeyTuple, FunctionContext []>> iterator = null;
 
   public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec subOp) throws IOException {
     super(ctx, plan, subOp);
-    hashTable = new HashMap<Tuple, FunctionContext []>(100000);
+    hashKeyProjector = new KeyProjector(inSchema, plan.getGroupingColumns());
+    hashTable = new TupleMap<FunctionContext []>(10000);
     this.tuple = new VTuple(plan.getOutSchema().size());
   }
 
   private void compute() throws IOException {
     Tuple tuple;
-    Tuple keyTuple;
+    KeyTuple keyTuple;
     while(!context.isStopped() && (tuple = child.next()) != null) {
-      keyTuple = new VTuple(groupingKeyIds.length);
-      // build one key tuple
-      for(int i = 0; i < groupingKeyIds.length; i++) {
-        keyTuple.put(i, tuple.asDatum(groupingKeyIds[i]));
-      }
+      keyTuple = hashKeyProjector.project(tuple);
 
       FunctionContext [] contexts = hashTable.get(keyTuple);
       if(contexts != null) {
@@ -92,7 +89,7 @@ public class HashAggregateExec extends AggregationExec {
     FunctionContext [] contexts;
 
     if (iterator.hasNext()) {
-      Entry<Tuple, FunctionContext []> entry = iterator.next();
+      Entry<KeyTuple, FunctionContext []> entry = iterator.next();
       Tuple keyTuple = entry.getKey();
       contexts =  entry.getValue();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 1645263..c0a8622 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -25,22 +25,26 @@ import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
-public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List<Tuple>>> {
+public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, TupleList>> {
 
   private boolean finalLoop; // final loop for right unmatched
+  private final List<Tuple> nullTupleList;
 
   public HashFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
                                PhysicalExec inner) {
     super(context, plan, outer, inner);
+    nullTupleList = nullTupleList(rightNumCols);
   }
 
   public Iterator<Tuple> getUnmatchedRight() {
 
     return new Iterator<Tuple>() {
 
-      private Iterator<Pair<Boolean, List<Tuple>>> iterator1 = tupleSlots.values().iterator();
+      private Iterator<Pair<Boolean, TupleList>> iterator1 = tupleSlots.values().iterator();
       private Iterator<Tuple> iterator2;
 
       @Override
@@ -49,7 +53,7 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List
           return true;
         }
         for (iterator2 = null; !hasMore() && iterator1.hasNext();) {
-          Pair<Boolean, List<Tuple>> next = iterator1.next();
+          Pair<Boolean, TupleList> next = iterator1.next();
           if (!next.getFirst()) {
             iterator2 = next.getSecond().iterator();
           }
@@ -81,8 +85,7 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List
     while (!context.isStopped() && !finished) {
       if (iterator != null && iterator.hasNext()) {
         frameTuple.setRight(iterator.next());
-        projector.eval(frameTuple, outTuple);
-        return outTuple;
+        return projector.eval(frameTuple);
       }
       if (finalLoop) {
         finished = true;
@@ -100,18 +103,18 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List
       frameTuple.setLeft(leftTuple);
 
       if (leftFiltered(leftTuple)) {
-        iterator = nullIterator(rightNumCols);
+        iterator = nullTupleList.iterator();
         continue;
       }
       // getting corresponding right
-      Pair<Boolean, List<Tuple>> hashed = tupleSlots.get(toKey(leftTuple));
+      Pair<Boolean, TupleList> hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple));
       if (hashed == null) {
-        iterator = nullIterator(rightNumCols);
+        iterator = nullTupleList.iterator();
         continue;
       }
       Iterator<Tuple> rightTuples = rightFiltered(hashed.getSecond());
       if (!rightTuples.hasNext()) {
-        iterator = nullIterator(rightNumCols);
+        iterator = nullTupleList.iterator();
         continue;
       }
       iterator = rightTuples;
@@ -122,12 +125,12 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List
   }
 
   @Override
-  protected Map<Tuple, Pair<Boolean, List<Tuple>>> convert(Map<Tuple, List<Tuple>> hashed,
-                                                           boolean fromCache) throws IOException {
-    Map<Tuple, Pair<Boolean, List<Tuple>>> tuples = new HashMap<Tuple, Pair<Boolean, List<Tuple>>>(hashed.size());
-    for (Map.Entry<Tuple, List<Tuple>> entry : hashed.entrySet()) {
+  protected TupleMap<Pair<Boolean, TupleList>> convert(TupleMap<TupleList> hashed,
+                                                       boolean fromCache) throws IOException {
+    TupleMap<Pair<Boolean, TupleList>> tuples = new TupleMap<Pair<Boolean, TupleList>>(hashed.size());
+    for (Map.Entry<KeyTuple, TupleList> entry : hashed.entrySet()) {
       // flag: initially false (whether this join key had at least one match on the counter part)
-      tuples.put(entry.getKey(), new Pair<Boolean, List<Tuple>>(false, entry.getValue()));
+      tuples.putWihtoutKeyCopy(entry.getKey(), new Pair<Boolean, TupleList>(false, entry.getValue()));
     }
     return tuples;
   }
@@ -135,7 +138,7 @@ public class HashFullOuterJoinExec extends CommonHashJoinExec<Pair<Boolean, List
   @Override
   public void rescan() throws IOException {
     super.rescan();
-    for (Pair<Boolean, List<Tuple>> value : tupleSlots.values()) {
+    for (Pair<Boolean, TupleList> value : tupleSlots.values()) {
       value.setFirst(false);
     }
     finalLoop = false;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 3065c15..bd817bb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -23,12 +23,9 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 
-public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> {
+public class HashJoinExec extends CommonHashJoinExec<TupleList> {
 
   public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
       PhysicalExec rightExec) {
@@ -36,9 +33,9 @@ public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> {
   }
 
   @Override
-  protected Map<Tuple, List<Tuple>> convert(Map<Tuple, List<Tuple>> hashed, boolean fromCache)
+  protected TupleMap<TupleList> convert(TupleMap<TupleList> hashed, boolean fromCache)
       throws IOException {
-    return fromCache ? new HashMap<Tuple, List<Tuple>>(hashed) : hashed;
+    return fromCache ? new TupleMap<TupleList>(hashed) : hashed;
   }
 
   @Override
@@ -50,8 +47,7 @@ public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> {
     while (!context.isStopped() && !finished) {
       if (iterator != null && iterator.hasNext()) {
         frameTuple.setRight(iterator.next());
-        projector.eval(frameTuple, outTuple);
-        return outTuple;
+        return projector.eval(frameTuple);
       }
 
       Tuple leftTuple = leftChild.next(); // it comes from a disk
@@ -63,7 +59,7 @@ public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> {
       frameTuple.setLeft(leftTuple);
 
       // getting corresponding right
-      Iterable<Tuple> hashed = getRights(toKey(leftTuple));
+      Iterable<Tuple> hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple));
       Iterator<Tuple> rightTuples = rightFiltered(hashed);
       if (rightTuples.hasNext()) {
         iterator = rightTuples;
@@ -72,9 +68,4 @@ public class HashJoinExec extends CommonHashJoinExec<List<Tuple>> {
 
     return null;
   }
-
-  private Iterable<Tuple> getRights(Tuple key) {
-    return tupleSlots.get(key);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 8239270..746bdb9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -18,9 +18,9 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.List;
@@ -32,6 +32,8 @@ import java.util.List;
  */
 public class HashLeftAntiJoinExec extends HashJoinExec {
 
+  private final List<Tuple> nullTupleList = nullTupleList(0);
+
   public HashLeftAntiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
                               PhysicalExec notInSideChild) {
     super(context, plan, fromSideChild, notInSideChild);
@@ -57,8 +59,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
     while(!context.isStopped() && !finished) {
       if (iterator != null && iterator.hasNext()) {
         frameTuple.setRight(iterator.next());
-        projector.eval(frameTuple, outTuple);
-        return outTuple;
+        return projector.eval(frameTuple);
       }
       // getting new outer
       Tuple leftTuple = leftChild.next(); // it comes from a disk
@@ -70,9 +71,9 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
       frameTuple.setLeft(leftTuple);
 
       // Try to find a hash bucket in in-memory hash table
-      List<Tuple> hashed = tupleSlots.get(toKey(leftTuple));
+      TupleList hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple));
       if (hashed == null || !rightFiltered(hashed).hasNext()) {
-        iterator = nullIterator(0);
+        iterator = nullTupleList.iterator();
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 27f683b..b652c3c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -31,10 +31,12 @@ import java.util.List;
 public class HashLeftOuterJoinExec extends HashJoinExec {
 
   private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
+  private final List<Tuple> nullTupleList;
 
   public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
                                PhysicalExec rightChild) {
     super(context, plan, leftChild, rightChild);
+    nullTupleList = nullTupleList(rightNumCols);
   }
 
   @Override
@@ -46,8 +48,7 @@ public class HashLeftOuterJoinExec extends HashJoinExec {
     while (!context.isStopped() && !finished) {
       if (iterator != null && iterator.hasNext()) {
         frameTuple.setRight(iterator.next());
-        projector.eval(frameTuple, outTuple);
-        return outTuple;
+        return projector.eval(frameTuple);
       }
       Tuple leftTuple = leftChild.next(); // it comes from a disk
       if (leftTuple == null) { // if no more tuples in left tuples on disk, a join is completed.
@@ -57,17 +58,17 @@ public class HashLeftOuterJoinExec extends HashJoinExec {
       frameTuple.setLeft(leftTuple);
 
       if (leftFiltered(leftTuple)) {
-        iterator = nullIterator(rightNumCols);
+        iterator = nullTupleList.iterator();
         continue;
       }
 
       // getting corresponding right
-      List<Tuple> hashed = tupleSlots.get(toKey(leftTuple));
+      TupleList hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple));
       Iterator<Tuple> rightTuples = rightFiltered(hashed);
       if (!rightTuples.hasNext()) {
         //this left tuple doesn't have a match on the right.But full outer join => we should keep it anyway
         //output a tuple with the nulls padded rightTuple
-        iterator = nullIterator(rightNumCols);
+        iterator = nullTupleList.iterator();
         continue;
       }
       iterator = rightTuples;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index 41e842a..42b78e8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -18,9 +18,9 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.List;
@@ -32,6 +32,8 @@ import java.util.List;
  */
 public class HashLeftSemiJoinExec extends HashJoinExec {
 
+  private final List<Tuple> nullTupleList = nullTupleList(0);
+
   public HashLeftSemiJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec fromSideChild,
                               PhysicalExec inSideChild) {
     super(context, plan, fromSideChild, inSideChild);
@@ -59,8 +61,7 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
     while(!context.isStopped() && !finished) {
       if (iterator != null && iterator.hasNext()) {
         frameTuple.setRight(iterator.next());
-        projector.eval(frameTuple, outTuple);
-        return outTuple;
+        return projector.eval(frameTuple);
       }
       // getting new outer
       Tuple leftTuple = leftChild.next(); // it comes from a disk
@@ -72,10 +73,10 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
       frameTuple.setLeft(leftTuple);
 
       // Try to find a hash bucket in in-memory hash table
-      List<Tuple> hashed = tupleSlots.get(toKey(leftTuple));
+      TupleList hashed = tupleSlots.get(leftKeyExtractor.project(leftTuple));
       if (hashed != null && rightFiltered(hashed).hasNext()) {
         // if found, it gets a hash bucket from the hash table.
-        iterator = nullIterator(0);
+        iterator = nullTupleList.iterator();
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index 1a92a7a..bc4382e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -33,9 +33,7 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -47,7 +45,6 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
   private ShuffleFileWriteNode plan;
   private final TableMeta meta;
   private Partitioner partitioner;
-//  private final Path storeTablePath;
   private Map<Integer, HashShuffleAppender> appenderMap = new HashMap<Integer, HashShuffleAppender>();
   private final int numShuffleOutputs;
   private final int [] shuffleKeyIds;
@@ -92,8 +89,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
     return appender;
   }
 
-//  Map<Integer, Long> partitionStats = new HashMap<Integer, Long>();
-  Map<Integer, List<Tuple>> partitionTuples = new HashMap<Integer, List<Tuple>>();
+  Map<Integer, TupleList> partitionTuples = new HashMap<Integer, TupleList>();
   long writtenBytes = 0L;
 
   @Override
@@ -108,17 +104,14 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
         numRows++;
 
         partId = partitioner.getPartition(tuple);
-        List<Tuple> partitionTupleList = partitionTuples.get(partId);
+        TupleList partitionTupleList = partitionTuples.get(partId);
         if (partitionTupleList == null) {
-          partitionTupleList = new ArrayList<Tuple>(1000);
+          partitionTupleList = new TupleList(1000);
           partitionTuples.put(partId, partitionTupleList);
         }
-        try {
-          partitionTupleList.add(tuple.clone());
-        } catch (CloneNotSupportedException e) {
-        }
+        partitionTupleList.add(tuple);
         if (tupleCount >= numHashShuffleBufferTuples) {
-          for (Map.Entry<Integer, List<Tuple>> entry : partitionTuples.entrySet()) {
+          for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) {
             int appendPartId = entry.getKey();
             HashShuffleAppender appender = getAppender(appendPartId);
             int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue());
@@ -130,7 +123,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
       }
 
       // processing remained tuples
-      for (Map.Entry<Integer, List<Tuple>> entry : partitionTuples.entrySet()) {
+      for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) {
         int appendPartId = entry.getKey();
         HashShuffleAppender appender = getAppender(appendPartId);
         int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue());
@@ -168,6 +161,12 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
       appenderMap = null;
     }
 
+    for (TupleList eachList : partitionTuples.values()) {
+      eachList.clear();
+    }
+    partitionTuples.clear();
+    partitionTuples = null;
+
     partitioner = null;
     plan = null;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java
new file mode 100644
index 0000000..39b13f8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/KeyTuple.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+/**
+ * KeyTuple caches its hash value so that repeated lookups do not recompute it on every call.
+ * Datum.hashCode() uses MurmurHash, so recomputing the hash each time would be expensive.
+ *
+ */
+public class KeyTuple extends VTuple {
+  private int hashCode;
+
+  public KeyTuple(int size) {
+    super(size);
+    updateHashCode();
+  }
+
+  public KeyTuple(Tuple tuple) {
+    super(tuple);
+    updateHashCode();
+  }
+
+  public KeyTuple(Datum[] datums) {
+    super(datums);
+    updateHashCode();
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    super.put(fieldId, tuple);
+    updateHashCode();
+  }
+
+  private void updateHashCode() {
+    this.hashCode = super.hashCode();
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    super.put(fieldId, value);
+    updateHashCode();
+  }
+
+  @Override
+  public void clear() {
+    super.clear();
+    updateHashCode();
+  }
+
+  @Override
+  public void put(Datum [] values) {
+    super.put(values);
+    updateHashCode();
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index f76e356..029592a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -18,19 +18,16 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.plan.logical.SortNode;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 
 public class MemSortExec extends SortExec {
   private SortNode plan;
-  private List<Tuple> tupleSlots;
+  private TupleList tupleSlots;
   private boolean sorted = false;
   private Iterator<Tuple> iterator;
 
@@ -42,7 +39,7 @@ public class MemSortExec extends SortExec {
 
   public void init() throws IOException {
     super.init();
-    this.tupleSlots = new ArrayList<Tuple>(10000);
+    this.tupleSlots = new TupleList(10000);
   }
 
   @Override
@@ -51,7 +48,7 @@ public class MemSortExec extends SortExec {
     if (!sorted) {
       Tuple tuple;
       while (!context.isStopped() && (tuple = child.next()) != null) {
-        tupleSlots.add(new VTuple(tuple));
+        tupleSlots.add(tuple);
       }
       iterator = getSorter(tupleSlots).sort().iterator();
       sorted = true;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index 13b73c3..41e3648 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -20,30 +20,27 @@ package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.NullTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 
 public class MergeFullOuterJoinExec extends CommonJoinExec {
 
   // temporal tuples and states for nested loop join
-  private FrameTuple frameTuple;
   private Tuple leftTuple = null;
   private Tuple rightTuple = null;
-  private Tuple outTuple = null;
   private Tuple leftNext = null;
+  private Tuple prevLeftTuple = null;
+  private Tuple prevRightTuple = null;
 
-  private List<Tuple> leftTupleSlots;
-  private List<Tuple> rightTupleSlots;
+  private TupleList leftTupleSlots;
+  private TupleList rightTupleSlots;
 
   private JoinTupleComparator joincomparator = null;
   private TupleComparator[] tupleComparator = null;
@@ -59,13 +56,16 @@ public class MergeFullOuterJoinExec extends CommonJoinExec {
   boolean endInPopulationStage = false;
   private boolean initRightDone = false;
 
+  private final Tuple leftNullTuple;
+  private final Tuple rightNullTuple;
+
   public MergeFullOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild,
                                 PhysicalExec rightChild, SortSpec[] leftSortKey, SortSpec[] rightSortKey) {
     super(context, plan, leftChild, rightChild);
     Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
         "but there is no join condition");
-    this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
-    this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.leftTupleSlots = new TupleList(INITIAL_TUPLE_SLOT);
+    this.rightTupleSlots = new TupleList(INITIAL_TUPLE_SLOT);
     SortSpec[][] sortSpecs = new SortSpec[2][];
     sortSpecs[0] = leftSortKey;
     sortSpecs[1] = rightSortKey;
@@ -75,16 +75,18 @@ public class MergeFullOuterJoinExec extends CommonJoinExec {
     this.tupleComparator = PhysicalPlanUtil.getComparatorsFromJoinQual(
         plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
 
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.size());
-
     leftNumCols = leftChild.getSchema().size();
     rightNumCols = rightChild.getSchema().size();
+
+    prevLeftTuple = new VTuple(leftChild.getSchema().size());
+    prevRightTuple = new VTuple(rightChild.getSchema().size());
+
+    leftNullTuple = NullTuple.create(leftNumCols);
+    rightNullTuple = NullTuple.create(rightNumCols);
   }
 
   public Tuple next() throws IOException {
-    Tuple previous;
+    Tuple outTuple;
 
     while (!context.isStopped()) {
       boolean newRound = false;
@@ -122,9 +124,8 @@ public class MergeFullOuterJoinExec extends CommonJoinExec {
 
           if((leftTuple == null) && (rightTuple != null)){
             // output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
-            frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(frameTuple, outTuple);
+            frameTuple.set(leftNullTuple, rightTuple);
+            outTuple = projector.eval(frameTuple);
             // we simulate we found a match, which is exactly the null padded one
             rightTuple = rightChild.next();
             return outTuple;
@@ -132,9 +133,8 @@ public class MergeFullOuterJoinExec extends CommonJoinExec {
 
           if((leftTuple != null) && (rightTuple == null)){
             // output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
-            frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(frameTuple, outTuple);
+            frameTuple.set(leftTuple, rightNullTuple);
+            outTuple = projector.eval(frameTuple);
             // we simulate we found a match, which is exactly the null padded one
             leftTuple = leftChild.next();
             return outTuple;
@@ -178,9 +178,9 @@ public class MergeFullOuterJoinExec extends CommonJoinExec {
 
             //before getting a new tuple from the right,  a leftnullpadded tuple should be built
             //output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
+            Tuple nullPaddedTuple = leftNullTuple;
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(frameTuple, outTuple);
+            outTuple = projector.eval(frameTuple);
             // BEFORE RETURN, MOVE FORWARD
             rightTuple = rightChild.next();
             if(rightTuple == null) {
@@ -192,9 +192,9 @@ public class MergeFullOuterJoinExec extends CommonJoinExec {
           } else if (cmp < 0) {
             // before getting a new tuple from the left,  a rightnullpadded tuple should be built
             // output a tuple with the nulls padded rightTuple
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
+            Tuple nullPaddedTuple = rightNullTuple;
             frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(frameTuple, outTuple);
+            outTuple = projector.eval(frameTuple);
             // we simulate we found a match, which is exactly the null padded one
             // BEFORE RETURN, MOVE FORWARD
             leftTuple = leftChild.next();
@@ -219,28 +219,27 @@ public class MergeFullOuterJoinExec extends CommonJoinExec {
           boolean endLeft = false;
           boolean endRight = false;
 
-          previous = new VTuple(leftTuple);
+          prevLeftTuple.put(leftTuple.getValues());
           do {
-            leftTupleSlots.add(new VTuple(leftTuple));
+            leftTupleSlots.add(leftTuple);
             leftTuple = leftChild.next();
             if(leftTuple == null) {
               endLeft = true;
             }
 
 
-          } while ((endLeft != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+          } while ((endLeft != true) && (tupleComparator[0].compare(prevLeftTuple, leftTuple) == 0));
           posLeftTupleSlots = 0;
 
-
-          previous = new VTuple(rightTuple);
+          prevRightTuple.put(rightTuple.getValues());
           do {
-            rightTupleSlots.add(new VTuple(rightTuple));
+            rightTupleSlots.add(rightTuple);
             rightTuple = rightChild.next();
             if(rightTuple == null) {
               endRight = true;
             }
 
-          } while ((endRight != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+          } while ((endRight != true) && (tupleComparator[1].compare(prevRightTuple, rightTuple) == 0) );
           posRightTupleSlots = 0;
 
           if ((endLeft == true) || (endRight == true)) {
@@ -261,31 +260,29 @@ public class MergeFullOuterJoinExec extends CommonJoinExec {
       // (i.e. refers to next round)
       if(!end || (end && endInPopulationStage)){
         if(posLeftTupleSlots == 0){
-          leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+          leftNext = leftTupleSlots.get(posLeftTupleSlots);
           posLeftTupleSlots = posLeftTupleSlots + 1;
         }
 
         if(posRightTupleSlots <= (rightTupleSlots.size() -1)) {
-          Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+          Tuple aTuple = rightTupleSlots.get(posRightTupleSlots);
           posRightTupleSlots = posRightTupleSlots + 1;
           frameTuple.set(leftNext, aTuple);
           joinQual.eval(frameTuple);
-          projector.eval(frameTuple, outTuple);
-          return outTuple;
+          return projector.eval(frameTuple);
         } else {
           // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
           if(posLeftTupleSlots <= (leftTupleSlots.size()-1)) {
             //rewind the right slots position
             posRightTupleSlots = 0;
-            Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
+            Tuple aTuple = rightTupleSlots.get(posRightTupleSlots);
             posRightTupleSlots = posRightTupleSlots + 1;
-            leftNext = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+            leftNext = leftTupleSlots.get(posLeftTupleSlots);
             posLeftTupleSlots = posLeftTupleSlots + 1;
 
             frameTuple.set(leftNext, aTuple);
             joinQual.eval(frameTuple);
-            projector.eval(frameTuple, outTuple);
-            return outTuple;
+            return projector.eval(frameTuple);
           }
         }
       } // the second if end false

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index bf9b4cd..3d8c108 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -21,28 +21,25 @@ package org.apache.tajo.engine.planner.physical;
 import com.google.common.base.Preconditions;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 
 public class MergeJoinExec extends CommonJoinExec {
 
   // temporal tuples and states for nested loop join
-  private FrameTuple frameTuple;
   private Tuple outerTuple = null;
   private Tuple innerTuple = null;
-  private Tuple outTuple = null;
   private Tuple outerNext = null;
+  private final Tuple prevOuterTuple;
+  private final Tuple prevInnerTuple;
 
-  private List<Tuple> outerTupleSlots;
-  private List<Tuple> innerTupleSlots;
+  private TupleList outerTupleSlots;
+  private TupleList innerTupleSlots;
   private Iterator<Tuple> outerIterator;
   private Iterator<Tuple> innerIterator;
 
@@ -59,8 +56,8 @@ public class MergeJoinExec extends CommonJoinExec {
     Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
         "but there is no join condition");
 
-    this.outerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
-    this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.outerTupleSlots = new TupleList(INITIAL_TUPLE_SLOT);
+    this.innerTupleSlots = new TupleList(INITIAL_TUPLE_SLOT);
     SortSpec[][] sortSpecs = new SortSpec[2][];
     sortSpecs[0] = outerSortKey;
     sortSpecs[1] = innerSortKey;
@@ -71,14 +68,12 @@ public class MergeJoinExec extends CommonJoinExec {
         plan.getJoinQual(), outer.getSchema(), inner.getSchema());
     this.outerIterator = outerTupleSlots.iterator();
     this.innerIterator = innerTupleSlots.iterator();
-    
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.size());
+
+    prevOuterTuple = new VTuple(leftChild.getSchema().size());
+    prevInnerTuple = new VTuple(rightChild.getSchema().size());
   }
 
   public Tuple next() throws IOException {
-    Tuple previous;
 
     while (!context.isStopped()) {
       if (!outerIterator.hasNext() && !innerIterator.hasNext()) {
@@ -108,32 +103,28 @@ public class MergeJoinExec extends CommonJoinExec {
           }
         }
 
-        try {
-          previous = outerTuple.clone();
-          do {
-            outerTupleSlots.add(outerTuple.clone());
-            outerTuple = leftChild.next();
-            if (outerTuple == null) {
-              end = true;
-              break;
-            }
-          } while (tupleComparator[0].compare(previous, outerTuple) == 0);
-          outerIterator = outerTupleSlots.iterator();
-          outerNext = outerIterator.next();
-
-          previous = innerTuple.clone();
-          do {
-            innerTupleSlots.add(innerTuple.clone());
-            innerTuple = rightChild.next();
-            if (innerTuple == null) {
-              end = true;
-              break;
-            }
-          } while (tupleComparator[1].compare(previous, innerTuple) == 0);
-          innerIterator = innerTupleSlots.iterator();
-        } catch (CloneNotSupportedException e) {
+        prevOuterTuple.put(outerTuple.getValues());
+        do {
+          outerTupleSlots.add(outerTuple);
+          outerTuple = leftChild.next();
+          if (outerTuple == null) {
+            end = true;
+            break;
+          }
+        } while (tupleComparator[0].compare(prevOuterTuple, outerTuple) == 0);
+        outerIterator = outerTupleSlots.iterator();
+        outerNext = outerIterator.next();
 
-        }
+        prevInnerTuple.put(innerTuple.getValues());
+        do {
+          innerTupleSlots.add(innerTuple);
+          innerTuple = rightChild.next();
+          if (innerTuple == null) {
+            end = true;
+            break;
+          }
+        } while (tupleComparator[1].compare(prevInnerTuple, innerTuple) == 0);
+        innerIterator = innerTupleSlots.iterator();
       }
 
       if(!innerIterator.hasNext()){
@@ -144,8 +135,7 @@ public class MergeJoinExec extends CommonJoinExec {
       frameTuple.set(outerNext, innerIterator.next());
 
       if (joinQual.eval(frameTuple).isTrue()) {
-        projector.eval(frameTuple, outTuple);
-        return outTuple;
+        return projector.eval(frameTuple);
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 964a523..d3214c3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -19,9 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -30,18 +28,14 @@ public class NLJoinExec extends CommonJoinExec {
 
   // temporal tuples and states for nested loop join
   private boolean needNewOuter;
-  private FrameTuple frameTuple;
   private Tuple outerTuple = null;
   private Tuple innerTuple = null;
-  private Tuple outTuple = null;
 
   public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
       PhysicalExec inner) {
     super(context, plan, outer, inner);
     // for join
     needNewOuter = true;
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.size());
   }
 
   public Tuple next() throws IOException {
@@ -64,12 +58,10 @@ public class NLJoinExec extends CommonJoinExec {
       frameTuple.set(outerTuple, innerTuple);
       if (hasJoinQual) {
         if (joinQual.eval(frameTuple).isTrue()) {
-          projector.eval(frameTuple, outTuple);
-          return outTuple;
+          return projector.eval(frameTuple);
         }
       } else {
-        projector.eval(frameTuple, outTuple);
-        return outTuple;
+        return projector.eval(frameTuple);
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index 72a667d..8a79005 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -24,7 +24,6 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.plan.logical.Projectable;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -33,7 +32,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
   private Projectable plan;
 
   // for projection
-  private Tuple outTuple;
   private Projector projector;
   
   public ProjectionExec(TaskAttemptContext context, Projectable plan,
@@ -45,7 +43,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
   public void init() throws IOException {
     super.init();
 
-    this.outTuple = new VTuple(outSchema.size());
     this.projector = new Projector(context, inSchema, outSchema, this.plan.getTargets());
   }
 
@@ -57,8 +54,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
       return null;
     }
 
-    projector.eval(tuple, outTuple);
-    return outTuple;
+    return projector.eval(tuple);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index bbb21fe..ac3d1b2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -24,8 +24,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.KeyProjector;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -40,9 +44,8 @@ import java.io.IOException;
  * specified order of shuffle keys.
  */
 public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
-  private static Log LOG = LogFactory.getLog(RangeShuffleFileWriteExec.class);
+  private final static Log LOG = LogFactory.getLog(RangeShuffleFileWriteExec.class);
   private final SortSpec[] sortSpecs;
-  private int [] indexKeys = null;
   private Schema keySchema;
 
   private BSTIndex.BSTIndexWriter indexWriter;
@@ -50,6 +53,8 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
   private FileAppender appender;
   private TableMeta meta;
 
+  private KeyProjector keyProjector;
+
   public RangeShuffleFileWriteExec(final TaskAttemptContext context,
                                    final PhysicalExec child, final Schema inSchema, final Schema outSchema,
                                    final SortSpec[] sortSpecs) throws IOException {
@@ -60,14 +65,8 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
   public void init() throws IOException {
     super.init();
 
-    indexKeys = new int[sortSpecs.length];
     keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
-
-    Column col;
-    for (int i = 0 ; i < sortSpecs.length; i++) {
-      col = sortSpecs[i].getSortKey();
-      indexKeys[i] = inSchema.getColumnId(col.getQualifiedName());
-    }
+    keyProjector = new KeyProjector(inSchema, keySchema.toArray());
 
     BSTIndex bst = new BSTIndex(new TajoConf());
     this.comp = new BaseTupleComparator(keySchema, sortSpecs);
@@ -91,24 +90,17 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
   public Tuple next() throws IOException {
     Tuple tuple;
     Tuple keyTuple;
-    Tuple prevKeyTuple = null;
+    Tuple prevKeyTuple = new VTuple(keySchema.size());
     long offset;
 
-
-    try {
-      while(!context.isStopped() && (tuple = child.next()) != null) {
-        offset = appender.getOffset();
-        appender.addTuple(tuple);
-        keyTuple = new VTuple(keySchema.size());
-        RowStoreUtil.project(tuple, keyTuple, indexKeys);
-        if (prevKeyTuple == null || !prevKeyTuple.equals(keyTuple)) {
-          indexWriter.write(keyTuple, offset);
-          prevKeyTuple = keyTuple;
-        }
+    while(!context.isStopped() && (tuple = child.next()) != null) {
+      offset = appender.getOffset();
+      appender.addTuple(tuple);
+      keyTuple = keyProjector.project(tuple);
+      if (!prevKeyTuple.equals(keyTuple)) {
+        indexWriter.write(keyTuple, offset);
+        prevKeyTuple.put(keyTuple.getValues());
       }
-    } catch (RuntimeException e) {
-      e.printStackTrace();
-      throw e;
     }
 
     return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index fd825b1..239c6ab 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -20,29 +20,26 @@ package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.plan.logical.JoinNode;
-import org.apache.tajo.storage.FrameTuple;
+import org.apache.tajo.storage.NullTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 public class RightOuterMergeJoinExec extends CommonJoinExec {
   // temporal tuples and states for nested loop join
-  private FrameTuple frameTuple;
   private Tuple leftTuple = null;
   private Tuple rightTuple = null;
-  private Tuple outTuple = null;
+  private Tuple prevLeftTuple;
+  private Tuple prevRightTuple;
   private Tuple nextLeft = null;
+  private Tuple nullPaddedTuple;
 
-  private List<Tuple> leftTupleSlots;
-  private List<Tuple> innerTupleSlots;
+  private TupleList leftTupleSlots;
+  private TupleList innerTupleSlots;
 
   private JoinTupleComparator joinComparator = null;
   private TupleComparator [] tupleComparator = null;
@@ -62,8 +59,8 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
     super(context, plan, outer, inner);
     Preconditions.checkArgument(plan.hasJoinQual(), "Sort-merge join is only used for the equi-join, " +
         "but there is no join condition");
-    this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
-    this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
+    this.leftTupleSlots = new TupleList(INITIAL_TUPLE_SLOT);
+    this.innerTupleSlots = new TupleList(INITIAL_TUPLE_SLOT);
     SortSpec[][] sortSpecs = new SortSpec[2][];
     sortSpecs[0] = outerSortKey;
     sortSpecs[1] = innerSortKey;
@@ -72,22 +69,8 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
     this.tupleComparator = PhysicalPlanUtil.getComparatorsFromJoinQual(
         plan.getJoinQual(), outer.getSchema(), inner.getSchema());
 
-    // for join
-    frameTuple = new FrameTuple();
-    outTuple = new VTuple(outSchema.size());
-
     leftNumCols = outer.getSchema().size();
-  }
-
-  /**
-   * creates a tuple of a given size filled with NULL values in all fields
-   */
-  private Tuple createNullPaddedTuple(int columnNum){
-    VTuple tuple = new VTuple(columnNum);
-    for (int i = 0; i < columnNum; i++) {
-      tuple.put(i, DatumFactory.createNullDatum());
-    }
-    return tuple;
+    nullPaddedTuple = NullTuple.create(leftNumCols);
   }
 
   /**
@@ -101,7 +84,9 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
    * @return
    * @throws IOException
    */
+  @Override
   public Tuple next() throws IOException {
+    Tuple outTuple;
 
     while (!context.isStopped()) {
       boolean newRound = false;
@@ -130,9 +115,8 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
             return null;
           } else {
             // output a tuple with the nulls padded leftTuple
-            Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(frameTuple, outTuple);
+            outTuple = projector.eval(frameTuple);
 
             // we simulate we found a match, which is exactly the null padded one
             rightTuple = rightChild.next();
@@ -168,11 +152,10 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
           }
         }
         if (rightFiltered(rightTuple)) {
-          Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
           frameTuple.set(nullPaddedTuple, rightTuple);
-          projector.eval(frameTuple, outTuple);
-
+          outTuple = projector.eval(frameTuple);
           rightTuple = null;
+
           return outTuple;
         }
         initRightDone = true;
@@ -202,9 +185,8 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
           if (cmp > 0) {
             // before getting a new tuple from the right,  a left null padded tuple should be built
             // output a tuple with the nulls padded left tuple
-            Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(frameTuple, outTuple);
+            outTuple = projector.eval(frameTuple);
 
             // we simulate we found a match, which is exactly the null padded one
             // BEFORE RETURN, MOVE FORWARD
@@ -225,7 +207,6 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
         // END MOVE FORWARDING STAGE
         //////////////////////////////////////////////////////////////////////
 
-        Tuple previous = null;
         // once a match is found, retain all tuples with this key in tuple slots on each side
         if(!end) {
           endInPopulationStage = false;
@@ -233,26 +214,33 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
           boolean endOuter = false;
           boolean endInner = false;
 
-          previous = new VTuple(leftTuple);
+          if (prevLeftTuple == null) {
+            prevLeftTuple = new VTuple(leftTuple.getValues());
+          } else {
+            prevLeftTuple.put(leftTuple.getValues());
+          }
           do {
-            leftTupleSlots.add(new VTuple(leftTuple));
+            leftTupleSlots.add(leftTuple);
             leftTuple = leftChild.next();
             if( leftTuple == null) {
               endOuter = true;
             }
-          } while ((endOuter != true) && (tupleComparator[0].compare(previous, leftTuple) == 0));
+          } while ((endOuter != true) && (tupleComparator[0].compare(prevLeftTuple, leftTuple) == 0));
           posLeftTupleSlots = 0;
 
-          previous = new VTuple(rightTuple);
-
+          if (prevRightTuple == null) {
+            prevRightTuple = new VTuple(rightTuple.getValues());
+          } else {
+            prevRightTuple.put(rightTuple.getValues());
+          }
           do {
-            innerTupleSlots.add(new VTuple(rightTuple));
+            innerTupleSlots.add(rightTuple);
             rightTuple = rightChild.next();
             if(rightTuple == null) {
               endInner = true;
             }
 
-          } while ((endInner != true) && (tupleComparator[1].compare(previous, rightTuple) == 0) );
+          } while ((endInner != true) && (tupleComparator[1].compare(prevRightTuple, rightTuple) == 0) );
           posRightTupleSlots = 0;
 
           if ((endOuter == true) || (endInner == true)) {
@@ -260,10 +248,9 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
             endInPopulationStage = true;
           }
         } // if end false
-        if (previous != null && rightFiltered(previous)) {
-          Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
-          frameTuple.set(nullPaddedTuple, previous);
-          projector.eval(frameTuple, outTuple);
+        if (prevRightTuple != null && rightFiltered(prevRightTuple)) {
+          frameTuple.set(nullPaddedTuple, prevRightTuple);
+          outTuple = projector.eval(frameTuple);
 
           // reset tuple slots for a new round
           leftTupleSlots.clear();
@@ -283,48 +270,42 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
       if ((end == false) || ((end == true) && (endInPopulationStage == true))){
 
         if(posLeftTupleSlots == 0){
-          nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+          nextLeft = leftTupleSlots.get(posLeftTupleSlots);
           posLeftTupleSlots = posLeftTupleSlots + 1;
         }
 
 
         if(posRightTupleSlots <= (innerTupleSlots.size() -1)) {
 
-          Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+          Tuple aTuple = innerTupleSlots.get(posRightTupleSlots);
           posRightTupleSlots = posRightTupleSlots + 1;
 
           frameTuple.set(nextLeft, aTuple);
           if (joinQual.eval(frameTuple).asBool()) {
-            projector.eval(frameTuple, outTuple);
-            return outTuple;
+            return projector.eval(frameTuple);
           } else {
             // padding null
-            Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, aTuple);
-            projector.eval(frameTuple, outTuple);
-            return outTuple;
+            return projector.eval(frameTuple);
           }
         } else {
           // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
           if(posLeftTupleSlots <= (leftTupleSlots.size() - 1)) {
             //rewind the right slots position
             posRightTupleSlots = 0;
-            Tuple aTuple = new VTuple(innerTupleSlots.get(posRightTupleSlots));
+            Tuple aTuple = innerTupleSlots.get(posRightTupleSlots);
             posRightTupleSlots = posRightTupleSlots + 1;
-            nextLeft = new VTuple (leftTupleSlots.get(posLeftTupleSlots));
+            nextLeft = leftTupleSlots.get(posLeftTupleSlots);
             posLeftTupleSlots = posLeftTupleSlots + 1;
 
             frameTuple.set(nextLeft, aTuple);
 
             if (joinQual.eval(frameTuple).asBool()) {
-              projector.eval(frameTuple, outTuple);
-              return outTuple;
+              return projector.eval(frameTuple);
             } else {
               // padding null
-              Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
               frameTuple.set(nullPaddedTuple, aTuple);
-              projector.eval(frameTuple, outTuple);
-              return outTuple;
+              return projector.eval(frameTuple);
             }
           }
         }
@@ -340,6 +321,9 @@ public class RightOuterMergeJoinExec extends CommonJoinExec {
     innerTupleSlots.clear();
     posRightTupleSlots = -1;
     posLeftTupleSlots = -1;
+    leftTuple = rightTuple = null;
+    prevLeftTuple = prevRightTuple = null;
+    nextLeft = null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index b4f7a38..b49fa40 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -247,9 +247,8 @@ public class SeqScanExec extends ScanExec {
   public Tuple next() throws IOException {
 
     while(scanIt.hasNext()) {
-      Tuple outTuple = new VTuple(outColumnNum);
       Tuple t = scanIt.next();
-      projector.eval(t, outTuple);
+      Tuple outTuple = projector.eval(t);
       outTuple.setOffset(t.getOffset());
       return outTuple;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
index 2feecd1..71602b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
@@ -41,24 +42,38 @@ import java.io.IOException;
  * it makes an output tuple.
  */
 public class SortAggregateExec extends AggregationExec {
+  private final int groupingKeyIds[];
   private Tuple lastKey = null;
+  private final Tuple currentKey;
+  private final Tuple outTuple;
   private boolean finished = false;
   private FunctionContext contexts[];
 
   public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan, PhysicalExec child) throws IOException {
     super(context, plan, child);
     contexts = new FunctionContext[plan.getAggFunctions() == null ? 0 : plan.getAggFunctions().length];
+
+    final Column [] keyColumns = plan.getGroupingColumns();
+    groupingKeyIds = new int[groupingKeyNum];
+    Column col;
+    for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) {
+      col = keyColumns[idx];
+      if (col.hasQualifier()) {
+        groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName());
+      } else {
+        groupingKeyIds[idx] = inSchema.getColumnIdByName(col.getSimpleName());
+      }
+    }
+    currentKey = new VTuple(groupingKeyNum);
+    outTuple = new VTuple(outSchema.size());
   }
 
   @Override
   public Tuple next() throws IOException {
-    Tuple currentKey;
     Tuple tuple = null;
-    Tuple outputTuple = null;
 
     while(!context.isStopped() && (tuple = child.next()) != null) {
       // get a key tuple
-      currentKey = new VTuple(groupingKeyIds.length);
       for(int i = 0; i < groupingKeyIds.length; i++) {
         currentKey.put(i, tuple.asDatum(groupingKeyIds[i]));
       }
@@ -75,7 +90,7 @@ public class SortAggregateExec extends AggregationExec {
               aggFunctions[i].merge(contexts[i], tuple);
             }
           }
-          lastKey = currentKey;
+          lastKey = new VTuple(currentKey.getValues());
         } else {
           // aggregate
           for (int i = 0; i < aggFunctionsNum; i++) {
@@ -85,14 +100,13 @@ public class SortAggregateExec extends AggregationExec {
 
       } else { /** Finalization State */
         // finalize aggregate and return
-        outputTuple = new VTuple(outSchema.size());
         int tupleIdx = 0;
 
         for(; tupleIdx < groupingKeyNum; tupleIdx++) {
-          outputTuple.put(tupleIdx, lastKey.asDatum(tupleIdx));
+          outTuple.put(tupleIdx, lastKey.asDatum(tupleIdx));
         }
         for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
-          outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
+          outTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
         }
 
         for(int evalIdx = 0; evalIdx < aggFunctionsNum; evalIdx++) {
@@ -100,8 +114,8 @@ public class SortAggregateExec extends AggregationExec {
           aggFunctions[evalIdx].merge(contexts[evalIdx], tuple);
         }
 
-        lastKey = currentKey;
-        return outputTuple;
+        lastKey.put(currentKey.getValues());
+        return outTuple;
       }
     } // while loop
 
@@ -110,17 +124,17 @@ public class SortAggregateExec extends AggregationExec {
       return null;
     }
     if (!finished) {
-      outputTuple = new VTuple(outSchema.size());
       int tupleIdx = 0;
       for(; tupleIdx < groupingKeyNum; tupleIdx++) {
-        outputTuple.put(tupleIdx, lastKey.asDatum(tupleIdx));
+        outTuple.put(tupleIdx, lastKey.asDatum(tupleIdx));
       }
       for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
-        outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
+        outTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
       }
       finished = true;
+      return outTuple;
     }
-    return outputTuple;
+    return null;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index a40fc1d..3a0dd38 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -22,7 +22,6 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
-import org.apache.tajo.datum.Datum;
 import org.apache.tajo.plan.logical.StoreTableNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
@@ -85,7 +84,7 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
           StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats());
 
           appender = getNextPartitionAppender(getSubdirectory(currentKey));
-          prevKey = new VTuple(currentKey);
+          prevKey.put(currentKey.getValues());
 
           // reset all states for file rotating
           writtenFileNum = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
index 28be9de..b652b0a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
@@ -26,7 +26,6 @@ import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.Comparator;
-import java.util.List;
 
 public abstract class SortExec extends UnaryPhysicalExec {
 
@@ -40,7 +39,7 @@ public abstract class SortExec extends UnaryPhysicalExec {
     this.comparator = new BaseTupleComparator(inSchema, sortSpecs);
   }
 
-  protected TupleSorter getSorter(List<Tuple> tupleSlots) {
+  protected TupleSorter getSorter(TupleList tupleSlots) {
     if (!tupleSlots.isEmpty() && ComparableVector.isVectorizable(sortSpecs)) {
       return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 6031fdb..c317f7f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -49,7 +49,6 @@ public class StoreTableExec extends UnaryPhysicalExec {
   private PersistentStoreNode plan;
   private TableMeta meta;
   private Appender appender;
-  private Tuple tuple;
 
   // for file punctuation
   private TableStats sumStats;                  // for aggregating all stats of written files
@@ -125,6 +124,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
    */
   @Override
   public Tuple next() throws IOException {
+    Tuple tuple;
     while(!context.isStopped() && (tuple = child.next()) != null) {
       appender.addTuple(tuple);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java
new file mode 100644
index 0000000..71ccae1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleList.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.util.ArrayList;
+
+/**
+ * In TupleList, input tuples are automatically cloned whenever the add() method is called.
+ * This data structure is usually used in physical operators like hash join or hash aggregation.
+ */
+public class TupleList extends ArrayList<Tuple> {
+
+  public TupleList() {
+    super();
+  }
+
+  public TupleList(int initialCapacity) {
+    super(initialCapacity);
+  }
+
+  @Override
+  public boolean add(Tuple tuple) {
+    return super.add(new VTuple(tuple));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleMap.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleMap.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleMap.java
new file mode 100644
index 0000000..6f72522
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleMap.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.annotation.Nullable;
+
+import java.util.HashMap;
+
+/**
+ * TupleMap is a map which uses KeyTuple as its key.
+ * Please note that every put() call creates a copy of the input KeyTuple.
+ * This data structure is usually used in physical operators like hash join or hash aggregation.
+ *
+ * @param <E> value type
+ */
+public class TupleMap<E> extends HashMap<KeyTuple, E> {
+
+  public TupleMap() {
+    super();
+  }
+
+  public TupleMap(int initialCapacity) {
+    super(initialCapacity);
+  }
+
+  public TupleMap(TupleMap tupleMap){
+    super(tupleMap);
+  }
+
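+  /**
+   * Add a pair of (key, value).
+   * A non-null key is always copied; a null key is stored as is.
+   *
+   * @param key the key tuple, may be null
+   * @param value the value to associate with the key
+   * @return the value returned by {@link HashMap#put}, i.e. the previous value for the key or null
+   */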
+  @Override
+  public E put(@Nullable KeyTuple key, E value) {
+    if (key != null) {
+      return super.put(new KeyTuple(key.getValues()), value);
+    } else {
+      return super.put(null, value);
+    }
+  }
+
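+  /**
+   * Add a pair of (key, value).
+   * The key is stored as is, without being copied.
+   *
+   * @param key the key tuple, may be null
+   * @param value the value to associate with the key
+   * @return the value returned by {@link HashMap#put}, i.e. the previous value for the key or null
+   */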
+  public E putWihtoutKeyCopy(@Nullable KeyTuple key, E value) {
+    return super.put(key, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSet.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSet.java
new file mode 100644
index 0000000..ff131c2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSet.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import java.util.HashSet;
+
+/**
+ * TupleSet is a set which accepts only KeyTuple.
+ * Input tuples are automatically cloned whenever the add() method is called.
+ * This data structure is usually used in physical operators like hash join or hash aggregation.
+ */
+public class TupleSet extends HashSet<KeyTuple> {
+
+  @Override
+  public boolean add(KeyTuple tuple) {
+    return super.add(new KeyTuple(tuple));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
index 57fe816..abf2808 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
@@ -22,18 +22,17 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 
 import java.util.Collections;
-import java.util.List;
 
 public interface TupleSorter {
 
   Iterable<Tuple> sort();
 
-  public static class DefaultSorter implements TupleSorter {
+  class DefaultSorter implements TupleSorter {
 
-    private final List<Tuple> target;
+    private final TupleList target;
     private final TupleComparator comparator;
 
-    public DefaultSorter(List<Tuple> target, TupleComparator comparator) {
+    public DefaultSorter(TupleList target, TupleComparator comparator) {
       this.target = target;
       this.comparator = comparator;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4820610f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
index 18d853f..d750f15 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
@@ -25,7 +25,6 @@ import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.Tuple;
 
 import java.util.Iterator;
-import java.util.List;
 
 /**
  * Extract raw level values (primitive or String/byte[]) from each of key columns before sorting
@@ -35,7 +34,7 @@ public class VectorizedSorter extends ComparableVector implements IndexedSortabl
 
   private final int[] mappings;         // index indirection
 
-  public VectorizedSorter(List<Tuple> source, SortSpec[] sortKeys, int[] keyIndex) {
+  public VectorizedSorter(TupleList source, SortSpec[] sortKeys, int[] keyIndex) {
     super(source.size(), sortKeys, keyIndex);
     source.toArray(tuples);   // wish it's array list
     mappings = new int[tuples.length];


Mime
View raw message