asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Luo Chen (Code Review)" <do-not-re...@asterixdb.incubator.apache.org>
Subject Change in asterixdb[master]: [ASTERIXDB-2149] Refactor key normalizer with longer keys
Date Mon, 08 Jan 2018 17:13:23 GMT
Luo Chen has submitted this change and it was merged.

Change subject: [ASTERIXDB-2149] Refactor key normalizer with longer keys
......................................................................


[ASTERIXDB-2149] Refactor key normalizer with longer keys

- user model changes: no
- storage format changes: no
- interface changes: yes. The interface of key normalized is changed.

Details:
- Refactored key normalizer to work with longer normalized keys composed
of multiple integers.
- Add tests for key normalizers
- Add key normalizer for UUID type to improve sort performance.

Change-Id: Idba747285af74195ef9953ed9bf5f6f217511380
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2225
Sonar-Qube: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
---
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
25 files changed, 861 insertions(+), 181 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; No violations found; ; Verified

Objections:
  Anon. E. Moose #1000171: Violations found



diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..c8a774d
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.dataflow.data.nontagged.keynormalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
+import org.apache.hyracks.dataflow.common.data.normalizers.Integer64NormalizedKeyComputerFactory;
+
+public class AUUIDNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 4;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
+
+    private final INormalizedKeyComputerFactory int64NormalizerFactory;
+    private final int int64NormalizedKeyLength;
+
+    public AUUIDNormalizedKeyComputerFactory() {
+        int64NormalizerFactory = new Integer64NormalizedKeyComputerFactory();
+        int64NormalizedKeyLength = int64NormalizerFactory.getNormalizedKeyProperties().getNormalizedKeyLength();
+    }
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        final INormalizedKeyComputer nkc = int64NormalizerFactory.createNormalizedKeyComputer();
+        return new INormalizedKeyComputer() {
+
+            @Override
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                // normalize msb
+                nkc.normalize(bytes, start, length, normalizedKeys, keyStart);
+                // normalize lsb
+                nkc.normalize(bytes, start + Long.BYTES, length - Long.BYTES, normalizedKeys,
+                        keyStart + int64NormalizedKeyLength);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
+            }
+        };
+
+    }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
index de214fc..c676d13 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
@@ -21,6 +21,7 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 
 /**
  * This class uses a decorator pattern to wrap an ASC ordered INomralizedKeyComputerFactory implementation to
@@ -41,11 +42,22 @@
         return new INormalizedKeyComputer() {
 
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
                 // start +1, length -1 is because in ASTERIX data format, there is always a type tag before the value
-                return nkc.normalize(bytes, start + 1, length - 1);
+                nkc.normalize(bytes, start + 1, length - 1, normalizedKeys, keyStart);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return nkc.getNormalizedKeyProperties();
             }
         };
+
+    }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return nkcf.getNormalizedKeyProperties();
     }
 
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
index 6e02029..86b33e8 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
@@ -21,6 +21,7 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 
 /**
  * This class uses a decorator pattern to wrap an ASC ordered INomralizedKeyComputerFactory implementation
@@ -30,9 +31,11 @@
 
     private static final long serialVersionUID = 1L;
     private final INormalizedKeyComputerFactory nkcf;
+    private final int normalizedKeyLength;
 
     public AWrappedDescNormalizedKeyComputerFactory(INormalizedKeyComputerFactory nkcf) {
         this.nkcf = nkcf;
+        this.normalizedKeyLength = nkcf.getNormalizedKeyProperties().getNormalizedKeyLength();
     }
 
     @Override
@@ -41,11 +44,24 @@
         return new INormalizedKeyComputer() {
 
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                int key = nkc.normalize(bytes, start + 1, length - 1);
-                return (int) ((long) 0xffffffff - (long) key);
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                nkc.normalize(bytes, start + 1, length - 1, normalizedKeys, keyStart);
+                for (int i = 0; i < normalizedKeyLength; i++) {
+                    int key = normalizedKeys[keyStart + i];
+                    normalizedKeys[keyStart + i] = ~key;
+                }
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return nkc.getNormalizedKeyProperties();
             }
         };
     }
 
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return nkcf.getNormalizedKeyProperties();
+    }
+
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
index d17fc5a..d372062 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.formats.nontagged;
 
+import org.apache.asterix.dataflow.data.nontagged.keynormalizers.AUUIDNormalizedKeyComputerFactory;
 import org.apache.asterix.dataflow.data.nontagged.keynormalizers.AWrappedAscNormalizedKeyComputerFactory;
 import org.apache.asterix.dataflow.data.nontagged.keynormalizers.AWrappedDescNormalizedKeyComputerFactory;
 import org.apache.asterix.om.types.IAType;
@@ -59,6 +60,8 @@
                     return new AWrappedAscNormalizedKeyComputerFactory(new UTF8StringNormalizedKeyComputerFactory());
                 case BINARY:
                     return new AWrappedAscNormalizedKeyComputerFactory(new ByteArrayNormalizedKeyComputerFactory());
+                case UUID:
+                    return new AWrappedAscNormalizedKeyComputerFactory(new AUUIDNormalizedKeyComputerFactory());
                 default:
                     return null;
             }
@@ -81,6 +84,8 @@
                     return new AWrappedDescNormalizedKeyComputerFactory(new UTF8StringNormalizedKeyComputerFactory());
                 case BINARY:
                     return new AWrappedDescNormalizedKeyComputerFactory(new ByteArrayNormalizedKeyComputerFactory());
+                case UUID:
+                    return new AWrappedDescNormalizedKeyComputerFactory(new AUUIDNormalizedKeyComputerFactory());
                 default:
                     return null;
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
index 7bf8255..7364bea 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
@@ -19,10 +19,7 @@
 package org.apache.hyracks.api.dataflow.value;
 
 public interface INormalizedKeyComputer {
-    public int normalize(byte[] bytes, int start, int length);
+    void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart);
 
-    default void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
-        int key = normalize(bytes, start, length);
-        normalizedKeys[keyStart] = key;
-    }
+    INormalizedKeyProperties getNormalizedKeyProperties();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
index 901702e..a1339a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
@@ -23,20 +23,5 @@
 public interface INormalizedKeyComputerFactory extends Serializable {
     public INormalizedKeyComputer createNormalizedKeyComputer();
 
-    /**
-     *
-     * @return The length of the normalized key in terms of integers
-     */
-    default int getNormalizedKeyLength() {
-        return 1;
-    }
-
-    /**
-     *
-     * @return Whether we can solely rely on this normalized key to complete comparison,
-     *         even when two normalized keys are equal
-     */
-    default boolean isDecisive() {
-        return false;
-    }
+    public INormalizedKeyProperties getNormalizedKeyProperties();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java
new file mode 100644
index 0000000..63e799a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INormalizedKeyProperties extends Serializable {
+    /**
+     *
+     * @return The length of the normalized key in terms of integers
+     */
+    public int getNormalizedKeyLength();
+
+    /**
+     *
+     * @return Whether we can solely rely on this normalized key to complete comparison,
+     *         even when two normalized keys are equal
+     */
+    public boolean isDecisive();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
index 3d081af..9557245 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
@@ -21,18 +21,45 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
 
 public class ByteArrayNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 1;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return false;
+        }
+    };
+
     public static ByteArrayNormalizedKeyComputerFactory INSTANCE = new ByteArrayNormalizedKeyComputerFactory();
 
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                return ByteArrayPointable.normalize(bytes, start);
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                normalizedKeys[keyStart] = ByteArrayPointable.normalize(bytes, start);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
         };
     }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
