tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject git commit: TAJO-1004: UniformRangePartition cannot deal with unicode ranges.
Date Thu, 21 Aug 2014 07:06:27 GMT
Repository: tajo
Updated Branches:
  refs/heads/master a1711d16b -> aed97a8a5


TAJO-1004: UniformRangePartition cannot deal with unicode ranges.

Closes #116


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/aed97a8a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/aed97a8a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/aed97a8a

Branch: refs/heads/master
Commit: aed97a8a5d5eac6e38103ea8356cec064cb251da
Parents: a1711d1
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Thu Aug 21 16:05:06 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Thu Aug 21 16:05:06 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../main/java/org/apache/tajo/datum/Datum.java  |   4 +
 .../java/org/apache/tajo/datum/TextDatum.java   |   8 +
 .../java/org/apache/tajo/util/StringUtils.java  |  34 +
 .../engine/planner/RangePartitionAlgorithm.java |  61 +-
 .../engine/planner/UniformRangePartition.java   | 309 +++++++--
 .../tajo/master/querymaster/SubQuery.java       |   5 +-
 .../planner/TestUniformRangePartition.java      | 635 ++++++++++++++++++-
 .../apache/tajo/engine/query/TestSortQuery.java |  73 ++-
 .../TestSortQuery/testSortOnNullColumn.sql      |  13 +
 .../TestSortQuery/testSortOnUnicodeTextAsc.sql  |   6 +
 .../TestSortQuery/testSortOnUnicodeTextDesc.sql |   6 +
 .../TestSortQuery/testSortOnNullColumn.result   |   6 +
 .../testSortOnUnicodeTextAsc.result             |   6 +
 .../testSortOnUnicodeTextDesc.result            |   6 +
 .../org/apache/tajo/jdbc/MetaDataTuple.java     |   5 +
 .../org/apache/tajo/storage/FrameTuple.java     |   5 +
 .../java/org/apache/tajo/storage/LazyTuple.java |   5 +
 .../java/org/apache/tajo/storage/Tuple.java     |   2 +
 .../org/apache/tajo/storage/TupleRange.java     |  29 +-
 .../java/org/apache/tajo/storage/VTuple.java    |   5 +
 21 files changed, 1091 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e87feaa..f0734f2 100644
--- a/CHANGES
+++ b/CHANGES
@@ -124,6 +124,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1004: UniformRangePartition cannot deal with unicode ranges.
+    (hyunsik)
+
     TAJO-1013: A complex equality condition including columns of the same
     table is recognized as a join condition. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
index f21e3d7..29d7a04 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/Datum.java
@@ -91,6 +91,10 @@ public abstract class Datum implements Comparable<Datum>, GsonObject {
     throw new InvalidCastException(type, Type.TEXT);
   }
 
