tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [04/17] git commit: TAJO-966: Range partition should support split of multiple characters.
Date Mon, 11 Aug 2014 08:11:33 GMT
TAJO-966: Range partition should support split of multiple characters.

Closes #91


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6b16264c
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6b16264c
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6b16264c

Branch: refs/heads/index_support
Commit: 6b16264c1860f7e8156466fd806ba9251147a702
Parents: eeaf379
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Mon Aug 4 14:52:11 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Mon Aug 4 14:52:26 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/util/BytesUtils.java   |  35 +++
 .../exception/RangeOverflowException.java       |   5 +-
 .../engine/planner/RangePartitionAlgorithm.java |  97 +++----
 .../engine/planner/UniformRangePartition.java   | 259 ++++++++++++-------
 .../tajo/master/querymaster/Repartitioner.java  |  10 +-
 .../planner/TestUniformRangePartition.java      | 159 ++++++++++--
 7 files changed, 408 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index d6ac279..f98da28 100644
--- a/CHANGES
+++ b/CHANGES
@@ -29,6 +29,9 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-966: Range partition should support split of multiple characters.
+    (hyunsik)
+
     TAJO-987: Hash shuffle should be balanced according to intermediate
     volumes. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java
index 5f309c2..59ed4fb 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/BytesUtils.java
@@ -179,4 +179,39 @@ public class BytesUtils {
     }
     return (byte[][]) list.toArray(new byte[list.size()][]);
   }
