ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-4270: Hadoop: implemented striped mapper output. This closes #1334.
Date Fri, 09 Dec 2016 09:01:50 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 2f51b4ac0 -> 065ca4a75


IGNITE-4270: Hadoop: implemented striped mapper output. This closes #1334.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/065ca4a7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/065ca4a7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/065ca4a7

Branch: refs/heads/master
Commit: 065ca4a75a0765409a27d87c781efb215c0a6c48
Parents: 2f51b4a
Author: devozerov <vozerov@gridgain.com>
Authored: Fri Dec 9 12:01:40 2016 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Fri Dec 9 12:01:40 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/hadoop/HadoopJobProperty.java    |   7 +
 .../hadoop/HadoopMapperAwareTaskOutput.java     |  32 ++
 .../processors/hadoop/HadoopTaskInfo.java       |  43 ++
 .../shuffle/HadoopDirectShuffleMessage.java     | 243 ++++++++++++
 .../processors/hadoop/HadoopMapperUtils.java    |  56 +++
 .../hadoop/impl/v2/HadoopV2Context.java         |  11 +
 .../hadoop/impl/v2/HadoopV2MapTask.java         |  10 +
 .../hadoop/jobtracker/HadoopJobTracker.java     |   4 +
 .../hadoop/shuffle/HadoopShuffle.java           |  23 +-
 .../hadoop/shuffle/HadoopShuffleJob.java        | 389 ++++++++++++++-----
 .../shuffle/HadoopShuffleRemoteState.java       |   5 +-
 .../shuffle/direct/HadoopDirectDataInput.java   | 166 ++++++++
 .../shuffle/direct/HadoopDirectDataOutput.java  | 221 +++++++++++
 .../direct/HadoopDirectDataOutputContext.java   | 100 +++++
 .../direct/HadoopDirectDataOutputState.java     |  54 +++
 .../child/HadoopChildProcessRunner.java         |   2 +-
 .../impl/HadoopMapReduceEmbeddedSelfTest.java   |  22 +-
 18 files changed, 1287 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 4ffb220..504e683 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -122,6 +122,7 @@ import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
 import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -170,6 +171,11 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -42:
+                msg = new HadoopDirectShuffleMessage();
+
+                break;
+
             case -41:
                 msg = new HadoopShuffleFinishResponse();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
