tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] git commit: TAJO-36: Improve ExternalSortExec with N-merge sort and final pass omission.
Date Tue, 04 Feb 2014 21:09:15 GMT
TAJO-36: Improve ExternalSortExec with N-merge sort and final pass omission.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/5177dcfa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/5177dcfa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/5177dcfa

Branch: refs/heads/master
Commit: 5177dcfa4b44e953919f47b94d39f9c5f7afb38b
Parents: 0781a38
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Wed Feb 5 00:20:42 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Wed Feb 5 00:23:59 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   5 +-
 .../java/org/apache/tajo/datum/Float8Datum.java |   3 -
 .../java/org/apache/tajo/util/ClassSize.java    | 353 ++++++++++++
 .../org/apache/tajo/util/CommonTestingUtil.java |  12 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  12 +-
 .../planner/physical/ExternalSortExec.java      | 563 +++++++++++++++----
 .../engine/planner/physical/PhysicalExec.java   |  21 +-
 .../physical/PhysicalPlanningException.java     |   2 +-
 .../planner/physical/UnaryPhysicalExec.java     |   2 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |   2 +-
 .../planner/physical/TestBNLJoinExec.java       |   8 +-
 .../planner/physical/TestExternalSortExec.java  |  45 +-
 .../planner/physical/TestHashAntiJoinExec.java  |   8 +-
 .../planner/physical/TestHashJoinExec.java      |   8 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   8 +-
 .../planner/physical/TestMergeJoinExec.java     |   8 +-
 .../engine/planner/physical/TestNLJoinExec.java |   8 +-
 .../planner/physical/TestPhysicalPlanner.java   |   8 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |  17 +-
 .../java/org/apache/tajo/storage/Tuple.java     |  88 ++-
 .../org/apache/tajo/jdbc/MetaDataTuple.java     |  88 +--
 .../apache/tajo/jdbc/TajoDatabaseMetaData.java  |   2 +-
 .../org/apache/tajo/storage/FrameTuple.java     |  71 +--
 .../java/org/apache/tajo/storage/LazyTuple.java |  78 +--
 .../org/apache/tajo/storage/MemoryUtil.java     | 161 ++++++
 .../java/org/apache/tajo/storage/RawFile.java   | 285 +++++++---
 .../java/org/apache/tajo/storage/RowFile.java   |  32 +-
 .../org/apache/tajo/storage/RowStoreUtil.java   |   4 +-
 .../java/org/apache/tajo/storage/Tuple.java     |  36 +-
 .../java/org/apache/tajo/storage/VTuple.java    |  59 +-
 .../org/apache/tajo/storage/TestLazyTuple.java  |   6 +-
 .../org/apache/tajo/storage/TestStorages.java   |   4 +-
 .../org/apache/tajo/storage/TestVTuple.java     |   6 +-
 .../apache/tajo/storage/v2/TestStorages.java    |   4 +-
 35 files changed, 1467 insertions(+), 553 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd757b5..ac08b75 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,9 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-36: Improve ExternalSortExec with N-merge sort and final pass
