ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [04/28] ignite git commit: Results printout for IO latency test and new metrics
Date Fri, 26 May 2017 09:46:43 GMT
Results printout for IO latency test and new metrics

(cherry picked)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/018b25b2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/018b25b2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/018b25b2

Branch: refs/heads/ignite-5075-cc
Commit: 018b25b29c3c491db7e44963e8c79677d77ceb23
Parents: 6f1dc3a
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Tue May 23 17:39:37 2017 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Tue May 23 17:44:33 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 380 +++++++++++++++----
 .../communication/IgniteIoTestMessage.java      | 362 +++++++++++++++++-
 2 files changed, 672 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/018b25b2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 147f94d..68bfd07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -18,9 +18,11 @@
 package org.apache.ignite.internal.managers.communication;
 
 import java.io.Serializable;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,12 +66,12 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.lang.IgnitePair;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -341,10 +343,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                 IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg;
 
+                msg0.senderNodeId(nodeId);
+
                 if (msg0.request()) {
                     IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null);
 
                     res.flags(msg0.flags());
+                    res.onRequestProcessed();
+
+                    res.copyDataFromRequest(msg0);
 
                     try {
                         sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
@@ -356,10 +363,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 else {
                     IoTestFuture fut = ioTestMap().get(msg0.id());
 
+                    msg0.onResponseProcessed();
+
                     if (fut == null)
                         U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']');
                     else
-                        fut.onResponse();
+                        fut.onResponse(msg0);
                 }
             }
         });
@@ -404,7 +413,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param procFromNioThread If {@code true} message is processed from NIO thread.
      * @return Response future.
      */