index e713caa..1f0ef1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java
@@ -91,6 +91,13 @@ public enum HadoopJobProperty {
     SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"),
 
     /**
+     * Whether to stripe mapper output for remote reducers.
+     * <p>
+     * Defaults to {@code false}.
+     */
+    SHUFFLE_MAPPER_STRIPED_OUTPUT("ignite.shuffle.mapper.striped.output"),
+
+    /**
      * Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter
      * controls sleep duration between iterations through intermediate reducer maps.
      * <p>

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java
new file mode 100644
index 0000000..1d6637c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperAwareTaskOutput.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Special output type with callback invoked when mapper finished writing data.
+ */
+public interface HadoopMapperAwareTaskOutput extends HadoopTaskOutput {
+    /**
+     * Callback invoked when mapper finished writing data.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onMapperFinished() throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
index b76fb85..3509367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskInfo.java
@@ -46,6 +46,12 @@ public class HadoopTaskInfo implements Externalizable {
     /** */
     private HadoopInputSplit inputSplit;
 
+    /** Whether mapper index is set. */
+    private boolean mapperIdxSet;
+
+    /** Current mapper index. */
+    private int mapperIdx;
+
     /**
      * For {@link Externalizable}.
      */
@@ -78,6 +84,13 @@ public class HadoopTaskInfo implements Externalizable {
         out.writeInt(taskNum);
         out.writeInt(attempt);
         out.writeObject(inputSplit);
+
+        if (mapperIdxSet) {
+            out.writeBoolean(true);
+            out.writeInt(mapperIdx);
+        }
+        else
+            out.writeBoolean(false);
     }
 
     /** {@inheritDoc} */
@@ -87,6 +100,13 @@ public class HadoopTaskInfo implements Externalizable {
         taskNum = in.readInt();
         attempt = in.readInt();
         inputSplit = (HadoopInputSplit)in.readObject();
+
+        if (in.readBoolean()) {
+            mapperIdxSet = true;
+            mapperIdx = in.readInt();
+        }
+        else
+            mapperIdxSet = false;
     }
 
     /**
@@ -118,6 +138,29 @@ public class HadoopTaskInfo implements Externalizable {
     }
 
     /**
+     * @param mapperIdx Current mapper index.
+     */
+    public void mapperIndex(int mapperIdx) {
+        this.mapperIdx = mapperIdx;
+
+        mapperIdxSet = true;
+    }
+
+    /**
+     * @return Current mapper index; meaningful only when {@link #hasMapperIndex()} returns {@code true}.
+     */
+    public int mapperIndex() {
+        return mapperIdx;
+    }
+
+    /**
+     * @return {@code True} if mapper index is set.
+     */
+    public boolean hasMapperIndex() {
+        return mapperIdxSet;
+    }
+
+    /**
      * @return Input split.
      */
     @Nullable public HadoopInputSplit inputSplit() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
new file mode 100644
index 0000000..e81dc5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopDirectShuffleMessage.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+
+/**
+ * Direct shuffle message.
+ */
+public class HadoopDirectShuffleMessage implements Message, HadoopMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    @GridToStringInclude
+    private HadoopJobId jobId;
+
+    /** */
+    @GridToStringInclude
+    private int reducer;
+
+    /** Count. */
+    private int cnt;
+
+    /** Buffer. */
+    private byte[] buf;
+
+    /** Buffer length (equal or less than buf.length). */
+    @GridDirectTransient
+    private transient int bufLen;
+
+    /**
+     * Default constructor.
+     */
+    public HadoopDirectShuffleMessage() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param jobId Job ID.
+     * @param reducer Reducer.
+     * @param cnt Count.
+     * @param buf Buffer.
+     * @param bufLen Buffer length.
+     */
+    public HadoopDirectShuffleMessage(HadoopJobId jobId, int reducer, int cnt, byte[] buf, int bufLen) {
+        assert jobId != null;
+
+        this.jobId = jobId;
+        this.reducer = reducer;
+        this.cnt = cnt;
+        this.buf = buf;
+        this.bufLen = bufLen;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return jobId;
+    }
+
+    /**
+     * @return Reducer.
+     */
+    public int reducer() {
+        return reducer;
+    }
+
+    /**
+     * @return Count.
+     */
+    public int count() {
+        return cnt;
+    }
+
+    /**
+     * @return Buffer.
+     */
+    public byte[] buffer() {
+        return buf;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("jobId", jobId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeInt("reducer", reducer))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeInt("cnt", cnt))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeByteArray("buf", this.buf, 0, bufLen))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                jobId = reader.readMessage("jobId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                reducer = reader.readInt("reducer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                cnt = reader.readInt("cnt");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                this.buf = reader.readByteArray("buf");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                bufLen = this.buf != null ? this.buf.length : 0;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(HadoopDirectShuffleMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -42;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        jobId.writeExternal(out);
+
+        out.writeInt(reducer);
+        out.writeInt(cnt);
+
+        U.writeByteArray(out, buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        jobId = new HadoopJobId();
+        jobId.readExternal(in);
+
+        reducer = in.readInt();
+        cnt = in.readInt();
+
+        buf = U.readByteArray(in);
+        bufLen = buf != null ? buf.length : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopDirectShuffleMessage.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java
new file mode 100644
index 0000000..87adcb7
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopMapperUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop;
+
+/**
+ * Set of mapper utility methods.
+ */
+public class HadoopMapperUtils {
+    /** Thread-local mapper index. */
+    private static final ThreadLocal<Integer> MAP_IDX = new ThreadLocal<>();
+
+    /**
+     * @return Current mapper index.
+     */
+    public static int mapperIndex() {
+        Integer res = MAP_IDX.get();
+
+        return res != null ? res : -1;
+    }
+
+    /**
+     * @param idx Current mapper index.
+     */
+    public static void mapperIndex(Integer idx) {
+        MAP_IDX.set(idx);
+    }
+
+    /**
+     * Clear mapper index.
+     */
+    public static void clearMapperIndex() {
+        MAP_IDX.remove();
+    }
+
+    /**
+     * Constructor.
+     */
+    private HadoopMapperUtils() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
index 90a1bad..eec0636 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Context.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
 import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
@@ -153,6 +154,16 @@ public class HadoopV2Context extends JobContextImpl implements MapContext, Reduc
         }
     }
 
+    /**
+     * Callback invoked from mapper thread when map is finished.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onMapperFinished() throws IgniteCheckedException {
+        if (output instanceof HadoopMapperAwareTaskOutput)
+            ((HadoopMapperAwareTaskOutput)output).onMapperFinished();
+    }
+
     /** {@inheritDoc} */
     @Override public OutputCommitter getOutputCommitter() {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
index 418df4e..eb3b935 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 
 /**
@@ -49,6 +50,11 @@ public class HadoopV2MapTask extends HadoopV2Task {
 
         JobContextImpl jobCtx = taskCtx.jobContext();
 
+        if (taskCtx.taskInfo().hasMapperIndex())
+            HadoopMapperUtils.mapperIndex(taskCtx.taskInfo().mapperIndex());
+        else
+            HadoopMapperUtils.clearMapperIndex();
+
         try {
             InputSplit nativeSplit = hadoopContext().getInputSplit();
 
@@ -72,6 +78,8 @@ public class HadoopV2MapTask extends HadoopV2Task {
 
             try {
                 mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
+
+                hadoopContext().onMapperFinished();
             }
             finally {
                 closeWriter();
@@ -92,6 +100,8 @@ public class HadoopV2MapTask extends HadoopV2Task {
             throw new IgniteCheckedException(e);
         }
         finally {
+            HadoopMapperUtils.clearMapperIndex();
+
             if (err != null)
                 abort(outputFormat);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
index 36782bf..a725534 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java
@@ -1018,6 +1018,8 @@ public class HadoopJobTracker extends HadoopComponent {
             if (state == null)
                 state = initState(jobId);
 
+            int mapperIdx = 0;
+
             for (HadoopInputSplit split : mappers) {
                 if (state.addMapper(split)) {
                     if (log.isDebugEnabled())
@@ -1026,6 +1028,8 @@ public class HadoopJobTracker extends HadoopComponent {
 
                     HadoopTaskInfo taskInfo = new HadoopTaskInfo(MAP, jobId, meta.taskNumber(split), 0, split);
 
+                    taskInfo.mapperIndex(mapperIdx++);
+
                     if (tasks == null)
                         tasks = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 82bbd32..8ffea8c 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
 import org.apache.ignite.internal.processors.hadoop.HadoopContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 
+import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -102,8 +104,8 @@ public class HadoopShuffle extends HadoopComponent {
     private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
         HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
 
-        HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
-            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()), true);
+        HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log, ctx.jobTracker().job(jobId, null),
+            mem, plan.reducers(), plan.reducers(ctx.localNodeId()), localMappersCount(plan), true);
 
         UUID[] rdcAddrs = new UUID[plan.reducers()];
 
@@ -123,6 +125,18 @@ public class HadoopShuffle extends HadoopComponent {
     }
 
     /**
+     * Get number of local mappers.
+     *
+     * @param plan Plan.
+     * @return Number of local mappers.
+     */
+    private int localMappersCount(HadoopMapReducePlan plan) {
+        Collection<HadoopInputSplit> locMappers = plan.mappers(ctx.localNodeId());
+
+        return F.isEmpty(locMappers) ? 0 : locMappers.size();
+    }
+
+    /**
      * @param nodeId Node ID to send message to.
      * @param msg Message to send.
      * @throws IgniteCheckedException If send failed.
@@ -195,6 +209,11 @@ public class HadoopShuffle extends HadoopComponent {
 
                 job(m.jobId()).onShuffleMessage(src, m);
             }
+            else if (msg instanceof HadoopDirectShuffleMessage) {
+                HadoopDirectShuffleMessage m = (HadoopDirectShuffleMessage)msg;
+
+                job(m.jobId()).onDirectShuffleMessage(src, m);
+            }
             else if (msg instanceof HadoopShuffleAck) {
                 HadoopShuffleAck m = (HadoopShuffleAck)msg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 0a3a0ae..214a335 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -17,20 +17,16 @@
 
 package org.apache.ignite.internal.processors.hadoop.shuffle;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperAwareTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapperUtils;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
@@ -41,6 +37,9 @@ import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
 import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataInput;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutputContext;
+import org.apache.ignite.internal.processors.hadoop.shuffle.direct.HadoopDirectDataOutputState;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -55,9 +54,19 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
 import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
@@ -121,6 +130,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     /** Message size. */
     private final int msgSize;
 
+    /** Whether to stripe mapper output for remote reducers. */
+    private final boolean stripeMappers;
+
     /** Local shuffle states. */
     private volatile HashMap<T, HadoopShuffleLocalState> locShuffleStates = new HashMap<>();
 
@@ -143,11 +155,12 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
      * @param mem Memory.
      * @param totalReducerCnt Amount of reducers in the Job.
      * @param locReducers Reducers will work on current node.
+     * @param locMappersCnt Number of mappers running on the given node.
      * @param embedded Whether shuffle is running in embedded mode.
      * @throws IgniteCheckedException If error.
      */
     public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
-        int totalReducerCnt, int[] locReducers, boolean embedded) throws IgniteCheckedException {
+        int totalReducerCnt, int[] locReducers, int locMappersCnt, boolean embedded) throws IgniteCheckedException {
         this.locReduceAddr = locReduceAddr;
         this.totalReducerCnt = totalReducerCnt;
         this.job = job;
@@ -155,6 +168,27 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         this.log = log.getLogger(HadoopShuffleJob.class);
         this.embedded = embedded;
 
+        // No stripes for combiner.
+        boolean stripeMappers0 = get(job.info(), SHUFFLE_MAPPER_STRIPED_OUTPUT, false);
+
+        if (stripeMappers0) {
+            if (job.info().hasCombiner()) {
+                log.info("Striped mapper output is disabled because it cannot be used together with combiner [jobId=" +
+                    job.id() + ']');
+
+                stripeMappers0 = false;
+            }
+
+            if (!embedded) {
+                log.info("Striped mapper output is disabled becuase it cannot be used in external mode [jobId=" +
+                    job.id() + ']');
+
+                stripeMappers0 = false;
+            }
+        }
+
+        stripeMappers = stripeMappers0;
+
         msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
 
         locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt);
@@ -169,9 +203,20 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
 
         needPartitioner = totalReducerCnt > 1;
 
+        // Size of local map is always equal to total reducer number to allow index-based lookup.
         locMaps = new AtomicReferenceArray<>(totalReducerCnt);
-        rmtMaps = new AtomicReferenceArray<>(totalReducerCnt);
-        msgs = new HadoopShuffleMessage[totalReducerCnt];
+
+        // Size of remote map:
+        // - If there are no local mappers, then we will not send anything, so set to 0;
+        // - If output is not striped, then match it to total reducer count, the same way as for local maps.
+        // - If output is striped, then multiply previous value by number of local mappers.
+        int rmtMapsSize = locMappersCnt == 0 ? 0 : totalReducerCnt;
+
+        if (stripeMappers)
+            rmtMapsSize *= locMappersCnt;
+
+        rmtMaps = new AtomicReferenceArray<>(rmtMapsSize);
+        msgs = new HadoopShuffleMessage[rmtMapsSize];
 
         throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0);
     }
@@ -208,24 +253,26 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
 
         this.io = io;
 
-        if (!flushed) {
-            snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
-                @Override protected void body() throws InterruptedException {
-                    try {
-                        while (!isCancelled()) {
-                            if (throttle > 0)
-                                Thread.sleep(throttle);
-
-                            collectUpdatesAndSend(false);
+        if (!stripeMappers) {
+            if (!flushed) {
+                snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
+                    @Override protected void body() throws InterruptedException {
+                        try {
+                            while (!isCancelled()) {
+                                if (throttle > 0)
+                                    Thread.sleep(throttle);
+
+                                collectUpdatesAndSend(false);
+                            }
+                        }
+                        catch (IgniteCheckedException e) {
+                            throw new IllegalStateException(e);
                         }
                     }
-                    catch (IgniteCheckedException e) {
-                        throw new IllegalStateException(e);
-                    }
-                }
-            };
+                };
 
-            new IgniteThread(snd).start();
+                new IgniteThread(snd).start();
+            }
         }
 
         ioInitLatch.countDown();
@@ -306,6 +353,46 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     }
 
     /**
+     * Process shuffle message.
+     *
+     * @param src Source.
+     * @param msg Message.
+     * @throws IgniteCheckedException Exception.
+     */
+    public void onDirectShuffleMessage(T src, HadoopDirectShuffleMessage msg) throws IgniteCheckedException {
+        assert msg.buffer() != null;
+
+        HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get();
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
+
+        perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
+
+        HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer());
+
+        HadoopSerialization keySer = taskCtx.keySerialization();
+        HadoopSerialization valSer = taskCtx.valueSerialization();
+
+        // Add data from message to the map.
+        try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
+            HadoopDirectDataInput in = new HadoopDirectDataInput(msg.buffer());
+
+            Object key = null;
+            Object val = null;
+
+            for (int i = 0; i < msg.count(); i++) {
+                key = keySer.read(in, key);
+                val = valSer.read(in, val);
+
+                adder.write(key, val);
+            }
+        }
+
+        if (localShuffleState(src).onShuffleMessage())
+            sendFinishResponse(src, msg.jobId());
+    }
+
+    /**
      * @param ack Shuffle ack.
      */
     @SuppressWarnings("ConstantConditions")
