tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject git commit: TAJO-904: ORDER BY Null first support. (Hyoungjun Kim via hyunsik)
Date Tue, 15 Jul 2014 06:07:45 GMT
Repository: tajo
Updated Branches:
  refs/heads/master aee7874fa -> 47d4fe22b


TAJO-904: ORDER BY Null first support. (Hyoungjun Kim via hyunsik)

Closes #70


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47d4fe22
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47d4fe22
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47d4fe22

Branch: refs/heads/master
Commit: 47d4fe22b8e5594d82054fa5a7b5cdc23f578be6
Parents: aee7874
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Tue Jul 15 14:55:47 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Tue Jul 15 14:55:47 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../tajo/catalog/statistics/ColumnStats.java    | 10 ++-
 .../engine/planner/RangePartitionAlgorithm.java |  2 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java | 70 +++++++++++++++++---
 .../tajo/master/querymaster/Repartitioner.java  | 19 +++++-
 .../main/java/org/apache/tajo/worker/Task.java  |  4 +-
 .../apache/tajo/engine/query/TestSortQuery.java | 49 ++++++++++++++
 .../org/apache/tajo/storage/RowStoreUtil.java   |  1 +
 .../apache/tajo/storage/TableStatistics.java    |  5 +-
 .../tajo/pullserver/TajoPullServerService.java  |  2 +-
 10 files changed, 141 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 473db19..d819d20 100644
--- a/CHANGES
+++ b/CHANGES
@@ -84,6 +84,8 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-904: ORDER BY Null first support. (Hyoungjun Kim via hyunsik)
+
     TAJO-936: TestStorages::testSplitable is failed occasionally. (jinho)
 
     TAJO-673: Assign proper number of tasks when inserting into partitioned table. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java
index 4d65d9a..1f3bca3 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java
@@ -24,13 +24,13 @@ package org.apache.tajo.catalog.statistics;
 import com.google.common.base.Objects;
 import com.google.gson.annotations.Expose;
 import com.google.protobuf.ByteString;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.util.TUtil;
 
 public class ColumnStats implements ProtoObject<CatalogProtos.ColumnStatsProto>, Cloneable, GsonObject {
@@ -109,6 +109,10 @@ public class ColumnStats implements ProtoObject<CatalogProtos.ColumnStatsProto>,
     this.numNulls = numNulls;
   }
 