+    omission. (hyunsik)
+
     TAJO-564: Show execution block's progress in querydetail.jsp.
     (hyoungjunkim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index de1b3c9..1699848 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -169,7 +169,10 @@ public class TajoConf extends Configuration {
     //////////////////////////////////
     // Physical Executors
     //////////////////////////////////
-    EXECUTOR_SORT_EXTENAL_BUFFER_SIZE("tajo.executor.sort.external-buffer-num", 1000000),
+    EXECUTOR_EXTERNAL_SORT_THREAD_NUM("tajo.executor.external-sort.thread-num", 1),
+    EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200),
+    EXECUTOR_EXTERNAL_SORT_FANOUT("tajo.executor.external-sort.fanout-num", 8),
+
     EXECUTOR_INNER_JOIN_INMEMORY_HASH_TABLE_SIZE("tajo.executor.join.inner.in-memory-table-num", (long)1000000),
     EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD("tajo.executor.join.inner.in-memory-hash-threshold-bytes",
         (long)256 * 1048576),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
index 5566b3a..010cf29 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Float8Datum.java
@@ -16,9 +16,6 @@
  * limitations under the License.
  */
 
-/**
- *
- */
 package org.apache.tajo.datum;
 
 import com.google.gson.annotations.Expose;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java b/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java
new file mode 100644
index 0000000..708eae9
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ClassSize.java
@@ -0,0 +1,353 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Class for determining the "size" of a class, an attempt to calculate the
+ * actual bytes that an object of this class will occupy in memory
+ *
+ * The core of this class is taken from the Derby project
+ */
+
+public class ClassSize {
+  static final Log LOG = LogFactory.getLog(ClassSize.class);
+
+  private static final int nrOfRefsPerObj = 2;
+
+  /** Array overhead */
+  public static final int ARRAY;
+
+  /** Overhead for ArrayList(0) */
+  public static final int ARRAYLIST;
+
+  /** Overhead for ByteBuffer */
+  public static final int BYTE_BUFFER;
+
+  /** Overhead for a Short */
+  public static final int SHORT;
+
+  /** Overhead for an Integer */
+  public static final int INTEGER;
+
+  /** Overhead for a Long */
+  public static final int LONG;
+
+  /** Overhead for a Float */
+  public static final int FLOAT;
+
+  /** Overhead for a Double */
+  public static final int DOUBLE;
+
+  /** Overhead for entry in map */
+  public static final int MAP_ENTRY;
+
+  /** Object overhead is minimum 2 * reference size (16 bytes on 64-bit, 8 on 32-bit) */
+  public static final int OBJECT;
+
+  /** Reference size is 8 bytes on 64-bit, 4 bytes on 32-bit */
+  public static final int REFERENCE;
+
+  /** String overhead */
+  public static final int STRING;
+
+  /** Overhead for TreeMap */
+  public static final int TREEMAP;
+
+  /** Overhead for ConcurrentHashMap */
+  public static final int CONCURRENT_HASHMAP;
+
+  /** Overhead for ConcurrentHashMap.Entry */
+  public static final int CONCURRENT_HASHMAP_ENTRY;
+
+  /** Overhead for ConcurrentHashMap.Segment */
+  public static final int CONCURRENT_HASHMAP_SEGMENT;
+
+  /** Overhead for ConcurrentSkipListMap */
+  public static final int CONCURRENT_SKIPLISTMAP;
+
+  /** Overhead for ConcurrentSkipListMap Entry */
+  public static final int CONCURRENT_SKIPLISTMAP_ENTRY;
+
+  /** Overhead for ReentrantReadWriteLock */
+  public static final int REENTRANT_LOCK;
+
+  /** Overhead for AtomicLong */
+  public static final int ATOMIC_LONG;
+
+  /** Overhead for AtomicInteger */
+  public static final int ATOMIC_INTEGER;
+
+  /** Overhead for AtomicBoolean */
+  public static final int ATOMIC_BOOLEAN;
+
+  /** Overhead for CopyOnWriteArraySet */
+  public static final int COPYONWRITE_ARRAYSET;
+
+  /** Overhead for CopyOnWriteArrayList */
+  public static final int COPYONWRITE_ARRAYLIST;
+
+  /** Overhead for timerange */
+  public static final int TIMERANGE;
+
+  /** Overhead for TimeRangeTracker */
+  public static final int TIMERANGE_TRACKER;
+
+  /** Overhead for KeyValueSkipListSet */
+  public static final int KEYVALUE_SKIPLIST_SET;
+
+  /* Are we running on jdk7? (Currently not read by any of the estimates below.) */
+  private static final boolean JDK7;
+  static {
+    // "java.version" looks like "1.7.0_51" up to JDK 8 but "9", "11.0.2" or "17-ea" from
+    // JDK 9 on (JEP 223); parse leniently so this class still loads on newer JVMs.
+    final String version = System.getProperty("java.version", "");
+    final String[] parts = version.split("[._\\-+]");
+    boolean jdk7 = false;
+    if (parts.length >= 2 && parts[0].equals("1")) {
+      jdk7 = parts[1].equals("7");
+    }
+    JDK7 = jdk7;
+  }
+
+  /**
+   * Static initializer that reads the JVM architecture and sets the overheads
+   * according to a 32-bit or 64-bit reference size.
+   */
+  static {
+    // Default to 8-byte references; is32BitJVM() is false when the arch model is unknown.
+    if (is32BitJVM()) {
+      REFERENCE = 4;
+    } else {
+      REFERENCE = 8;
+    }
+
+    OBJECT = 2 * REFERENCE;
+
+    ARRAY = align(3 * REFERENCE);
+
+    ARRAYLIST = align(OBJECT + align(REFERENCE) + align(ARRAY) +
+        (2 * Bytes.SIZEOF_INT));
+
+    //noinspection PointlessArithmeticExpression
+    BYTE_BUFFER = align(OBJECT + align(REFERENCE) + align(ARRAY) +
+        (5 * Bytes.SIZEOF_INT) +
+        (3 * Bytes.SIZEOF_BOOLEAN) + Bytes.SIZEOF_LONG);
+
+    SHORT = align(OBJECT + Bytes.SIZEOF_SHORT);
+
+    INTEGER = align(OBJECT + Bytes.SIZEOF_INT);
+
+    LONG = align(OBJECT + Bytes.SIZEOF_LONG);
+
+    FLOAT = align(OBJECT + Bytes.SIZEOF_FLOAT);
+
+    DOUBLE = align(OBJECT + Bytes.SIZEOF_DOUBLE);
+
+    MAP_ENTRY = align(OBJECT + 5 * REFERENCE + Bytes.SIZEOF_BOOLEAN);
+
+    TREEMAP = align(OBJECT + (2 * Bytes.SIZEOF_INT) + align(7 * REFERENCE));
+
+    // STRING is different size in jdk6 and jdk7. Just use what we estimate as size rather than
+    // have a conditional on whether jdk7.
+    STRING = (int) estimateBase(String.class, false);
+
+    // CONCURRENT_HASHMAP is different size in jdk6 and jdk7; it looks like it's different between
+    // 23.6-b03 and 23.0-b21. Just use what we estimate as size rather than have a conditional on
+    // whether jdk7.
+    CONCURRENT_HASHMAP = (int) estimateBase(ConcurrentHashMap.class, false);
+
+    CONCURRENT_HASHMAP_ENTRY = align(REFERENCE + OBJECT + (3 * REFERENCE) +
+        (2 * Bytes.SIZEOF_INT));
+
+    CONCURRENT_HASHMAP_SEGMENT = align(REFERENCE + OBJECT +
+        (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_FLOAT + ARRAY);
+
+    // The size changes from jdk7 to jdk8, estimate the size rather than use a conditional
+    CONCURRENT_SKIPLISTMAP = (int) estimateBase(ConcurrentSkipListMap.class, false);
+
+    CONCURRENT_SKIPLISTMAP_ENTRY = align(
+        align(OBJECT + (3 * REFERENCE)) + /* one node per entry */
+        align((OBJECT + (3 * REFERENCE))/2)); /* one index per two entries */
+
+    REENTRANT_LOCK = align(OBJECT + (3 * REFERENCE));
+
+    ATOMIC_LONG = align(OBJECT + Bytes.SIZEOF_LONG);
+
+    ATOMIC_INTEGER = align(OBJECT + Bytes.SIZEOF_INT);
+
+    ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN);
+
+    COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE);
+
+    COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY);
+
+    TIMERANGE = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2 + Bytes.SIZEOF_BOOLEAN);
+
+    TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
+
+    KEYVALUE_SKIPLIST_SET = align(OBJECT + REFERENCE);
+  }
+
+  /**
+   * The estimate of the size of a class instance depends on whether the JVM
+   * uses 32 or 64 bit addresses, that is it depends on the size of an object
+   * reference. It is a linear function of the size of a reference, e.g.
+   * 24 + 5*r where r is the size of a reference (usually 4 or 8 bytes).
+   *
+   * This method returns the coefficients of that linear function, split into
+   * primitive bytes, array fields and references (see the return description).
+   *
+   * @param cl A class whose instance size is to be estimated
+   * @param debug debug flag
+   * @return an array of 3 integers. The first integer is the size of the
+   * primitives, the second the number of arrays and the third the number of
+   * references.
+   */
+  @SuppressWarnings("unchecked")
+  private static int [] getSizeCoefficients(Class cl, boolean debug) {
+    int primitives = 0;
+    int arrays = 0;
+    //The number of references that a new object takes
+    int references = nrOfRefsPerObj;
+    int index = 0;
+
+    for ( ; null != cl; cl = cl.getSuperclass()) {
+      Field[] field = cl.getDeclaredFields();
+      if (null != field) {
+        for (Field aField : field) {
+          if (Modifier.isStatic(aField.getModifiers())) continue;
+          Class fieldClass = aField.getType();
+          if (fieldClass.isArray()) {
+            arrays++;
+            references++;
+          } else if (!fieldClass.isPrimitive()) {
+            references++;
+          } else {// Is simple primitive
+            String name = fieldClass.getName();
+
+            if (name.equals("int") || name.equals("I"))
+              primitives += Bytes.SIZEOF_INT;
+            else if (name.equals("long") || name.equals("J"))
+              primitives += Bytes.SIZEOF_LONG;
+            else if (name.equals("boolean") || name.equals("Z"))
+              primitives += Bytes.SIZEOF_BOOLEAN;
+            else if (name.equals("short") || name.equals("S"))
+              primitives += Bytes.SIZEOF_SHORT;
+            else if (name.equals("byte") || name.equals("B"))
+              primitives += Bytes.SIZEOF_BYTE;
+            else if (name.equals("char") || name.equals("C"))
+              primitives += Bytes.SIZEOF_CHAR;
+            else if (name.equals("float") || name.equals("F"))
+              primitives += Bytes.SIZEOF_FLOAT;
+            else if (name.equals("double") || name.equals("D"))
+              primitives += Bytes.SIZEOF_DOUBLE;
+          }
+          if (debug) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("" + index + " " + aField.getName() + " " + aField.getType());
+            }
+          }
+          index++;
+        }
+      }
+    }
+    return new int [] {primitives, arrays, references};
+  }
+
+  /**
+   * Estimate the static space taken up by a class instance given the
+   * coefficients returned by getSizeCoefficients.
+   *
+   * @param coeff the coefficients {primitive bytes, array fields, references}
+   *
+   * @param debug debug flag
+   * @return the size estimate, in bytes
+   */
+  private static long estimateBaseFromCoefficients(int [] coeff, boolean debug) {
+    long prealign_size = coeff[0] + align(coeff[1] * ARRAY) + coeff[2] * REFERENCE;
+
+    // Round up to a multiple of 8
+    long size = align(prealign_size);
+    if(debug) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Primitives=" + coeff[0] + ", arrays=" + coeff[1] +
+            ", references(includes " + nrOfRefsPerObj +
+            " for object overhead)=" + coeff[2] + ", refSize " + REFERENCE +
+            ", size=" + size + ", prealign_size=" + prealign_size);
+      }
+    }
+    return size;
+  }
+
+  /**
+   * Estimate the static space taken up by the fields of a class. This includes
+   * the space taken up by references (the pointer) but not by the referenced
+   * object. So the estimated size of an array field does not depend on the size
+   * of the array. Similarly the size of an object (reference) field does not
+   * depend on the object.
+   *
+   * @param cl class
+   * @param debug debug flag
+   * @return the size estimate in bytes.
+   */
+  @SuppressWarnings("unchecked")
+  public static long estimateBase(Class cl, boolean debug) {
+    return estimateBaseFromCoefficients( getSizeCoefficients(cl, debug), debug);
+  }
+
+  /**
+   * Aligns a number to 8.
+   * @param num number to align to 8
+   * @return smallest number >= input that is a multiple of 8
+   */
+  public static int align(int num) {
+    return (int)(align((long)num));
+  }
+
+  /**
+   * Aligns a number to 8.
+   * @param num number to align to 8
+   * @return smallest number >= input that is a multiple of 8
+   */
+  public static long align(long num) {
+    // Adding 7 and then clearing the low three bits rounds num up to the next
+    // multiple of 8, the alignment assumed throughout these estimates.
+    return  ((num + 7) >> 3) << 3;
+  }
+
+  /**
+   * Determines if we are running in a 32-bit JVM. Returns false (i.e. assume 64-bit)
+   * when the JVM does not expose the "sun.arch.data.model" system property.
+   */
+  public static boolean is32BitJVM() {
+    return "32".equals(System.getProperty("sun.arch.data.model"));
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
index cd9d188..cae7129 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/CommonTestingUtil.java
@@ -37,14 +37,20 @@ public class CommonTestingUtil {
   public static Path getTestDir(String dir) throws IOException {
     Path path = new Path(dir);
     FileSystem fs = FileSystem.getLocal(new Configuration());
-    if(fs.exists(path))
-      fs.delete(path, true);
-
+    cleanupTestDir(dir);
     fs.mkdirs(path);
 
     return fs.makeQualified(path);
   }
 
+  public static void cleanupTestDir(String dir) throws IOException {
+    final Path target = new Path(dir);
+    final FileSystem localFs = FileSystem.getLocal(new Configuration());
+    if (localFs.exists(target)) {
+      localFs.delete(target, true); // recursive: removes the directory and everything under it
+    }
+  }
+
   public static Path getTestDir() throws IOException {
     String randomStr = UUID.randomUUID().toString();
     Path path = new Path("target/test-data", randomStr);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 671cec3..b8c52c0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -845,17 +845,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
                                      PhysicalExec child) throws IOException {
-    String [] outerLineage = PlannerUtil.getRelationLineage(sortNode.getChild());
-    long estimatedSize = estimateSizeRecursive(context, outerLineage);
-    final long threshold = 1048576 * 2000;
-
-    // if the relation size is less than the reshold,
-    // the in-memory sort will be used.
-    if (estimatedSize <= threshold) {
-      return new MemSortExec(context, sortNode, child);
-    } else {
-      return new ExternalSortExec(context, sm, sortNode, child);
-    }
+    return new ExternalSortExec(context, sm, sortNode, child);
   }
 
   public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 0a8cb62..a3b37fc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -18,197 +18,566 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.*;
 
-public class ExternalSortExec extends SortExec {
-  private SortNode plan;
+import static org.apache.tajo.storage.RawFile.RawFileAppender;
+import static org.apache.tajo.storage.RawFile.RawFileScanner;
 
-  private final List<Tuple> tupleSlots;
-  private boolean sorted = false;
-  private RawFile.RawFileScanner result;
-  private RawFile.RawFileAppender appender;
-  private FileSystem localFS;
+/**
+ * This external sort algorithm can be characterized by the followings:
+ *
+ * <ul>
+ *   <li>in-memory sort if input data size fits a sort buffer</li>
+ *   <li>k-way merge sort if input data size exceeds the size of sort buffer</li>
+ *   <li>parallel merge</li>
+ *   <li>final merge avoidance</li>
+ *   <li>Unbalance merge if needed</li>
+ * </ul>
+ */
+public class ExternalSortExec extends SortExec {
+  /** Class logger */
+  private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
 
+  private final SortNode plan;
   private final TableMeta meta;
+  /** the defaultFanout of external sort */
+  private final int defaultFanout;
+  /** It's the size of in-memory table. If memory consumption exceeds it, store the memory table into a disk. */
+  private final int sortBufferBytesNum;
+  /** the number of available cores */
+  private final int allocatedCoreNum;
+  /** If there are available multiple cores, it tries parallel merge. */
+  private final ExecutorService executorService;
+  /** used for in-memory sort of each chunk. */
+  private final List<Tuple> inMemoryTable;
+  /** temporal dir */
   private final Path sortTmpDir;
-  private int MEM_TUPLE_NUM;
+  /** It enables round-robin disks allocation */
+  private final LocalDirAllocator localDirAllocator;
+  /** local file system */
+  private final RawLocalFileSystem localFS;
+  /** final output files */
+  private List<Path> finalOutputFiles = null;
+
+  ///////////////////////////////////////////////////
+  // transient variables
+  ///////////////////////////////////////////////////
+  /** already sorted or not */
+  private boolean sorted = false;
+  /** a flag to point whether sorted data resides in memory or not */
+  private boolean memoryResident = true;
+  /** the final result */
+  private Scanner result;
 
   public ExternalSortExec(final TaskAttemptContext context,
-      final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
+                          final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
       throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), child, plan.getSortKeys());
     this.plan = plan;
+    this.meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
 
-    this.MEM_TUPLE_NUM = context.getConf().getIntVar(ConfVars.EXECUTOR_SORT_EXTENAL_BUFFER_SIZE);
-    this.tupleSlots = new ArrayList<Tuple>(MEM_TUPLE_NUM);
-
-    this.sortTmpDir = new Path(context.getWorkDir(), UUID.randomUUID().toString());
-    this.localFS = FileSystem.getLocal(context.getConf());
-    meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
+    this.defaultFanout = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT);
+    if (defaultFanout < 2) {
+      throw new PhysicalPlanningException(ConfVars.EXECUTOR_EXTERNAL_SORT_FANOUT.varname + " cannot be lower than 2");
+    }
+    // TODO - sort buffer and core num should be changed to use the allocated container resource.
+    this.sortBufferBytesNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE) * 1048576;
+    this.allocatedCoreNum = context.getConf().getIntVar(ConfVars.EXECUTOR_EXTERNAL_SORT_THREAD_NUM);
+    this.executorService = Executors.newFixedThreadPool(this.allocatedCoreNum);
+    this.inMemoryTable = new ArrayList<Tuple>(100000);
+
+    this.sortTmpDir = getExecutorTmpDir();
+    localDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+    localFS = new RawLocalFileSystem();
   }
 
   public void init() throws IOException {
     super.init();
-    localFS.mkdirs(sortTmpDir);
   }
 
   public SortNode getPlan() {
     return this.plan;
   }
 