+
+  /**
+   * It gets the maximum length among all of the given byte arrays.
+   * Then, it adds padding (i.e., \0) to byte arrays which are shorter
+   * than the maximum length.
+   *
+   * @param bytes Byte arrays to be padded
+   * @return The array of padded bytes
+   */
+  public static byte[][] padBytes(byte []...bytes) {
+    byte [][] padded = new byte[bytes.length][];
+
+    int maxLen = Integer.MIN_VALUE;
+
+    for (int i = 0; i < bytes.length; i++) {
+      maxLen = Math.max(maxLen, bytes[i].length);
+    }
+
+    for (int i = 0; i < bytes.length; i++) {
+      int padLen = maxLen - bytes[i].length;
+      if (padLen == 0) {
+        padded[i] = bytes[i];
+      } else if (padLen > 0) {
+        padded[i] = Bytes.padTail(bytes[i], padLen);
+      } else {
+        throw new RuntimeException("maximum length: " + maxLen + ", bytes[" + i + "].length:" + bytes[i].length);
+      }
+    }
+
+    return padded;
+  }
+
+  public static byte [] trimBytes(byte [] bytes) {
+    return new String(bytes).trim().getBytes();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java b/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java
index 409d6ed..013c9c2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/exception/RangeOverflowException.java
@@ -22,7 +22,8 @@ import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 
 public class RangeOverflowException extends RuntimeException {
-  public RangeOverflowException(TupleRange range, Tuple overflowValue, long inc) {
-    super("Overflow Error: tried to increase " + inc + " to " + overflowValue + ", but the range " + range);
+  public RangeOverflowException(TupleRange range, Tuple overflowValue, long inc, boolean ascending) {
+    super("Overflow Error: tried to " + (ascending ? "increase " : "decrease ") + inc + " to " + overflowValue
+        + ", but the range " + range);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
index 0aa6f97..db53cd7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -18,20 +18,19 @@
 
 package org.apache.tajo.engine.planner;
 
-import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.util.Bytes;
 
-import java.math.BigDecimal;
+import java.math.BigInteger;
 
 public abstract class RangePartitionAlgorithm {
   protected SortSpec [] sortSpecs;
-  protected TupleRange range;
-  protected final BigDecimal totalCard;
+  protected TupleRange mergedRange;
+  protected final BigInteger totalCard;
   /** true if the end of the range is inclusive. Otherwise, it should be false. */
   protected final boolean inclusive;
 
@@ -43,7 +42,7 @@ public abstract class RangePartitionAlgorithm {
    */
   public RangePartitionAlgorithm(SortSpec [] sortSpecs, TupleRange totalRange, boolean inclusive) {
     this.sortSpecs = sortSpecs;
-    this.range = totalRange;
+    this.mergedRange = totalRange;
     this.inclusive = inclusive;
     this.totalCard = computeCardinalityForAllColumns(sortSpecs, totalRange, inclusive);
   }
@@ -56,117 +55,119 @@ public abstract class RangePartitionAlgorithm {
    * @param end
    * @return
    */
-  public static BigDecimal computeCardinality(DataType dataType, Datum start, Datum end,
+  public static BigInteger computeCardinality(DataType dataType, Datum start, Datum end,
                                               boolean inclusive, boolean isAscending) {
-    BigDecimal columnCard;
+    BigInteger columnCard;
 
     switch (dataType.getType()) {
       case BOOLEAN:
-        columnCard = new BigDecimal(2);
+        columnCard = BigInteger.valueOf(2);
         break;
       case CHAR:
         if (isAscending) {
-          columnCard = new BigDecimal(end.asChar() - start.asChar());
+          columnCard = BigInteger.valueOf((int)end.asChar() - (int)start.asChar());
         } else {
-          columnCard = new BigDecimal(start.asChar() - end.asChar());
+          columnCard = BigInteger.valueOf(start.asChar() - end.asChar());
         }
         break;
       case BIT:
         if (isAscending) {
-          columnCard = new BigDecimal(end.asByte() - start.asByte());
+          columnCard = BigInteger.valueOf(end.asByte() - start.asByte());
         } else {
-          columnCard = new BigDecimal(start.asByte() - end.asByte());
+          columnCard = BigInteger.valueOf(start.asByte() - end.asByte());
         }
         break;
       case INT2:
         if (isAscending) {
-          columnCard = new BigDecimal(end.asInt2() - start.asInt2());
+          columnCard = BigInteger.valueOf(end.asInt2() - start.asInt2());
         } else {
-          columnCard = new BigDecimal(start.asInt2() - end.asInt2());
+          columnCard = BigInteger.valueOf(start.asInt2() - end.asInt2());
         }
         break;
       case INT4:
         if (isAscending) {
-          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+          columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4());
         } else {
-          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+          columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4());
         }
         break;
-      case INT8:
+    case INT8:
+    case TIME:
+    case TIMESTAMP:
         if (isAscending) {
-          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+          columnCard = BigInteger.valueOf(end.asInt8() - start.asInt8());
         } else {
-          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+          columnCard = BigInteger.valueOf(start.asInt8() - end.asInt8());
         }
         break;
       case FLOAT4:
         if (isAscending) {
-          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+          columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4());
         } else {
-          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+          columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4());
         }
         break;
       case FLOAT8:
         if (isAscending) {
-          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+          columnCard = BigInteger.valueOf(end.asInt8() - start.asInt8());
         } else {
-          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+          columnCard = BigInteger.valueOf(start.asInt8() - end.asInt8());
         }
         break;
-      case TEXT:
-        final char textStart =  (start instanceof NullDatum || start.size() == 0) ? '0' : start.asChars().charAt(0);
-        final char textEnd = (end instanceof NullDatum || end.size() == 0) ? '0' : end.asChars().charAt(0);
+      case TEXT: {
+        byte [] a;
+        byte [] b;
         if (isAscending) {
-          columnCard = new BigDecimal(textEnd - textStart);
+          a = start.asByteArray();
+          b = end.asByteArray();
         } else {
-          columnCard = new BigDecimal(textStart - textEnd);
+          b = start.asByteArray();
+          a = end.asByteArray();
         }
+
+        byte [] prependHeader = {1, 0};
+        final BigInteger startBI = new BigInteger(Bytes.add(prependHeader, a));
+        final BigInteger stopBI = new BigInteger(Bytes.add(prependHeader, b));
+        BigInteger diffBI = stopBI.subtract(startBI);
+        columnCard = diffBI;
         break;
+      }
       case DATE:
         if (isAscending) {
-          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
-        } else {
-          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
-        }
-        break;
-      case TIME:
-      case TIMESTAMP:
-        if (isAscending) {
-          columnCard = new BigDecimal(end.asInt8() - start.asInt8());
+          columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4());
         } else {
-          columnCard = new BigDecimal(start.asInt8() - end.asInt8());
+          columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4());
         }
         break;
       case INET4:
         if (isAscending) {
-          columnCard = new BigDecimal(end.asInt4() - start.asInt4());
+          columnCard = BigInteger.valueOf(end.asInt4() - start.asInt4());
         } else {
-          columnCard = new BigDecimal(start.asInt4() - end.asInt4());
+          columnCard = BigInteger.valueOf(start.asInt4() - end.asInt4());
         }
         break;
       default:
         throw new UnsupportedOperationException(dataType + " is not supported yet");
     }
 
-    return inclusive ? columnCard.add(new BigDecimal(1)).abs() : columnCard.abs();
+    return inclusive ? columnCard.add(BigInteger.valueOf(1)).abs() : columnCard.abs();
   }
 
   /**
    * It computes the value cardinality of a tuple range.
    * @return
    */
-  public static BigDecimal computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) {
+  public static BigInteger computeCardinalityForAllColumns(SortSpec[] sortSpecs, TupleRange range, boolean inclusive) {
     Tuple start = range.getStart();
     Tuple end = range.getEnd();
-    Column col;
 
-    BigDecimal cardinality = new BigDecimal(1);
-    BigDecimal columnCard;
+    BigInteger cardinality = BigInteger.ONE;
+    BigInteger columnCard;
     for (int i = 0; i < sortSpecs.length; i++) {
       columnCard = computeCardinality(sortSpecs[i].getSortKey().getDataType(), start.get(i), end.get(i), inclusive,
           sortSpecs[i].isAscending());
 
-      if (new BigDecimal(0).compareTo(columnCard) < 0) {
+      if (BigInteger.ZERO.compareTo(columnCard) < 0) {
         cardinality = cardinality.multiply(columnCard);
       }
     }
@@ -174,7 +175,7 @@ public abstract class RangePartitionAlgorithm {
     return cardinality;
   }
 
-  public BigDecimal getTotalCardinality() {
+  public BigInteger getTotalCardinality() {
     return totalCard;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
index 88cb061..0a1389a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/UniformRangePartition.java
@@ -20,26 +20,29 @@ package org.apache.tajo.engine.planner;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.UnsignedLong;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.exception.RangeOverflowException;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.BytesUtils;
 
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.math.RoundingMode;
 import java.util.List;
 
 
 public class UniformRangePartition extends RangePartitionAlgorithm {
   private int variableId;
-  private BigDecimal[] cardForEachDigit;
-  private BigDecimal[] colCards;
+  private BigInteger[] cardForEachDigit;
+  private BigInteger[] colCards;
 
   /**
    *
@@ -47,15 +50,18 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
    * @param sortSpecs The description of sort keys
    * @param inclusive true if the end of the range is inclusive
    */
-  public UniformRangePartition(TupleRange totalRange, SortSpec[] sortSpecs, boolean inclusive) {
+  public UniformRangePartition(final TupleRange totalRange, final SortSpec[] sortSpecs, boolean inclusive) {
     super(sortSpecs, totalRange, inclusive);
-    colCards = new BigDecimal[sortSpecs.length];
+    colCards = new BigInteger[sortSpecs.length];
+
+    normalize(sortSpecs, this.mergedRange);
+
     for (int i = 0; i < sortSpecs.length; i++) {
       colCards[i] = computeCardinality(sortSpecs[i].getSortKey().getDataType(), totalRange.getStart().get(i),
           totalRange.getEnd().get(i), inclusive, sortSpecs[i].isAscending());
     }
 
-    cardForEachDigit = new BigDecimal[colCards.length];
+    cardForEachDigit = new BigInteger[colCards.length];
     for (int i = 0; i < colCards.length ; i++) {
       if (i == 0) {
         cardForEachDigit[i] = colCards[i];
@@ -74,17 +80,17 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     Preconditions.checkArgument(partNum > 0,
         "The number of partitions must be positive, but the given number: "
             + partNum);
-    Preconditions.checkArgument(totalCard.compareTo(new BigDecimal(partNum)) >= 0,
+    Preconditions.checkArgument(totalCard.compareTo(BigInteger.valueOf(partNum)) >= 0,
         "the number of partition cannot exceed total cardinality (" + totalCard + ")");
 
     int varId;
     for (varId = 0; varId < cardForEachDigit.length; varId++) {
-      if (cardForEachDigit[varId].compareTo(new BigDecimal(partNum)) >= 0)
+      if (cardForEachDigit[varId].compareTo(BigInteger.valueOf(partNum)) >= 0)
         break;
     }
     this.variableId = varId;
 
-    BigDecimal [] reverseCardsForDigit = new BigDecimal[variableId+1];
+    BigInteger [] reverseCardsForDigit = new BigInteger[variableId+1];
     for (int i = variableId; i >= 0; i--) {
       if (i == variableId) {
         reverseCardsForDigit[i] = colCards[i];
@@ -94,25 +100,81 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     }
 
     List<TupleRange> ranges = Lists.newArrayList();
-    BigDecimal term = reverseCardsForDigit[0].divide(
-        new BigDecimal(partNum), RoundingMode.CEILING);
-    BigDecimal reminder = reverseCardsForDigit[0];
-    Tuple last = range.getStart();
-    while(reminder.compareTo(new BigDecimal(0)) > 0) {
+
+    BigDecimal x = new BigDecimal(reverseCardsForDigit[0]);
+
+    BigInteger term = x.divide(BigDecimal.valueOf(partNum), RoundingMode.CEILING).toBigInteger();
+    BigInteger reminder = reverseCardsForDigit[0];
+    Tuple last = mergedRange.getStart();
+    TupleRange tupleRange;
+    while(reminder.compareTo(BigInteger.ZERO) > 0) {
       if (reminder.compareTo(term) <= 0) { // final one is inclusive
-        ranges.add(new TupleRange(sortSpecs, last, range.getEnd()));
+        tupleRange = new TupleRange(sortSpecs, last, mergedRange.getEnd());
       } else {
-        Tuple next = increment(last, term.longValue(), variableId);
-        ranges.add(new TupleRange(sortSpecs, last, next));
+        Tuple next = increment(last, term, variableId);
+        tupleRange = new TupleRange(sortSpecs, last, next);
       }
+
+      ranges.add(tupleRange);
       last = ranges.get(ranges.size() - 1).getEnd();
       reminder = reminder.subtract(term);
     }
 
+    for (TupleRange r : ranges) {
+      denormalize(sortSpecs, r);
+    }
+
     return ranges.toArray(new TupleRange[ranges.size()]);
   }
 
   /**
+   * It normalizes the start and end keys to have the same byte length if they are texts or bytes.
+   *
+   * @param sortSpecs The sort specs
+   * @param range Tuple range to be normalized
+   */
+  public static void normalize(final SortSpec [] sortSpecs, TupleRange range) {
+    // normalize text fields to have same bytes length
+    for (int i = 0; i < sortSpecs.length; i++) {
+      if (sortSpecs[i].getSortKey().getDataType().getType() == TajoDataTypes.Type.TEXT) {
+        byte [] startBytes;
+        byte [] endBytes;
+        if (range.getStart().isNull(i)) {
+          startBytes = BigInteger.ZERO.toByteArray();
+        } else {
+          startBytes = range.getStart().getBytes(i);
+        }
+
+        if (range.getEnd().isNull(i)) {
+          endBytes = BigInteger.ZERO.toByteArray();
+        } else {
+          endBytes = range.getEnd().getBytes(i);
+        }
+
+        byte [][] padded = BytesUtils.padBytes(startBytes, endBytes);
+        range.getStart().put(i, DatumFactory.createText(padded[0]));
+        range.getEnd().put(i, DatumFactory.createText(padded[1]));
+      }
+    }
+  }
+
+  /**
+   * Normalized keys have padding values, which would cause a key mismatch in the pull server.
+   * So, it denormalizes the normalized keys again.
+   *
+   * @param sortSpecs The sort specs
+   * @param range Tuple range to be denormalized
+   */
+  public static void denormalize(SortSpec [] sortSpecs, TupleRange range) {
+    for (int i = 0; i < sortSpecs.length; i++) {
+      if (sortSpecs[i].getSortKey().getDataType().getType() == TajoDataTypes.Type.TEXT) {
+        range.getStart().put(i,DatumFactory.createText(BytesUtils.trimBytes(range.getStart().getBytes(i))));
+        range.getEnd().put(i,DatumFactory.createText(BytesUtils.trimBytes(range.getEnd().getBytes(i))));
+      }
+    }
+  }
+
+  /**
   *  Check whether an overflow occurs or not.
    *
    * @param colId The column id to be checked
@@ -121,105 +183,111 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
    * @param sortSpecs
    * @return
    */
-  public boolean isOverflow(int colId, Datum last, BigDecimal inc, SortSpec [] sortSpecs) {
+  public boolean isOverflow(int colId, Datum last, BigInteger inc, SortSpec [] sortSpecs) {
     Column column = sortSpecs[colId].getSortKey();
+    BigDecimal incDecimal = new BigDecimal(inc);
     BigDecimal candidate;
     boolean overflow = false;
 
     switch (column.getDataType().getType()) {
       case BIT: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = inc.add(new BigDecimal(last.asByte()));
-          return new BigDecimal(range.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(last.asByte()));
+          return new BigDecimal(mergedRange.getEnd().get(colId).asByte()).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asByte()).subtract(inc);
-          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asByte())) < 0;
+          candidate = new BigDecimal(last.asByte()).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asByte())) < 0;
         }
       }
       case CHAR: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = inc.add(new BigDecimal((int)last.asChar()));
-          return new BigDecimal((int)range.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal((int)last.asChar()));
+          return new BigDecimal((int) mergedRange.getEnd().get(colId).asChar()).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal((int)last.asChar()).subtract(inc);
-          return candidate.compareTo(new BigDecimal((int)range.getEnd().get(colId).asChar())) < 0;
+          candidate = new BigDecimal((int)last.asChar()).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal((int) mergedRange.getEnd().get(colId).asChar())) < 0;
         }
       }
       case INT2: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = inc.add(new BigDecimal(last.asInt2()));
-          return new BigDecimal(range.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(last.asInt2()));
+          return new BigDecimal(mergedRange.getEnd().get(colId).asInt2()).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asInt2()).subtract(inc);
-          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt2())) < 0;
+          candidate = new BigDecimal(last.asInt2()).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asInt2())) < 0;
         }
       }
       case DATE:
       case INT4: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = inc.add(new BigDecimal(last.asInt4()));
