asterixdb-commits mailing list archives

From: luoc...@apache.org
Subject: [1/2] asterixdb git commit: [ASTERIXDB-2149] Enable multiple normalized keys in sort
Date: Mon, 27 Nov 2017 21:19:05 GMT
Repository: asterixdb
Updated Branches:
  refs/heads/master c5a0a1974 -> ed4693812
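
This change replaces the single first-key normalizer argument of the sort run generators and operators with an array of INormalizedKeyComputerFactory, so that several leading sort keys can be normalized and compared cheaply before falling back to the full binary comparators. The snippet below is a minimal usage sketch assembled from the test changes in this commit (see HeapSortRunGeneratorTest further down); the surrounding setup (ctx, frameLimit, topK, sortFields, comparatorFactories, recordDesc) is assumed to be provided by the caller and is not part of the patch itself.

    import org.apache.hyracks.api.context.IHyracksTaskContext;
    import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
    import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
    import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
    import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
    import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator;

    public class MultiNormalizedKeySortSketch {
        // Builds a heap-sort run generator that normalizes both sort keys (an integer first key
        // and a UTF-8 string second key); before this patch only a single first-key normalizer
        // could be supplied.
        static HeapSortRunGenerator createSorter(IHyracksTaskContext ctx, int frameLimit, int topK,
                int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
                RecordDescriptor recordDesc) throws HyracksDataException {
            INormalizedKeyComputerFactory[] keyNormalizerFactories = new INormalizedKeyComputerFactory[] {
                    new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() };
            return new HeapSortRunGenerator(ctx, frameLimit, topK, sortFields, keyNormalizerFactories,
                    comparatorFactories, recordDesc);
        }
    }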


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index c473819..980857a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -55,18 +55,19 @@ public class TupleSorterHeapSort implements ITupleSorter {
     }
 
     class HeapEntry implements IResetableComparable<HeapEntry> {
-        int nmk;
+        int[] nmk;
         TuplePointer tuplePointer;
 
         public HeapEntry() {
             tuplePointer = new TuplePointer();
-            nmk = 0;
+            nmk = new int[normalizedKeyTotalLength];
         }
 
         @Override
         public int compareTo(HeapEntry o) {
-            if (nmk != o.nmk) {
-                return ((((long) nmk) & 0xffffffffL) < (((long) o.nmk) & 0xffffffffL)) ? -1 : 1;
+            int cmpNormalizedKey = AbstractFrameSorter.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength);
+            if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
+                return cmpNormalizedKey;
             }
             bufferAccessor1.reset(tuplePointer);
             bufferAccessor2.reset(o.tuplePointer);
@@ -93,13 +94,15 @@ public class TupleSorterHeapSort implements ITupleSorter {
             return 0;
         }
 
-        public void reset(int nmkey) {
-            nmk = nmkey;
+        public void reset(int[] nmkey) {
+            if (normalizedKeyTotalLength > 0) {
+                System.arraycopy(nmkey, 0, nmk, 0, normalizedKeyTotalLength);
+            }
         }
 
         @Override
         public void reset(HeapEntry other) {
-            nmk = other.nmk;
+            reset(other.nmk);
             tuplePointer.reset(other.tuplePointer);
         }
     }
@@ -111,19 +114,23 @@ public class TupleSorterHeapSort implements ITupleSorter {
     private final FrameTupleAppender outputAppender;
     private final IFrame outputFrame;
     private final int[] sortFields;
-    private final INormalizedKeyComputer nkc;
+    private final INormalizedKeyComputer[] nkcs;
+    private final boolean normalizedKeyDecisive;
+    private final int[] normalizedKeyLength;
+    private final int normalizedKeyTotalLength;
     private final IBinaryComparator[] comparators;
 
-    private HeapEntry maxEntry;
-    private HeapEntry newEntry;
+    private final HeapEntry maxEntry;
+    private final HeapEntry newEntry;
 
     private MaxHeap heap;
     private boolean isSorted;
 
+    private final int[] nmk;
+
     public TupleSorterHeapSort(IHyracksTaskContext ctx, IDeletableTupleBufferManager bufferManager, int topK,
-            int[] sortFields,
-            INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories)
-            throws HyracksDataException {
+            int[] sortFields, INormalizedKeyComputerFactory[] keyNormalizerFactories,
+            IBinaryComparatorFactory[] comparatorFactories) throws HyracksDataException {
         this.bufferManager = bufferManager;
         this.bufferAccessor1 = bufferManager.createTuplePointerAccessor();
         this.bufferAccessor2 = bufferManager.createTuplePointerAccessor();
@@ -131,7 +138,31 @@ public class TupleSorterHeapSort implements ITupleSorter {
         this.outputFrame = new VSizeFrame(ctx);
         this.outputAppender = new FrameTupleAppender();
         this.sortFields = sortFields;
-        this.nkc = firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
+
+        int runningNormalizedKeyTotalLength = 0;
+        if (keyNormalizerFactories != null) {
+            int decisivePrefixLength = AbstractFrameSorter.getDecisivePrefixLength(keyNormalizerFactories);
+
+            // we only take the prefix of decisive normalized keys, plus at most one indecisive normalized key
+            // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
+            // computing unnecessary normalized keys
+            int normalizedKeys = decisivePrefixLength < keyNormalizerFactories.length ? decisivePrefixLength + 1
+                    : decisivePrefixLength;
+            this.nkcs = new INormalizedKeyComputer[normalizedKeys];
+            this.normalizedKeyLength = new int[normalizedKeys];
+
+            for (int i = 0; i < normalizedKeys; i++) {
+                this.nkcs[i] = keyNormalizerFactories[i].createNormalizedKeyComputer();
+                this.normalizedKeyLength[i] = keyNormalizerFactories[i].getNormalizedKeyLength();
+                runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
+            }
+            this.normalizedKeyDecisive = decisivePrefixLength == comparatorFactories.length;
+        } else {
+            this.nkcs = null;
+            this.normalizedKeyLength = null;
+            this.normalizedKeyDecisive = false;
+        }
+        this.normalizedKeyTotalLength = runningNormalizedKeyTotalLength;
         this.comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -141,6 +172,7 @@ public class TupleSorterHeapSort implements ITupleSorter {
         this.maxEntry = new HeapEntry();
         this.newEntry = new HeapEntry();
         this.isSorted = false;
+        this.nmk = new int[runningNormalizedKeyTotalLength];
     }
 
     @Override
@@ -154,7 +186,7 @@ public class TupleSorterHeapSort implements ITupleSorter {
             throw new HyracksDataException(
                     "The Heap haven't be reset after sorting, the order of using this class is not correct.");
         }
-        int nmkey = getPNK(frameTupleAccessor, index);
+        int[] nmkey = getPNK(frameTupleAccessor, index);
         if (heap.getNumEntries() >= topK) {
             heap.peekMax(maxEntry);
             if (compareTuple(frameTupleAccessor, index, nmkey, maxEntry) >= 0) {
@@ -175,20 +207,29 @@ public class TupleSorterHeapSort implements ITupleSorter {
         return true;
     }
 
-    private int getPNK(IFrameTupleAccessor fta, int tIx) {
-        if (nkc == null) {
-            return 0;
+    private int[] getPNK(IFrameTupleAccessor fta, int tIx) {
+        if (nkcs == null) {
+            return nmk;
         }
-        int sfIdx = sortFields[0];
-        return nkc.normalize(fta.getBuffer().array(), fta.getAbsoluteFieldStartOffset(tIx, sfIdx),
-                fta.getFieldLength(tIx, sfIdx));
+        int keyPos = 0;
+        byte[] buffer = fta.getBuffer().array();
+        for (int i = 0; i < nkcs.length; i++) {
+            int sfIdx = sortFields[i];
+            nkcs[i].normalize(buffer, fta.getAbsoluteFieldStartOffset(tIx, sfIdx), fta.getFieldLength(tIx, sfIdx), nmk,
+                    keyPos);
+            keyPos += normalizedKeyLength[i];
+        }
+        return nmk;
     }
 
-    private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int nmkey, HeapEntry maxEntry)
+    private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int[] nmkey, HeapEntry maxEntry)
             throws HyracksDataException {
-        if (nmkey != maxEntry.nmk) {
-            return ((((long) nmkey) & 0xffffffffL) < (((long) maxEntry.nmk) & 0xffffffffL)) ? -1 : 1;
+        int cmpNormalizedKey =
+                AbstractFrameSorter.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength);
+        if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
+            return cmpNormalizedKey;
         }
+
         bufferAccessor2.reset(maxEntry.tuplePointer);
         byte[] b1 = frameTupleAccessor.getBuffer().array();
         byte[] b2 = bufferAccessor2.getBuffer().array();
@@ -254,9 +295,8 @@ public class TupleSorterHeapSort implements ITupleSorter {
         for (int i = 0; i < numEntries; i++) {
             HeapEntry minEntry = (HeapEntry) entries[i];
             bufferAccessor1.reset(minEntry.tuplePointer);
-            int flushed = FrameUtils
-                    .appendToWriter(writer, outputAppender, bufferAccessor1.getBuffer().array(),
-                            bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
+            int flushed = FrameUtils.appendToWriter(writer, outputAppender, bufferAccessor1.getBuffer().array(),
+                    bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength());
             if (flushed > 0) {
                 maxFrameSize = Math.max(maxFrameSize, flushed);
                 io++;
@@ -265,8 +305,7 @@ public class TupleSorterHeapSort implements ITupleSorter {
         maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
         outputAppender.write(writer, true);
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(
-                    "Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
+            LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames");
         }
         return maxFrameSize;
     }
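
For reference, the heap entries now compare their concatenated normalized keys first and consult the per-field binary comparators only when the normalized keys tie and are not decisive. The following is a minimal standalone sketch of that ordering; the unsigned lexicographic helper is an assumption about what AbstractFrameSorter.compareNormalizeKeys does, not a copy of it.

    // Plain-Java sketch, no Hyracks dependencies. compareNormalizedKeys below is a hypothetical
    // stand-in for AbstractFrameSorter.compareNormalizeKeys: an unsigned, element-wise
    // lexicographic comparison of the concatenated normalized keys.
    public class NormalizedKeyCompareSketch {
        static int compareNormalizedKeys(int[] a, int aStart, int[] b, int bStart, int length) {
            for (int i = 0; i < length; i++) {
                int cmp = Integer.compareUnsigned(a[aStart + i], b[bStart + i]);
                if (cmp != 0) {
                    return cmp;
                }
            }
            return 0;
        }

        public static void main(String[] args) {
            // Two tuples whose integer keys normalize equal but whose string-prefix keys differ.
            int[] nmk1 = { 0x80000001, 0x00000010 };
            int[] nmk2 = { 0x80000001, 0x00000020 };
            // String normalized keys cover only a prefix of the value, so they are not decisive.
            boolean normalizedKeyDecisive = false;
            int cmp = compareNormalizedKeys(nmk1, 0, nmk2, 0, nmk1.length);
            if (cmp != 0 || normalizedKeyDecisive) {
                System.out.println("ordered by normalized keys: " + cmp);
            } else {
                System.out.println("tie on normalized keys; fall back to the full binary comparators");
            }
        }
    }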

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
index 160336a..56bf853 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/HeapSortMergeTest.java
@@ -24,6 +24,7 @@ import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.dataset.ResultSetId;
@@ -58,31 +59,33 @@ public class HeapSortMergeTest extends AbstractIntegrationTest {
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator
-                        + "orders-part1.tbl"),
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator
-                        + "orders-part2.tbl") };
+                new ManagedFileSplit(NC1_ID,
+                        "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl"),
+                new ManagedFileSplit(NC2_ID,
+                        "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
+        RecordDescriptor ordersDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
                 new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
         int outputLimit = 5; // larger than the total record numbers.
-        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
-                outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
+                (INormalizedKeyComputerFactory) null,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
@@ -90,23 +93,21 @@ public class HeapSortMergeTest extends AbstractIntegrationTest {
         spec.addResultSetId(rsId);
 
         FileSplit fs = createFile(nc1);
-        IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(
-                new FileSplit[] { fs });
+        IFileSplitProvider outputSplitProvider = new ConstantFileSplitProvider(new FileSplit[] { fs });
         IOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|");
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID);
 
         spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
 
-        spec.connect(
-                new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(new int[] {
-                        1, 0 }, new IBinaryHashFunctionFactory[] {
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
-                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }), new int[] { 1, 0 },
-                        new IBinaryComparatorFactory[] {
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                        new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                new int[] { 1, 0 },
+                new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }),
+                new int[] { 1, 0 },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new UTF8StringNormalizedKeyComputerFactory()), sorter, 0, printer, 0);
 
         runTest(spec);
         System.out.println("Result write into :" + fs.getPath() + " in node: " + fs.getNodeName());
@@ -122,31 +123,33 @@ public class HeapSortMergeTest extends AbstractIntegrationTest {
         JobSpecification spec = new JobSpecification();
 
         FileSplit[] ordersSplits = new FileSplit[] {
-                new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator
-                        + "orders-part1.tbl"),
-                new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator
-                        + "orders-part2.tbl") };
+                new ManagedFileSplit(NC1_ID,
+                        "data" + File.separator + "tpch0.001" + File.separator + "orders-part1.tbl"),
+                new ManagedFileSplit(NC2_ID,
+                        "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") };
         IFileSplitProvider ordersSplitProvider = new ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer() });
+        RecordDescriptor ordersDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(),
+                        new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitProvider,
                 new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
                         UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'),
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
 
         int outputLimit = 20;
-        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4,
-                outputLimit, new int[] { 1, 0 }, null, new IBinaryComparatorFactory[] {
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
-                PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, ordersDesc);
+        TopKSorterOperatorDescriptor sorter = new TopKSorterOperatorDescriptor(spec, 4, outputLimit, new int[] { 1, 0 },
+                (INormalizedKeyComputerFactory) null,
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
+                        PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
 
         LimitOperatorDescriptor filter = new LimitOperatorDescriptor(spec, ordersDesc, outputLimit);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
index cfd4f30..d3b6b5c 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/AbstractRunGeneratorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.hyracks.tests.unit;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.DataInputStream;
 import java.util.ArrayList;
@@ -57,11 +57,11 @@ public abstract class AbstractRunGeneratorTest {
     static ISerializerDeserializer[] SerDers = new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
             new UTF8StringSerializerDeserializer() };
     static RecordDescriptor RecordDesc = new RecordDescriptor(SerDers);
-    static Random GRandom = new Random(System.currentTimeMillis());
+    static Random GRandom = new Random(0);
     static int[] SortFields = new int[] { 0, 1 };
-    static IBinaryComparatorFactory[] ComparatorFactories = new IBinaryComparatorFactory[] {
-            PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
-            PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+    static IBinaryComparatorFactory[] ComparatorFactories =
+            new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(IntegerPointable.FACTORY),
+                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
 
     static void assertMaxFrameSizesAreAllEqualsTo(List<GeneratedRunFileReader> maxSize, int pageSize) {
         for (int i = 0; i < maxSize.size(); i++) {
@@ -69,25 +69,30 @@ public abstract class AbstractRunGeneratorTest {
         }
     }
 
-    abstract AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
-            throws HyracksDataException;
+    abstract AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit,
+            int numOfInputRecord) throws HyracksDataException;
 
-    protected List<GeneratedRunFileReader> testSortRecords(int pageSize, int frameLimit, int numRuns, int minRecordSize,
-            int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
+    protected List<List<GeneratedRunFileReader>> testSortRecords(int pageSize, int frameLimit, int numRuns,
+            int minRecordSize, int maxRecordSize, HashMap<Integer, String> specialData) throws HyracksDataException {
         IHyracksTaskContext ctx = testUtils.create(pageSize);
 
         HashMap<Integer, String> keyValuePair = new HashMap<>();
         List<IFrame> frameList = new ArrayList<>();
         prepareData(ctx, frameList, pageSize * frameLimit * numRuns, minRecordSize, maxRecordSize, specialData,
                 keyValuePair);
-        AbstractSortRunGenerator runGenerator = getSortRunGenerator(ctx, frameLimit, keyValuePair.size());
-        runGenerator.open();
-        for (IFrame frame : frameList) {
-            runGenerator.nextFrame(frame.getBuffer());
+
+        List<List<GeneratedRunFileReader>> results = new ArrayList<>();
+        AbstractSortRunGenerator[] runGenerators = getSortRunGenerator(ctx, frameLimit, keyValuePair.size());
+        for (AbstractSortRunGenerator runGenerator : runGenerators) {
+            runGenerator.open();
+            for (IFrame frame : frameList) {
+                runGenerator.nextFrame(frame.getBuffer());
+            }
+            runGenerator.close();
+            matchResult(ctx, runGenerator.getRuns(), keyValuePair);
+            results.add(runGenerator.getRuns());
         }
-        runGenerator.close();
-        matchResult(ctx, runGenerator.getRuns(), keyValuePair);
-        return runGenerator.getRuns();
+        return results;
     }
 
     static void matchResult(IHyracksTaskContext ctx, List<GeneratedRunFileReader> runs,
@@ -114,7 +119,9 @@ public abstract class AbstractRunGeneratorTest {
             bbis.setByteBuffer(fta.getBuffer(),
                     fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 1) + fta.getFieldSlotsLength());
             String value = (String) RecordDesc.getFields()[1].deserialize(di);
-
+            if (!keyValuePair.containsKey(key)) {
+                assertTrue(false);
+            }
             if (!keyValuePair.get(key).equals(value)) {
                 assertTrue(false);
             }
@@ -146,7 +153,7 @@ public abstract class AbstractRunGeneratorTest {
 
     static void prepareData(IHyracksTaskContext ctx, List<IFrame> frameList, int minDataSize, int minRecordSize,
             int maxRecordSize, Map<Integer, String> specialData, Map<Integer, String> keyValuePair)
-                    throws HyracksDataException {
+            throws HyracksDataException {
 
         ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
         FrameTupleAppender appender = new FrameTupleAppender();
@@ -158,8 +165,9 @@ public abstract class AbstractRunGeneratorTest {
                 tb.addField(IntegerSerializerDeserializer.INSTANCE, entry.getKey());
                 tb.addField(new UTF8StringSerializerDeserializer(), entry.getValue());
 
-                VSizeFrame frame = new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(
-                        tb.getFieldEndOffsets().length, tb.getSize(), ctx.getInitialFrameSize()));
+                VSizeFrame frame =
+                        new VSizeFrame(ctx, FrameHelper.calcAlignedFrameSizeToStore(tb.getFieldEndOffsets().length,
+                                tb.getSize(), ctx.getInitialFrameSize()));
                 appender.reset(frame, true);
                 assertTrue(appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()));
                 frameList.add(frame);
@@ -226,9 +234,25 @@ public abstract class AbstractRunGeneratorTest {
         int numRuns = 2;
         int minRecordSize = pageSize / 8;
         int maxRecordSize = pageSize / 8;
-        List<GeneratedRunFileReader> maxSize = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize,
-                maxRecordSize, null);
-        assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+        List<List<GeneratedRunFileReader>> maxSizes =
+                testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null);
+        for (List<GeneratedRunFileReader> maxSize : maxSizes) {
+            assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+        }
+    }
+
+    @Test
+    public void testAllManySmallRecords() throws HyracksDataException {
+        int pageSize = 10240;
+        int frameLimit = 4;
+        int numRuns = 2;
+        int minRecordSize = pageSize / 8;
+        int maxRecordSize = pageSize / 8;
+        List<List<GeneratedRunFileReader>> maxSizes =
+                testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null);
+        for (List<GeneratedRunFileReader> maxSize : maxSizes) {
+            assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize);
+        }
     }
 
     @Test
@@ -238,9 +262,11 @@ public abstract class AbstractRunGeneratorTest {
         int numRuns = 2;
         int minRecordSize = pageSize;
         int maxRecordSize = (int) (pageSize * 1.8);
-        List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
-                null);
-        assertMaxFrameSizesAreAllEqualsTo(size, pageSize * 2);
+        List<List<GeneratedRunFileReader>> maxSizes =
+                testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, null);
+        for (List<GeneratedRunFileReader> maxSize : maxSizes) {
+            assertMaxFrameSizesAreAllEqualsTo(maxSize, pageSize * 2);
+        }
     }
 
     @Test
@@ -250,15 +276,16 @@ public abstract class AbstractRunGeneratorTest {
         int numRuns = 4;
         int minRecordSize = 20;
         int maxRecordSize = pageSize / 2;
-        HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit - 1);
-        List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
-                specialPair);
-
-        int max = 0;
-        for (GeneratedRunFileReader run : size) {
-            max = Math.max(max, run.getMaxFrameSize());
+        HashMap<Integer, String> specialPair = generateBigObject(pageSize / 2, frameLimit - 1);
+        List<List<GeneratedRunFileReader>> sizes =
+                testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair);
+        for (List<GeneratedRunFileReader> size : sizes) {
+            int max = 0;
+            for (GeneratedRunFileReader run : size) {
+                max = Math.max(max, run.getMaxFrameSize());
+            }
+            assertTrue(max <= pageSize * (frameLimit - 1) && max >= pageSize * 2);
         }
-        assertTrue(max == pageSize * (frameLimit - 1));
     }
 
     @Test(expected = HyracksDataException.class)
@@ -269,8 +296,6 @@ public abstract class AbstractRunGeneratorTest {
         HashMap<Integer, String> specialPair = generateBigObject(pageSize, frameLimit);
         int minRecordSize = 10;
         int maxRecordSize = pageSize / 2;
-        List<GeneratedRunFileReader> size = testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize,
-                specialPair);
-
+        testSortRecords(pageSize, frameLimit, numRuns, minRecordSize, maxRecordSize, specialPair);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
index fd57f1e..6765d1e 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalSortRunGeneratorTest.java
@@ -20,7 +20,11 @@
 package org.apache.hyracks.tests.unit;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.Algorithm;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
@@ -28,9 +32,20 @@ import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
 public class ExternalSortRunGeneratorTest extends AbstractRunGeneratorTest {
 
     @Override
-    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+    AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
             throws HyracksDataException {
-        return new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories, RecordDesc,
-                Algorithm.MERGE_SORT, frameLimit);
+        ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories,
+                RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit, Integer.MAX_VALUE);
+        ExternalSortRunGenerator runGeneratorWithOneNormalizeKey = new ExternalSortRunGenerator(ctx, SortFields,
+                new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, ComparatorFactories,
+                RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit, Integer.MAX_VALUE);
+        ExternalSortRunGenerator runGeneratorWithNormalizeKeys = new ExternalSortRunGenerator(ctx, SortFields,
+                new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory(),
+                        new UTF8StringNormalizedKeyComputerFactory() },
+                ComparatorFactories, RecordDesc, Algorithm.MERGE_SORT, EnumFreeSlotPolicy.LAST_FIT, frameLimit,
+                Integer.MAX_VALUE);
+
+        return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizeKey,
+                runGeneratorWithNormalizeKeys };
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
index d219a56..5d9e771 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HeapSortRunGeneratorTest.java
@@ -19,23 +19,37 @@
 
 package org.apache.hyracks.tests.unit;
 
-import org.junit.Test;
-
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator;
+import org.junit.Test;
 
 public class HeapSortRunGeneratorTest extends AbstractRunGeneratorTest {
     @Override
-    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+    AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
             throws HyracksDataException {
-        return new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, null, ComparatorFactories,
-                RecordDesc);
+        HeapSortRunGenerator runGenerator = new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields,
+                null, ComparatorFactories, RecordDesc);
+        HeapSortRunGenerator runGeneratorWithOneNormalizedKey =
+                new HeapSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields,
+                        new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() },
+                        ComparatorFactories, RecordDesc);
+        HeapSortRunGenerator runGeneratorWithNormalizedKeys = new HeapSortRunGenerator(ctx, frameLimit,
+                numOfInputRecord, SortFields, new INormalizedKeyComputerFactory[] {
+                        new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() },
+                ComparatorFactories, RecordDesc);
+
+        return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizedKey,
+                runGeneratorWithNormalizedKeys };
+
     }
 
     @Test
-    public void testTopK(){
+    public void testTopK() {
 
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java
deleted file mode 100644
index d1080f8..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGenerator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hyracks.tests.unit;
-
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
-import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
-
-public class HybridSortRunGenerator extends AbstractRunGeneratorTest {
-    @Override
-    AbstractSortRunGenerator getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
-            throws HyracksDataException {
-        return new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields, null, ComparatorFactories,
-                RecordDesc);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java
new file mode 100644
index 0000000..d91f1e1
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/HybridSortRunGeneratorTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.tests.unit;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
+import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
+
+public class HybridSortRunGeneratorTest extends AbstractRunGeneratorTest {
+    @Override
+    AbstractSortRunGenerator[] getSortRunGenerator(IHyracksTaskContext ctx, int frameLimit, int numOfInputRecord)
+            throws HyracksDataException {
+        HybridTopKSortRunGenerator runGenerator = new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord,
+                SortFields, null, ComparatorFactories, RecordDesc);
+        HybridTopKSortRunGenerator runGeneratorWithOneNormalizedKey =
+                new HybridTopKSortRunGenerator(ctx, frameLimit, numOfInputRecord, SortFields,
+                        new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() },
+                        ComparatorFactories, RecordDesc);
+        HybridTopKSortRunGenerator runGeneratorWithNormalizedKeys = new HybridTopKSortRunGenerator(ctx, frameLimit,
+                numOfInputRecord, SortFields, new INormalizedKeyComputerFactory[] {
+                        new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() },
+                ComparatorFactories, RecordDesc);
+
+        return new AbstractSortRunGenerator[] { runGenerator, runGeneratorWithOneNormalizedKey,
+                runGeneratorWithNormalizedKeys };
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
index c68d59d..a219518 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/RunMergingFrameReaderTest.java
@@ -19,16 +19,8 @@
 
 package org.apache.hyracks.tests.unit;
 
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.GRandom;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.generateRandomRecord;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.matchResult;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.prepareData;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.testUtils;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.*;
+import static org.junit.Assert.*;
 
 import java.io.DataInputStream;
 import java.util.ArrayList;
@@ -71,7 +63,7 @@ public class RunMergingFrameReaderTest {
         private final int numFrames;
         private final int minRecordSize;
         private final int maxRecordSize;
-        private TreeMap<Integer, String> result = new TreeMap<>();
+        private final TreeMap<Integer, String> result = new TreeMap<>();
         int maxFrameSize;
 
         ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
@@ -186,8 +178,8 @@ public class RunMergingFrameReaderTest {
         prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
                 frameList, keyValueMapList);
 
-        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
-                null, RecordDesc);
+        RunMergingFrameReader reader =
+                new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
         testMergeSucceed(ctx, reader, keyValueMapList);
     }
 
@@ -207,8 +199,8 @@ public class RunMergingFrameReaderTest {
         prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
                 frameList, keyValueMapList);
 
-        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
-                null, RecordDesc);
+        RunMergingFrameReader reader =
+                new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
         testMergeSucceed(ctx, reader, keyValueMapList);
     }
 
@@ -291,8 +283,8 @@ public class RunMergingFrameReaderTest {
         prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
                 frameList, keyValueMap);
 
-        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
-                null, RecordDesc);
+        RunMergingFrameReader reader =
+                new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators, null, RecordDesc);
         testMergeSucceed(ctx, reader, keyValueMap);
     }
 
@@ -342,8 +334,8 @@ public class RunMergingFrameReaderTest {
         for (GeneratedRunFileReader run : runGenerator.getRuns()) {
             runs.add(run);
         }
-        RunMergingFrameReader reader = new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null,
-                RecordDesc);
+        RunMergingFrameReader reader =
+                new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null, RecordDesc);
 
         IFrame outFrame = new VSizeFrame(ctx);
         reader.open();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
index f621bf9..b2a8323 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/TopKRunGeneratorTest.java
@@ -19,13 +19,8 @@
 
 package org.apache.hyracks.tests.unit;
 
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SerDers;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
-import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.assertFTADataIsSorted;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.*;
+import static org.junit.Assert.*;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -38,10 +33,13 @@ import org.apache.hyracks.api.comm.FixedSizeFrame;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.normalizers.IntegerNormalizedKeyComputerFactory;
+import org.apache.hyracks.dataflow.common.data.normalizers.UTF8StringNormalizedKeyComputerFactory;
 import org.apache.hyracks.dataflow.std.sort.AbstractSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.HeapSortRunGenerator;
 import org.apache.hyracks.dataflow.std.sort.HybridTopKSortRunGenerator;
@@ -126,12 +124,24 @@ public class TopKRunGeneratorTest {
     }
 
     @Test
-    public void testHybridTopKShouldSwitchToFrameSorterWhenFlushed() {
-        int topK = 1;
+    public void testHybridTopKWithOneNormalizedKey() throws HyracksDataException {
+        int topK = SORT_FRAME_LIMIT;
         IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
-        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields, null,
-                ComparatorFactories, RecordDesc);
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(ctx, SORT_FRAME_LIMIT, topK, SortFields,
+                new INormalizedKeyComputerFactory[] { new IntegerNormalizedKeyComputerFactory() }, ComparatorFactories,
+                RecordDesc);
+        testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
+    }
 
+    @Test
+    public void testHybridTopKWithTwoNormalizedKeys() throws HyracksDataException {
+        int topK = SORT_FRAME_LIMIT;
+        IHyracksTaskContext ctx = AbstractRunGeneratorTest.testUtils.create(PAGE_SIZE);
+        AbstractSortRunGenerator sorter = new HybridTopKSortRunGenerator(
+                ctx, SORT_FRAME_LIMIT, topK, SortFields, new INormalizedKeyComputerFactory[] {
+                        new IntegerNormalizedKeyComputerFactory(), new UTF8StringNormalizedKeyComputerFactory() },
+                ComparatorFactories, RecordDesc);
+        testInMemoryOnly(ctx, topK, ORDER.REVERSE, sorter);
     }
 
     private void testInMemoryOnly(IHyracksTaskContext ctx, int topK, ORDER order, AbstractSortRunGenerator sorter)
@@ -148,7 +158,7 @@ public class TopKRunGeneratorTest {
 
         List<IFrame> frameList = new ArrayList<>();
         int minDataSize = PAGE_SIZE * NUM_PAGES * 4 / 5;
-        int minRecordSize = 16;
+        int minRecordSize = 64;
         int maxRecordSize = 64;
 
         AbstractRunGeneratorTest.prepareData(ctx, frameList, minDataSize, minRecordSize, maxRecordSize, null,
@@ -162,7 +172,6 @@ public class TopKRunGeneratorTest {
 
         doSort(sorter, buffer);
 
-        assertEquals(0, sorter.getRuns().size());
         validateResult(sorter, topKAnswer);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index 650c60d..23a6be0 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileSplit;
@@ -139,11 +140,11 @@ public class WordCountMain {
         JobSpecification spec = new JobSpecification(frameSize);
 
         IFileSplitProvider splitsProvider = new ConstantFileSplitProvider(inSplits);
-        RecordDescriptor wordDesc = new RecordDescriptor(
-                new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
+        RecordDescriptor wordDesc =
+                new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer() });
 
-        FileScanOperatorDescriptor wordScanner = new FileScanOperatorDescriptor(spec, splitsProvider,
-                new WordTupleParserFactory(), wordDesc);
+        FileScanOperatorDescriptor wordScanner =
+                new FileScanOperatorDescriptor(spec, splitsProvider, new WordTupleParserFactory(), wordDesc);
         createPartitionConstraint(spec, wordScanner, inSplits);
 
         RecordDescriptor groupResultDesc = new RecordDescriptor(new ISerializerDeserializer[] {
@@ -170,13 +171,16 @@ public class WordCountMain {
                             PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }));
             spec.connect(scanGroupConn, wordScanner, 0, gBy, 0);
         } else {
-            IBinaryComparatorFactory[] cfs = new IBinaryComparatorFactory[] {
-                    PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
-            IOperatorDescriptor sorter = "memsort".equalsIgnoreCase(algo)
-                    ? new InMemorySortOperatorDescriptor(spec, keys, new UTF8StringNormalizedKeyComputerFactory(), cfs,
-                            wordDesc)
-                    : new ExternalSortOperatorDescriptor(spec, frameLimit, keys,
-                            new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
+            IBinaryComparatorFactory[] cfs =
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) };
+            IOperatorDescriptor sorter =
+                    "memsort".equalsIgnoreCase(algo)
+                            ? new InMemorySortOperatorDescriptor(spec, keys,
+                                    new INormalizedKeyComputerFactory[] {
+                                            new UTF8StringNormalizedKeyComputerFactory() },
+                                    cfs, wordDesc)
+                            : new ExternalSortOperatorDescriptor(spec, frameLimit, keys,
+                                    new UTF8StringNormalizedKeyComputerFactory(), cfs, wordDesc);
             createPartitionConstraint(spec, sorter, outSplits);
 
             IConnectorDescriptor scanSortConn = new MToNPartitioningConnectorDescriptor(spec,
@@ -195,9 +199,9 @@ public class WordCountMain {
         }
 
         IFileSplitProvider outSplitProvider = new ConstantFileSplitProvider(outSplits);
-        IOperatorDescriptor writer = "text".equalsIgnoreCase(format)
-                ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
-                : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
+        IOperatorDescriptor writer =
+                "text".equalsIgnoreCase(format) ? new PlainFileWriterOperatorDescriptor(spec, outSplitProvider, ",")
+                        : new FrameFileWriterOperatorDescriptor(spec, outSplitProvider);
         createPartitionConstraint(spec, writer, outSplits);
 
         IConnectorDescriptor gbyPrinterConn = new OneToOneConnectorDescriptor(spec);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ed469381/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
index 8ab0708..7e56004 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Sort.java
@@ -19,10 +19,7 @@
 
 package org.apache.hyracks.examples.tpch.client;
 
-import static org.apache.hyracks.examples.tpch.client.Common.createPartitionConstraint;
-import static org.apache.hyracks.examples.tpch.client.Common.orderParserFactories;
-import static org.apache.hyracks.examples.tpch.client.Common.ordersDesc;
-import static org.apache.hyracks.examples.tpch.client.Common.parseFileSplits;
+import static org.apache.hyracks.examples.tpch.client.Common.*;
 
 import java.util.EnumSet;
 
@@ -31,6 +28,7 @@ import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
@@ -131,12 +129,12 @@ public class Sort {
         createPartitionConstraint(spec, ordScanner, ordersSplits);
         AbstractSorterOperatorDescriptor sorter;
         if (usingHeapSorter && limit < Integer.MAX_VALUE) {
-            sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields, null,
-                    SortFieldsComparatorFactories, ordersDesc);
+            sorter = new TopKSorterOperatorDescriptor(spec, frameLimit, limit, SortFields,
+                    (INormalizedKeyComputerFactory) null, SortFieldsComparatorFactories, ordersDesc);
         } else {
             if (memBufferAlg.equalsIgnoreCase("bestfit")) {
-                sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields,
-                        null, SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT,
+                sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,
+                        SortFieldsComparatorFactories, ordersDesc, Algorithm.MERGE_SORT,
                         EnumFreeSlotPolicy.SMALLEST_FIT, limit);
             } else if (memBufferAlg.equalsIgnoreCase("biggestfit")) {
                 sorter = new ExternalSortOperatorDescriptor(spec, frameLimit, SortFields, null,

