tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject tajo git commit: TAJO-1460 Apply TAJO-1407 to ExternalSortExec
Date Thu, 02 Apr 2015 06:24:55 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 7c2a24090 -> 6e7f1c7b0


TAJO-1460 Apply TAJO-1407 to ExternalSortExec

Closes #474


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6e7f1c7b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6e7f1c7b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6e7f1c7b

Branch: refs/heads/master
Commit: 6e7f1c7b0561c7299a498b81dfb6148883a76a0d
Parents: 7c2a240
Author: babokim <babokim@gmail.com>
Authored: Thu Apr 2 15:22:37 2015 +0900
Committer: babokim <babokim@gmail.com>
Committed: Thu Apr 2 15:22:37 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../planner/physical/ComparableVector.java      | 411 +++++++++++++++++++
 .../planner/physical/ExternalSortExec.java      | 238 ++++-------
 .../engine/planner/physical/MemSortExec.java    |  19 +-
 .../tajo/engine/planner/physical/SortExec.java  |  10 +-
 .../engine/planner/physical/TupleSorter.java    |  24 +-
 .../planner/physical/VectorizedSorter.java      | 155 +------
 .../planner/physical/TestExternalSortExec.java  |   4 +-
 .../planner/physical/TestTupleSorter.java       |   2 +-
 .../testCrossJoinWithAsterisk1.sql              |   2 +-
 .../testCrossJoinWithAsterisk2.sql              |   2 +-
 .../testCrossJoinWithAsterisk3.sql              |   2 +-
 .../testCrossJoinWithAsterisk4.sql              |   2 +-
 .../TestJoinQuery/testComplexJoinCondition7.sql |   2 +-
 .../testCrossJoinWithAsterisk1.sql              |   2 +-
 .../testCrossJoinWithAsterisk2.sql              |   2 +-
 .../testCrossJoinWithAsterisk3.sql              |   2 +-
 .../testCrossJoinWithAsterisk4.sql              |   2 +-
 .../queries/TestJoinQuery/testJoinWithJson.json |   8 +
 .../testComplexJoinCondition7.result            |   4 +-
 .../apache/tajo/storage/AbstractScanner.java    |  80 ++++
 .../org/apache/tajo/storage/MemoryUtil.java     |   4 +
 22 files changed, 655 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c3bce7d..1ccd016 100644
--- a/CHANGES
+++ b/CHANGES
@@ -10,6 +10,9 @@ Release 0.11.0 - unreleased
     (jihun)
 
   IMPROVEMENT