+  public char [] asUnicodeChars() {
+    throw new InvalidCastException(type, Type.TEXT);
+  }
+
   public byte[] asTextBytes() {
     return toString().getBytes();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
index b642168..ca76ed2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TextDatum.java
@@ -20,6 +20,7 @@ package org.apache.tajo.datum;
 
 import com.google.common.primitives.UnsignedBytes;
 import com.google.gson.annotations.Expose;
+import com.sun.tools.javac.util.Convert;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.exception.InvalidCastException;
 import org.apache.tajo.exception.InvalidOperationException;
@@ -35,6 +36,7 @@ public class TextDatum extends Datum {
   /* encoded in UTF-8 */
   @Expose private final byte[] bytes;
 
+  public static final int UNICODE_CHAR_BITS_NUM = Character.MAX_VALUE; // largest value of a 2-byte char (0xFFFF)
   public static final TextDatum EMPTY_TEXT = new TextDatum("");
   public static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
 
@@ -88,11 +90,17 @@ public class TextDatum extends Datum {
     return this.bytes;
   }
 
+  @Override
   public String asChars() {
     return new String(this.bytes, defaultCharset);
   }
 
   @Override
+  public char[] asUnicodeChars() {
+    return Convert.utf2chars(this.bytes);
+  }
+
+  @Override
   public byte[] asTextBytes() {
     return this.bytes;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
index 90391a8..96118ac 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/StringUtils.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.SignalLogger;
 
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
 import java.util.Arrays;
 import java.util.BitSet;
 
@@ -71,6 +73,12 @@ public class StringUtils {
     return buf.toString();
   }
 
+  static CharsetEncoder asciiEncoder = Charset.forName("US-ASCII").newEncoder(); // or "ISO-8859-1" for ISO Latin 1
+
+  public static boolean isPureAscii(String v) {
+    return asciiEncoder.canEncode(v);
+  }
+
   public static String quote(String str) {
     return "'" + str + "'";
   }
@@ -276,4 +284,30 @@ public class StringUtils {
     }
     return sb.toString();
   }
+
+  public static char[][] padChars(char []...bytes) {
+    char[] startChars = bytes[0];
+    char[] endChars = bytes[1];
+
+    char[][] padded = new char[2][];
+    int max = Math.max(startChars.length, endChars.length);
+
+    padded[0] = new char[max];
+    padded[1] = new char[max];
+
+    for (int i = 0; i < startChars.length; i++) {
+      padded[0][i] = startChars[i];
+    }
+    for (int i = startChars.length; i < max; i++) {
+      padded[0][i] = 0;
+    }
+    for (int i = 0; i < endChars.length; i++) {
+      padded[1][i] = endChars[i];
+    }
+    for (int i = endChars.length; i < max; i++) {
+      padded[1][i] = 0;
+    }
+
+    return padded;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
index db53cd7..38aa928 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -24,13 +24,15 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.BytesUtils;
+import org.apache.tajo.util.StringUtils;
 
 import java.math.BigInteger;
 
 public abstract class RangePartitionAlgorithm {
   protected SortSpec [] sortSpecs;
   protected TupleRange mergedRange;
-  protected final BigInteger totalCard;
+  protected final BigInteger totalCard; // total cardinality
   /** true if the end of the range is inclusive. Otherwise, it should be false. */
   protected final boolean inclusive;
 
@@ -42,7 +44,11 @@ public abstract class RangePartitionAlgorithm {
    */
   public RangePartitionAlgorithm(SortSpec [] sortSpecs, TupleRange totalRange, boolean inclusive) {
     this.sortSpecs = sortSpecs;
-    this.mergedRange = totalRange;
+    try {
+      this.mergedRange = totalRange.clone();
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
     this.inclusive = inclusive;
     this.totalCard = computeCardinalityForAllColumns(sortSpecs, totalRange, inclusive);
   }
@@ -115,21 +121,46 @@ public abstract class RangePartitionAlgorithm {
         }
         break;
       case TEXT: {
-        byte [] a;
-        byte [] b;
-        if (isAscending) {
-          a = start.asByteArray();
-          b = end.asByteArray();
+        boolean isPureAscii = StringUtils.isPureAscii(start.asChars()) && StringUtils.isPureAscii(end.asChars());
+
+        if (isPureAscii) {
+          byte[] a;
+          byte[] b;
+          if (isAscending) {
+            a = start.asByteArray();
+            b = end.asByteArray();
+          } else {
+            b = start.asByteArray();
+            a = end.asByteArray();
+          }
+
+          byte [][] padded = BytesUtils.padBytes(a, b);
+          a = padded[0];
+          b = padded[1];
+
+          byte[] prependHeader = {1, 0};
+          final BigInteger startBI = new BigInteger(Bytes.add(prependHeader, a));
+          final BigInteger stopBI = new BigInteger(Bytes.add(prependHeader, b));
+          BigInteger diffBI = stopBI.subtract(startBI);
+          columnCard = diffBI;
         } else {
-          b = start.asByteArray();
-          a = end.asByteArray();
-        }
+          char [] a;
+          char [] b;
+
+          if (isAscending) {
+            a = start.asUnicodeChars();
+            b = end.asUnicodeChars();
+          } else {
+            b = start.asUnicodeChars();
+            a = end.asUnicodeChars();
+          }
 
-        byte [] prependHeader = {1, 0};
-        final BigInteger startBI = new BigInteger(Bytes.add(prependHeader, a));
-        final BigInteger stopBI = new BigInteger(Bytes.add(prependHeader, b));
-        BigInteger diffBI = stopBI.subtract(startBI);
-        columnCard = diffBI;
+          BigInteger startBI = UniformRangePartition.charsToBigInteger(a);
+          BigInteger stopBI = UniformRangePartition.charsToBigInteger(b);
+
+          BigInteger diffBI = stopBI.subtract(startBI);
+          columnCard = diffBI;
+        }
         break;
       }
       case DATE:

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index 0a1389a..db12285 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -21,44 +21,72 @@ package org.apache.tajo.engine.planner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.primitives.UnsignedLong;
+import com.sun.tools.javac.util.Convert;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.engine.exception.RangeOverflowException;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.Bytes;
 import org.apache.tajo.util.BytesUtils;
+import org.apache.tajo.util.StringUtils;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.math.RoundingMode;
 import java.util.List;
 
-
+/**
+ * It serializes multiple sort key spaces into one dimension space by regarding key spaces as
+ * arbitrary base number systems respectively.
+ */
 public class UniformRangePartition extends RangePartitionAlgorithm {
+  private final TupleRange originalRange;
   private int variableId;
   private BigInteger[] cardForEachDigit;
   private BigInteger[] colCards;
+  private boolean [] isPureAscii; // flags to indicate if i'th key contains pure ascii characters.
+  private boolean [] beginNulls; // flags to indicate if i'th begin value is null.
+  private boolean [] endNulls; // flags to indicate if i'th end value is null.
 
   /**
    *
-   * @param totalRange
+   * @param entireRange
    * @param sortSpecs The description of sort keys
    * @param inclusive true if the end of the range is inclusive
    */
-  public UniformRangePartition(final TupleRange totalRange, final SortSpec[] sortSpecs, boolean inclusive) {
-    super(sortSpecs, totalRange, inclusive);
-    colCards = new BigInteger[sortSpecs.length];
+  public UniformRangePartition(final TupleRange entireRange, final SortSpec[] sortSpecs, boolean inclusive) {
+    super(sortSpecs, entireRange, inclusive);
+
+    this.originalRange = entireRange;
+    beginNulls = new boolean[sortSpecs.length];
+    endNulls = new boolean[sortSpecs.length];
 
+    // filling pure ascii flags
+    isPureAscii = new boolean[sortSpecs.length];
+    for (int i = 0; i < sortSpecs.length; i++) {
+      Datum startValue = entireRange.getStart().get(i);
+      Datum endValue = entireRange.getEnd().get(i);
+      isPureAscii[i] = StringUtils.isPureAscii(startValue.asChars()) && StringUtils.isPureAscii(endValue.asChars());
+      beginNulls[i] = startValue.isNull();
+      endNulls[i] = endValue.isNull();
+    }
+
+    colCards = new BigInteger[sortSpecs.length];
     normalize(sortSpecs, this.mergedRange);
 
     for (int i = 0; i < sortSpecs.length; i++) {
-      colCards[i] = computeCardinality(sortSpecs[i].getSortKey().getDataType(), totalRange.getStart().get(i),
-          totalRange.getEnd().get(i), inclusive, sortSpecs[i].isAscending());
+      Datum startValue = entireRange.getStart().get(i);
+      Datum endValue = entireRange.getEnd().get(i);
+
+      colCards[i] =  computeCardinality(sortSpecs[i].getSortKey().getDataType(), startValue, endValue,
+          inclusive, sortSpecs[i].isAscending());
     }
 
     cardForEachDigit = new BigInteger[colCards.length];
@@ -107,6 +135,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     BigInteger reminder = reverseCardsForDigit[0];
     Tuple last = mergedRange.getStart();
     TupleRange tupleRange;
+
     while(reminder.compareTo(BigInteger.ZERO) > 0) {
       if (reminder.compareTo(term) <= 0) { // final one is inclusive
         tupleRange = new TupleRange(sortSpecs, last, mergedRange.getEnd());
@@ -120,8 +149,17 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
       reminder = reminder.subtract(term);
     }
 
-    for (TupleRange r : ranges) {
-      denormalize(sortSpecs, r);
+    // Restore the original start and end keys in place of the padded (same-length) normalized keys
+    ranges.get(0).setStart(originalRange.getStart());
+    ranges.get(ranges.size() - 1).setEnd(originalRange.getEnd());
+
+    // Ensure all keys are totally ordered in a right order.
+    for (int i = 0; i < ranges.size(); i++) {
+      if (i > 1) {
+        Preconditions.checkState(ranges.get(i - 2).compareTo(ranges.get(i - 1)) < 0,
+            "Computed ranges are not totally ordered. Previous key=" + ranges.get(i - 2) + ", Current Key="
+                + ranges.get(i - 1));
+      }
     }
 
     return ranges.toArray(new TupleRange[ranges.size()]);
@@ -133,43 +171,48 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
    * @param sortSpecs The sort specs
    * @param range Tuple range to be normalize
    */
-  public static void normalize(final SortSpec [] sortSpecs, TupleRange range) {
+  public void normalize(final SortSpec [] sortSpecs, TupleRange range) {
     // normalize text fields to have same bytes length
     for (int i = 0; i < sortSpecs.length; i++) {
       if (sortSpecs[i].getSortKey().getDataType().getType() == TajoDataTypes.Type.TEXT) {
-        byte [] startBytes;
-        byte [] endBytes;
-        if (range.getStart().isNull(i)) {
-          startBytes = BigInteger.ZERO.toByteArray();
-        } else {
-          startBytes = range.getStart().getBytes(i);
-        }
+        if (isPureAscii[i]) {
+          byte[] startBytes;
+          byte[] endBytes;
+          if (range.getStart().isNull(i)) {
+            startBytes = BigInteger.ZERO.toByteArray();
+          } else {
+            startBytes = range.getStart().getBytes(i);
+          }
+
+          if (range.getEnd().isNull(i)) {
+            endBytes = BigInteger.ZERO.toByteArray();
+          } else {
+            endBytes = range.getEnd().getBytes(i);
+          }
+
+          byte[][] padded = BytesUtils.padBytes(startBytes, endBytes);
+          range.getStart().put(i, DatumFactory.createText(padded[0]));
+          range.getEnd().put(i, DatumFactory.createText(padded[1]));
 
-        if (range.getEnd().isNull(i)) {
-          endBytes = BigInteger.ZERO.toByteArray();
         } else {
-          endBytes = range.getEnd().getBytes(i);
-        }
+          char[] startChars;
+          char[] endChars;
+          if (range.getStart().isNull(i)) {
+            startChars = new char[] {0};
+          } else {
+            startChars = range.getStart().getUnicodeChars(i);
+          }
 
-        byte [][] padded = BytesUtils.padBytes(startBytes, endBytes);
-        range.getStart().put(i, DatumFactory.createText(padded[0]));
-        range.getEnd().put(i, DatumFactory.createText(padded[1]));
-      }
-    }
-  }
+          if (range.getEnd().isNull(i)) {
+            endChars = new char[] {0};
+          } else {
+            endChars = range.getEnd().getUnicodeChars(i);
+          }
 
-  /**
-   * Normalized keys have padding values, but it will cause the key mismatch in pull server.
-   * So, it denormalize the normalized keys again.
-   *
-   * @param sortSpecs The sort specs
-   * @param range Tuple range to be denormalized
-   */
-  public static void denormalize(SortSpec [] sortSpecs, TupleRange range) {
-    for (int i = 0; i < sortSpecs.length; i++) {
-      if (sortSpecs[i].getSortKey().getDataType().getType() == TajoDataTypes.Type.TEXT) {
-        range.getStart().put(i,DatumFactory.createText(BytesUtils.trimBytes(range.getStart().getBytes(i))));
-        range.getEnd().put(i,DatumFactory.createText(BytesUtils.trimBytes(range.getEnd().getBytes(i))));
+          char[][] padded = StringUtils.padChars(startChars, endChars);
+          range.getStart().put(i, DatumFactory.createText(new String(padded[0])));
+          range.getEnd().put(i, DatumFactory.createText(new String(padded[1])));
+        }
       }
     }
   }
@@ -258,17 +301,35 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
 
       }
       case TEXT: {
-        byte [] lastBytes = last.asByteArray();
-        byte [] endBytes = mergedRange.getEnd().getBytes(colId);
+        if (isPureAscii[colId]) {
+          byte[] lastBytes = last.asByteArray();
+          byte[] endBytes = mergedRange.getEnd().getBytes(colId);
 
-        Preconditions.checkState(lastBytes.length == endBytes.length);
+          Preconditions.checkState(lastBytes.length == endBytes.length);
 
-        if (sortSpecs[colId].isAscending()) {
-          candidate = incDecimal.add(new BigDecimal(new BigInteger(lastBytes)));
-          return new BigDecimal(new BigInteger(endBytes)).compareTo(candidate) < 0;
+          if (sortSpecs[colId].isAscending()) {
+            candidate = incDecimal.add(new BigDecimal(new BigInteger(lastBytes)));
+            return new BigDecimal(new BigInteger(endBytes)).compareTo(candidate) < 0;
+          } else {
+            candidate = new BigDecimal(new BigInteger(lastBytes)).subtract(incDecimal);
+            return candidate.compareTo(new BigDecimal(new BigInteger(endBytes))) < 0;
+          }
         } else {
-          candidate = new BigDecimal(new BigInteger(lastBytes)).subtract(incDecimal);
-          return candidate.compareTo(new BigDecimal(new BigInteger(endBytes))) < 0;
+          char[] lastChars = last.asUnicodeChars();
+          char[] endChars = mergedRange.getEnd().getUnicodeChars(colId);
+
+          Preconditions.checkState(lastChars.length == endChars.length);
+
+          BigInteger lastBi = charsToBigInteger(lastChars);
+          BigInteger endBi = charsToBigInteger(endChars);
+
+          if (sortSpecs[colId].isAscending()) {
+            candidate = incDecimal.add(new BigDecimal(lastBi));
+            return new BigDecimal(endBi).compareTo(candidate) < 0;
+          } else {
+            candidate = new BigDecimal(lastBi).subtract(incDecimal);
+            return candidate.compareTo(new BigDecimal(endBi)) < 0;
+          }
         }
       }
       case INET4: {
@@ -391,6 +452,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         if (i == 0) {
           throw new RangeOverflowException(mergedRange, last, incs[i].longValue(), sortSpecs[i].isAscending());
         }
+        // increment some volume of the serialized one-dimension key space
         long rem = incrementAndGetReminder(i, last.get(i), value.longValue());
         incs[i] = BigInteger.valueOf(rem);
         incs[i - 1] = incs[i-1].add(BigInteger.ONE);
@@ -418,7 +480,11 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createChar((char) (mergedRange.getStart().get(i).asChar() + incs[i].longValue())));
           } else {
-            end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() + incs[i].longValue())));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() + incs[i].longValue())));
+            } else {
+              end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() - incs[i].longValue())));
+            }
           }
           break;
         case BIT:
@@ -426,7 +492,11 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             end.put(i, DatumFactory.createBit(
                 (byte) (mergedRange.getStart().get(i).asByte() + incs[i].longValue())));
           } else {
-            end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() + incs[i].longValue())));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() + incs[i].longValue())));
+            } else {
+              end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() - incs[i].longValue())));
+            }
           }
           break;
         case INT2:
