ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [23/50] [abbrv] ignite git commit: IGNITE-4301: Hadoop: Optimized shuffle so that only one ack is needed when running in embedded mode. This closes #1319.
Date Thu, 22 Dec 2016 15:16:03 GMT
IGNITE-4301: Hadoop: Optimized shuffle so that only one ack is needed when running in embedded
mode. This closes #1319.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58c33805
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58c33805
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58c33805

Branch: refs/heads/master
Commit: 58c338051aed89a787c2a575856e949cc44e8b8c
Parents: e857f29
Author: devozerov <vozerov@gridgain.com>
Authored: Tue Dec 6 13:23:05 2016 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Thu Dec 15 13:46:14 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |  12 +
 .../shuffle/HadoopShuffleFinishRequest.java     | 172 ++++++++++++++
 .../shuffle/HadoopShuffleFinishResponse.java    | 142 ++++++++++++
 .../hadoop/shuffle/HadoopShuffle.java           |  45 ++--
 .../hadoop/shuffle/HadoopShuffleJob.java        | 231 +++++++++++++++++--
 .../hadoop/shuffle/HadoopShuffleLocalState.java |  67 ++++++
 .../shuffle/HadoopShuffleRemoteState.java       |  64 +++++
 .../child/HadoopChildProcessRunner.java         |   6 +-
 8 files changed, 686 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 86742e8..4ffb220 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -119,6 +119,8 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
 import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
 import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
 import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
@@ -168,6 +170,16 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -41:
+                msg = new HadoopShuffleFinishResponse();
+
+                break;
+
+            case -40:
+                msg = new HadoopShuffleFinishRequest();
+
+                break;
+
             case -39:
                 msg = new HadoopJobId();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java