index 353793b..4d5d57c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
@@ -20,28 +20,53 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 
 public class DoubleNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
 
     private static final long serialVersionUID = 1L;
+
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 2;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
 
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
 
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                int prefix = IntegerPointable.getInteger(bytes, start);
-                if (prefix >= 0) {
-                    return prefix ^ Integer.MIN_VALUE;
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                long value = LongPointable.getLong(bytes, start);
+                if (value >= 0) {
+                    value = value ^ Long.MIN_VALUE;
                 } else {
-                    return (int) ((long) 0xffffffff - (long) prefix);
+                    value = ~value;
                 }
+                NormalizedKeyUtils.putLongIntoNormalizedKeys(normalizedKeys, keyStart, value);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
 
         };
     }
 
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
index eb571a5..3b0d9d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
@@ -20,28 +20,51 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class FloatNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
 
     private static final long serialVersionUID = 1L;
 
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 1;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
+
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
-
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                int prefix = IntegerPointable.getInteger(bytes, start);
-                if (prefix >= 0) {
-                    return prefix ^ Integer.MIN_VALUE;
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                int value = IntegerPointable.getInteger(bytes, start);
+                if (value >= 0) {
+                    normalizedKeys[keyStart] = value ^ Integer.MIN_VALUE;
                 } else {
-                    return (int) ((long) 0xffffffff - (long) prefix);
+                    // invert the key
+                    normalizedKeys[keyStart] = ~value;
                 }
             }
 
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
+            }
         };
     }
 
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index fa74665..853499f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -20,55 +20,47 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 
 public class Integer64NormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
 
