ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [09/49] ignite git commit: Added IO latency test + made it available from MBean
Date Fri, 02 Jun 2017 17:13:22 GMT
Added IO latency test + made it available from MBean

(cherry picked from commit 8195ae0)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c35dbf4e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c35dbf4e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c35dbf4e

Branch: refs/heads/ignite-5398
Commit: c35dbf4ec3ba6b01932c76f14ad4c45fed402391
Parents: 8f1398f
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Mon May 15 18:03:07 2017 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Mon May 15 18:19:07 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  14 ++
 .../managers/communication/GridIoManager.java   | 204 +++++++++++++++++++
 .../org/apache/ignite/mxbean/IgniteMXBean.java  |  44 ++++
 3 files changed, 262 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c35dbf4e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 391509e..1445443 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3723,6 +3723,20 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
+    @Override public void runIoTest(
+        long warmup,
+        long duration,
+        int threads,
+        long maxLatency,
+        int rangesCnt,
+        int payLoadSize,
+        boolean procFromNioThread
+    ) {
+        ctx.io().runIoTest(warmup, duration, threads, maxLatency, rangesCnt, payLoadSize, procFromNioThread,
+            new ArrayList(ctx.cluster().get().forServers().forRemotes().nodes()));
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteKernal.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c35dbf4e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6738909..81233ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -21,16 +21,22 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -83,6 +89,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
+import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -420,6 +427,203 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         return map;
     }
 