@@ -434,7 +504,11 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             end.put(i, DatumFactory.createInt2(
                 (short) (mergedRange.getStart().get(i).asInt2() + incs[i].longValue())));
           } else {
-            end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() + incs[i].longValue())));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() + incs[i].longValue())));
+            } else {
+              end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() - incs[i].longValue())));
+            }
           }
           break;
         case INT4:
@@ -454,7 +528,11 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             end.put(i, DatumFactory.createInt8(
                 mergedRange.getStart().get(i).asInt8() + incs[i].longValue()));
           } else {
-            end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
+            } else {
+              end.put(i, DatumFactory.createInt8(last.get(i).asInt8() - incs[i].longValue()));
+            }
           }
           break;
         case FLOAT4:
@@ -462,7 +540,11 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             end.put(i, DatumFactory.createFloat4(
                 mergedRange.getStart().get(i).asFloat4() + incs[i].longValue()));
           } else {
-            end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() + incs[i].longValue()));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() + incs[i].longValue()));
+            } else {
+              end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() - incs[i].longValue()));
+            }
           }
           break;
         case FLOAT8:
@@ -470,7 +552,11 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             end.put(i, DatumFactory.createFloat8(
                 mergedRange.getStart().get(i).asFloat8() + incs[i].longValue()));
           } else {
-            end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() + incs[i].longValue()));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() + incs[i].longValue()));
+            } else {
+              end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() - incs[i].longValue()));
+            }
           }
           break;
         case TEXT:
@@ -481,24 +567,91 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             BigInteger lastBigInt;
             if (last.isNull(i)) {
               lastBigInt = BigInteger.valueOf(0);
+              end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray()));
             } else {
-              lastBigInt = UnsignedLong.valueOf(new BigInteger(last.get(i).asByteArray())).bigIntegerValue();
+
+              if (isPureAscii[i]) {
+                lastBigInt = UnsignedLong.valueOf(new BigInteger(last.get(i).asByteArray())).bigIntegerValue();
+                if (sortSpecs[i].isAscending()) {
+                  end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray()));
+                } else {
+                  end.put(i, DatumFactory.createText(lastBigInt.subtract(incs[i]).toByteArray()));
+                }
+              } else {
+
+                // We consider an array of chars as a 2^16 base number system because each char is 16 bits wide.
+                // See Character.MAX_VALUE. Then, we add the increment to the last array of chars.
+
+                char[] lastChars = last.getUnicodeChars(i);
+                int [] charIncs = new int[lastChars.length];
+
+                BigInteger remain = incs[i];
+                for (int k = lastChars.length - 1; k > 0 && remain.compareTo(BigInteger.ZERO) > 0; k--) {
+                  BigInteger digitBase = BigInteger.valueOf(TextDatum.UNICODE_CHAR_BITS_NUM).pow(k);
+
+                  if (remain.compareTo(digitBase) > 0) {
+                    charIncs[k] = remain.divide(digitBase).intValue();
+                    BigInteger sub = digitBase.multiply(BigInteger.valueOf(charIncs[k]));
+                    remain = remain.subtract(sub);
+                  }
+                }
+                charIncs[charIncs.length - 1] = remain.intValue();
+
+                for (int k = 0; k < lastChars.length; k++) {
+                  if (charIncs[k] == 0) {
+                    continue;
+                  }
+
+                  if (sortSpecs[i].isAscending()) {
+                    int sum = (int) lastChars[k] + charIncs[k];
+                    if (sum > TextDatum.UNICODE_CHAR_BITS_NUM) { // if carry occurs in the current digit
+                      charIncs[k] = sum - TextDatum.UNICODE_CHAR_BITS_NUM;
+                      charIncs[k - 1] += 1;
+
+                      lastChars[k - 1] += 1;
+                      lastChars[k] += charIncs[k];
+                    } else {
+                      lastChars[k] += charIncs[k];
+                    }
+                  } else {
+                    int sum = (int) lastChars[k] - charIncs[k];
+                    if (sum < 0) { // if borrow occurs in the current digit
+                      charIncs[k] = TextDatum.UNICODE_CHAR_BITS_NUM - sum;
+                      charIncs[k - 1] -= 1;
+
+                      lastChars[k - 1] -= 1;
+                      lastChars[k] += charIncs[k];
+                    } else {
+                      lastChars[k] -= charIncs[k];
+                    }
+                  }
+                }
+
+                end.put(i, DatumFactory.createText(Convert.chars2utf(lastChars)));
+              }
             }