-  private void sortAndStoreChunk(int chunkId, List<Tuple> tupleSlots)
+  /**
+   * Sort tuple block and store them into a chunk file
+   */
+  private final Path sortAndStoreChunk(int chunkId, List<Tuple> tupleBlock)
       throws IOException {
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW);
-    Collections.sort(tupleSlots, getComparator());
-    // TODO - RawFile requires the local file path.
-    // So, I add the scheme 'file:/' to path. But, it should be improved.
-    Path localPath = new Path(sortTmpDir + "/0_" + chunkId);
+    int rowNum = tupleBlock.size();
 
-    appender = new RawFile.RawFileAppender(context.getConf(), inSchema, meta, localPath);
-    appender.init();
+    long sortStart = System.currentTimeMillis();
+    Collections.sort(tupleBlock, getComparator());
+    long sortEnd = System.currentTimeMillis();
 
-    for (Tuple t : tupleSlots) {
+    long chunkWriteStart = System.currentTimeMillis();
+    Path outputPath = getChunkPathForWrite(0, chunkId);
+    final RawFileAppender appender = new RawFileAppender(context.getConf(), inSchema, meta, outputPath);
+    appender.init();
+    for (Tuple t : tupleBlock) {
       appender.addTuple(t);
     }
     appender.close();
-    tupleSlots.clear();
+    tupleBlock.clear();
+    long chunkWriteEnd = System.currentTimeMillis();
+
+
+    info(LOG, "Chunk #" + chunkId + " sort and written (" +
+        FileUtil.humanReadableByteCount(appender.getOffset(), false) + " bytes, " + rowNum + " rows, " +
+        ", sort time: " + (sortEnd - sortStart) + " msec, " +
+        "write time: " + (chunkWriteEnd - chunkWriteStart) + " msec)");
+    return outputPath;
   }
 
   /**
    * It divides all tuples into a number of chunks, then sort for each chunk.
-   * @return the number of stored chunks
+   *
+   * @return All paths of chunks
    * @throws java.io.IOException
    */