+    /**
+     * @param warmup Warmup duration in milliseconds.
+     * @param duration Test duration in milliseconds.
+     * @param threads Thread count.
+     * @param maxLatency Max latency in nanoseconds.
+     * @param rangesCnt Ranges count in resulting histogram.
+     * @param payLoadSize Payload size in bytes.
+     * @param procFromNioThread {@code True} to process requests in NIO threads.
+     * @param nodes Nodes participating in test.
+     */
+    public void runIoTest(
+        final long warmup,
+        final long duration,
+        final int threads,
+        final long maxLatency,
+        final int rangesCnt,
+        final int payLoadSize,
+        final boolean procFromNioThread,
+        final List<ClusterNode> nodes
+    ) {
+        ExecutorService svc = Executors.newFixedThreadPool(threads + 1);
+
+        final AtomicBoolean warmupFinished = new AtomicBoolean();
+        final AtomicBoolean done = new AtomicBoolean();
+        final CyclicBarrier bar = new CyclicBarrier(threads + 1);
+        final LongAdder8 cnt = new LongAdder8();
+        final long sleepDuration = 5000;
+        final byte[] payLoad = new byte[payLoadSize];
+        final Map<UUID, long[]>[] res = new Map[threads];
+
+        boolean failed = true;
+
+        try {
+            svc.execute(new Runnable() {
+                @Override public void run() {
+                    boolean failed = true;
+
+                    try {
+                        bar.await();
+
+                        long start = System.currentTimeMillis();
+
+                        if (log.isInfoEnabled())
+                            log.info("IO test started " +
+                                "[warmup=" + warmup +
+                                ", duration=" + duration +
+                                ", threads=" + threads +
+                                ", maxLatency=" + maxLatency +
+                                ", rangesCnt=" + rangesCnt +
+                                ", payLoadSize=" + payLoadSize +
+                                ", procFromNioThreads=" + procFromNioThread + ']'
+                            );
+
+                        for (;;) {
+                            if (!warmupFinished.get() && System.currentTimeMillis() - start > warmup) {
+                                if (log.isInfoEnabled())
+                                    log.info("IO test warmup finished.");
+
+                                warmupFinished.set(true);
+
+                                start = System.currentTimeMillis();
+                            }
+
+                            if (warmupFinished.get() && System.currentTimeMillis() - start > duration) {
+                                if (log.isInfoEnabled())
+                                    log.info("IO test finished, will wait for all threads to finish.");
+
+                                done.set(true);
+
+                                bar.await();
+
+                                failed = false;
+
+                                break;
+                            }
+
+                            if (log.isInfoEnabled())
+                                log.info("IO test [opsCnt/sec=" + (cnt.sumThenReset() * 1000 / sleepDuration) +
+                                    ", warmup=" + !warmupFinished.get() +
+                                    ", elapsed=" + (System.currentTimeMillis() - start) + ']');
+
+                            Thread.sleep(sleepDuration);
+                        }
+
+                        // At this point all threads have finished the test and stored data to the result map.
+                        Map<UUID, long[]> res0 = new HashMap<>();
+
+                        for (Map<UUID, long[]> r : res) {
+                            for (Entry<UUID, long[]> e : r.entrySet()) {
+                                long[] r0 = res0.get(e.getKey());
+
+                                if (r0 == null)
+                                    res0.put(e.getKey(), e.getValue());
+                                else {
+                                    for (int i = 0; i < rangesCnt + 1; i++)
+                                        r0[i] += e.getValue()[i];
+                                }
+                            }
+                        }
+
+                        StringBuilder b = new StringBuilder("IO test results " +
+                            "[range=" + (maxLatency / (1000 * rangesCnt)) + "mcs]");
+
+                        b.append(U.nl());
+
+                        for (Entry<UUID, long[]> e : res0.entrySet()) {
+                            ClusterNode node = ctx.discovery().node(e.getKey());
+
+                            b.append("    ").append(e.getKey()).append(" (addrs=")
+                                .append(node != null ? node.addresses().toString() : "n/a").append(')')
+                                .append(Arrays.toString(e.getValue())).append(U.nl());
+                        }
+
+                        if (log.isInfoEnabled())
+                            log.info(b.toString());
+                    }
+                    catch (InterruptedException | BrokenBarrierException e) {
+                        U.error(log, "IO test failed.", e);
+                    }
+                    finally {
+                        if (failed)
+                            bar.reset();
+                    }
+                }
+            });
+
+            for (int i = 0; i < threads; i++) {
+                final int i0 = i;
+
+                res[i] = U.newHashMap(nodes.size());
+
+                svc.execute(new Runnable() {
+                    @Override public void run() {
+                        boolean failed = true;
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+                        int size = nodes.size();
+                        Map<UUID, long[]> res0 = res[i0];
+
+                        try {
+                            boolean warmupFinished0 = false;
+
+                            bar.await();
+
+                            for (;;) {
+                                if (done.get())
+                                    break;
+
+                                if (!warmupFinished0)
+                                    warmupFinished0 = warmupFinished.get();
+
+                                ClusterNode node = nodes.get(rnd.nextInt(size));
+
+                                long start = System.nanoTime();
+
+                                sendIoTest(node, payLoad, procFromNioThread).get();
+
+                                long latency = System.nanoTime() - start;
+
+                                cnt.increment();
+
+                                long[] latencies = res0.get(node.id());
+
+                                if (latencies == null)
+                                    res0.put(node.id(), latencies = new long[rangesCnt + 1]);
+
+                                if (latency >= maxLatency)
+                                    latencies[rangesCnt]++; // Timed out.
+                                else {
+                                    int idx = (int)Math.floor((1.0 * latency) / ((1.0 * maxLatency) / rangesCnt));
+
+                                    latencies[idx]++;
+                                }
+                            }
+
+                            bar.await();
+
+                            failed = false;
+                        }
+                        catch (Exception e) {
+                            U.error(log, "IO test worker thread failed.", e);
+                        }
+                        finally {
+                            if (failed)
+                                bar.reset();
+                        }
+                    }
+                });
+            }
+
+            failed = false;
+        }
+        finally {
+            if (failed)
+                U.shutdownNow(GridIoManager.class, svc, log);
+        }
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"})
     @Override public void onKernalStart0() throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c35dbf4e/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
index bf84b0d..ce63e4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java
@@ -385,4 +385,48 @@ public interface IgniteMXBean {
      */
     @MXBeanDescription("Dumps debug information for the current node.")
     public void dumpDebugInfo();
+
+    /**
+     * Runs IO latency test against all remote server nodes in cluster.
+     *
+     * @param warmup Warmup duration in milliseconds.
+     * @param duration Test duration in milliseconds.
+     * @param threads Thread count.
+     * @param maxLatency Max latency in nanoseconds.
+     * @param rangesCnt Ranges count in resulting histogram.
+     * @param payLoadSize Payload size in bytes.
+     * @param procFromNioThread {@code True} to process requests in NIO threads.
+     */
+    @MXBeanDescription("Runs IO latency test against all remote server nodes in cluster.")
+    @MXBeanParametersNames(
+        {
+            "warmup",
+            "duration",
+            "threads",
+            "maxLatency",
+            "rangesCnt",
+            "payLoadSize",
+            "procFromNioThread"
+        }
+    )
+    @MXBeanParametersDescriptions(
+        {
+            "Warmup duration (millis).",
+            "Test duration (millis).",
+            "Threads count.",
+            "Maximum latency expected (nanos).",
+            "Ranges count for histogram.",
+            "Payload size (bytes).",
+            "Process requests in NIO-threads flag."
+        }
+    )
+    void runIoTest(
+        long warmup,
+        long duration,
+        int threads,
+        long maxLatency,
+        int rangesCnt,
+        int payLoadSize,
+        boolean procFromNioThread
+    );
 }


Mime
View raw message