From: ptupitsyn@apache.org
To: commits@ignite.apache.org
Reply-To: dev@ignite.apache.org
Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm
Date: Wed, 25 Jan 2017 11:39:23 -0000
X-Mailer: ASF-Git Admin Mailer
Subject: [1/6] ignite git commit: IGNITE-4598: Hadoop: implemented raw comparator for BytesWritable key type. This closes #1457.

Repository: ignite
Updated Branches:
  refs/heads/ignite-2.0 e219bad22 -> 490e9a138


IGNITE-4598: Hadoop: implemented raw comparator for BytesWritable key type. This closes #1457.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4d5976d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4d5976d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4d5976d

Branch: refs/heads/ignite-2.0
Commit: d4d5976dd354e05f6ac5fa2e2faf1ac66f3b7dec
Parents: 28d66db
Author: devozerov
Authored: Tue Jan 24 16:45:59 2017 +0300
Committer: devozerov
Committed: Tue Jan 24 16:45:59 2017 +0300

----------------------------------------------------------------------
 .../io/BytesWritablePartiallyRawComparator.java | 51 +++++++++++++++
 .../hadoop/io/TextPartiallyRawComparator.java   | 68 +-------------------
 .../processors/hadoop/impl/HadoopUtils.java     | 66 +++++++++++++++++++
 .../hadoop/impl/v2/HadoopV2TaskContext.java     | 13 +++-
 4 files changed, 129 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d5976d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java
new file mode 100644
index 0000000..da9240b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/BytesWritablePartiallyRawComparator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.io;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
+
+/**
+ * Partial raw comparator for {@link BytesWritable} data type.
+ * <p>
+ * Implementation is borrowed from {@code org.apache.hadoop.io.FastByteComparisons} and adapted to Ignite
+ * infrastructure.
+ */
+public class BytesWritablePartiallyRawComparator implements PartiallyRawComparator<BytesWritable>,
+    PartiallyOffheapRawComparatorEx<BytesWritable> {
+    /** Length bytes. */
+    private static final int LEN_BYTES = 4;
+
+    /** {@inheritDoc} */
+    @Override public int compare(BytesWritable val1, RawMemory val2Buf) {
+        if (val2Buf instanceof OffheapRawMemory) {
+            OffheapRawMemory val2Buf0 = (OffheapRawMemory)val2Buf;
+
+            return compare(val1, val2Buf0.pointer(), val2Buf0.length());
+        }
+        else
+            throw new UnsupportedOperationException("BytesWritable can be compared only with offheap memory.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compare(BytesWritable val1, long val2Ptr, int val2Len) {
+        return HadoopUtils.compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + LEN_BYTES, val2Len - LEN_BYTES);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d5976d/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
index a2bc3d4..e82f5e4 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
@@ -17,10 +17,9 @@
 
 package org.apache.ignite.hadoop.io;
 
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedBytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
 import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
 import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
 import org.apache.ignite.internal.util.GridUnsafe;
@@ -47,69 +46,6 @@ public class TextPartiallyRawComparator implements PartiallyRawComparator<Text>,
     @Override public int compare(Text val1, long val2Ptr, int val2Len) {
         int len2 = WritableUtils.decodeVIntSize(GridUnsafe.getByte(val2Ptr));
 
-        return compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + len2, val2Len - len2);
-    }
-
-    /**
-     * Internal comparison routine.
-     *
-     * @param buf1 Bytes 1.
-     * @param len1 Length 1.
-     * @param ptr2 Pointer 2.
-     * @param len2 Length 2.
-     * @return Result.
-     */
-    @SuppressWarnings("SuspiciousNameCombination")
-    private static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) {
-        int minLength = Math.min(len1, len2);
-
-        int minWords = minLength / Longs.BYTES;
-
-        for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
-            long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i);
-            long rw = GridUnsafe.getLong(ptr2 + i);
-
-            long diff = lw ^ rw;
-
-            if (diff != 0) {
-                if (GridUnsafe.BIG_ENDIAN)
-                    return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1;
-
-                // Use binary search
-                int n = 0;
-                int y;
-                int x = (int) diff;
-
-                if (x == 0) {
-                    x = (int) (diff >>> 32);
-
-                    n = 32;
-                }
-
-                y = x << 16;
-
-                if (y == 0)
-                    n += 16;
-                else
-                    x = y;
-
-                y = x << 8;
-
-                if (y == 0)
-                    n += 8;
-
-                return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
-            }
-        }
-
-        // The epilogue to cover the last (minLength % 8) elements.
-        for (int i = minWords * Longs.BYTES; i < minLength; i++) {
-            int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i));
-
-            if (res != 0)
-                return res;
-        }
-
-        return len1 - len2;
+        return HadoopUtils.compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + len2, val2Len - len2);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d5976d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
index a34388d..767e10a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.hadoop.impl;
 
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Writable;
@@ -32,6 +34,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import java.io.ByteArrayInputStream;
@@ -328,4 +331,67 @@ public class HadoopUtils {
             HadoopCommonUtils.restoreContextClassLoader(oldLdr);
         }
     }
+
+    /**
+     * Internal comparison routine.
+     *
+     * @param buf1 Bytes 1.
+     * @param len1 Length 1.
+     * @param ptr2 Pointer 2.
+     * @param len2 Length 2.
+     * @return Result.
+     */
+    @SuppressWarnings("SuspiciousNameCombination")
+    public static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) {
+        int minLength = Math.min(len1, len2);
+
+        int minWords = minLength / Longs.BYTES;
+
+        for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+            long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i);
+            long rw = GridUnsafe.getLong(ptr2 + i);
+
+            long diff = lw ^ rw;
+
+            if (diff != 0) {
+                if (GridUnsafe.BIG_ENDIAN)
+                    return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1;
+
+                // Use binary search
+                int n = 0;
+                int y;
+                int x = (int) diff;
+
+                if (x == 0) {
+                    x = (int) (diff >>> 32);
+
+                    n = 32;
+                }
+
+                y = x << 16;
+
+                if (y == 0)
+                    n += 16;
+                else
+                    x = y;
+
+                y = x << 8;
+
+                if (y == 0)
+                    n += 8;
+
+                return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+            }
+        }
+
+        // The epilogue to cover the last (minLength % 8) elements.
+        for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+            int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i));
+
+            if (res != 0)
+                return res;
+        }
+
+        return len1 - len2;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4d5976d/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index 8acc7aa..b8d4cac 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.hadoop.io.BytesWritablePartiallyRawComparator;
 import org.apache.ignite.hadoop.io.PartiallyRawComparator;
 import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
@@ -48,7 +50,6 @@ import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
@@ -155,6 +156,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
 
         COMBINE_KEY_GROUPING_SUPPORTED = ok;
 
+        PARTIAL_COMPARATORS.put(BytesWritable.class.getName(), BytesWritablePartiallyRawComparator.class.getName());
         PARTIAL_COMPARATORS.put(Text.class.getName(), TextPartiallyRawComparator.class.getName());
     }
 
@@ -595,11 +597,16 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
         if (clsName == null) {
             Class keyCls = conf.getMapOutputKeyClass();
 
-            if (keyCls != null) {
+            while (keyCls != null) {
                 clsName = PARTIAL_COMPARATORS.get(keyCls.getName());
 
-                if (clsName != null)
+                if (clsName != null) {
                     conf.set(HadoopJobProperty.JOB_PARTIALLY_RAW_COMPARATOR.propertyName(), clsName);
+
+                    break;
+                }
+
+                keyCls = keyCls.getSuperclass();
             }
         }
     }
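For readers of the patch, the ordering produced by the new HadoopUtils.compareBytes(byte[], int, long, int) is plain unsigned lexicographic order, with the shorter input sorting first when one is a prefix of the other; the eight-bytes-at-a-time loop and the binary search over the XOR of two longs are just a fast path for that ordering, and BytesWritablePartiallyRawComparator applies it after skipping the 4-byte length prefix (LEN_BYTES) that BytesWritable writes in front of its payload. The sketch below is not Ignite code: it is a simplified, on-heap, byte-wise reference of the same ordering, with made-up class and method names, useful only for reasoning about what the GridUnsafe-based fast path has to preserve.

/**
 * Byte-wise reference of the ordering implemented by the patched HadoopUtils.compareBytes().
 * Hypothetical illustration; the real method compares a heap array against off-heap memory.
 */
public class RawComparisonSketch {
    /** Unsigned lexicographic comparison of two on-heap byte arrays. */
    public static int compareBytesReference(byte[] a, byte[] b) {
        int min = Math.min(a.length, b.length);

        for (int i = 0; i < min; i++) {
            // Same effect as UnsignedBytes.compare() in the epilogue of the patched method.
            int res = (a[i] & 0xFF) - (b[i] & 0xFF);

            if (res != 0)
                return res;
        }

        // Same tie-breaker as "return len1 - len2;" in the patch: a prefix sorts first.
        return a.length - b.length;
    }

    public static void main(String[] args) {
        System.out.println(compareBytesReference(new byte[] {1, 2}, new byte[] {1, 2, 3}) < 0);      // true: prefix first
        System.out.println(compareBytesReference(new byte[] {(byte)0xFF}, new byte[] {0x01}) > 0);   // true: unsigned order
    }
}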
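Besides registering the new comparator, the behavioural change in HadoopV2TaskContext is that the comparator lookup now walks the map output key class's superclass chain instead of checking only the exact class, so a key type that extends a registered type (for example a subclass of Text) still picks up a partially raw comparator. The following self-contained sketch mirrors only that walk; the class name PartialComparatorLookupSketch, the resolve() method and the local registry map are hypothetical stand-ins for the internal PARTIAL_COMPARATORS map and the conf.set(...) call in the patch, and running main() requires hadoop-common on the classpath.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

/** Hypothetical, standalone illustration of the superclass-walk lookup added by this commit. */
public class PartialComparatorLookupSketch {
    /** Key class name -> comparator class name (mirrors PARTIAL_COMPARATORS in the patch). */
    private static final Map<String, String> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put(Text.class.getName(), "org.apache.ignite.hadoop.io.TextPartiallyRawComparator");
        REGISTRY.put(BytesWritable.class.getName(), "org.apache.ignite.hadoop.io.BytesWritablePartiallyRawComparator");
    }

    /** Returns the comparator class name registered for the key class or any of its superclasses, or null. */
    public static String resolve(Class<?> keyCls) {
        while (keyCls != null) {
            String clsName = REGISTRY.get(keyCls.getName());

            // In the patch this is where conf.set(JOB_PARTIALLY_RAW_COMPARATOR, clsName) happens, followed by break.
            if (clsName != null)
                return clsName;

            keyCls = keyCls.getSuperclass();
        }

        return null;
    }

    public static void main(String[] args) {
        System.out.println(resolve(BytesWritable.class)); // BytesWritablePartiallyRawComparator
        System.out.println(resolve(String.class));        // null: nothing registered for String or its superclasses
    }
}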