-    private static final long serialVersionUID = 8735044913496854551L;
+    private static final long serialVersionUID = 1L;
+
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 2;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
 
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
-            private static final int POSTIVE_LONG_MASK = (3 << 30);
-            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
-            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+            @Override
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                long value = LongPointable.getLong(bytes, start);
+                value = value ^ Long.MIN_VALUE;
+                NormalizedKeyUtils.putLongIntoNormalizedKeys(normalizedKeys, keyStart, value);
+            }
 
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                long value = LongPointable.getLong(bytes, start);
-                int highValue = (int) (value >> 32);
-                if (value > Integer.MAX_VALUE) {
-                    /**
-                     * larger than Integer.MAX
-                     */
-                    int highNmk = getKey(highValue);
-                    highNmk >>= 2;
-                    highNmk |= POSTIVE_LONG_MASK;
-                    return highNmk;
-                } else if (value >=0 && value <= Integer.MAX_VALUE) {
-                    /**
-                     * smaller than Integer.MAX but >=0
-                     */
-                    int lowNmk = (int) value;
-                    lowNmk >>= 2;
-                    lowNmk |= NON_NEGATIVE_INT_MASK;
-                    return lowNmk;
-                } else {
-                    /**
-                     * less than 0: have not optimized for that
-                     */
-                    int highNmk = getKey(highValue);
-                    highNmk >>= 2;
-                    highNmk |= NEGATIVE_LONG_MASK;
-                    return highNmk;
-                }
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
-
-            private int getKey(int value) {
-                return value ^ Integer.MIN_VALUE;
-            }
-
         };
     }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
index 41c0740..60b8d7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
@@ -20,24 +20,44 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 
 public class IntegerNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
     private static final long serialVersionUID = 1L;
 
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 1;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
+
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
                 int value = IntegerPointable.getInteger(bytes, start);
-                return value ^ Integer.MIN_VALUE;
+                normalizedKeys[keyStart] = value ^ Integer.MIN_VALUE;
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
         };
     }
 
     @Override
-    public boolean isDecisive() {
-        return true;
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
index 589ae68..fefcebd 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
@@ -20,18 +20,44 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.util.string.UTF8StringUtil;
 
 public class UTF8StringNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
     private static final long serialVersionUID = 1L;
 
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            // TODO optimize normalize key for UTF8 to use multiple integers
+            return 1;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return false;
+        }
+    };
+
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                return UTF8StringUtil.normalize(bytes, start);
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                normalizedKeys[keyStart] = UTF8StringUtil.normalize(bytes, start);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
         };
     }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java
new file mode 100644
index 0000000..175b04a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.utils;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class NormalizedKeyUtils {
+
+    private NormalizedKeyUtils() {
+    }
+
+    public static int compareNormalizeKeys(int[] keys1, int start1, int[] keys2, int start2, int length) {
+        for (int i = 0; i < length; i++) {
+            int key1 = keys1[start1 + i];
+            int key2 = keys2[start2 + i];
+            if (key1 != key2) {
+                return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? -1 : 1;
+            }
+        }
+        return 0;
+    }
+
+    public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] keyNormalizerFactories) {
+        if (keyNormalizerFactories == null) {
+            return 0;
+        }
+        for (int i = 0; i < keyNormalizerFactories.length; i++) {
+            if (!keyNormalizerFactories[i].getNormalizedKeyProperties().isDecisive()) {
+                return i;
+            }
+        }
+        return keyNormalizerFactories.length;
+    }
+
+    public static void putLongIntoNormalizedKeys(int[] normalizedKeys, int keyStart, long key) {
+        int high = (int) (key >> 32);
+        normalizedKeys[keyStart] = high;
+        int low = (int) key;
+        normalizedKeys[keyStart + 1] = low;
+    }
+
+    public static int[] createNormalizedKeyArray(INormalizedKeyComputer normalizer) {
+        if (normalizer == null) {
+            return null; //NOSONAR
+        }
+        return new int[normalizer.getNormalizedKeyProperties().getNormalizedKeyLength()];
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..2107574
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public abstract class AbstractNormalizedKeyComputerFactoryTest {
+
+    @Test
+    public void testPositive() {
+        IPointable large = getLargePositive();
+        IPointable small = getSmallPositive();
+
+        normalizeAndCompare(large, small, 1);
+        normalizeAndCompare(small, large, -1);
+        normalizeAndCompare(large, large, 0);
+    }
+
+    @Test
+    public void testNegative() {
+        IPointable small = getSmallNegative();
+        IPointable large = getLargeNegative();
+
+        normalizeAndCompare(large, small, 1);
+        normalizeAndCompare(small, large, -1);
+        normalizeAndCompare(large, large, 0);
+    }
+
+    @Test
+    public void testPositiveAndNegative() {
+        IPointable smallNegative = getSmallNegative();
+        IPointable largeNegative = getLargeNegative();
+        IPointable smallPositive = getSmallPositive();
+        IPointable largePositive = getLargePositive();
+
+        normalizeAndCompare(smallNegative, smallPositive, -1);
+        normalizeAndCompare(smallNegative, largePositive, -1);
+        normalizeAndCompare(largeNegative, smallPositive, -1);
+        normalizeAndCompare(largeNegative, largePositive, -1);
+    }
+
+    protected void normalizeAndCompare(IPointable p1, IPointable p2, int result) {
+        int[] key1 = normalize(p1);
+        int[] key2 = normalize(p2);
+        int comp = NormalizedKeyUtils.compareNormalizeKeys(key1, 0, key2, 0, key1.length);
+        if (result > 0) {
+            Assert.assertTrue(comp > 0);
+        } else if (result == 0) {
+            Assert.assertTrue(comp == 0);
+        } else {
+            Assert.assertTrue(comp < 0);
+        }
+    }
+
+    protected abstract IPointable getLargePositive();
+
+    protected abstract IPointable getSmallPositive();
+
+    protected abstract IPointable getLargeNegative();
+
+    protected abstract IPointable getSmallNegative();
+
+    protected abstract int[] normalize(IPointable value);
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
index a362e60..675bf36 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.hyracks.dataflow.common.data.normalizers;
 
-import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.*;
 
 import java.util.Random;
 
@@ -36,10 +36,10 @@
     @Test
     public void testRandomNormalizedKey() {
         for (int i = 0; i < 10; ++i) {
-            ByteArrayPointable pointable1 = generateRandomByteArrayPointableWithFixLength(
-                    Math.abs(random.nextInt((i + 1) * 10)), random);
-            ByteArrayPointable pointable2 = generateRandomByteArrayPointableWithFixLength(
-                    Math.abs(random.nextInt((i + 1) * 10)), random);
+            ByteArrayPointable pointable1 =
+                    generateRandomByteArrayPointableWithFixLength(Math.abs(random.nextInt((i + 1) * 10)), random);
+            ByteArrayPointable pointable2 =
+                    generateRandomByteArrayPointableWithFixLength(Math.abs(random.nextInt((i + 1) * 10)), random);
             assertNormalizeValue(pointable1, pointable2, computer);
         }
     }
@@ -52,11 +52,13 @@
 
     public static void assertNormalizeValue(ByteArrayPointable pointable1, ByteArrayPointable pointable2,
             INormalizedKeyComputer computer) {
-        int n1 = computer.normalize(pointable1.getByteArray(), pointable1.getStartOffset(), pointable1.getLength());
-        int n2 = computer.normalize(pointable2.getByteArray(), pointable2.getStartOffset(), pointable2.getLength());
-        if (n1 < n2) {
+        int[] key1 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()];
+        int[] key2 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()];
+        computer.normalize(pointable1.getByteArray(), pointable1.getStartOffset(), pointable1.getLength(), key1, 0);
+        computer.normalize(pointable2.getByteArray(), pointable2.getStartOffset(), pointable2.getLength(), key2, 0);
+        if (key1[0] < key2[0]) {
             assertTrue(pointable1.compareTo(pointable2) < 0);
-        } else if (n1 > n2) {
+        } else if (key1[0] > key2[0]) {
             assertTrue(pointable1.compareTo(pointable2) > 0);
         }
     }
@@ -70,12 +72,15 @@
         }
 
         ByteArrayPointable ptr1 = ByteArrayPointable.generatePointableFromPureBytes(new byte[] { 0, 25, 34, 42 });
