ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [04/50] [abbrv] ignite git commit: IGNITE-4277: Hadoop: implemented "partially raw" comparator. This closes #1345.
Date Mon, 26 Dec 2016 11:16:20 GMT
IGNITE-4277: Hadoop: implemented "partially raw" comparator. This closes #1345.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c1ddf21f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c1ddf21f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c1ddf21f

Branch: refs/heads/ignite-2.0
Commit: c1ddf21fd627c76a8b7e0d81ad43480b1f1e204d
Parents: 30b869d
Author: devozerov <vozerov@gridgain.com>
Authored: Thu Dec 15 11:58:28 2016 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Thu Dec 15 13:46:41 2016 +0300

----------------------------------------------------------------------
 .../processors/hadoop/HadoopClassLoader.java    |   1 +
 .../processors/hadoop/HadoopJobProperty.java    |   6 +-
 .../processors/hadoop/HadoopTaskContext.java    |   8 ++
 .../io/PartiallyOffheapRawComparatorEx.java     |  33 +++++
 .../hadoop/io/PartiallyRawComparator.java       |  33 +++++
 .../org/apache/ignite/hadoop/io/RawMemory.java  |  86 ++++++++++++
 .../hadoop/io/TextPartiallyRawComparator.java   | 115 ++++++++++++++++
 .../apache/ignite/hadoop/io/package-info.java   |  22 ++++
 ...DelegatingPartiallyOffheapRawComparator.java |  54 ++++++++
 .../hadoop/impl/v2/HadoopV2TaskContext.java     |  21 +++
 .../processors/hadoop/io/OffheapRawMemory.java  | 131 +++++++++++++++++++
 .../shuffle/collections/HadoopSkipList.java     |  14 +-
 .../hadoop/impl/HadoopTeraSortTest.java         |   7 +
 .../collections/HadoopAbstractMapTest.java      |   6 +
 14 files changed, 535 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