-            end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray()));
           }
           break;
         case DATE:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createDate((int) (mergedRange.getStart().get(i).asInt4() + incs[i].longValue())));
           } else {
-            end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue())));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue())));
+            } else {
+              end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() - incs[i].longValue())));
+            }
           }
           break;
         case TIME:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createTime(mergedRange.getStart().get(i).asInt8() + incs[i].longValue()));
           } else {
-            end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue()));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue()));
+            } else {
+              end.put(i, DatumFactory.createTime(last.get(i).asInt8() - incs[i].longValue()));
+            }
           }
           break;
         case TIMESTAMP:
@@ -506,7 +659,11 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(
                 mergedRange.getStart().get(i).asInt8() + incs[i].longValue()));
           } else {
-            end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.get(i).asInt8() + incs[i].longValue()));
+            if (sortSpecs[i].isAscending()) {
+              end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.get(i).asInt8() + incs[i].longValue()));
+            } else {
+              end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.get(i).asInt8() - incs[i].longValue()));
+            }
           }
           break;
         case INET4:
@@ -516,17 +673,45 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
             assert ipBytes.length == 4;
             end.put(i, DatumFactory.createInet4(ipBytes));
           } else {
-            int lastVal = last.get(i).asInt4() + incs[i].intValue();
-            ipBytes = new byte[4];
-            Bytes.putInt(ipBytes, 0, lastVal);
-            end.put(i, DatumFactory.createInet4(ipBytes));
+            if (sortSpecs[i].isAscending()) {
+              int lastVal = last.get(i).asInt4() + incs[i].intValue();
+              ipBytes = new byte[4];
+              Bytes.putInt(ipBytes, 0, lastVal);
+              end.put(i, DatumFactory.createInet4(ipBytes));
+            } else {
+              int lastVal = last.get(i).asInt4() - incs[i].intValue();
+              ipBytes = new byte[4];
+              Bytes.putInt(ipBytes, 0, lastVal);
+              end.put(i, DatumFactory.createInet4(ipBytes));
+            }
           }
           break;
         default:
           throw new UnsupportedOperationException(column.getDataType() + " is not supported yet");
       }
+
+      // replace i'th end value by NULL if begin and end are all NULL
+      if (beginNulls[i] && endNulls[i]) {
+        end.put(i, NullDatum.get());
+        continue;
+      }
     }
 
     return end;
   }
+
+  /**
+   * Interprets chars as a big-endian number whose digit base is
+   * TextDatum.UNICODE_CHAR_BITS_NUM, accumulating it with Horner's rule.
+   * @param chars digits of the number, most significant first
+   * @return the accumulated value; BigInteger.ZERO for an empty array
+   */
+  public static BigInteger charsToBigInteger(char [] chars) {
+    BigInteger base = BigInteger.valueOf(TextDatum.UNICODE_CHAR_BITS_NUM);
+    BigInteger sum = BigInteger.ZERO;
+    for (char c : chars) {
+      sum = sum.multiply(base).add(BigInteger.valueOf(c));
+    }
+    return sum;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 51116bd..40c5406 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -58,7 +58,6 @@ import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
@@ -675,7 +674,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
                               allocateContainers(subQuery);
 
                             }
-                          } catch (Exception e) {
+                          } catch (Throwable e) {
                             LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e);
                             subQuery.setFinishTime();
                             subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage()));
@@ -686,7 +685,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
               );
           state = SubQueryState.INITED;
         }
-      } catch (Exception e) {
+      } catch (Throwable e) {
         LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e);
         subQuery.setFinishTime();
         subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
index 58653d1..2294424 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -23,6 +23,7 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
 import org.junit.Test;
@@ -33,11 +34,317 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestUniformRangePartition {
+
+  @Test
+  public void testPartitionForINT2Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT2);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt2((short) 1));
+    e.put(0, DatumFactory.createInt2((short) 30000));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForINT2Desc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT2);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt2((short) 30000));
+    e.put(0, DatumFactory.createInt2((short) 1));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForINT4Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt4(1));
+    e.put(0, DatumFactory.createInt4(10000));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForINT4Desc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt4(10000));
+    e.put(0, DatumFactory.createInt4(1));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForINT8Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt8(1));
+    e.put(0, DatumFactory.createInt8(10000));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForInt8Desc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.INT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createInt8(10000));
+    e.put(0, DatumFactory.createInt8(1));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForFloat4Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.FLOAT4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createFloat4((float) 1.0));
+    e.put(0, DatumFactory.createFloat4((float) 10000.0));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForFloat4Desc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.FLOAT4);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createFloat4((float) 10000.0));
+    e.put(0, DatumFactory.createFloat4((float) 1.0));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForFloat8Asc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.FLOAT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createFloat8(1.0));
+    e.put(0, DatumFactory.createFloat8(10000.0));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForFloat8Desc() { // 64-way split of a descending FLOAT8 range
+    Schema schema = new Schema()
+        .addColumn("col1", Type.FLOAT8);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder(); // descending: the range starts at the larger value
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createFloat8(10000.0)); // double literals, same as the Asc test
+    e.put(0, DatumFactory.createFloat8(1.0));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0); // each range must sort after its predecessor
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s)); // partitions must cover exactly [s, e]
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
   /**
-   * It verify overflow and increment.
+   * It verifies overflow and increment in the normal case.
    */
   @Test