-        ByteArrayPointable ptr2 = ByteArrayPointable.generatePointableFromPureBytes(
-                new byte[] { (byte) 130, 25, 34, 42 });
+        ByteArrayPointable ptr2 =
+                ByteArrayPointable.generatePointableFromPureBytes(new byte[] { (byte) 130, 25, 34, 42 });
 
-        int n1 = computer.normalize(ptr1.getByteArray(), ptr1.getStartOffset(), ptr1.getLength());
-        int n2 = computer.normalize(ptr2.getByteArray(), ptr2.getStartOffset(), ptr2.getLength());
-        assertTrue(n1 < n2);
+        int[] key1 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()];
+        int[] key2 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()];
+        computer.normalize(ptr1.getByteArray(), ptr1.getStartOffset(), ptr1.getLength(), key1, 0);
+        computer.normalize(ptr2.getByteArray(), ptr2.getStartOffset(), ptr2.getLength(), key2, 0);
+
+        assertTrue(key1[0] < key2[0]);
 
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..012ef07
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+
+public class DoubleNormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest {
+
+    private final INormalizedKeyComputer normalizer =
+            new DoubleNormalizedKeyComputerFactory().createNormalizedKeyComputer();
+
+    @Override
+    protected IPointable getLargePositive() {
+        return getDoublePointable(Double.MAX_VALUE - 1);
+    }
+
+    @Override
+    protected IPointable getSmallPositive() {
+        return getDoublePointable(Double.MIN_VALUE);
+    }
+
+    @Override
+    protected IPointable getLargeNegative() {
+        return getDoublePointable(Double.MIN_VALUE * -1);
+
+    }
+
+    @Override
+    protected IPointable getSmallNegative() {
+        return getDoublePointable(Double.MAX_VALUE * -1);
+    }
+
+    private IPointable getDoublePointable(double value) {
+        DoublePointable pointable = (DoublePointable) DoublePointable.FACTORY.createPointable();
+        pointable.set(new byte[Double.BYTES], 0, Double.BYTES);
+        pointable.setDouble(value);
+        return pointable;
+    }
+
+    @Override
+    protected int[] normalize(IPointable value) {
+        int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer);
+        normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0);
+        return keys;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..cd6f4c4
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+
+public class FloatNormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest {
+
+    private final INormalizedKeyComputer normalizer =
+            new FloatNormalizedKeyComputerFactory().createNormalizedKeyComputer();
+
+    @Override
+    protected IPointable getLargePositive() {
+        return getFloatPointable(Float.MAX_VALUE - 1);
+    }
+
+    @Override
+    protected IPointable getSmallPositive() {
+        return getFloatPointable(Float.MIN_VALUE);
+    }
+
+    @Override
+    protected IPointable getLargeNegative() {
+        return getFloatPointable(Float.MIN_VALUE * -1);
+
+    }
+
+    @Override
+    protected IPointable getSmallNegative() {
+        return getFloatPointable(Float.MAX_VALUE * -1);
+    }
+
+    private IPointable getFloatPointable(float value) {
+        FloatPointable pointable = (FloatPointable) FloatPointable.FACTORY.createPointable();
+        pointable.set(new byte[Float.BYTES], 0, Float.BYTES);
+        pointable.setFloat(value);
+        return pointable;
+    }
+
+    @Override
+    protected int[] normalize(IPointable value) {
+        int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer);
+        normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0);
+        return keys;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..6ca623d
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+
+public class Integer64NormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest {
+
+    private final INormalizedKeyComputer normalizer =
+            new Integer64NormalizedKeyComputerFactory().createNormalizedKeyComputer();
+
+    @Override
+    protected IPointable getLargePositive() {
+        return getLongPointable(Long.MAX_VALUE - 1);
+    }
+
+    @Override
+    protected IPointable getSmallPositive() {
+        return getLongPointable(1);
+    }
+
+    @Override
+    protected IPointable getLargeNegative() {
+        return getLongPointable(-1);
+    }
+
+    @Override
+    protected IPointable getSmallNegative() {
+        return getLongPointable(Long.MIN_VALUE + 1);
+    }
+
+    private IPointable getLongPointable(long value) {
+        LongPointable pointable = LongPointable.FACTORY.createPointable();
+        pointable.set(new byte[Long.BYTES], 0, Long.BYTES);
+        pointable.setLong(value);
+        return pointable;
+    }
+
+    @Override
+    protected int[] normalize(IPointable value) {
+        int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer);
+        normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0);
+        return keys;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java
new file mode 100644
index 0000000..9122721
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.dataflow.common.data.normalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
+
+public class IntegerNormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest {
+
+    private final INormalizedKeyComputer normalizer =
+            new IntegerNormalizedKeyComputerFactory().createNormalizedKeyComputer();
+
+    @Override
+    protected IPointable getLargePositive() {
+        return getIntPointable(Integer.MAX_VALUE - 1);
+    }
+
+    @Override
+    protected IPointable getSmallPositive() {
+        return getIntPointable(1);
+    }
+
+    @Override
+    protected IPointable getLargeNegative() {
+        return getIntPointable(-1);
+
+    }
+
+    @Override
+    protected IPointable getSmallNegative() {
+        return getIntPointable(Integer.MIN_VALUE + 1);
+    }
+
+    private IPointable getIntPointable(int value) {
+        IntegerPointable pointable = (IntegerPointable) IntegerPointable.FACTORY.createPointable();
+        pointable.set(new byte[Integer.BYTES], 0, Integer.BYTES);
+        pointable.setInteger(value);
+        return pointable;
+    }
+
+    @Override
+    protected int[] normalize(IPointable value) {
+        int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer);
+        normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0);
+        return keys;
+    }
+
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
index 5cc854c..10cc954 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
@@ -42,6 +42,7 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
@@ -61,12 +62,17 @@
 
     /**
      * @param spec
-     * @param nInputs                   Number of inputs
-     * @param compareFields             The compare field list of each input.
-     *                                  All the fields order should be the same with the comparatorFactories
-     * @param extraFields               Extra field that
-     * @param firstKeyNormalizerFactory Normalizer for the first comparison key.
-     * @param comparatorFactories       A list of comparators for each field
+     * @param nInputs
+     *            Number of inputs
+     * @param compareFields
+     *            The compare field list of each input.
+     *            All the fields order should be the same with the comparatorFactories
+     * @param extraFields
+     *            Extra field that
+     * @param firstKeyNormalizerFactory
+     *            Normalizer for the first comparison key.
+     * @param comparatorFactories
+     *            A list of comparators for each field
      * @param recordDescriptor
      * @throws HyracksException
      */