-          return new BigDecimal(range.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(last.asInt4()));
+          return new BigDecimal(mergedRange.getEnd().get(colId).asInt4()).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asInt4()).subtract(inc);
-          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt4())) < 0;
+          candidate = new BigDecimal(last.asInt4()).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asInt4())) < 0;
         }
       }
       case TIME:
       case TIMESTAMP:
       case INT8: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = inc.add(new BigDecimal(last.asInt8()));
-          return new BigDecimal(range.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(last.asInt8()));
+          return new BigDecimal(mergedRange.getEnd().get(colId).asInt8()).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asInt8()).subtract(inc);
-          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asInt8())) < 0;
+          candidate = new BigDecimal(last.asInt8()).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asInt8())) < 0;
         }
       }
       case FLOAT4: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = inc.add(new BigDecimal(last.asFloat4()));
-          return new BigDecimal(range.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(last.asFloat4()));
+          return new BigDecimal(mergedRange.getEnd().get(colId).asFloat4()).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asFloat4()).subtract(inc);
-          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat4())) < 0;
+          candidate = new BigDecimal(last.asFloat4()).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asFloat4())) < 0;
         }
       }
       case FLOAT8: {
         if (sortSpecs[colId].isAscending()) {
-          candidate = inc.add(new BigDecimal(last.asFloat8()));
-          return new BigDecimal(range.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(last.asFloat8()));
+          return new BigDecimal(mergedRange.getEnd().get(colId).asFloat8()).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal(last.asFloat8()).subtract(inc);
-          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asFloat8())) < 0;
+          candidate = new BigDecimal(last.asFloat8()).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(mergedRange.getEnd().get(colId).asFloat8())) < 0;
         }
 
       }
       case TEXT: {
+        byte [] lastBytes = last.asByteArray();
+        byte [] endBytes = mergedRange.getEnd().getBytes(colId);
+
+        Preconditions.checkState(lastBytes.length == endBytes.length);
+
         if (sortSpecs[colId].isAscending()) {
-          candidate = inc.add(new BigDecimal((int)(last instanceof NullDatum ? '0' : last.asChars().charAt(0))));
-          return new BigDecimal(range.getEnd().get(colId).asChars().charAt(0)).compareTo(candidate) < 0;
+          candidate = incDecimal.add(new BigDecimal(new BigInteger(lastBytes)));
+          return new BigDecimal(new BigInteger(endBytes)).compareTo(candidate) < 0;
         } else {
-          candidate = new BigDecimal((int)(last.asChars().charAt(0))).subtract(inc);
-          return candidate.compareTo(new BigDecimal(range.getEnd().get(colId).asChars().charAt(0))) < 0;
+          candidate = new BigDecimal(new BigInteger(lastBytes)).subtract(incDecimal);
+          return candidate.compareTo(new BigDecimal(new BigInteger(endBytes))) < 0;
         }
       }
       case INET4: {
         int candidateIntVal;
         byte[] candidateBytesVal = new byte[4];
         if (sortSpecs[colId].isAscending()) {
-          candidateIntVal = inc.intValue() + last.asInt4();
-          if (candidateIntVal - inc.intValue() != last.asInt4()) {
+          candidateIntVal = incDecimal.intValue() + last.asInt4();
+          if (candidateIntVal - incDecimal.intValue() != last.asInt4()) {
             return true;
           }
           Bytes.putInt(candidateBytesVal, 0, candidateIntVal);
-          return Bytes.compareTo(range.getEnd().get(colId).asByteArray(), candidateBytesVal) < 0;
+          return Bytes.compareTo(mergedRange.getEnd().get(colId).asByteArray(), candidateBytesVal) < 0;
         } else {
-          candidateIntVal = last.asInt4() - inc.intValue();
-          if (candidateIntVal + inc.intValue() != last.asInt4()) {
+          candidateIntVal = last.asInt4() - incDecimal.intValue();
+          if (candidateIntVal + incDecimal.intValue() != last.asInt4()) {
             return true;
           }
           Bytes.putInt(candidateBytesVal, 0, candidateIntVal);
-          return Bytes.compareTo(candidateBytesVal, range.getEnd().get(colId).asByteArray()) < 0;
+          return Bytes.compareTo(candidateBytesVal, mergedRange.getEnd().get(colId).asByteArray()) < 0;
         }
       }
     }
@@ -232,20 +300,20 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     switch (column.getDataType().getType()) {
       case BIT: {
         long candidate = last.asByte() + inc;
-        byte end = range.getEnd().get(colId).asByte();
+        byte end = mergedRange.getEnd().get(colId).asByte();
         reminder = candidate - end;
         break;
       }
       case CHAR: {
         long candidate = last.asChar() + inc;
-        char end = range.getEnd().get(colId).asChar();
+        char end = mergedRange.getEnd().get(colId).asChar();
         reminder = candidate - end;
         break;
       }
       case DATE:
       case INT4: {
         int candidate = (int) (last.asInt4() + inc);
-        int end = range.getEnd().get(colId).asInt4();
+        int end = mergedRange.getEnd().get(colId).asInt4();
         reminder = candidate - end;
         break;
       }
@@ -254,26 +322,34 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
       case INT8:
       case INET4: {
         long candidate = last.asInt8() + inc;
-        long end = range.getEnd().get(colId).asInt8();
+        long end = mergedRange.getEnd().get(colId).asInt8();
         reminder = candidate - end;
         break;
       }
       case FLOAT4: {
         float candidate = last.asFloat4() + inc;
-        float end = range.getEnd().get(colId).asFloat4();
+        float end = mergedRange.getEnd().get(colId).asFloat4();
         reminder = (long) (candidate - end);
         break;
       }
       case FLOAT8: {
         double candidate = last.asFloat8() + inc;
-        double end = range.getEnd().get(colId).asFloat8();
+        double end = mergedRange.getEnd().get(colId).asFloat8();
         reminder = (long) Math.ceil(candidate - end);
         break;
       }
       case TEXT: {
-        char candidate = ((char)(last.asChars().charAt(0) + inc));
-        char end = range.getEnd().get(colId).asChars().charAt(0);
-        reminder = (char) (candidate - end);
+        byte [] lastBytes = last.asByteArray();
+        byte [] endBytes = mergedRange.getEnd().get(colId).asByteArray();
+
+        Preconditions.checkState(lastBytes.length == endBytes.length);
+
+        BigInteger lastBInt = new BigInteger(lastBytes);
+        BigInteger endBInt = new BigInteger(endBytes);
+        BigInteger incBInt = BigInteger.valueOf(inc);
+
+        BigInteger candidate = lastBInt.add(incBInt);
+        reminder = candidate.subtract(endBInt).longValue();
         break;
       }
     }
@@ -285,16 +361,16 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
   /**
    *
    * @param last
-   * @param inc
+   * @param interval
    * @return
    */
-  public Tuple increment(final Tuple last, final long inc, final int baseDigit) {
-    BigDecimal [] incs = new BigDecimal[last.size()];
+  public Tuple increment(final Tuple last, BigInteger interval, final int baseDigit) {
+    BigInteger [] incs = new BigInteger[last.size()];
     boolean [] overflowFlag = new boolean[last.size()];
-    BigDecimal [] result;
-    BigDecimal value = new BigDecimal(inc);
+    BigInteger [] result;
+    BigInteger value = interval;
 
-    BigDecimal [] reverseCardsForDigit = new BigDecimal[baseDigit + 1];
+    BigInteger [] reverseCardsForDigit = new BigInteger[baseDigit + 1];
     for (int i = baseDigit; i >= 0; i--) {
       if (i == baseDigit) {
         reverseCardsForDigit[i] = colCards[i];
@@ -313,11 +389,11 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
     for (int i = finalId; i >= 0; i--) {
       if (isOverflow(i, last.get(i), incs[i], sortSpecs)) {
         if (i == 0) {
-          throw new RangeOverflowException(range, last, incs[i].longValue());
+          throw new RangeOverflowException(mergedRange, last, incs[i].longValue(), sortSpecs[i].isAscending());
         }
         long rem = incrementAndGetReminder(i, last.get(i), value.longValue());
-        incs[i] = new BigDecimal(rem);
-        incs[i - 1] = incs[i-1].add(new BigDecimal(1));
+        incs[i] = BigInteger.valueOf(rem);
+        incs[i - 1] = incs[i-1].add(BigInteger.ONE);
         overflowFlag[i] = true;
       } else {
         if (i > 0) {
@@ -329,7 +405,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
 
     for (int i = 0; i < incs.length; i++) {
       if (incs[i] == null) {
-        incs[i] = new BigDecimal(0);
+        incs[i] = BigInteger.ZERO;
       }
     }
 
@@ -340,7 +416,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
       switch (column.getDataType().getType()) {
         case CHAR:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createChar((char) (range.getStart().get(i).asChar() + incs[i].longValue())));
+            end.put(i, DatumFactory.createChar((char) (mergedRange.getStart().get(i).asChar() + incs[i].longValue())));
           } else {
             end.put(i, DatumFactory.createChar((char) (last.get(i).asChar() + incs[i].longValue())));
           }
@@ -348,7 +424,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         case BIT:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createBit(
-                (byte) (range.getStart().get(i).asByte() + incs[i].longValue())));
+                (byte) (mergedRange.getStart().get(i).asByte() + incs[i].longValue())));
           } else {
             end.put(i, DatumFactory.createBit((byte) (last.get(i).asByte() + incs[i].longValue())));
           }
@@ -356,7 +432,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         case INT2:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createInt2(
-                (short) (range.getStart().get(i).asInt2() + incs[i].longValue())));
+                (short) (mergedRange.getStart().get(i).asInt2() + incs[i].longValue())));
           } else {
             end.put(i, DatumFactory.createInt2((short) (last.get(i).asInt2() + incs[i].longValue())));
           }