+   
+    TAJO-1460: Apply TAJO-1407 to ExternalSortExec. (Contributed by navis, 
+    Committed by hyoungjun)
 
     TAJO-1350: Refactor FilterPushDownRule::visitJoin() into well-defined, 
     small methods. (jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
new file mode 100644
index 0000000..39b8c8a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Floats;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.UnsignedInts;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+/**
+ * Extracts the raw values (primitive or String/byte[]) of each key column for comparison.
+ */
+public class ComparableVector {
+
+  protected final Tuple[] tuples;         // source tuples
+  protected final TupleVector[] vectors;  // values of key columns
+  protected final int[] keyIndex;
+
+  public ComparableVector(int length, SortSpec[] sortKeys, int[] keyIndex) {
+    tuples = new Tuple[length];
+    vectors = new TupleVector[sortKeys.length];
+    for (int i = 0; i < vectors.length; i++) {
+      TajoDataTypes.Type type = sortKeys[i].getSortKey().getDataType().getType();
+      boolean nullFirst = sortKeys[i].isNullFirst();
+      boolean ascending = sortKeys[i].isAscending();
+      boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending;
+      vectors[i] = new TupleVector(vectorType(type), tuples.length, nullInvert, ascending);
+    }
+    this.keyIndex = keyIndex;
+  }
+
+  public int compare(final int i1, final int i2) {
+    for (TupleVector vector : vectors) {
+      int compare = vector.compare(i1, i2);
+      if (compare != 0) {
+        return compare;
+      }
+    }
+    return 0;
+  }
+
+  public void set(int index, Tuple tuple) {
+    for (int i = 0; i < vectors.length; i++) {
+      vectors[i].set(index, tuple, keyIndex[i]);
+    }
+  }
+
+  protected static class TupleVector {
+
+    private final int type;
+    private final BitSet nulls;
+    private final boolean nullInvert;
+    private final boolean ascending;
+
+    private boolean[] booleans;
+    private byte[] bits;
+    private short[] shorts;
+    private int[] ints;
+    private long[] longs;
+    private float[] floats;
+    private double[] doubles;
+    private byte[][] bytes;
+
+    private int index;
+
+    private TupleVector(int type, int length, boolean nullInvert, boolean ascending) {
+      this.type = type;
+      this.nulls = new BitSet(length);
+      this.nullInvert = nullInvert;
+      this.ascending = ascending;
+      switch (type) {
+        case 0: booleans = new boolean[length]; break;
+        case 1: bits = new byte[length]; break;
+        case 2: shorts = new short[length]; break;
+        case 3: ints = new int[length]; break;
+        case 4: longs = new long[length]; break;
+        case 5: floats = new float[length]; break;
+        case 6: doubles = new double[length]; break;
+        case 7: bytes = new byte[length][]; break;
+        case 8: ints = new int[length]; break;
+        case -1: break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+
+    protected final void append(Tuple tuple, int field) {
+      set(index++, tuple, field);
+    }
+
+    protected final void set(int index, Tuple tuple, int field) {
+      if (tuple.isNull(field)) {
+        nulls.set(index);
+        return;
+      }
+      nulls.clear(index);
+      switch (type) {
+        case 0: booleans[index] = tuple.getBool(field); break;
+        case 1: bits[index] = tuple.getByte(field); break;
+        case 2: shorts[index] = tuple.getInt2(field); break;
+        case 3: ints[index] = tuple.getInt4(field); break;
+        case 4: longs[index] = tuple.getInt8(field); break;
+        case 5: floats[index] = tuple.getFloat4(field); break;
+        case 6: doubles[index] = tuple.getFloat8(field); break;
+        case 7: bytes[index] = tuple.getBytes(field); break;
+        case 8: ints[index] = tuple.getInt4(field); break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+
+    protected final int compare(int index1, int index2) {
+      final boolean n1 = nulls.get(index1);
+      final boolean n2 = nulls.get(index2);
+      if (n1 && n2) {
+        return 0;
+      }
+      if (n1 ^ n2) {
+        int compVal = n1 ? 1 : -1;
+        return nullInvert ? -compVal : compVal;
+      }
+      int compare;
+      switch (type) {
+        case 0: compare = Booleans.compare(booleans[index1], booleans[index2]); break;
+        case 1: compare = bits[index1] - bits[index2]; break;
+        case 2: compare = Shorts.compare(shorts[index1], shorts[index2]); break;
+        case 3: compare = Ints.compare(ints[index1], ints[index2]); break;
+        case 4: compare = Longs.compare(longs[index1], longs[index2]); break;
+        case 5: compare = Floats.compare(floats[index1], floats[index2]); break;
+        case 6: compare = Doubles.compare(doubles[index1], doubles[index2]); break;
+        case 7: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break;
+        case 8: compare = UnsignedInts.compare(ints[index1], ints[index2]); break;
+        default:
+          throw new IllegalArgumentException();
+      }
+      return ascending ? compare : -compare;
+    }
+  }
+
+  public static class ComparableTuple {
+
+    private final TupleType[] keyTypes;
+    private final int[] keyIndex;
+    private final Object[] keys;
+
+    public ComparableTuple(Schema schema, int[] keyIndex) {
+      this(tupleTypes(schema, keyIndex), keyIndex);
+    }
+
+    public ComparableTuple(Schema schema, int start, int end) {
+      this(schema, toKeyIndex(start, end));
+    }
+
+    private ComparableTuple(TupleType[] keyTypes, int[] keyIndex) {
+      this.keyTypes = keyTypes;
+      this.keyIndex = keyIndex;
+      this.keys = new Object[keyIndex.length];
+    }
+
+    public int size() {
+      return keyIndex.length;
+    }
+
+    public void set(Tuple tuple) {
+      for (int i = 0; i < keyTypes.length; i++) {
+        final int field = keyIndex[i];
+        if (tuple.isNull(field)) {
+          keys[i] = null;
+          continue;
+        }
+        switch (keyTypes[i]) {
+          case BOOLEAN: keys[i] = tuple.getBool(field); break;
+          case BIT: keys[i] = tuple.getByte(field); break;
+          case INT1:
+          case INT2: keys[i] = tuple.getInt2(field); break;
+          case INT4:
+          case DATE:
+          case INET4: keys[i] = tuple.getInt4(field); break;
+          case INT8:
+          case TIME:
+          case TIMESTAMP: keys[i] = tuple.getInt8(field); break;
+          case FLOAT4: keys[i] = tuple.getFloat4(field); break;
+          case FLOAT8: keys[i] = tuple.getFloat8(field); break;
+          case TEXT:
+          case CHAR:
+          case BLOB: keys[i] = tuple.getBytes(field); break;
+          case DATUM: keys[i] = tuple.get(field); break;
+          default:
+            throw new IllegalArgumentException();
+        }
+      }
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      ComparableTuple other = (ComparableTuple)obj;
+      for (int i = 0; i < keys.length; i++) {
+        final boolean n1 = keys[i] == null;
+        final boolean n2 = other.keys[i] == null;
+        if (n1 && n2) {
+          continue;
+        }
+        if (n1 ^ n2) {
+          return false;
+        }
+        switch (keyTypes[i]) {
+          case TEXT:
+          case CHAR:
+          case BLOB: if (!Arrays.equals((byte[])keys[i], (byte[])other.keys[i])) return false; continue;
+          default: if (!keys[i].equals(other.keys[i])) return false; continue;
+        }
+      }
+      return true;
+    }
+
+    public boolean equals(Tuple tuple) {
+      for (int i = 0; i < keys.length; i++) {
+        final int field = keyIndex[i];
+        final boolean n1 = keys[i] == null;
+        final boolean n2 = tuple.isNull(field);
+        if (n1 && n2) {
+          continue;
+        }
+        if (n1 ^ n2) {
+          return false;
+        }
+        switch (keyTypes[i]) {
+          case BOOLEAN: if ((Boolean)keys[i] != tuple.getBool(field)) return false; continue;
+          case BIT: if ((Byte)keys[i] != tuple.getByte(field)) return false; continue;
+          case INT1:
+          case INT2: if ((Short)keys[i] != tuple.getInt2(field)) return false; continue;
+          case INT4:
+          case DATE:
+          case INET4: if ((Integer)keys[i] != tuple.getInt4(field)) return false; continue;
+          case INT8:
+          case TIME:
+          case TIMESTAMP: if ((Long)keys[i] != tuple.getInt8(field)) return false; continue;
+          case FLOAT4: if ((Float)keys[i] != tuple.getFloat4(field)) return false; continue;
+          case FLOAT8: if ((Double)keys[i] != tuple.getFloat8(field)) return false; continue;
+          case TEXT:
+          case CHAR:
+          case BLOB: if (!Arrays.equals((byte[])keys[i], tuple.getBytes(field))) return false; continue;
+          case DATUM: if (!keys[i].equals(tuple.get(field))) return false; continue;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = 1;
+      for (Object key : keys) {
+        int hash = key == null ? 0 :
+            key instanceof byte[] ? Arrays.hashCode((byte[])key) : key.hashCode();
+        result = 31 * result + hash;
+      }
+      return result;
+    }
+
+    public ComparableTuple copy() {
+      ComparableTuple copy = emptyCopy();
+      System.arraycopy(keys, 0, copy.keys, 0, keys.length);
+      return copy;
+    }
+
+    public ComparableTuple emptyCopy() {
+      return new ComparableTuple(keyTypes, keyIndex);
+    }
+
+    public VTuple toVTuple() {
+      VTuple vtuple = new VTuple(keyIndex.length);
+      for (int i = 0; i < keyIndex.length; i++) {
+        vtuple.put(i, toDatum(i));
+      }
+      return vtuple;
+    }
+
+    public Datum toDatum(int i) {
+      if (keys[i] == null) {
+        return NullDatum.get();
+      }
+      switch (keyTypes[i]) {
+        case NULL_TYPE: return NullDatum.get();
+        case BOOLEAN: return DatumFactory.createBool((Boolean) keys[i]);
+        case BIT: return DatumFactory.createBit((Byte)keys[i]);
+        case INT1:
+        case INT2: return DatumFactory.createInt2((Short) keys[i]);
+        case INT4: return DatumFactory.createInt4((Integer) keys[i]);
+        case DATE: return DatumFactory.createDate((Integer) keys[i]);
+        case INET4: return DatumFactory.createInet4((Integer) keys[i]);
+        case INT8: return DatumFactory.createInt8((Long) keys[i]);
+        case TIME: return DatumFactory.createTime((Long) keys[i]);
+        case TIMESTAMP: return DatumFactory.createTimestamp((Long) keys[i]);
+        case FLOAT4: return DatumFactory.createFloat4((Float) keys[i]);
+        case FLOAT8: return DatumFactory.createFloat8((Double) keys[i]);
+        case TEXT: return DatumFactory.createText((byte[]) keys[i]);
+        case CHAR: return DatumFactory.createChar((byte[]) keys[i]);
+        case BLOB: return DatumFactory.createBlob((byte[]) keys[i]);
+        case DATUM: return (Datum)keys[i];
+        default:
+          throw new IllegalArgumentException();
+      }
+    }
+  }
+
+  public static boolean isVectorizable(SortSpec[] sortKeys) {
+    if (sortKeys.length == 0) {
+      return false;
+    }
+    for (SortSpec spec : sortKeys) {
+      try {
+        vectorType(spec.getSortKey().getDataType().getType());
+      } catch (Exception e) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static int vectorType(TajoDataTypes.Type type) {
+    switch (type) {
+      case BOOLEAN: return 0;
+      case BIT: return 1;
+      case INT1: case INT2: return 2;
+      case INT4: case DATE: return 3;
+      case INT8: case TIME: case TIMESTAMP: case INTERVAL: return 4;
+      case FLOAT4: return 5;
+      case FLOAT8: return 6;
+      case TEXT: case CHAR: case BLOB: return 7;
+      case INET4: return 8;
+      case NULL_TYPE: return -1;
+    }
+    // todo
+    throw new UnsupportedException(type.name());
+  }
+
+  private static TupleType[] tupleTypes(Schema schema, int[] keyIndex) {
+    TupleType[] types = new TupleType[keyIndex.length];
+    for (int i = 0; i < keyIndex.length; i++) {
+      types[i] = tupleType(schema.getColumn(keyIndex[i]).getDataType().getType());
+    }
+    return types;
+  }
+
+  private static TupleType tupleType(TajoDataTypes.Type type) {
+    switch (type) {
+      case BOOLEAN: return TupleType.BOOLEAN;
+      case BIT: return TupleType.BIT;
+      case INT1: return TupleType.INT1;
+      case INT2: return TupleType.INT2;
+      case INT4: return TupleType.INT4;
+      case DATE: return TupleType.DATE;
+      case INT8: return TupleType.INT8;
+      case TIME: return TupleType.TIME;
+      case TIMESTAMP: return TupleType.TIMESTAMP;
+      case FLOAT4: return TupleType.FLOAT4;
+      case FLOAT8: return TupleType.FLOAT8;
+      case TEXT: return TupleType.TEXT;
+      case CHAR: return TupleType.CHAR;
+      case BLOB: return TupleType.BLOB;
+      case INET4: return TupleType.INET4;
+      case NULL_TYPE: return TupleType.NULL_TYPE;
+      default: return TupleType.DATUM;
+    }
+  }
+
+  private static int[] toKeyIndex(int start, int end) {
+    int[] keyIndex = new int[end - start];
+    for (int i = 0; i < keyIndex.length; i++) {
+      keyIndex[i] = start + i;
+    }
+    return keyIndex;
+  }
+
+  private static enum TupleType {
+    NULL_TYPE, BOOLEAN, BIT, INT1, INT2, INT4, DATE, INET4, INT8, TIME, TIMESTAMP,
+    FLOAT4, FLOAT8, TEXT, CHAR, BLOB, DATUM
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index b3ebfb2..355f015 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -168,14 +167,14 @@ public class ExternalSortExec extends SortExec {
     int rowNum = tupleBlock.size();
 
     long sortStart = System.currentTimeMillis();
-    Collections.sort(tupleBlock, getComparator());
+    Iterable<Tuple> sorted = getSorter(tupleBlock).sort();
     long sortEnd = System.currentTimeMillis();
 
     long chunkWriteStart = System.currentTimeMillis();
     Path outputPath = getChunkPathForWrite(0, chunkId);
     final RawFileAppender appender = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath);
     appender.init();
-    for (Tuple t : tupleBlock) {
+    for (Tuple t : sorted) {
       appender.addTuple(t);
     }
     appender.close();
@@ -236,18 +235,13 @@ public class ExternalSortExec extends SortExec {
       }
     }
 
-    if (inMemoryTable.size() > 0) { // if there are at least one or more input tuples
-      if (!memoryResident) { // check if data exceeds a sort buffer. If so, it store the remain data into a chunk.
-        if (inMemoryTable.size() > 0) {
-          long start = System.currentTimeMillis();
-          int rowNum = inMemoryTable.size();
-          chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
-          long end = System.currentTimeMillis();
-          info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)");
-        }
-      } else { // this case means that all data does not exceed a sort buffer
-        Collections.sort(inMemoryTable, getComparator());
-      }
+    if (!memoryResident && !inMemoryTable.isEmpty()) { // if there are at least one or more input tuples
+      // Check whether the data exceeded the sort buffer. If so, store the remaining data into a chunk.
+      long start = System.currentTimeMillis();
+      int rowNum = inMemoryTable.size();
+      chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
+      long end = System.currentTimeMillis();
+      info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)");
     }
 
     // get total loaded (or stored) bytes and total row numbers
@@ -285,7 +279,8 @@ public class ExternalSortExec extends SortExec {
         info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
 
         if (memoryResident) { // if all sorted data reside in a main-memory table.
-          this.result = new MemTableScanner();
+          TupleSorter sorter = getSorter(inMemoryTable);
+          result = new MemTableScanner(sorter.sort(), inMemoryTable.size(), sortAndStoredBytes);
         } else { // if input data exceeds main-memory at least once
 
           try {
@@ -314,7 +309,7 @@ public class ExternalSortExec extends SortExec {
     return result.next();
   }
 
-  private int calculateFanout(int remainInputChunks, int intputNum, int outputNum, int startIdx) {
+  private int calculateFanout(int remainInputChunks, int inputNum, int outputNum, int startIdx) {
     int computedFanout = Math.min(remainInputChunks, defaultFanout);
 
     // Why should we detect an opportunity for unbalanced merge?
@@ -322,9 +317,9 @@ public class ExternalSortExec extends SortExec {
     // Assume that a fanout is given by 8 and there are 10 chunks.
     // If we firstly merge 3 chunks into one chunk, there remain only 8 chunks.
     // Then, we can just finish the merge phase even though we don't complete merge phase on all chunks.
-    if (checkIfCanBeUnbalancedMerged(intputNum - (startIdx + computedFanout), outputNum + 1)) {
+    if (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + computedFanout), outputNum + 1)) {
       int candidateFanout = computedFanout;
-      while(checkIfCanBeUnbalancedMerged(intputNum - (startIdx + candidateFanout), outputNum + 1)) {
+      while (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + candidateFanout), outputNum + 1)) {
         candidateFanout--;
       }
       int beforeFanout = computedFanout;
@@ -354,7 +349,7 @@ public class ExternalSortExec extends SortExec {
       int remainInputRuns = inputFiles.size();
       int outChunkId = 0;
       int outputFileNum = 0;
-      List<Future> futures = TUtil.newList();
+      List<Future<FileFragment>> futures = TUtil.newList();
       // the number of files being merged in threads.
       List<Integer> numberOfMergingFiles = TUtil.newList();
 
@@ -419,7 +414,7 @@ public class ExternalSortExec extends SortExec {
        */
       int numDeletedFiles = 0;
       for (FileFragment frag : inputFiles) {
-        if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX) == true) {
+        if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
           localFS.delete(frag.getPath(), true);
           numDeletedFiles++;
           LOG.info("Delete merged intermediate file: " + frag);
@@ -527,28 +522,38 @@ public class ExternalSortExec extends SortExec {
       throws IOException {
     if (num > 1) {
       final int mid = (int) Math.ceil((float)num / 2);
-      return new PairWiseMerger(inSchema,
-          createKWayMergerInternal(sources, startIdx, mid),
-          createKWayMergerInternal(sources, startIdx + mid, num - mid), getComparator());
+      Scanner left = createKWayMergerInternal(sources, startIdx, mid);
+      Scanner right = createKWayMergerInternal(sources, startIdx + mid, num - mid);
+      if (ComparableVector.isVectorizable(sortSpecs)) {
+        return new VectorComparePairWiseMerger(inSchema, left, right, comparator);
+      }
+      return new PairWiseMerger(inSchema, left, right, comparator);
     } else {
       return sources[startIdx];
     }
   }
 
-  private class MemTableScanner implements Scanner {
-    Iterator<Tuple> iterator;
+  private static class MemTableScanner extends AbstractScanner {
+    final Iterable<Tuple> iterable;
+    final long sortAndStoredBytes;
+    final int totalRecords;
 
+    Iterator<Tuple> iterator;
     // for input stats
     float scannerProgress;
     int numRecords;
-    int totalRecords;
     TableStats scannerTableStats;
 
+    public MemTableScanner(Iterable<Tuple> iterable, int length, long inBytes) {
+      this.iterable = iterable;
+      this.totalRecords = length;
+      this.sortAndStoredBytes = inBytes;
+    }
+
     @Override
     public void init() throws IOException {
-      iterator = inMemoryTable.iterator();
+      iterator = iterable.iterator();
 
-      totalRecords = inMemoryTable.size();
       scannerProgress = 0.0f;
       numRecords = 0;
 
@@ -581,34 +586,6 @@ public class ExternalSortExec extends SortExec {
     }
 
     @Override
-    public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public void setTarget(Column[] targets) {
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public void setSearchCondition(Object expr) {
-    }
-
-    @Override
-    public boolean isSplittable() {
-      return false;
-    }
-
-    @Override
-    public Schema getSchema() {
-      return null;
-    }
-
-    @Override
     public float getProgress() {
       if (iterator != null && numRecords > 0) {
         return (float)numRecords / (float)totalRecords;
@@ -630,19 +607,43 @@ public class ExternalSortExec extends SortExec {
     CLOSED
   }
 
+  private static class VectorComparePairWiseMerger extends PairWiseMerger {
+
+    private ComparableVector comparable;
+
+    public VectorComparePairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner,
+                                       BaseTupleComparator comparator) throws IOException {
+      super(schema, leftScanner, rightScanner, null);
+      comparable = new ComparableVector(2, comparator.getSortSpecs(), comparator.getSortKeyIds());
+    }
+
+    @Override
+    protected Tuple prepare(int index, Tuple tuple) {
+      if (tuple != null) {
+        comparable.set(index, tuple);
+      }
+      return tuple;
+    }
+
+    @Override
+    protected int compare() {
+      return comparable.compare(0, 1);
+    }
+  }
+
   /**
    * Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order.
    */
-  private static class PairWiseMerger implements Scanner {
-    private Scanner leftScan;
-    private Scanner rightScan;
+  private static class PairWiseMerger extends AbstractScanner {
 
-    private VTuple outTuple;
-    private VTuple leftTuple;
-    private VTuple rightTuple;
+    protected final Schema schema;
+    protected final Comparator<Tuple> comparator;
 
-    private final Schema schema;
-    private final Comparator<Tuple> comparator;
+    protected final Scanner leftScan;
+    protected final Scanner rightScan;
+
+    private Tuple leftTuple;
+    private Tuple rightTuple;
 
     private float mergerProgress;
     private TableStats mergerInputStats;
@@ -679,74 +680,30 @@ public class ExternalSortExec extends SortExec {
     }
 
     private void prepareTuplesForFirstComparison() throws IOException {
-      Tuple lt = leftScan.next();
-      if (lt != null) {
-        leftTuple = new VTuple(lt);
-      } else {
-        leftTuple = null; // TODO - missed free
-      }
-
-      Tuple rt = rightScan.next();
-      if (rt != null) {
-        rightTuple = new VTuple(rt);
-      } else {
-        rightTuple = null; // TODO - missed free
-      }
+      leftTuple = prepare(0, leftScan.next());
+      rightTuple = prepare(1, rightScan.next());
     }
 
-    public Tuple next() throws IOException {
+    protected Tuple prepare(int index, Tuple tuple) {
+      return tuple == null ? null : new VTuple(tuple);
+    }
 
-      if (leftTuple != null && rightTuple != null) {
-        if (comparator.compare(leftTuple, rightTuple) < 0) {
-          outTuple = new VTuple(leftTuple);
+    protected int compare() {
+      return comparator.compare(leftTuple, rightTuple);
+    }
 
-          Tuple lt = leftScan.next();
-          if (lt != null) {
-            leftTuple = new VTuple(lt);
-          } else {
-            leftTuple = null; // TODO - missed free
-          }
-        } else {
-          outTuple = new VTuple(rightTuple);
-
-          Tuple rt = rightScan.next();
-          if (rt != null) {
-            rightTuple = new VTuple(rt);
-          } else {
-            rightTuple = null; // TODO - missed free
-          }
-        }
-        return outTuple;
+    public Tuple next() throws IOException {
+      if (leftTuple == null && rightTuple == null) {
+        return null;
       }
-
-      if (leftTuple == null) {
-        if (rightTuple != null) {
-          outTuple = new VTuple(rightTuple);
-        } else {
-          outTuple = null;
-        }
-
-        Tuple rt = rightScan.next();
-        if (rt != null) {
-          rightTuple = new VTuple(rt);
-        } else {
-          rightTuple = null; // TODO - missed free
-        }
-      } else {
-        if (leftTuple != null) {
-          outTuple = new VTuple(leftTuple);
-        } else {
-          outTuple = null;
-        }
-
-        Tuple lt = leftScan.next();
-        if (lt != null) {
-          leftTuple = new VTuple(lt);
-        } else {
-          leftTuple = null; // TODO - missed free
-        }
+      if (rightTuple == null || (leftTuple != null && compare() < 0)) {
+        Tuple tuple = leftTuple;
+        leftTuple = prepare(0, leftScan.next());
+        return tuple;
       }
-      return outTuple;
+      Tuple tuple = rightTuple;
+      rightTuple = prepare(1, rightScan.next());
+      return tuple;
     }
 
     @Override
@@ -755,7 +712,6 @@ public class ExternalSortExec extends SortExec {
         leftScan.reset();
         rightScan.reset();
 
-        outTuple = null;
         leftTuple = null;
         rightTuple = null;
 
@@ -765,39 +721,15 @@ public class ExternalSortExec extends SortExec {
       }
     }
 
+    @Override
     public void close() throws IOException {
       IOUtils.cleanup(LOG, leftScan, rightScan);
       getInputStats();
-      leftScan = null;
-      rightScan = null;
       mergerProgress = 1.0f;
       setState(State.CLOSED);
     }
 
     @Override
-    public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public void setTarget(Column[] targets) {
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public void setSearchCondition(Object expr) {
-    }
-
-    @Override
-    public boolean isSplittable() {
-      return false;
-    }
-
-    @Override
     public Schema getSchema() {
       return schema;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index a2e039c..f76e356 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -25,11 +25,10 @@ import org.apache.tajo.storage.VTuple;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
-public class MemSortExec extends SortExec implements TupleSorter {
+public class MemSortExec extends SortExec {
   private SortNode plan;
   private List<Tuple> tupleSlots;
   private boolean sorted = false;
@@ -54,7 +53,7 @@ public class MemSortExec extends SortExec implements TupleSorter {
       while (!context.isStopped() && (tuple = child.next()) != null) {
         tupleSlots.add(new VTuple(tuple));
       }
-      iterator = getSorter().sort();
+      iterator = getSorter(tupleSlots).sort().iterator();
       sorted = true;
     }
 
@@ -65,14 +64,6 @@ public class MemSortExec extends SortExec implements TupleSorter {
     }
   }
 
-  private TupleSorter getSorter() {
-    try {
-      return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds());
-    } catch (Exception e) {
-      return this;
-    }
-  }
-
   @Override
   public void rescan() throws IOException {
     super.rescan();
@@ -92,10 +83,4 @@ public class MemSortExec extends SortExec implements TupleSorter {
   public SortNode getPlan() {
     return this.plan;
   }
-
-  @Override
-  public Iterator<Tuple> sort() {
-    Collections.sort(tupleSlots, comparator);
-    return tupleSlots.iterator();
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
index fb6a3b2..28be9de 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
@@ -22,13 +22,14 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.List;
 
 public abstract class SortExec extends UnaryPhysicalExec {
+
   protected final BaseTupleComparator comparator;
   protected final SortSpec [] sortSpecs;
 
@@ -39,6 +40,13 @@ public abstract class SortExec extends UnaryPhysicalExec {
     this.comparator = new BaseTupleComparator(inSchema, sortSpecs);
   }
 
+  protected TupleSorter getSorter(List<Tuple> tupleSlots) {
+    if (!tupleSlots.isEmpty() && ComparableVector.isVectorizable(sortSpecs)) {
+      return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds());
+    }
+    return new TupleSorter.DefaultSorter(tupleSlots, comparator);
+  }
+
   public SortSpec[] getSortSpecs() {
     return sortSpecs;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
index d240e4a..57fe816 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
@@ -19,9 +19,29 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
 
-import java.util.Iterator;
+import java.util.Collections;
+import java.util.List;
 
 public interface TupleSorter {
-  Iterator<Tuple> sort();
+
+  Iterable<Tuple> sort();
+
+  public static class DefaultSorter implements TupleSorter {
+
+    private final List<Tuple> target;
+    private final TupleComparator comparator;
+
+    public DefaultSorter(List<Tuple> target, TupleComparator comparator) {
+      this.target = target;
+      this.comparator = comparator;
+    }
+
+    @Override
+    public Iterable<Tuple> sort() {
+      Collections.sort(target, comparator);
+      return target;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
index 891d104..18d853f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
@@ -18,22 +18,12 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import com.google.common.primitives.Booleans;
-import com.google.common.primitives.Doubles;
-import com.google.common.primitives.Floats;
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.Shorts;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.Tuple;
 
-import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
 
@@ -41,26 +31,17 @@ import java.util.List;
  * Extract raw level values (primitive or String/byte[]) from each of key columns before sorting
  * Uses indirection for efficient swapping
  */
-public class VectorizedSorter implements IndexedSortable, TupleSorter {
+public class VectorizedSorter extends ComparableVector implements IndexedSortable, TupleSorter {
 
-  private final Tuple[] tuples;         // source tuples
-  private final TupleVector[] vectors;  // values of key columns
   private final int[] mappings;         // index indirection
 
   public VectorizedSorter(List<Tuple> source, SortSpec[] sortKeys, int[] keyIndex) {
-    this.tuples = source.toArray(new Tuple[source.size()]);
-    vectors = new TupleVector[sortKeys.length];
+    super(source.size(), sortKeys, keyIndex);
+    source.toArray(tuples);   // bulk copy; cheapest when source is an ArrayList
     mappings = new int[tuples.length];
-    for (int i = 0; i < vectors.length; i++) {
-      TajoDataTypes.Type type = sortKeys[i].getSortKey().getDataType().getType();
-      boolean nullFirst = sortKeys[i].isNullFirst();
-      boolean ascending = sortKeys[i].isAscending();
-      boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending;
-      vectors[i] = new TupleVector(TupleVector.getType(type), tuples.length, nullInvert, ascending);
-    }
     for (int i = 0; i < tuples.length; i++) {
       for (int j = 0; j < keyIndex.length; j++) {
-        vectors[j].add(tuples[i].get(keyIndex[j]));
+        vectors[j].append(tuples[i], keyIndex[j]);
       }
       mappings[i] = i;
     }
@@ -68,15 +49,7 @@ public class VectorizedSorter implements IndexedSortable, TupleSorter {
 
   @Override
   public int compare(int i1, int i2) {
-    final int index1 = mappings[i1];
-    final int index2 = mappings[i2];
-    for (TupleVector vector : vectors) {
-      int compare = vector.compare(index1, index2);
-      if (compare != 0) {
-        return compare;
-      }
-    }
-    return 0;
+    return super.compare(mappings[i1], mappings[i2]);
   }
 
   @Override
@@ -87,112 +60,18 @@ public class VectorizedSorter implements IndexedSortable, TupleSorter {
   }
 
   @Override
-  public Iterator<Tuple> sort() {
-    new QuickSort().sort(VectorizedSorter.this, 0, mappings.length);
-    return new Iterator<Tuple>() {
-      int index;
-      public boolean hasNext() { return index < mappings.length; }
-      public Tuple next() { return tuples[mappings[index++]]; }
-      public void remove() { throw new UnsupportedException(); }
-    };
-  }
-
-  private static class TupleVector {
-
-    private final int type;
-    private final BitSet nulls;
-    private final boolean nullInvert;
-    private final boolean ascending;
-
-    private boolean[] booleans;
-    private byte[] bits;
-    private short[] shorts;
-    private int[] ints;
-    private long[] longs;
-    private float[] floats;
-    private double[] doubles;
-    private byte[][] bytes;
-
-    private int index;
-
-    private TupleVector(int type, int length, boolean nullInvert, boolean ascending) {
-      this.type = type;
-      this.nulls = new BitSet(length);
-      this.nullInvert = nullInvert;
-      this.ascending = ascending;
-      switch (type) {
-        case 0: booleans = new boolean[length]; break;
-        case 1: bits = new byte[length]; break;
-        case 2: shorts = new short[length]; break;
-        case 3: ints = new int[length]; break;
-        case 4: longs = new long[length]; break;
-        case 5: floats = new float[length]; break;
-        case 6: doubles = new double[length]; break;
-        case 7: bytes = new byte[length][]; break;
-        default:
-          throw new IllegalArgumentException();
-      }
-    }
-
-    private void add(Datum datum) {
-      if (datum.isNull()) {
-        nulls.set(index++);
-        return;
-      }
-      switch (type) {
-        case 0: booleans[index] = datum.asBool(); break;
-        case 1: bits[index] = datum.asByte(); break;
-        case 2: shorts[index] = datum.asInt2(); break;
-        case 3: ints[index] = datum.asInt4(); break;
-        case 4: longs[index] = datum.asInt8(); break;
-        case 5: floats[index] = datum.asFloat4(); break;
-        case 6: doubles[index] = datum.asFloat8(); break;
-        case 7: bytes[index] = datum.asByteArray(); break;
-        default:
-          throw new IllegalArgumentException();
-      }
-      index++;
-    }
-
-    private int compare(int index1, int index2) {
-      final boolean n1 = nulls.get(index1);
-      final boolean n2 = nulls.get(index2);
-      if (n1 && n2) {
-        return 0;
-      }
-      if (n1 ^ n2) {
-        int compVal = n1 ? 1 : -1;
-        return nullInvert ? -compVal : compVal;
+  public Iterable<Tuple> sort() {
+    new QuickSort().sort(this, 0, mappings.length);
+    return new Iterable<Tuple>() {
+      @Override
+      public Iterator<Tuple> iterator() {
+        return new Iterator<Tuple>() {
+          int index;
+          public boolean hasNext() { return index < mappings.length; }
+          public Tuple next() { return tuples[mappings[index++]]; }
+          public void remove() { throw new UnsupportedException(); }
+        };
       }
-      int compare;
-      switch (type) {
-        case 0: compare = Booleans.compare(booleans[index1], booleans[index2]); break;
-        case 1: compare = bits[index1] - bits[index2]; break;
-        case 2: compare = Shorts.compare(shorts[index1], shorts[index2]); break;
-        case 3: compare = Ints.compare(ints[index1], ints[index2]); break;
-        case 4: compare = Longs.compare(longs[index1], longs[index2]); break;
-        case 5: compare = Floats.compare(floats[index1], floats[index2]); break;
-        case 6: compare = Doubles.compare(doubles[index1], doubles[index2]); break;
-        case 7: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break;
-        default:
-          throw new IllegalArgumentException();
-      }
-      return ascending ? compare : -compare;
-    }
-
-    public static int getType(TajoDataTypes.Type type) {
-      switch (type) {
-        case BOOLEAN: return 0;
-        case BIT: case INT1: return 1;
-        case INT2: return 2;
-        case INT4: case DATE: case INET4: return 3;
-        case INT8: case TIME: case TIMESTAMP: case INTERVAL: return 4;
-        case FLOAT4: return 5;
-        case FLOAT8: return 6;
-        case TEXT: case CHAR: case BLOB: return 7;
-      }
-      // todo
-      throw new UnsupportedException(type.name());
-    }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 5d9d46d..946e0f3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -61,7 +61,7 @@ public class TestExternalSortExec {
   private LogicalPlanner planner;
   private Path testDir;
 
-  private final int numTuple = 100000;
+  private final int numTuple = 3000000;
   private Random rnd = new Random(System.currentTimeMillis());
 
   private TableDesc employee;
@@ -161,7 +161,7 @@ public class TestExternalSortExec {
       if (preVal != null) {
         assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
       }
-      preVal = curVal;
+      preVal = new VTuple(curVal);
       cnt++;
     }
     long end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
index fc43d42..9cc477a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
@@ -93,7 +93,7 @@ public class TestTupleSorter {
 
       long start = System.currentTimeMillis();
       VectorizedSorter sorter = new VectorizedSorter(target, sortKeys, keyIndices);
-      Iterator<Tuple> iterator = sorter.sort();
+      Iterator<Tuple> iterator = sorter.sort().iterator();
 
       String[] result1 = new String[SAMPLING];
       for (int i = 0; i < result1.length; i++) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql
index 5451b4a..111a371 100644
--- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql
@@ -1 +1 @@
-select region.*, customer.* from region, customer order by r_regionkey,r_name;
\ No newline at end of file
+select region.*, customer.* from region, customer order by r_regionkey,r_name,c_custkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql
index e9dac51..ca1672e 100644
--- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql
@@ -1 +1 @@
-select region.*, customer.* from customer, region order by r_regionkey,r_name;
\ No newline at end of file
+select region.*, customer.* from customer, region order by r_regionkey,r_name,c_custkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql
index c98e19f..fd44916 100644
--- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql
@@ -1 +1 @@
-select * from customer, region order by c_custkey,c_name;
\ No newline at end of file
+select * from customer, region order by c_custkey,c_name,r_regionkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql
index 7130def..fc5b1c3 100644
--- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql
@@ -1 +1 @@
-select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name;
\ No newline at end of file
+select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name,c_custkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql
index ddd669c..d2114cf 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql
@@ -3,4 +3,4 @@ select
   n1.n_name,
   n2.n_name
 from nation n1 join (select * from nation union select * from nation) n2 on substr(n1.n_name, 1, 4) = substr(n2.n_name, 1, 4)
-order by n1.n_nationkey;
\ No newline at end of file
+order by n1.n_nationkey,n2.n_name;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql
index 5451b4a..111a371 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql
@@ -1 +1 @@
-select region.*, customer.* from region, customer order by r_regionkey,r_name;
\ No newline at end of file
+select region.*, customer.* from region, customer order by r_regionkey,r_name,c_custkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql
index e9dac51..ca1672e 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql
@@ -1 +1 @@
-select region.*, customer.* from customer, region order by r_regionkey,r_name;
\ No newline at end of file
+select region.*, customer.* from customer, region order by r_regionkey,r_name,c_custkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql
index c98e19f..fd44916 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql
@@ -1 +1 @@
-select * from customer, region order by c_custkey,c_name;
\ No newline at end of file
+select * from customer, region order by c_custkey,c_name,r_regionkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql
index 7130def..fc5b1c3 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql
@@ -1 +1 @@
-select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name;
\ No newline at end of file
+select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name,c_custkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json
index bfccc6c..f6b34cd 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json
@@ -63,6 +63,14 @@
                 },
                 "IsAsc": true,
                 "IsNullFirst": false
+            },
+            {
+                "SortKey": {
+                    "ColumnName": "c_custkey",
+                    "OpType": "Column"
+                },
+                "IsAsc": true,
+                "IsNullFirst": false
             }
         ],
         "Expr": {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result
index edd83cd..bed2968 100644
--- a/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result
@@ -47,10 +47,10 @@ n_nationkey,n_name,n_name
 22,RUSSIA,RUSSIA
 22,RUSSIA,RUSSIA
 23,UNITED KINGDOM,UNITED KINGDOM
-23,UNITED KINGDOM,UNITED STATES
 23,UNITED KINGDOM,UNITED KINGDOM
 23,UNITED KINGDOM,UNITED STATES
+23,UNITED KINGDOM,UNITED STATES
 24,UNITED STATES,UNITED KINGDOM
-24,UNITED STATES,UNITED STATES
 24,UNITED STATES,UNITED KINGDOM
+24,UNITED STATES,UNITED STATES
 24,UNITED STATES,UNITED STATES
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
new file mode 100644
index 0000000..3719412
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+
+import java.io.IOException;
+
+// Skeleton Scanner: every method is a no-op or returns a default (false, 0, null); subclasses override as needed
+public abstract class AbstractScanner implements Scanner {
+
+  @Override
+  public void init() throws IOException {
+
+  }
+
+  @Override
+  public void reset() throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return false;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public void setSearchCondition(Object expr) {
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return false;
+  }
+
+  @Override
+  public float getProgress() {
+    return 0;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return null;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
index f19b61f..16477cd 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
@@ -141,6 +141,10 @@ public class MemoryUtil {
         total += TEXT_DATUM + datum.size();
         break;
 
+      case BLOB:
+        total += BLOB_DATUM + datum.size();
+        break;
+
       case DATE:
         total += DATE_DATUM;
         break;


Mime
View raw message