Repository: tajo
Updated Branches:
refs/heads/master 7c2a24090 -> 6e7f1c7b0
TAJO-1460 Apply TAJO-1407 to ExternalSortExec
Closes #474
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6e7f1c7b
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6e7f1c7b
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6e7f1c7b
Branch: refs/heads/master
Commit: 6e7f1c7b0561c7299a498b81dfb6148883a76a0d
Parents: 7c2a240
Author: babokim <babokim@gmail.com>
Authored: Thu Apr 2 15:22:37 2015 +0900
Committer: babokim <babokim@gmail.com>
Committed: Thu Apr 2 15:22:37 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../planner/physical/ComparableVector.java | 411 +++++++++++++++++++
.../planner/physical/ExternalSortExec.java | 238 ++++-------
.../engine/planner/physical/MemSortExec.java | 19 +-
.../tajo/engine/planner/physical/SortExec.java | 10 +-
.../engine/planner/physical/TupleSorter.java | 24 +-
.../planner/physical/VectorizedSorter.java | 155 +------
.../planner/physical/TestExternalSortExec.java | 4 +-
.../planner/physical/TestTupleSorter.java | 2 +-
.../testCrossJoinWithAsterisk1.sql | 2 +-
.../testCrossJoinWithAsterisk2.sql | 2 +-
.../testCrossJoinWithAsterisk3.sql | 2 +-
.../testCrossJoinWithAsterisk4.sql | 2 +-
.../TestJoinQuery/testComplexJoinCondition7.sql | 2 +-
.../testCrossJoinWithAsterisk1.sql | 2 +-
.../testCrossJoinWithAsterisk2.sql | 2 +-
.../testCrossJoinWithAsterisk3.sql | 2 +-
.../testCrossJoinWithAsterisk4.sql | 2 +-
.../queries/TestJoinQuery/testJoinWithJson.json | 8 +
.../testComplexJoinCondition7.result | 4 +-
.../apache/tajo/storage/AbstractScanner.java | 80 ++++
.../org/apache/tajo/storage/MemoryUtil.java | 4 +
22 files changed, 655 insertions(+), 325 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c3bce7d..1ccd016 100644
--- a/CHANGES
+++ b/CHANGES
@@ -10,6 +10,9 @@ Release 0.11.0 - unreleased
(jihun)
IMPROVEMENT
+
+ TAJO-1460: Apply TAJO-1407 to ExternalSortExec. (Contributed by navis,
+ Committed by hyoungjun)
TAJO-1350: Refactor FilterPushDownRule::visitJoin() into well-defined,
small methods. (jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
new file mode 100644
index 0000000..39b8c8a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ComparableVector.java
@@ -0,0 +1,411 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.primitives.Booleans;
+import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Floats;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.UnsignedInts;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+
+import java.util.Arrays;
+import java.util.BitSet;
+
+/**
+ * Extracts the raw values (primitives or byte[]) of each key column so that keys can be compared directly.
+ */
+public class ComparableVector {
+
+ protected final Tuple[] tuples; // source tuples
+ protected final TupleVector[] vectors; // values of key columns
+ protected final int[] keyIndex;
+
+ /**
+  * Allocates the tuple slots and one key-column vector per sort key.
+  *
+  * @param length   number of slots allocated in every vector
+  * @param sortKeys sort specifications; one vector is created for each entry
+  * @param keyIndex for each sort key, the tuple field index read by {@code set(int, Tuple)}
+  */
+ public ComparableVector(int length, SortSpec[] sortKeys, int[] keyIndex) {
+   this.tuples = new Tuple[length];
+   this.keyIndex = keyIndex;
+   this.vectors = new TupleVector[sortKeys.length];
+   int slot = 0;
+   for (SortSpec spec : sortKeys) {
+     final int type = vectorType(spec.getSortKey().getDataType().getType());
+     final boolean ascending = spec.isAscending();
+     // the null ordering is inverted exactly when "null first" and "ascending" agree
+     final boolean nullInvert = spec.isNullFirst() == ascending;
+     vectors[slot++] = new TupleVector(type, length, nullInvert, ascending);
+   }
+ }
+
+ /**
+  * Compares the key values held in two slots, one key column at a time in sort-key order.
+  *
+  * @return the first non-zero column comparison, or 0 when every column ties
+  */
+ public int compare(final int i1, final int i2) {
+   int result = 0;
+   for (int column = 0; column < vectors.length && result == 0; column++) {
+     result = vectors[column].compare(i1, i2);
+   }
+   return result;
+ }
+
+ /**
+  * Copies the key fields of {@code tuple} into slot {@code index} of every key-column vector.
+  */
+ public void set(int index, Tuple tuple) {
+   int column = 0;
+   for (TupleVector vector : vectors) {
+     vector.set(index, tuple, keyIndex[column++]);
+   }
+ }
+
+ /**
+  * Values of a single sort-key column, held in one typed array plus a null bitmap.
+  * The {@code type} code selects the backing array: 0 boolean, 1 byte, 2 short, 3 int,
+  * 4 long, 5 float, 6 double, 7 byte[], 8 int compared as unsigned, -1 no array at all.
+  */
+ protected static class TupleVector {
+
+ private final int type;
+ // bit i is set when slot i holds a null
+ private final BitSet nulls;
+ // when true, the sign of a null-vs-non-null comparison is flipped
+ private final boolean nullInvert;
+ private final boolean ascending;
+
+ // only the array matching the type code is allocated; the others stay null
+ private boolean[] booleans;
+ private byte[] bits;
+ private short[] shorts;
+ private int[] ints;
+ private long[] longs;
+ private float[] floats;
+ private double[] doubles;
+ private byte[][] bytes;
+
+ // next slot written by append()
+ private int index;
+
+ /**
+  * @param type       type code selecting the backing array (see class comment)
+  * @param length     number of slots to allocate
+  * @param nullInvert whether to flip the result of a null-vs-non-null comparison
+  * @param ascending  whether non-null values compare in ascending order
+  * @throws IllegalArgumentException if the type code is not one of -1 or 0..8
+  */
+ private TupleVector(int type, int length, boolean nullInvert, boolean ascending) {
+ this.type = type;
+ this.nulls = new BitSet(length);
+ this.nullInvert = nullInvert;
+ this.ascending = ascending;
+ switch (type) {
+ case 0: booleans = new boolean[length]; break;
+ case 1: bits = new byte[length]; break;
+ case 2: shorts = new short[length]; break;
+ case 3: ints = new int[length]; break;
+ case 4: longs = new long[length]; break;
+ case 5: floats = new float[length]; break;
+ case 6: doubles = new double[length]; break;
+ case 7: bytes = new byte[length][]; break;
+ // type 8 shares the int array with type 3; only compare() treats it differently
+ case 8: ints = new int[length]; break;
+ case -1: break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ /** Stores the given field of the tuple in the next free slot and advances the slot counter. */
+ protected final void append(Tuple tuple, int field) {
+ set(index++, tuple, field);
+ }
+
+ /**
+  * Stores the given field of the tuple in slot {@code index}.
+  * A null field only sets the null bit; the value array is left untouched.
+  *
+  * @throws IllegalArgumentException for a non-null field when the type code is -1
+  */
+ protected final void set(int index, Tuple tuple, int field) {
+ if (tuple.isNull(field)) {
+ nulls.set(index);
+ return;
+ }
+ nulls.clear(index);
+ switch (type) {
+ case 0: booleans[index] = tuple.getBool(field); break;
+ case 1: bits[index] = tuple.getByte(field); break;
+ case 2: shorts[index] = tuple.getInt2(field); break;
+ case 3: ints[index] = tuple.getInt4(field); break;
+ case 4: longs[index] = tuple.getInt8(field); break;
+ case 5: floats[index] = tuple.getFloat4(field); break;
+ case 6: doubles[index] = tuple.getFloat8(field); break;
+ case 7: bytes[index] = tuple.getBytes(field); break;
+ case 8: ints[index] = tuple.getInt4(field); break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+
+ /**
+  * Compares the values in two slots.
+  * Two nulls tie. A single null sorts after the non-null value unless {@code nullInvert}
+  * is set; that result is returned before {@code ascending} is applied.
+  *
+  * @throws IllegalArgumentException for two non-null slots when the type code is -1
+  */
+ protected final int compare(int index1, int index2) {
+ final boolean n1 = nulls.get(index1);
+ final boolean n2 = nulls.get(index2);
+ if (n1 && n2) {
+ return 0;
+ }
+ if (n1 ^ n2) {
+ int compVal = n1 ? 1 : -1;
+ return nullInvert ? -compVal : compVal;
+ }
+ int compare;
+ switch (type) {
+ case 0: compare = Booleans.compare(booleans[index1], booleans[index2]); break;
+ // bytes are widened to int before subtracting, so the difference cannot overflow
+ case 1: compare = bits[index1] - bits[index2]; break;
+ case 2: compare = Shorts.compare(shorts[index1], shorts[index2]); break;
+ case 3: compare = Ints.compare(ints[index1], ints[index2]); break;
+ case 4: compare = Longs.compare(longs[index1], longs[index2]); break;
+ case 5: compare = Floats.compare(floats[index1], floats[index2]); break;
+ case 6: compare = Doubles.compare(doubles[index1], doubles[index2]); break;
+ case 7: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break;
+ case 8: compare = UnsignedInts.compare(ints[index1], ints[index2]); break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ return ascending ? compare : -compare;
+ }
+ }
+
+ public static class ComparableTuple {
+
+ private final TupleType[] keyTypes;
+ private final int[] keyIndex;
+ private final Object[] keys;
+
+ /**
+  * Creates a key holder for the given fields of tuples that follow {@code schema}.
+  *
+  * @param schema   schema used to look up the data type of each key column
+  * @param keyIndex field indexes of the key columns
+  */
+ public ComparableTuple(Schema schema, int[] keyIndex) {
+ this(tupleTypes(schema, keyIndex), keyIndex);
+ }
+
+ /**
+  * Creates a key holder for the consecutive fields {@code start} (inclusive) to
+  * {@code end} (exclusive).
+  */
+ public ComparableTuple(Schema schema, int start, int end) {
+ this(schema, toKeyIndex(start, end));
+ }
+
+ // Shares the given arrays (no defensive copy) and allocates one empty key slot per index.
+ private ComparableTuple(TupleType[] keyTypes, int[] keyIndex) {
+ this.keyTypes = keyTypes;
+ this.keyIndex = keyIndex;
+ this.keys = new Object[keyIndex.length];
+ }
+
+ /** Returns the number of key columns. */
+ public int size() {
+ return keyIndex.length;
+ }
+
+ /**
+  * Captures the key fields of {@code tuple}, replacing any previously captured values.
+  * A null field is stored as a null key. INT1 is read through {@code getInt2}, DATE and
+  * INET4 through {@code getInt4}, TIME and TIMESTAMP through {@code getInt8}, the text-like
+  * types through {@code getBytes}, and DATUM keys keep the value of {@code tuple.get}.
+  *
+  * @throws IllegalArgumentException for a non-null field whose key type is NULL_TYPE
+  */
+ public void set(Tuple tuple) {
+ for (int i = 0; i < keyTypes.length; i++) {
+ final int field = keyIndex[i];
+ if (tuple.isNull(field)) {
+ keys[i] = null;
+ continue;
+ }
+ switch (keyTypes[i]) {
+ case BOOLEAN: keys[i] = tuple.getBool(field); break;
+ case BIT: keys[i] = tuple.getByte(field); break;
+ case INT1:
+ case INT2: keys[i] = tuple.getInt2(field); break;
+ case INT4:
+ case DATE:
+ case INET4: keys[i] = tuple.getInt4(field); break;
+ case INT8:
+ case TIME:
+ case TIMESTAMP: keys[i] = tuple.getInt8(field); break;
+ case FLOAT4: keys[i] = tuple.getFloat4(field); break;
+ case FLOAT8: keys[i] = tuple.getFloat8(field); break;
+ case TEXT:
+ case CHAR:
+ case BLOB: keys[i] = tuple.getBytes(field); break;
+ case DATUM: keys[i] = tuple.get(field); break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ }
+
+ /**
+  * Compares the captured key values with those of another {@code ComparableTuple}.
+  * Two null keys are equal, byte[] keys are compared by content and every other key by
+  * its own {@code equals}.
+  *
+  * @param obj the object to compare with; may be null or of any type
+  * @return true only if {@code obj} is a {@code ComparableTuple} with the same number of
+  *         keys and every key pair is equal
+  */
+ @Override
+ public boolean equals(Object obj) {
+   // instanceof also rejects null; an unconditional cast would throw instead of returning false
+   if (!(obj instanceof ComparableTuple)) {
+     return false;
+   }
+   final ComparableTuple other = (ComparableTuple) obj;
+   // without this check a shorter operand throws and a longer one is compared by prefix only
+   if (keys.length != other.keys.length) {
+     return false;
+   }
+   for (int i = 0; i < keys.length; i++) {
+     final Object mine = keys[i];
+     final Object theirs = other.keys[i];
+     if (mine == null || theirs == null) {
+       if (mine != theirs) {
+         return false;
+       }
+       continue;
+     }
+     // decide by runtime class, as hashCode() does, so mismatched key types cannot throw
+     final boolean same = mine instanceof byte[] && theirs instanceof byte[]
+         ? Arrays.equals((byte[]) mine, (byte[]) theirs)
+         : mine.equals(theirs);
+     if (!same) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Compares the captured key values with the key fields of {@code tuple}, read at the
+  * positions in {@code keyIndex}, without capturing the tuple first.
+  * The switch has no default branch, so a key type without a case (NULL_TYPE) counts as equal.
+  * NOTE(review): FLOAT4/FLOAT8 are compared with {@code !=}; presumably the getters return
+  * primitives, in which case a NaN key never equals a NaN field - confirm this is intended.
+  */
+ public boolean equals(Tuple tuple) {
+ for (int i = 0; i < keys.length; i++) {
+ final int field = keyIndex[i];
+ final boolean n1 = keys[i] == null;
+ final boolean n2 = tuple.isNull(field);
+ if (n1 && n2) {
+ continue;
+ }
+ if (n1 ^ n2) {
+ return false;
+ }
+ switch (keyTypes[i]) {
+ case BOOLEAN: if ((Boolean)keys[i] != tuple.getBool(field)) return false; continue;
+ case BIT: if ((Byte)keys[i] != tuple.getByte(field)) return false; continue;
+ case INT1:
+ case INT2: if ((Short)keys[i] != tuple.getInt2(field)) return false; continue;
+ case INT4:
+ case DATE:
+ case INET4: if ((Integer)keys[i] != tuple.getInt4(field)) return false; continue;
+ case INT8:
+ case TIME:
+ case TIMESTAMP: if ((Long)keys[i] != tuple.getInt8(field)) return false; continue;
+ case FLOAT4: if ((Float)keys[i] != tuple.getFloat4(field)) return false; continue;
+ case FLOAT8: if ((Double)keys[i] != tuple.getFloat8(field)) return false; continue;
+ case TEXT:
+ case CHAR:
+ case BLOB: if (!Arrays.equals((byte[])keys[i], tuple.getBytes(field))) return false; continue;
+ case DATUM: if (!keys[i].equals(tuple.get(field))) return false; continue;
+ }
+ }
+ return true;
+ }
+
+ /**
+  * Combines the hashes of all keys with the usual 31-multiplier scheme.
+  * A null key contributes 0 and a byte[] key is hashed by content.
+  */
+ @Override
+ public int hashCode() {
+   int result = 1;
+   for (int i = 0; i < keys.length; i++) {
+     final Object key = keys[i];
+     final int hash;
+     if (key == null) {
+       hash = 0;
+     } else if (key instanceof byte[]) {
+       hash = Arrays.hashCode((byte[]) key);
+     } else {
+       hash = key.hashCode();
+     }
+     result = 31 * result + hash;
+   }
+   return result;
+ }
+
+ /**
+  * Returns a new holder with the same key layout and the same key values.
+  * The copy is shallow: key objects (including byte[] values) are shared with this instance.
+  */
+ public ComparableTuple copy() {
+ ComparableTuple copy = emptyCopy();
+ System.arraycopy(keys, 0, copy.keys, 0, keys.length);
+ return copy;
+ }
+
+ /** Returns a new holder with the same key layout and no captured values. */
+ public ComparableTuple emptyCopy() {
+ return new ComparableTuple(keyTypes, keyIndex);
+ }
+
+ /**
+  * Builds a {@code VTuple} with one field per key, in key order, each converted by
+  * {@code toDatum(int)}.
+  */
+ public VTuple toVTuple() {
+   final int width = keyIndex.length;
+   final VTuple converted = new VTuple(width);
+   int position = 0;
+   while (position < width) {
+     converted.put(position, toDatum(position));
+     position++;
+   }
+   return converted;
+ }
+
+ /**
+  * Converts the i-th captured key back into a {@code Datum} according to its key type.
+  * A null key, or a key of type NULL_TYPE, yields {@code NullDatum}; a DATUM key is
+  * returned as stored.
+  *
+  * @param i position of the key (not the tuple field index)
+  */
+ public Datum toDatum(int i) {
+ if (keys[i] == null) {
+ return NullDatum.get();
+ }
+ switch (keyTypes[i]) {
+ case NULL_TYPE: return NullDatum.get();
+ case BOOLEAN: return DatumFactory.createBool((Boolean) keys[i]);
+ case BIT: return DatumFactory.createBit((Byte)keys[i]);
+ case INT1:
+ case INT2: return DatumFactory.createInt2((Short) keys[i]);
+ case INT4: return DatumFactory.createInt4((Integer) keys[i]);
+ case DATE: return DatumFactory.createDate((Integer) keys[i]);
+ case INET4: return DatumFactory.createInet4((Integer) keys[i]);
+ case INT8: return DatumFactory.createInt8((Long) keys[i]);
+ case TIME: return DatumFactory.createTime((Long) keys[i]);
+ case TIMESTAMP: return DatumFactory.createTimestamp((Long) keys[i]);
+ case FLOAT4: return DatumFactory.createFloat4((Float) keys[i]);
+ case FLOAT8: return DatumFactory.createFloat8((Double) keys[i]);
+ case TEXT: return DatumFactory.createText((byte[]) keys[i]);
+ case CHAR: return DatumFactory.createChar((byte[]) keys[i]);
+ case BLOB: return DatumFactory.createBlob((byte[]) keys[i]);
+ case DATUM: return (Datum)keys[i];
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ }
+
+ /**
+  * Tells whether every sort key has a type that {@code vectorType} can map to a vector.
+  * An empty key array is not vectorizable; any exception raised while resolving a key's
+  * type is treated as "not vectorizable".
+  */
+ public static boolean isVectorizable(SortSpec[] sortKeys) {
+   boolean vectorizable = sortKeys.length > 0;
+   for (int i = 0; vectorizable && i < sortKeys.length; i++) {
+     try {
+       vectorType(sortKeys[i].getSortKey().getDataType().getType());
+     } catch (Exception e) {
+       vectorizable = false;
+     }
+   }
+   return vectorizable;
+ }
+
+ /**
+  * Maps a data type to the type code used by {@code TupleVector}: 0 boolean, 1 byte,
+  * 2 short, 3 int, 4 long, 5 float, 6 double, 7 byte[], 8 unsigned int, -1 null type.
+  *
+  * @throws UnsupportedException for any type without a case below
+  */
+ private static int vectorType(TajoDataTypes.Type type) {
+ switch (type) {
+ case BOOLEAN: return 0;
+ case BIT: return 1;
+ case INT1: case INT2: return 2;
+ case INT4: case DATE: return 3;
+ case INT8: case TIME: case TIMESTAMP: case INTERVAL: return 4;
+ case FLOAT4: return 5;
+ case FLOAT8: return 6;
+ case TEXT: case CHAR: case BLOB: return 7;
+ case INET4: return 8;
+ case NULL_TYPE: return -1;
+ }
+ // todo: every other type has no vector representation and is rejected
+ throw new UnsupportedException(type.name());
+ }
+
+ /** Resolves the {@code TupleType} of each key column, looked up in {@code schema} by field index. */
+ private static TupleType[] tupleTypes(Schema schema, int[] keyIndex) {
+   final TupleType[] resolved = new TupleType[keyIndex.length];
+   int slot = 0;
+   for (int field : keyIndex) {
+     resolved[slot++] = tupleType(schema.getColumn(field).getDataType().getType());
+   }
+   return resolved;
+ }
+
+ /**
+  * Maps a data type to the {@code TupleType} constant of the same name.
+  * Any type without a case below falls back to {@code DATUM}.
+  */
+ private static TupleType tupleType(TajoDataTypes.Type type) {
+ switch (type) {
+ case BOOLEAN: return TupleType.BOOLEAN;
+ case BIT: return TupleType.BIT;
+ case INT1: return TupleType.INT1;
+ case INT2: return TupleType.INT2;
+ case INT4: return TupleType.INT4;
+ case DATE: return TupleType.DATE;
+ case INT8: return TupleType.INT8;
+ case TIME: return TupleType.TIME;
+ case TIMESTAMP: return TupleType.TIMESTAMP;
+ case FLOAT4: return TupleType.FLOAT4;
+ case FLOAT8: return TupleType.FLOAT8;
+ case TEXT: return TupleType.TEXT;
+ case CHAR: return TupleType.CHAR;
+ case BLOB: return TupleType.BLOB;
+ case INET4: return TupleType.INET4;
+ case NULL_TYPE: return TupleType.NULL_TYPE;
+ default: return TupleType.DATUM;
+ }
+ }
+
+ /** Returns the consecutive field indexes {@code start}, {@code start + 1}, ... up to but excluding {@code end}. */
+ private static int[] toKeyIndex(int start, int end) {
+   final int[] indices = new int[end - start];
+   int field = start;
+   for (int slot = 0; slot < indices.length; slot++) {
+     indices[slot] = field++;
+   }
+   return indices;
+ }
+
+ // Key kinds handled by ComparableTuple; DATUM is the fallback that keeps the Datum object itself.
+ private static enum TupleType {
+ NULL_TYPE, BOOLEAN, BIT, INT1, INT2, INT4, DATE, INET4, INT8, TIME, TIMESTAMP,
+ FLOAT4, FLOAT8, TEXT, CHAR, BLOB, DATUM
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index b3ebfb2..355f015 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.SessionVars;
import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -168,14 +167,14 @@ public class ExternalSortExec extends SortExec {
int rowNum = tupleBlock.size();
long sortStart = System.currentTimeMillis();
- Collections.sort(tupleBlock, getComparator());
+ Iterable<Tuple> sorted = getSorter(tupleBlock).sort();
long sortEnd = System.currentTimeMillis();
long chunkWriteStart = System.currentTimeMillis();
Path outputPath = getChunkPathForWrite(0, chunkId);
final RawFileAppender appender = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath);
appender.init();
- for (Tuple t : tupleBlock) {
+ for (Tuple t : sorted) {
appender.addTuple(t);
}
appender.close();
@@ -236,18 +235,13 @@ public class ExternalSortExec extends SortExec {
}
}
- if (inMemoryTable.size() > 0) { // if there are at least one or more input tuples
- if (!memoryResident) { // check if data exceeds a sort buffer. If so, it store the remain data into a chunk.
- if (inMemoryTable.size() > 0) {
- long start = System.currentTimeMillis();
- int rowNum = inMemoryTable.size();
- chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
- long end = System.currentTimeMillis();
- info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)");
- }
- } else { // this case means that all data does not exceed a sort buffer
- Collections.sort(inMemoryTable, getComparator());
- }
+ if (!memoryResident && !inMemoryTable.isEmpty()) { // if there are at least one or more input tuples
+ // The data exceeded the sort buffer at least once, so store the remaining in-memory data as the last chunk.
+ long start = System.currentTimeMillis();
+ int rowNum = inMemoryTable.size();
+ chunkPaths.add(sortAndStoreChunk(chunkId, inMemoryTable));
+ long end = System.currentTimeMillis();
+ info(LOG, "Last Chunk #" + chunkId + " " + rowNum + " rows written (" + (end - start) + " msec)");
}
// get total loaded (or stored) bytes and total row numbers
@@ -285,7 +279,8 @@ public class ExternalSortExec extends SortExec {
info(LOG, "Chunks creation time: " + (endTimeOfChunkSplit - startTimeOfChunkSplit) + " msec");
if (memoryResident) { // if all sorted data reside in a main-memory table.
- this.result = new MemTableScanner();
+ TupleSorter sorter = getSorter(inMemoryTable);
+ result = new MemTableScanner(sorter.sort(), inMemoryTable.size(), sortAndStoredBytes);
} else { // if input data exceeds main-memory at least once
try {
@@ -314,7 +309,7 @@ public class ExternalSortExec extends SortExec {
return result.next();
}
- private int calculateFanout(int remainInputChunks, int intputNum, int outputNum, int startIdx) {
+ private int calculateFanout(int remainInputChunks, int inputNum, int outputNum, int startIdx) {
int computedFanout = Math.min(remainInputChunks, defaultFanout);
// Why should we detect an opportunity for unbalanced merge?
@@ -322,9 +317,9 @@ public class ExternalSortExec extends SortExec {
// Assume that a fanout is given by 8 and there are 10 chunks.
// If we firstly merge 3 chunks into one chunk, there remain only 8 chunks.
// Then, we can just finish the merge phase even though we don't complete merge phase on all chunks.
- if (checkIfCanBeUnbalancedMerged(intputNum - (startIdx + computedFanout), outputNum + 1)) {
+ if (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + computedFanout), outputNum + 1)) {
int candidateFanout = computedFanout;
- while(checkIfCanBeUnbalancedMerged(intputNum - (startIdx + candidateFanout), outputNum + 1)) {
+ while (checkIfCanBeUnbalancedMerged(inputNum - (startIdx + candidateFanout), outputNum + 1)) {
candidateFanout--;
}
int beforeFanout = computedFanout;
@@ -354,7 +349,7 @@ public class ExternalSortExec extends SortExec {
int remainInputRuns = inputFiles.size();
int outChunkId = 0;
int outputFileNum = 0;
- List<Future> futures = TUtil.newList();
+ List<Future<FileFragment>> futures = TUtil.newList();
// the number of files being merged in threads.
List<Integer> numberOfMergingFiles = TUtil.newList();
@@ -419,7 +414,7 @@ public class ExternalSortExec extends SortExec {
*/
int numDeletedFiles = 0;
for (FileFragment frag : inputFiles) {
- if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX) == true) {
+ if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
localFS.delete(frag.getPath(), true);
numDeletedFiles++;
LOG.info("Delete merged intermediate file: " + frag);
@@ -527,28 +522,38 @@ public class ExternalSortExec extends SortExec {
throws IOException {
if (num > 1) {
final int mid = (int) Math.ceil((float)num / 2);
- return new PairWiseMerger(inSchema,
- createKWayMergerInternal(sources, startIdx, mid),
- createKWayMergerInternal(sources, startIdx + mid, num - mid), getComparator());
+ Scanner left = createKWayMergerInternal(sources, startIdx, mid);
+ Scanner right = createKWayMergerInternal(sources, startIdx + mid, num - mid);
+ if (ComparableVector.isVectorizable(sortSpecs)) {
+ return new VectorComparePairWiseMerger(inSchema, left, right, comparator);
+ }
+ return new PairWiseMerger(inSchema, left, right, comparator);
} else {
return sources[startIdx];
}
}
- private class MemTableScanner implements Scanner {
- Iterator<Tuple> iterator;
+ private static class MemTableScanner extends AbstractScanner {
+ final Iterable<Tuple> iterable;
+ final long sortAndStoredBytes;
+ final int totalRecords;
+ Iterator<Tuple> iterator;
// for input stats
float scannerProgress;
int numRecords;
- int totalRecords;
TableStats scannerTableStats;
+ public MemTableScanner(Iterable<Tuple> iterable, int length, long inBytes) {
+ this.iterable = iterable;
+ this.totalRecords = length;
+ this.sortAndStoredBytes = inBytes;
+ }
+
@Override
public void init() throws IOException {
- iterator = inMemoryTable.iterator();
+ iterator = iterable.iterator();
- totalRecords = inMemoryTable.size();
scannerProgress = 0.0f;
numRecords = 0;
@@ -581,34 +586,6 @@ public class ExternalSortExec extends SortExec {
}
@Override
- public boolean isProjectable() {
- return false;
- }
-
- @Override
- public void setTarget(Column[] targets) {
- }
-
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- @Override
- public void setSearchCondition(Object expr) {
- }
-
- @Override
- public boolean isSplittable() {
- return false;
- }
-
- @Override
- public Schema getSchema() {
- return null;
- }
-
- @Override
public float getProgress() {
if (iterator != null && numRecords > 0) {
return (float)numRecords / (float)totalRecords;
@@ -630,19 +607,43 @@ public class ExternalSortExec extends SortExec {
CLOSED
}
+ /**
+  * Pair-wise merger that compares the heads of its two inputs through a two-slot
+  * {@link ComparableVector} instead of a tuple comparator.
+  */
+ private static class VectorComparePairWiseMerger extends PairWiseMerger {
+
+ // holds the key values of the two tuples passed to prepare(), one per slot
+ private ComparableVector comparable;
+
+ /**
+  * @param comparator used only for its sort specs and sort key ids; the superclass is
+  *                   given a null comparator
+  */
+ public VectorComparePairWiseMerger(Schema schema, Scanner leftScanner, Scanner rightScanner,
+ BaseTupleComparator comparator) throws IOException {
+ super(schema, leftScanner, rightScanner, null);
+ comparable = new ComparableVector(2, comparator.getSortSpecs(), comparator.getSortKeyIds());
+ }
+
+ /**
+  * Records the key values of a non-null tuple in slot {@code index} and returns the tuple unchanged.
+  * NOTE(review): the tuple is returned as-is rather than copied - presumably safe only if the
+  * scanners never reuse a Tuple instance they have already returned; confirm.
+  */
+ @Override
+ protected Tuple prepare(int index, Tuple tuple) {
+ if (tuple != null) {
+ comparable.set(index, tuple);
+ }
+ return tuple;
+ }
+
+ /** Compares the key values recorded in slot 0 with those recorded in slot 1. */
+ @Override
+ protected int compare() {
+ return comparable.compare(0, 1);
+ }
+ }
+
/**
* Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order.
*/
- private static class PairWiseMerger implements Scanner {
- private Scanner leftScan;
- private Scanner rightScan;
+ private static class PairWiseMerger extends AbstractScanner {
- private VTuple outTuple;
- private VTuple leftTuple;
- private VTuple rightTuple;
+ protected final Schema schema;
+ protected final Comparator<Tuple> comparator;
- private final Schema schema;
- private final Comparator<Tuple> comparator;
+ protected final Scanner leftScan;
+ protected final Scanner rightScan;
+
+ private Tuple leftTuple;
+ private Tuple rightTuple;
private float mergerProgress;
private TableStats mergerInputStats;
@@ -679,74 +680,30 @@ public class ExternalSortExec extends SortExec {
}
private void prepareTuplesForFirstComparison() throws IOException {
- Tuple lt = leftScan.next();
- if (lt != null) {
- leftTuple = new VTuple(lt);
- } else {
- leftTuple = null; // TODO - missed free
- }
-
- Tuple rt = rightScan.next();
- if (rt != null) {
- rightTuple = new VTuple(rt);
- } else {
- rightTuple = null; // TODO - missed free
- }
+ leftTuple = prepare(0, leftScan.next());
+ rightTuple = prepare(1, rightScan.next());
}
- public Tuple next() throws IOException {
+ protected Tuple prepare(int index, Tuple tuple) {
+ return tuple == null ? null : new VTuple(tuple);
+ }
- if (leftTuple != null && rightTuple != null) {
- if (comparator.compare(leftTuple, rightTuple) < 0) {
- outTuple = new VTuple(leftTuple);
+ protected int compare() {
+ return comparator.compare(leftTuple, rightTuple);
+ }
- Tuple lt = leftScan.next();
- if (lt != null) {
- leftTuple = new VTuple(lt);
- } else {
- leftTuple = null; // TODO - missed free
- }
- } else {
- outTuple = new VTuple(rightTuple);
-
- Tuple rt = rightScan.next();
- if (rt != null) {
- rightTuple = new VTuple(rt);
- } else {
- rightTuple = null; // TODO - missed free
- }
- }
- return outTuple;
+ public Tuple next() throws IOException {
+ if (leftTuple == null && rightTuple == null) {
+ return null;
}
-
- if (leftTuple == null) {
- if (rightTuple != null) {
- outTuple = new VTuple(rightTuple);
- } else {
- outTuple = null;
- }
-
- Tuple rt = rightScan.next();
- if (rt != null) {
- rightTuple = new VTuple(rt);
- } else {
- rightTuple = null; // TODO - missed free
- }
- } else {
- if (leftTuple != null) {
- outTuple = new VTuple(leftTuple);
- } else {
- outTuple = null;
- }
-
- Tuple lt = leftScan.next();
- if (lt != null) {
- leftTuple = new VTuple(lt);
- } else {
- leftTuple = null; // TODO - missed free
- }
+ if (rightTuple == null || (leftTuple != null && compare() < 0)) {
+ Tuple tuple = leftTuple;
+ leftTuple = prepare(0, leftScan.next());
+ return tuple;
}
- return outTuple;
+ Tuple tuple = rightTuple;
+ rightTuple = prepare(1, rightScan.next());
+ return tuple;
}
@Override
@@ -755,7 +712,6 @@ public class ExternalSortExec extends SortExec {
leftScan.reset();
rightScan.reset();
- outTuple = null;
leftTuple = null;
rightTuple = null;
@@ -765,39 +721,15 @@ public class ExternalSortExec extends SortExec {
}
}
+ @Override
public void close() throws IOException {
IOUtils.cleanup(LOG, leftScan, rightScan);
getInputStats();
- leftScan = null;
- rightScan = null;
mergerProgress = 1.0f;
setState(State.CLOSED);
}
@Override
- public boolean isProjectable() {
- return false;
- }
-
- @Override
- public void setTarget(Column[] targets) {
- }
-
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- @Override
- public void setSearchCondition(Object expr) {
- }
-
- @Override
- public boolean isSplittable() {
- return false;
- }
-
- @Override
public Schema getSchema() {
return schema;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index a2e039c..f76e356 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -25,11 +25,10 @@ import org.apache.tajo.storage.VTuple;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-public class MemSortExec extends SortExec implements TupleSorter {
+public class MemSortExec extends SortExec {
private SortNode plan;
private List<Tuple> tupleSlots;
private boolean sorted = false;
@@ -54,7 +53,7 @@ public class MemSortExec extends SortExec implements TupleSorter {
while (!context.isStopped() && (tuple = child.next()) != null) {
tupleSlots.add(new VTuple(tuple));
}
- iterator = getSorter().sort();
+ iterator = getSorter(tupleSlots).sort().iterator();
sorted = true;
}
@@ -65,14 +64,6 @@ public class MemSortExec extends SortExec implements TupleSorter {
}
}
- private TupleSorter getSorter() {
- try {
- return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds());
- } catch (Exception e) {
- return this;
- }
- }
-
@Override
public void rescan() throws IOException {
super.rescan();
@@ -92,10 +83,4 @@ public class MemSortExec extends SortExec implements TupleSorter {
public SortNode getPlan() {
return this.plan;
}
-
- @Override
- public Iterator<Tuple> sort() {
- Collections.sort(tupleSlots, comparator);
- return tupleSlots.iterator();
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
index fb6a3b2..28be9de 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
@@ -22,13 +22,14 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.Comparator;
+import java.util.List;
public abstract class SortExec extends UnaryPhysicalExec {
+
protected final BaseTupleComparator comparator;
protected final SortSpec [] sortSpecs;
@@ -39,6 +40,13 @@ public abstract class SortExec extends UnaryPhysicalExec {
this.comparator = new BaseTupleComparator(inSchema, sortSpecs);
}
+ /**
+  * Chooses the sorter for the given tuples: a {@code VectorizedSorter} when the list is
+  * non-empty and every sort key is vectorizable, otherwise the comparator-based default.
+  */
+ protected TupleSorter getSorter(List<Tuple> tupleSlots) {
+   if (tupleSlots.isEmpty() || !ComparableVector.isVectorizable(sortSpecs)) {
+     return new TupleSorter.DefaultSorter(tupleSlots, comparator);
+   }
+   return new VectorizedSorter(tupleSlots, sortSpecs, comparator.getSortKeyIds());
+ }
+
public SortSpec[] getSortSpecs() {
return sortSpecs;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
index d240e4a..57fe816 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TupleSorter.java
@@ -19,9 +19,29 @@
package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
-import java.util.Iterator;
+import java.util.Collections;
+import java.util.List;
public interface TupleSorter {
- Iterator<Tuple> sort();
+
+ Iterable<Tuple> sort();
+
+ public static class DefaultSorter implements TupleSorter { // TupleSorter backed by Collections.sort and a TupleComparator
+
+ private final List<Tuple> target; // list to sort; kept by reference, not copied
+ private final TupleComparator comparator; // ordering passed to Collections.sort
+
+ public DefaultSorter(List<Tuple> target, TupleComparator comparator) { // stores both arguments as-is
+ this.target = target;
+ this.comparator = comparator;
+ }
+
+ @Override
+ public Iterable<Tuple> sort() { // sorts the list given to the constructor and returns that same list
+ Collections.sort(target, comparator); // in-place: the caller's list is reordered
+ return target;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
index 891d104..18d853f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/VectorizedSorter.java
@@ -18,22 +18,12 @@
package org.apache.tajo.engine.planner.physical;
-import com.google.common.primitives.Booleans;
-import com.google.common.primitives.Doubles;
-import com.google.common.primitives.Floats;
-import com.google.common.primitives.Ints;
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.Shorts;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;
import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.Tuple;
-import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
@@ -41,26 +31,17 @@ import java.util.List;
* Extract raw level values (primitive or String/byte[]) from each of key columns before sorting
* Uses indirection for efficient swapping
*/
-public class VectorizedSorter implements IndexedSortable, TupleSorter {
+public class VectorizedSorter extends ComparableVector implements IndexedSortable, TupleSorter {
- private final Tuple[] tuples; // source tuples
- private final TupleVector[] vectors; // values of key columns
private final int[] mappings; // index indirection
public VectorizedSorter(List<Tuple> source, SortSpec[] sortKeys, int[] keyIndex) {
- this.tuples = source.toArray(new Tuple[source.size()]);
- vectors = new TupleVector[sortKeys.length];
+ super(source.size(), sortKeys, keyIndex);
+ source.toArray(tuples); // wish it's array list
mappings = new int[tuples.length];
- for (int i = 0; i < vectors.length; i++) {
- TajoDataTypes.Type type = sortKeys[i].getSortKey().getDataType().getType();
- boolean nullFirst = sortKeys[i].isNullFirst();
- boolean ascending = sortKeys[i].isAscending();
- boolean nullInvert = nullFirst && ascending || !nullFirst && !ascending;
- vectors[i] = new TupleVector(TupleVector.getType(type), tuples.length, nullInvert, ascending);
- }
for (int i = 0; i < tuples.length; i++) {
for (int j = 0; j < keyIndex.length; j++) {
- vectors[j].add(tuples[i].get(keyIndex[j]));
+ vectors[j].append(tuples[i], keyIndex[j]);
}
mappings[i] = i;
}
@@ -68,15 +49,7 @@ public class VectorizedSorter implements IndexedSortable, TupleSorter {
@Override
public int compare(int i1, int i2) {
- final int index1 = mappings[i1];
- final int index2 = mappings[i2];
- for (TupleVector vector : vectors) {
- int compare = vector.compare(index1, index2);
- if (compare != 0) {
- return compare;
- }
- }
- return 0;
+ return super.compare(mappings[i1], mappings[i2]);
}
@Override
@@ -87,112 +60,18 @@ public class VectorizedSorter implements IndexedSortable, TupleSorter {
}
@Override
- public Iterator<Tuple> sort() {
- new QuickSort().sort(VectorizedSorter.this, 0, mappings.length);
- return new Iterator<Tuple>() {
- int index;
- public boolean hasNext() { return index < mappings.length; }
- public Tuple next() { return tuples[mappings[index++]]; }
- public void remove() { throw new UnsupportedException(); }
- };
- }
-
- private static class TupleVector {
-
- private final int type;
- private final BitSet nulls;
- private final boolean nullInvert;
- private final boolean ascending;
-
- private boolean[] booleans;
- private byte[] bits;
- private short[] shorts;
- private int[] ints;
- private long[] longs;
- private float[] floats;
- private double[] doubles;
- private byte[][] bytes;
-
- private int index;
-
- private TupleVector(int type, int length, boolean nullInvert, boolean ascending) {
- this.type = type;
- this.nulls = new BitSet(length);
- this.nullInvert = nullInvert;
- this.ascending = ascending;
- switch (type) {
- case 0: booleans = new boolean[length]; break;
- case 1: bits = new byte[length]; break;
- case 2: shorts = new short[length]; break;
- case 3: ints = new int[length]; break;
- case 4: longs = new long[length]; break;
- case 5: floats = new float[length]; break;
- case 6: doubles = new double[length]; break;
- case 7: bytes = new byte[length][]; break;
- default:
- throw new IllegalArgumentException();
- }
- }
-
- private void add(Datum datum) {
- if (datum.isNull()) {
- nulls.set(index++);
- return;
- }
- switch (type) {
- case 0: booleans[index] = datum.asBool(); break;
- case 1: bits[index] = datum.asByte(); break;
- case 2: shorts[index] = datum.asInt2(); break;
- case 3: ints[index] = datum.asInt4(); break;
- case 4: longs[index] = datum.asInt8(); break;
- case 5: floats[index] = datum.asFloat4(); break;
- case 6: doubles[index] = datum.asFloat8(); break;
- case 7: bytes[index] = datum.asByteArray(); break;
- default:
- throw new IllegalArgumentException();
- }
- index++;
- }
-
- private int compare(int index1, int index2) {
- final boolean n1 = nulls.get(index1);
- final boolean n2 = nulls.get(index2);
- if (n1 && n2) {
- return 0;
- }
- if (n1 ^ n2) {
- int compVal = n1 ? 1 : -1;
- return nullInvert ? -compVal : compVal;
+ public Iterable<Tuple> sort() {
+ new QuickSort().sort(this, 0, mappings.length);
+ return new Iterable<Tuple>() {
+ @Override
+ public Iterator<Tuple> iterator() {
+ return new Iterator<Tuple>() {
+ int index;
+ public boolean hasNext() { return index < mappings.length; }
+ public Tuple next() { return tuples[mappings[index++]]; }
+ public void remove() { throw new UnsupportedException(); }
+ };
}
- int compare;
- switch (type) {
- case 0: compare = Booleans.compare(booleans[index1], booleans[index2]); break;
- case 1: compare = bits[index1] - bits[index2]; break;
- case 2: compare = Shorts.compare(shorts[index1], shorts[index2]); break;
- case 3: compare = Ints.compare(ints[index1], ints[index2]); break;
- case 4: compare = Longs.compare(longs[index1], longs[index2]); break;
- case 5: compare = Floats.compare(floats[index1], floats[index2]); break;
- case 6: compare = Doubles.compare(doubles[index1], doubles[index2]); break;
- case 7: compare = TextDatum.COMPARATOR.compare(bytes[index1], bytes[index2]); break;
- default:
- throw new IllegalArgumentException();
- }
- return ascending ? compare : -compare;
- }
-
- public static int getType(TajoDataTypes.Type type) {
- switch (type) {
- case BOOLEAN: return 0;
- case BIT: case INT1: return 1;
- case INT2: return 2;
- case INT4: case DATE: case INET4: return 3;
- case INT8: case TIME: case TIMESTAMP: case INTERVAL: return 4;
- case FLOAT4: return 5;
- case FLOAT8: return 6;
- case TEXT: case CHAR: case BLOB: return 7;
- }
- // todo
- throw new UnsupportedException(type.name());
- }
+ };
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 5d9d46d..946e0f3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -61,7 +61,7 @@ public class TestExternalSortExec {
private LogicalPlanner planner;
private Path testDir;
- private final int numTuple = 100000;
+ private final int numTuple = 3000000;
private Random rnd = new Random(System.currentTimeMillis());
private TableDesc employee;
@@ -161,7 +161,7 @@ public class TestExternalSortExec {
if (preVal != null) {
assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
}
- preVal = curVal;
+ preVal = new VTuple(curVal);
cnt++;
}
long end = System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
index fc43d42..9cc477a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestTupleSorter.java
@@ -93,7 +93,7 @@ public class TestTupleSorter {
long start = System.currentTimeMillis();
VectorizedSorter sorter = new VectorizedSorter(target, sortKeys, keyIndices);
- Iterator<Tuple> iterator = sorter.sort();
+ Iterator<Tuple> iterator = sorter.sort().iterator();
String[] result1 = new String[SAMPLING];
for (int i = 0; i < result1.length; i++) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql
index 5451b4a..111a371 100644
--- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk1.sql
@@ -1 +1 @@
-select region.*, customer.* from region, customer order by r_regionkey,r_name;
\ No newline at end of file
+select region.*, customer.* from region, customer order by r_regionkey,r_name,c_custkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql
index e9dac51..ca1672e 100644
--- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk2.sql
@@ -1 +1 @@
-select region.*, customer.* from customer, region order by r_regionkey,r_name;
\ No newline at end of file
+select region.*, customer.* from customer, region order by r_regionkey,r_name,c_custkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql
index c98e19f..fd44916 100644
--- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk3.sql
@@ -1 +1 @@
-select * from customer, region order by c_custkey,c_name;
\ No newline at end of file
+select * from customer, region order by c_custkey,c_name,r_regionkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql
index 7130def..fc5b1c3 100644
--- a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testCrossJoinWithAsterisk4.sql
@@ -1 +1 @@
-select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name;
\ No newline at end of file
+select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name,c_custkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql
index ddd669c..d2114cf 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinCondition7.sql
@@ -3,4 +3,4 @@ select
n1.n_name,
n2.n_name
from nation n1 join (select * from nation union select * from nation) n2 on substr(n1.n_name, 1, 4) = substr(n2.n_name, 1, 4)
-order by n1.n_nationkey;
\ No newline at end of file
+order by n1.n_nationkey,n2.n_name;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql
index 5451b4a..111a371 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk1.sql
@@ -1 +1 @@
-select region.*, customer.* from region, customer order by r_regionkey,r_name;
\ No newline at end of file
+select region.*, customer.* from region, customer order by r_regionkey,r_name,c_custkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql
index e9dac51..ca1672e 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk2.sql
@@ -1 +1 @@
-select region.*, customer.* from customer, region order by r_regionkey,r_name;
\ No newline at end of file
+select region.*, customer.* from customer, region order by r_regionkey,r_name,c_custkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql
index c98e19f..fd44916 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk3.sql
@@ -1 +1 @@
-select * from customer, region order by c_custkey,c_name;
\ No newline at end of file
+select * from customer, region order by c_custkey,c_name,r_regionkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql
index 7130def..fc5b1c3 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinWithAsterisk4.sql
@@ -1 +1 @@
-select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name;
\ No newline at end of file
+select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name,c_custkey;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json
index bfccc6c..f6b34cd 100644
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithJson.json
@@ -63,6 +63,14 @@
},
"IsAsc": true,
"IsNullFirst": false
+ },
+ {
+ "SortKey": {
+ "ColumnName": "c_custkey",
+ "OpType": "Column"
+ },
+ "IsAsc": true,
+ "IsNullFirst": false
}
],
"Expr": {
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result
index edd83cd..bed2968 100644
--- a/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinCondition7.result
@@ -47,10 +47,10 @@ n_nationkey,n_name,n_name
22,RUSSIA,RUSSIA
22,RUSSIA,RUSSIA
23,UNITED KINGDOM,UNITED KINGDOM
-23,UNITED KINGDOM,UNITED STATES
23,UNITED KINGDOM,UNITED KINGDOM
23,UNITED KINGDOM,UNITED STATES
+23,UNITED KINGDOM,UNITED STATES
24,UNITED STATES,UNITED KINGDOM
-24,UNITED STATES,UNITED STATES
24,UNITED STATES,UNITED KINGDOM
+24,UNITED STATES,UNITED STATES
24,UNITED STATES,UNITED STATES
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
new file mode 100644
index 0000000..3719412
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/AbstractScanner.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+
+import java.io.IOException;
+
+// Dummy scanner: Scanner base class whose methods below are no-ops or return fixed false/0/null defaults.
+public abstract class AbstractScanner implements Scanner {
+
+ @Override
+ public void init() throws IOException { // no-op
+
+ }
+
+ @Override
+ public void reset() throws IOException { // no-op
+ }
+
+ @Override
+ public void close() throws IOException { // no-op
+ }
+
+ @Override
+ public boolean isProjectable() { // always false
+ return false;
+ }
+
+ @Override
+ public void setTarget(Column[] targets) { // targets are ignored
+ }
+
+ @Override
+ public boolean isSelectable() { // always false
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) { // expr is ignored
+ }
+
+ @Override
+ public boolean isSplittable() { // always false
+ return false;
+ }
+
+ @Override
+ public float getProgress() { // always reports 0
+ return 0;
+ }
+
+ @Override
+ public TableStats getInputStats() { // no statistics here: returns null
+ return null;
+ }
+
+ @Override
+ public Schema getSchema() { // no schema here: returns null
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6e7f1c7b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
index f19b61f..16477cd 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java
@@ -141,6 +141,10 @@ public class MemoryUtil {
total += TEXT_DATUM + datum.size();
break;
+ case BLOB:
+ total += BLOB_DATUM + datum.size();
+ break;
+
case DATE:
total += DATE_DATUM;
break;
|