Repository: tajo
Updated Branches:
refs/heads/master aee7874fa -> 47d4fe22b
TAJO-904: ORDER BY Null first support. (Hyoungjun Kim via hyunsik)
Closes #70
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47d4fe22
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47d4fe22
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47d4fe22
Branch: refs/heads/master
Commit: 47d4fe22b8e5594d82054fa5a7b5cdc23f578be6
Parents: aee7874
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Tue Jul 15 14:55:47 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Tue Jul 15 14:55:47 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/catalog/statistics/ColumnStats.java | 10 ++-
.../engine/planner/RangePartitionAlgorithm.java | 2 +-
.../org/apache/tajo/engine/utils/TupleUtil.java | 70 +++++++++++++++++---
.../tajo/master/querymaster/Repartitioner.java | 19 +++++-
.../main/java/org/apache/tajo/worker/Task.java | 4 +-
.../apache/tajo/engine/query/TestSortQuery.java | 49 ++++++++++++++
.../org/apache/tajo/storage/RowStoreUtil.java | 1 +
.../apache/tajo/storage/TableStatistics.java | 5 +-
.../tajo/pullserver/TajoPullServerService.java | 2 +-
10 files changed, 141 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 473db19..d819d20 100644
--- a/CHANGES
+++ b/CHANGES
@@ -84,6 +84,8 @@ Release 0.9.0 - unreleased
BUG FIXES
+ TAJO-904: ORDER BY Null first support. (Hyoungjun Kim via hyunsik)
+
TAJO-936: TestStorages::testSplitable is failed occasionally. (jinho)
TAJO-673: Assign proper number of tasks when inserting into partitioned table. (jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java
index 4d65d9a..1f3bca3 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/ColumnStats.java
@@ -24,13 +24,13 @@ package org.apache.tajo.catalog.statistics;
import com.google.common.base.Objects;
import com.google.gson.annotations.Expose;
import com.google.protobuf.ByteString;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.json.GsonObject;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.json.GsonObject;
import org.apache.tajo.util.TUtil;
public class ColumnStats implements ProtoObject<CatalogProtos.ColumnStatsProto>, Cloneable,
GsonObject {
@@ -109,6 +109,10 @@ public class ColumnStats implements ProtoObject<CatalogProtos.ColumnStatsProto>,
this.numNulls = numNulls;
}
+  /**
+   * Tells whether this column's statistics counted at least one NULL value.
+   *
+   * @return {@code true} if the recorded null count is greater than zero
+   */
+  public boolean hasNullValue() {
+    return 0 < numNulls;
+  }
+
public boolean equals(Object obj) {
if (obj instanceof ColumnStats) {
ColumnStats other = (ColumnStats) obj;
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
index a3522c7..0aa6f97 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/RangePartitionAlgorithm.java
@@ -115,7 +115,7 @@ public abstract class RangePartitionAlgorithm {
break;
case TEXT:
final char textStart = (start instanceof NullDatum || start.size() == 0) ? '0' :
start.asChars().charAt(0);
- final char textEnd = (end instanceof NullDatum || start.size() == 0) ? '0' : end.asChars().charAt(0);
+ final char textEnd = (end instanceof NullDatum || end.size() == 0) ? '0' : end.asChars().charAt(0);
if (isAscending) {
columnCard = new BigDecimal(textEnd - textStart);
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index 86f4935..f2e47bc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
@@ -44,6 +46,7 @@ import java.util.List;
import java.util.Map;
public class TupleUtil {
+ private static final Log LOG = LogFactory.getLog(TupleUtil.class);
public static String rangeToQuery(Schema schema, TupleRange range, boolean last)
throws UnsupportedEncodingException {
@@ -72,7 +75,41 @@ public class TupleUtil {
return sb.toString();
}
- public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats>
colStats) {
+ /**
+ * if max value is null, set ranges[last]
+ * @param sortSpecs
+ * @param sortSchema
+ * @param colStats
+ * @param ranges
+ */
+  /**
+   * Widens the outermost partition range so that rows whose sort key is NULL fall into it.
+   *
+   * <p>For every sort column whose statistics report at least one NULL, the key position of
+   * that column is overwritten in place with {@link NullDatum}. For an ascending key this is the
+   * end tuple of the last range. For a descending key it is the start tuple of the first range.
+   *
+   * @param sortSpecs  sort specifications, positionally aligned with the columns of {@code sortSchema}
+   * @param sortSchema schema of the sort keys
+   * @param colStats   column statistics; sort columns without an entry are left untouched
+   * @param ranges     partition ranges to adjust; {@code null} or empty arrays are ignored
+   */
+  public static void setMaxRangeIfNull(SortSpec[] sortSpecs, Schema sortSchema,
+                                       List<ColumnStats> colStats, TupleRange[] ranges) {
+    // The partitioner may yield no ranges at all; there is nothing to widen in that case,
+    // and indexing ranges[length - 1] / ranges[0] would throw.
+    if (ranges == null || ranges.length == 0) {
+      return;
+    }
+
+    Map<Column, ColumnStats> statMap = Maps.newHashMap();
+    for (ColumnStats stat : colStats) {
+      statMap.put(stat.getColumn(), stat);
+    }
+
+    // keyIndex must advance for every sort column, including columns without statistics,
+    // so that it stays aligned with sortSpecs and with the key positions in the range tuples.
+    int keyIndex = 0;
+    for (Column col : sortSchema.getColumns()) {
+      ColumnStats columnStat = statMap.get(col);
+      if (columnStat != null && columnStat.hasNullValue()) {
+        boolean ascending = sortSpecs[keyIndex].isAscending();
+        VTuple rangeTuple = ascending
+            ? (VTuple) ranges[ranges.length - 1].getEnd()
+            : (VTuple) ranges[0].getStart();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Set null into range: " + col.getQualifiedName() + ", previous tuple is " + rangeTuple);
+        }
+        rangeTuple.put(keyIndex, NullDatum.get());
+        LOG.info("Set null into range: " + col.getQualifiedName() + ", current tuple is " + rangeTuple);
+      }
+      keyIndex++;
+    }
+  }
+
+ public static TupleRange columnStatToRange(SortSpec [] sortSpecs, Schema target, List<ColumnStats>
colStats,
+ boolean checkNull) {
Map<Column, ColumnStats> statSet = Maps.newHashMap();
for (ColumnStats stat : colStats) {
@@ -98,16 +135,29 @@ public class TupleUtil {
else
startTuple.put(i, DatumFactory.createNullDatum());
- if (statSet.get(col).getMaxValue() != null)
- endTuple.put(i, statSet.get(col).getMaxValue());
- else
- endTuple.put(i, DatumFactory.createNullDatum());
+ if (checkNull) {
+ if (statSet.get(col).hasNullValue() || statSet.get(col).getMaxValue() == null)
+ endTuple.put(i, DatumFactory.createNullDatum());
+ else
+ endTuple.put(i, statSet.get(col).getMaxValue());
+ } else {
+ if (statSet.get(col).getMaxValue() != null)
+ endTuple.put(i, statSet.get(col).getMaxValue());
+ else
+ endTuple.put(i, DatumFactory.createNullDatum());
+ }
} else {
- if (statSet.get(col).getMaxValue() != null)
- startTuple.put(i, statSet.get(col).getMaxValue());
- else
- startTuple.put(i, DatumFactory.createNullDatum());
-
+ if (checkNull) {
+ if (statSet.get(col).hasNullValue() || statSet.get(col).getMaxValue() == null)
+ startTuple.put(i, DatumFactory.createNullDatum());
+ else
+ startTuple.put(i, statSet.get(col).getMaxValue());
+ } else {
+ if (statSet.get(col).getMaxValue() != null)
+ startTuple.put(i, statSet.get(col).getMaxValue());
+ else
+ startTuple.put(i, DatumFactory.createNullDatum());
+ }
if (statSet.get(col).getMinValue() != null)
endTuple.put(i, statSet.get(col).getMinValue());
else
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 1cc5b78..055e9a2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -572,7 +572,7 @@ public class Repartitioner {
if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 )
{
return;
}
- TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats());
+ TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(),
false);
RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
BigDecimal card = partitioner.getTotalCardinality();
@@ -580,16 +580,29 @@ public class Repartitioner {
// we set the the number of tasks to the number of range cardinality.
int determinedTaskNum;
if (card.compareTo(new BigDecimal(maxNum)) < 0) {
- LOG.info("The range cardinality (" + card
+ LOG.info(subQuery.getId() + ", The range cardinality (" + card
+ ") is less then the desired number of tasks (" + maxNum + ")");
determinedTaskNum = card.intValue();
} else {
determinedTaskNum = maxNum;
}
- LOG.info("Try to divide " + mergedRange + " into " + determinedTaskNum +
+ // for LOG
+ TupleRange mergedRangeForPrint = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(),
true);
+ LOG.info(subQuery.getId() + ", Try to divide " + mergedRangeForPrint + " into " + determinedTaskNum
+
" sub ranges (total units: " + determinedTaskNum + ")");
TupleRange [] ranges = partitioner.partition(determinedTaskNum);
+ if (ranges == null || ranges.length == 0) {
+ LOG.warn(subQuery.getId() + " no range infos.");
+ }
+ TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
+ if (LOG.isDebugEnabled()) {
+ if (ranges != null) {
+ for (TupleRange eachRange : ranges) {
+ LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+ }
+ }
+ }
FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new
String[]{UNKNOWN_HOST});
SubQuery.scheduleFragment(subQuery, dummyFragment);
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 991dc4b..c3f3827 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -342,10 +342,10 @@ public class Task {
builder.setResultStats(new TableStats().getProto());
}
- Iterator<Entry<Integer,String>> it = context.getShuffleFileOutputs();
+ Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
if (it.hasNext()) {
do {
- Entry<Integer,String> entry = it.next();
+ Entry<Integer, String> entry = it.next();
ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
part.setPartId(entry.getKey());
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
index 0b1831c..a520e56 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java
@@ -21,13 +21,21 @@ package org.apache.tajo.engine.query;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.KeyValueSet;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.sql.ResultSet;
import java.util.TimeZone;
+import static org.junit.Assert.assertEquals;
+
@Category(IntegrationTest.class)
public class TestSortQuery extends QueryTestCaseBase {
@@ -169,4 +177,45 @@ public class TestSortQuery extends QueryTestCaseBase {
assertResultSet(res);
cleanupQuery(res);
}
+
+  /**
+   * Sorts a column that is NULL for half of the rows across at least two tasks and checks
+   * that the non-null keys come first, followed by the NULL keys ordered by the second column.
+   */
+  @Test
+  public final void testSortNullColumn() throws Exception {
+    try {
+      // Force range repartitioning over more than one task so the NULL range handling is exercised.
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "2");
+
+      Schema nullSortSchema = new Schema();
+      nullSortSchema.addColumn("id", Type.INT4);
+      nullSortSchema.addColumn("name", Type.TEXT);
+
+      KeyValueSet csvOptions = new KeyValueSet();
+      csvOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+      csvOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+      String[] rows = {"1|BRAZIL", "2|ALGERIA", "3|ARGENTINA", "4|CANADA"};
+      TajoTestingCluster.createTable("nullsort", nullSortSchema, csvOptions, rows, 2);
+
+      String query = "select * from (" +
+          "select case when id > 2 then null else id end as col1, name as col2 from nullsort) a " +
+          "order by col1, col2";
+      ResultSet res = executeString(query);
+
+      StringBuilder expected = new StringBuilder();
+      expected.append("col1,col2\n")
+          .append("-------------------------------\n")
+          .append("1,BRAZIL\n")
+          .append("2,ALGERIA\n")
+          .append("null,ARGENTINA\n")
+          .append("null,CANADA\n");
+
+      assertEquals(expected.toString(), resultSetToString(res));
+
+      cleanupQuery(res);
+    } finally {
+      testingCluster.setAllTajoDaemonConfValue(ConfVars.TESTCASE_MIN_TASK_NUM.varname, "0");
+      executeString("DROP TABLE nullsort PURGE;").close();
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 33b2ff3..5140a63 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -181,6 +181,7 @@ public class RowStoreUtil {
for (int i = 0; i < schema.size(); i++) {
if (tuple.isNull(i)) {
nullFlags.set(i);
+ continue;
}
col = schema.getColumn(i);
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
index ac9bd8a..a2c08de 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -40,7 +40,6 @@ public class TableStatistics {
private long numRows = 0;
private long numBytes = 0;
-
private boolean [] comparable;
public TableStatistics(Schema schema) {
@@ -113,10 +112,10 @@ public class TableStatistics {
LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
", expected=" + schema.getColumn(i).getDataType().getType() + ")");
}
- if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type())
{
+ if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type())
{
columnStats.setMaxValue(maxValues.get(i));
} else {
- LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
+ LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() +
", expected=" + schema.getColumn(i).getDataType().getType() + ")");
}
stat.addColumnStat(columnStats);
http://git-wip-us.apache.org/repos/asf/tajo/blob/47d4fe22/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 5b76da5..12cd1a3 100644
--- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -597,7 +597,7 @@ public class TajoPullServerService extends AbstractService {
if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
comparator.compare(idxReader.getLastKey(), start) < 0) {
- LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey()
+
+ LOG.warn("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey()
+
"], but request start:" + start + ", end: " + end);
return null;
}
|