@@ -467,88 +554,149 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     }
 
     /**
-     * Sends map updates to remote reducers.
+     * Send updates to remote reducers.
+     *
+     * @param flush Flush flag.
+     * @throws IgniteCheckedException If failed.
      */
     private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
-        for (int i = 0; i < rmtMaps.length(); i++) {
-            HadoopMultimap map = rmtMaps.get(i);
+        for (int i = 0; i < rmtMaps.length(); i++)
+            collectUpdatesAndSend(i, flush);
+    }
+
+    /**
+     * Send updates to concrete remote reducer.
+     *
+     * @param rmtMapIdx Remote map index.
+     * @param flush Flush flag.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void collectUpdatesAndSend(int rmtMapIdx, boolean flush) throws IgniteCheckedException {
+        final int rmtRdcIdx = stripeMappers ? rmtMapIdx % totalReducerCnt : rmtMapIdx;
 
-            if (map == null)
-                continue; // Skip empty map and local node.
+        HadoopMultimap map = rmtMaps.get(rmtMapIdx);
 
-            if (msgs[i] == null)
-                msgs[i] = new HadoopShuffleMessage(job.id(), i, msgSize);
+        if (map == null)
+            return;
 
-            final int idx = i;
+        if (msgs[rmtMapIdx] == null)
+            msgs[rmtMapIdx] = new HadoopShuffleMessage(job.id(), rmtRdcIdx, msgSize);
 
-            map.visit(false, new HadoopMultimap.Visitor() {
-                /** */
-                private long keyPtr;
+        visit(map, rmtMapIdx, rmtRdcIdx);
 
