ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [56/62] [abbrv] ignite git commit: ignite-5155 Added possibility to receive diagnostic information from remote nodes for hanging cache futures
Date Mon, 12 Jun 2017 15:34:10 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 66993ea..3591703 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -54,7 +54,9 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -66,6 +68,7 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -704,14 +707,57 @@ public class GridNioServer<T> {
     }
 
     /**
-     *
+     * @return Future.
      */
-    public void dumpStats() {
-        U.warn(log, "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get() +
-            ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']');
+    public IgniteInternalFuture<String> dumpStats() {
+        String msg = "NIO server statistics [readerSesBalanceCnt=" + readerMoveCnt.get()
+
+            ", writerSesBalanceCnt=" + writerMoveCnt.get() + ']';
 
-        for (int i = 0; i < clientWorkers.size(); i++)
-            clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS));
+        return dumpStats(msg, null);
+    }
+
+    /**
+     * @param msg Message to add.
+     * @param p Session predicate.
+     * @return Future.
+     */
+    public IgniteInternalFuture<String> dumpStats(final String msg, IgnitePredicate<GridNioSession>
p) {
+        GridCompoundFuture<String, String> fut = new GridCompoundFuture<>(new
IgniteReducer<String, String>() {
+            private final StringBuilder sb = new StringBuilder(msg);
+
+            @Override public boolean collect(@Nullable String msg) {
+                if (!F.isEmpty(msg)) {
+                    synchronized (sb) {
+                        if (sb.length() > 0)
+                            sb.append(U.nl());
+
+                        sb.append(msg);
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public String reduce() {
+                synchronized (sb) {
+                    return sb.toString();
+                }
+            }
+        });
+
+        for (int i = 0; i < clientWorkers.size(); i++) {
+            NioOperationFuture<String> opFut = new NioOperationFuture<>(null,
NioOperation.DUMP_STATS);
+
+            opFut.msg = p;
+
+            clientWorkers.get(i).offer(opFut);
+
+            fut.add(opFut);
+        }
+
+        fut.markInitialized();
+
+        return fut;
     }
 
     /**
@@ -1812,12 +1858,16 @@ public class GridNioServer<T> {
                             case DUMP_STATS: {
                                 NioOperationFuture req = (NioOperationFuture)req0;
 
+                                IgnitePredicate<GridNioSession> p =
+                                    req.msg instanceof IgnitePredicate ? (IgnitePredicate<GridNioSession>)req.msg
: null;
+
+                                StringBuilder sb = new StringBuilder();
+
                                 try {
-                                    dumpStats();
+                                    dumpStats(sb, p, p!= null);
                                 }
                                 finally {
-                                    // Complete the request just in case (none should wait
on this future).
-                                    req.onDone(true);
+                                    req.onDone(sb.toString());
                                 }
                             }
                         }
@@ -1925,80 +1975,126 @@ public class GridNioServer<T> {
         }
 
         /**
-         *
+         * @param sb Message builder.
+         * @param keys Keys.
          */
-        private void dumpStats() {
-            StringBuilder sb = new StringBuilder();
-
-            Set<SelectionKey> keys = selector.keys();
-
-            sb.append(U.nl())
-                .append(">> Selector info [idx=").append(idx)
+        private void dumpSelectorInfo(StringBuilder sb, Set<SelectionKey> keys) {
+            sb.append(">> Selector info [idx=").append(idx)
                 .append(", keysCnt=").append(keys.size())
                 .append(", bytesRcvd=").append(bytesRcvd)
                 .append(", bytesRcvd0=").append(bytesRcvd0)
                 .append(", bytesSent=").append(bytesSent)
                 .append(", bytesSent0=").append(bytesSent0)
                 .append("]").append(U.nl());
+        }
+
+        /**
+         * @param sb Message builder.
+         * @param p Optional session predicate.
+         * @param shortInfo Short info flag.
+         */
+        private void dumpStats(StringBuilder sb,
+            @Nullable IgnitePredicate<GridNioSession> p,
+            boolean shortInfo) {
+            Set<SelectionKey> keys = selector.keys();
+
+            boolean selInfo = p == null;
+
+            if (selInfo)
+                dumpSelectorInfo(sb, keys);
 
             for (SelectionKey key : keys) {
                 GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
 
-                MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-                MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+                boolean sesInfo = p == null || p.apply(ses);
 
-                sb.append("    Connection info [")
-                    .append("in=").append(ses.accepted())
-                    .append(", rmtAddr=").append(ses.remoteAddress())
-                    .append(", locAddr=").append(ses.localAddress());
+                if (sesInfo) {
+                    if (!selInfo) {
+                        dumpSelectorInfo(sb, keys);
 
-                GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
+                        selInfo = true;
+                    }
 
-                if (outDesc != null) {
-                    sb.append(", msgsSent=").append(outDesc.sent())
-                        .append(", msgsAckedByRmt=").append(outDesc.acked())
-                        .append(", descIdHash=").append(System.identityHashCode(outDesc));
-                }
-                else
-                    sb.append(", outRecoveryDesc=null");
+                    sb.append("    Connection info [")
+                        .append("in=").append(ses.accepted())
+                        .append(", rmtAddr=").append(ses.remoteAddress())
+                        .append(", locAddr=").append(ses.localAddress());
 
-                GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
+                    GridNioRecoveryDescriptor outDesc = ses.outRecoveryDescriptor();
 
-                if (inDesc != null) {
-                    sb.append(", msgsRcvd=").append(inDesc.received())
-                        .append(", lastAcked=").append(inDesc.lastAcknowledged())
-                        .append(", descIdHash=").append(System.identityHashCode(inDesc));
-                }
-                else
-                    sb.append(", inRecoveryDesc=null");
+                    if (outDesc != null) {
+                        sb.append(", msgsSent=").append(outDesc.sent())
+                            .append(", msgsAckedByRmt=").append(outDesc.acked())
+                            .append(", descIdHash=").append(System.identityHashCode(outDesc));
 
-                sb.append(", bytesRcvd=").append(ses.bytesReceived())
-                    .append(", bytesRcvd0=").append(ses.bytesReceived0())
-                    .append(", bytesSent=").append(ses.bytesSent())
-                    .append(", bytesSent0=").append(ses.bytesSent0())
-                    .append(", opQueueSize=").append(ses.writeQueueSize())
-                    .append(", msgWriter=").append(writer != null ? writer.toString() : "null")
-                    .append(", msgReader=").append(reader != null ? reader.toString() : "null");
+                        if (!outDesc.messagesRequests().isEmpty()) {
+                            int cnt = 0;
 
-                int cnt = 0;
+                            sb.append(", unackedMsgs=[");
 
-                for (SessionWriteRequest req : ses.writeQueue()) {
-                    if (cnt == 0)
-                        sb.append(",\n opQueue=[").append(req);
+                            for (SessionWriteRequest req : outDesc.messagesRequests()) {
+                                if (cnt != 0)
+                                    sb.append(", ");
+
+                                Object msg = req.message();
+
+                                if (shortInfo && msg instanceof GridIoMessage)
+                                    msg = ((GridIoMessage)msg).message().getClass().getSimpleName();
+
+                                sb.append(msg);
+
+                                if (++cnt == 5)
+                                    break;
+                            }
+
+                            sb.append(']');
+                        }
+                    }
                     else
-                        sb.append(',').append(req);
+                        sb.append(", outRecoveryDesc=null");
 
-                    if (++cnt == 5) {
-                        sb.append(']');
+                    GridNioRecoveryDescriptor inDesc = ses.inRecoveryDescriptor();
 
-                        break;
+                    if (inDesc != null) {
+                        sb.append(", msgsRcvd=").append(inDesc.received())
+                            .append(", lastAcked=").append(inDesc.lastAcknowledged())
+                            .append(", descIdHash=").append(System.identityHashCode(inDesc));
                     }
-                }
+                    else
+                        sb.append(", inRecoveryDesc=null");
 
-                sb.append("]").append(U.nl());
-            }
+                    sb.append(", bytesRcvd=").append(ses.bytesReceived())
+                        .append(", bytesRcvd0=").append(ses.bytesReceived0())
+                        .append(", bytesSent=").append(ses.bytesSent())
+                        .append(", bytesSent0=").append(ses.bytesSent0())
+                        .append(", opQueueSize=").append(ses.writeQueueSize());
+
+                    if (!shortInfo) {
+                        MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+                        MessageReader reader = ses.meta(GridDirectParser.READER_META_KEY);
+
+                        sb.append(", msgWriter=").append(writer != null ? writer.toString()
: "null")
+                            .append(", msgReader=").append(reader != null ? reader.toString()
: "null");
+                    }
+
+                    int cnt = 0;
 
-            U.warn(log, sb.toString());
+                    for (SessionWriteRequest req : ses.writeQueue()) {
+                        if (cnt == 0)
+                            sb.append(",\n opQueue=[").append(req);
+                        else
+                            sb.append(',').append(req);
+
+                        if (++cnt == 5) {
+                            sb.append(']');
+
+                            break;
+                        }
+                    }
+
+                    sb.append("]");
+                }
+            }
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2e35c6e..5d74a80 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -46,7 +46,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
@@ -67,6 +66,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.ipc.IpcEndpoint;
 import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
@@ -93,6 +93,7 @@ import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient;
 import org.apache.ignite.internal.util.nio.ssl.BlockingSslHandler;
 import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.nio.ssl.GridSslMeta;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -934,6 +935,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
     @LoggerResource
     private IgniteLogger log;
 
+    /** Logger. */
+    @LoggerResource(categoryName = "org.apache.ignite.internal.diagnostic")
+    private IgniteLogger diagnosticLog;
+
     /** Local IP address. */
     private String locAddr;
 
@@ -1764,83 +1769,141 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements
Communicati
     }
 
     /**
+     * @param nodeId Target node ID.
+     * @return Future.
+     */
+    public IgniteInternalFuture<String> dumpNodeStatistics(final UUID nodeId) {
+        StringBuilder sb = new StringBuilder("Communication SPI statistics [rmtNode=").append(nodeId).append(']').append(U.nl());
+
+        dumpInfo(sb, nodeId);
+
+        GridNioServer<Message> nioSrvr = this.nioSrvr;
+
+        if (nioSrvr != null) {
+            sb.append("NIO sessions statistics:");
+
+            IgnitePredicate<GridNioSession> p = new IgnitePredicate<GridNioSession>()
{
+                @Override public boolean apply(GridNioSession ses) {
+                    ConnectionKey connId = ses.meta(CONN_IDX_META);
+
+                    return connId != null && nodeId.equals(connId.nodeId());
+                }
+            };
+
+            return nioSrvr.dumpStats(sb.toString(), p);
+        }
+        else {
+            sb.append(U.nl()).append("GridNioServer is null.");
+
+            return new GridFinishedFuture<>(sb.toString());
+        }
+    }
+
+    /**
      * Dumps SPI per-connection stats to logs.
      */
     public void dumpStats() {
-        IgniteLogger log = this.log;
+        final IgniteLogger log = this.diagnosticLog;
 
         if (log != null) {
-            StringBuilder sb = new StringBuilder("Communication SPI recovery descriptors:
").append(U.nl());
-
-            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet())
{
-                GridNioRecoveryDescriptor desc = entry.getValue();
-
-                sb.append("    [key=").append(entry.getKey())
-                    .append(", msgsSent=").append(desc.sent())
-                    .append(", msgsAckedByRmt=").append(desc.acked())
-                    .append(", msgsRcvd=").append(desc.received())
-                    .append(", lastAcked=").append(desc.lastAcknowledged())
-                    .append(", reserveCnt=").append(desc.reserveCount())
-                    .append(", descIdHash=").append(System.identityHashCode(desc))
-                    .append(']').append(U.nl());
-            }
-
-            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet())
{
-                GridNioRecoveryDescriptor desc = entry.getValue();
-
-                sb.append("    [key=").append(entry.getKey())
-                    .append(", msgsSent=").append(desc.sent())
-                    .append(", msgsAckedByRmt=").append(desc.acked())
-                    .append(", reserveCnt=").append(desc.reserveCount())
-                    .append(", connected=").append(desc.connected())
-                    .append(", reserved=").append(desc.reserved())
-                    .append(", descIdHash=").append(System.identityHashCode(desc))
-                    .append(']').append(U.nl());
-            }
+            StringBuilder sb = new StringBuilder();
 
-            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet())
{
-                GridNioRecoveryDescriptor desc = entry.getValue();
-
-                sb.append("    [key=").append(entry.getKey())
-                    .append(", msgsRcvd=").append(desc.received())
-                    .append(", lastAcked=").append(desc.lastAcknowledged())
-                    .append(", reserveCnt=").append(desc.reserveCount())
-                    .append(", connected=").append(desc.connected())
-                    .append(", reserved=").append(desc.reserved())
-                    .append(", handshakeIdx=").append(desc.handshakeIndex())
-                    .append(", descIdHash=").append(System.identityHashCode(desc))
-                    .append(']').append(U.nl());
-            }
+            dumpInfo(sb, null);
 
-            sb.append("Communication SPI clients: ").append(U.nl());
+            U.warn(log, sb.toString());
 
-            for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet())
{
-                UUID nodeId = entry.getKey();
-                GridCommunicationClient[] clients0 = entry.getValue();
+            GridNioServer<Message> nioSrvr = this.nioSrvr;
 
-                for (GridCommunicationClient client : clients0) {
-                    if (client != null) {
-                        sb.append("    [node=").append(nodeId)
-                            .append(", client=").append(client)
-                            .append(']').append(U.nl());
+            if (nioSrvr != null) {
+                nioSrvr.dumpStats().listen(new CI1<IgniteInternalFuture<String>>()
{
+                    @Override public void apply(IgniteInternalFuture<String> fut) {
+                        try {
+                            U.warn(log, fut.get());
+                        }
+                        catch (Exception e) {
+                            U.error(log, "Failed to dump NIO server statistics: " + e, e);
+                        }
                     }
-                }
+                });
             }
+        }
+    }
 
-            U.warn(log, sb.toString());
+    /**
+     * @param sb Message builder.
+     * @param dstNodeId Target node ID.
+     */
+    private void dumpInfo(StringBuilder sb, UUID dstNodeId) {
+        sb.append("Communication SPI recovery descriptors: ").append(U.nl());
+
+        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : recoveryDescs.entrySet())
{
+            GridNioRecoveryDescriptor desc = entry.getValue();
+
+            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+                continue;
+
+            sb.append("    [key=").append(entry.getKey())
+                .append(", msgsSent=").append(desc.sent())
+                .append(", msgsAckedByRmt=").append(desc.acked())
+                .append(", msgsRcvd=").append(desc.received())
+                .append(", lastAcked=").append(desc.lastAcknowledged())
+                .append(", reserveCnt=").append(desc.reserveCount())
+                .append(", descIdHash=").append(System.identityHashCode(desc))
+                .append(']').append(U.nl());
         }
 
-        GridNioServer<Message> nioSrvr = this.nioSrvr;
+        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : outRecDescs.entrySet())
{
+            GridNioRecoveryDescriptor desc = entry.getValue();
 
-        if (nioSrvr != null)
-            nioSrvr.dumpStats();
-    }
+            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+                continue;
 
-    /** */
-    private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
+            sb.append("    [key=").append(entry.getKey())
+                .append(", msgsSent=").append(desc.sent())
+                .append(", msgsAckedByRmt=").append(desc.acked())
+                .append(", reserveCnt=").append(desc.reserveCount())
+                .append(", connected=").append(desc.connected())
+                .append(", reserved=").append(desc.reserved())
+                .append(", descIdHash=").append(System.identityHashCode(desc))
+                .append(']').append(U.nl());
+        }
 
-    /** */
-    private final AtomicInteger connIdx = new AtomicInteger();
+        for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : inRecDescs.entrySet())
{
+            GridNioRecoveryDescriptor desc = entry.getValue();
+
+            if (dstNodeId != null && !dstNodeId.equals(entry.getKey().nodeId()))
+                continue;
+
+            sb.append("    [key=").append(entry.getKey())
+                .append(", msgsRcvd=").append(desc.received())
+                .append(", lastAcked=").append(desc.lastAcknowledged())
+                .append(", reserveCnt=").append(desc.reserveCount())
+                .append(", connected=").append(desc.connected())
+                .append(", reserved=").append(desc.reserved())
+                .append(", handshakeIdx=").append(desc.handshakeIndex())
+                .append(", descIdHash=").append(System.identityHashCode(desc))
+                .append(']').append(U.nl());
+        }
+
+        sb.append("Communication SPI clients: ").append(U.nl());
+
+        for (Map.Entry<UUID, GridCommunicationClient[]> entry : clients.entrySet())
{
+            UUID clientNodeId = entry.getKey();
+
+            if (dstNodeId != null && !dstNodeId.equals(clientNodeId))
+                continue;
+
+            GridCommunicationClient[] clients0 = entry.getValue();
+
+            for (GridCommunicationClient client : clients0) {
+                if (client != null) {
+                    sb.append("    [node=").append(clientNodeId)
+                        .append(", client=").append(client)
+                        .append(']').append(U.nl());
+                }
+            }
+        }
+    }
 
     /** {@inheritDoc} */
     @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException
{

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
new file mode 100644
index 0000000..08dbc66
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/IgniteDiagnosticMessagesTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridStringLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteDiagnosticMessagesTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private Integer connectionsPerNode;
+
+    /** */
+    private boolean testSpi;
+
+    /** */
+    private GridStringLogger strLog;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        if (testSpi)
+            cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        if (connectionsPerNode != null)
+            ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setConnectionsPerNode(connectionsPerNode);
+
+        cfg.setClientMode(client);
+
+        if (strLog != null) {
+            cfg.setGridLogger(strLog);
+
+            strLog = null;
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDiagnosticMessages1() throws Exception {
+        checkBasicDiagnosticInfo();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDiagnosticMessages2() throws Exception {
+        connectionsPerNode = 5;
+
+        checkBasicDiagnosticInfo();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLongRunning() throws Exception {
+        System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, "3500");
+
+        try {
+            testSpi = true;
+
+            startGrid(0);
+
+            GridStringLogger strLog = this.strLog = new GridStringLogger();
+
+            startGrid(1);
+
+            awaitPartitionMapExchange();
+
+            CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+            ccfg.setWriteSynchronizationMode(FULL_SYNC);
+            ccfg.setCacheMode(PARTITIONED);
+            ccfg.setAtomicityMode(TRANSACTIONAL);
+
+            final Ignite node0 = ignite(0);
+
+            node0.createCache(ccfg);
+
+            final Ignite node1 = ignite(1);
+
+            UUID id0 = node0.cluster().localNode().id();
+            UUID id1 = node1.cluster().localNode().id();
+
+            TestRecordingCommunicationSpi.spi(node0).blockMessages(GridNearSingleGetResponse.class,
node1.name());
+
+            IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>()
{
+                @Override public Void call() throws Exception {
+                    Integer key = primaryKey(node0.cache(DEFAULT_CACHE_NAME));
+
+                    node1.cache(DEFAULT_CACHE_NAME).get(key);
+
+                    return null;
+                }
+            }, "get");
+
+            U.sleep(10_000);
+
+            assertFalse(fut.isDone());
+
+            TestRecordingCommunicationSpi.spi(node0).stopBlock();
+
+            fut.get();
+
+            String log = strLog.toString();
+
+            assertTrue(log.contains("GridPartitionedSingleGetFuture waiting for response
[node=" + id0));
+            assertTrue(log.contains("General node info [id=" + id0));
+        }
+        finally {
+            System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkBasicDiagnosticInfo() throws Exception {
+        startGrids(3);
+
+        client = true;
+
+        startGrid(3);
+
+        startGrid(4);
+
+        CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setCacheMode(REPLICATED);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+
+        ignite(0).createCache(ccfg);
+
+        awaitPartitionMapExchange();
+
+        sendDiagnostic();
+
+        for (int i = 0; i < 5; i++) {
+            final IgniteCache<Object, Object> cache = ignite(i).cache(DEFAULT_CACHE_NAME);
+
+            // Put from multiple threads to create multiple connections.
+            GridTestUtils.runMultiThreaded(new Runnable() {
+                @Override public void run() {
+                    for (int j = 0; j < 10; j++)
+                        cache.put(ThreadLocalRandom.current().nextInt(), j);
+                }
+            }, 10, "cache-thread");
+        }
+
+        sendDiagnostic();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void sendDiagnostic() throws Exception {
+        for (int i = 0; i < 5; i++) {
+            IgniteKernal node = (IgniteKernal)ignite(i);
+
+            for (int j = 0; j < 5; j++) {
+                if (i != j) {
+                    ClusterNode dstNode = ignite(j).cluster().localNode();
+
+                    final GridFutureAdapter<String> fut = new GridFutureAdapter<>();
+
+                    IgniteDiagnosticPrepareContext ctx = new IgniteDiagnosticPrepareContext(node.getLocalNodeId());
+
+                    ctx.basicInfo(dstNode.id(), "Test diagnostic");
+
+                    ctx.send(node.context(), new IgniteInClosure<IgniteInternalFuture<String>>()
{
+                        @Override public void apply(IgniteInternalFuture<String> diagFut)
{
+                            try {
+                                fut.onDone(diagFut.get());
+                            }
+                            catch (Exception e) {
+                                fut.onDone(e);
+                            }
+                        }
+                    });
+
+                    String msg = fut.get();
+
+                    assertTrue("Unexpected message: " + msg,
+                        msg.contains("Test diagnostic") &&
+                            msg.contains("General node info [id=" + dstNode.id() + ", client="
+ dstNode.isClient() + ", discoTopVer=AffinityTopologyVersion [topVer=5, minorTopVer=") &&
+                            msg.contains("Partitions exchange info [readyVer=AffinityTopologyVersion
[topVer=5, minorTopVer="));
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5d98ccec/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 3305058..140b1a5 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.GridStopWithCancelSelfTest;
 import org.apache.ignite.internal.IgniteLocalNodeMapBeforeStartTest;
 import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest;
 import org.apache.ignite.internal.MarshallerContextLockingSelfTest;
+import org.apache.ignite.internal.managers.IgniteDiagnosticMessagesTest;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest;
 import org.apache.ignite.internal.processors.cache.GridLocalIgniteSerializationTest;
 import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest;
@@ -52,8 +53,8 @@ import org.apache.ignite.internal.processors.database.BPlusTreeSelfTest;
 import org.apache.ignite.internal.processors.database.FreeListImplSelfTest;
 import org.apache.ignite.internal.processors.database.MemoryMetricsSelfTest;
 import org.apache.ignite.internal.processors.database.MetadataStorageSelfTest;
-import org.apache.ignite.internal.processors.odbc.OdbcEscapeSequenceSelfTest;
 import org.apache.ignite.internal.processors.odbc.OdbcConfigurationValidationSelfTest;
+import org.apache.ignite.internal.processors.odbc.OdbcEscapeSequenceSelfTest;
 import org.apache.ignite.internal.processors.service.ClosureServiceClientsNodesTest;
 import org.apache.ignite.internal.product.GridProductVersionSelfTest;
 import org.apache.ignite.internal.util.nio.IgniteExceptionInNioWorkerSelfTest;
@@ -173,6 +174,9 @@ public class IgniteBasicTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteMarshallerCacheClassNameConflictTest.class);
         suite.addTestSuite(IgniteMarshallerCacheClientRequestsMappingOnMissTest.class);
+
+        suite.addTestSuite(IgniteDiagnosticMessagesTest.class);
+
         return suite;
     }
 }


Mime
View raw message