@@ -364,7 +440,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         case INT4:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createInt4(
-                (int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+                (int) (mergedRange.getStart().get(i).asInt4() + incs[i].longValue())));
           } else {
             if (sortSpecs[i].isAscending()) {
               end.put(i, DatumFactory.createInt4((int) (last.get(i).asInt4() + incs[i].longValue())));
@@ -376,7 +452,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         case INT8:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createInt8(
-                range.getStart().get(i).asInt8() + incs[i].longValue()));
+                mergedRange.getStart().get(i).asInt8() + incs[i].longValue()));
           } else {
             end.put(i, DatumFactory.createInt8(last.get(i).asInt8() + incs[i].longValue()));
           }
@@ -384,7 +460,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         case FLOAT4:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createFloat4(
-                range.getStart().get(i).asFloat4() + incs[i].longValue()));
+                mergedRange.getStart().get(i).asFloat4() + incs[i].longValue()));
           } else {
             end.put(i, DatumFactory.createFloat4(last.get(i).asFloat4() + incs[i].longValue()));
           }
@@ -392,30 +468,35 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         case FLOAT8:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createFloat8(
-                range.getStart().get(i).asFloat8() + incs[i].longValue()));
+                mergedRange.getStart().get(i).asFloat8() + incs[i].longValue()));
           } else {
             end.put(i, DatumFactory.createFloat8(last.get(i).asFloat8() + incs[i].longValue()));
           }
           break;
         case TEXT:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createText(((char) (range.getStart().get(i).asChars().charAt(0)
+            end.put(i, DatumFactory.createText(((char) (mergedRange.getStart().get(i).asChars().charAt(0)
                 + incs[i].longValue())) + ""));
           } else {
-            end.put(i, DatumFactory.createText(
-                ((char) ((last.get(i) instanceof NullDatum ? '0': last.get(i).asChars().charAt(0)) + incs[i].longValue())) + ""));
+            BigInteger lastBigInt;
+            if (last.isNull(i)) {
+              lastBigInt = BigInteger.valueOf(0);
+            } else {
+              lastBigInt = UnsignedLong.valueOf(new BigInteger(last.get(i).asByteArray())).bigIntegerValue();
+            }
+            end.put(i, DatumFactory.createText(lastBigInt.add(incs[i]).toByteArray()));
           }
           break;
         case DATE:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createDate((int) (range.getStart().get(i).asInt4() + incs[i].longValue())));
+            end.put(i, DatumFactory.createDate((int) (mergedRange.getStart().get(i).asInt4() + incs[i].longValue())));
           } else {
             end.put(i, DatumFactory.createDate((int) (last.get(i).asInt4() + incs[i].longValue())));
           }
           break;
         case TIME:
           if (overflowFlag[i]) {
-            end.put(i, DatumFactory.createTime(range.getStart().get(i).asInt8() + incs[i].longValue()));
+            end.put(i, DatumFactory.createTime(mergedRange.getStart().get(i).asInt8() + incs[i].longValue()));
           } else {
             end.put(i, DatumFactory.createTime(last.get(i).asInt8() + incs[i].longValue()));
           }