-                /** */
-                private int keySize;
+        if (flush && msgs[rmtMapIdx].offset() != 0)
+            send(rmtMapIdx, rmtRdcIdx, 0);
+    }
 
-                /** */
-                private boolean keyAdded;
+    /**
+     * Flush remote direct context.
+     *
+     * @param rmtMapIdx Remote map index.
+     * @param rmtDirectCtx Remote direct context.
+     * @param reset Whether to perform reset.
+     */
+    private void sendShuffleMessage(int rmtMapIdx, @Nullable HadoopDirectDataOutputContext rmtDirectCtx, boolean reset) {
+        if (rmtDirectCtx == null)
+            return;
 
-                /** {@inheritDoc} */
-                @Override public void onKey(long keyPtr, int keySize) {
-                    this.keyPtr = keyPtr;
-                    this.keySize = keySize;
+        int cnt = rmtDirectCtx.count();
 
-                    keyAdded = false;
-                }
+        if (cnt == 0)
+            return;
 
-                private boolean tryAdd(long valPtr, int valSize) {
-                    HadoopShuffleMessage msg = msgs[idx];
+        int rmtRdcIdx = stripeMappers ? rmtMapIdx % totalReducerCnt : rmtMapIdx;
 
-                    if (!keyAdded) { // Add key and value.
-                        int size = keySize + valSize;
+        HadoopDirectDataOutputState state = rmtDirectCtx.state();
 
-                        if (!msg.available(size, false))
-                            return false;
+        if (reset)
+            rmtDirectCtx.reset();
 
-                        msg.addKey(keyPtr, keySize);
-                        msg.addValue(valPtr, valSize);
+        HadoopDirectShuffleMessage msg = new HadoopDirectShuffleMessage(job.id(), rmtRdcIdx, cnt,
+            state.buffer(), state.bufferLength());
 
-                        keyAdded = true;
+        T nodeId = reduceAddrs[rmtRdcIdx];
 
-                        return true;
-                    }
+        io.apply(nodeId, msg);
+
+        remoteShuffleState(nodeId).onShuffleMessage();
+    }
+
+    /**
+     * Visit output map.
+     *
+     * @param map Map.
+     * @param rmtMapIdx Remote map index.
+     * @param rmtRdcIdx Remote reducer index.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void visit(HadoopMultimap map, final int rmtMapIdx, final int rmtRdcIdx) throws IgniteCheckedException {
+        map.visit(false, new HadoopMultimap.Visitor() {
+            /** */
+            private long keyPtr;
 
-                    if (!msg.available(valSize, true))
+            /** */
+            private int keySize;
+
+            /** */
+            private boolean keyAdded;
+
+            /** {@inheritDoc} */
+            @Override public void onKey(long keyPtr, int keySize) {
+                this.keyPtr = keyPtr;
+                this.keySize = keySize;
+
+                keyAdded = false;
+            }
+
+            private boolean tryAdd(long valPtr, int valSize) {
+                HadoopShuffleMessage msg = msgs[rmtMapIdx];
+
+                if (!keyAdded) { // Add key and value.
+                    int size = keySize + valSize;
+
+                    if (!msg.available(size, false))
                         return false;
 
+                    msg.addKey(keyPtr, keySize);
                     msg.addValue(valPtr, valSize);
 
+                    keyAdded = true;
+
                     return true;
                 }
 
-                /** {@inheritDoc} */
-                @Override public void onValue(long valPtr, int valSize) {
-                    if (tryAdd(valPtr, valSize))
-                        return;
+                if (!msg.available(valSize, true))
+                    return false;
 
-                    send(idx, keySize + valSize);
+                msg.addValue(valPtr, valSize);
 
-                    keyAdded = false;
+                return true;
+            }
 
-                    if (!tryAdd(valPtr, valSize))
-                        throw new IllegalStateException();
-                }
-            });
+            /** {@inheritDoc} */
+            @Override public void onValue(long valPtr, int valSize) {
+                if (tryAdd(valPtr, valSize))
+                    return;
 
-            if (flush && msgs[i].offset() != 0)
-                send(i, 0);
-        }
+                send(rmtMapIdx, rmtRdcIdx, keySize + valSize);
+
+                keyAdded = false;
+
+                if (!tryAdd(valPtr, valSize))
+                    throw new IllegalStateException();
+            }
+        });
     }
 
     /**
-     * @param idx Index of message.
+     * Send message.
+     *
+     * @param rmtMapIdx Remote map index.
+     * @param rmtRdcIdx Remote reducer index.
      * @param newBufMinSize Min new buffer size.
      */