-  public void testIncrement1() {
+  public void testIncrementOfText() {
     Schema schema = new Schema()
         .addColumn("l_returnflag", Type.TEXT)
         .addColumn("l_linestatus", Type.TEXT);
@@ -84,7 +391,7 @@ public class TestUniformRangePartition {
    * It verify overflow with the number that exceeds the last digit.
    */
   @Test
-  public void testIncrement2() {
+  public void testIncrementOfText2() {
     Schema schema = new Schema()
         .addColumn("l_returnflag", Type.TEXT)
         .addColumn("l_linestatus", Type.TEXT);
@@ -129,7 +436,7 @@ public class TestUniformRangePartition {
    * It verify the case where two or more digits are overflow.
    */
   @Test
-  public void testIncrement3() {
+  public void testIncrementOfText3() {
     Schema schema = new Schema()
         .addColumn("l_returnflag", Type.TEXT)
         .addColumn("l_linestatus", Type.TEXT)
@@ -162,7 +469,278 @@ public class TestUniformRangePartition {
   }
 
   @Test
-  public void testIncrement4() {
+  public void testIncrementOfUnicode() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("가가가"));
+    Tuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("하하하"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    TupleComparator comp = new TupleComparator(schema, sortSpecs);
+
+    Tuple tuple = s;
+    Tuple prevTuple = null;
+    for (int i = 0; i < 100; i++) {
+      tuple = partitioner.increment(tuple, BigInteger.valueOf(30000), 0);
+      if (prevTuple != null) {
+        assertTrue("prev=" + prevTuple + ", current=" + tuple, comp.compare(prevTuple, tuple) < 0);
+      }
+      prevTuple = tuple;
+    }
+  }
+
+  @Test
+  public void testIncrementOfUnicodeOneCharSinglePartition() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("가"));
+    Tuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("다"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 1;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testIncrementOfUnicodeOneCharMultiPartition() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("가"));
+    Tuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("꽥"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 8;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeTextAsc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("가가가"));
+    e.put(0, DatumFactory.createText("하하하"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeDiffLenBeginTextAsc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("가"));
+    e.put(0, DatumFactory.createText("하하하"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeDiffLenEndTextAsc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("가가가"));
+    e.put(0, DatumFactory.createText("하"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeTextDesc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("하하하"));
+    e.put(0, DatumFactory.createText("가가가"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeDiffLenBeginTextDesc() {
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("하"));
+    e.put(0, DatumFactory.createText("가가가"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForUnicodeDiffLenEndTextDesc() { // descending range whose end is shorter
+    Schema schema = new Schema()
+        .addColumn("col1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    Tuple s = new VTuple(1);
+    Tuple e = new VTuple(1);
+    s.put(0, DatumFactory.createText("하하하")); // three-char begin ...
+    e.put(0, DatumFactory.createText("가")); // ... one-char end, mirroring the Asc DiffLenEnd test
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+
+    UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
+    int partNum = 64;
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0); // each range must sort after its predecessor
+      }
+      prev = r;
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s)); // partitions must cover exactly [s, e]
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testIncrementOfInt8() {
     Schema schema = new Schema()
         .addColumn("l_orderkey", Type.INT8)
         .addColumn("l_linenumber", Type.INT8);
@@ -189,7 +767,7 @@ public class TestUniformRangePartition {
     assertEquals(39, range3.get(1).asInt4());
   }
 
-  @Test public void testIncrement5() {
+  @Test public void testIncrementOfInt8AndFinal() {
     Schema schema = new Schema()
         .addColumn("l_orderkey", Type.INT8)
         .addColumn("l_linenumber", Type.INT8)
@@ -222,7 +800,7 @@ public class TestUniformRangePartition {
   }
 
   @Test
-  public void testIncrement6() {
+  public void testIncrementOfFloat8() {
     Schema schema = new Schema()
         .addColumn("l_orderkey", Type.FLOAT8)
         .addColumn("l_linenumber", Type.FLOAT8)
@@ -255,7 +833,7 @@ public class TestUniformRangePartition {
   }
 
   @Test
-  public void testIncrement7() {
+  public void testIncrementOfInet4() {
     Schema schema = new Schema()
         .addColumn("l_orderkey", Type.INET4)
         .addColumn("l_linenumber", Type.INET4)
@@ -309,11 +887,10 @@ public class TestUniformRangePartition {
 
     TupleRange prev = null;
     for (TupleRange r : ranges) {
-      if (prev == null) {
-        prev = r;
-      } else {
+      if (prev != null) {
         assertTrue(prev.compareTo(r) < 0);
       }
+      prev = r;
     }
   }
 
@@ -379,12 +956,11 @@ public class TestUniformRangePartition {
     TupleRange [] ranges = partitioner.partition(48);
 
     TupleRange prev = null;
-    for (TupleRange r : ranges) {
-      if (prev == null) {
-        prev = r;
-      } else {
-        assertTrue(prev.compareTo(r) < 0);
+    for (int i = 0; i < ranges.length; i++) {
+      if (prev != null) {
+        assertTrue(i + "th, prev=" + prev + ",cur=" + ranges[i], prev.compareTo(ranges[i]) < 0);
       }
+      prev = ranges[i];
     }
     assertEquals(48, ranges.length);
     assertTrue(ranges[0].getStart().equals(s));
@@ -412,11 +988,10 @@ public class TestUniformRangePartition {
 
     TupleRange prev = null;
     for (TupleRange r : ranges) {
-      if (prev == null) {
-        prev = r;
-      } else {
+      if (prev != null) {
         assertTrue(prev.compareTo(r) < 0);
       }
+      prev = r;
     }
     assertEquals(partNum, ranges.length);
     assertTrue(ranges[0].getStart().equals(s));
@@ -445,11 +1020,10 @@ public class TestUniformRangePartition {
 
     TupleRange prev = null;
     for (TupleRange r : ranges) {
-      if (prev == null) {
-        prev = r;
-      } else {
-        assertTrue(prev.compareTo(r) > 0);
+      if (prev != null) {
+        assertTrue(prev.compareTo(r) < 0);
       }
+      prev = r;
     }
     assertEquals(partNum, ranges.length);
     assertTrue(ranges[0].getStart().equals(s));
@@ -477,11 +1051,10 @@ public class TestUniformRangePartition {
 
     TupleRange prev = null;
     for (TupleRange r : ranges) {
-      if (prev == null) {
-        prev = r;
-      } else {
+      if (prev != null) {
         assertTrue(prev.compareTo(r) < 0);
       }
+      prev = r;
     }
     assertEquals(partNum, ranges.length);
     assertTrue(ranges[0].getStart().equals(s));
@@ -532,11 +1105,10 @@ public class TestUniformRangePartition {
 
     TupleRange prev = null;
     for (TupleRange r : ranges) {
-      if (prev == null) {
-        prev = r;
-      } else {
+      if (prev != null) {
         assertTrue(prev.compareTo(r) < 0);
       }
+      prev = r;
     }
   }
 
@@ -561,11 +1133,10 @@ public class TestUniformRangePartition {
 
     TupleRange prev = null;
     for (TupleRange r : ranges) {
-      if (prev == null) {
-        prev = r;
-      } else {
+      if (prev != null) {
         assertTrue(prev.compareTo(r) < 0);
       }
+      prev = r;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 7d2c5d2..df02708 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -34,8 +34,6 @@ import org.junit.experimental.categories.Category;
 import java.sql.ResultSet;
 import java.util.TimeZone;
 
-import static org.junit.Assert.assertEquals;
-
 @Category(IntegrationTest.class)
 public class TestSortQuery extends QueryTestCaseBase {
 
@@ -179,7 +177,7 @@ public class TestSortQuery extends QueryTestCaseBase {
   }
 
   @Test
-  public final void testSortNullColumn() throws Exception {
+  public final void testSortOnNullColumn() throws Exception {
     try {
       testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "2");
       KeyValueSet tableOptions = new KeyValueSet();
@@ -197,25 +195,68 @@ public class TestSortQuery extends QueryTestCaseBase {
       };
       TajoTestingCluster.createTable("nullsort", schema, tableOptions, data, 2);
 
-      ResultSet res = executeString(
-          "select * from (" +
-              "select case when id > 2 then null else id end as col1, name as col2 from nullsort) a " +
-          "order by col1, col2"
-      );
+      ResultSet res = executeQuery();
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0");
+      executeString("DROP TABLE nullsort PURGE;").close();
+    }
+  }
 
-      String expected = "col1,col2\n" +
-          "-------------------------------\n" +
-          "1,BRAZIL\n" +
-          "2,ALGERIA\n" +
-          "null,ARGENTINA\n" +
-          "null,CANADA\n";
+  @Test
+  public final void testSortOnUnicodeTextAsc() throws Exception {
+    try {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "2");
+      KeyValueSet tableOptions = new KeyValueSet();
+      tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+      tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 
-      assertEquals(expected, resultSetToString(res));
+      Schema schema = new Schema();
+      schema.addColumn("col1", Type.INT4);
+      schema.addColumn("col2", Type.TEXT);
+      String[] data = new String[]{
+          "1|하하하",
+          "2|캬캬캬",
+          "3|가가가",
+          "4|냐하하"
+      };
+      TajoTestingCluster.createTable("unicode_sort1", schema, tableOptions, data, 2);
 
+      ResultSet res = executeQuery();
+      assertResultSet(res);
       cleanupQuery(res);
     } finally {
       testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0");
-      executeString("DROP TABLE nullsort PURGE;").close();
+      executeString("DROP TABLE unicode_sort1 PURGE;").close();
+    }
+  }
+
+  @Test
+  public final void testSortOnUnicodeTextDesc() throws Exception {
+    try {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "2");
+      KeyValueSet tableOptions = new KeyValueSet();
+      tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+      tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+      Schema schema = new Schema();
+      schema.addColumn("col1", Type.INT4);
+      schema.addColumn("col2", Type.TEXT);
+      String[] data = new String[]{
+          "1|하하하",
+          "2|캬캬캬",
+          "3|가가가",
+          "4|냐하하"
+      };
+      TajoTestingCluster.createTable("unicode_sort2", schema, tableOptions, data, 2);
+
+      ResultSet res = executeQuery();
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0");
+      executeString("DROP TABLE unicode_sort2 PURGE;").close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnNullColumn.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnNullColumn.sql b/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnNullColumn.sql
new file mode 100644
index 0000000..6707d27
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnNullColumn.sql
@@ -0,0 +1,13 @@
+select
+  *
+from (
+  select
+    case when id > 2 then null else id end as col1,
+    name as col2
+  from
+    nullsort
+) a
+
+order by
+  col1,
+  col2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnUnicodeTextAsc.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnUnicodeTextAsc.sql b/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnUnicodeTextAsc.sql
new file mode 100644
index 0000000..6ed1847
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnUnicodeTextAsc.sql
@@ -0,0 +1,6 @@
+select
+  *
+from
+  unicode_sort1
+order by
+  col2 asc;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnUnicodeTextDesc.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnUnicodeTextDesc.sql b/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnUnicodeTextDesc.sql
new file mode 100644
index 0000000..efbb684
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSortQuery/testSortOnUnicodeTextDesc.sql
@@ -0,0 +1,6 @@
+select
+  *
+from
+  unicode_sort2
+order by
+  col2 desc;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/test/resources/results/TestSortQuery/testSortOnNullColumn.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSortQuery/testSortOnNullColumn.result b/tajo-core/src/test/resources/results/TestSortQuery/testSortOnNullColumn.result
new file mode 100644
index 0000000..0ee6535
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSortQuery/testSortOnNullColumn.result
@@ -0,0 +1,6 @@
+col1,col2
+-------------------------------
+1,BRAZIL
+2,ALGERIA
+null,ARGENTINA
+null,CANADA
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/test/resources/results/TestSortQuery/testSortOnUnicodeTextAsc.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSortQuery/testSortOnUnicodeTextAsc.result b/tajo-core/src/test/resources/results/TestSortQuery/testSortOnUnicodeTextAsc.result
new file mode 100644
index 0000000..eca4f68
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSortQuery/testSortOnUnicodeTextAsc.result
@@ -0,0 +1,6 @@
+col1,col2
+-------------------------------
+3,가가가
+4,냐하하
+2,캬캬캬
+1,하하하
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-core/src/test/resources/results/TestSortQuery/testSortOnUnicodeTextDesc.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSortQuery/testSortOnUnicodeTextDesc.result b/tajo-core/src/test/resources/results/TestSortQuery/testSortOnUnicodeTextDesc.result
new file mode 100644
index 0000000..9f53136
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSortQuery/testSortOnUnicodeTextDesc.result
@@ -0,0 +1,6 @@
+col1,col2
+-------------------------------
+1,하하하
+2,캬캬캬
+4,냐하하
+3,가가가
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
index a88a791..04338ca 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/MetaDataTuple.java
@@ -140,6 +140,11 @@ public class MetaDataTuple implements Tuple {
   }
 
   @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return values.get(fieldId).asUnicodeChars();
+  }
+
+  @Override
   public Tuple clone() throws CloneNotSupportedException {
     throw new UnsupportedException("clone");
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
index 5e2f28c..1376a05 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -171,6 +171,11 @@ public class FrameTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public char [] getUnicodeChars(int fieldId) {
+    return get(fieldId).asUnicodeChars();
+  }
+
+  @Override
   public Tuple clone() throws CloneNotSupportedException {
     FrameTuple frameTuple = (FrameTuple) super.clone();
     frameTuple.set(this.left.clone(), this.right.clone());

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 27d2691..d8dca0e 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -191,6 +191,11 @@ public class LazyTuple implements Tuple, Cloneable {
     return get(fieldId).asChars();
   }
 
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return get(fieldId).asUnicodeChars();
+  }
+
   public String toString() {
     boolean first = true;
     StringBuilder str = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
index a05dc71..5a173f7 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -64,6 +64,8 @@ public interface Tuple extends Cloneable {
 	
 	public String getText(int fieldId);
 
+  public char [] getUnicodeChars(int fieldId);
+
   public Tuple clone() throws CloneNotSupportedException;
 
   public Datum[] getValues();

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
index 3232056..6cc09d4 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -27,12 +27,12 @@ import java.util.Comparator;
 /**
  * It represents a pair of start and end tuples.
  */
-public class TupleRange implements Comparable<TupleRange> {
-  private final Tuple start;
-  private final Tuple end;
+public class TupleRange implements Comparable<TupleRange>, Cloneable {
+  private Tuple start;
+  private Tuple end;
   private final TupleComparator comp;
 
-  public TupleRange(final SortSpec [] sortSpecs, final Tuple start, final Tuple end) {
+  public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) {
     this.comp = new TupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
     // if there is only one value, start == end
     this.start = start;
@@ -48,16 +48,24 @@ public class TupleRange implements Comparable<TupleRange> {
     return schema;
   }
 
+  public void setStart(Tuple tuple) {
+    this.start = tuple;
+  }
+
   public final Tuple getStart() {
     return this.start;
   }
 
+  public void setEnd(Tuple tuple) {
+    this.end = tuple;
+  }
+
   public final Tuple getEnd() {
     return this.end;
   }
 
   public String toString() {
-    return "[" + this.start + ", " + this.end+")";
+    return "[" + this.start + ", " + this.end + ")";
   }
 
   @Override
@@ -67,7 +75,7 @@ public class TupleRange implements Comparable<TupleRange> {
 
   @Override
   public boolean equals(Object obj) {
-    if (obj instanceof  TupleRange) {
+    if (obj instanceof TupleRange) {
       TupleRange other = (TupleRange) obj;
       return this.start.equals(other.start) && this.end.equals(other.end);
     } else {
@@ -94,4 +102,11 @@ public class TupleRange implements Comparable<TupleRange> {
       return right.compareTo(left);
     }
   }
-}
+
+  public TupleRange clone() throws CloneNotSupportedException {
+    TupleRange newRange = (TupleRange) super.clone();
+    newRange.setStart(start.clone());
+    newRange.setEnd(end.clone());
+    return newRange;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/aed97a8a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
index 6d602f8..326fb6d 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -173,6 +173,11 @@ public class VTuple implements Tuple, Cloneable {
 	}
 
   @Override
+  public char[] getUnicodeChars(int fieldId) {
+    return values[fieldId].asUnicodeChars();
+  }
+
+  @Override
   public Tuple clone() throws CloneNotSupportedException {
     VTuple tuple = (VTuple) super.clone();
 


Mime
View raw message