ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [57/62] [abbrv] ignite git commit: ignite-5155 Added possibility to receive diagnostic information from remote nodes for hanging cache futures
Date Mon, 12 Jun 2017 15:34:11 GMT
ignite-5155 Added possibility to receive diagnostic information from remote nodes for hanging cache futures


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d98ccec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d98ccec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d98ccec

Branch: refs/heads/ignite-5239
Commit: 5d98ccecbd43ad155f1394356c5569604556f158
Parents: 3c5ffd1
Author: sboikov <sboikov@gridgain.com>
Authored: Sat Jun 10 11:51:10 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Sat Jun 10 11:51:10 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   6 +
 .../org/apache/ignite/internal/GridTopic.java   |   6 +-
 .../ignite/internal/IgniteDiagnosticAware.java  |  28 ++
 .../ignite/internal/IgniteDiagnosticInfo.java   |  45 ++
 .../internal/IgniteDiagnosticMessage.java       | 467 +++++++++++++++++++
 .../IgniteDiagnosticPrepareContext.java         | 279 +++++++++++
 .../apache/ignite/internal/IgniteKernal.java    |   7 +-
 .../managers/communication/GridIoManager.java   |   2 +-
 .../communication/GridIoMessageFactory.java     |   8 +-
 .../processors/GridProcessorAdapter.java        |  10 +
 .../processors/cache/GridCacheIoManager.java    |  34 ++
 .../GridCachePartitionExchangeManager.java      | 136 ++++--
 .../cache/GridCacheSharedManagerAdapter.java    |   9 +
 .../distributed/dht/GridDhtTxPrepareFuture.java |  31 +-
 .../dht/GridPartitionedSingleGetFuture.java     |  25 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  45 +-
 .../GridDhtPartitionsExchangeFuture.java        |  48 +-
 .../processors/cluster/ClusterProcessor.java    | 305 +++++++++++-
 .../ignite/internal/util/nio/GridNioServer.java | 212 ++++++---
 .../communication/tcp/TcpCommunicationSpi.java  | 185 +++++---
 .../managers/IgniteDiagnosticMessagesTest.java  | 255 ++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java |   6 +-
 22 files changed, 1970 insertions(+), 179 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 84f3732..539f288 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -448,6 +448,12 @@ public final class IgniteSystemProperties {
     /** If this property is set to {@code true} then Ignite will log thread dump in case of partition exchange timeout. */
     public static final String IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT = "IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT";
 
+    /** */
+    public static final String IGNITE_IO_DUMP_ON_TIMEOUT = "IGNITE_IO_DUMP_ON_TIMEOUT";
+
+    /** */
+    public static final String IGNITE_DIAGNOSTIC_ENABLED = "IGNITE_DIAGNOSTIC_ENABLED";
+
     /** Cache operations that take more time than value of this property will be output to log. Set to {@code 0} to disable. */
     public static final String IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT = "IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index c382999..abdbf95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -96,6 +96,7 @@ public enum GridTopic {
     /** */
     TOPIC_TX,
 
+    /** */
     TOPIC_SNAPSHOT,
 
     /** */
@@ -111,7 +112,10 @@ public enum GridTopic {
     TOPIC_METADATA_REQ,
 
     /** */
-    TOPIC_SCHEMA;
+    TOPIC_SCHEMA,
+
+    /** */
+    TOPIC_INTERNAL_DIAGNOSTIC;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
new file mode 100644
index 0000000..45a5f3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticAware.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ *
+ */
+public interface IgniteDiagnosticAware {
+    /**
+     * @param ctx Context.
+     */
+    public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticInfo.java
new file mode 100644
index 0000000..f82f600
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.io.Serializable;
+
+/**
+ *
+ */
+public class IgniteDiagnosticInfo implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private String msg;
+
+    /**
+     * @param msg Message.
+     */
+    public IgniteDiagnosticInfo(String msg) {
+        this.msg = msg;
+    }
+
+    /**
+     * @return Message.
+     */
+    public String message() {
+        return msg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
new file mode 100644
index 0000000..4f37f53
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class IgniteDiagnosticMessage implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private static final int REQUEST_FLAG_MASK = 0x01;
+
+    /** */
+    private static final ThreadLocal<DateFormat> dateFormat = new ThreadLocal<DateFormat>() {
+        @Override protected DateFormat initialValue() {
+            return new SimpleDateFormat("HH:mm:ss.SSS");
+        }
+    };
+
+    /** */
+    private byte flags;
+
+    /** */
+    private long futId;
+
+    /** */
+    private byte[] bytes;
+
+    /**
+     * Required by {@link GridIoMessageFactory}.
+     */
+    public IgniteDiagnosticMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @param c Closure to run.
+     * @param futId Future ID.
+     * @return Request message.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static IgniteDiagnosticMessage createRequest(Marshaller marsh,
+        IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c,
+        long futId)
+        throws IgniteCheckedException
+    {
+        byte[] cBytes = U.marshal(marsh, c);
+
+        IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
+
+        msg.futId = futId;
+        msg.bytes = cBytes;
+        msg.flags |= REQUEST_FLAG_MASK;
+
+        return msg;
+    }
+
+    /**
+     * @param resBytes Marshalled result.
+     * @param futId Future ID.
+     * @return Response message.
+     */
+    public static IgniteDiagnosticMessage createResponse(byte[] resBytes, long futId) {
+        IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
+
+        msg.futId = futId;
+        msg.bytes = resBytes;
+
+        return msg;
+    }
+
+    /**
+     * @param marsh Marshaller.
+     * @return Unmarshalled payload.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public <T> T unmarshal(Marshaller marsh)
+        throws IgniteCheckedException {
+        if (bytes == null)
+            return null;
+
+        return U.unmarshal(marsh, bytes, null);
+    }
+
+    /**
+     * @return Future ID.
+     */
+    public long futureId() {
+        return futId;
+    }
+
+    /**
+     * @return {@code True} if this is a request message.
+     */
+    public boolean request() {
+        return (flags & REQUEST_FLAG_MASK) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("bytes", bytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                bytes = reader.readByteArray("bytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                futId = reader.readLong("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(IgniteDiagnosticMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return -55;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /**
+     *
+     */
+    public static abstract class DiagnosticBaseClosure implements IgniteBiInClosure<StringBuilder, GridKernalContext> {
+        /**
+         * @return Key to group similar messages.
+         */
+        public Object mergeKey() {
+            return getClass();
+        }
+
+        /**
+         * @param other Another closure of the same type.
+         */
+        public void merge(DiagnosticBaseClosure other) {
+            // No-op.
+        }
+    }
+
+    /**
+     *
+     */
+    public final static class TxEntriesInfoClosure extends DiagnosticBaseClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final int cacheId;
+
+        /** */
+        private final Set<KeyCacheObject> keys;
+
+        /**
+         * @param cacheId Cache ID.
+         * @param keys Keys.
+         */
+        TxEntriesInfoClosure(int cacheId, Collection<KeyCacheObject> keys) {
+            this.cacheId = cacheId;
+            this.keys = new HashSet<>(keys);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(StringBuilder sb, GridKernalContext ctx) {
+            sb.append(U.nl());
+
+            GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
+
+            if (cctx == null) {
+                sb.append("Failed to find cache with id: ").append(cacheId);
+
+                return;
+            }
+
+            try {
+                for (KeyCacheObject key : keys)
+                    key.finishUnmarshal(cctx.cacheObjectContext(), null);
+            }
+            catch (IgniteCheckedException e) {
+                ctx.cluster().diagnosticLog().error("Failed to unmarshal key: " + e, e);
+
+                sb.append("Failed to unmarshal key: ").append(e).append(U.nl());
+            }
+
+            sb.append("Cache entries [cacheId=").append(cacheId)
+                .append(", cacheName=").append(cctx.name()).append("]: ");
+
+            for (KeyCacheObject key : keys) {
+                GridCacheMapEntry e = (GridCacheMapEntry)cctx.cache().peekEx(key);
+
+                sb.append(U.nl()).append("    Key [key=").append(key).append(", entry=").append(e).append("]");
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object mergeKey() {
+            return new T2<>(getClass(), cacheId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void merge(DiagnosticBaseClosure other) {
+            TxEntriesInfoClosure other0 = (TxEntriesInfoClosure)other;
+
+            assert other0 != null && cacheId == other0.cacheId : other;
+
+            this.keys.addAll(other0.keys);
+        }
+    }
+
+    /**
+     *
+     */
+    public final static class ExchangeInfoClosure extends DiagnosticBaseClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final AffinityTopologyVersion topVer;
+
+        /**
+         * @param topVer Exchange version.
+         */
+        ExchangeInfoClosure(AffinityTopologyVersion topVer) {
+            this.topVer = topVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(StringBuilder sb, GridKernalContext ctx) {
+            sb.append(U.nl());
+
+            List<GridDhtPartitionsExchangeFuture> futs = ctx.cache().context().exchange().exchangeFutures();
+
+            for (GridDhtPartitionsExchangeFuture fut : futs) {
+                if (topVer.equals(fut.topologyVersion())) {
+                    sb.append("Exchange future: ").append(fut);
+
+                    return;
+                }
+            }
+
+            sb.append("Failed to find exchange future: ").append(topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object mergeKey() {
+            return new T2<>(getClass(), topVer);
+        }
+    }
+
+    /**
+     *
+     */
+    public final static class TxInfoClosure extends DiagnosticBaseClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private final GridCacheVersion dhtVer;
+
+        /** */
+        private final GridCacheVersion nearVer;
+
+        /**
+         * @param dhtVer Tx dht version.
+         * @param nearVer Tx near version.
+         */
+        TxInfoClosure(GridCacheVersion dhtVer, GridCacheVersion nearVer) {
+            this.dhtVer = dhtVer;
+            this.nearVer = nearVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(StringBuilder sb, GridKernalContext ctx) {
+            sb.append(U.nl())
+                .append("Related transactions [dhtVer=").append(dhtVer)
+                .append(", nearVer=").append(nearVer).append("]: ");
+
+            boolean found = false;
+
+            for (IgniteInternalTx tx : ctx.cache().context().tm().activeTransactions()) {
+                if (dhtVer.equals(tx.xidVersion()) || nearVer.equals(tx.nearXidVersion())) {
+                    sb.append(U.nl())
+                        .append("    [ver=").append(tx.xidVersion())
+                        .append(", nearVer=").append(tx.nearXidVersion())
+                        .append(", topVer=").append(tx.topologyVersion())
+                        .append(", state=").append(tx.state())
+                        .append(", fullTx=").append(tx).append(']');
+
+                    found = true;
+                }
+            }
+
+            if (!found)
+                sb.append(U.nl()).append("Failed to find related transactions.");
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object mergeKey() {
+            return new T3<>(getClass(), nearVer, dhtVer);
+        }
+    }
+
+    /**
+     *
+     * @param sb String builder.
+     * @param ctx Context.
+     */
+    static void dumpNodeBasicInfo(StringBuilder sb, GridKernalContext ctx) {
+        sb.append("General node info [id=").append(ctx.localNodeId())
+            .append(", client=").append(ctx.clientNode())
+            .append(", discoTopVer=").append(ctx.discovery().topologyVersionEx())
+            .append(", time=").append(formatTime(U.currentTimeMillis())).append(']');
+    }
+
+    /**
+     * @param sb String builder.
+     * @param ctx Context.
+     */
+    static void dumpExchangeInfo(StringBuilder sb, GridKernalContext ctx) {
+        GridCachePartitionExchangeManager exchMgr = ctx.cache().context().exchange();
+        GridDhtTopologyFuture fut = exchMgr.lastTopologyFuture();
+
+        sb.append("Partitions exchange info [readyVer=").append(exchMgr.readyAffinityVersion()).append(']').append(U.nl())
+            .append("Last initialized exchange future: ").append(fut);
+    }
+
+    /**
+     * @param ctx Context.
+     * @param nodeId Target node ID.
+     * @return Communication information future.
+     */
+    public static IgniteInternalFuture<String> dumpCommunicationInfo(GridKernalContext ctx, UUID nodeId) {
+        if (ctx.config().getCommunicationSpi() instanceof TcpCommunicationSpi)
+            return ((TcpCommunicationSpi) ctx.config().getCommunicationSpi()).dumpNodeStatistics(nodeId);
+        else
+            return new GridFinishedFuture<>("Unexpected communication SPI: " + ctx.config().getCommunicationSpi());
+    }
+
+    /**
+     * @param time Time.
+     * @return Time string.
+     */
+    private static String formatTime(long time) {
+        return dateFormat.get().format(new Date(time));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDiagnosticMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
new file mode 100644
index 0000000..b55199a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.IgniteDiagnosticMessage.DiagnosticBaseClosure;
+import static org.apache.ignite.internal.IgniteDiagnosticMessage.ExchangeInfoClosure;
+import static org.apache.ignite.internal.IgniteDiagnosticMessage.TxEntriesInfoClosure;
+import static org.apache.ignite.internal.IgniteDiagnosticMessage.TxInfoClosure;
+import static org.apache.ignite.internal.IgniteDiagnosticMessage.dumpCommunicationInfo;
+import static org.apache.ignite.internal.IgniteDiagnosticMessage.dumpExchangeInfo;
+import static org.apache.ignite.internal.IgniteDiagnosticMessage.dumpNodeBasicInfo;
+
+/**
+ * Groups diagnostic closures by node/closure type.
+ */
+public class IgniteDiagnosticPrepareContext {
+    /** */
+    private final UUID locNodeId;
+
+    /** */
+    private final Map<UUID, CompoundInfoClosure> cls = new HashMap<>();
+
+    /**
+     * @param nodeId Local node ID.
+     */
+    public IgniteDiagnosticPrepareContext(UUID nodeId) {
+        locNodeId = nodeId;
+    }
+
+    /**
+     * @param nodeId Remote node ID.
+     * @param topVer Topology version.
+     * @param msg Initial message.
+     */
+    public void exchangeInfo(UUID nodeId, AffinityTopologyVersion topVer, String msg) {
+        closure(nodeId).add(msg, new ExchangeInfoClosure(topVer));
+    }
+
+    /**
+     * @param nodeId Remote node ID.
+     * @param cacheId Cache ID.
+     * @param keys Entry keys.
+     * @param msg Initial message.
+     */
+    public void  txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject> keys, String msg) {
+        closure(nodeId).add(msg, new TxEntriesInfoClosure(cacheId, keys));
+    }
+
+    /**
+     * @param nodeId Remote node ID.
+     * @param dhtVer Tx dht version.
+     * @param nearVer Tx near version.
+     * @param msg Initial message.
+     */
+    public void remoteTxInfo(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion nearVer, String msg) {
+        closure(nodeId).add(msg, new TxInfoClosure(dhtVer, nearVer));
+    }
+
+    /**
+     * @param nodeId Remote node ID.
+     * @param msg Initial message.
+     */
+    public void basicInfo(UUID nodeId, String msg) {
+        closure(nodeId).add(msg, null);
+    }
+
+    /**
+     * @param nodeId Remote node ID.
+     * @return Compound closure
+     */
+    private CompoundInfoClosure closure(UUID nodeId) {
+        CompoundInfoClosure cl = cls.get(nodeId);
+
+        if (cl == null)
+            cls.put(nodeId, cl = new CompoundInfoClosure(locNodeId));
+
+        return cl;
+    }
+
+    /**
+     * @return {@code True} if there are no added closures.
+     */
+    public boolean empty() {
+        return cls.isEmpty();
+    }
+
+    /**
+     * @param ctx Grid context.
+     * @param lsnr Optional listener (used in test).
+     */
+    public void send(GridKernalContext ctx, @Nullable IgniteInClosure<IgniteInternalFuture<String>> lsnr) {
+        for (Map.Entry<UUID, CompoundInfoClosure> entry : cls.entrySet()) {
+            UUID rmtNodeId = entry.getKey();
+
+            CompoundInfoClosure c = entry.getValue();
+
+            IgniteInternalFuture<String> fut =
+                ctx.cluster().requestDiagnosticInfo(rmtNodeId, c, c.message());
+
+            if (lsnr != null)
+                fut.listen(lsnr);
+
+            listenAndLog(ctx.cluster().diagnosticLog(), fut);
+        }
+    }
+
+    /**
+     * @param log Logger.
+     * @param fut Future.
+     */
+    private void listenAndLog(final IgniteLogger log, IgniteInternalFuture<String> fut) {
+        fut.listen(new CI1<IgniteInternalFuture<String>>() {
+            @Override public void apply(IgniteInternalFuture<String> fut) {
+                synchronized (IgniteDiagnosticPrepareContext.class) {
+                    try {
+                        log.info(fut.get());
+                    }
+                    catch (Exception e) {
+                        U.error(log, "Failed to dump diagnostic info: " + e, e);
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     *
+     */
+    private final static class CompoundInfoClosure implements IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** ID of the node that sent the closure. */
+        protected final UUID nodeId;
+
+        /** Closures to send to the remote node. */
+        private Map<Object, IgniteDiagnosticMessage.DiagnosticBaseClosure> cls = new LinkedHashMap<>();
+
+        /** Local messages related to the remote closures. */
+        private transient Map<Object, List<String>> msgs = new LinkedHashMap<>();
+
+        /**
+         * @param nodeId ID of the node that sent the closure.
+         */
+        CompoundInfoClosure(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public final IgniteDiagnosticInfo apply(GridKernalContext ctx) {
+            try {
+                IgniteInternalFuture<String> commInfo = dumpCommunicationInfo(ctx, nodeId);
+
+                StringBuilder sb = new StringBuilder();
+
+                dumpNodeBasicInfo(sb, ctx);
+
+                sb.append(U.nl());
+
+                dumpExchangeInfo(sb, ctx);
+
+                sb.append(U.nl());
+
+                sb.append(commInfo.get(10_000));
+
+                moreInfo(sb, ctx);
+
+                return new IgniteDiagnosticInfo(sb.toString());
+            }
+            catch (Exception e) {
+                ctx.cluster().diagnosticLog().error("Failed to execute diagnostic message closure: " + e, e);
+
+                return new IgniteDiagnosticInfo("Failed to execute diagnostic message closure: " + e);
+            }
+        }
+
+        /**
+         * @param sb String builder.
+         * @param ctx Grid context.
+         */
+        private void moreInfo(StringBuilder sb, GridKernalContext ctx) {
+            for (DiagnosticBaseClosure c : cls.values()) {
+                try {
+                    c.apply(sb, ctx);
+                }
+                catch (Exception e) {
+                    ctx.cluster().diagnosticLog().error(
+                        "Failed to populate diagnostic with additional information: " + e, e);
+
+                    sb.append(U.nl()).append("Failed to populate diagnostic with additional information: ").append(e);
+                }
+            }
+        }
+
+        /**
+         * @return Node ID.
+         */
+        public UUID nodeId() {
+            return nodeId;
+        }
+
+        /**
+         * @return Initial message.
+         */
+        public String message() {
+            StringBuilder sb = new StringBuilder();
+
+            for (List<String> msgs0 : msgs.values()) {
+                for (String msg : msgs0) {
+                    if (sb.length() > 0)
+                        sb.append('\n');
+
+                    sb.append(msg);
+                }
+            }
+
+            return sb.toString();
+        }
+
+        /**
+         * @param msg Message.
+         * @param c Closure or {@code null} if only basic info is needed.
+         */
+        public void add(String msg, @Nullable DiagnosticBaseClosure c) {
+            Object key = c != null ? c.mergeKey() : getClass();
+
+            List<String> msgs0 = msgs.get(key);
+
+            if (msgs0 == null) {
+                msgs0 = new ArrayList<>();
+
+                msgs.put(key, msgs0);
+            }
+
+            msgs0.add(msg);
+
+            if (c != null) {
+                DiagnosticBaseClosure c0 = cls.get(c.mergeKey());
+
+                if (c0 == null)
+                    cls.put(c.mergeKey(), c);
+                else
+                    c0.merge(c);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 6d05147..fec1892 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -950,7 +950,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
                 // Start platform plugins.
                 if (ctx.config().getPlatformConfiguration() != null)
-                    startProcessor(new PlatformPluginProcessor(ctx));fillNodeAttributes(clusterProc.updateNotifierEnabled());}
+                    startProcessor(new PlatformPluginProcessor(ctx));
+
+                ctx.cluster().initDiagnosticListeners();
+
+                fillNodeAttributes(clusterProc.updateNotifierEnabled());
+            }
             catch (Throwable e) {
                 U.error(
                     log, "Exception during start processors, node will be stopped and close connections", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 1efc4aa..81692da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -2290,7 +2290,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * Dumps SPI stats to logs in case TcpCommunicationSpi is used, no-op otherwise.
+     * Dumps SPI stats to diagnostic logs in case TcpCommunicationSpi is used, no-op otherwise.
      */
     public void dumpStats() {
         CommunicationSpi spi = getSpi();

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 753d8af..12f160b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridJobSiblingsRequest;
 import org.apache.ignite.internal.GridJobSiblingsResponse;
 import org.apache.ignite.internal.GridTaskCancelRequest;
 import org.apache.ignite.internal.GridTaskSessionRequest;
+import org.apache.ignite.internal.IgniteDiagnosticMessage;
 import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest;
@@ -178,6 +179,11 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -55:
+                msg = new IgniteDiagnosticMessage();
+
+                break;
+
             // -54 is reserved for SQL.
 
             case -53:
@@ -875,7 +881,7 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124..127] [-23..-27] [-36..-47]- this
+            // [-3..119] [124..127] [-23..-27] [-36..-55]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
index cd97aea..b9d7260 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -35,6 +36,9 @@ import org.jetbrains.annotations.Nullable;
  * Advanced parent adapter for all processor.
  */
 public abstract class GridProcessorAdapter implements GridProcessor {
+    /** */
+    private static final String DIAGNOSTIC_LOG_CATEGORY = "org.apache.ignite.internal.diagnostic";
+
     /** Kernal context. */
     @GridToStringExclude
     protected final GridKernalContext ctx;
@@ -43,6 +47,10 @@ public abstract class GridProcessorAdapter implements GridProcessor {
     @GridToStringExclude
     protected final IgniteLogger log;
 
+    /** Diagnostic logger. */
+    @GridToStringExclude
+    protected final IgniteLogger diagnosticLog;
+
     /**
      * @param ctx Kernal context.
      */
@@ -52,6 +60,8 @@ public abstract class GridProcessorAdapter implements GridProcessor {
         this.ctx = ctx;
 
         log = ctx.log(getClass());
+
+        diagnosticLog = ctx.log(DIAGNOSTIC_LOG_CATEGORY);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index a251047..946d256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -104,6 +106,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     /** Message ID generator. */
     private static final AtomicLong idGen = new AtomicLong();
 
+    /** */
+    private static final int MAX_STORED_PENDING_MESSAGES = 100;
+
     /** Delay in milliseconds between retries. */
     private long retryDelay;
 
@@ -126,6 +131,26 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
     /** Deployment enabled. */
     private boolean depEnabled;
 
+    /** */
+    private final List<GridCacheMessage> pendingMsgs = new ArrayList<>(MAX_STORED_PENDING_MESSAGES);
+
+    /**
+     * Dumps stored cache messages that are waiting for exchange to the diagnostic log.
+     */
+    public void dumpPendingMessages() {
+        synchronized (pendingMsgs) {
+            if (pendingMsgs.isEmpty())
+                return;
+
+            diagnosticLog.info("Pending cache messages waiting for exchange [" +
+                "readyVer=" + cctx.exchange().readyAffinityVersion() +
+                ", discoVer=" + cctx.discovery().topologyVersion() + ']');
+
+            for (GridCacheMessage msg : pendingMsgs)
+                diagnosticLog.info("Message [waitVer=" + msg.topologyVersion() + ", msg=" + msg + ']');
+        }
+    }
+
     /** Message listener. */
     private GridMessageListener lsnr = new GridMessageListener() {
         @Override public void onMessage(final UUID nodeId, final Object msg) {
@@ -214,6 +239,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             }
 
             if (fut != null && !fut.isDone()) {
+                synchronized (pendingMsgs) {
+                    if (pendingMsgs.size() < MAX_STORED_PENDING_MESSAGES)
+                        pendingMsgs.add(cacheMsg);
+                }
+
                 Thread curThread = Thread.currentThread();
 
                 final int stripe = curThread instanceof IgniteThread ? ((IgniteThread)curThread).stripe() : -1;
@@ -222,6 +252,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
                     @Override public void apply(IgniteInternalFuture<?> t) {
                         Runnable c = new Runnable() {
                             @Override public void run() {
+                                synchronized (pendingMsgs) {
+                                    pendingMsgs.remove(cacheMsg);
+                                }
+
                                 IgniteLogger log = cacheMsg.messageLogger(cctx);
 
                                 if (log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 2be4d0f..fdf8a2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -49,6 +49,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -101,6 +103,7 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_IO_DUMP_ON_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
@@ -110,7 +113,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
 import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.nextDumpTimeout;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.DFLT_PRELOAD_RESEND_TIMEOUT;
 
 /**
@@ -1364,47 +1367,63 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param exchTopVer Optional current exchange topology version.
+     * @param exchFut Optional current exchange future.
      * @throws Exception If failed.
      */
-    public void dumpDebugInfo(@Nullable AffinityTopologyVersion exchTopVer) throws Exception {
-        U.warn(log, "Ready affinity version: " + readyTopVer.get());
+    public void dumpDebugInfo(@Nullable GridDhtPartitionsExchangeFuture exchFut) throws Exception {
+        AffinityTopologyVersion exchTopVer = exchFut != null ? exchFut.topologyVersion() : null;
 
-        U.warn(log, "Last exchange future: " + lastInitializedFut);
+        U.warn(diagnosticLog, "Ready affinity version: " + readyTopVer.get());
+
+        U.warn(diagnosticLog, "Last exchange future: " + lastInitializedFut);
 
         exchWorker.dumpExchangeDebugInfo();
 
         if (!readyFuts.isEmpty()) {
-            U.warn(log, "Pending affinity ready futures:");
+            U.warn(diagnosticLog, "Pending affinity ready futures:");
 
             for (AffinityReadyFuture fut : readyFuts.values())
-                U.warn(log, ">>> " + fut);
+                U.warn(diagnosticLog, ">>> " + fut);
         }
 
+        IgniteDiagnosticPrepareContext diagCtx = cctx.kernalContext().cluster().diagnosticEnabled() ?
+            new IgniteDiagnosticPrepareContext(cctx.localNodeId()) : null;
+
+        if (diagCtx != null && exchFut != null)
+            exchFut.addDiagnosticRequest(diagCtx);
+
         ExchangeFutureSet exchFuts = this.exchFuts;
 
         if (exchFuts != null) {
-            U.warn(log, "Last 10 exchange futures (total: " + exchFuts.size() + "):");
+            U.warn(diagnosticLog, "Last 10 exchange futures (total: " + exchFuts.size() + "):");
 
             int cnt = 0;
 
             for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
-                U.warn(log, ">>> " + fut);
+                U.warn(diagnosticLog, ">>> " + fut.shortInfo());
 
                 if (++cnt == 10)
                     break;
             }
         }
 
-        dumpPendingObjects(exchTopVer);
+        dumpPendingObjects(exchTopVer, diagCtx);
 
         for (CacheGroupContext grp : cctx.cache().cacheGroups())
             grp.preloader().dumpDebugInfo();
 
         cctx.affinity().dumpDebugInfo();
 
-        // Dump IO manager statistics.
-        cctx.gridIO().dumpStats();
+        cctx.io().dumpPendingMessages();
+
+        if (IgniteSystemProperties.getBoolean(IGNITE_IO_DUMP_ON_TIMEOUT, false))
+            cctx.gridIO().dumpStats();
+
+        if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
+            U.dumpThreads(diagnosticLog);
+
+        if (diagCtx != null)
+            diagCtx.send(cctx.kernalContext(), null);
     }
 
     /**
@@ -1420,12 +1439,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         GridCacheMvccManager mvcc = cctx.mvcc();
 
+        final IgniteDiagnosticPrepareContext diagCtx = cctx.kernalContext().cluster().diagnosticEnabled() ?
+            new IgniteDiagnosticPrepareContext(cctx.localNodeId()) : null;
+
         if (tm != null) {
             for (IgniteInternalTx tx : tm.activeTransactions()) {
                 if (curTime - tx.startTime() > timeout) {
                     found = true;
 
-                    U.warn(log, "Found long running transaction [startTime=" + formatTime(tx.startTime()) +
+                    U.warn(diagnosticLog, "Found long running transaction [startTime=" + formatTime(tx.startTime()) +
                         ", curTime=" + formatTime(curTime) + ", tx=" + tx + ']');
                 }
             }
@@ -1436,8 +1458,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (curTime - fut.startTime() > timeout) {
                     found = true;
 
-                    U.warn(log, "Found long running cache future [startTime=" + formatTime(fut.startTime()) +
+                    U.warn(diagnosticLog, "Found long running cache future [startTime=" + formatTime(fut.startTime()) +
                         ", curTime=" + formatTime(curTime) + ", fut=" + fut + ']');
+
+                    if (diagCtx != null && fut instanceof IgniteDiagnosticAware)
+                        ((IgniteDiagnosticAware)fut).addDiagnosticRequest(diagCtx);
                 }
             }
 
@@ -1445,12 +1470,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 if (curTime - fut.startTime() > timeout) {
                     found = true;
 
-                    U.warn(log, "Found long running cache future [startTime=" + formatTime(fut.startTime()) +
+                    U.warn(diagnosticLog, "Found long running cache future [startTime=" + formatTime(fut.startTime()) +
                         ", curTime=" + formatTime(curTime) + ", fut=" + fut + ']');
+
+                    if (diagCtx != null && fut instanceof IgniteDiagnosticAware)
+                        ((IgniteDiagnosticAware)fut).addDiagnosticRequest(diagCtx);
                 }
             }
         }
 
+        if (diagCtx != null && !diagCtx.empty()) {
+            try {
+                cctx.kernalContext().closure().runLocal(new Runnable() {
+                    @Override public void run() {
+                        diagCtx.send(cctx.kernalContext(), null);
+                    }
+                }, SYSTEM_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(diagnosticLog, "Failed to submit diagnostic closure: " + e, e);
+            }
+        }
+
         return found;
     }
 
@@ -1472,15 +1513,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 nextLongRunningOpsDumpTime = U.currentTimeMillis() + nextDumpTimeout(longRunningOpsDumpStep++, timeout);
 
                 if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false)) {
-                    U.warn(log, "Found long running cache operations, dump threads.");
+                    U.warn(diagnosticLog, "Found long running cache operations, dump threads.");
 
-                    U.dumpThreads(log);
+                    U.dumpThreads(diagnosticLog);
                 }
 
-                U.warn(log, "Found long running cache operations, dump IO statistics.");
+                if (IgniteSystemProperties.getBoolean(IGNITE_IO_DUMP_ON_TIMEOUT, false)) {
+                    U.warn(diagnosticLog, "Found long running cache operations, dump IO statistics.");
 
-                // Dump IO manager statistics.
-                cctx.gridIO().dumpStats();
+                    // Dump IO manager statistics.
+                    cctx.gridIO().dumpStats();
+                }
             }
             else {
                 nextLongRunningOpsDumpTime = 0;
@@ -1488,7 +1531,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             }
         }
         catch (Exception e) {
-            U.error(log, "Failed to dump debug information: " + e, e);
+            U.error(diagnosticLog, "Failed to dump debug information: " + e, e);
         }
     }
 
@@ -1512,52 +1555,54 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
     /**
      * @param exchTopVer Exchange topology version.
+     * @param diagCtx Diagnostic request.
      */
-    private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer) {
+    private void dumpPendingObjects(@Nullable AffinityTopologyVersion exchTopVer,
+        @Nullable IgniteDiagnosticPrepareContext diagCtx) {
         IgniteTxManager tm = cctx.tm();
 
         if (tm != null) {
-            U.warn(log, "Pending transactions:");
+            U.warn(diagnosticLog, "Pending transactions:");
 
             for (IgniteInternalTx tx : tm.activeTransactions()) {
                 if (exchTopVer != null) {
-                    U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() +
+                    U.warn(diagnosticLog, ">>> [txVer=" + tx.topologyVersionSnapshot() +
                         ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) +
                         ", tx=" + tx + ']');
                 }
                 else
-                    U.warn(log, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']');
+                    U.warn(diagnosticLog, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", tx=" + tx + ']');
             }
         }
 
         GridCacheMvccManager mvcc = cctx.mvcc();
 
         if (mvcc != null) {
-            U.warn(log, "Pending explicit locks:");
+            U.warn(diagnosticLog, "Pending explicit locks:");
 
             for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks())
-                U.warn(log, ">>> " + lockSpan);
+                U.warn(diagnosticLog, ">>> " + lockSpan);
 
-            U.warn(log, "Pending cache futures:");
+            U.warn(diagnosticLog, "Pending cache futures:");
 
             for (GridCacheFuture<?> fut : mvcc.activeFutures())
-                U.warn(log, ">>> " + fut);
+                dumpDiagnosticInfo(fut, diagCtx);
 
-            U.warn(log, "Pending atomic cache futures:");
+            U.warn(diagnosticLog, "Pending atomic cache futures:");
 
             for (GridCacheFuture<?> fut : mvcc.atomicFutures())
-                U.warn(log, ">>> " + fut);
+                dumpDiagnosticInfo(fut, diagCtx);
 
-            U.warn(log, "Pending data streamer futures:");
+            U.warn(diagnosticLog, "Pending data streamer futures:");
 
             for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures())
-                U.warn(log, ">>> " + fut);
+                dumpDiagnosticInfo(fut, diagCtx);
 
             if (tm != null) {
-                U.warn(log, "Pending transaction deadlock detection futures:");
+                U.warn(diagnosticLog, "Pending transaction deadlock detection futures:");
 
                 for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures())
-                    U.warn(log, ">>> " + fut);
+                    dumpDiagnosticInfo(fut, diagCtx);
             }
         }
 
@@ -1578,6 +1623,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Logs the future and, if it is diagnostic-aware, adds its diagnostic request to the context.
+     *
+     * @param fut Future.
+     * @param ctx Diagnostic prepare context.
+     */
+    private void dumpDiagnosticInfo(IgniteInternalFuture<?> fut,
+        @Nullable IgniteDiagnosticPrepareContext ctx) {
+        U.warn(diagnosticLog, ">>> " + fut);
+
+        if (ctx != null && fut instanceof IgniteDiagnosticAware)
+            ((IgniteDiagnosticAware)fut).addDiagnosticRequest(ctx);
+    }
+
+    /**
      * @param deque Deque to poll from.
      * @param time Time to wait.
      * @param w Worker.
@@ -1698,7 +1757,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
             for (CachePartitionExchangeWorkerTask task: futQ) {
                 if (isExchangeTask(task))
-                    U.warn(log, ">>> " + task);
+                    U.warn(log, ">>> " + ((GridDhtPartitionsExchangeFuture)task).shortInfo());
             }
         }
 
@@ -1797,15 +1856,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
                                     if (nextDumpTime <= U.currentTimeMillis()) {
                                         try {
-                                            dumpDebugInfo(exchFut.topologyVersion());
+                                            dumpDebugInfo(exchFut);
                                         }
                                         catch (Exception e) {
                                             U.error(log, "Failed to dump debug information: " + e, e);
                                         }
 
-                                        if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
-                                            U.dumpThreads(log);
-
                                         nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout);
                                     }
                                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index 3194ac6..c98a8f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -27,12 +27,19 @@ import org.apache.ignite.lang.IgniteFuture;
  * Convenience adapter for cache managers.
  */
 public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManager<K, V> {
+
+    /** */
+    private static final String DIAGNOSTIC_LOG_CATEGORY = "org.apache.ignite.internal.diagnostic";
+
     /** Context. */
     protected GridCacheSharedContext<K, V> cctx;
 
     /** Logger. */
     protected IgniteLogger log;
 
+    /** Diagnostic logger. */
+    protected IgniteLogger diagnosticLog;
+
     /** Starting flag. */
     private final AtomicBoolean starting = new AtomicBoolean(false);
 
@@ -50,6 +57,8 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
 
         log = cctx.logger(getClass());
 
+        diagnosticLog = cctx.logger(DIAGNOSTIC_LOG_CATEGORY);
+
         start0();
 
         if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 75f8366..609e4cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -36,6 +36,8 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -100,7 +102,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
  */
 @SuppressWarnings("unchecked")
 public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
-    implements GridCacheMvccFuture<GridNearTxPrepareResponse> {
+    implements GridCacheMvccFuture<GridNearTxPrepareResponse>, IgniteDiagnosticAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -1567,6 +1569,33 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
     }
 
     /** {@inheritDoc} */
+    @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext req) {
+        if (!isDone()) {
+            for (IgniteInternalFuture fut : futures()) {
+                if (!fut.isDone() && fut instanceof MiniFuture) {
+                    MiniFuture f = (MiniFuture)fut;
+
+                    if (!f.node().isLocal()) {
+                        GridCacheVersion dhtVer = tx.xidVersion();
+                        GridCacheVersion nearVer = tx.nearXidVersion();
+
+                        req.remoteTxInfo(f.nodeId, dhtVer, nearVer, "GridDhtTxPrepareFuture " +
+                            "waiting for response [node=" + f.nodeId +
+                            ", topVer=" + tx.topologyVersion() +
+                            ", dhtVer=" + dhtVer +
+                            ", nearVer=" + nearVer +
+                            ", futId=" + futId +
+                            ", miniId=" + f.futId +
+                            ", tx=" + tx + ']');
+
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
             @Override public String apply(IgniteInternalFuture<?> f) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 3f612f7..d3bfb3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -26,6 +26,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
+import org.apache.ignite.internal.IgniteDiagnosticMessage;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
@@ -63,7 +66,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
  *
  */
 public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object> implements GridCacheFuture<Object>,
-    CacheGetFuture {
+    CacheGetFuture, IgniteDiagnosticAware {
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
 
@@ -767,6 +770,26 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec
     }
 
     /** {@inheritDoc} */
+    @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx) {
+        if (!isDone()) {
+            UUID nodeId;
+            AffinityTopologyVersion topVer;
+
+            synchronized (this) {
+                nodeId = node != null ? node.id() : null;
+                topVer = this.topVer;
+            }
+
+            if (nodeId != null)
+                ctx.basicInfo(nodeId, "GridPartitionedSingleGetFuture waiting for " +
+                    "response [node=" + nodeId +
+                    ", key=" + key +
+                    ", futId=" + futId +
+                    ", topVer=" + topVer + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridPartitionedSingleGetFuture.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 4442b3a..b88eb47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
@@ -83,7 +85,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
  * Colocated cache lock future.
  */
 public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
-    implements GridCacheMvccFuture<Boolean> {
+    implements GridCacheMvccFuture<Boolean>, IgniteDiagnosticAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -596,13 +598,51 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
     }
 
     /** {@inheritDoc} */
+    @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext ctx) {
+        if (!isDone()) {
+            for (IgniteInternalFuture fut : futures()) {
+                if (!fut.isDone() && isMini(fut)) {
+                    MiniFuture m = (MiniFuture)fut;
+
+                    AffinityTopologyVersion topVer = null;
+                    UUID rmtNodeId = null;
+
+                    synchronized (m) {
+                        if (!m.rcvRes && !m.node.isLocal()) {
+                            rmtNodeId = m.node.id();
+
+                            topVer = this.topVer;
+                        }
+                    }
+
+                    if (rmtNodeId != null) {
+                        ctx.txKeyInfo(rmtNodeId, cctx.cacheId(), m.keys,
+                            "GridDhtColocatedLockFuture waiting for response [node=" + rmtNodeId +
+                            ", cache=" + cctx.name() +
+                            ", miniId=" + m.futId +
+                            ", topVer=" + topVer +
+                            ", keys=" + m.keys + ']');
+
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
             @Override public String apply(IgniteInternalFuture<?> f) {
                 if (isMini(f)) {
                     MiniFuture m = (MiniFuture)f;
 
-                    return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+                    synchronized (m) {
+                        return "[node=" + m.node().id() +
+                            ", rcvRes=" + m.rcvRes +
+                            ", loc=" + m.node().isLocal() +
+                            ", done=" + f.isDone() + "]";
+                    }
                 }
                 else
                     return "[loc=true, done=" + f.isDone() + "]";
@@ -610,6 +650,7 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF
         });
 
         return S.toString(GridDhtColocatedLockFuture.class, this,
+            "topVer", topVer,
             "innerFuts", futs,
             "inTx", inTx(),
             "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 26406c6..2271a85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -42,6 +42,8 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteDiagnosticAware;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -108,7 +110,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  */
 @SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
 public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
-    implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask {
+    implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask, IgniteDiagnosticAware {
     /** Dummy flag. */
     private final boolean dummy;
 
@@ -1023,14 +1025,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             ", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: ");
 
         try {
-            cctx.exchange().dumpDebugInfo(topologyVersion());
+            cctx.exchange().dumpDebugInfo(this);
         }
         catch (Exception e) {
             U.error(log, "Failed to dump debug information: " + e, e);
         }
-
-        if (getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
-            U.dumpThreads(log);
     }
 
     /**
@@ -2157,19 +2156,52 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /** {@inheritDoc} */
+    @Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) {
+        if (!isDone()) {
+            ClusterNode crd;
+            Set<UUID> remaining;
+
+            synchronized (this) {
+                crd = this.crd;
+                remaining = new HashSet<>(this.remaining);
+            }
+
+            if (crd != null) {
+                if (!crd.isLocal()) {
+                    diagCtx.exchangeInfo(crd.id(), topologyVersion(), "Exchange future waiting for coordinator " +
+                        "response [crd=" + crd.id() + ", topVer=" + topologyVersion() + ']');
+                }
+                else if (!remaining.isEmpty()){
+                    UUID nodeId = remaining.iterator().next(); // Any one node whose response is still pending.
+
+                    diagCtx.exchangeInfo(nodeId, topologyVersion(), "Exchange future on coordinator waiting for " +
+                        "server response [node=" + nodeId + ", topVer=" + topologyVersion() + ']');
+                }
+            }
+        }
+    }
+
+    /**
+     * @return Short information string.
+     */
+    public String shortInfo() {
+        return "GridDhtPartitionsExchangeFuture [topVer=" + topologyVersion() +
+            ", evt=" + (discoEvt != null ? discoEvt.type() : -1) +
+            ", evtNode=" + (discoEvt != null ? discoEvt.eventNode() : null) +
+            ", done=" + isDone() + ']';
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         Set<UUID> remaining;
-        List<ClusterNode> srvNodes;
 
         synchronized (this) {
             remaining = new HashSet<>(this.remaining);
-            srvNodes = this.srvNodes != null ? new ArrayList<>(this.srvNodes) : null;
         }
 
         return S.toString(GridDhtPartitionsExchangeFuture.class, this,
             "evtLatch", evtLatch == null ? "null" : evtLatch.getCount(),
             "remaining", remaining,
-            "srvNodes", srvNodes,
             "super", super.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
index 542ee43..ed83650 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java
@@ -24,25 +24,47 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteDiagnosticInfo;
+import org.apache.ignite.internal.IgniteDiagnosticMessage;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.IgniteProperties;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.IgniteClusterImpl;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.util.GridTimerTask;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_DIAGNOSTIC_ENABLED;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CLUSTER_PROC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_INTERNAL_DIAGNOSTIC;
 import static org.apache.ignite.internal.IgniteVersionUtils.VER_STR;
 
 /**
@@ -72,6 +94,13 @@ public class ClusterProcessor extends GridProcessorAdapter {
     @GridToStringExclude
     private GridUpdateNotifier verChecker;
 
+    /** */
+    private final AtomicReference<ConcurrentHashMap<Long, InternalDiagnosticFuture>> diagnosticFutMap =
+        new AtomicReference<>();
+
+    /** */
+    private final AtomicLong diagFutId = new AtomicLong();
+
     /**
      * @param ctx Kernal context.
      */
@@ -82,6 +111,135 @@ public class ClusterProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return Diagnostic flag.
+     */
+    public boolean diagnosticEnabled() {
+        return getBoolean(IGNITE_DIAGNOSTIC_ENABLED, true);
+    }
+
+    /** */
+    private final JdkMarshaller marsh = new JdkMarshaller();
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void initDiagnosticListeners() throws IgniteCheckedException {
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
+                @Override public void onEvent(Event evt) {
+                    assert evt instanceof DiscoveryEvent;
+                    assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+                    DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                    UUID nodeId = discoEvt.eventNode().id();
+
+                    ConcurrentHashMap<Long, InternalDiagnosticFuture> futs = diagnosticFutMap.get();
+
+                    if (futs != null) {
+                        for (InternalDiagnosticFuture fut : futs.values()) {
+                            if (fut.nodeId.equals(nodeId))
+                                fut.onDone(new IgniteDiagnosticInfo("Target node failed: " + nodeId));
+                        }
+                    }
+                }
+            },
+            EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        ctx.io().addMessageListener(TOPIC_INTERNAL_DIAGNOSTIC, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                if (msg instanceof IgniteDiagnosticMessage) {
+                    IgniteDiagnosticMessage msg0 = (IgniteDiagnosticMessage)msg;
+
+                    if (msg0.request()) {
+                        ClusterNode node = ctx.discovery().node(nodeId);
+
+                        if (node == null) {
+                            if (diagnosticLog.isDebugEnabled()) {
+                                diagnosticLog.debug("Skip diagnostic request, sender node left " +
+                                    "[node=" + nodeId + ", msg=" + msg + ']');
+                            }
+
+                            return;
+                        }
+
+                        byte[] diagRes;
+
+                        IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c;
+
+                        try {
+                            c = msg0.unmarshal(marsh);
+
+                            diagRes = marsh.marshal(c.apply(ctx));
+                        }
+                        catch (Exception e) {
+                            U.error(diagnosticLog, "Failed to run diagnostic closure: " + e, e);
+
+                            try {
+                                IgniteDiagnosticInfo errInfo =
+                                    new IgniteDiagnosticInfo("Failed to run diagnostic closure: " + e);
+
+                                diagRes = marsh.marshal(errInfo);
+                            }
+                            catch (Exception e0) {
+                                U.error(diagnosticLog, "Failed to marshal diagnostic closure result: " + e0, e0);
+
+                                diagRes = null;
+                            }
+                        }
+
+                        IgniteDiagnosticMessage res = IgniteDiagnosticMessage.createResponse(diagRes, msg0.futureId());
+
+                        try {
+                            ctx.io().sendToGridTopic(node, TOPIC_INTERNAL_DIAGNOSTIC, res, GridIoPolicy.SYSTEM_POOL);
+                        }
+                        catch (ClusterTopologyCheckedException e) {
+                            if (diagnosticLog.isDebugEnabled()) {
+                                diagnosticLog.debug("Failed to send diagnostic response, node left " +
+                                    "[node=" + nodeId + ", msg=" + msg + ']');
+                            }
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(diagnosticLog, "Failed to send diagnostic response [msg=" + msg0 + "]", e);
+                        }
+                    }
+                    else {
+                        InternalDiagnosticFuture fut = diagnosticFuturesMap().get(msg0.futureId());
+
+                        if (fut != null) {
+                            IgniteDiagnosticInfo res;
+
+                            try {
+                                res = msg0.unmarshal(marsh);
+
+                                if (res == null)
+                                    res = new IgniteDiagnosticInfo("Remote node failed to marshal response.");
+                            }
+                            catch (Exception e) {
+                                U.error(diagnosticLog, "Failed to unmarshal diagnostic response: " + e, e);
+
+                                res = new IgniteDiagnosticInfo("Failed to unmarshal diagnostic response: " + e);
+                            }
+
+                            fut.onResponse(res);
+                        }
+                        else
+                            U.warn(diagnosticLog, "Failed to find diagnostic message future [msg=" + msg0 + ']');
+                    }
+                }
+                else
+                    U.warn(diagnosticLog, "Received unexpected message: " + msg);
+            }
+        });
+    }
+
+    /**
+     * @return Logger for diagnostic category.
+     */
+    public IgniteLogger diagnosticLog() {
+        return diagnosticLog;
+    }
+
+    /**
      * @return Cluster.
      */
     public IgniteClusterImpl get() {
@@ -188,6 +346,7 @@ public class ClusterProcessor extends GridProcessorAdapter {
         if (verChecker != null)
             verChecker.stop();
 
+        ctx.io().removeMessageListener(TOPIC_INTERNAL_DIAGNOSTIC);
     }
 
     /**
@@ -212,6 +371,103 @@ public class ClusterProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Sends diagnostic message closure to remote node. When response received dumps
+     * remote message and local communication info about connection(s) with remote node.
+     *
+     * @param nodeId Target node ID.
+     * @param c Closure to send.
+     * @param baseMsg Local message to log.
+     * @return Message future.
+     */
+    public IgniteInternalFuture<String> requestDiagnosticInfo(final UUID nodeId,
+        IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c,
+        final String baseMsg) {
+        final GridFutureAdapter<String> infoFut = new GridFutureAdapter<>();
+
+        final IgniteInternalFuture<IgniteDiagnosticInfo> rmtFut = sendDiagnosticMessage(nodeId, c);
+
+        rmtFut.listen(new CI1<IgniteInternalFuture<IgniteDiagnosticInfo>>() {
+            @Override public void apply(IgniteInternalFuture<IgniteDiagnosticInfo> fut) {
+                String rmtMsg;
+
+                try {
+                    rmtMsg = fut.get().message();
+                }
+                catch (Exception e) {
+                    rmtMsg = "Diagnostic processing error: " + e;
+                }
+
+                final String rmtMsg0 = rmtMsg;
+
+                IgniteInternalFuture<String> locFut = IgniteDiagnosticMessage.dumpCommunicationInfo(ctx, nodeId);
+
+                locFut.listen(new CI1<IgniteInternalFuture<String>>() {
+                    @Override public void apply(IgniteInternalFuture<String> locFut) {
+                        String locMsg;
+
+                        try {
+                            locMsg = locFut.get();
+                        }
+                        catch (Exception e) {
+                            locMsg = "Failed to get info for local node: " + e;
+                        }
+
+                        String msg = baseMsg + U.nl() +
+                            "Remote node information:" + U.nl() + rmtMsg0 +
+                            U.nl() + "Local communication statistics:" + U.nl() +
+                            locMsg;
+
+                        infoFut.onDone(msg);
+                    }
+                });
+            }
+        });
+
+        return infoFut;
+    }
+
+    /**
+     * @param nodeId Target node ID.
+     * @param c Message closure.
+     * @return Message future.
+     */
+    private IgniteInternalFuture<IgniteDiagnosticInfo> sendDiagnosticMessage(UUID nodeId,
+        IgniteClosure<GridKernalContext, IgniteDiagnosticInfo> c) {
+        InternalDiagnosticFuture fut = null;
+
+        try {
+            IgniteDiagnosticMessage msg = IgniteDiagnosticMessage.createRequest(marsh, c, diagFutId.getAndIncrement());
+            fut = new InternalDiagnosticFuture(nodeId, msg.futureId());
+            diagnosticFuturesMap().put(msg.futureId(), fut);
+
+            ctx.io().sendToGridTopic(nodeId, TOPIC_INTERNAL_DIAGNOSTIC, msg, GridIoPolicy.SYSTEM_POOL);
+
+            return fut;
+        }
+        catch (Exception e) {
+            U.error(diagnosticLog, "Failed to send diagnostic message: " + e, e);
+            if (fut != null) // Registered before the failure: unregister it so the future does not leak.
+                diagnosticFuturesMap().remove(fut.id);
+
+            return new GridFinishedFuture<>(new IgniteDiagnosticInfo("Failed to send diagnostic message: " + e));
+        }
+    }
+
+    /**
+     * @return Diagnostic messages futures map.
+     */
+    private ConcurrentHashMap<Long, InternalDiagnosticFuture> diagnosticFuturesMap() {
+        ConcurrentHashMap<Long, InternalDiagnosticFuture> map = diagnosticFutMap.get();
+
+        if (map == null) {
+            if (!diagnosticFutMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
+                map = diagnosticFutMap.get();
+        }
+
+        return map;
+    }
+
+    /**
      * Update notifier timer task.
      */
     private static class UpdateNotifierTimerTask extends GridTimerTask {
@@ -279,4 +535,47 @@ public class ClusterProcessor extends GridProcessorAdapter {
             }
         }
     }
+
+    /**
+     *
+     */
+    class InternalDiagnosticFuture extends GridFutureAdapter<IgniteDiagnosticInfo> {
+        /** */
+        private final long id;
+
+        /** */
+        private final UUID nodeId;
+
+        /**
+         * @param nodeId Target node ID.
+         * @param id Future ID.
+         */
+        InternalDiagnosticFuture(UUID nodeId, long id) {
+            this.nodeId = nodeId;
+            this.id = id;
+        }
+
+        /**
+         * @param res Response.
+         */
+        public void onResponse(IgniteDiagnosticInfo res) {
+            onDone(res);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable IgniteDiagnosticInfo res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                diagnosticFuturesMap().remove(id);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(InternalDiagnosticFuture.class, this);
+        }
+    }
 }
\ No newline at end of file


Mime
View raw message