-    private void send(final int idx, int newBufMinSize) {
-        HadoopShuffleMessage msg = msgs[idx];
+    private void send(int rmtMapIdx, int rmtRdcIdx, int newBufMinSize) {
+        HadoopShuffleMessage msg = msgs[rmtMapIdx];
 
         final long msgId = msg.id();
 
@@ -566,10 +714,10 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         }
 
         try {
-            io.apply(reduceAddrs[idx], msg);
+            io.apply(reduceAddrs[rmtRdcIdx], msg);
 
             if (embedded)
-                remoteShuffleState(reduceAddrs[idx]).onShuffleMessage();
+                remoteShuffleState(reduceAddrs[rmtRdcIdx]).onShuffleMessage();
         }
         catch (GridClosureException e) {
             if (fut != null)
@@ -593,7 +741,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
             });
         }
 
-        msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
+        msgs[rmtMapIdx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), rmtRdcIdx,
             Math.max(msgSize, newBufMinSize));
     }
 
@@ -639,31 +787,33 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
         if (totalReducerCnt == 0)
             return new GridFinishedFuture<>();
 
-        U.await(ioInitLatch);
+        if (!stripeMappers) {
+            U.await(ioInitLatch);
 
-        GridWorker snd0 = snd;
+            GridWorker snd0 = snd;
 
-        if (snd0 != null) {
-            if (log.isDebugEnabled())
-                log.debug("Cancelling sender thread.");
+            if (snd0 != null) {
+                if (log.isDebugEnabled())
+                    log.debug("Cancelling sender thread.");
 
-            snd0.cancel();
+                snd0.cancel();
 
-            try {
-                snd0.join();
+                try {
+                    snd0.join();
 
-                if (log.isDebugEnabled())
-                    log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException(e);
+                    if (log.isDebugEnabled())
+                        log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedCheckedException(e);
+                }
             }
-        }
 
-        collectUpdatesAndSend(true); // With flush.
+            collectUpdatesAndSend(true); // With flush.
 
-        if (log.isDebugEnabled())
-            log.debug("Finished sending collected updates to remote reducers: " + job.id());
+            if (log.isDebugEnabled())
+                log.debug("Finished sending collected updates to remote reducers: " + job.id());
+        }
 
         GridCompoundFuture fut = new GridCompoundFuture<>();
 
@@ -700,8 +850,8 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
 
             if (log.isDebugEnabled())
                 log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
-
         }
+
         return fut;
     }
 
@@ -775,13 +925,17 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     /**
      * Partitioned output.
      */
-    private class PartitionedOutput implements HadoopTaskOutput {
+    public class PartitionedOutput implements HadoopMapperAwareTaskOutput {
         /** */
         private final HadoopTaskOutput[] locAdders = new HadoopTaskOutput[locMaps.length()];
 
         /** */
         private final HadoopTaskOutput[] rmtAdders = new HadoopTaskOutput[rmtMaps.length()];
 
+        /** Remote direct contexts. */
+        private final HadoopDirectDataOutputContext[] rmtDirectCtxs =
+            new HadoopDirectDataOutputContext[rmtMaps.length()];
+
         /** */
         private HadoopPartitioner partitioner;
 
@@ -819,16 +973,53 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
                     locAdders[part] = out = getOrCreateMap(locMaps, part).startAdding(taskCtx);
             }
             else {
-                out = rmtAdders[part];
+                if (stripeMappers) {
+                    int mapperIdx = HadoopMapperUtils.mapperIndex();
 
-                if (out == null)
-                    rmtAdders[part] = out = getOrCreateMap(rmtMaps, part).startAdding(taskCtx);
+                    assert mapperIdx >= 0;
+
+                    int idx = totalReducerCnt * mapperIdx + part;
+
+                    HadoopDirectDataOutputContext rmtDirectCtx = rmtDirectCtxs[idx];
+
+                    if (rmtDirectCtx == null) {
+                        rmtDirectCtx = new HadoopDirectDataOutputContext(msgSize, taskCtx);
+
+                        rmtDirectCtxs[idx] = rmtDirectCtx;
+                    }
+
+                    if (rmtDirectCtx.write(key, val))
+                        sendShuffleMessage(idx, rmtDirectCtx, true);
+
+                    return;
+                }
+                else {
+                    out = rmtAdders[part];
+
+                    if (out == null)
+                        rmtAdders[part] = out = getOrCreateMap(rmtMaps, part).startAdding(taskCtx);
+                }
             }
 
             out.write(key, val);
         }
 
         /** {@inheritDoc} */
