ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [05/15] ignite git commit: IGNITE-6868 Implement new JMX metrics for TcpCommunicationSpi monitoring
Date Wed, 22 Nov 2017 10:23:21 GMT
IGNITE-6868 Implement new JMX metrics for TcpCommunicationSpi monitoring

Signed-off-by: Anton Vinogradov <av@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58b50413
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58b50413
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58b50413

Branch: refs/heads/ignite-zk
Commit: 58b50413622e0059f889f5df062a0d0169d0456f
Parents: 0295518
Author: Aleksey Plekhanov <Plehanov.Alex@gmail.com>
Authored: Tue Nov 21 13:56:34 2017 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Tue Nov 21 13:56:34 2017 +0300

----------------------------------------------------------------------
 .../tcp/TcpCommunicationMetricsListener.java    | 225 +++++++++++++++++++
 .../communication/tcp/TcpCommunicationSpi.java  | 101 ++++++---
 .../tcp/TcpCommunicationSpiMBean.java           |  33 +++
 .../tcp/TcpCommunicationStatisticsTest.java     | 201 +++++++++++++++++
 .../IgniteSpiCommunicationSelfTestSuite.java    |   3 +
 5 files changed, 527 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
new file mode 100644
index 0000000..8981e17
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationMetricsListener.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jsr166.LongAdder8;
+
+/**
+ * Statistics for {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi}.
+ */
+public class TcpCommunicationMetricsListener implements GridNioMetricsListener {
+    /** Received messages count. */
+    private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
+
+    /** Sent messages count. */
+    private final LongAdder8 sentMsgsCnt = new LongAdder8();
+
+    /** Received bytes count. */
+    private final LongAdder8 rcvdBytesCnt = new LongAdder8();
+
+    /** Sent bytes count. */
+    private final LongAdder8 sentBytesCnt = new LongAdder8();
+
+    /** Counter factory. */
+    private static final Callable<LongAdder8> LONG_ADDER_FACTORY = new Callable<LongAdder8>() {
+        @Override public LongAdder8 call() {
+            return new LongAdder8();
+        }
+    };
+
+    /** Received messages count grouped by message type. */
+    private final ConcurrentMap<String, LongAdder8> rcvdMsgsCntByType = new ConcurrentHashMap<>();
+
+    /** Received messages count grouped by sender. */
+    private final ConcurrentMap<String, LongAdder8> rcvdMsgsCntByNode = new ConcurrentHashMap<>();
+
+    /** Sent messages count grouped by message type. */
+    private final ConcurrentMap<String, LongAdder8> sentMsgsCntByType = new ConcurrentHashMap<>();
+
+    /** Sent messages count grouped by receiver. */
+    private final ConcurrentMap<String, LongAdder8> sentMsgsCntByNode = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override public void onBytesSent(int bytesCnt) {
+        sentBytesCnt.add(bytesCnt);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onBytesReceived(int bytesCnt) {
+        rcvdBytesCnt.add(bytesCnt);
+    }
+
+    /**
+     * Collects statistics for message sent by SPI.
+     *
+     * @param msg Sent message; a {@link GridIoMessage} wrapper is counted by the type of the message it carries.
+     * @param nodeId Receiver node id.
+     */
+    public void onMessageSent(Message msg, UUID nodeId) {
+        assert msg != null;
+        assert nodeId != null;
+
+        sentMsgsCnt.increment();
+
+        if (msg instanceof GridIoMessage)
+            msg = ((GridIoMessage)msg).message();
+
+        LongAdder8 cntByType = F.addIfAbsent(sentMsgsCntByType, msg.getClass().getSimpleName(), LONG_ADDER_FACTORY);
+        LongAdder8 cntByNode = F.addIfAbsent(sentMsgsCntByNode, nodeId.toString(), LONG_ADDER_FACTORY);
+
+        cntByType.increment();
+        cntByNode.increment();
+    }
+
+    /**
+     * Collects statistics for message received by SPI.
+     *
+     * @param msg Received message; a {@link GridIoMessage} wrapper is counted by the type of the message it carries.
+     * @param nodeId Sender node id.
+     */
+    public void onMessageReceived(Message msg, UUID nodeId) {
+        assert msg != null;
+        assert nodeId != null;
+
+        rcvdMsgsCnt.increment();
+
+        if (msg instanceof GridIoMessage)
+            msg = ((GridIoMessage)msg).message();
+
+        LongAdder8 cntByType = F.addIfAbsent(rcvdMsgsCntByType, msg.getClass().getSimpleName(), LONG_ADDER_FACTORY);
+        LongAdder8 cntByNode = F.addIfAbsent(rcvdMsgsCntByNode, nodeId.toString(), LONG_ADDER_FACTORY);
+
+        cntByType.increment();
+        cntByNode.increment();
+    }
+
+    /**
+     * Gets sent messages count.
+     *
+     * @return Sent messages count.
+     */
+    public int sentMessagesCount() {
+        return sentMsgsCnt.intValue();
+    }
+
+    /**
+     * Gets sent bytes count.
+     *
+     * @return Sent bytes count.
+     */
+    public long sentBytesCount() {
+        return sentBytesCnt.longValue();
+    }
+
+    /**
+     * Gets received messages count.
+     *
+     * @return Received messages count.
+     */
+    public int receivedMessagesCount() {
+        return rcvdMsgsCnt.intValue();
+    }
+
+    /**
+     * Gets received bytes count.
+     *
+     * @return Received bytes count.
+     */
+    public long receivedBytesCount() {
+        return rcvdBytesCnt.longValue();
+    }
+
+    /**
+     * Converts statistics from internal representation to JMX-readable format.
+     *
+     * @param srcStat Internal statistics representation.
+     * @return Separate map holding the current value of every counter in {@code srcStat}.
+     */
+    private Map<String, Long> convertStatistics(Map<String, LongAdder8> srcStat) {
+        Map<String, Long> destStat = U.newHashMap(srcStat.size());
+
+        for (Map.Entry<String, LongAdder8> entry : srcStat.entrySet()) {
+            destStat.put(entry.getKey(), entry.getValue().longValue());
+        }
+
+        return destStat;
+    }
+
+    /**
+     * Gets received messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    public Map<String, Long> receivedMessagesByType() {
+        return convertStatistics(rcvdMsgsCntByType);
+    }
+
+    /**
+     * Gets received messages counts (grouped by node).
+     *
+     * @return Map containing sender nodes and respective counts.
+     */
+    public Map<String, Long> receivedMessagesByNode() {
+        return convertStatistics(rcvdMsgsCntByNode);
+    }
+
+    /**
+     * Gets sent messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    public Map<String, Long> sentMessagesByType() {
+        return convertStatistics(sentMsgsCntByType);
+    }
+
+    /**
+     * Gets sent messages counts (grouped by node).
+     *
+     * @return Map containing receiver nodes and respective counts.
+     */
+    public Map<String, Long> sentMessagesByNode() {
+        return convertStatistics(sentMsgsCntByNode);
+    }
+
+    /**
+     * Resets metrics for this instance.
+     */
+    public void resetMetrics() {
+        // Can't use 'reset' method because it is not thread-safe
+        // according to javadoc.
+        sentMsgsCnt.add(-sentMsgsCnt.sum());
+        rcvdMsgsCnt.add(-rcvdMsgsCnt.sum());
+        sentBytesCnt.add(-sentBytesCnt.sum());
+        rcvdBytesCnt.add(-rcvdBytesCnt.sum());
+
+        sentMsgsCntByType.clear();
+        rcvdMsgsCntByType.clear();
+        sentMsgsCntByNode.clear();
+        rcvdMsgsCntByNode.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 49425ce..e68797e 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -83,7 +83,6 @@ import org.apache.ignite.internal.util.nio.GridNioFilter;
 import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
 import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
 import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
-import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
 import org.apache.ignite.internal.util.nio.GridNioServerListener;
@@ -138,7 +137,6 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
-import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -696,7 +694,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     }
                 }
                 else {
-                    rcvdMsgsCnt.increment();
+                    metricsLsnr.onMessageReceived(msg, connKey.nodeId());
 
                     if (msg instanceof RecoveryLastReceivedMessage) {
                         GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor();
@@ -1111,34 +1109,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     /** Address resolver. */
     private AddressResolver addrRslvr;
 
-    /** Received messages count. */
-    private final LongAdder8 rcvdMsgsCnt = new LongAdder8();
-
-    /** Sent messages count.*/
-    private final LongAdder8 sentMsgsCnt = new LongAdder8();
-
-    /** Received bytes count. */
-    private final LongAdder8 rcvdBytesCnt = new LongAdder8();
-
-    /** Sent bytes count.*/
-    private final LongAdder8 sentBytesCnt = new LongAdder8();
-
     /** Context initialization latch. */
     private final CountDownLatch ctxInitLatch = new CountDownLatch(1);
 
     /** Stopping flag (set to {@code true} when SPI gets stopping signal). */
     private volatile boolean stopping;
 
-    /** metrics listener. */
-    private final GridNioMetricsListener metricsLsnr = new GridNioMetricsListener() {
-        @Override public void onBytesSent(int bytesCnt) {
-            sentBytesCnt.add(bytesCnt);
-        }
-
-        @Override public void onBytesReceived(int bytesCnt) {
-            rcvdBytesCnt.add(bytesCnt);
-        }
-    };
+    /** Statistics. */
+    private final TcpCommunicationMetricsListener metricsLsnr = new TcpCommunicationMetricsListener();
 
     /** Client connect futures. */
     private final ConcurrentMap<ConnectionKey, GridFutureAdapter<GridCommunicationClient>>
clientFuts =
@@ -1821,22 +1799,58 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
     /** {@inheritDoc} */
     @Override public int getSentMessagesCount() {
-        return sentMsgsCnt.intValue();
+        return metricsLsnr.sentMessagesCount();
     }
 
     /** {@inheritDoc} */
     @Override public long getSentBytesCount() {
-        return sentBytesCnt.longValue();
+        return metricsLsnr.sentBytesCount();
     }
 
     /** {@inheritDoc} */
     @Override public int getReceivedMessagesCount() {
-        return rcvdMsgsCnt.intValue();
+        return metricsLsnr.receivedMessagesCount();
     }
 
     /** {@inheritDoc} */
     @Override public long getReceivedBytesCount() {
-        return rcvdBytesCnt.longValue();
+        return metricsLsnr.receivedBytesCount();
+    }
+
+    /**
+     * Gets received messages counts (grouped by type).
+     *
+     * @return Map from message class simple name to the number of such messages received.
+     */
+    public Map<String, Long> getReceivedMessagesByType() {
+        return metricsLsnr.receivedMessagesByType();
+    }
+
+    /**
+     * Gets received messages counts (grouped by node).
+     *
+     * @return Map from sender node ID (in string form) to the number of messages received from it.
+     */
+    public Map<String, Long> getReceivedMessagesByNode() {
+        return metricsLsnr.receivedMessagesByNode();
+    }
+
+    /**
+     * Gets sent messages counts (grouped by type).
+     *
+     * @return Map from message class simple name to the number of such messages sent.
+     */
+    public Map<String, Long> getSentMessagesByType() {
+        return metricsLsnr.sentMessagesByType();
+    }
+
+    /**
+     * Gets sent messages counts (grouped by node).
+     *
+     * @return Map from receiver node ID (in string form) to the number of messages sent to it.
+     */
+    public Map<String, Long> getSentMessagesByNode() {
+        return metricsLsnr.sentMessagesByNode();
     }
 
     /** {@inheritDoc} */
@@ -1848,12 +1862,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
     /** {@inheritDoc} */
     @Override public void resetMetrics() {
-        // Can't use 'reset' method because it is not thread-safe
-        // according to javadoc.
-        sentMsgsCnt.add(-sentMsgsCnt.sum());
-        rcvdMsgsCnt.add(-rcvdMsgsCnt.sum());
-        sentBytesCnt.add(-sentBytesCnt.sum());
-        rcvdBytesCnt.add(-rcvdBytesCnt.sum());
+        metricsLsnr.resetMetrics();
     }
 
     /**
@@ -2607,7 +2616,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     client.release();
 
                     if (!retry)
-                        sentMsgsCnt.increment();
+                        metricsLsnr.onMessageSent(msg, node.id());
                     else {
                         removeNodeClient(node.id(), client);
 
@@ -5146,6 +5155,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         }
 
         /** {@inheritDoc} */
+        @Override public Map<String, Long> getReceivedMessagesByType() {
+            return TcpCommunicationSpi.this.metricsLsnr.receivedMessagesByType();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, Long> getReceivedMessagesByNode() {
+            return TcpCommunicationSpi.this.metricsLsnr.receivedMessagesByNode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, Long> getSentMessagesByType() {
+            return TcpCommunicationSpi.this.metricsLsnr.sentMessagesByType();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<String, Long> getSentMessagesByNode() {
+            return TcpCommunicationSpi.this.metricsLsnr.sentMessagesByNode();
+        }
+
+        /** {@inheritDoc} */
         @Override public int getOutboundMessagesQueueSize() {
             return TcpCommunicationSpi.this.getOutboundMessagesQueueSize();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 953245a..f4aba01 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.communication.tcp;
 
+import java.util.Map;
 import org.apache.ignite.mxbean.MXBeanDescription;
 import org.apache.ignite.spi.IgniteSpiManagementMBean;
 
@@ -147,6 +148,38 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
     public long getReceivedBytesCount();
 
     /**
+     * Gets received messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @MXBeanDescription("Received messages count grouped by message type.")
+    public Map<String, Long> getReceivedMessagesByType();
+
+    /**
+     * Gets received messages counts (grouped by node).
+     *
+     * @return Map containing sender nodes and respective counts.
+     */
+    @MXBeanDescription("Received messages count grouped by sender node.")
+    public Map<String, Long> getReceivedMessagesByNode();
+
+    /**
+     * Gets sent messages counts (grouped by type).
+     *
+     * @return Map containing message types and respective counts.
+     */
+    @MXBeanDescription("Sent messages count grouped by message type.")
+    public Map<String, Long> getSentMessagesByType();
+
+    /**
+     * Gets sent messages counts (grouped by node).
+     *
+     * @return Map containing receiver nodes and respective counts.
+     */
+    @MXBeanDescription("Sent messages count grouped by receiver node.")
+    public Map<String, Long> getSentMessagesByNode();
+
+    /**
      * Gets outbound messages queue size.
      *
      * @return Outbound messages queue size.

http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
new file mode 100644
index 0000000..f0a8d71
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationStatisticsTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.lang.management.ManagementFactory;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.CO;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.GridTestMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test for TcpCommunicationSpi statistics.
+ */
+public class TcpCommunicationStatisticsTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Mutex. */
+    private final Object mux = new Object();
+
+    /** Counted down when a {@link GridTestMessage} is passed to {@code notifyListener}. */
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    static {
+        GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() {
+            @Override public Message apply() {
+                return new GridTestMessage();
+            }
+        });
+    }
+
+    /**
+     * CommunicationSPI synchronized by {@code mux}.
+     */
+    private class SynchronizedCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            synchronized (mux) {
+                super.sendMessage(node, msg);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg,
+            IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            synchronized (mux) {
+                super.sendMessage(node, msg, ackC);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) {
+            super.notifyListener(sndId, msg, msgC);
+
+            if (msg instanceof GridTestMessage)
+                latch.countDown();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER).setForceServerMode(true));
+
+        TcpCommunicationSpi spi = new SynchronizedCommunicationSpi();
+
+        cfg.setCommunicationSpi(spi);
+
+        return cfg;
+    }
+
+    /**
+     * Gets TcpCommunicationSpiMBean for given node.
+     *
+     * @param nodeIdx Node index.
+     * @return MBean instance.
+     */
+    private TcpCommunicationSpiMBean mbean(int nodeIdx) throws MalformedObjectNameException {
+        ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(nodeIdx), "SPIs",
+            SynchronizedCommunicationSpi.class.getSimpleName());
+
+        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+
+        if (mbeanServer.isRegistered(mbeanName))
+            return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, mbeanName, TcpCommunicationSpiMBean.class,
+                true);
+        else
+            fail("MBean is not registered: " + mbeanName.getCanonicalName());
+
+        return null;
+    }
+
+    /**
+     * Compares two maps for equality.
+     */
+    private static <K, V> boolean mapsEquals(Map<K, V> map1, Map<K, V> map2) {
+        assert map1 != null;
+        assert map2 != null;
+
+        return map1.size() == map2.size() && map1.entrySet().containsAll(map2.entrySet());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStatistics() throws Exception {
+        startGrids(2);
+
+        try {
+            // Send custom message from node0 to node1.
+            grid(0).configuration().getCommunicationSpi().sendMessage(grid(1).cluster().localNode(),
+                new GridTestMessage());
+
+            assertTrue("GridTestMessage was not received within 10 seconds.", latch.await(10, TimeUnit.SECONDS));
+
+            ClusterGroup clusterGroupNode1 = grid(0).cluster().forNodeId(grid(1).localNode().id());
+
+            // Send job from node0 to node1.
+            grid(0).compute(clusterGroupNode1).call(new IgniteCallable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    return Boolean.TRUE;
+                }
+            });
+
+            synchronized (mux) {
+                TcpCommunicationSpiMBean mbean0 = mbean(0);
+                TcpCommunicationSpiMBean mbean1 = mbean(1);
+
+                Map<String, Long> msgsSentByNode0 = mbean0.getSentMessagesByNode();
+                Map<String, Long> msgsSentByNode1 = mbean1.getSentMessagesByNode();
+                Map<String, Long> msgsReceivedByNode0 = mbean0.getReceivedMessagesByNode();
+                Map<String, Long> msgsReceivedByNode1 = mbean1.getReceivedMessagesByNode();
+
+                String nodeId0 = grid(0).localNode().id().toString();
+                String nodeId1 = grid(1).localNode().id().toString();
+
+                assertEquals(msgsReceivedByNode0.get(nodeId1).longValue(), mbean0.getReceivedMessagesCount());
+                assertEquals(msgsReceivedByNode1.get(nodeId0).longValue(), mbean1.getReceivedMessagesCount());
+                assertEquals(msgsSentByNode0.get(nodeId1).longValue(), mbean0.getSentMessagesCount());
+                assertEquals(msgsSentByNode1.get(nodeId0).longValue(), mbean1.getSentMessagesCount());
+
+                assertEquals(mbean0.getSentMessagesCount(), mbean1.getReceivedMessagesCount());
+                assertEquals(mbean1.getSentMessagesCount(), mbean0.getReceivedMessagesCount());
+
+                Map<String, Long> msgsSentByType0 = mbean0.getSentMessagesByType();
+                Map<String, Long> msgsSentByType1 = mbean1.getSentMessagesByType();
+                Map<String, Long> msgsReceivedByType0 = mbean0.getReceivedMessagesByType();
+                Map<String, Long> msgsReceivedByType1 = mbean1.getReceivedMessagesByType();
+
+                // Node0 sent exactly the same types and count of messages as node1 received.
+                assertTrue(mapsEquals(msgsSentByType0, msgsReceivedByType1));
+
+                // Node1 sent exactly the same types and count of messages as node0 received.
+                assertTrue(mapsEquals(msgsSentByType1, msgsReceivedByType0));
+
+                assertEquals(1, msgsSentByType0.get(GridTestMessage.class.getSimpleName()).longValue());
+                assertEquals(1, msgsReceivedByType1.get(GridTestMessage.class.getSimpleName()).longValue());
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/58b50413/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 8e96a3f..7a4de1b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAck
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationStatisticsTest;
 
 /**
  * Test suite for all communication SPIs.
@@ -81,6 +82,8 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class));
         suite.addTest(new TestSuite(TcpCommunicationSpiHalfOpenedConnectionTest.class));
 
+        suite.addTest(new TestSuite(TcpCommunicationStatisticsTest.class));
+
         return suite;
     }
 }


Mime
View raw message