@@ -147,7 +153,10 @@
 
     public static class IntersectOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable {
 
-        private enum ACTION {FAILED, CLOSE}
+        private enum ACTION {
+            FAILED,
+            CLOSE
+        }
 
         private final int inputArity;
         private final int[][] compareFields;
@@ -158,6 +167,7 @@
         private final FrameTupleAppender appender;
 
         private final INormalizedKeyComputer firstKeyNormalizerComputer;
+        private final boolean normalizedKeyDecisive;
         private final IBinaryComparator[] comparators;
 
         private boolean done = false;
@@ -186,8 +196,12 @@
             }
             this.allProjectFields = projectedFields;
             this.firstKeyNormalizerComputer =
-                    firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer();
-
+                    firstKeyNormalizerFactory != null ? firstKeyNormalizerFactory.createNormalizedKeyComputer() : null;
+            this.normalizedKeyDecisive =
+                    firstKeyNormalizerFactory != null
+                            ? firstKeyNormalizerFactory.getNormalizedKeyProperties().isDecisive()
+                                    && compareFields[0].length == 1
+                            : false;
             comparators = new IBinaryComparator[compareFields[0].length];
             for (int i = 0; i < comparators.length; i++) {
                 comparators[i] = comparatorFactory[i].createBinaryComparator();
@@ -213,6 +227,11 @@
         @Override
         public IFrameWriter getInputFrameWriter(final int index) {
             return new IFrameWriter() {
+                private final int[] normalizedKey1 =
+                        NormalizedKeyUtils.createNormalizedKeyArray(firstKeyNormalizerComputer);
+                private final int[] normalizedKey2 =
+                        NormalizedKeyUtils.createNormalizedKeyArray(firstKeyNormalizerComputer);
+
                 @Override
                 public void open() throws HyracksDataException {
                     if (index == 0) {
@@ -273,9 +292,8 @@
                                 continue;
                             }
                             while (tupleIndexMarker[i] < refAccessor[i].getTupleCount()) {
-                                int cmp =
-                                        compare(i, refAccessor[i], tupleIndexMarker[i], maxInput, refAccessor[maxInput],
-                                                tupleIndexMarker[maxInput]);
+                                int cmp = compare(i, refAccessor[i], tupleIndexMarker[i], maxInput,
+                                        refAccessor[maxInput], tupleIndexMarker[maxInput]);
                                 if (cmp == 0) {
                                     match++;
                                     break;
@@ -313,13 +331,14 @@
 
                 private int compare(int input1, FrameTupleAccessor frameTupleAccessor1, int tid1, int input2,
                         FrameTupleAccessor frameTupleAccessor2, int tid2) throws HyracksDataException {
-                    int firstNorm1 = getFirstNorm(input1, frameTupleAccessor1, tid1);
-                    int firstNorm2 = getFirstNorm(input2, frameTupleAccessor2, tid2);
-
-                    if (firstNorm1 < firstNorm2) {
-                        return -1;
-                    } else if (firstNorm1 > firstNorm2) {
-                        return 1;
+                    if (firstKeyNormalizerComputer != null) {
+                        getFirstNorm(input1, frameTupleAccessor1, tid1, normalizedKey1);
+                        getFirstNorm(input2, frameTupleAccessor2, tid2, normalizedKey2);
+                        int cmp = NormalizedKeyUtils.compareNormalizeKeys(normalizedKey1, 0, normalizedKey2, 0,
+                                normalizedKey1.length);
+                        if (cmp != 0 || normalizedKeyDecisive) {
+                            return cmp;
+                        }
                     }
 
                     for (int i = 0; i < comparators.length; i++) {
@@ -337,12 +356,12 @@
                     return 0;
                 }
 
-                private int getFirstNorm(int inputId1, FrameTupleAccessor frameTupleAccessor1, int tid1) {
-                    return firstKeyNormalizerComputer == null ?
-                            0 :
-                            firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(),
-                                    frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[inputId1][0]),
-                                    frameTupleAccessor1.getFieldLength(tid1, compareFields[inputId1][0]));
+                private void getFirstNorm(int inputId1, FrameTupleAccessor frameTupleAccessor1, int tid1, int[] keys) {
+                    if (firstKeyNormalizerComputer != null) {
+                        firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(),
+                                frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[inputId1][0]),
+                                frameTupleAccessor1.getFieldLength(tid1, compareFields[inputId1][0]), keys, 0);
+                    }
                 }
 
                 private int findMaxInput() throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index eead09e..f51271e 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -35,6 +35,7 @@
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
 import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
@@ -100,7 +101,7 @@
         int runningNormalizedKeyTotalLength = 0;
 
         if (normalizedKeyComputerFactories != null) {
-            int decisivePrefixLength = getDecisivePrefixLength(normalizedKeyComputerFactories);
+            int decisivePrefixLength = NormalizedKeyUtils.getDecisivePrefixLength(normalizedKeyComputerFactories);
 
             // we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
             // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
@@ -112,7 +113,8 @@
 
             for (int i = 0; i < normalizedKeys; i++) {
                 this.nkcs[i] = normalizedKeyComputerFactories[i].createNormalizedKeyComputer();
-                this.normalizedKeyLength[i] = normalizedKeyComputerFactories[i].getNormalizedKeyLength();
+                this.normalizedKeyLength[i] =
+                        normalizedKeyComputerFactories[i].getNormalizedKeyProperties().getNormalizedKeyLength();
                 runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
             }
             this.normalizedKeysDecisive = decisivePrefixLength == comparatorFactories.length;
@@ -246,8 +248,9 @@
 
     protected final int compare(int[] tPointers1, int tp1, int[] tPointers2, int tp2) throws HyracksDataException {
         if (nkcs != null) {
-            int cmpNormalizedKey = compareNormalizeKeys(tPointers1, tp1 * ptrSize + ID_NORMALIZED_KEY, tPointers2,
-                    tp2 * ptrSize + ID_NORMALIZED_KEY, normalizedKeyTotalLength);
+            int cmpNormalizedKey =
+                    NormalizedKeyUtils.compareNormalizeKeys(tPointers1, tp1 * ptrSize + ID_NORMALIZED_KEY, tPointers2,
+                            tp2 * ptrSize + ID_NORMALIZED_KEY, normalizedKeyTotalLength);
             if (cmpNormalizedKey != 0 || normalizedKeysDecisive) {
                 return cmpNormalizedKey;
             }
@@ -281,29 +284,6 @@
             }
         }
         return 0;
-    }
-
-    public static int compareNormalizeKeys(int[] keys1, int start1, int[] keys2, int start2, int length) {
-        for (int i = 0; i < length; i++) {
-            int key1 = keys1[start1 + i];
-            int key2 = keys2[start2 + i];
-            if (key1 != key2) {
-                return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? -1 : 1;
-            }
-        }
-        return 0;
-    }
-
-    public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] keyNormalizerFactories) {
-        if (keyNormalizerFactories == null) {
-            return 0;
-        }
-        for (int i = 0; i < keyNormalizerFactories.length; i++) {
-            if (!keyNormalizerFactories[i].isDecisive()) {
-                return i;
-            }
-        }
-        return keyNormalizerFactories.length;
     }
 
     protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index f001bed..3cbe86b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 import org.apache.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
 import org.apache.hyracks.dataflow.std.util.ReferenceEntry;
 import org.apache.hyracks.dataflow.std.util.ReferencedPriorityQueue;
@@ -41,6 +42,8 @@
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
     private final INormalizedKeyComputer nmkComputer;
+    private final int normalizedKeyLength;
+    private final boolean normalizedKeyDecisive;
     private final RecordDescriptor recordDesc;
     private final int topK;
     private int tupleCount;
@@ -64,6 +67,14 @@
         this.sortFields = sortFields;
         this.comparators = comparators;
         this.nmkComputer = nmkComputer;
+        this.normalizedKeyLength =
+                nmkComputer != null ? nmkComputer.getNormalizedKeyProperties().getNormalizedKeyLength() : 0;
+        // right now we didn't take multiple key normalizers for frame merger, since during this step it won't be
+        // too many cache misses (merging multiple runs sequentially).
+        // but still, we can apply a special optimization if there is only 1 sort field
+        this.normalizedKeyDecisive =
+                nmkComputer != null ? nmkComputer.getNormalizedKeyProperties().isDecisive() && comparators.length == 1
+                        : false;
         this.recordDesc = recordDesc;
         this.topK = topK;
     }
@@ -153,8 +164,7 @@
     }
 
     private static void closeRun(int index, List<? extends IFrameReader> runCursors,
-            IFrameTupleAccessor[] tupleAccessors)
-            throws HyracksDataException {
+            IFrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
         if (runCursors.get(index) != null) {
             runCursors.get(index).close();
             runCursors.set(index, null);
@@ -164,32 +174,39 @@
 
     private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
         return new Comparator<ReferenceEntry>() {
+            @Override
             public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
-                int nmk1 = tp1.getNormalizedKey();
-                int nmk2 = tp2.getNormalizedKey();
-                if (nmk1 != nmk2) {
-                    return ((((long) nmk1) & 0xffffffffL) < (((long) nmk2) & 0xffffffffL)) ? -1 : 1;
+                int[] tPointers1 = tp1.getTPointers();
+                int[] tPointers2 = tp2.getTPointers();
+                int cmp = NormalizedKeyUtils.compareNormalizeKeys(tPointers1, 0, tPointers2, 0, normalizedKeyLength);
+                if (cmp != 0) {
+                    return cmp;
+                } else if (normalizedKeyDecisive) {
+                    // we further compare the run id
+                    return compareRun(tp1, tp2);
                 }
                 IFrameTupleAccessor fta1 = tp1.getAccessor();
                 IFrameTupleAccessor fta2 = tp2.getAccessor();
                 byte[] b1 = fta1.getBuffer().array();
                 byte[] b2 = fta2.getBuffer().array();
-                int[] tPointers1 = tp1.getTPointers();
-                int[] tPointers2 = tp2.getTPointers();
 
                 for (int f = 0; f < sortFields.length; ++f) {
                     int c;
                     try {
-                        c = comparators[f].compare(b1, tPointers1[2 * f + 1], tPointers1[2 * f + 2], b2,
-                                tPointers2[2 * f + 1], tPointers2[2 * f + 2]);
+                        c = comparators[f].compare(b1, tPointers1[2 * f + normalizedKeyLength],
+                                tPointers1[2 * f + normalizedKeyLength + 1], b2,
+                                tPointers2[2 * f + normalizedKeyLength], tPointers2[2 * f + normalizedKeyLength + 1]);
                         if (c != 0) {
                             return c;
                         }
                     } catch (HyracksDataException e) {
                         throw new IllegalArgumentException(e);
                     }
-
                 }
+                return compareRun(tp1, tp2);
+            }
+
+            private int compareRun(ReferenceEntry tp1, ReferenceEntry tp2) {
                 int runid1 = tp1.getRunid();
                 int runid2 = tp2.getRunid();
                 return runid1 < runid2 ? -1 : (runid1 == runid2 ? 0 : 1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index b02f859..46d62a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 import org.apache.hyracks.dataflow.std.structures.IResetableComparable;
@@ -65,7 +66,7 @@
 
         @Override
         public int compareTo(HeapEntry o) {
-            int cmpNormalizedKey = AbstractFrameSorter.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength);
+            int cmpNormalizedKey = NormalizedKeyUtils.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength);
             if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
                 return cmpNormalizedKey;
             }
@@ -141,7 +142,7 @@
 
         int runningNormalizedKeyTotalLength = 0;
         if (keyNormalizerFactories != null) {
-            int decisivePrefixLength = AbstractFrameSorter.getDecisivePrefixLength(keyNormalizerFactories);
+            int decisivePrefixLength = NormalizedKeyUtils.getDecisivePrefixLength(keyNormalizerFactories);
 
             // we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
             // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
@@ -153,7 +154,8 @@
 
             for (int i = 0; i < normalizedKeys; i++) {
                 this.nkcs[i] = keyNormalizerFactories[i].createNormalizedKeyComputer();
-                this.normalizedKeyLength[i] = keyNormalizerFactories[i].getNormalizedKeyLength();
+                this.normalizedKeyLength[i] =
+                        keyNormalizerFactories[i].getNormalizedKeyProperties().getNormalizedKeyLength();
                 runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
             }
             this.normalizedKeyDecisive = decisivePrefixLength == comparatorFactories.length;
@@ -225,7 +227,7 @@
     private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int[] nmkey, HeapEntry maxEntry)
             throws HyracksDataException {
         int cmpNormalizedKey =
-                AbstractFrameSorter.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength);
+                NormalizedKeyUtils.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength);
         if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
             return cmpNormalizedKey;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
index 8eb5c2e..4fa4fab 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
@@ -26,14 +26,17 @@
     private final int runid;
     private IFrameTupleAccessor acccessor;
     private int tupleIndex;
-    private int[] tPointers;
+    private final int[] tPointers;
+    private final int normalizedKeyLength;
 
     public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int[] keyFields,
             INormalizedKeyComputer nmkComputer) {
         super();
         this.runid = runid;
         this.acccessor = fta;
-        this.tPointers = new int[1 + 2 * keyFields.length];
+        this.normalizedKeyLength =
+                nmkComputer != null ? nmkComputer.getNormalizedKeyProperties().getNormalizedKeyLength() : 0;
+        this.tPointers = new int[normalizedKeyLength + 2 * keyFields.length];
         if (fta != null) {
             initTPointer(fta, tupleIndex, keyFields, nmkComputer);
         }
@@ -59,10 +62,6 @@
         return tupleIndex;
     }
 
-    public int getNormalizedKey() {
-        return tPointers[0];
-    }
-
     public void setTupleIndex(int tupleIndex, int[] keyFields, INormalizedKeyComputer nmkComputer) {
         initTPointer(acccessor, tupleIndex, keyFields, nmkComputer);
     }
@@ -73,14 +72,11 @@
         byte[] b1 = fta.getBuffer().array();
         for (int f = 0; f < keyFields.length; ++f) {
             int fIdx = keyFields[f];
-            tPointers[2 * f + 1] = fta.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
-            tPointers[2 * f + 2] = fta.getFieldLength(tupleIndex, fIdx);
-            if (f == 0) {
-                if (nmkComputer != null) {
-                    tPointers[0] = nmkComputer.normalize(b1, tPointers[1], tPointers[2]);
-                } else {
-                    tPointers[0] = 0;
-                }
+            tPointers[2 * f + normalizedKeyLength] = fta.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
+            tPointers[2 * f + normalizedKeyLength + 1] = fta.getFieldLength(tupleIndex, fIdx);
+            if (f == 0 && nmkComputer != null) {
+                nmkComputer.normalize(b1, tPointers[normalizedKeyLength], tPointers[normalizedKeyLength + 1], tPointers,
+                        0);
             }
         }
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2225
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Idba747285af74195ef9953ed9bf5f6f217511380
Gerrit-PatchSet: 13
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen <cluo8@uci.edu>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Ian Maxon <imaxon@apache.org>
Gerrit-Reviewer: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Luo Chen <cluo8@uci.edu>
Gerrit-Reviewer: Till Westmann <tillw@apache.org>
Gerrit-Reviewer: abdullah alamoudi <bamousaa@gmail.com>

Mime
View raw message