@@ -423,7 +504,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         case TIMESTAMP:
           if (overflowFlag[i]) {
             end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(
-                range.getStart().get(i).asInt8() + incs[i].longValue()));
+                mergedRange.getStart().get(i).asInt8() + incs[i].longValue()));
           } else {
             end.put(i, DatumFactory.createTimestmpDatumWithJavaMillis(last.get(i).asInt8() + incs[i].longValue()));
           }
@@ -431,7 +512,7 @@ public class UniformRangePartition extends RangePartitionAlgorithm {
         case INET4:
           byte[] ipBytes;
           if (overflowFlag[i]) {
-            ipBytes = range.getStart().get(i).asByteArray();
+            ipBytes = mergedRange.getStart().get(i).asByteArray();
             assert ipBytes.length == 4;
             end.put(i, DatumFactory.createInet4(ipBytes));
           } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 1fa3f11..43d6fd2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -56,7 +56,7 @@ import org.apache.tajo.worker.FetchImpl;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
@@ -574,12 +574,12 @@ public class Repartitioner {
     }
     TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false);
     RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
-    BigDecimal card = partitioner.getTotalCardinality();
+    BigInteger card = partitioner.getTotalCardinality();
 
     // if the number of the range cardinality is less than the desired number of tasks,
     // we set the the number of tasks to the number of range cardinality.
     int determinedTaskNum;