index f6c2fa9..81c1405 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java
@@ -372,6 +372,7 @@ public class HadoopClassLoader extends URLClassLoader implements ClassCache
{
             // We use "contains" instead of "equals" to handle subclasses properly.
             if (clsName.contains("org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem")
||
                 clsName.contains("org.apache.ignite.hadoop.fs.v2.IgniteHadoopFileSystem")
||
+                clsName.contains("org.apache.ignite.hadoop.io.TextPartialRawComparator")
||
                 clsName.contains("org.apache.ignite.hadoop.mapreduce.IgniteHadoopClientProtocolProvider"))
                 return true;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index 9e1dede..4122eef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -64,6 +64,11 @@ public enum HadoopJobProperty {
     JOB_SHARED_CLASSLOADER("ignite.job.shared.classloader"),
 
     /**
+     * Fully qualified name of partially-raw comparator which should be used on sorting phase.
+     */
+    JOB_PARTIAL_RAW_COMPARATOR("ignite.job.partial.raw.comparator"),
+
+    /**
      * Size in bytes of single memory page which will be allocated for data structures in
shuffle.
      * <p>
      * By default is {@code 32 * 1024}.
@@ -112,7 +117,6 @@ public enum HadoopJobProperty {
      */
     SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle");
 
-
     /** Property name. */
     private final String propName;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
index ecb9f26..dddd017 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskContext.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
 
 /**
  * Task context.
@@ -157,6 +158,13 @@ public abstract class HadoopTaskContext {
     public abstract Comparator<Object> sortComparator();
 
     /**
+     * Get semi-raw sorting comparator.
+     *
+     * @return Semi-raw sorting comparator.
+     */
+    public abstract PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator();
+
+    /**
      * Gets comparator for grouping on combine or reduce operation.
      *
      * @return Comparator.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java
new file mode 100644
index 0000000..157609e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/io/PartiallyOffheapRawComparatorEx.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.io;
+
+/**
+ * Special version of raw comparator allowing direct access to the underlying memory.
+ */
+public interface PartiallyOffheapRawComparatorEx<T> {
+    /**
+     * Perform compare.
+     *
+     * @param val1 First value.
+     * @param val2Ptr Pointer to the second value data.
+     * @param val2Len Length of the second value data.
+     * @return Result.
+     */
+    int compare(T val1, long val2Ptr, int val2Len);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java
new file mode 100644
index 0000000..b9a4505
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/PartiallyRawComparator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.io;
+
+/**
+ * Partially raw comparator. Compares one deserialized value with serialized value.
+ */
+public interface PartiallyRawComparator<T> {
+    /**
+     * Do compare.
+     *
+     * @param val1 First value (deserialized).
+     * @param val2Buf Second value (serialized).
+     * @return A negative integer, zero, or a positive integer as this object is less than,
equal to, or greater
+     *     than the specified object.
+     */
+    int compare(T val1, RawMemory val2Buf);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java
new file mode 100644
index 0000000..8dcaf83
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/RawMemory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.io;
+
+/**
+ * Memory abstraction for raw comparison.
+ */
+public interface RawMemory {
+    /**
+     * Get byte value at the given index.
+     *
+     * @param idx Index.
+     * @return Value.
+     */
+    byte get(int idx);
+
+    /**
+     * Get short value at the given index.
+     *
+     * @param idx Index.
+     * @return Value.
+     */
+    short getShort(int idx);
+
+    /**
+     * Get char value at the given index.
+     *
+     * @param idx Index.
+     * @return Value.
+     */
+    char getChar(int idx);
+
+    /**
+     * Get int value at the given index.
+     *
+     * @param idx Index.
+     * @return Value.
+     */
+    int getInt(int idx);
+
+    /**
+     * Get long value at the given index.
+     *
+     * @param idx Index.
+     * @return Value.
+     */
+    long getLong(int idx);
+
+    /**
+     * Get float value at the given index.
+     *
+     * @param idx Index.
+     * @return Value.
+     */
+    float getFloat(int idx);
+
+    /**
+     * Get double value at the given index.
+     *
+     * @param idx Index.
+     * @return Value.
+     */
+    double getDouble(int idx);
+
+    /**
+     * Get length.
+     *
+     * @return Length.
+     */
+    int length();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
new file mode 100644
index 0000000..a2bc3d4
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/TextPartiallyRawComparator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.io;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/**
+ * Partial raw comparator for {@link Text} data type.
+ * <p>
+ * Implementation is borrowed from {@code org.apache.hadoop.io.FastByteComparisons} and adopted
to Ignite
+ * infrastructure.
+ */
+public class TextPartiallyRawComparator implements PartiallyRawComparator<Text>, PartiallyOffheapRawComparatorEx<Text>
{
+    /** {@inheritDoc} */
+    @Override public int compare(Text val1, RawMemory val2Buf) {
+        if (val2Buf instanceof OffheapRawMemory) {
+            OffheapRawMemory val2Buf0 = (OffheapRawMemory)val2Buf;
+
+            return compare(val1, val2Buf0.pointer(), val2Buf0.length());
+        }
+        else
+            throw new UnsupportedOperationException("Text can be compared only with offheap
memory.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compare(Text val1, long val2Ptr, int val2Len) {
+        int len2 = WritableUtils.decodeVIntSize(GridUnsafe.getByte(val2Ptr));
+
+        return compareBytes(val1.getBytes(), val1.getLength(), val2Ptr + len2, val2Len -
len2);
+    }
+
+    /**
+     * Internal comparison routine.
+     *
+     * @param buf1 Bytes 1.
+     * @param len1 Length 1.
+     * @param ptr2 Pointer 2.
+     * @param len2 Length 2.
+     * @return Result.
+     */
+    @SuppressWarnings("SuspiciousNameCombination")
+    private static int compareBytes(byte[] buf1, int len1, long ptr2, int len2) {
+        int minLength = Math.min(len1, len2);
+
+        int minWords = minLength / Longs.BYTES;
+
+        for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+            long lw = GridUnsafe.getLong(buf1, GridUnsafe.BYTE_ARR_OFF + i);
+            long rw = GridUnsafe.getLong(ptr2 + i);
+
+            long diff = lw ^ rw;
+
+            if (diff != 0) {
+                if (GridUnsafe.BIG_ENDIAN)
+                    return (lw + Long.MIN_VALUE) < (rw + Long.MIN_VALUE) ? -1 : 1;
+
+                // Use binary search
+                int n = 0;
+                int y;
+                int x = (int) diff;
+
+                if (x == 0) {
+                    x = (int) (diff >>> 32);
+
+                    n = 32;
+                }
+
+                y = x << 16;
+
+                if (y == 0)
+                    n += 16;
+                else
+                    x = y;
+
+                y = x << 8;
+
+                if (y == 0)
+                    n += 8;
+
+                return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) &
0xFFL));
+            }
+        }
+
+        // The epilogue to cover the last (minLength % 8) elements.
+        for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+            int res = UnsignedBytes.compare(buf1[i], GridUnsafe.getByte(ptr2 + i));
+
+            if (res != 0)
+                return res;
+        }
+
+        return len1 - len2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java
new file mode 100644
index 0000000..0d1f7b9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/io/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains <b>Hadoop Accelerator</b> API for input-output operations.
+ */
+package org.apache.ignite.hadoop.io;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java
new file mode 100644
index 0000000..e6d369e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2DelegatingPartiallyOffheapRawComparator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.v2;
+
+import org.apache.ignite.hadoop.io.PartiallyRawComparator;
+import org.apache.ignite.internal.processors.hadoop.io.OffheapRawMemory;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
+
+/**
+ * Delegating partial raw comparator.
+ */
+public class HadoopV2DelegatingPartiallyOffheapRawComparator<T> implements PartiallyOffheapRawComparatorEx<T>
{
+    /** Target comparator. */
+    private final PartiallyRawComparator<T> target;
+
+    /** Memory. */
+    private OffheapRawMemory mem;
+
+    /**
+     * Constructor.
+     *
+     * @param target Target.
+     */
+    public HadoopV2DelegatingPartiallyOffheapRawComparator(PartiallyRawComparator<T>
target) {
+        assert target != null;
+
+        this.target = target;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compare(T val1, long val2Ptr, int val2Len) {
+        if (mem == null)
+            mem = new OffheapRawMemory(val2Ptr, val2Len);
+        else
+            mem.update(val2Ptr, val2Len);
+
+        return target.compare(val1, mem);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
index d444f2b..42bbec5 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java
@@ -38,13 +38,16 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.hadoop.io.PartiallyRawComparator;
 import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
 import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopExternalSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopSplitWrapper;
@@ -62,6 +65,7 @@ import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1MapTask;
 import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1Partitioner;
 import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1ReduceTask;
 import org.apache.ignite.internal.processors.hadoop.impl.v1.HadoopV1SetupTask;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
 import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -421,11 +425,28 @@ public class HadoopV2TaskContext extends HadoopTaskContext {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public Comparator<Object> sortComparator() {
         return (Comparator<Object>)jobCtx.getSortComparator();
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator()
{
+        Class cls = jobCtx.getJobConf().getClass(HadoopJobProperty.JOB_PARTIAL_RAW_COMPARATOR.propertyName(),
null);
+
+        if (cls == null)
+            return null;
+
+        Object res = ReflectionUtils.newInstance(cls, jobConf());
+
+        if (res instanceof PartiallyOffheapRawComparatorEx)
+            return (PartiallyOffheapRawComparatorEx)res;
+        else
+            return new HadoopV2DelegatingPartiallyOffheapRawComparator<>((PartiallyRawComparator)res);
+    }
+
+    /** {@inheritDoc} */
     @Override public Comparator<Object> groupComparator() {
         Comparator<?> res;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java
new file mode 100644
index 0000000..564f92c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/io/OffheapRawMemory.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.io;
+
+import org.apache.ignite.hadoop.io.RawMemory;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Offheap-based memory.
+ */
+public class OffheapRawMemory implements RawMemory {
+    /** Pointer. */
+    private long ptr;
+
+    /** Length. */
+    private int len;
+
+    /**
+     * Constructor.
+     *
+     * @param ptr Pointer.
+     * @param len Length.
+     */
+    public OffheapRawMemory(long ptr, int len) {
+        update(ptr, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte get(int idx) {
+        ensure(idx, 1);
+
+        return GridUnsafe.getByte(ptr + idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short getShort(int idx) {
+        ensure(idx, 2);
+
+        return GridUnsafe.getShort(ptr + idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public char getChar(int idx) {
+        ensure(idx, 2);
+
+        return GridUnsafe.getChar(ptr + idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getInt(int idx) {
+        ensure(idx, 4);
+
+        return GridUnsafe.getInt(ptr + idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLong(int idx) {
+        ensure(idx, 8);
+
+        return GridUnsafe.getLong(ptr + idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public float getFloat(int idx) {
+        ensure(idx, 4);
+
+        return GridUnsafe.getFloat(ptr + idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public double getDouble(int idx) {
+        ensure(idx, 8);
+
+        return GridUnsafe.getDouble(ptr + idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int length() {
+        return len;
+    }
+
+    /**
+     * @return Raw pointer.
+     */
+    public long pointer() {
+        return ptr;
+    }
+
+    /**
+     * Update pointer and length.
+     *
+     * @param ptr Pointer.
+     * @param len Length.
+     */
+    public void update(long ptr, int len) {
+        this.ptr = ptr;
+        this.len = len;
+    }
+
+    /**
+     * Ensure that the given number of bytes are available for read. Throw an exception otherwise.
+     *
+     * @param idx Index.
+     * @param cnt Count.
+     */
+    private void ensure(int idx, int cnt) {
+        if (idx < 0 || idx + cnt - 1 >= len)
+            throw new IndexOutOfBoundsException("Illegal index [len=" + len + ", idx=" +
idx + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(OffheapRawMemory.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
index 7db88bc..f300a18 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipList.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
@@ -280,6 +281,9 @@ public class HadoopSkipList extends HadoopMultimapBase {
         private final Comparator<Object> cmp;
 
         /** */
+        private final PartiallyOffheapRawComparatorEx<Object> partialRawCmp;
+
+        /** */
         private final Random rnd = new GridRandom();
 
         /** */
@@ -298,6 +302,7 @@ public class HadoopSkipList extends HadoopMultimapBase {
             keyReader = new Reader(keySer);
 
             cmp = ctx.sortComparator();
+            partialRawCmp = ctx.partialRawSortComparator();
         }
 
         /** {@inheritDoc} */
@@ -475,7 +480,14 @@ public class HadoopSkipList extends HadoopMultimapBase {
         private int cmp(Object key, long meta) {
             assert meta != 0;
 
-            return cmp.compare(key, keyReader.readKey(meta));
+            if (partialRawCmp != null) {
+                long keyPtr = key(meta);
+                int keySize = keySize(keyPtr);
+
+                return partialRawCmp.compare(key, keyPtr + 4, keySize);
+            }
+            else
+                return cmp.compare(key, keyReader.readKey(meta));
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
index 0cc9564..a016506 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTeraSortTest.java
@@ -41,8 +41,10 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.HadoopConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.hadoop.io.TextPartiallyRawComparator;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
 
 import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;
 
@@ -161,6 +163,11 @@ public class HadoopTeraSortTest extends HadoopAbstractSelfTest {
         jobConf.set("mapred.min.split.size", String.valueOf(splitSize));
         jobConf.set("mapred.max.split.size", String.valueOf(splitSize));
 
+        jobConf.setBoolean(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(),
true);
+
+        jobConf.set(HadoopJobProperty.JOB_PARTIAL_RAW_COMPARATOR.propertyName(),
+            TextPartiallyRawComparator.class.getName());
+
         Job job = setupConfig(jobConf);
 
         HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/c1ddf21f/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
index 9d1fd4f..1f8978d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/shuffle/collections/HadoopAbstractMapTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
 import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
 import org.apache.ignite.internal.processors.hadoop.impl.v2.HadoopWritableSerialization;
+import org.apache.ignite.internal.processors.hadoop.io.PartiallyOffheapRawComparatorEx;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
@@ -84,6 +85,11 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest
{
         }
 
         /** {@inheritDoc} */
+        @Override public PartiallyOffheapRawComparatorEx<Object> partialRawSortComparator()
{
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
         @Override public Comparator<Object> groupComparator() {
             return ComparableComparator.getInstance();


Mime
View raw message