-  private int sortAndStoreAllChunks() throws IOException {
-    int chunkId = 0;
-
+  private final List<Path> sortAndStoreAllChunks() throws IOException {
     Tuple tuple;
+    int memoryConsumption = 0;
+    List<Path> chunkPaths = TUtil.newList();
+
+    int chunkId = 0;
+    long runStartTime = System.currentTimeMillis();
     while ((tuple = child.next()) != null) { // partition sort start
-      tupleSlots.add(new VTuple(tuple));
-      if (tupleSlots.size() == MEM_TUPLE_NUM) {
-        sortAndStoreChunk(chunkId, tupleSlots);
+      Tuple vtuple = new VTuple(tuple);
+      inMemoryTable.add(vtuple);
+      memoryConsumption += MemoryUtil.calculateMemorySize(vtuple);
+
+      if (memoryConsumption > sortBufferBytesNum) {
+        long runEndTime = System.currentTimeMillis();
+        info(LOG, chunkId + " run loading time: " + (runEndTime - runStartTime) + " msec");
+        runStartTime = runEndTime;
+
+        info(LOG, "Memory consumption exceeds " + sortBufferBytesNum + " bytes");
+        memoryResident = false;
+
+        chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
+
+        memoryConsumption = 0;
         chunkId++;
       }
     }
 
-    if (tupleSlots.size() > 0) {
-      sortAndStoreChunk(chunkId, tupleSlots);
-      chunkId++;
+    if (inMemoryTable.size() > 0) { // if there are at least one or more input tuples
+      if (!memoryResident) { // check if data exceeds a sort buffer. If so, it store the remain data into a chunk.
+        if (inMemoryTable.size() > 0) {
+          long start = System.currentTimeMillis();
+          int rowNum = inMemoryTable.size();
+          chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
+          long end = System.currentTimeMillis();
+          info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)");
+        }
+      } else { // this case means that all data does not exceed a sort buffer
+        Collections.sort(inMemoryTable, getComparator());
+      }
     }
 
-    return chunkId;
+    return chunkPaths;
   }
 
-  private Path getChunkPath(int level, int chunkId) {
-    return StorageUtil.concatPath(sortTmpDir, "" + level + "_" + chunkId);
+  /**
+   * Get a local path from all temporal paths in round-robin manner.
+   */
+  private synchronized Path getChunkPathForWrite(int level, int chunkId) throws IOException {
+    return localDirAllocator.getLocalPathForWrite(sortTmpDir + "/" + level +"_" + chunkId, context.getConf());
   }
 
   @Override
   public Tuple next() throws IOException {
-    if (!sorted) {
 
-      // the total number of chunks for zero level
-      int totalChunkNumForLevel = sortAndStoreAllChunks();
+    if (!sorted) { // if not sorted, first sort all data
 
-      // if there are no chunk
-      if (totalChunkNumForLevel == 0) {
-        return null;
+      // Try to sort all data, and store them into a number of chunks if memory exceeds
+      long startTimeOfChunkSplit = System.currentTimeMillis();
+      List<Path> chunks = sortAndStoreAllChunks();
+      long endTimeOfChunkSplit = System.currentTimeMillis();
+      info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
+
+      if (memoryResident) { // if all sorted data reside in a main-memory table.
+        this.result = new MemTableScanner();
+      } else { // if input data exceeds main-memory at least once
+
+        try {
+          this.result = externalMergeAndSort(chunks);
+        } catch (Exception exception) {
+          throw new PhysicalPlanningException(exception);
+        }
+
+      }
+
+      sorted = true;
+      result.init();
+    }
+
+    return result.next();
+  }
+
+  /**
+   * Computes how many runs the next k-way merge task should consume.
+   *
+   * Normally this is min(remainInputChunks, defaultFanout). If merging fewer runs now would leave
+   * few enough runs (the untouched inputs, the outputs already scheduled and this task's output)
+   * to be read by one final merge, the fanout is reduced to the smallest such value. The remaining
+   * inputs can then be carried over to the next level without being merged again.
+   *
+   * @param remainInputChunks number of input runs not yet assigned to a merge task
+   * @param inputNum total number of input runs at the current level
+   * @param outputNum number of output runs already scheduled at the current level
+   * @param startIdx index of the first input run this merge task will read
+   * @return the fanout to use for the merge task starting at startIdx
+   */
+  private int calculateFanout(int remainInputChunks, int inputNum, int outputNum, int startIdx) {
+    int computedFanout = Math.min(remainInputChunks, defaultFanout);
+
+    // Why detect an opportunity for an unbalanced merge?
+    //
+    // Assume the fanout is 8 and there are 10 chunks. If we first merge only 3 chunks into one,
+    // just 8 chunks remain (7 untouched + 1 merged), so a single final merge can read all of them
+    // and the other 7 chunks never have to be merged at this level.
+    if (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + computedFanout), outputNum + 1)) {
+      int candidateFanout = computedFanout;
+      // Shrink while the single-final-merge condition still holds. The loop stops at the first
+      // value that is too small, so candidateFanout + 1 is the smallest fanout that still works.
+      while (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + candidateFanout), outputNum + 1)) {
+        candidateFanout--;
+      }
+      int beforeFanout = computedFanout;
+      if (computedFanout > candidateFanout + 1) {
+        computedFanout = candidateFanout + 1;
+        info(LOG, "Fanout reduced for unbalanced merge: " + beforeFanout + " -> " + computedFanout);
       }
+    }
 
-      int level = 0;
-      int chunkId = 0;
+    return computedFanout;
+  }
 
-      // continue until the chunk remains only one
-      while (totalChunkNumForLevel > 1) {
+  /**
+   * Merges the sorted chunk files produced by the in-memory sort phase and returns a scanner over
+   * the fully sorted data.
+   *
+   * While more than defaultFanout runs remain, one merge level is executed:
+   * - Consecutive groups of runs are merged in parallel by KWayMergerCaller tasks on
+   *   executorService, and the consumed input runs are then deleted.
+   * - When the remaining inputs plus the outputs scheduled so far fit into one final merge (see
+   *   calculateFanout), the remaining inputs are carried over to the next level unmerged.
+   *
+   * The last (at most defaultFanout) runs are not rewritten. They are merged on the fly by the
+   * returned scanner and recorded in finalOutputFiles so that close() can delete them.
+   *
+   * @param chunks sorted chunk files (level-0 runs); expected to be non-empty
+   * @return a scanner returning all tuples in sort order (also stored in {@code result})
+   */
+  private Scanner externalMergeAndSort(List<Path> chunks) throws IOException, ExecutionException, InterruptedException {
+    int level = 0;
+    final List<Path> inputFiles = TUtil.newList(chunks);
+    final List<Path> outputFiles = TUtil.newList();
+    int remainRun = inputFiles.size();
 
-        while (chunkId < totalChunkNumForLevel) {
+    long mergeStart = System.currentTimeMillis();
 
-          Path nextChunk = getChunkPath(level + 1, chunkId / 2);
+    // keep merging while more runs remain than a single final merge can read (defaultFanout)
+    while (remainRun > defaultFanout) {
 
-          // if number of chunkId is odd just copy it.
-          if (chunkId + 1 >= totalChunkNumForLevel) {
+      // per-level counters
+      int remainInputRuns = inputFiles.size();
+      int outChunkId = 0;
+      int outputFileNum = 0;
+      List<Future<Path>> futures = TUtil.newList();
+      // Inputs carried over unmerged to the next level. They are detached from inputFiles only
+      // after all merge tasks have finished: the tasks read inputFiles by index on executor
+      // threads, and inputFiles is not a concurrent collection.
+      List<Path> switched = TUtil.newList();
 
-            Path chunk = getChunkPath(level, chunkId);
-            localFS.moveFromLocalFile(chunk, nextChunk);
+      for (int startIdx = 0; startIdx < inputFiles.size();) {
 
-          } else {
+        // calculate proper fanout
+        int fanout = calculateFanout(remainInputRuns, inputFiles.size(), outputFileNum, startIdx);
+        // launch a merger runner
+        futures.add(executorService.submit(new KWayMergerCaller(level, outChunkId++, inputFiles, startIdx, fanout)));
+        outputFileNum++;
 
-            Path leftChunk = getChunkPath(level, chunkId);
-            Path rightChunk = getChunkPath(level, chunkId + 1);
+        startIdx += fanout;
+        remainInputRuns = inputFiles.size() - startIdx;
 
-            appender = new RawFile.RawFileAppender(context.getConf(), inSchema, meta, nextChunk);
-            appender.init();
-            merge(appender, leftChunk, rightChunk);
+        // If unbalanced merge is available, it finishes the merge phase earlier.
+        if (checkIfCanBeUnbalancedMerged(remainInputRuns, outputFileNum)) {
+          info(LOG, "Unbalanced merge possibility detected: number of remain input (" + remainInputRuns
+              + ") and output files (" + outputFileNum + ") <= " + defaultFanout);
 
-            appender.flush();
-            appender.close();
+          // switch the remain inputs to the next outputs
+          for (int j = startIdx; j < inputFiles.size(); j++) {
+            switched.add(inputFiles.get(j));
           }
+          outputFiles.addAll(switched);
 
-          chunkId += 2;
+          break;
         }
+      }
 
-        level++;
-        // init chunkId for each level
-        chunkId = 0;
-        // calculate the total number of chunks for next level
-        totalChunkNumForLevel = totalChunkNumForLevel / 2
-            + totalChunkNumForLevel % 2;
+      // wait for all merge runners
+      for (Future<Path> future : futures) {
+        outputFiles.add(future.get());
       }
 
-      Path result = getChunkPath(level, 0);
-      this.result = new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, result);
-      sorted = true;
+      // All merge tasks are done, so the carried-over inputs can be detached safely. They must not
+      // be deleted below because they now belong to outputFiles.
+      inputFiles.removeAll(switched);
+
+      // delete merged intermediate files
+      for (Path path : inputFiles) {
+        localFS.delete(path, true);
+      }
+      info(LOG, inputFiles.size() + " merged intermediate files deleted");
+
+      // switch input files to output files, and then clear outputFiles
+      inputFiles.clear();
+      inputFiles.addAll(outputFiles);
+      remainRun = inputFiles.size();
+      outputFiles.clear();
+      level++;
     }
 
-    return result.next();
+    long mergeEnd = System.currentTimeMillis();
+    info(LOG, "Total merge time: " + (mergeEnd - mergeStart) + " msec");
+
+    // final result
+    finalOutputFiles = inputFiles;
+    result = createFinalMerger(inputFiles);
+    return result;
+  }
+
+  /**
+   * Merge task submitted to {@code executorService}. It k-way merges {@code mergeFanout}
+   * consecutive sorted runs of {@code inputFiles}, starting at {@code startIdx}, into a single run
+   * file allocated for merge level {@code level + 1}.
+   */
+  private class KWayMergerCaller implements Callable<Path> {
+    final int level;
+    final int nextRunId;
+    final List<Path> inputFiles;
+    final int startIdx;
+    final int mergeFanout;
+
+    /**
+     * @param level current merge level; the output run is allocated at level + 1
+     * @param nextRunId id of the output run within the next level
+     * @param inputFiles run files of the current level; shared with the caller and read by index
+     * @param startIdx index in inputFiles of the first run to merge
+     * @param mergeFanout number of consecutive runs to merge
+     */
+    public KWayMergerCaller(final int level, final int nextRunId, final List<Path> inputFiles,
+                            final int startIdx, final int mergeFanout) {
+      this.level = level;
+      this.nextRunId = nextRunId;
+      this.inputFiles = inputFiles;
+      this.startIdx = startIdx;
+      this.mergeFanout = mergeFanout;
+    }
+
+    /**
+     * Writes the merged run and returns its path.
+     *
+     * The output appender is closed even if the merge fails, and the merge scanner is closed once
+     * it has been initialized, so a failing task does not leave files open.
+     */
+    @Override
+    public Path call() throws Exception {
+      final Path outputPath = getChunkPathForWrite(level + 1, nextRunId);
+      info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName());
+      long mergeStartTime = System.currentTimeMillis();
+      final RawFileAppender output = new RawFileAppender(context.getConf(), inSchema, meta, outputPath);
+      output.init();
+      try {
+        final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout);
+        merger.init();
+        try {
+          Tuple mergeTuple;
+          while ((mergeTuple = merger.next()) != null) {
+            output.addTuple(mergeTuple);
+          }
+        } finally {
+          merger.close();
+        }
+      } finally {
+        output.close();
+      }
+      long mergeEndTime = System.currentTimeMillis();
+      info(LOG, outputPath.getName() + " is written to a disk. ("
+          + FileUtil.humanReadableByteCount(output.getOffset(), false)
+          + " bytes, " + (mergeEndTime - mergeStartTime) + " msec)");
+      return outputPath;
+    }
+  }
+
+  /**
+   * It checks if unbalanced merge is possible.
+   */
+  private boolean checkIfCanBeUnbalancedMerged(int remainInputNum, int outputNum) {
+    return (remainInputNum + outputNum) <= defaultFanout;
+  }
+
+  /**
+   * Create a merged file scanner or k-way merge scanner.
+   */
+  private Scanner createFinalMerger(List<Path> inputs) throws IOException {
+    if (inputs.size() == 1) {
+      this.result = getFileScanner(inputs.get(0));
+    } else {
+      this.result = createKWayMerger(inputs, 0, inputs.size());
+    }
+    return result;
+  }
+
+  private Scanner getFileScanner(Path path) throws IOException {
+    return new RawFileScanner(context.getConf(), plan.getInSchema(), meta, path);
   }
 
-  private void merge(RawFile.RawFileAppender appender, Path left, Path right)
+  /**
+   * Creates a scanner that merges the {@code num} sorted runs
+   * inputs[startChunkId .. startChunkId + num). One file scanner is created per run, and the
+   * scanners are combined by createKWayMergerInternal into a tree of pair-wise mergers.
+   */
+  private Scanner createKWayMerger(List<Path> inputs, final int startChunkId, final int num) throws IOException {
+    final Scanner[] runScanners = new Scanner[num];
+    int slot = 0;
+    while (slot < num) {
+      runScanners[slot] = getFileScanner(inputs.get(startChunkId + slot));
+      slot++;
+    }
+    return createKWayMergerInternal(runScanners, 0, num);
+  }
+
+  /**
+   * Recursively combines sources[startIdx .. startIdx + num) into one sorted scanner.
+   *
+   * The range is split into a left part of ceil(num / 2) scanners and a right part with the rest.
+   * Each part is merged recursively, and the two results are joined by a PairWiseMerger. A range
+   * of at most one scanner returns sources[startIdx] directly.
+   */
+  private Scanner createKWayMergerInternal(final Scanner [] sources, final int startIdx, final int num)
       throws IOException {
-    RawFile.RawFileScanner leftScan = new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, left);
+    if (num <= 1) {
+      // a single run needs no merging
+      return sources[startIdx];
+    }
+
+    final int leftCount = (int) Math.ceil((float) num / 2);
+    final Scanner leftHalf = createKWayMergerInternal(sources, startIdx, leftCount);
+    final Scanner rightHalf = createKWayMergerInternal(sources, startIdx + leftCount, num - leftCount);
+    return new PairWiseMerger(leftHalf, rightHalf);
+  }
 