-    if (card.compareTo(new BigDecimal(maxNum)) < 0) {
+    if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
       LOG.info(subQuery.getId() + ", The range cardinality (" + card
           + ") is less then the desired number of tasks (" + maxNum + ")");
       determinedTaskNum = card.intValue();
@@ -587,9 +587,7 @@ public class Repartitioner {
       determinedTaskNum = maxNum;
     }
 
-    // for LOG
-    TupleRange mergedRangeForPrint = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), true);
-    LOG.info(subQuery.getId() + ", Try to divide " + mergedRangeForPrint + " into " + determinedTaskNum +
+    LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
         " sub ranges (total units: " + determinedTaskNum + ")");
     TupleRange [] ranges = partitioner.partition(determinedTaskNum);
     if (ranges == null || ranges.length == 0) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6b16264c/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
index f4c114f..58653d1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestUniformRangePartition.java
@@ -27,6 +27,8 @@ import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
 import org.junit.Test;
 
+import java.math.BigInteger;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -68,11 +70,11 @@ public class TestUniformRangePartition {
     result[10] = "DB";
     result[11] = "DC";
 
-    Tuple end = partitioner.increment(s, 1, 1);
+    Tuple end = partitioner.increment(s, BigInteger.valueOf(1), 1);
     assertEquals("A", end.get(0).asChars());
     assertEquals("B", end.get(1).asChars());
     for (int i = 2; i < 11; i++ ) {
-      end = partitioner.increment(end, 1, 1);
+      end = partitioner.increment(end, BigInteger.valueOf(1), 1);
       assertEquals(result[i].charAt(0), end.get(0).asChars().charAt(0));
       assertEquals(result[i].charAt(1), end.get(1).asChars().charAt(0));
     }
@@ -115,10 +117,10 @@ public class TestUniformRangePartition {
     result[10] = "DB";
     result[11] = "DC";
 
-    Tuple end = partitioner.increment(s, 6, 1);
+    Tuple end = partitioner.increment(s, BigInteger.valueOf(6), 1);
     assertEquals("C", end.get(0).asChars());
     assertEquals("A", end.get(1).asChars());
-    end = partitioner.increment(end, 5, 1);
+    end = partitioner.increment(end, BigInteger.valueOf(5), 1);
     assertEquals("D", end.get(0).asChars());
     assertEquals("C", end.get(1).asChars());
   }
@@ -149,11 +151,11 @@ public class TestUniformRangePartition {
     UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(24, partitioner.getTotalCardinality().intValue());
 
-    Tuple overflowBefore = partitioner.increment(s, 5, 2);
+    Tuple overflowBefore = partitioner.increment(s, BigInteger.valueOf(5), 2);
     assertEquals("A", overflowBefore.get(0).asChars());
     assertEquals("B", overflowBefore.get(1).asChars());
     assertEquals("C", overflowBefore.get(2).asChars());
-    Tuple overflowed = partitioner.increment(overflowBefore, 1, 2);
+    Tuple overflowed = partitioner.increment(overflowBefore, BigInteger.valueOf(1), 2);
     assertEquals("B", overflowed.get(0).asChars());
     assertEquals("A", overflowed.get(1).asChars());
     assertEquals("A", overflowed.get(2).asChars());
@@ -179,10 +181,10 @@ public class TestUniformRangePartition {
     UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(200, partitioner.getTotalCardinality().longValue());
 
-    Tuple range2 = partitioner.increment(s, 100, 1);
+    Tuple range2 = partitioner.increment(s, BigInteger.valueOf(100), 1);
     assertEquals(15, range2.get(0).asInt4());
     assertEquals(20, range2.get(1).asInt4());
-    Tuple range3 = partitioner.increment(range2, 99, 1);
+    Tuple range3 = partitioner.increment(range2, BigInteger.valueOf(99), 1);
     assertEquals(19, range3.get(0).asInt4());
     assertEquals(39, range3.get(1).asInt4());
   }
@@ -209,11 +211,11 @@ public class TestUniformRangePartition {
     UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(24, partitioner.getTotalCardinality().longValue());
 
-    Tuple beforeOverflow = partitioner.increment(s, 5, 2);
+    Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2);
     assertEquals(1, beforeOverflow.get(0).asInt8());
     assertEquals(2, beforeOverflow.get(1).asInt8());
     assertEquals(3, beforeOverflow.get(2).asInt8());
-    Tuple overflow = partitioner.increment(beforeOverflow, 1, 2);
+    Tuple overflow = partitioner.increment(beforeOverflow, BigInteger.valueOf(1), 2);
     assertEquals(2, overflow.get(0).asInt8());
     assertEquals(1, overflow.get(1).asInt8());
     assertEquals(1, overflow.get(2).asInt8());
@@ -242,11 +244,11 @@ public class TestUniformRangePartition {
     UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(24, partitioner.getTotalCardinality().longValue());
 
-    Tuple beforeOverflow = partitioner.increment(s, 5, 2);
+    Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2);
     assertTrue(1.1d == beforeOverflow.get(0).asFloat8());
     assertTrue(2.1d == beforeOverflow.get(1).asFloat8());
     assertTrue(3.1d == beforeOverflow.get(2).asFloat8());
-    Tuple overflow = partitioner.increment(beforeOverflow, 1, 2);
+    Tuple overflow = partitioner.increment(beforeOverflow, BigInteger.valueOf(1), 2);
     assertTrue(2.1d == overflow.get(0).asFloat8());
     assertTrue(1.1d == overflow.get(1).asFloat8());
     assertTrue(1.1d == overflow.get(2).asFloat8());
@@ -275,11 +277,11 @@ public class TestUniformRangePartition {
     UniformRangePartition partitioner = new UniformRangePartition(expected, sortSpecs);
     assertEquals(24, partitioner.getTotalCardinality().longValue());
 
-    Tuple beforeOverflow = partitioner.increment(s, 5, 2);
+    Tuple beforeOverflow = partitioner.increment(s, BigInteger.valueOf(5), 2);
     assertTrue("127.0.1.1".equals(beforeOverflow.get(0).asChars()));
     assertTrue("127.0.0.2".equals(beforeOverflow.get(1).asChars()));
     assertTrue("128.0.0.255".equals(beforeOverflow.get(2).asChars()));
-    Tuple overflow = partitioner.increment(beforeOverflow, 1, 2);
+    Tuple overflow = partitioner.increment(beforeOverflow, BigInteger.valueOf(1), 2);
     assertTrue("127.0.1.2".equals(overflow.get(0).asChars()));
     assertTrue("127.0.0.1".equals(overflow.get(1).asChars()));
     assertTrue("128.0.0.253".equals(overflow.get(2).asChars()));
@@ -360,6 +362,133 @@ public class TestUniformRangePartition {
   }
 
   @Test
+  public void testPartitionForMultipleChars() {
+    Schema schema = new Schema()
+        .addColumn("KEY1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("AAA"));
+    Tuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("ZZZ"));
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(48);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev == null) {
+        prev = r;
+      } else {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+    }
+    assertEquals(48, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[47].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForMultipleChars2() {
+    Schema schema = new Schema()
+        .addColumn("KEY1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("A1"));
+    Tuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("A999975"));
+
+    final int partNum = 2;
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev == null) {
+        prev = r;
+      } else {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForMultipleChars2Desc() {
+    Schema schema = new Schema()
+        .addColumn("KEY1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+    sortSpecs[0].setDescOrder();
+
+    Tuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("A999975"));
+    Tuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("A1"));
+
+    final int partNum = 48;
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev == null) {
+        prev = r;
+      } else {
+        assertTrue(prev.compareTo(r) > 0);
+      }
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
+  public void testPartitionForMultipleCharsWithSameFirstChar() {
+    Schema schema = new Schema()
+        .addColumn("KEY1", Type.TEXT);
+
+    SortSpec [] sortSpecs = PlannerUtil.schemaToSortSpecs(schema);
+
+    Tuple s = new VTuple(1);
+    s.put(0, DatumFactory.createText("AAA"));
+    Tuple e = new VTuple(1);
+    e.put(0, DatumFactory.createText("AAZ"));
+
+    final int partNum = 4;
+
+    TupleRange expected = new TupleRange(sortSpecs, s, e);
+    RangePartitionAlgorithm partitioner =
+        new UniformRangePartition(expected, sortSpecs, true);
+    TupleRange [] ranges = partitioner.partition(partNum);
+
+    TupleRange prev = null;
+    for (TupleRange r : ranges) {
+      if (prev == null) {
+        prev = r;
+      } else {
+        assertTrue(prev.compareTo(r) < 0);
+      }
+    }
+    assertEquals(partNum, ranges.length);
+    assertTrue(ranges[0].getStart().equals(s));
+    assertTrue(ranges[partNum - 1].getEnd().equals(e));
+  }
+
+  @Test
   public void testPartitionForOnePartNumWithBothValueNull() {
     Schema schema = new Schema()
         .addColumn("l_returnflag", Type.TEXT)
@@ -406,7 +535,7 @@ public class TestUniformRangePartition {
       if (prev == null) {
         prev = r;
       } else {
-        assertTrue(prev.compareTo(r) > 0);
+        assertTrue(prev.compareTo(r) < 0);
       }
     }
   }


Mime
View raw message