-    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
+    public IgniteInternalFuture<List<IgniteIoTestMessage>> sendIoTest(
+        ClusterNode node,
+        byte[] payload,
+        boolean procFromNioThread
+    ) {
         long id = ioTestId.getAndIncrement();
 
         IoTestFuture fut = new IoTestFuture(id, 1);
@@ -445,7 +458,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param warmup Warmup duration in milliseconds.
      * @param duration Test duration in milliseconds.
      * @param threads Thread count.
-     * @param maxLatency Max latency in nanoseconds.
+     * @param latencyLimit Max latency in nanoseconds.
      * @param rangesCnt Ranges count in resulting histogram.
      * @param payLoadSize Payload size in bytes.
      * @param procFromNioThread {@code True} to process requests in NIO threads.
@@ -455,7 +468,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         final long warmup,
         final long duration,
         final int threads,
-        final long maxLatency,
+        final long latencyLimit,
         final int rangesCnt,
         final int payLoadSize,
         final boolean procFromNioThread,
@@ -469,8 +482,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         final LongAdder8 cnt = new LongAdder8();
         final long sleepDuration = 5000;
         final byte[] payLoad = new byte[payLoadSize];
-        final Map<UUID, long[]>[] res = new Map[threads];
-        final ConcurrentMap<UUID, GridAtomicLong> maxLatencies = new ConcurrentHashMap8<>();
+        final Map<UUID, IoTestThreadLocalNodeResults>[] res = new Map[threads];
 
         boolean failed = true;
 
@@ -489,7 +501,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                                 "[warmup=" + warmup +
                                 ", duration=" + duration +
                                 ", threads=" + threads +
-                                ", maxLatency=" + maxLatency +
+                                ", latencyLimit=" + latencyLimit +
                                 ", rangesCnt=" + rangesCnt +
                                 ", payLoadSize=" + payLoadSize +
                                 ", procFromNioThreads=" + procFromNioThread + ']'
@@ -529,22 +541,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                         // At this point all threads have finished the test and
                         // stored data to the resulting array of maps.
                         // Need to iterate it over and sum values for all threads.
-                        Map<UUID, long[]> res0 = new HashMap<>();
-
-                        for (Map<UUID, long[]> r : res) {
-                            for (Entry<UUID, long[]> e : r.entrySet()) {
-                                long[] r0 = res0.get(e.getKey());
-
-                                if (r0 == null)
-                                    res0.put(e.getKey(), e.getValue());
-                                else {
-                                    for (int i = 0; i < rangesCnt + 1; i++)
-                                        r0[i] += e.getValue()[i];
-                                }
-                            }
-                        }
-
-                        printIoTestResults(maxLatency / (1000 * rangesCnt), res0, maxLatencies);
+                        printIoTestResults(res);
                     }
                     catch (InterruptedException | BrokenBarrierException e) {
                         U.error(log, "IO test failed.", e);
@@ -566,7 +563,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                         boolean failed = true;
                         ThreadLocalRandom rnd = ThreadLocalRandom.current();
                         int size = nodes.size();
-                        Map<UUID, long[]> res0 = res[i0];
+                        Map<UUID, IoTestThreadLocalNodeResults> res0 = res[i0];
 
                         try {
                             boolean warmupFinished0 = false;
@@ -582,38 +579,22 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                                 ClusterNode node = nodes.get(rnd.nextInt(size));
 
-                                long start = System.nanoTime();
-
-                                sendIoTest(node, payLoad, procFromNioThread).get();
-
-                                long latency = System.nanoTime() - start;
+                                List<IgniteIoTestMessage> msgs = sendIoTest(node, payLoad, procFromNioThread).get();
 
                                 cnt.increment();
 
-                                long[] latencies = res0.get(node.id());
-
-                                if (latencies == null)
-                                    res0.put(node.id(), latencies = new long[rangesCnt + 1]);
-
-                                if (latency >= maxLatency) {
-                                    latencies[rangesCnt]++; // Timed out.
+                                for (IgniteIoTestMessage msg : msgs) {
+                                    UUID nodeId = msg.senderNodeId();
 
-                                    GridAtomicLong maxLatency = maxLatencies.get(node.id());
+                                    assert nodeId != null;
 
-                                    if (maxLatency == null) {
-                                        GridAtomicLong old = maxLatencies.putIfAbsent(node.id(),
-                                            maxLatency = new GridAtomicLong());
+                                    IoTestThreadLocalNodeResults nodeRes = res0.get(nodeId);
 
-                                        if (old != null)
-                                            maxLatency = old;
-                                    }
+                                    if (nodeRes == null)
+                                        res0.put(nodeId,
+                                            nodeRes = new IoTestThreadLocalNodeResults(rangesCnt, latencyLimit));
 
-                                    maxLatency.setIfGreater(latency);
-                                }
-                                else {
-                                    int idx = (int)Math.floor((1.0 * latency) / ((1.0 * maxLatency) / rangesCnt));
-
-                                    latencies[idx]++;
+                                    nodeRes.onResult(msg);
                                 }
                             }
 
@@ -641,30 +622,44 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     }
 
     /**
-     * @param binLatencyMcs Bin latency in microseconds.
-     * @param res Resulting map.
-     * @param maxLatencies Max latency for each node.
+     * @param rawRes Resulting map.
      */
     private void printIoTestResults(
-        long binLatencyMcs,
-        Map<UUID, long[]> res,
-        ConcurrentMap<UUID, GridAtomicLong> maxLatencies
+        Map<UUID, IoTestThreadLocalNodeResults>[] rawRes
     ) {
+        Map<UUID, IoTestNodeResults> res = new HashMap<>();
+
+        for (Map<UUID, IoTestThreadLocalNodeResults> r : rawRes) {
+            for (Entry<UUID, IoTestThreadLocalNodeResults> e : r.entrySet()) {
+                IoTestNodeResults r0 = res.get(e.getKey());
+
+                if (r0 == null)
+                    res.put(e.getKey(), r0 = new IoTestNodeResults());
+
+                r0.add(e.getValue());
+            }
+        }
+
+        SimpleDateFormat dateFmt = new SimpleDateFormat("HH:mm:ss,SSS");
+
         StringBuilder b = new StringBuilder(U.nl())
-            .append("IO test results (round-trip count per each latency bin) " +
-                "[binLatency=" + binLatencyMcs + "mcs]")
+            .append("IO test results (round-trip count per each latency bin).")
             .append(U.nl());
 
-        for (Entry<UUID, long[]> e : res.entrySet()) {
+        for (Entry<UUID, IoTestNodeResults> e : res.entrySet()) {
             ClusterNode node = ctx.discovery().node(e.getKey());
 
+            long binLatencyMcs = e.getValue().binLatencyMcs();
+
             b.append("Node ID: ").append(e.getKey()).append(" (addrs=")
-                .append(node != null ? node.addresses().toString() : "n/a").append(')').append(U.nl());
+                .append(node != null ? node.addresses().toString() : "n/a")
+                .append(", binLatency=").append(binLatencyMcs).append("mcs")
+                .append(')').append(U.nl());
 
             b.append("Latency bin, mcs | Count exclusive | Percentage exclusive | " +
                 "Count inclusive | Percentage inclusive ").append(U.nl());
 
-            long[] nodeRes = e.getValue();
+            long[] nodeRes = e.getValue().resLatency;
 
             long sum = 0;
 
@@ -688,15 +683,49 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                         curSum, (100.0 * curSum) / sum));
             }
 
-            GridAtomicLong maxLatency = maxLatencies.get(e.getKey());
+            b.append(U.nl()).append("Total latency (ns): ").append(U.nl())
+                .append(String.format("%15d", e.getValue().totalLatency)).append(U.nl());
+
+            b.append(U.nl()).append("Max latencies (ns):").append(U.nl());
+            format(b, e.getValue().maxLatency, dateFmt);
+
+            b.append(U.nl()).append("Max request send queue times (ns):").append(U.nl());
+            format(b, e.getValue().maxReqSendQueueTime, dateFmt);
+
+            b.append(U.nl()).append("Max request receive queue times (ns):").append(U.nl());
+            format(b, e.getValue().maxReqRcvQueueTime, dateFmt);
+
+            b.append(U.nl()).append("Max response send queue times (ns):").append(U.nl());
+            format(b, e.getValue().maxResSendQueueTime, dateFmt);
+
+            b.append(U.nl()).append("Max response receive queue times (ns):").append(U.nl());
+            format(b, e.getValue().maxResRcvQueueTime, dateFmt);
 
-            b.append("Max latency (ns): ").append(maxLatency != null ? maxLatency.get() : -1).append(U.nl());
+            b.append(U.nl()).append("Max request wire times (millis):").append(U.nl());
+            format(b, e.getValue().maxReqWireTimeMillis, dateFmt);
+
+            b.append(U.nl()).append("Max response wire times (millis):").append(U.nl());
+            format(b, e.getValue().maxResWireTimeMillis, dateFmt);
+
+            b.append(U.nl());
         }
 
         if (log.isInfoEnabled())
             log.info(b.toString());
     }
 
