ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [5/7] ignite git commit: minor
Date Tue, 06 Dec 2016 08:09:12 GMT
minor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21ea0b8b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21ea0b8b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21ea0b8b

Branch: refs/heads/ignite-comm-balance-master
Commit: 21ea0b8b497b6c0c35f894ed5a5ee920c2437d09
Parents: ff7f0b2
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Tue Dec 6 13:36:15 2016 +0700
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Tue Dec 6 13:36:15 2016 +0700

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  44 +++++--
 .../ignite/internal/util/StripedExecutor.java   | 129 +++++++++----------
 2 files changed, 99 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/21ea0b8b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index fdd1bda..e229141 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -680,7 +680,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
         ExecutorService marshCachePool,
         final ExecutorService execSvc,
         final ExecutorService sysExecSvc,
-        StripedExecutor stripedExecSvc,
+        final StripedExecutor stripedExecSvc,
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
@@ -992,24 +992,52 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
 
             starveTask = ctx.timeout().schedule(new Runnable() {
                 /** Last completed task count. */
-                private long lastCompletedCnt;
+                private long lastCompletedCntPub;
+
+                /** Last completed task count. */
+                private long lastCompletedCntSys;
 
                 @Override public void run() {
-                    if (!(execSvc instanceof ThreadPoolExecutor))
-                        return;
+                    if (execSvc instanceof ThreadPoolExecutor) {
+                        ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
+
+                        lastCompletedCntPub = checkPoolStarvation(exec, lastCompletedCntPub,
"public");
+                    }
 
-                    ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
+                    if (sysExecSvc instanceof ThreadPoolExecutor) {
+                        ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc;
+
+                        lastCompletedCntSys = checkPoolStarvation(exec, lastCompletedCntSys,
"system");
+                    }
+
+                    if (stripedExecSvc != null)
+                        stripedExecSvc.checkStarvation();
+                }
 
+                /**
+                 * @param exec Thread pool executor to check.
+                 * @param lastCompletedCnt Last completed tasks count.
+                 * @param pool Pool name for message.
+                 * @return Current completed tasks count.
+                 */
+                private long checkPoolStarvation(
+                    ThreadPoolExecutor exec,
+                    long lastCompletedCnt,
+                    String pool
+                ) {
                     long completedCnt = exec.getCompletedTaskCount();
 
                     // If all threads are active and no task has completed since last time
and there is
                     // at least one waiting request, then it is possible starvation.
                     if (exec.getPoolSize() == exec.getActiveCount() && completedCnt
== lastCompletedCnt &&
                         !exec.getQueue().isEmpty())
-                        LT.warn(log, null, "Possible thread pool starvation detected (no
task completed in last " +
-                            interval + "ms, is executorService pool size large enough?)");
+                        LT.warn(
+                            log,
+                            null,
+                            "Possible thread pool starvation detected (no task completed
in last " +
+                                interval + "ms, is " + pool + " thread pool size large enough?)");
 
-                    lastCompletedCnt = completedCnt;
+                    return completedCnt;
                 }
             }, interval, interval);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21ea0b8b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 69f6b67..e9ec74b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.util;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -47,6 +48,12 @@ public class StripedExecutor implements ExecutorService {
     /** Stripes. */
     private final Stripe[] stripes;
 
+    /** For starvation checks. */
+    private final long[] completedCntrs;
+
+    /** */
+    private final IgniteLogger log;
+
     /**
      * Constructor.
      *
@@ -59,6 +66,12 @@ public class StripedExecutor implements ExecutorService {
 
         stripes = new Stripe[cnt];
 
+        completedCntrs = new long[cnt];
+
+        Arrays.fill(completedCntrs, -1);
+
+        this.log = log;
+
         try {
             for (int i = 0; i < cnt; i++) {
                 stripes[i] = new StripeConcurrentQueue(
@@ -70,60 +83,6 @@ public class StripedExecutor implements ExecutorService {
                 stripes[i].start();
             }
 
-            // TODO - move to starvation checker
-            Thread t = new Thread(new Runnable() {
-                @Override public void run() {
-                    long[] cntrs = new long[stripes.length];
-
-                    for (; !isShutdown();) {
-                        try {
-                            Thread.sleep(10_000);
-                        }
-                        catch (InterruptedException e) {
-                            return;
-                        }
-
-                        for (int i = 0; i < stripes.length; i++) {
-                            Stripe stripe = stripes[i];
-
-                            long completedCnt = stripe.completedCnt;
-
-                            if (cntrs[i] == completedCnt &&
-                                stripe.active) {
-                                boolean deadlockPresent = U.deadlockPresent();
-
-                                GridStringBuilder sb = new GridStringBuilder();
-
-                                sb.a(">>> Possible starvation in striped pool: ")
-                                    .a(stripe.thread.getName()).a(U.nl())
-                                    .a(stripe.queueToString()).a(U.nl())
-                                    .a("deadlock: ").a(deadlockPresent).a(U.nl())
-                                    .a("completed: ").a(completedCnt).a(U.nl());
-
-                                U.printStackTrace(
-                                    stripe.thread.getId(),
-                                    sb);
-
-                                String msg = sb.toString();
-
-                                U.warn(
-                                    log,
-                                    msg);
-                                U.warn(
-                                    null,
-                                    msg);
-                            }
-                            else
-                                cntrs[i] = completedCnt;
-                        }
-                    }
-                }
-            });
-
-            t.setDaemon(true);
-
-            t.start();
-
             success = true;
         }
         catch (Error | RuntimeException e) {
@@ -147,6 +106,45 @@ public class StripedExecutor implements ExecutorService {
     }
 
     /**
+     * Checks starvation in striped pool. Maybe too verbose
+     * but this is needed to faster debug possible issues.
+     */
+    public void checkStarvation() {
+        for (int i = 0; i < stripes.length; i++) {
+            Stripe stripe = stripes[i];
+
+            long completedCnt = stripe.completedCnt;
+
+            boolean active = stripe.active;
+
+            if (completedCntrs[i] != -1 &&
+                completedCntrs[i] == completedCnt &&
+                active) {
+                boolean deadlockPresent = U.deadlockPresent();
+
+                GridStringBuilder sb = new GridStringBuilder();
+
+                sb.a(">>> Possible starvation in striped pool: ")
+                    .a(stripe.thread.getName()).a(U.nl())
+                    .a(stripe.queueToString()).a(U.nl())
+                    .a("deadlock: ").a(deadlockPresent).a(U.nl())
+                    .a("completed: ").a(completedCnt).a(U.nl());
+
+                U.printStackTrace(
+                    stripe.thread.getId(),
+                    sb);
+
+                String msg = sb.toString();
+
+                U.warn(log, msg);
+            }
+
+            if (active || completedCnt > 0)
+                completedCntrs[i] = completedCnt;
+        }
+    }
+
+    /**
      * @return Stripes count.
      */
     public int stripes() {
@@ -246,28 +244,27 @@ public class StripedExecutor implements ExecutorService {
     }
 
     /**
-     * @param log Logger to dump to.
+     * @return Return total queue size of all stripes.
      */
-    public void dumpStats(IgniteLogger log) {
-        StringBuilder sb = new StringBuilder("Stats ");
+    public int queueSize() {
+        int size = 0;
 
-        for (int i = 0; i < stripes.length; i++)
-            sb.append(i).append(" [qSize=").append(stripes[i].queueSize()).append("]; ");
+        for (Stripe stripe : stripes)
+            size += stripe.queueSize();
 
-        if (log.isInfoEnabled())
-            log.info(sb.toString());
+        return size;
     }
 
     /**
-     * @return Return total queue size of all stripes.
+     * @return Completed tasks count.
      */
-    public int queueSize() {
-        int size = 0;
+    public long completedTasks() {
+        long cnt = 0;
 
         for (Stripe stripe : stripes)
-            size += stripe.queueSize();
+            cnt += stripe.completedCnt;
 
-        return size;
+        return cnt;
     }
 
     /**


Mime
View raw message