+  public boolean hasNullValue() {
+    return numNulls > 0;
+  }
+
   public boolean equals(Object obj) {
     if (obj instanceof ColumnStats) {
       ColumnStats other = (ColumnStats) obj;

http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
index a3522c7..0aa6f97 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -115,7 +115,7 @@ public abstract class RangePartitionAlgorithm {
         break;
       case TEXT:
         final char textStart =  (start instanceof NullDatum || start.size() == 0) ? '0' : start.asChars().charAt(0);
-        final char textEnd = (end instanceof NullDatum || start.size() == 0) ? '0' : end.asChars().charAt(0);
+        final char textEnd = (end instanceof NullDatum || end.size() == 0) ? '0' : end.asChars().charAt(0);
         if (isAscending) {
           columnCard = new BigDecimal(textEnd - textStart);
         } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 86f4935..f2e47bc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -44,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 
 public class TupleUtil {
+  private static final Log LOG = LogFactory.getLog(TupleUtil.class);
 
   public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
       throws UnsupportedEncodingException {
@@ -72,7 +75,41 @@ public class TupleUtil {
     return sb.toString();
   }
 
-  public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats) {
+  /**
+   * if max value is null, set ranges[last]
+   * @param sortSpecs
+   * @param sortSchema
+   * @param colStats
+   * @param ranges
+   */
+  public static void setMaxRangeIfNull(SortSpec[] sortSpecs, Schema sortSchema,
+                                       List<ColumnStats> colStats, TupleRange[] ranges) {
+    Map<Column, ColumnStats> statMap = Maps.newHashMap();
+    for (ColumnStats stat : colStats) {
+      statMap.put(stat.getColumn(), stat);
+    }
+
+    int i = 0;
+    for (Column col : sortSchema.getColumns()) {
+      ColumnStats columnStat = statMap.get(col);
+      if (columnStat == null) {
+        continue;
+      }
+      if (columnStat.hasNullValue()) {
+        int rangeIndex = sortSpecs[i].isAscending() ? ranges.length - 1 : 0;
+        VTuple rangeTuple = sortSpecs[i].isAscending() ? (VTuple) ranges[rangeIndex].getEnd() : (VTuple) ranges[rangeIndex].getStart();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Set null into range: " + col.getQualifiedName() + ", previous tuple is " + rangeTuple);
+        }
+        rangeTuple.put(i, NullDatum.get());
+        LOG.info("Set null into range: " + col.getQualifiedName() + ", current tuple is " + rangeTuple);
+      }
+      i++;
+    }
+  }
+
+  public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats> colStats,
+                                             boolean checkNull) {
 
     Map<Column, ColumnStats> statSet = Maps.newHashMap();
     for (ColumnStats stat : colStats) {
@@ -98,16 +135,29 @@ public class TupleUtil {
         else
           startTuple.put(i, DatumFactory.createNullDatum());
 
-        if (statSet.get(col).getMaxValue() != null)
-          endTuple.put(i, statSet.get(col).getMaxValue());
-        else
-          endTuple.put(i, DatumFactory.createNullDatum());
+        if (checkNull) {
+          if (statSet.get(col).hasNullValue() || statSet.get(col).getMaxValue() == null)
+            endTuple.put(i, DatumFactory.createNullDatum());
+          else
+            endTuple.put(i, statSet.get(col).getMaxValue());
+        } else {
+          if (statSet.get(col).getMaxValue() != null)
+            endTuple.put(i, statSet.get(col).getMaxValue());
+          else
+            endTuple.put(i, DatumFactory.createNullDatum());
+        }
       } else {
-        if (statSet.get(col).getMaxValue() != null)
-          startTuple.put(i, statSet.get(col).getMaxValue());
-        else
-          startTuple.put(i, DatumFactory.createNullDatum());
-
+        if (checkNull) {
+          if (statSet.get(col).hasNullValue() || statSet.get(col).getMaxValue() == null)
+            startTuple.put(i, DatumFactory.createNullDatum());
+          else
+            startTuple.put(i, statSet.get(col).getMaxValue());
+        } else {
+          if (statSet.get(col).getMaxValue() != null)
+            startTuple.put(i, statSet.get(col).getMaxValue());
+          else
+            startTuple.put(i, DatumFactory.createNullDatum());
+        }
         if (statSet.get(col).getMinValue() != null)
           endTuple.put(i, statSet.get(col).getMinValue());
         else

http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 1cc5b78..055e9a2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -572,7 +572,7 @@ public class Repartitioner {
     if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 ) {
       return;
     }
-    TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats());
+    TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false);
     RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
     BigDecimal card = partitioner.getTotalCardinality();
 
@@ -580,16 +580,29 @@ public class Repartitioner {
     // we set the the number of tasks to the number of range cardinality.
     int determinedTaskNum;
     if (card.compareTo(new BigDecimal(maxNum)) < 0) {
-      LOG.info("The range cardinality (" + card
+      LOG.info(subQuery.getId() + ", The range cardinality (" + card
           + ") is less then the desired number of tasks (" + maxNum + ")");
       determinedTaskNum = card.intValue();
     } else {
       determinedTaskNum = maxNum;
     }
 
-    LOG.info("Try to divide " + mergedRange + " into " + determinedTaskNum +
+    // for LOG
+    TupleRange mergedRangeForPrint = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), true);
+    LOG.info(subQuery.getId() + ", Try to divide " + mergedRangeForPrint + " into " + determinedTaskNum +
         " sub ranges (total units: " + determinedTaskNum + ")");
     TupleRange [] ranges = partitioner.partition(determinedTaskNum);
+    if (ranges == null || ranges.length == 0) {
+      LOG.warn(subQuery.getId() + " no range infos.");
+    }
+    TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
+    if (LOG.isDebugEnabled()) {
+      if (ranges != null) {
+        for (TupleRange eachRange : ranges) {
+          LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+        }
+      }
+    }
 
     FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
     SubQuery.scheduleFragment(subQuery, dummyFragment);

http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 991dc4b..c3f3827 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -342,10 +342,10 @@ public class Task {
       builder.setResultStats(new TableStats().getProto());
     }
 
-    Iterator<Entry<Integer,String>> it = context.getShuffleFileOutputs();
+    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
     if (it.hasNext()) {
       do {
-        Entry<Integer,String> entry = it.next();
+        Entry<Integer, String> entry = it.next();
         ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
         part.setPartId(entry.getKey());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 0b1831c..a520e56 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -21,13 +21,21 @@ package org.apache.tajo.engine.query;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.sql.ResultSet;
 import java.util.TimeZone;
 
+import static org.junit.Assert.assertEquals;
+
 @Category(IntegrationTest.class)
 public class TestSortQuery extends QueryTestCaseBase {
 
@@ -169,4 +177,45 @@ public class TestSortQuery extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testSortNullColumn() throws Exception {
+    try {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "2");
+      KeyValueSet tableOptions = new KeyValueSet();
+      tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+      tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("name", Type.TEXT);
+      String[] data = new String[]{
+          "1|BRAZIL",
+          "2|ALGERIA",
+          "3|ARGENTINA",
+          "4|CANADA"
+      };
+      TajoTestingCluster.createTable("nullsort", schema, tableOptions, data, 2);
+
+      ResultSet res = executeString(
+          "select * from (" +
+              "select case when id > 2 then null else id end as col1, name as col2 from nullsort) a " +
+          "order by col1, col2"
+      );
+
+      String expected = "col1,col2\n" +
+          "-------------------------------\n" +
+          "1,BRAZIL\n" +
+          "2,ALGERIA\n" +
+          "null,ARGENTINA\n" +
+          "null,CANADA\n";
+
+      assertEquals(expected, resultSetToString(res));
+
+      cleanupQuery(res);
+    } finally {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "0");
+      executeString("DROP TABLE nullsort PURGE;").close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 33b2ff3..5140a63 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -181,6 +181,7 @@ public class RowStoreUtil {
       for (int i = 0; i < schema.size(); i++) {
         if (tuple.isNull(i)) {
           nullFlags.set(i);
+          continue;
         }
 
         col = schema.getColumn(i);

http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
index ac9bd8a..a2c08de 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -40,7 +40,6 @@ public class TableStatistics {
   private long numRows = 0;
   private long numBytes = 0;
 
-
   private boolean [] comparable;
 
   public TableStatistics(Schema schema) {
@@ -113,10 +112,10 @@ public class TableStatistics {
         LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
             ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
       }
-      if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) {
+      if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) {
         columnStats.setMaxValue(maxValues.get(i));
       } else {
-        LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
+        LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() +
             ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
       }
       stat.addColumnStat(columnStats);

http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 5b76da5..12cd1a3 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -597,7 +597,7 @@ public class TajoPullServerService extends AbstractService {
 
     if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
         comparator.compare(idxReader.getLastKey(), start) < 0) {
-      LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
+      LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
           "], but request start:" + start + ", end: " + end);
       return null;
     }


Mime
View raw message