Date: Mon, 8 Jan 2018 09:13:23 -0800
From: "Luo Chen (Code Review)"
CC: Jenkins, abdullah alamoudi, Ian Maxon, Till Westmann
Reply-To: cluo8@uci.edu
Subject: Change in asterixdb[master]: [ASTERIXDB-2149] Refactor key normalizer with longer keys
X-Gerrit-MessageType: merged
X-Gerrit-Change-Id: Idba747285af74195ef9953ed9bf5f6f217511380
X-Gerrit-Commit: 90af4b24f8b6a9259bfc307ea76bcc0f7b3e2806

Luo Chen has submitted this change and it was merged.

Change subject: [ASTERIXDB-2149] Refactor key normalizer with longer keys
......................................................................


[ASTERIXDB-2149] Refactor key normalizer with longer keys

- user model changes: no
- storage format changes: no
- interface changes: yes. The key normalizer interface has changed:
  INormalizedKeyComputer.normalize() now writes into an int[] instead of
  returning a single int, and the key length and decisiveness are exposed
  through the new INormalizedKeyProperties.

Details:
- Refactored the key normalizer to work with longer normalized keys
  composed of multiple integers.
- Added tests for key normalizers.
- Added a key normalizer for the UUID type to improve sort performance.
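The ordering rules applied by the new normalizers can be tried in isolation. The following is a minimal sketch, not code from this change: the class NormalizedKeySketch and its methods are hypothetical, and they work on Java long/double values instead of serialized bytes. It mirrors the diff below. A signed 64-bit value has its sign bit flipped and is stored as two ints, high word first. A double's IEEE 754 bits get only the sign bit flipped when non-negative and every bit flipped when negative. A UUID key is the most significant half followed by the least significant half (4 ints, each half treated as a signed long, as the Integer64 normalizer does). Descending keys are the bitwise NOT of ascending ones, and keys are compared int by int as unsigned values.

```java
import java.util.Arrays;

// Hypothetical sketch: NormalizedKeySketch is not a Hyracks class. It applies the
// ordering rules from this change to Java values instead of serialized bytes.
public class NormalizedKeySketch {

    // Store a 64-bit value as two ints, high word first.
    static void putLong(int[] keys, int start, long value) {
        keys[start] = (int) (value >> 32);
        keys[start + 1] = (int) value;
    }

    // Signed long -> 2-int key: flipping the sign bit makes unsigned order match signed order.
    static int[] longKey(long value) {
        int[] keys = new int[2];
        putLong(keys, 0, value ^ Long.MIN_VALUE);
        return keys;
    }

    // Double -> 2-int key from its IEEE 754 bits: flip only the sign bit of a
    // non-negative value, flip every bit of a negative one.
    static int[] doubleKey(double d) {
        long bits = Double.doubleToLongBits(d);
        int[] keys = new int[2];
        putLong(keys, 0, bits >= 0 ? bits ^ Long.MIN_VALUE : ~bits);
        return keys;
    }

    // UUID -> 4-int key: the normalized most significant half, then the least significant half.
    static int[] uuidKey(long msb, long lsb) {
        int[] keys = new int[4];
        putLong(keys, 0, msb ^ Long.MIN_VALUE);
        putLong(keys, 2, lsb ^ Long.MIN_VALUE);
        return keys;
    }

    // Descending order: invert every int of an ascending key.
    static int[] descending(int[] asc) {
        int[] keys = new int[asc.length];
        for (int i = 0; i < asc.length; i++) {
            keys[i] = ~asc[i];
        }
        return keys;
    }

    // Compare equal-length keys int by int, treating each int as unsigned.
    static int compare(int[] a, int[] b) {
        for (int i = 0; i < a.length; i++) {
            if (a[i] != b[i]) {
                return Integer.compareUnsigned(a[i], b[i]) < 0 ? -1 : 1;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        long[] values = { Long.MIN_VALUE, -5L, -1L, 0L, 1L << 40, Long.MAX_VALUE };
        for (int i = 0; i + 1 < values.length; i++) {
            boolean ordered = compare(longKey(values[i]), longKey(values[i + 1])) < 0;
            System.out.println(values[i] + " < " + values[i + 1] + ": " + ordered);
        }
        System.out.println("key(-1L) = " + Arrays.toString(longKey(-1L)));
        System.out.println("-2.0 < -1.0: " + (compare(doubleKey(-2.0), doubleKey(-1.0)) < 0));
        System.out.println("desc(1) > desc(2): " + (compare(descending(longKey(1L)), descending(longKey(2L))) > 0));
    }
}
```

The unsigned comparison is what makes the XOR and NOT tricks work: after the sign bit is flipped, the unsigned order of the bit pattern matches the signed order of the original value, so a sort can compare fixed-length int arrays without decoding the fields.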

Change-Id: Idba747285af74195ef9953ed9bf5f6f217511380
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2225
Sonar-Qube: Jenkins
Tested-by: Jenkins
Contrib: Jenkins
Integration-Tests: Jenkins
Reviewed-by: abdullah alamoudi
---
A asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
A hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java
A hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
25 files changed, 861 insertions(+), 181 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; No violations found; ; Verified

Objections:
  Anon. E. Moose #1000171: Violations found


diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java
new file mode 100644
index 0000000..c8a774d
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AUUIDNormalizedKeyComputerFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.dataflow.data.nontagged.keynormalizers;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
+import org.apache.hyracks.dataflow.common.data.normalizers.Integer64NormalizedKeyComputerFactory;
+
+public class AUUIDNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 4;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
+
+    private final INormalizedKeyComputerFactory int64NormalizerFactory;
+    private final int int64NormalizedKeyLength;
+
+    public AUUIDNormalizedKeyComputerFactory() {
+        int64NormalizerFactory = new Integer64NormalizedKeyComputerFactory();
+        int64NormalizedKeyLength = int64NormalizerFactory.getNormalizedKeyProperties().getNormalizedKeyLength();
+    }
+
+    @Override
+    public INormalizedKeyComputer createNormalizedKeyComputer() {
+        final INormalizedKeyComputer nkc = int64NormalizerFactory.createNormalizedKeyComputer();
+        return new INormalizedKeyComputer() {
+
+            @Override
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                // normalize msb
+                nkc.normalize(bytes, start, length, normalizedKeys, keyStart);
+                // normalize lsb
+                nkc.normalize(bytes, start + Long.BYTES, length - Long.BYTES, normalizedKeys,
+                        keyStart + int64NormalizedKeyLength);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
+            }
+        };
+
+    }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
+
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
index de214fc..c676d13 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedAscNormalizedKeyComputerFactory.java
@@ -21,6 +21,7 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 
 /**
  * This class uses a decorator pattern to wrap an ASC ordered INomralizedKeyComputerFactory implementation to
@@ -41,11 +42,22 @@
         return new INormalizedKeyComputer() {
 
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
                 // start +1, length -1 is because in ASTERIX data format, there is always a type tag before the value
-                return nkc.normalize(bytes, start + 1, length - 1);
+                nkc.normalize(bytes, start + 1, length - 1, normalizedKeys, keyStart);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return nkc.getNormalizedKeyProperties();
             }
         };
+
+    }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return nkcf.getNormalizedKeyProperties();
     }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
index 6e02029..86b33e8 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/keynormalizers/AWrappedDescNormalizedKeyComputerFactory.java
@@ -21,6 +21,7 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 
 /**
  * This class uses a decorator pattern to wrap an ASC ordered INomralizedKeyComputerFactory implementation
@@ -30,9 +31,11 @@
     private static final long serialVersionUID = 1L;
 
     private final INormalizedKeyComputerFactory nkcf;
+    private final int normalizedKeyLength;
 
     public AWrappedDescNormalizedKeyComputerFactory(INormalizedKeyComputerFactory nkcf) {
         this.nkcf = nkcf;
+        this.normalizedKeyLength = nkcf.getNormalizedKeyProperties().getNormalizedKeyLength();
     }
 
     @Override
@@ -41,11 +44,24 @@
         return new INormalizedKeyComputer() {
 
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                int key = nkc.normalize(bytes, start + 1, length - 1);
-                return (int) ((long) 0xffffffff - (long) key);
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                nkc.normalize(bytes, start + 1, length - 1, normalizedKeys, keyStart);
+                for (int i = 0; i < normalizedKeyLength; i++) {
+                    int key = normalizedKeys[keyStart + i];
+                    normalizedKeys[keyStart + i] = ~key;
+                }
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return nkc.getNormalizedKeyProperties();
             }
         };
     }
 
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return nkcf.getNormalizedKeyProperties();
+    }
+
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
index d17fc5a..d372062 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/NormalizedKeyComputerFactoryProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.formats.nontagged;
 
+import org.apache.asterix.dataflow.data.nontagged.keynormalizers.AUUIDNormalizedKeyComputerFactory;
 import org.apache.asterix.dataflow.data.nontagged.keynormalizers.AWrappedAscNormalizedKeyComputerFactory;
 import org.apache.asterix.dataflow.data.nontagged.keynormalizers.AWrappedDescNormalizedKeyComputerFactory;
 import org.apache.asterix.om.types.IAType;
@@ -59,6 +60,8 @@
                 return new AWrappedAscNormalizedKeyComputerFactory(new UTF8StringNormalizedKeyComputerFactory());
             case BINARY:
                 return new AWrappedAscNormalizedKeyComputerFactory(new ByteArrayNormalizedKeyComputerFactory());
+            case UUID:
+                return new AWrappedAscNormalizedKeyComputerFactory(new AUUIDNormalizedKeyComputerFactory());
             default:
                 return null;
         }
@@ -81,6 +84,8 @@
                 return new AWrappedDescNormalizedKeyComputerFactory(new UTF8StringNormalizedKeyComputerFactory());
             case BINARY:
                 return new AWrappedDescNormalizedKeyComputerFactory(new ByteArrayNormalizedKeyComputerFactory());
+            case UUID:
+                return new AWrappedDescNormalizedKeyComputerFactory(new AUUIDNormalizedKeyComputerFactory());
             default:
                 return null;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
index 7bf8255..7364bea 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputer.java
@@ -19,10 +19,7 @@
 package org.apache.hyracks.api.dataflow.value;
 
 public interface INormalizedKeyComputer {
-    public int normalize(byte[] bytes, int start, int length);
+    void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart);
 
-    default void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
-        int key = normalize(bytes, start, length);
-        normalizedKeys[keyStart] = key;
-    }
+    INormalizedKeyProperties getNormalizedKeyProperties();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
index 901702e..a1339a9 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyComputerFactory.java
@@ -23,20 +23,5 @@
 public interface INormalizedKeyComputerFactory extends Serializable {
     public INormalizedKeyComputer createNormalizedKeyComputer();
 
-    /**
-     *
-     * @return The length of the normalized key in terms of integers
-     */
-    default int getNormalizedKeyLength() {
-        return 1;
-    }
-
-    /**
-     *
-     * @return Whether we can solely rely on this normalized key to complete comparison,
-     *         even when two normalized keys are equal
-     */
-    default boolean isDecisive() {
-        return false;
-    }
+    public INormalizedKeyProperties getNormalizedKeyProperties();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java
new file mode 100644
index 0000000..63e799a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/INormalizedKeyProperties.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+public interface INormalizedKeyProperties extends Serializable {
+    /**
+     *
+     * @return The length of the normalized key in terms of integers
+     */
+    public int getNormalizedKeyLength();
+
+    /**
+     *
+     * @return Whether we can solely rely on this normalized key to complete comparison,
+     *         even when two normalized keys are equal
+     */
+    public boolean isDecisive();
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
index 3d081af..9557245 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactory.java
@@ -21,18 +21,45 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
 
 public class ByteArrayNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
+    private static final long serialVersionUID = 1L;
+
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 1;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return false;
+        }
+    };
+
     public static ByteArrayNormalizedKeyComputerFactory INSTANCE = new ByteArrayNormalizedKeyComputerFactory();
 
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                return ByteArrayPointable.normalize(bytes, start);
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                normalizedKeys[keyStart] = ByteArrayPointable.normalize(bytes, start);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
         };
     }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
index 353793b..4d5d57c 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactory.java
@@ -20,28 +20,53 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 
 public class DoubleNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
 
     private static final long serialVersionUID = 1L;
+
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 2;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
 
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
 
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                int prefix = IntegerPointable.getInteger(bytes, start);
-                if (prefix >= 0) {
-                    return prefix ^ Integer.MIN_VALUE;
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                long value = LongPointable.getLong(bytes, start);
+                if (value >= 0) {
+                    value = value ^ Long.MIN_VALUE;
                 } else {
-                    return (int) ((long) 0xffffffff - (long) prefix);
+                    value = ~value;
                 }
+                NormalizedKeyUtils.putLongIntoNormalizedKeys(normalizedKeys, keyStart, value);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
         };
     }
 
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
index eb571a5..3b0d9d7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactory.java
@@ -20,28 +20,51 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
 public class FloatNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
 
     private static final long serialVersionUID = 1L;
 
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 1;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
+
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
-
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                int prefix = IntegerPointable.getInteger(bytes, start);
-                if (prefix >= 0) {
-                    return prefix ^ Integer.MIN_VALUE;
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                int value = IntegerPointable.getInteger(bytes, start);
+                if (value >= 0) {
+                    normalizedKeys[keyStart] = value ^ Integer.MIN_VALUE;
                 } else {
-                    return (int) ((long) 0xffffffff - (long) prefix);
+                    // invert the key
+                    normalizedKeys[keyStart] = ~value;
                 }
             }
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
+            }
         };
     }
 
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
index fa74665..853499f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactory.java
@@ -20,55 +20,47 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 
 public class Integer64NormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
-    private static final long serialVersionUID = 8735044913496854551L;
+    private static final long serialVersionUID = 1L;
+
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 2;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
 
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
-            private static final int POSTIVE_LONG_MASK = (3 << 30);
-            private static final int NON_NEGATIVE_INT_MASK = (2 << 30);
-            private static final int NEGATIVE_LONG_MASK = (0 << 30);
+            @Override
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                long value = LongPointable.getLong(bytes, start);
+                value = value ^ Long.MIN_VALUE;
+                NormalizedKeyUtils.putLongIntoNormalizedKeys(normalizedKeys, keyStart, value);
+            }
 
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                long value = LongPointable.getLong(bytes, start);
-                int highValue = (int) (value >> 32);
-                if (value > Integer.MAX_VALUE) {
-                    /**
-                     * larger than Integer.MAX
-                     */
-                    int highNmk = getKey(highValue);
-                    highNmk >>= 2;
-                    highNmk |= POSTIVE_LONG_MASK;
-                    return highNmk;
-                } else if (value >=0 && value <= Integer.MAX_VALUE) {
-                    /**
-                     * smaller than Integer.MAX but >=0
-                     */
-                    int lowNmk = (int) value;
-                    lowNmk >>= 2;
-                    lowNmk |= NON_NEGATIVE_INT_MASK;
-                    return lowNmk;
-                } else {
-                    /**
-                     * less than 0: have not optimized for that
-                     */
-                    int highNmk = getKey(highValue);
-                    highNmk >>= 2;
-                    highNmk |= NEGATIVE_LONG_MASK;
-                    return highNmk;
-                }
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
-
-            private int getKey(int value) {
-                return value ^ Integer.MIN_VALUE;
-            }
-
         };
     }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
index 41c0740..60b8d7d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactory.java
@@ -20,24 +20,44 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.data.std.primitive.IntegerPointable;
 
 public class IntegerNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
     private static final long serialVersionUID = 1L;
 
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            return 1;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return true;
+        }
+    };
+
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
                 int value = IntegerPointable.getInteger(bytes, start);
-                return value ^ Integer.MIN_VALUE;
+                normalizedKeys[keyStart] = value ^ Integer.MIN_VALUE;
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
         };
     }
 
     @Override
-    public boolean isDecisive() {
-        return true;
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
index 589ae68..fefcebd 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/normalizers/UTF8StringNormalizedKeyComputerFactory.java
@@ -20,18 +20,44 @@
 
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyProperties;
 import org.apache.hyracks.util.string.UTF8StringUtil;
 
 public class UTF8StringNormalizedKeyComputerFactory implements INormalizedKeyComputerFactory {
     private static final long serialVersionUID = 1L;
 
+    public static final INormalizedKeyProperties PROPERTIES = new INormalizedKeyProperties() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public int getNormalizedKeyLength() {
+            // TODO optimize normalize key for UTF8 to use multiple integers
+            return 1;
+        }
+
+        @Override
+        public boolean isDecisive() {
+            return false;
+        }
+    };
+
     @Override
     public INormalizedKeyComputer createNormalizedKeyComputer() {
         return new INormalizedKeyComputer() {
             @Override
-            public int normalize(byte[] bytes, int start, int length) {
-                return UTF8StringUtil.normalize(bytes, start);
+            public void normalize(byte[] bytes, int start, int length, int[] normalizedKeys, int keyStart) {
+                normalizedKeys[keyStart] = UTF8StringUtil.normalize(bytes, start);
+            }
+
+            @Override
+            public INormalizedKeyProperties getNormalizedKeyProperties() {
+                return PROPERTIES;
             }
         };
     }
+
+    @Override
+    public INormalizedKeyProperties getNormalizedKeyProperties() {
+        return PROPERTIES;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java
new file mode 100644
index 0000000..175b04a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/NormalizedKeyUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.utils;
+
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+
+public class NormalizedKeyUtils {
+
+    private NormalizedKeyUtils() {
+    }
+
+    public static int compareNormalizeKeys(int[] keys1, int start1, int[] keys2, int start2, int length) {
+        for (int i = 0; i < length; i++) {
+            int key1 = keys1[start1 + i];
+            int key2 = keys2[start2 + i];
+            if (key1 != key2) {
+                return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? -1 : 1;
+            }
+        }
+        return 0;
+    }
+
+    public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] keyNormalizerFactories) {
+        if (keyNormalizerFactories == null) {
+            return 0;
+        }
+        for (int i = 0; i < keyNormalizerFactories.length; i++) {
+            if (!keyNormalizerFactories[i].getNormalizedKeyProperties().isDecisive()) {
+                return i;
+            }
+        }
+        return keyNormalizerFactories.length;
+    }
+
+    public static void putLongIntoNormalizedKeys(int[] normalizedKeys, int keyStart, long key) {
+        int high = (int) (key >> 32);
+        normalizedKeys[keyStart] = high;
+        int low = (int) key;
+        normalizedKeys[keyStart + 1] = low;
+    }
+
+    public static int[] createNormalizedKeyArray(INormalizedKeyComputer normalizer) {
+        if (normalizer == null) {
+            return null; //NOSONAR
+        }
+        return new int[normalizer.getNormalizedKeyProperties().getNormalizedKeyLength()];
+    }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java new file mode 100644 index 0000000..2107574 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/AbstractNormalizedKeyComputerFactoryTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.dataflow.common.data.normalizers; + +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils; +import org.junit.Test; + +import junit.framework.Assert; + +public abstract class AbstractNormalizedKeyComputerFactoryTest { + + @Test + public void testPositive() { + IPointable large = getLargePositive(); + IPointable small = getSmallPositive(); + + normalizeAndCompare(large, small, 1); + normalizeAndCompare(small, large, -1); + normalizeAndCompare(large, large, 0); + } + + @Test + public void testNegative() { + IPointable small = getSmallNegative(); + IPointable large = getLargeNegative(); + + normalizeAndCompare(large, small, 1); + normalizeAndCompare(small, large, -1); + normalizeAndCompare(large, large, 0); + } + + @Test + public void testPositiveAndNegative() { + IPointable smallNegative = getSmallNegative(); + IPointable largeNegative = getLargeNegative(); + IPointable smallPositive = getSmallPositive(); + IPointable largePositive = getLargePositive(); + + normalizeAndCompare(smallNegative, smallPositive, -1); + normalizeAndCompare(smallNegative, largePositive, -1); + normalizeAndCompare(largeNegative, smallPositive, -1); + normalizeAndCompare(largeNegative, largePositive, -1); + } + + protected void normalizeAndCompare(IPointable p1, IPointable p2, int result) { + int[] key1 = normalize(p1); + int[] key2 = normalize(p2); + int comp = NormalizedKeyUtils.compareNormalizeKeys(key1, 0, key2, 0, key1.length); + if (result > 0) { + Assert.assertTrue(comp > 0); + } else if (result == 0) { + Assert.assertTrue(comp == 0); + } else { + Assert.assertTrue(comp < 0); + } + } + + protected abstract IPointable getLargePositive(); + + protected abstract IPointable getSmallPositive(); + + protected abstract IPointable getLargeNegative(); + + protected abstract IPointable getSmallNegative(); + + protected abstract int[] normalize(IPointable value); +} diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java index a362e60..675bf36 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/ByteArrayNormalizedKeyComputerFactoryTest.java @@ -19,7 +19,7 @@ package org.apache.hyracks.dataflow.common.data.normalizers; -import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.*; import java.util.Random; @@ -36,10 +36,10 @@ @Test public void testRandomNormalizedKey() { for (int i = 0; i < 10; ++i) { - ByteArrayPointable pointable1 = generateRandomByteArrayPointableWithFixLength( - Math.abs(random.nextInt((i + 1) * 10)), random); - ByteArrayPointable pointable2 = generateRandomByteArrayPointableWithFixLength( - Math.abs(random.nextInt((i + 1) * 10)), random); + ByteArrayPointable pointable1 = + generateRandomByteArrayPointableWithFixLength(Math.abs(random.nextInt((i + 1) * 10)), random); + ByteArrayPointable pointable2 = + generateRandomByteArrayPointableWithFixLength(Math.abs(random.nextInt((i + 1) * 10)), random); assertNormalizeValue(pointable1, pointable2, computer); } } @@ -52,11 +52,13 @@ public static void assertNormalizeValue(ByteArrayPointable pointable1, ByteArrayPointable pointable2, INormalizedKeyComputer computer) { - int n1 = computer.normalize(pointable1.getByteArray(), pointable1.getStartOffset(), pointable1.getLength()); - int n2 = computer.normalize(pointable2.getByteArray(), pointable2.getStartOffset(), pointable2.getLength()); - if (n1 < n2) { + int[] key1 = new 
int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()]; + int[] key2 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()]; + computer.normalize(pointable1.getByteArray(), pointable1.getStartOffset(), pointable1.getLength(), key1, 0); + computer.normalize(pointable2.getByteArray(), pointable2.getStartOffset(), pointable2.getLength(), key2, 0); + if (key1[0] < key2[0]) { assertTrue(pointable1.compareTo(pointable2) < 0); - } else if (n1 > n2) { + } else if (key1[0] > key2[0]) { assertTrue(pointable1.compareTo(pointable2) > 0); } } @@ -70,12 +72,15 @@ } ByteArrayPointable ptr1 = ByteArrayPointable.generatePointableFromPureBytes(new byte[] { 0, 25, 34, 42 }); - ByteArrayPointable ptr2 = ByteArrayPointable.generatePointableFromPureBytes( - new byte[] { (byte) 130, 25, 34, 42 }); + ByteArrayPointable ptr2 = + ByteArrayPointable.generatePointableFromPureBytes(new byte[] { (byte) 130, 25, 34, 42 }); - int n1 = computer.normalize(ptr1.getByteArray(), ptr1.getStartOffset(), ptr1.getLength()); - int n2 = computer.normalize(ptr2.getByteArray(), ptr2.getStartOffset(), ptr2.getLength()); - assertTrue(n1 < n2); + int[] key1 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()]; + int[] key2 = new int[ByteArrayNormalizedKeyComputerFactory.PROPERTIES.getNormalizedKeyLength()]; + computer.normalize(ptr1.getByteArray(), ptr1.getStartOffset(), ptr1.getLength(), key1, 0); + computer.normalize(ptr2.getByteArray(), ptr2.getStartOffset(), ptr2.getLength(), key2, 0); + + assertTrue(key1[0] < key2[0]); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java new file mode 100644 index 0000000..012ef07 --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/DoubleNormalizedKeyComputerFactoryTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.common.data.normalizers; + +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.DoublePointable; +import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils; + +public class DoubleNormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest { + + private final INormalizedKeyComputer normalizer = + new DoubleNormalizedKeyComputerFactory().createNormalizedKeyComputer(); + + @Override + protected IPointable getLargePositive() { + return getDoublePointable(Double.MAX_VALUE - 1); + } + + @Override + protected IPointable getSmallPositive() { + return getDoublePointable(Double.MIN_VALUE); + } + + @Override + protected IPointable getLargeNegative() { + return getDoublePointable(Double.MIN_VALUE * -1); + + } + + @Override + protected IPointable getSmallNegative() { + return getDoublePointable(Double.MAX_VALUE * 
-1); + } + + private IPointable getDoublePointable(double value) { + DoublePointable pointable = (DoublePointable) DoublePointable.FACTORY.createPointable(); + pointable.set(new byte[Double.BYTES], 0, Double.BYTES); + pointable.setDouble(value); + return pointable; + } + + @Override + protected int[] normalize(IPointable value) { + int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer); + normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0); + return keys; + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java new file mode 100644 index 0000000..cd6f4c4 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/FloatNormalizedKeyComputerFactoryTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.dataflow.common.data.normalizers; + +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.FloatPointable; +import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils; + +public class FloatNormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest { + + private final INormalizedKeyComputer normalizer = + new FloatNormalizedKeyComputerFactory().createNormalizedKeyComputer(); + + @Override + protected IPointable getLargePositive() { + return getFloatPointable(Float.MAX_VALUE - 1); + } + + @Override + protected IPointable getSmallPositive() { + return getFloatPointable(Float.MIN_VALUE); + } + + @Override + protected IPointable getLargeNegative() { + return getFloatPointable(Float.MIN_VALUE * -1); + + } + + @Override + protected IPointable getSmallNegative() { + return getFloatPointable(Float.MAX_VALUE * -1); + } + + private IPointable getFloatPointable(float value) { + FloatPointable pointable = (FloatPointable) FloatPointable.FACTORY.createPointable(); + pointable.set(new byte[Float.BYTES], 0, Float.BYTES); + pointable.setFloat(value); + return pointable; + } + + @Override + protected int[] normalize(IPointable value) { + int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer); + normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0); + return keys; + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java new file mode 100644 index 0000000..6ca623d --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/Integer64NormalizedKeyComputerFactoryTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.dataflow.common.data.normalizers; + +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.LongPointable; +import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils; + +public class Integer64NormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest { + + private final INormalizedKeyComputer normalizer = + new Integer64NormalizedKeyComputerFactory().createNormalizedKeyComputer(); + + @Override + protected IPointable getLargePositive() { + return getLongPointable(Long.MAX_VALUE - 1); + } + + @Override + protected IPointable getSmallPositive() { + return getLongPointable(1); + } + + @Override + protected IPointable getLargeNegative() { + return getLongPointable(-1); + } + + @Override + protected IPointable getSmallNegative() { + return getLongPointable(Long.MIN_VALUE + 1); + } + + private IPointable 
getLongPointable(long value) { + LongPointable pointable = LongPointable.FACTORY.createPointable(); + pointable.set(new byte[Long.BYTES], 0, Long.BYTES); + pointable.setLong(value); + return pointable; + } + + @Override + protected int[] normalize(IPointable value) { + int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer); + normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0); + return keys; + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java new file mode 100644 index 0000000..9122721 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/test/java/org/apache/hyracks/dataflow/common/data/normalizers/IntegerNormalizedKeyComputerFactoryTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hyracks.dataflow.common.data.normalizers; + +import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.IntegerPointable; +import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils; + +public class IntegerNormalizedKeyComputerFactoryTest extends AbstractNormalizedKeyComputerFactoryTest { + + private final INormalizedKeyComputer normalizer = + new IntegerNormalizedKeyComputerFactory().createNormalizedKeyComputer(); + + @Override + protected IPointable getLargePositive() { + return getIntPointable(Integer.MAX_VALUE - 1); + } + + @Override + protected IPointable getSmallPositive() { + return getIntPointable(1); + } + + @Override + protected IPointable getLargeNegative() { + return getIntPointable(-1); + + } + + @Override + protected IPointable getSmallNegative() { + return getIntPointable(Integer.MIN_VALUE + 1); + } + + private IPointable getIntPointable(int value) { + IntegerPointable pointable = (IntegerPointable) IntegerPointable.FACTORY.createPointable(); + pointable.set(new byte[Integer.BYTES], 0, Integer.BYTES); + pointable.setInteger(value); + return pointable; + } + + @Override + protected int[] normalize(IPointable value) { + int[] keys = NormalizedKeyUtils.createNormalizedKeyArray(normalizer); + normalizer.normalize(value.getByteArray(), value.getStartOffset(), value.getLength(), keys, 0); + return keys; + } + +} diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java index 5cc854c..10cc954 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/intersect/IntersectOperatorDescriptor.java @@ -42,6 +42,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils; import org.apache.hyracks.dataflow.std.base.AbstractActivityNode; import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable; @@ -61,12 +62,17 @@ /** * @param spec - * @param nInputs Number of inputs - * @param compareFields The compare field list of each input. - * All the fields order should be the same with the comparatorFactories - * @param extraFields Extra field that - * @param firstKeyNormalizerFactory Normalizer for the first comparison key. - * @param comparatorFactories A list of comparators for each field + * @param nInputs + * Number of inputs + * @param compareFields + * The compare field list of each input. + * All the fields order should be the same with the comparatorFactories + * @param extraFields + * Extra field that + * @param firstKeyNormalizerFactory + * Normalizer for the first comparison key. 
+ * @param comparatorFactories + * A list of comparators for each field * @param recordDescriptor * @throws HyracksException */ @@ -147,7 +153,10 @@ public static class IntersectOperatorNodePushable extends AbstractUnaryOutputOperatorNodePushable { - private enum ACTION {FAILED, CLOSE} + private enum ACTION { + FAILED, + CLOSE + } private final int inputArity; private final int[][] compareFields; @@ -158,6 +167,7 @@ private final FrameTupleAppender appender; private final INormalizedKeyComputer firstKeyNormalizerComputer; + private final boolean normalizedKeyDecisive; private final IBinaryComparator[] comparators; private boolean done = false; @@ -186,8 +196,12 @@ } this.allProjectFields = projectedFields; this.firstKeyNormalizerComputer = - firstKeyNormalizerFactory == null ? null : firstKeyNormalizerFactory.createNormalizedKeyComputer(); - + firstKeyNormalizerFactory != null ? firstKeyNormalizerFactory.createNormalizedKeyComputer() : null; + this.normalizedKeyDecisive = + firstKeyNormalizerFactory != null + ? 
firstKeyNormalizerFactory.getNormalizedKeyProperties().isDecisive() + && compareFields[0].length == 1 + : false; comparators = new IBinaryComparator[compareFields[0].length]; for (int i = 0; i < comparators.length; i++) { comparators[i] = comparatorFactory[i].createBinaryComparator(); @@ -213,6 +227,11 @@ @Override public IFrameWriter getInputFrameWriter(final int index) { return new IFrameWriter() { + private final int[] normalizedKey1 = + NormalizedKeyUtils.createNormalizedKeyArray(firstKeyNormalizerComputer); + private final int[] normalizedKey2 = + NormalizedKeyUtils.createNormalizedKeyArray(firstKeyNormalizerComputer); + @Override public void open() throws HyracksDataException { if (index == 0) { @@ -273,9 +292,8 @@ continue; } while (tupleIndexMarker[i] < refAccessor[i].getTupleCount()) { - int cmp = - compare(i, refAccessor[i], tupleIndexMarker[i], maxInput, refAccessor[maxInput], - tupleIndexMarker[maxInput]); + int cmp = compare(i, refAccessor[i], tupleIndexMarker[i], maxInput, + refAccessor[maxInput], tupleIndexMarker[maxInput]); if (cmp == 0) { match++; break; @@ -313,13 +331,14 @@ private int compare(int input1, FrameTupleAccessor frameTupleAccessor1, int tid1, int input2, FrameTupleAccessor frameTupleAccessor2, int tid2) throws HyracksDataException { - int firstNorm1 = getFirstNorm(input1, frameTupleAccessor1, tid1); - int firstNorm2 = getFirstNorm(input2, frameTupleAccessor2, tid2); - - if (firstNorm1 < firstNorm2) { - return -1; - } else if (firstNorm1 > firstNorm2) { - return 1; + if (firstKeyNormalizerComputer != null) { + getFirstNorm(input1, frameTupleAccessor1, tid1, normalizedKey1); + getFirstNorm(input2, frameTupleAccessor2, tid2, normalizedKey2); + int cmp = NormalizedKeyUtils.compareNormalizeKeys(normalizedKey1, 0, normalizedKey2, 0, + normalizedKey1.length); + if (cmp != 0 || normalizedKeyDecisive) { + return cmp; + } } for (int i = 0; i < comparators.length; i++) { @@ -337,12 +356,12 @@ return 0; } - private int getFirstNorm(int inputId1, 
FrameTupleAccessor frameTupleAccessor1, int tid1) { - return firstKeyNormalizerComputer == null ? - 0 : - firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(), - frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[inputId1][0]), - frameTupleAccessor1.getFieldLength(tid1, compareFields[inputId1][0])); + private void getFirstNorm(int inputId1, FrameTupleAccessor frameTupleAccessor1, int tid1, int[] keys) { + if (firstKeyNormalizerComputer != null) { + firstKeyNormalizerComputer.normalize(frameTupleAccessor1.getBuffer().array(), + frameTupleAccessor1.getAbsoluteFieldStartOffset(tid1, compareFields[inputId1][0]), + frameTupleAccessor1.getFieldLength(tid1, compareFields[inputId1][0]), keys, 0); + } } private int findMaxInput() throws HyracksDataException { diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java index eead09e..f51271e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java @@ -35,6 +35,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils; import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo; import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager; import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool; @@ -100,7 +101,7 @@ int runningNormalizedKeyTotalLength = 0; if (normalizedKeyComputerFactories != null) { - int decisivePrefixLength = 
getDecisivePrefixLength(normalizedKeyComputerFactories);
+            int decisivePrefixLength = NormalizedKeyUtils.getDecisivePrefixLength(normalizedKeyComputerFactories);
             // we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
             // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
@@ -112,7 +113,8 @@
             for (int i = 0; i < normalizedKeys; i++) {
                 this.nkcs[i] = normalizedKeyComputerFactories[i].createNormalizedKeyComputer();
-                this.normalizedKeyLength[i] = normalizedKeyComputerFactories[i].getNormalizedKeyLength();
+                this.normalizedKeyLength[i] =
+                        normalizedKeyComputerFactories[i].getNormalizedKeyProperties().getNormalizedKeyLength();
                 runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
             }
             this.normalizedKeysDecisive = decisivePrefixLength == comparatorFactories.length;
@@ -246,8 +248,9 @@
     protected final int compare(int[] tPointers1, int tp1, int[] tPointers2, int tp2) throws HyracksDataException {
         if (nkcs != null) {
-            int cmpNormalizedKey = compareNormalizeKeys(tPointers1, tp1 * ptrSize + ID_NORMALIZED_KEY, tPointers2,
-                    tp2 * ptrSize + ID_NORMALIZED_KEY, normalizedKeyTotalLength);
+            int cmpNormalizedKey =
+                    NormalizedKeyUtils.compareNormalizeKeys(tPointers1, tp1 * ptrSize + ID_NORMALIZED_KEY, tPointers2,
+                            tp2 * ptrSize + ID_NORMALIZED_KEY, normalizedKeyTotalLength);
             if (cmpNormalizedKey != 0 || normalizedKeysDecisive) {
                 return cmpNormalizedKey;
             }
@@ -281,29 +284,6 @@
             }
         }
         return 0;
-    }
-
-    public static int compareNormalizeKeys(int[] keys1, int start1, int[] keys2, int start2, int length) {
-        for (int i = 0; i < length; i++) {
-            int key1 = keys1[start1 + i];
-            int key2 = keys2[start2 + i];
-            if (key1 != key2) {
-                return (((key1) & 0xffffffffL) < ((key2) & 0xffffffffL)) ? -1 : 1;
-            }
-        }
-        return 0;
-    }
-
-    public static int getDecisivePrefixLength(INormalizedKeyComputerFactory[] keyNormalizerFactories) {
-        if (keyNormalizerFactories == null) {
-            return 0;
-        }
-        for (int i = 0; i < keyNormalizerFactories.length; i++) {
-            if (!keyNormalizerFactories[i].isDecisive()) {
-                return i;
-            }
-        }
-        return keyNormalizerFactories.length;
     }
     protected void swap(int pointers1[], int pos1, int pointers2[], int pos2) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
index f001bed..3cbe86b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/RunMergingFrameReader.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 import org.apache.hyracks.dataflow.std.sort.util.GroupFrameAccessor;
 import org.apache.hyracks.dataflow.std.util.ReferenceEntry;
 import org.apache.hyracks.dataflow.std.util.ReferencedPriorityQueue;
@@ -41,6 +42,8 @@
     private final int[] sortFields;
     private final IBinaryComparator[] comparators;
     private final INormalizedKeyComputer nmkComputer;
+    private final int normalizedKeyLength;
+    private final boolean normalizedKeyDecisive;
     private final RecordDescriptor recordDesc;
     private final int topK;
     private int tupleCount;
@@ -64,6 +67,14 @@
         this.sortFields = sortFields;
         this.comparators = comparators;
         this.nmkComputer = nmkComputer;
+        this.normalizedKeyLength =
+                nmkComputer != null ? nmkComputer.getNormalizedKeyProperties().getNormalizedKeyLength() : 0;
+        // right now we didn't take multiple key normalizers for frame merger, since during this step it won't be
+        // too many cache misses (merging multiple runs sequentially).
+        // but still, we can apply a special optimization if there is only 1 sort field
+        this.normalizedKeyDecisive =
+                nmkComputer != null ? nmkComputer.getNormalizedKeyProperties().isDecisive() && comparators.length == 1
+                        : false;
         this.recordDesc = recordDesc;
         this.topK = topK;
     }
@@ -153,8 +164,7 @@
     }
     private static void closeRun(int index, List runCursors,
-            IFrameTupleAccessor[] tupleAccessors)
-            throws HyracksDataException {
+            IFrameTupleAccessor[] tupleAccessors) throws HyracksDataException {
         if (runCursors.get(index) != null) {
             runCursors.get(index).close();
             runCursors.set(index, null);
@@ -164,32 +174,39 @@
     private Comparator createEntryComparator(final IBinaryComparator[] comparators) {
         return new Comparator() {
+            @Override
             public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
-                int nmk1 = tp1.getNormalizedKey();
-                int nmk2 = tp2.getNormalizedKey();
-                if (nmk1 != nmk2) {
-                    return ((((long) nmk1) & 0xffffffffL) < (((long) nmk2) & 0xffffffffL)) ? -1 : 1;
+                int[] tPointers1 = tp1.getTPointers();
+                int[] tPointers2 = tp2.getTPointers();
+                int cmp = NormalizedKeyUtils.compareNormalizeKeys(tPointers1, 0, tPointers2, 0, normalizedKeyLength);
+                if (cmp != 0) {
+                    return cmp;
+                } else if (normalizedKeyDecisive) {
+                    // we further compare the run id
+                    return compareRun(tp1, tp2);
                 }
                 IFrameTupleAccessor fta1 = tp1.getAccessor();
                 IFrameTupleAccessor fta2 = tp2.getAccessor();
                 byte[] b1 = fta1.getBuffer().array();
                 byte[] b2 = fta2.getBuffer().array();
-                int[] tPointers1 = tp1.getTPointers();
-                int[] tPointers2 = tp2.getTPointers();
                 for (int f = 0; f < sortFields.length; ++f) {
                     int c;
                     try {
-                        c = comparators[f].compare(b1, tPointers1[2 * f + 1], tPointers1[2 * f + 2], b2,
-                                tPointers2[2 * f + 1], tPointers2[2 * f + 2]);
+                        c = comparators[f].compare(b1, tPointers1[2 * f + normalizedKeyLength],
+                                tPointers1[2 * f + normalizedKeyLength + 1], b2,
+                                tPointers2[2 * f + normalizedKeyLength], tPointers2[2 * f + normalizedKeyLength + 1]);
                         if (c != 0) {
                             return c;
                         }
                     } catch (HyracksDataException e) {
                         throw new IllegalArgumentException(e);
                     }
-
                 }
+                return compareRun(tp1, tp2);
+            }
+
+            private int compareRun(ReferenceEntry tp1, ReferenceEntry tp2) {
                 int runid1 = tp1.getRunid();
                 int runid2 = tp2.getRunid();
                 return runid1 < runid2 ? -1 : (runid1 == runid2 ? 0 : 1);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index b02f859..46d62a7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -34,6 +34,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.utils.NormalizedKeyUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.IDeletableTupleBufferManager;
 import org.apache.hyracks.dataflow.std.buffermanager.ITuplePointerAccessor;
 import org.apache.hyracks.dataflow.std.structures.IResetableComparable;
@@ -65,7 +66,7 @@
         @Override
         public int compareTo(HeapEntry o) {
-            int cmpNormalizedKey = AbstractFrameSorter.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength);
+            int cmpNormalizedKey = NormalizedKeyUtils.compareNormalizeKeys(nmk, 0, o.nmk, 0, normalizedKeyTotalLength);
             if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
                 return cmpNormalizedKey;
             }
@@ -141,7 +142,7 @@
         int runningNormalizedKeyTotalLength = 0;
         if (keyNormalizerFactories != null) {
-            int decisivePrefixLength = AbstractFrameSorter.getDecisivePrefixLength(keyNormalizerFactories);
+            int decisivePrefixLength = NormalizedKeyUtils.getDecisivePrefixLength(keyNormalizerFactories);
             // we only take a prefix of the decisive normalized keys, plus at most indecisive normalized keys
             // ideally, the caller should prepare normalizers in this way, but we just guard here to avoid
@@ -153,7 +154,8 @@
             for (int i = 0; i < normalizedKeys; i++) {
                 this.nkcs[i] = keyNormalizerFactories[i].createNormalizedKeyComputer();
-                this.normalizedKeyLength[i] = keyNormalizerFactories[i].getNormalizedKeyLength();
+                this.normalizedKeyLength[i] =
+                        keyNormalizerFactories[i].getNormalizedKeyProperties().getNormalizedKeyLength();
                 runningNormalizedKeyTotalLength += this.normalizedKeyLength[i];
             }
             this.normalizedKeyDecisive = decisivePrefixLength == comparatorFactories.length;
@@ -225,7 +227,7 @@
     private int compareTuple(IFrameTupleAccessor frameTupleAccessor, int tid, int[] nmkey, HeapEntry maxEntry) throws HyracksDataException {
         int cmpNormalizedKey =
-                AbstractFrameSorter.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength);
+                NormalizedKeyUtils.compareNormalizeKeys(nmkey, 0, maxEntry.nmk, 0, normalizedKeyTotalLength);
         if (cmpNormalizedKey != 0 || normalizedKeyDecisive) {
             return cmpNormalizedKey;
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
index 8eb5c2e..4fa4fab 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/ReferenceEntry.java
@@ -26,14 +26,17 @@
     private final int runid;
     private IFrameTupleAccessor acccessor;
     private int tupleIndex;
-    private int[] tPointers;
+    private final int[] tPointers;
+    private final int normalizedKeyLength;
     public ReferenceEntry(int runid, FrameTupleAccessor fta, int tupleIndex, int[] keyFields, INormalizedKeyComputer nmkComputer) {
         super();
         this.runid = runid;
         this.acccessor = fta;
-        this.tPointers = new int[1 + 2 * keyFields.length];
+        this.normalizedKeyLength =
+                nmkComputer != null ? nmkComputer.getNormalizedKeyProperties().getNormalizedKeyLength() : 0;
+        this.tPointers = new int[normalizedKeyLength + 2 * keyFields.length];
         if (fta != null) {
             initTPointer(fta, tupleIndex, keyFields, nmkComputer);
         }
@@ -59,10 +62,6 @@
         return tupleIndex;
     }
-    public int getNormalizedKey() {
-        return tPointers[0];
-    }
-
     public void setTupleIndex(int tupleIndex, int[] keyFields, INormalizedKeyComputer nmkComputer) {
         initTPointer(acccessor, tupleIndex, keyFields, nmkComputer);
     }
@@ -73,14 +72,11 @@
         byte[] b1 = fta.getBuffer().array();
         for (int f = 0; f < keyFields.length; ++f) {
             int fIdx = keyFields[f];
-            tPointers[2 * f + 1] = fta.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
-            tPointers[2 * f + 2] = fta.getFieldLength(tupleIndex, fIdx);
-            if (f == 0) {
-                if (nmkComputer != null) {
-                    tPointers[0] = nmkComputer.normalize(b1, tPointers[1], tPointers[2]);
-                } else {
-                    tPointers[0] = 0;
-                }
+            tPointers[2 * f + normalizedKeyLength] = fta.getAbsoluteFieldStartOffset(tupleIndex, fIdx);
+            tPointers[2 * f + normalizedKeyLength + 1] = fta.getFieldLength(tupleIndex, fIdx);
+            if (f == 0 && nmkComputer != null) {
+                nmkComputer.normalize(b1, tPointers[normalizedKeyLength], tPointers[normalizedKeyLength + 1], tPointers,
+                        0);
             }
         }
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2225
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Idba747285af74195ef9953ed9bf5f6f217511380
Gerrit-PatchSet: 13
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Luo Chen
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Ian Maxon
Gerrit-Reviewer: Jenkins
Gerrit-Reviewer: Luo Chen
Gerrit-Reviewer: Till Westmann
Gerrit-Reviewer: abdullah alamoudi
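
This change routes every caller through NormalizedKeyUtils.compareNormalizeKeys. That helper compares a normalized key spanning several ints one word at a time and treats each word as unsigned (the `& 0xffffffffL` masks). ReferenceEntry now stores those words at the front of tPointers, ahead of the (offset, length) pairs. The sketch below is a minimal, standalone Java illustration of that logic, not the Hyracks source. It makes the following assumptions:

* The class NormalizedKeySketch and its helpers normalizeLong and compareEntries are hypothetical.
* The boolean[] flags stand in for INormalizedKeyComputerFactory#isDecisive().
* The run-id tie-break assumes a decisive single-field key, which is the case the merger special-cases.

```java
/**
 * Hypothetical, standalone sketch of multi-word normalized-key comparison.
 * Not the Hyracks NormalizedKeyUtils class; all names here are illustrative.
 */
public final class NormalizedKeySketch {

    private NormalizedKeySketch() {
    }

    /** Lexicographic comparison of {@code length} ints, each read as an unsigned 32-bit word. */
    public static int compareNormalizedKeys(int[] keys1, int start1, int[] keys2, int start2, int length) {
        for (int i = 0; i < length; i++) {
            int key1 = keys1[start1 + i];
            int key2 = keys2[start2 + i];
            if (key1 != key2) {
                // Same ordering as ((key1 & 0xffffffffL) < (key2 & 0xffffffffL)) ? -1 : 1
                return Integer.compareUnsigned(key1, key2) < 0 ? -1 : 1;
            }
        }
        return 0;
    }

    /** Length of the leading run of decisive normalizers (flags stand in for isDecisive()). */
    public static int decisivePrefixLength(boolean[] decisive) {
        if (decisive == null) {
            return 0;
        }
        for (int i = 0; i < decisive.length; i++) {
            if (!decisive[i]) {
                return i;
            }
        }
        return decisive.length;
    }

    /**
     * Hypothetical two-word normalizer for a signed long: flipping the sign bit makes the
     * unsigned order of (high word, low word) match the signed order of the long.
     */
    public static void normalizeLong(long value, int[] target, int start) {
        long flipped = value ^ Long.MIN_VALUE;
        target[start] = (int) (flipped >>> 32);
        target[start + 1] = (int) flipped;
    }

    /**
     * Merge-time comparison for a decisive key: compare the normalized-key words stored at the
     * front of each pointer array, then break ties by run id.
     */
    public static int compareEntries(int[] tPointers1, int runId1, int[] tPointers2, int runId2,
            int normalizedKeyLength) {
        int cmp = compareNormalizedKeys(tPointers1, 0, tPointers2, 0, normalizedKeyLength);
        return cmp != 0 ? cmp : Integer.compare(runId1, runId2);
    }

    public static void main(String[] args) {
        int[] minusOne = new int[2];
        int[] plusOne = new int[2];
        normalizeLong(-1L, minusOne, 0);
        normalizeLong(1L, plusOne, 0);
        // Unsigned word comparison orders -1 before 1
        System.out.println(compareNormalizedKeys(minusOne, 0, plusOne, 0, 2)); // -1
        // A signed comparison of the first word would get the order wrong
        System.out.println(Integer.compare(minusOne[0], plusOne[0])); // 1
        System.out.println(decisivePrefixLength(new boolean[] { true, true, false, true })); // 2
        // Equal keys from different runs: the entry from the lower run id sorts first
        System.out.println(compareEntries(plusOne, 3, plusOne.clone(), 1, 2)); // 1
    }
}
```

Integer.compareUnsigned (Java 8+) gives the same ordering as the mask-and-widen-to-long expression used in the diff. The sign-bit flip in normalizeLong is an example of why normalized words must be compared unsigned: after the flip, a signed comparison of the high word would reverse the order of negative and positive values.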