+        @Override public void onMapperFinished() throws IgniteCheckedException {
+            // Nothing is buffered in direct contexts when output is not striped.
+            if (!stripeMappers)
+                return;
+
+            int mapperIdx = HadoopMapperUtils.mapperIndex();
+
+            assert mapperIdx >= 0;
+
+            // Direct contexts of this mapper occupy a contiguous range: one slot per reducer.
+            final int firstIdx = totalReducerCnt * mapperIdx;
+
+            for (int rdc = 0; rdc < totalReducerCnt; rdc++) {
+                int ctxIdx = firstIdx + rdc;
+
+                // Send whatever is left in the context; no reset is requested here.
+                sendShuffleMessage(ctxIdx, rmtDirectCtxs[ctxIdx], false);
+            }
+        }
+
+
+        /** {@inheritDoc} */
         @Override public void close() throws IgniteCheckedException {
             for (HadoopTaskOutput adder : locAdders) {
                 if (adder != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
index 5ffaa55..4331124 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
@@ -17,17 +17,14 @@
 
 package org.apache.ignite.internal.processors.hadoop.shuffle;
 
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.lang.IgniteInClosure;
 
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Remote shuffle state.
  */
-class HadoopShuffleRemoteState<T> {
+class HadoopShuffleRemoteState {
     /** Message count. */
     private final AtomicLong msgCnt = new AtomicLong();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
new file mode 100644
index 0000000..e3a713a
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataInput.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
+
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+
+/**
+ * Hadoop data input used for direct communication.
+ * <p>
+ * Wraps a byte array and reads values sequentially starting at position {@code 0}. Primitive reads go through
+ * {@link GridUnsafe} at offset {@code BYTE_ARR_OFF + pos} and perform no bounds checks, so callers must not read
+ * more data than the buffer holds. Only {@link #read()} and {@link #skipBytes(int)} are aware of the buffer end.
+ * <p>
+ * NOTE(review): multi-byte values are presumably stored in the byte order used by {@code GridUnsafe} on the
+ * writing side; confirm before exchanging buffers between heterogeneous nodes.
+ */
+public class HadoopDirectDataInput extends InputStream implements DataInput {
+    /** Data buffer. */
+    private final byte[] buf;
+
+    /** Position of the next byte to read. */
+    private int pos;
+
+    /**
+     * Constructor.
+     *
+     * @param buf Buffer.
+     */
+    public HadoopDirectDataInput(byte[] buf) {
+        this.buf = buf;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Returns the next byte as an unsigned value in range {@code 0..255}, or {@code -1} when the end of the buffer
+     * is reached, as required by {@link InputStream#read()}. Returning the signed byte would make {@code 0xFF}
+     * indistinguishable from end-of-stream for the inherited bulk read methods.
+     */
+    @Override public int read() throws IOException {
+        if (pos >= buf.length)
+            return -1;
+
+        return readByte() & 0xff;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFully(@NotNull byte[] b) throws IOException {
+        readFully(b, 0, b.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException {
+        System.arraycopy(buf, pos, b, off, len);
+
+        pos += len;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Skips at most the number of bytes remaining in the buffer; non-positive {@code n} skips nothing.
+     */
+    @Override public int skipBytes(int n) throws IOException {
+        if (n <= 0)
+            return 0;
+
+        // Position may already be beyond the buffer after unchecked reads, hence the lower bound.
+        int skipped = Math.max(0, Math.min(n, buf.length - pos));
+
+        pos += skipped;
+
+        return skipped;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readBoolean() throws IOException {
+        // DataInput contract: any non-zero byte is "true".
+        return readByte() != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte readByte() throws IOException {
+        byte res = GridUnsafe.getByte(buf, BYTE_ARR_OFF + pos);
+
+        pos += 1;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readUnsignedByte() throws IOException {
+        return readByte() & 0xff;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short readShort() throws IOException {
+        short res = GridUnsafe.getShort(buf, BYTE_ARR_OFF + pos);
+
+        pos += 2;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readUnsignedShort() throws IOException {
+        return readShort() & 0xffff;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char readChar() throws IOException {
+        char res = GridUnsafe.getChar(buf, BYTE_ARR_OFF + pos);
+
+        pos += 2;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt() throws IOException {
+        int res = GridUnsafe.getInt(buf, BYTE_ARR_OFF + pos);
+
+        pos += 4;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLong() throws IOException {
+        long res = GridUnsafe.getLong(buf, BYTE_ARR_OFF + pos);
+
+        pos += 8;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public float readFloat() throws IOException {
+        float res = GridUnsafe.getFloat(buf, BYTE_ARR_OFF + pos);
+
+        pos += 4;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public double readDouble() throws IOException {
+        double res = GridUnsafe.getDouble(buf, BYTE_ARR_OFF + pos);
+
+        pos += 8;
+
+        return res;
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    @Override public String readLine() throws IOException {
+        // TODO: Create ticket!
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Reads a 2-byte length followed by that many bytes of standard UTF-8 (not the modified UTF-8 of
+     * {@link java.io.DataInputStream}). The length is treated as unsigned, so encoded strings of up to 65535
+     * bytes are read back correctly.
+     */
+    @NotNull @Override public String readUTF() throws IOException {
+        // Length must be unsigned: a signed read turns lengths above 32767 into a negative array size.
+        byte[] bytes = new byte[readUnsignedShort()];
+
+        if (bytes.length != 0)
+            readFully(bytes);
+
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
new file mode 100644
index 0000000..151e552
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutput.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
+
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
+
+/**
+ * Hadoop data output for direct communication.
+ * <p>
+ * Accumulates written values in a growable byte array. Multi-byte primitives are stored through {@link GridUnsafe}
+ * at offset {@code BYTE_ARR_OFF + position}. The stream never flushes by itself: {@link #readyForFlush()} only
+ * reports that the configured flush size has been reached, and the owner is expected to take
+ * {@link #buffer()} / {@link #position()} from there.
+ */
+public class HadoopDirectDataOutput extends OutputStream implements DataOutput {
+    /** Flush size. */
+    private final int flushSize;
+
+    /** Data buffer. */
+    private byte[] buf;
+
+    /** Buffer size. */
+    private int bufSize;
+
+    /** Position (number of bytes written so far). */
+    private int pos;
+
+    /**
+     * Constructor. Initial allocation size is equal to flush size.
+     *
+     * @param flushSize Flush size.
+     */
+    public HadoopDirectDataOutput(int flushSize) {
+        this(flushSize, flushSize);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param flushSize Flush size.
+     * @param allocSize Allocation size.
+     */
+    public HadoopDirectDataOutput(int flushSize, int allocSize) {
+        this.flushSize = flushSize;
+
+        buf = new byte[allocSize];
+        bufSize = allocSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(@NotNull byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @throws IndexOutOfBoundsException If {@code off} or {@code len} is negative, or the range exceeds {@code b}.
+     */
+    @Override public void write(@NotNull byte[] b, int off, int len) throws IOException {
+        // Validate before reserving space: ensure() moves the position, so failing afterwards would corrupt it.
+        if (off < 0 || len < 0 || len > b.length - off)
+            throw new IndexOutOfBoundsException("Invalid range [off=" + off + ", len=" + len +
+                ", arrLen=" + b.length + ']');
+
+        int writePos = ensure(len);
+
+        System.arraycopy(b, off, buf, writePos, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(int val) throws IOException {
+        writeByte(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean val) throws IOException {
+        writeByte(val ? (byte)1 : (byte)0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByte(int val) throws IOException {
+        int writePos = ensure(1);
+
+        buf[writePos] = (byte)val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(int val) throws IOException {
+        int writePos = ensure(2);
+
+        GridUnsafe.putShort(buf, BYTE_ARR_OFF + writePos, (short)val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(int val) throws IOException {
+        int writePos = ensure(2);
+
+        GridUnsafe.putChar(buf, BYTE_ARR_OFF + writePos, (char)val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int val) throws IOException {
+        int writePos = ensure(4);
+
+        GridUnsafe.putInt(buf, BYTE_ARR_OFF + writePos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long val) throws IOException {
+        int writePos = ensure(8);
+
+        GridUnsafe.putLong(buf, BYTE_ARR_OFF + writePos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloat(float val) throws IOException {
+        int writePos = ensure(4);
+
+        GridUnsafe.putFloat(buf, BYTE_ARR_OFF + writePos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double val) throws IOException {
+        int writePos = ensure(8);
+
+        GridUnsafe.putDouble(buf, BYTE_ARR_OFF + writePos, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBytes(@NotNull String str) throws IOException {
+        // Only the low-order byte of every character is written.
+        for (int i = 0; i < str.length(); ++i)
+            write((byte)str.charAt(i));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChars(@NotNull String str) throws IOException {
+        for (int i = 0; i < str.length(); ++i)
+            writeChar(str.charAt(i));
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Writes a 2-byte length followed by the string encoded as standard UTF-8 (not the modified UTF-8 of
+     * {@link java.io.DataOutputStream}).
+     *
+     * @throws UTFDataFormatException If encoded form is longer than 65535 bytes.
+     */
+    @Override public void writeUTF(@NotNull String str) throws IOException {
+        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+
+        int len = bytes.length;
+
+        if (len > 65535)
+            throw new UTFDataFormatException("UTF8 form of string is longer than 65535 bytes: " + str);
+
+        // Length is stored in two bytes; values above 32767 wrap and have to be read back as unsigned.
+        writeShort((short)len);
+        write(bytes);
+    }
+
+    /**
+     * @return Buffer. This is the internal array, not a copy; only the first {@link #position()} bytes are data.
+     */
+    public byte[] buffer() {
+        return buf;
+    }
+
+    /**
+     * @return Position.
+     */
+    public int position() {
+        return pos;
+    }
+
+    /**
+     * @return Whether buffer is ready for flush, i.e. position has reached the flush size.
+     */
+    public boolean readyForFlush() {
+        return pos >= flushSize;
+    }
+
+    /**
+     * Ensure that the given amount of bytes is available within the stream, then shift the position.
+     *
+     * @param cnt Count.
+     * @return Position at which the reserved bytes start.
+     */
+    private int ensure(int cnt) {
+        int pos0 = pos;
+
+        if (pos0 + cnt > bufSize)
+            grow(pos0 + cnt);
+
+        pos += cnt;
+
+        return pos0;
+    }
+
+    /**
+     * Grow array up to the given count. New size is 110% of the current one, or {@code cnt} if that is bigger.
+     *
+     * @param cnt Count.
+     */
+    private void grow(int cnt) {
+        int bufSize0 = (int)(bufSize * 1.1);
+
+        if (bufSize0 < cnt)
+            bufSize0 = cnt;
+
+        byte[] buf0 = new byte[bufSize0];
+
+        // Only bytes written so far need to be preserved.
+        System.arraycopy(buf, 0, buf0, 0, pos);
+
+        buf = buf0;
+        bufSize = bufSize0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
new file mode 100644
index 0000000..bc70ef3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputContext.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+
+/**
+ * Hadoop data output context for direct communication: serializes key-value pairs into an output and counts them.
+ */
+public class HadoopDirectDataOutputContext {
+    /** Flush size; passed to every {@link HadoopDirectDataOutput} this context creates. */
+    private final int flushSize;
+
+    /** Key serialization, obtained from the task context. */
+    private final HadoopSerialization keySer;
+
+    /** Value serialization, obtained from the task context. */
+    private final HadoopSerialization valSer;
+
+    /** Current data output; replaced with a new instance by {@link #reset()}. */
+    private HadoopDirectDataOutput out;
+
+    /** Number of key-value pairs written since creation or the last {@link #reset()}. */
+    private int cnt;
+
+    /**
+     * Constructor. Takes key and value serializations from the task context and creates the initial output.
+     *
+     * @param flushSize Flush size.
+     * @param taskCtx Task context providing the key and value serializations.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopDirectDataOutputContext(int flushSize, HadoopTaskContext taskCtx)
+        throws IgniteCheckedException {
+        this.flushSize = flushSize;
+
+        keySer = taskCtx.keySerialization();
+        valSer = taskCtx.valueSerialization();
+
+        out = new HadoopDirectDataOutput(flushSize);
+    }
+
+    /**
+     * Serialize the key, then the value, into the current output and increment the pair counter.
+     *
+     * @param key Key.
+     * @param val Value.
+     * @return Whether the output reports it is ready for flush after this write.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean write(Object key, Object val) throws IgniteCheckedException {
+        keySer.write(out, key);
+        valSer.write(out, val);
+
+        cnt++;
+
+        return out.readyForFlush();
+    }
+
+    /**
+     * @return Number of key-value pairs written since creation or the last {@link #reset()}.
+     */
+    public int count() {
+        return cnt;
+    }
+
+    /**
+     * @return New state object holding the current output's buffer and position.
+     */
+    public HadoopDirectDataOutputState state() {
+        return new HadoopDirectDataOutputState(out.buffer(), out.position());
+    }
+
+    /**
+     * Replace the output with a new one sized by max(flush size, previous position) and zero the pair counter.
+     */
+    public void reset() {
+        int allocSize = Math.max(flushSize, out.position());
+
+        out = new HadoopDirectDataOutput(flushSize, allocSize);
+        cnt = 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
new file mode 100644
index 0000000..a9c12e3
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/direct/HadoopDirectDataOutputState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.direct;
+
+/**
+ * Hadoop data output state for direct communication: a holder of a byte array reference and a length.
+ */
+public class HadoopDirectDataOutputState {
+    /** Buffer (stored by reference, not copied). */
+    private final byte[] buf;
+
+    /** Buffer length as supplied to the constructor (not derived from {@code buf.length}). */
+    private final int bufLen;
+
+    /**
+     * Constructor. Stores both arguments as is.
+     *
+     * @param buf Buffer.
+     * @param bufLen Buffer length.
+     */
+    public HadoopDirectDataOutputState(byte[] buf, int bufLen) {
+        this.buf = buf;
+        this.bufLen = bufLen;
+    }
+
+    /**
+     * @return Buffer array passed to the constructor.
+     */
+    public byte[] buffer() {
+        return buf;
+    }
+
+    /**
+     * @return Buffer length passed to the constructor.
+     */
+    public int bufferLength() {
+        return bufLen;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index cb08c00..3336120 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -151,7 +151,7 @@ public class HadoopChildProcessRunner {
                 job.initialize(true, nodeDesc.processId());
 
                 shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
-                    req.totalReducerCount(), req.localReducers(), false);
+                    req.totalReducerCount(), req.localReducers(), 0, false);
 
                 initializeExecutors(req);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/065ca4a7/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
index b04deeb..8897a38 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopMapReduceEmbeddedSelfTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.configuration.HadoopConfiguration;
 import org.apache.ignite.igfs.IgfsPath;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
 import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1;
 import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
 
@@ -54,12 +55,28 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
         return cfg;
     }
 
+    /**
+     * @throws Exception If fails.
+     */
+    public void testMultiReducerWholeMapReduceExecution() throws Exception {
+        checkMultiReducerWholeMapReduceExecution(false);
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    public void testMultiReducerWholeMapReduceExecutionStriped() throws Exception {
+        checkMultiReducerWholeMapReduceExecution(true);
+    }
+
     /**
      * Tests whole job execution with all phases in old and new versions of API with definition of custom
      * Serialization, Partitioner and IO formats.
+     *
+     * @param striped Whether output should be striped or not.
      * @throws Exception If fails.
      */
-    public void testMultiReducerWholeMapReduceExecution() throws Exception {
+    public void checkMultiReducerWholeMapReduceExecution(boolean striped) throws Exception {
         IgfsPath inDir = new IgfsPath(PATH_INPUT);
 
         igfs.mkdirs(inDir);
@@ -81,6 +98,9 @@ public class HadoopMapReduceEmbeddedSelfTest extends HadoopMapReduceTest {
 
             JobConf jobConf = new JobConf();
 
+            if (striped)
+                jobConf.set(HadoopJobProperty.SHUFFLE_MAPPER_STRIPED_OUTPUT.propertyName(), "true");
+
             jobConf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, CustomSerialization.class.getName());
 
             //To split into about 6-7 items for v2


Mime
View raw message