-    RawFile.RawFileScanner rightScan =
-        new RawFile.RawFileScanner(context.getConf(), plan.getInSchema(), meta, right);
+  /**
+   * Scanner over the in-memory sort buffer. It is used when all input fit into the sort buffer and
+   * {@code inMemoryTable} has already been sorted in place.
+   */
+  private class MemTableScanner implements Scanner {
+    /** Position within inMemoryTable; created by init() and dropped by close(). */
+    Iterator<Tuple> iterator;
 
-    Tuple leftTuple = leftScan.next();
-    Tuple rightTuple = rightScan.next();
+    /** Starts (or restarts) iteration from the first tuple of inMemoryTable. */
+    @Override
+    public void init() throws IOException {
+      iterator = inMemoryTable.iterator();
+    }
 
-    Comparator<Tuple> comparator = getComparator();
-    while (leftTuple != null && rightTuple != null) {
-      if (comparator.compare(leftTuple, rightTuple) < 0) {
-        appender.addTuple(leftTuple);
-        leftTuple = leftScan.next();
+    /** Returns the next buffered tuple, or null once the buffer is exhausted. */
+    @Override
+    public Tuple next() throws IOException {
+      if (iterator.hasNext()) {
+        return iterator.next();
       } else {
-        appender.addTuple(rightTuple);
-        rightTuple = rightScan.next();
+        return null;
       }
     }
 
-    if (leftTuple == null) {
-      appender.addTuple(rightTuple);
-      while ((rightTuple = rightScan.next()) != null) {
-        appender.addTuple(rightTuple);
+    /** Rewinds to the first tuple of the buffer. */
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      iterator = null;
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public void setTarget(Column[] targets) {
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public void setSearchCondition(Object expr) {
+    }
+
+    @Override
+    public boolean isSplittable() {
+      return false;
+    }
+
+    /**
+     * Returns the schema of the buffered tuples. They are this operator's input tuples, so this is
+     * {@code inSchema}, the same schema PairWiseMerger reports for merged runs.
+     */
+    @Override
+    public Schema getSchema() {
+      return inSchema;
+    }
+  }
+
+  /**
+   * Scanner that merges two sorted input scanners into one sorted stream, ordered by
+   * getComparator(). When the two head tuples compare equal, the right input's tuple is emitted
+   * first.
+   *
+   * NOTE(review): next() hands out the tuple object obtained from a child scanner and then advances
+   * that child. This is only safe if child scanners return a distinct Tuple object per next() call;
+   * confirm for RawFileScanner.
+   */
+  private class PairWiseMerger implements Scanner {
+    private final Scanner leftScan;
+    private final Scanner rightScan;
+
+    /** Current head tuple of each input; null once that input is exhausted. */
+    private Tuple leftTuple;
+    private Tuple rightTuple;
+
+    private final Comparator<Tuple> comparator = getComparator();
+
+    public PairWiseMerger(Scanner leftScanner, Scanner rightScanner) throws IOException {
+      this.leftScan = leftScanner;
+      this.rightScan = rightScanner;
+    }
+
+    /** Initializes both inputs and reads their first tuples. */
+    @Override
+    public void init() throws IOException {
+      leftScan.init();
+      rightScan.init();
+
+      leftTuple = leftScan.next();
+      rightTuple = rightScan.next();
+    }
+
+    /**
+     * Returns the smaller of the two head tuples and advances the input it came from, or null when
+     * both inputs are exhausted.
+     */
+    @Override
+    public Tuple next() throws IOException {
+      if (leftTuple == null && rightTuple == null) {
+        // Both inputs are exhausted; do not keep calling next() on finished child scanners.
+        return null;
+      }
+
+      Tuple outTuple;
+      if (leftTuple != null && rightTuple != null) {
+        if (comparator.compare(leftTuple, rightTuple) < 0) {
+          outTuple = leftTuple;
+          leftTuple = leftScan.next();
+        } else {
+          outTuple = rightTuple;
+          rightTuple = rightScan.next();
+        }
+        return outTuple;
       }
-    } else {
-      appender.addTuple(leftTuple);
-      while ((leftTuple = leftScan.next()) != null) {
-        appender.addTuple(leftTuple);
+
+      // Exactly one input still has tuples: drain it.
+      if (leftTuple == null) {
+        outTuple = rightTuple;
+        rightTuple = rightScan.next();
+      } else {
+        outTuple = leftTuple;
+        leftTuple = leftScan.next();
+      }
+      return outTuple;
+    }
+
+    /** Rewinds both inputs and re-reads their first tuples. */
+    @Override
+    public void reset() throws IOException {
+      leftScan.reset();
+      rightScan.reset();
+      init();
+    }
+
+    /** Closes both inputs. */
+    @Override
+    public void close() throws IOException {
+      leftScan.close();
+      rightScan.close();
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return false;
+    }
+
+    @Override
+    public void setTarget(Column[] targets) {
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public void setSearchCondition(Object expr) {
+    }
+
+    @Override
+    public boolean isSplittable() {
+      return false;
+    }
+
+    @Override
+    public Schema getSchema() {
+      return inSchema;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (result != null) {
+      result.close();
+    }
+
+    if (finalOutputFiles != null) {
+      for (Path path : finalOutputFiles) {
+        localFS.delete(path, true);
       }
     }
 
-    leftScan.close();
-    rightScan.close();
+    inMemoryTable.clear();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index 4e6cd64..033dcd9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -18,12 +18,15 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaObject;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.util.UUID;
 
 public abstract class PhysicalExec implements SchemaObject {
   protected final TaskAttemptContext context;
@@ -50,4 +53,20 @@ public abstract class PhysicalExec implements SchemaObject {
   public abstract void rescan() throws IOException;
 
   public abstract void close() throws IOException;
+
+  protected void info(Log log, String message) {
+    log.info("["+ context.getTaskId() + "] " + message);
+  }
+
+  protected void warn(Log log, String message) {
+    log.warn("[" + context.getTaskId() + "] " + message);
+  }
+
+  protected void fatal(Log log, String message) {
+    log.fatal("[" + context.getTaskId() + "] " + message);
+  }
+
+  protected Path getExecutorTmpDir() {
+    return new Path(UUID.randomUUID().toString());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
index 0d7554d..62add1e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
@@ -25,7 +25,7 @@ public class PhysicalPlanningException extends IOException {
     super(message);
   }
 
-  public PhysicalPlanningException(IOException ioe) {
+  /**
+   * Creates an exception that wraps {@code ioe} as its cause. Because this class extends
+   * IOException, any failure can be rethrown through IOException-only signatures; for example,
+   * ExternalSortExec.next() wraps external-merge failures this way.
+   *
+   * @param ioe the underlying cause; despite the name, any {@link Exception} is accepted
+   */
+  public PhysicalPlanningException(Exception ioe) {
     super(ioe);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index 6ed741b..a8dd877 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -19,8 +19,8 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 76bc9e8..fbfc76f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -115,7 +115,7 @@ public class ExprTestBase {
       vtuple = new VTuple(inputSchema.getColumnNum());
       for (int i = 0; i < inputSchema.getColumnNum(); i++) {
         // If null value occurs, null datum is manually inserted to an input tuple.
-        if (lazyTuple.get(i) instanceof TextDatum && lazyTuple.getText(i).asChars().equals("")) {
+        if (lazyTuple.get(i) instanceof TextDatum && lazyTuple.get(i).asChars().equals("")) {
           vtuple.put(i, NullDatum.get());
         } else {
           vtuple.put(i, lazyTuple.get(i));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 5f0457e..235fbcf 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -200,10 +200,10 @@ public class TestBNLJoinExec {
     exec.init();
     while ((tuple = exec.next()) != null) {
       count++;
-      assertTrue(i == tuple.getInt(0).asInt4());
-      assertTrue(i == tuple.getInt(1).asInt4());
-      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
-      assertTrue(10 + i == tuple.getInt(3).asInt4());
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
       i += 2;
     }
     exec.close();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index e236126..d7a3eb1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -56,11 +56,9 @@ public class TestExternalSortExec {
   private AbstractStorageManager sm;
   private Path testDir;
 
-
-  private final int numTuple = 1000000;
+  private final int numTuple = 100000;
   private Random rnd = new Random(System.currentTimeMillis());
 
-
   private TableDesc employee;
 
   @Before
@@ -69,12 +67,14 @@ public class TestExternalSortExec {
     util = new TajoTestingCluster();
     catalog = util.startCatalogCluster().getCatalog();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
     sm = StorageManagerFactory.getStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerId", Type.INT4);
     schema.addColumn("empId", Type.INT4);
     schema.addColumn("deptName", Type.TEXT);
+    schema.addColumn("text_field", Type.TEXT);
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
@@ -83,15 +83,19 @@ public class TestExternalSortExec {
     appender.init();
     Tuple tuple = new VTuple(schema.getColumnNum());
     for (int i = 0; i < numTuple; i++) {
-      tuple.put(new Datum[] { DatumFactory.createInt4(rnd.nextInt(50)),
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(rnd.nextInt(50)),
           DatumFactory.createInt4(rnd.nextInt(100)),
-          DatumFactory.createText("dept_" + 123) });
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createText("f_" + i)
+      });
       appender.addTuple(tuple);
     }
     appender.flush();
     appender.close();
 
-    System.out.println("Total Rows: " + appender.getStats().getNumRows());
+    System.out.println(appender.getStats().getNumRows() + " rows (" + (appender.getStats().getNumBytes() / 1048576) +
+        " MB)");
 
     employee = new TableDesc("employee", schema, employeeMeta, employeePath);
     catalog.addTable(employee);
@@ -101,16 +105,17 @@ public class TestExternalSortExec {
 
   @After
   public void tearDown() throws Exception {
+    // Remove the test directory before stopping the catalog cluster. setUp points
+    // WORKER_TEMPORAL_DIR at this directory, so presumably the sort's temporary run files live here too.
+    // NOTE(review): if cleanup throws, shutdownCatalogCluster() is skipped; consider try/finally.
+    CommonTestingUtil.cleanupTestDir(TEST_PATH);
     util.shutdownCatalogCluster();
   }
 
   String[] QUERIES = {
-      "select managerId, empId, deptName from employee order by managerId, empId desc"
+      "select managerId, empId from employee order by managerId, empId"
   };
 
   @Test
   public final void testNext() throws IOException, PlanningException {
-    FileFragment[] frags = sm.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
+    FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
     TaskAttemptContext ctx = new TaskAttemptContext(conf,
@@ -120,15 +125,15 @@ public class TestExternalSortExec {
     LogicalPlan plan = planner.createPlan(expr);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     
     ProjectionExec proj = (ProjectionExec) exec;
 
     // TODO - should be planed with user's optimization hint
     if (!(proj.getChild() instanceof ExternalSortExec)) {
-      UnaryPhysicalExec sortExec = (UnaryPhysicalExec) proj.getChild();
-      SeqScanExec scan = (SeqScanExec)sortExec.getChild();
+      UnaryPhysicalExec sortExec = proj.getChild();
+      SeqScanExec scan = sortExec.getChild();
 
       ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
           ((MemSortExec)sortExec).getPlan(), scan);
@@ -136,22 +141,26 @@ public class TestExternalSortExec {
     }
 
     Tuple tuple;
-    Datum preVal = null;
-    Datum curVal;
+    Tuple preVal = null;
+    Tuple curVal;
     int cnt = 0;
     exec.init();
     long start = System.currentTimeMillis();
+    TupleComparator comparator = new TupleComparator(proj.getSchema(),
+        new SortSpec[]{
+            new SortSpec(new Column("managerId", Type.INT4)),
+            new SortSpec(new Column("empId", Type.INT4))
+        });
 
     while ((tuple = exec.next()) != null) {
-      curVal = tuple.get(0);
+      curVal = tuple;
       if (preVal != null) {
-        assertTrue(preVal.lessThanEqual(curVal).asBool());
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
       }
       preVal = curVal;
       cnt++;
     }
     long end = System.currentTimeMillis();
-    exec.close();
     assertEquals(numTuple, cnt);
 
     // for rescan test
@@ -159,9 +168,9 @@ public class TestExternalSortExec {
     exec.rescan();
     cnt = 0;
     while ((tuple = exec.next()) != null) {
-      curVal = tuple.get(0);
+      curVal = tuple;
       if (preVal != null) {
-        assertTrue(preVal.lessThanEqual(curVal).asBool());
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
       }
       preVal = curVal;
       cnt++;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 65ed3c4..f4350fb 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -192,10 +192,10 @@ public class TestHashAntiJoinExec {
     exec.init();
     while ((tuple = exec.next()) != null) {
       count++;
-      assertTrue(i == tuple.getInt(0).asInt4());
-      assertTrue(i == tuple.getInt(1).asInt4()); // expected empid [0, 2, 4, 6, 8]
-      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
-      assertTrue(10 + i == tuple.getInt(3).asInt4());
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4()); // expected empid [0, 2, 4, 6, 8]
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
 
       i += 2;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index 2a1af7c..48cd265 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -162,10 +162,10 @@ public class TestHashJoinExec {
     exec.init();
     while ((tuple = exec.next()) != null) {
       count++;
-      assertTrue(i == tuple.getInt(0).asInt4());
-      assertTrue(i == tuple.getInt(1).asInt4());
-      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
-      assertTrue(10 + i == tuple.getInt(3).asInt4());
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
 
       i += 2;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 0b23bb5..8eec324 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -197,10 +197,10 @@ public class TestHashSemiJoinExec {
     // expect result without duplicated tuples.
     while ((tuple = exec.next()) != null) {
       count++;
-      assertTrue(i == tuple.getInt(0).asInt4());
-      assertTrue(i == tuple.getInt(1).asInt4());
-      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
-      assertTrue(10 + i == tuple.getInt(3).asInt4());
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
 
       i += 2;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index e6dd0a5..af72541 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -176,10 +176,10 @@ public class TestMergeJoinExec {
     exec.init();
     while ((tuple = exec.next()) != null) {
       count++;
-      assertTrue(i == tuple.getInt(0).asInt4());
-      assertTrue(i == tuple.getInt(1).asInt4());
-      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
-      assertTrue((10 + i) == tuple.getInt(3).asInt4());
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue((10 + i) == tuple.get(3).asInt4());
 
       i += 2;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 50d431c..004cb57 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -191,10 +191,10 @@ public class TestNLJoinExec {
     exec.init();
     while ((tuple = exec.next()) != null) {
       count++;
-      assertTrue(i == tuple.getInt(0).asInt4());
-      assertTrue(i == tuple.getInt(1).asInt4());
-      assertTrue(("dept_" + i).equals(tuple.getString(2).asChars()));
-      assertTrue(10 + i == tuple.getInt(3).asInt4());
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
       i += 2;
     }
     exec.close();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 7cdf6ec..c97ed00 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -341,9 +341,9 @@ public class TestPhysicalPlanner {
     exec.rescan();
     i = 0;
     while ((tuple = exec.next()) != null) {
-      assertEquals(6, tuple.getInt(2).asInt4()); // sum
-      assertEquals(3, tuple.getInt(3).asInt4()); // max
-      assertEquals(1, tuple.getInt(4).asInt4()); // min
+      assertEquals(6, tuple.get(2).asInt4()); // sum
+      assertEquals(3, tuple.get(3).asInt4()); // max
+      assertEquals(1, tuple.get(4).asInt4()); // min
       i++;
     }
     exec.close();
@@ -806,7 +806,7 @@ public class TestPhysicalPlanner {
         "name_1", "name_2", "name_3", "name_4", "name_5");
     exec.init();
     while ((tuple = exec.next()) != null) {
-      assertTrue(expected.contains(tuple.getString(0).asChars()));
+      assertTrue(expected.contains(tuple.get(0).asChars()));
       cnt++;
     }
     exec.close();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 018042c..78e846d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -35,10 +35,7 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.RangeShuffleFileWriteExec;
-import org.apache.tajo.engine.planner.physical.MemSortExec;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.planner.physical.ProjectionExec;
+import org.apache.tajo.engine.planner.physical.*;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
@@ -142,12 +139,12 @@ public class TestRangeRetrieverHandler {
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
-    MemSortExec sort = null;
+    ExternalSortExec sort = null;
     if (exec instanceof ProjectionExec) {
       ProjectionExec projExec = (ProjectionExec) exec;
       sort = projExec.getChild();
-    } else if (exec instanceof MemSortExec) {
-      sort = (MemSortExec) exec;
+    } else if (exec instanceof ExternalSortExec) {
+      sort = (ExternalSortExec) exec;
     } else {
       assertTrue(false);
     }
@@ -263,12 +260,12 @@ public class TestRangeRetrieverHandler {
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
-    MemSortExec sort = null;
+    ExternalSortExec sort = null;
     if (exec instanceof ProjectionExec) {
       ProjectionExec projExec = (ProjectionExec) exec;
       sort = projExec.getChild();
-    } else if (exec instanceof MemSortExec) {
-      sort = (MemSortExec) exec;
+    } else if (exec instanceof ExternalSortExec) {
+      sort = (ExternalSortExec) exec;
     } else {
       assertTrue(false);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java
index 3b6a550..f98e6f1 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -18,63 +18,51 @@
 
 package org.apache.tajo.storage;
 
-import org.apache.tajo.datum.*;
-
-import java.net.InetAddress;
+import org.apache.tajo.datum.Datum;
 
 public interface Tuple extends Cloneable {
-
-  public int size();
-
-  public boolean contains(int fieldid);
+  
+	public int size();
+	
+	public boolean contains(int fieldid);
 
   public boolean isNull(int fieldid);
+	
+	public void clear();
+	
+	public void put(int fieldId, Datum value);
 
-  public void clear();
-
-  public void put(int fieldId, Datum value);
-
-  public void put(int fieldId, Datum [] values);
+  public void put(int fieldId, Datum[] values);
 
   public void put(int fieldId, Tuple tuple);
-
-  public void put(Datum [] values);
-
-  public Datum get(int fieldId);
-
-  public void setOffset(long offset);
-
-  public long getOffset();
-
-  public BooleanDatum getBoolean(int fieldId);
-
-  public BitDatum getByte(int fieldId);
-
-  public CharDatum getChar(int fieldId);
-
-  public BlobDatum getBytes(int fieldId);
-
-  public Int2Datum getShort(int fieldId);
-
-  public Int4Datum getInt(int fieldId);
-
-  public Int8Datum getLong(int fieldId);
-
-  public Float4Datum getFloat(int fieldId);
-
-  public Float8Datum getDouble(int fieldId);
-
-  public Inet4Datum getIPv4(int fieldId);
-
-  public byte [] getIPv4Bytes(int fieldId);
-
-  public InetAddress getIPv6(int fieldId);
-
-  public byte [] getIPv6Bytes(int fieldId);
-
-  public TextDatum getString(int fieldId);
-
-  public TextDatum getText(int fieldId);
+	
+	public void put(Datum[] values);
+	
+	public Datum get(int fieldId);
+	
+	public void setOffset(long offset);
+	
+	public long getOffset();
+
+	public boolean getBool(int fieldId);
+
+	public byte getByte(int fieldId);
+
+  public char getChar(int fieldId);
+	
+	public byte [] getBytes(int fieldId);
+	
+	public short getInt2(int fieldId);
+	
+	public int getInt4(int fieldId);
+	
+	public long getInt8(int fieldId);
+	
+	public float getFloat4(int fieldId);
+	
+	public double getFloat8(int fieldId);
+	
+	public String getText(int fieldId);
 
   public Tuple clone() throws CloneNotSupportedException;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
index 7887c9e..a88a791 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
@@ -16,11 +16,11 @@ package org.apache.tajo.jdbc; /**
  * limitations under the License.
  */
 
-import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.Tuple;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -76,7 +76,7 @@ public class MetaDataTuple implements Tuple {
 
   @Override
   public Datum get(int fieldId) {
-    return getText(fieldId);
+    return values.get(fieldId); // the stored Datum itself; getText() now returns a String
   }
 
   @Override
@@ -90,99 +90,53 @@ public class MetaDataTuple implements Tuple {
   }
 
   @Override
-  public BooleanDatum getBoolean(int fieldId) {
-    throw new UnsupportedException("getBoolean");
+  public boolean getBool(int fieldId) { // always unsupported on MetaDataTuple
+    throw new UnsupportedException("getBool");
   }

   @Override
-  public BitDatum getByte(int fieldId) {
+  public byte getByte(int fieldId) { // always unsupported on MetaDataTuple
     throw new UnsupportedException("getByte");
   }

   @Override
-  public CharDatum getChar(int fieldId) {
-    throw new UnsupportedException("getBoolean");
+  public char getChar(int fieldId) { // always unsupported on MetaDataTuple
+    throw new UnsupportedException("getChar");
   }
 
   @Override
-  public BlobDatum getBytes(int fieldId) {
-    throw new UnsupportedException("BlobDatum");
+  public byte [] getBytes(int fieldId) {
+    throw new UnsupportedException("getBytes"); // name the accessor, as getBool/getByte/getChar do
   }
 
   @Override
-  public Int2Datum getShort(int fieldId) {
-    if(isNull(fieldId)) {
-      return null;
-    }
-    return new Int2Datum((short)Integer.parseInt(values.get(fieldId).toString()));
-  }
-
-  @Override
-  public Int4Datum getInt(int fieldId) {
-    if(isNull(fieldId)) {
-      return null;
-    }
-    return new Int4Datum(Integer.parseInt(values.get(fieldId).toString()));
-  }
-
-  @Override
-  public Int8Datum getLong(int fieldId) {
-    if(isNull(fieldId)) {
-      return null;
-    }
-    return new Int8Datum(Long.parseLong(values.get(fieldId).toString()));
-  }
-
-  @Override
-  public Float4Datum getFloat(int fieldId) {
-    if(isNull(fieldId)) {
-      return null;
-    }
-    return new Float4Datum(Float.parseFloat(values.get(fieldId).toString()));
-  }
-
-  @Override
-  public Float8Datum getDouble(int fieldId) {
-    if(isNull(fieldId)) {
-      return null;
-    }
-    return new Float8Datum(Float.parseFloat(values.get(fieldId).toString()));
+  public short getInt2(int fieldId) {
+    return (short)Integer.parseInt(values.get(fieldId).toString()); // parsed as int; the (short) cast wraps out-of-range values
   }

   @Override
-  public Inet4Datum getIPv4(int fieldId) {
-    throw new UnsupportedException("getIPv4");
+  public int getInt4(int fieldId) {
+    return Integer.parseInt(values.get(fieldId).toString()); // NumberFormatException if the text is not an int
   }

   @Override
-  public byte[] getIPv4Bytes(int fieldId) {
-    throw new UnsupportedException("getIPv4Bytes");
+  public long getInt8(int fieldId) {
+    return Long.parseLong(values.get(fieldId).toString()); // NumberFormatException if the text is not a long
   }

   @Override
-  public InetAddress getIPv6(int fieldId) {
-    throw new UnsupportedException("getIPv6");
+  public float getFloat4(int fieldId) {
+    return Float.parseFloat(values.get(fieldId).toString()); // NumberFormatException if the text is not a float
   }
 
   @Override
-  public byte[] getIPv6Bytes(int fieldId) {
-    throw new UnsupportedException("getIPv6Bytes");
+  public double getFloat8(int fieldId) {
+    return Double.parseDouble(values.get(fieldId).toString()); // parseFloat would round to float precision first
   }
 
   @Override
-  public TextDatum getString(int fieldId) {
-    if(isNull(fieldId)) {
-      return null;
-    }
-    return new TextDatum(values.get(fieldId).toString());
-  }
-
-  @Override
-  public TextDatum getText(int fieldId) {
-    if(isNull(fieldId)) {
-      return null;
-    }
-    return new TextDatum(values.get(fieldId).toString());
+  public String getText(int fieldId) {
+    return values.get(fieldId).toString(); // unlike the removed getString/getText, no isNull() short-circuit to null
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
index e868701..2637e6b 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoDatabaseMetaData.java
@@ -407,7 +407,7 @@ public class TajoDatabaseMetaData implements DatabaseMetaData {
         Collections.sort(resultTables, new Comparator<MetaDataTuple> () {
           @Override
           public int compare(MetaDataTuple table1, MetaDataTuple table2) {
-            return table1.getString(2).compareTo(table2.getString(2));
+            return table1.getText(2).compareTo(table2.getText(2)); // lexicographic String order of field 2
           }
         });
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
index f05a316..5e2f28c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -22,12 +22,10 @@
 package org.apache.tajo.storage;
 
 import com.google.common.base.Preconditions;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.exception.UnsupportedException;
 
-import java.net.InetAddress;
-
 /**
  * An instance of FrameTuple is an immutable tuple.
  * It contains two tuples and pretends to be one instance of Tuple for
@@ -123,78 +121,53 @@ public class FrameTuple implements Tuple, Cloneable {
   }
 
   @Override
-  public BooleanDatum getBoolean(int fieldId) {
-    return (BooleanDatum) get(fieldId);
-  }
-
-  @Override
-  public BitDatum getByte(int fieldId) {
-    return (BitDatum) get(fieldId);
-  }
-
-  @Override
-  public CharDatum getChar(int fieldId) {
-    return (CharDatum) get(fieldId);
-  }
-
-  @Override
-  public BlobDatum getBytes(int fieldId) {
-    return (BlobDatum) get(fieldId);
+  public boolean getBool(int fieldId) {
+    return get(fieldId).asBool(); // each typed accessor converts the Datum returned by get()
   }

   @Override
-  public Int2Datum getShort(int fieldId) {
-    return (Int2Datum) get(fieldId);
+  public byte getByte(int fieldId) {
+    return get(fieldId).asByte();
   }

   @Override
-  public Int4Datum getInt(int fieldId) {
-    return (Int4Datum) get(fieldId);
+  public char getChar(int fieldId) {
+    return get(fieldId).asChar();
   }

   @Override
-  public Int8Datum getLong(int fieldId) {
-    return (Int8Datum) get(fieldId);
+  public byte [] getBytes(int fieldId) {
+    return get(fieldId).asByteArray(); // raw bytes via Datum#asByteArray()
   }

   @Override
-  public Float4Datum getFloat(int fieldId) {
-    return (Float4Datum) get(fieldId);
+  public short getInt2(int fieldId) {
+    return get(fieldId).asInt2();
   }

   @Override
-  public Float8Datum getDouble(int fieldId) {
-    return (Float8Datum) get(fieldId);
+  public int getInt4(int fieldId) {
+    return get(fieldId).asInt4();
   }

   @Override
-  public Inet4Datum getIPv4(int fieldId) {
-    return (Inet4Datum) get(fieldId);
+  public long getInt8(int fieldId) {
+    return get(fieldId).asInt8();
   }

   @Override
-  public byte[] getIPv4Bytes(int fieldId) { 
-    return get(fieldId).asByteArray();
-  }
-
-  @Override
-  public InetAddress getIPv6(int fieldId) {
-    throw new UnimplementedException();
-  }
-  
-  @Override
-  public byte[] getIPv6Bytes(int fieldId) {
-    throw new UnimplementedException();
+  public float getFloat4(int fieldId) {
+    return get(fieldId).asFloat4();
   }

   @Override
-  public TextDatum getString(int fieldId) {
-    return (TextDatum) get(fieldId);
+  public double getFloat8(int fieldId) {
+    return get(fieldId).asFloat8();
   }

   @Override
-  public TextDatum getText(int fieldId) {
-    return (TextDatum) get(fieldId);
+  public String getText(int fieldId) {
+    return get(fieldId).asChars(); // String form via Datum#asChars()
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 4d484df..7878004 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -19,10 +19,9 @@
 package org.apache.tajo.storage;
 
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.datum.exception.InvalidCastException;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
 
-import java.net.InetAddress;
 import java.util.Arrays;
 
 public class LazyTuple implements Tuple, Cloneable {
@@ -143,86 +142,53 @@ public class LazyTuple implements Tuple, Cloneable {
   }
 
   @Override
-  public BooleanDatum getBoolean(int fieldId) {
-    return (BooleanDatum) get(fieldId);
+  public boolean getBool(int fieldId) {
+    return get(fieldId).asBool(); // each typed accessor converts the Datum returned by get()
   }

   @Override
-  public BitDatum getByte(int fieldId) {
-    return (BitDatum) get(fieldId);
+  public byte getByte(int fieldId) {
+    return get(fieldId).asByte();
   }

   @Override
-  public CharDatum getChar(int fieldId) {
-    return (CharDatum) get(fieldId);
+  public char getChar(int fieldId) {
+    return get(fieldId).asChar();
   }

   @Override
-  public BlobDatum getBytes(int fieldId) {
-    return (BlobDatum) get(fieldId);
-  }
-
-  @Override
-  public Int2Datum getShort(int fieldId) {
-    return (Int2Datum) get(fieldId);
-  }
-
-  @Override
-  public Int4Datum getInt(int fieldId) {
-    return (Int4Datum) get(fieldId);
-  }
-
-  @Override
-  public Int8Datum getLong(int fieldId) {
-    return (Int8Datum) get(fieldId);
-  }
-
-  @Override
-  public Float4Datum getFloat(int fieldId) {
-    return (Float4Datum) get(fieldId);
-  }
-
-  @Override
-  public Float8Datum getDouble(int fieldId) {
-    return (Float8Datum) get(fieldId);
+  public byte [] getBytes(int fieldId) {
+    return get(fieldId).asByteArray(); // raw bytes via Datum#asByteArray()
   }

   @Override
-  public Inet4Datum getIPv4(int fieldId) {
-    return (Inet4Datum) get(fieldId);
+  public short getInt2(int fieldId) {
+    return get(fieldId).asInt2();
   }

   @Override
-  public byte[] getIPv4Bytes(int fieldId) {
-    return get(fieldId).asByteArray();
+  public int getInt4(int fieldId) {
+    return get(fieldId).asInt4();
   }

   @Override
-  public InetAddress getIPv6(int fieldId) {
-    throw new InvalidCastException("IPv6 is unsupported yet");
+  public long getInt8(int fieldId) {
+    return get(fieldId).asInt8();
   }

   @Override
-  public byte[] getIPv6Bytes(int fieldId) {
-    throw new InvalidCastException("IPv6 is unsupported yet");
+  public float getFloat4(int fieldId) {
+    return get(fieldId).asFloat4();
   }
 
   @Override
-  public TextDatum getString(int fieldId) {
-    return (TextDatum) get(fieldId);
+  public double getFloat8(int fieldId) {
+    return get(fieldId).asFloat8(); // not asInt8(): going through long would drop the fractional part
   }
 
   @Override
-  public TextDatum getText(int fieldId) {
-    return (TextDatum) get(fieldId);
-  }
-
-  public byte[] getTextBytes(int fieldId) {
-    if(textBytes[fieldId] != null)
-      return textBytes[fieldId];
-    else {
-      return get(fieldId).asTextBytes();
-    }
+  public String getText(int fieldId) {
+    return get(fieldId).asChars(); // always via get(); the textBytes[] shortcut of the removed getTextBytes() is not used here
   }
 
   public String toString() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5177dcfa/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
new file mode 100644
index 0000000..3218f4d
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.util.ClassSize;
+
+/** Rough heap-size estimates for tuples, built from per-Datum-class base sizes. */
+public class MemoryUtil {
+
+  /** Estimated base size of a NullDatum */
+  public static final long NULL_DATUM;
+
+  /** Estimated base size of a BooleanDatum */
+  public static final long BOOL_DATUM;
+
+  /** Estimated base size of a CharDatum, excluding its payload */
+  public static final long CHAR_DATUM;
+
+  /** Estimated base size of a BitDatum */
+  public static final long BIT_DATUM;
+
+  /** Estimated base size of an Int2Datum */
+  public static final long INT2_DATUM;
+
+  /** Estimated base size of an Int4Datum */
+  public static final long INT4_DATUM;
+
+  /** Estimated base size of an Int8Datum */
+  public static final long INT8_DATUM;
+
+  /** Estimated base size of a Float4Datum */
+  public static final long FLOAT4_DATUM;
+
+  /** Estimated base size of a Float8Datum */
+  public static final long FLOAT8_DATUM;
+
+  /** Estimated base size of a TextDatum, excluding its payload */
+  public static final long TEXT_DATUM;
+
+  /** Estimated base size of a BlobDatum, excluding its payload */
+  public static final long BLOB_DATUM;
+
+  /** Estimated base size of a DateDatum */
+  public static final long DATE_DATUM;
+
+  /** Estimated base size of a TimeDatum */
+  public static final long TIME_DATUM;
+
+  /** Estimated base size of a TimestampDatum */
+  public static final long TIMESTAMP_DATUM;
+
+  static {
+    NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false);
+    CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false);
+    BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false);
+    BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false);
+    INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false);
+    INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false);
+    INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false);
+    FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false);
+    FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false);
+    TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false);
+    BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false);
+    DATE_DATUM = ClassSize.estimateBase(DateDatum.class, false);
+    TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false);
+    TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false);
+  }
+
+  /**
+   * Estimates the heap footprint of a tuple.
+   *
+   * The estimate is ClassSize.OBJECT plus, for each value, the base size of its Datum
+   * class; CHAR, TEXT and BLOB values also add their payload length (Datum#size()).
+   * Types without a case below contribute nothing. INT1 is sized as an Int2Datum
+   * (presumably INT1 values are held in Int2Datum -- TODO confirm).
+   * @param tuple tuple to measure; every value is dereferenced, so none may be null
+   * @return the estimated size (presumably bytes, as reported by ClassSize)
+   */
+  public static long calculateMemorySize(Tuple tuple) {
+    long total = ClassSize.OBJECT;
+    for (Datum datum : tuple.getValues()) {
+      switch (datum.type()) {
+      case NULL_TYPE:
+        total += NULL_DATUM;
+        break;
+
+      case BOOLEAN:
+        total += BOOL_DATUM;
+        break;
+
+      case BIT:
+        total += BIT_DATUM;
+        break;
+
+      case CHAR:
+        total += CHAR_DATUM + datum.size();
+        break;
+
+      case INT1:
+      case INT2:
+        total += INT2_DATUM;
+        break;
+
+      case INT4:
+        total += INT4_DATUM;
+        break;
+
+      case INT8:
+        total += INT8_DATUM;
+        break;
+
+      case FLOAT4:
+        total += FLOAT4_DATUM;
+        break;
+
+      case FLOAT8:
+        total += FLOAT8_DATUM; // a Float8Datum's own size, not FLOAT4_DATUM
+        break;
+
+      case TEXT:
+        total += TEXT_DATUM + datum.size();
+        break;
+
+      case BLOB:
+        total += BLOB_DATUM + datum.size(); // variable-length: payload counted as for CHAR/TEXT
+        break;
+
+      case DATE:
+        total += DATE_DATUM;
+        break;
+
+      case TIME:
+        total += TIME_DATUM;
+        break;
+
+      case TIMESTAMP:
+        total += TIMESTAMP_DATUM;
+        break;
+      }
+    }
+
+    return total;
+  }
+}


Mime
View raw message