+    /**
+     * @param b Builder.
+     * @param pairs Pairs to format.
+     * @param dateFmt Formatter.
+     */
+    private void format(StringBuilder b, Collection<IgnitePair<Long>> pairs, SimpleDateFormat dateFmt) {
+        for (IgnitePair<Long> p : pairs) {
+            b.append(String.format("%15d", p.get1())).append(" ")
+                .append(dateFmt.format(new Date(p.get2()))).append(U.nl());
+        }
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
     @Override public void onKernalStart0() throws IgniteCheckedException {
@@ -2857,12 +2886,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /**
      *
      */
-    private class IoTestFuture extends GridFutureAdapter<Object> {
+    private class IoTestFuture extends GridFutureAdapter<List<IgniteIoTestMessage>> {
         /** */
         private final long id;
 
         /** */
-        private int cntr;
+        private final int cntr;
+
+        /** */
+        private final List<IgniteIoTestMessage> ress;
 
         /**
          * @param id ID.
@@ -2873,24 +2905,28 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
             this.id = id;
             this.cntr = cntr;
+
+            ress = new ArrayList<>(cntr);
         }
 
         /**
          *
          */
-        void onResponse() {
+        void onResponse(IgniteIoTestMessage res) {
             boolean complete;
 
             synchronized (this) {
-                complete = --cntr == 0;
+                ress.add(res);
+
+                complete = cntr == ress.size();
             }
 
             if (complete)
-                onDone();
+                onDone(ress);
         }
 
         /** {@inheritDoc} */
-        @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+        @Override public boolean onDone(List<IgniteIoTestMessage> res, @Nullable Throwable err) {
             if (super.onDone(res, err)) {
                 ioTestMap().remove(id);
 
@@ -2905,4 +2941,210 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return S.toString(IoTestFuture.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private static class IoTestThreadLocalNodeResults {
+        /** */
+        private final long[] resLatency;
+
+        /** */
+        private final int rangesCnt;
+
+        /** */
+        private long totalLatency;
+
+        /** */
+        private long maxLatency;
+
+        /** */
+        private long maxLatencyTs;
+
+        /** */
+        private long maxReqSendQueueTime;
+
+        /** */
+        private long maxReqSendQueueTimeTs;
+
+        /** */
+        private long maxReqRcvQueueTime;
+
+        /** */
+        private long maxReqRcvQueueTimeTs;
+
+        /** */
+        private long maxResSendQueueTime;
+
+        /** */
+        private long maxResSendQueueTimeTs;
+
+        /** */
+        private long maxResRcvQueueTime;
+
+        /** */
+        private long maxResRcvQueueTimeTs;
+
+        /** */
+        private long maxReqWireTimeMillis;
+
+        /** */
+        private long maxReqWireTimeTs;
+
+        /** */
+        private long maxResWireTimeMillis;
+
+        /** */
+        private long maxResWireTimeTs;
+
+        /** */
+        private final long latencyLimit;
+
+        /**
+         * @param rangesCnt Ranges count.
+         * @param latencyLimit
+         */
+        public IoTestThreadLocalNodeResults(int rangesCnt, long latencyLimit) {
+            this.rangesCnt = rangesCnt;
+            this.latencyLimit = latencyLimit;
+
+            resLatency = new long[rangesCnt + 1];
+        }
+
+        /**
+         * @param msg
+         */
+        public void onResult(IgniteIoTestMessage msg) {
+            long now = System.currentTimeMillis();
+
+            long latency = msg.responseProcessedTs() - msg.requestCreateTs();
+
+            int idx = latency >= latencyLimit ?
+                rangesCnt /* Timed out. */ :
+                (int)Math.floor((1.0 * latency) / ((1.0 * latencyLimit) / rangesCnt));
+
+            resLatency[idx]++;
+
+            totalLatency += latency;
+
+            if (maxLatency < latency) {
+                maxLatency = latency;
+                maxLatencyTs = now;
+            }
+
+            long reqSndQueueTime = msg.requestSendTs() - msg.requestCreateTs();
+
+            if (maxReqSendQueueTime < reqSndQueueTime) {
+                maxReqSendQueueTime = reqSndQueueTime;
+                maxReqSendQueueTimeTs = now;
+            }
+
+            long reqRcvQueueTime = msg.requestProcessTs() - msg.requestReceiveTs();
+
+            if (maxReqRcvQueueTime < reqRcvQueueTime) {
+                maxReqRcvQueueTime = reqRcvQueueTime;
+                maxReqRcvQueueTimeTs = now;
+            }
+
+            long resSndQueueTime = msg.responseSendTs() - msg.requestProcessTs();
+
+            if (maxResSendQueueTime < resSndQueueTime) {
+                maxResSendQueueTime = resSndQueueTime;
+                maxResSendQueueTimeTs = now;
+            }
+
+            long resRcvQueueTime = msg.responseProcessedTs() - msg.responseReceiveTs();
+
+            if (maxResRcvQueueTime < resRcvQueueTime) {
+                maxResRcvQueueTime = resRcvQueueTime;
+                maxResRcvQueueTimeTs = now;
+            }
+
+            long reqWireTimeMillis = msg.requestReceivedTsMillis() - msg.requestSendTsMillis();
+
+            if (maxReqWireTimeMillis < reqWireTimeMillis) {
+                maxReqWireTimeMillis = reqWireTimeMillis;
+                maxReqWireTimeTs = now;
+            }
+
+            long resWireTimeMillis = msg.responseRecievedTsMillis() - msg.requestSendTsMillis();
+
+            if (maxResWireTimeMillis < resWireTimeMillis) {
+                maxResWireTimeMillis = resWireTimeMillis;
+                maxResWireTimeTs = now;
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class IoTestNodeResults {
+        /** */
+        private long latencyLimit;
+
+        /** */
+        private long[] resLatency;
+
+        /** */
+        private long totalLatency;
+
+        /** */
+        private Collection<IgnitePair<Long>> maxLatency = new ArrayList<>();
+
+        /** */
+        private Collection<IgnitePair<Long>> maxReqSendQueueTime = new ArrayList<>();
+
+        /** */
+        private Collection<IgnitePair<Long>> maxReqRcvQueueTime = new ArrayList<>();
+
+        /** */
+        private Collection<IgnitePair<Long>> maxResSendQueueTime = new ArrayList<>();
+
+        /** */
+        private Collection<IgnitePair<Long>> maxResRcvQueueTime = new ArrayList<>();
+
+        /** */
+        private Collection<IgnitePair<Long>> maxReqWireTimeMillis = new ArrayList<>();
+
+        /** */
+        private Collection<IgnitePair<Long>> maxResWireTimeMillis = new ArrayList<>();
+
+        /**
+         * @param res Node results to add.
+         */
+        public void add(IoTestThreadLocalNodeResults res) {
+            if (resLatency == null) {
+                resLatency = res.resLatency.clone();
+                latencyLimit = res.latencyLimit;
+            }
+            else {
+                assert latencyLimit == res.latencyLimit;
+                assert resLatency.length == res.resLatency.length;
+
+                for (int i = 0; i < resLatency.length; i++)
+                    resLatency[i] += res.resLatency[i];
+            }
+
+            totalLatency += res.totalLatency;
+
+            maxLatency.add(F.pair(res.maxLatency, res.maxLatencyTs));
+            maxReqSendQueueTime.add(F.pair(res.maxReqSendQueueTime, res.maxReqSendQueueTimeTs));
+            maxReqRcvQueueTime.add(F.pair(res.maxReqRcvQueueTime, res.maxReqRcvQueueTimeTs));
+            maxResSendQueueTime.add(F.pair(res.maxResSendQueueTime, res.maxResSendQueueTimeTs));
+            maxResRcvQueueTime.add(F.pair(res.maxResRcvQueueTime, res.maxResRcvQueueTimeTs));
+            maxReqWireTimeMillis.add(F.pair(res.maxReqWireTimeMillis, res.maxReqWireTimeTs));
+            maxResWireTimeMillis.add(F.pair(res.maxResWireTimeMillis, res.maxResWireTimeTs));
+        }
+
+        /**
+         * @return Bin latency in microseconds.
+         */
+        public long binLatencyMcs() {
+            if (resLatency == null)
+                throw new IllegalStateException();
+
+            return latencyLimit / (1000 * (resLatency.length - 1));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/018b25b2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
index 0a41622..3e0fa76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.managers.communication;
 
 import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -45,6 +47,43 @@ public class IgniteIoTestMessage implements Message {
     /** */
     private byte payload[];
 
+    /** */
+    private long reqCreateTs;
+
+    /** */
+    private long reqSndTs;
+
+    /** */
+    private long reqSndTsMillis;
+
+    /** */
+    private long reqRcvTs;
+
+    /** */
+    private long reqRcvTsMillis;
+
+    /** */
+    private long reqProcTs;
+
+    /** */
+    private long resSndTs;
+
+    /** */
+    private long resSndTsMillis;
+
+    /** */
+    private long resRcvTs;
+
+    /** */
+    private long resRcvTsMillis;
+
+    /** */
+    private long resProcTs;
+
+    /** */
+    @GridDirectTransient
+    private UUID sndNodeId;
+
     /**
      *
      */
@@ -61,6 +100,8 @@ public class IgniteIoTestMessage implements Message {
         this.id = id;
         this.req = req;
         this.payload = payload;
+
+        reqCreateTs = System.nanoTime();
     }
 
     /**
@@ -126,10 +167,173 @@ public class IgniteIoTestMessage implements Message {
         return id;
     }
 
+    /**
+     * @return Request create timestamp.
+     */
+    public long requestCreateTs() {
+        return reqCreateTs;
+    }
+
+    /**
+     * @return Request send timestamp.
+     */
+    public long requestSendTs() {
+        return reqSndTs;
+    }
+
+    /**
+     * @return Request receive timestamp.
+     */
+    public long requestReceiveTs() {
+        return reqRcvTs;
+    }
+
+    /**
+     * @return Request process started timestamp.
+     */
+    public long requestProcessTs() {
+        return reqProcTs;
+    }
+
+    /**
+     * @return Response send timestamp.
+     */
+    public long responseSendTs() {
+        return resSndTs;
+    }
+
+    /**
+     * @return Response receive timestamp.
+     */
+    public long responseReceiveTs() {
+        return resRcvTs;
+    }
+
+    /**
+     * @return Response process timestamp.
+     */
+    public long responseProcessTs() {
+        return resProcTs;
+    }
+
+    /**
+     * @return Request send timestamp (millis).
+     */
+    public long requestSendTsMillis() {
+        return reqSndTsMillis;
+    }
+
+    /**
+     * @return Request received timestamp (millis).
+     */
+    public long requestReceivedTsMillis() {
+        return reqRcvTsMillis;
+    }
+
+    /**
+     * @return Response send timestamp (millis).
+     */
+    public long responseSendTsMillis() {
+        return resSndTsMillis;
+    }
+
+    /**
+     * @return Response received timestamp (millis).
+     */
+    public long responseRecievedTsMillis() {
+        return resRcvTsMillis;
+    }
+
+    /**
+     * This method is called to initialize tracing variables.
+     * TODO: introduce direct message lifecycle API?
+     */
+    public void onAfterRead() {
+        if (req && reqRcvTs == 0) {
+            reqRcvTs = System.nanoTime();
+
+            reqRcvTsMillis = System.currentTimeMillis();
+        }
+
+        if (!req && resRcvTs == 0) {
+            resRcvTs = System.nanoTime();
+
+            resRcvTsMillis = System.currentTimeMillis();
+        }
+    }
+
+    /**
+     * This method is called to initialize tracing variables.
+     * TODO: introduce direct message lifecycle API?
+     */
+    public void onBeforeWrite() {
+        if (req && reqSndTs == 0) {
+            reqSndTs = System.nanoTime();
+
+            reqSndTsMillis = System.currentTimeMillis();
+        }
+
+        if (!req && resSndTs == 0) {
+            resSndTs = System.nanoTime();
+
+            resSndTsMillis = System.currentTimeMillis();
+        }
+    }
+
+    /**
+     *
+     */
+    public void copyDataFromRequest(IgniteIoTestMessage req) {
+        reqCreateTs = req.reqCreateTs;
+
+        reqSndTs = req.reqSndTs;
+        reqSndTsMillis = req.reqSndTsMillis;
+
+        reqRcvTs = req.reqRcvTs;
+        reqRcvTsMillis = req.reqRcvTsMillis;
+    }
+
+    /**
+     *
+     */
+    public void onRequestProcessed() {
+        reqProcTs = System.nanoTime();
+    }
+
+    /**
+     *
+     */
+    public void onResponseProcessed() {
+        resProcTs = System.nanoTime();
+    }
+
+    /**
+     * @return Response processed timestamp.
+     */
+    public long responseProcessedTs() {
+        return resProcTs;
+    }
+
+    /**
+     * @return Sender node ID.
+     */
+    public UUID senderNodeId() {
+        return sndNodeId;
+    }
+
+    /**
+     * @param sndNodeId Sender node ID.
+     */
+    public void senderNodeId(UUID sndNodeId) {
+        this.sndNodeId = sndNodeId;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 
+        onBeforeWrite();
+
         if (!writer.isHeaderWritten()) {
             if (!writer.writeHeader(directType(), fieldsCount()))
                 return false;
@@ -162,6 +366,72 @@ public class IgniteIoTestMessage implements Message {
 
                 writer.incrementState();
 
+            case 4:
+                if (!writer.writeLong("reqCreateTs", reqCreateTs))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeLong("reqProcTs", reqProcTs))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeLong("reqRcvTs", reqRcvTs))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeLong("reqRcvTsMillis", reqRcvTsMillis))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeLong("reqSndTs", reqSndTs))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeLong("reqSndTsMillis", reqSndTsMillis))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeLong("resProcTs", resProcTs))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeLong("resRcvTs", resRcvTs))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeLong("resRcvTsMillis", resRcvTsMillis))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeLong("resSndTs", resSndTs))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
+                if (!writer.writeLong("resSndTsMillis", resSndTsMillis))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -207,8 +477,98 @@ public class IgniteIoTestMessage implements Message {
 
                 reader.incrementState();
 
+            case 4:
+                reqCreateTs = reader.readLong("reqCreateTs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                reqProcTs = reader.readLong("reqProcTs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                reqRcvTs = reader.readLong("reqRcvTs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                reqRcvTsMillis = reader.readLong("reqRcvTsMillis");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                reqSndTs = reader.readLong("reqSndTs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                reqSndTsMillis = reader.readLong("reqSndTsMillis");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                resProcTs = reader.readLong("resProcTs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                resRcvTs = reader.readLong("resRcvTs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                resRcvTsMillis = reader.readLong("resRcvTsMillis");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                resSndTs = reader.readLong("resSndTs");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                resSndTsMillis = reader.readLong("resSndTsMillis");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
+        onAfterRead();
+
         return reader.afterMessageRead(IgniteIoTestMessage.class);
     }
 
@@ -219,7 +579,7 @@ public class IgniteIoTestMessage implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 4;
+        return 15;
     }
 
     /** {@inheritDoc} */


Mime
View raw message