new file mode 100644
index 0000000..f568c2e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishRequest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+
+/**
+ * Shuffle finish request.
+ */
+public class HadoopShuffleFinishRequest implements Message, HadoopMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job ID. */
+    @GridToStringInclude
+    private HadoopJobId jobId;
+
+    /** Total message count. */
+    private long msgCnt;
+
+    /**
+     * Default constructor.
+     */
+    public HadoopShuffleFinishRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param jobId Job.
+     * @param msgCnt Message count.
+     */
+    public HadoopShuffleFinishRequest(HadoopJobId jobId, long msgCnt) {
+        this.jobId = jobId;
+        this.msgCnt = msgCnt;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return jobId;
+    }
+
+    /**
+     * @return Message count.
+     */
+    public long messageCount() {
+        return msgCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("jobId", jobId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("msgCnt", msgCnt))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                jobId = reader.readMessage("jobId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                msgCnt = reader.readLong("msgCnt");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(HadoopShuffleFinishRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -40;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        jobId.writeExternal(out);
+
+        out.writeLong(msgCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        jobId = new HadoopJobId();
+
+        jobId.readExternal(in);
+
+        msgCnt = in.readLong();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopShuffleFinishRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java
new file mode 100644
index 0000000..4b7c93b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleFinishResponse.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+
+/**
+ * Shuffle finish response.
+ */
+public class HadoopShuffleFinishResponse implements Message, HadoopMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Job ID. */
+    @GridToStringInclude
+    private HadoopJobId jobId;
+
+    /**
+     * Default constructor.
+     */
+    public HadoopShuffleFinishResponse() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param jobId Job.
+     */
+    public HadoopShuffleFinishResponse(HadoopJobId jobId) {
+        this.jobId = jobId;
+    }
+
+    /**
+     * @return Job ID.
+     */
+    public HadoopJobId jobId() {
+        return jobId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeMessage("jobId", jobId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                jobId = reader.readMessage("jobId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(HadoopShuffleFinishResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -41;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        jobId.writeExternal(out);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        jobId = new HadoopJobId();
+
+        jobId.readExternal(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopShuffleFinishResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index 4450bf2..82bbd32 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -103,7 +103,7 @@ public class HadoopShuffle extends HadoopComponent {
         HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
 
         HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
-            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
+            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()), true);
 
         UUID[] rdcAddrs = new UUID[plan.reducers()];
 
@@ -189,37 +189,34 @@ public class HadoopShuffle extends HadoopComponent {
      * @return {@code True}.
      */
     public boolean onMessageReceived(UUID src, HadoopMessage msg) {
-        if (msg instanceof HadoopShuffleMessage) {
-            HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+        try {
+            if (msg instanceof HadoopShuffleMessage) {
+                HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
 
-            try {
-                job(m.jobId()).onShuffleMessage(m);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Message handling failed.", e);
+                job(m.jobId()).onShuffleMessage(src, m);
             }
+            else if (msg instanceof HadoopShuffleAck) {
+                HadoopShuffleAck m = (HadoopShuffleAck)msg;
 
-            try {
-                // Reply with ack.
-                send0(src, new HadoopShuffleAck(m.id(), m.jobId()));
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
+                job(m.jobId()).onShuffleAck(m);
             }
-        }
-        else if (msg instanceof HadoopShuffleAck) {
-            HadoopShuffleAck m = (HadoopShuffleAck)msg;
+            else if (msg instanceof HadoopShuffleFinishRequest) {
+                HadoopShuffleFinishRequest m = (HadoopShuffleFinishRequest)msg;
 
-            try {
-                job(m.jobId()).onShuffleAck(m);
+                job(m.jobId()).onShuffleFinishRequest(src, m);
             }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Message handling failed.", e);
+            else if (msg instanceof HadoopShuffleFinishResponse) {
+                HadoopShuffleFinishResponse m = (HadoopShuffleFinishResponse)msg;
+
+                job(m.jobId()).onShuffleFinishResponse(src);
             }
+            else
+                throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
+                    ", msg=" + msg + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Message handling failed.", e);
         }
-        else
-            throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
-                ", msg=" + msg + ']');
 
         return true;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index 3afb55a..0a3a0ae 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.hadoop.shuffle;
 
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -27,6 +29,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
 import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
@@ -118,9 +121,21 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     /** Message size. */
     private final int msgSize;
 
+    /** Local shuffle states. */
+    private volatile HashMap<T, HadoopShuffleLocalState> locShuffleStates = new HashMap<>();
+
+    /** Remote shuffle states. */
+    private volatile HashMap<T, HadoopShuffleRemoteState> rmtShuffleStates = new HashMap<>();
+
+    /** Mutex for internal synchronization. */
+    private final Object mux = new Object();
+
     /** */
     private final long throttle;
 
+    /** Embedded mode flag. */
+    private final boolean embedded;
+
     /**
      * @param locReduceAddr Local reducer address.
      * @param log Logger.
@@ -128,15 +143,17 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
      * @param mem Memory.
      * @param totalReducerCnt Amount of reducers in the Job.
      * @param locReducers Reducers will work on current node.
+     * @param embedded Whether shuffle is running in embedded mode.
      * @throws IgniteCheckedException If error.
      */
     public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
-        int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
+        int totalReducerCnt, int[] locReducers, boolean embedded) throws IgniteCheckedException {
         this.locReduceAddr = locReduceAddr;
         this.totalReducerCnt = totalReducerCnt;
         this.job = job;
         this.mem = mem;
         this.log = log.getLogger(HadoopShuffleJob.class);
+        this.embedded = embedded;
 
         msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);
 
@@ -238,10 +255,11 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     }
 
     /**
+     * @param src Source.
      * @param msg Message.
      * @throws IgniteCheckedException Exception.
      */
-    public void onShuffleMessage(HadoopShuffleMessage msg) throws IgniteCheckedException {
+    public void onShuffleMessage(T src, HadoopShuffleMessage msg) throws IgniteCheckedException {
         assert msg.buffer() != null;
         assert msg.offset() > 0;
 
@@ -276,6 +294,15 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
                 }
             });
         }
+
+        if (embedded) {
+            // No immediate response.
+            if (localShuffleState(src).onShuffleMessage())
+                sendFinishResponse(src, msg.jobId());
+        }
+        else
+            // Response for every message.
+            io.apply(src, new HadoopShuffleAck(msg.id(), msg.jobId()));
     }
 
     /**
@@ -292,6 +319,121 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
     }
 
     /**
+     * Process shuffle finish request.
+     *
+     * @param src Source.
+     * @param msg Shuffle finish message.
+     */
+    public void onShuffleFinishRequest(T src, HadoopShuffleFinishRequest msg) {
+        if (log.isDebugEnabled())
+            log.debug("Received shuffle finish request [jobId=" + job.id() + ", src=" + src + ", req=" + msg + ']');
+
+        HadoopShuffleLocalState state = localShuffleState(src);
+
+        if (state.onShuffleFinishMessage(msg.messageCount()))
+            sendFinishResponse(src, msg.jobId());
+    }
+
+    /**
+     * Process shuffle finish response.
+     *
+     * @param src Source.
+     */
+    public void onShuffleFinishResponse(T src) {
+        if (log.isDebugEnabled())
+            log.debug("Received shuffle finish response [jobId=" + job.id() + ", src=" + src + ']');
+
+        remoteShuffleState(src).onShuffleFinishResponse();
+    }
+
+    /**
+     * Send finish response.
+     *
+     * @param dest Destination.
+     * @param jobId Job ID.
+     */
+    @SuppressWarnings("unchecked")
+    private void sendFinishResponse(T dest, HadoopJobId jobId) {
+        if (log.isDebugEnabled())
+            log.debug("Sent shuffle finish response [jobId=" + jobId + ", dest=" + dest + ']');
+
+        HadoopShuffleFinishResponse msg = new HadoopShuffleFinishResponse(jobId);
+
+        io.apply(dest, msg);
+    }
+
+    /**
+     * Get local shuffle state for node.
+     *
+     * @param src Source
+     * @return Local shuffle state.
+     */
+    private HadoopShuffleLocalState localShuffleState(T src) {
+        HashMap<T, HadoopShuffleLocalState> states = locShuffleStates;
+
+        HadoopShuffleLocalState res = states.get(src);
+
+        if (res == null) {
+            synchronized (mux) {
+                res = locShuffleStates.get(src);
+
+                if (res == null) {
+                    res = new HadoopShuffleLocalState();
+
+                    states = new HashMap<>(locShuffleStates);
+
+                    states.put(src, res);
+
+                    locShuffleStates = states;
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Get remote shuffle state for node.
+     *
+     * @param src Source.
+     * @return Remote shuffle state.
+     */
+    private HadoopShuffleRemoteState remoteShuffleState(T src) {
+        HashMap<T, HadoopShuffleRemoteState> states = rmtShuffleStates;
+
+        HadoopShuffleRemoteState res = states.get(src);
+
+        if (res == null) {
+            synchronized (mux) {
+                res = rmtShuffleStates.get(src);
+
+                if (res == null) {
+                    res = new HadoopShuffleRemoteState();
+
+                    states = new HashMap<>(rmtShuffleStates);
+
+                    states.put(src, res);
+
+                    rmtShuffleStates = states;
+                }
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Get all remote shuffle states.
+     *
+     * @return Remote shuffle states.
+     */
+    private HashMap<T, HadoopShuffleRemoteState> remoteShuffleStates() {
+        synchronized (mux) {
+            return new HashMap<>(rmtShuffleStates);
+        }
+    }
+
+    /**
      * Unsafe value.
      */
     private static class UnsafeValue implements HadoopMultimap.Value {
@@ -406,38 +548,50 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
      * @param newBufMinSize Min new buffer size.
      */
     private void send(final int idx, int newBufMinSize) {
-        final GridFutureAdapter<?> fut = new GridFutureAdapter<>();
-
         HadoopShuffleMessage msg = msgs[idx];
 
         final long msgId = msg.id();
 
-        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId,
-            new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut));
+        final GridFutureAdapter<?> fut;
+
+        if (embedded)
+            fut = null;
+        else {
+            fut = new GridFutureAdapter<>();
+
+            IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId,
+                new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut));
 
-        assert old == null;
+            assert old == null;
+        }
 
         try {
             io.apply(reduceAddrs[idx], msg);
+
+            if (embedded)
+                remoteShuffleState(reduceAddrs[idx]).onShuffleMessage();
         }
         catch (GridClosureException e) {
-            fut.onDone(U.unwrap(e));
+            if (fut != null)
+                fut.onDone(U.unwrap(e));
         }
 
-        fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
-            @Override public void apply(IgniteInternalFuture<?> f) {
-                try {
-                    f.get();
+        if (fut != null) {
+            fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
+                    try {
+                        f.get();
 
-                    // Clean up the future from map only if there was no exception.
-                    // Otherwise flush() should fail.
-                    sentMsgs.remove(msgId);
-                }
-                catch (IgniteCheckedException e) {
-                    log.error("Failed to send message.", e);
+                        // Clean up the future from map only if there was no exception.
+                        // Otherwise flush() should fail.
+                        sentMsgs.remove(msgId);
+                    }
+                    catch (IgniteCheckedException e) {
+                        log.error("Failed to send message.", e);
+                    }
                 }
-            }
-        });
+            });
+        }
 
         msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
             Math.max(msgSize, newBufMinSize));
@@ -513,14 +667,41 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
 
         GridCompoundFuture fut = new GridCompoundFuture<>();
 
-        for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values())
-            fut.add(tup.get2());
+        if (embedded) {
+            boolean sent = false;
 
-        fut.markInitialized();
+            for (Map.Entry<T, HadoopShuffleRemoteState> rmtStateEntry : remoteShuffleStates().entrySet()) {
+                T dest = rmtStateEntry.getKey();
+                HadoopShuffleRemoteState rmtState = rmtStateEntry.getValue();
 
-        if (log.isDebugEnabled())
-            log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
+                HadoopShuffleFinishRequest req = new HadoopShuffleFinishRequest(job.id(), rmtState.messageCount());
+
+                io.apply(dest, req);
 
+                if (log.isDebugEnabled())
+                    log.debug("Sent shuffle finish request [jobId=" + job.id() + ", dest=" + dest +
+                        ", req=" + req + ']');
+
+                fut.add(rmtState.future());
+
+                sent = true;
+            }
+
+            if (sent)
+                fut.markInitialized();
+            else
+                return new GridFinishedFuture<>();
+        }
+        else {
+            for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values())
+                fut.add(tup.get2());
+
+            fut.markInitialized();
+
+            if (log.isDebugEnabled())
+                log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
+
+        }
         return fut;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java
new file mode 100644
index 0000000..68c0653
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleLocalState.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Local shuffle state.
+ */
+class HadoopShuffleLocalState {
+    /** Message counter. */
+    private final AtomicLong msgCnt = new AtomicLong();
+
+    /** Reply guard. */
+    private final AtomicBoolean replyGuard = new AtomicBoolean();
+
+    /** Total message count.*/
+    private volatile long totalMsgCnt;
+
+    /**
+     * Callback invoked when shuffle message arrived.
+     *
+     * @return Whether to perform reply.
+     */
+    public boolean onShuffleMessage() {
+        long msgCnt0 = msgCnt.incrementAndGet();
+
+        return msgCnt0 == totalMsgCnt && reserve();
+    }
+
+    /**
+     * Callback invoked when shuffle is finished.
+     *
+     * @param totalMsgCnt Message count.
+     * @return Whether to perform reply.
+     */
+    public boolean onShuffleFinishMessage(long totalMsgCnt) {
+        this.totalMsgCnt = totalMsgCnt;
+
+        return msgCnt.get() == totalMsgCnt && reserve();
+    }
+
+    /**
+     * Reserve reply.
+     *
+     * @return {@code True} if reserved.
+     */
+    private boolean reserve() {
+        return replyGuard.compareAndSet(false, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
new file mode 100644
index 0000000..5ffaa55
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleRemoteState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Remote shuffle state: a message counter plus a completion future. Type parameter {@code T} is unused here.
+ */
+class HadoopShuffleRemoteState<T> {
+    /** Message count; incremented once per {@link #onShuffleMessage()} call. */
+    private final AtomicLong msgCnt = new AtomicLong();
+
+    /** Completion future; {@code onDone()} is invoked on it from {@link #onShuffleFinishResponse()}. */
+    private final GridFutureAdapter fut = new GridFutureAdapter();
+
+    /**
+     * Callback invoked when shuffle message is sent. Increments the message count.
+     */
+    public void onShuffleMessage() {
+        msgCnt.incrementAndGet();
+    }
+
+    /**
+     * Callback invoked on shuffle finish response. Calls {@code onDone()} on the completion future.
+     */
+    public void onShuffleFinishResponse() {
+        fut.onDone();
+    }
+
+    /**
+     * @return Message count, i.e. current value of the counter at the moment of the call.
+     */
+    public long messageCount() {
+        return msgCnt.get();
+    }
+
+    /**
+     * @return Completion future (the same instance on every call).
+     */
+    public GridFutureAdapter future() {
+        return fut;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/58c33805/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
index 7001b8c..cb08c00 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java
@@ -151,7 +151,7 @@ public class HadoopChildProcessRunner {
                 job.initialize(true, nodeDesc.processId());
 
                 shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem,
-                    req.totalReducerCount(), req.localReducers());
+                    req.totalReducerCount(), req.localReducers(), false);
 
                 initializeExecutors(req);
 
@@ -432,9 +432,7 @@ public class HadoopChildProcessRunner {
                         try {
                             HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
 
-                            shuffleJob.onShuffleMessage(m);
-
-                            comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId()));
+                            shuffleJob.onShuffleMessage(desc, m);
                         }
                         catch (IgniteCheckedException e) {
                             U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);


Mime
View raw message