ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sevdoki...@apache.org
Subject [2/2] incubator-ignite git commit: # IGNITE-883 Avoid creation a lot of threads.
Date Mon, 01 Jun 2015 15:10:37 GMT
# IGNITE-883 Avoid creation a lot of threads.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9f89cbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9f89cbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9f89cbe

Branch: refs/heads/ignite-883
Commit: f9f89cbe3401eb1656209a8deba3a42c13cc00cf
Parents: 04081ab
Author: sevdokimov <sevdokimov@gridgain.com>
Authored: Mon Jun 1 18:10:28 2015 +0300
Committer: sevdokimov <sevdokimov@gridgain.com>
Committed: Mon Jun 1 18:10:28 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  28 +----
 .../org/apache/ignite/internal/IgnitionEx.java  |  63 +++++++----
 .../discovery/GridDiscoveryManager.java         |  28 ++---
 .../timeout/GridTimeoutProcessor.java           | 107 ++++++++++++++++++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  10 ++
 5 files changed, 169 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a7dc70..e3fc50f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,14 +167,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     @GridToStringExclude
     private Timer updateNtfTimer;
 
-    /** */
-    @GridToStringExclude
-    private Timer starveTimer;
-
-    /** */
-    @GridToStringExclude
-    private Timer metricsLogTimer;
-
     /** Indicate error on grid stop. */
     @GridToStringExclude
     private boolean errOnStop;
@@ -867,13 +859,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         if (starveCheck) {
             final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
 
-            starveTimer = new Timer("ignite-starvation-checker");
-
-            starveTimer.scheduleAtFixedRate(new GridTimerTask() {
+            ctx.timeout().schedule(new Runnable() {
                 /** Last completed task count. */
                 private long lastCompletedCnt;
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (!(execSvc instanceof ThreadPoolExecutor))
                         return;
 
@@ -896,13 +886,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         long metricsLogFreq = cfg.getMetricsLogFrequency();
 
         if (metricsLogFreq > 0) {
-            metricsLogTimer = new Timer("ignite-metrics-logger");
-
-            metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() {
-                /** */
+            ctx.timeout().schedule(new Runnable() {
                 private final DecimalFormat dblFmt = new DecimalFormat("#.##");
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (log.isInfoEnabled()) {
                         ClusterMetrics m = cluster().localNode().metrics();
 
@@ -1713,13 +1700,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             if (updateNtfTimer != null)
                 updateNtfTimer.cancel();
 
-            if (starveTimer != null)
-                starveTimer.cancel();
-
-            // Cancel metrics log timer.
-            if (metricsLogTimer != null)
-                metricsLogTimer.cancel();
-
             boolean interrupted = false;
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 6e4efb5..7df95c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1447,27 +1447,48 @@ public class IgnitionEx {
                 ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi());
             }
 
-            execSvc = new IgniteThreadPoolExecutor(
-                "pub-" + cfg.getGridName(),
-                cfg.getPublicThreadPoolSize(),
-                cfg.getPublicThreadPoolSize(),
-                DFLT_PUBLIC_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-
-            // Pre-start all threads as they are guaranteed to be needed.
-            ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
-
-            // Note that since we use 'LinkedBlockingQueue', number of
-            // maximum threads has no effect.
-            sysExecSvc = new IgniteThreadPoolExecutor(
-                "sys-" + cfg.getGridName(),
-                cfg.getSystemThreadPoolSize(),
-                cfg.getSystemThreadPoolSize(),
-                DFLT_SYSTEM_KEEP_ALIVE_TIME,
-                new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
-
-            // Pre-start all threads as they are guaranteed to be needed.
-            ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+            boolean isClientMode = Boolean.TRUE.equals(myCfg.isClientMode());
+
+            if (isClientMode) {
+                execSvc = new IgniteThreadPoolExecutor(
+                    "pub-" + cfg.getGridName(),
+                    0,
+                    cfg.getPublicThreadPoolSize(),
+                    2000,
+                    new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+                // Note that since we use 'LinkedBlockingQueue', number of
+                // maximum threads has no effect.
+                sysExecSvc = new IgniteThreadPoolExecutor(
+                    "sys-" + cfg.getGridName(),
+                    1,
+                    cfg.getSystemThreadPoolSize(),
+                    5000,
+                    new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+            }
+            else {
+                execSvc = new IgniteThreadPoolExecutor(
+                    "pub-" + cfg.getGridName(),
+                    cfg.getPublicThreadPoolSize(),
+                    cfg.getPublicThreadPoolSize(),
+                    DFLT_PUBLIC_KEEP_ALIVE_TIME,
+                    new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+                // Pre-start all threads as they are guaranteed to be needed.
+                ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
+
+                // Note that since we use 'LinkedBlockingQueue', number of
+                // maximum threads has no effect.
+                sysExecSvc = new IgniteThreadPoolExecutor(
+                    "sys-" + cfg.getGridName(),
+                    cfg.getSystemThreadPoolSize(),
+                    cfg.getSystemThreadPoolSize(),
+                    DFLT_SYSTEM_KEEP_ALIVE_TIME,
+                    new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+
+                // Pre-start all threads as they are guaranteed to be needed.
+                ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+            }
 
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 4ef602e..9b8280e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
 import org.apache.ignite.internal.processors.security.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -165,7 +166,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     private final GridLocalMetrics metrics = createMetrics();
 
     /** Metrics update worker. */
-    private final MetricsUpdater metricsUpdater = new MetricsUpdater();
+    private GridTimeoutProcessor.CancelableTask metricsUpdateTask;
 
     /** Custom event listener. */
     private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs =
@@ -325,7 +326,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             checkSegmentOnStart();
         }
 
-        new IgniteThread(metricsUpdater).start();
+        metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ);
 
         spi.setMetricsProvider(createMetricsProvider());
 
@@ -987,11 +988,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         getSpi().setListener(null);
 
         // Stop discovery worker and metrics updater.
+        U.closeQuiet(metricsUpdateTask);
+
         U.cancel(discoWrk);
-        U.cancel(metricsUpdater);
 
         U.join(discoWrk, log);
-        U.join(metricsUpdater, log);
 
         // Stop SPI itself.
         stopSpi();
@@ -1879,28 +1880,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      *
      */
-    private class MetricsUpdater extends GridWorker {
+    private class MetricsUpdater implements Runnable {
         /** */
         private long prevGcTime = -1;
 
         /** */
         private long prevCpuTime = -1;
 
-        /**
-         *
-         */
-        private MetricsUpdater() {
-            super(ctx.gridName(), "metrics-updater", GridDiscoveryManager.this.log);
-        }
-
         /** {@inheritDoc} */
-        @Override protected void body() throws IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                U.sleep(METRICS_UPDATE_FREQ);
-
-                gcCpuLoad = getGcCpuLoad();
-                cpuLoad = getCpuLoad();
-            }
+        @Override public void run() {
+            gcCpuLoad = getGcCpuLoad();
+            cpuLoad = getCpuLoad();
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 81ff72b..e9b7717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -24,8 +24,10 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.thread.*;
 
+import java.io.*;
 import java.util.*;
 
 /**
@@ -40,10 +42,12 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         new GridConcurrentSkipListSet<>(new Comparator<GridTimeoutObject>() {
             /** {@inheritDoc} */
             @Override public int compare(GridTimeoutObject o1, GridTimeoutObject o2) {
-                long time1 = o1.endTime();
-                long time2 = o2.endTime();
+                int res = Long.compare(o1.endTime(), o2.endTime());
 
-                return time1 < time2 ? -1 : time1 > time2 ? 1 : o1.timeoutId().compareTo(o2.timeoutId());
+                if (res != 0)
+                    return res;
+
+                return o1.timeoutId().compareTo(o2.timeoutId());
             }
         });
 
@@ -98,6 +102,26 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Schedule the specified timer task for execution at the specified
+     * time with the specified period, in milliseconds.
+     *
+     * @param task Task to execute.
+     * @param delay Delay to first execution in milliseconds.
+     * @param period Period for execution in milliseconds or -1.
+     * @return Cancelable to cancel task.
+     */
+    public CancelableTask schedule(Runnable task, long delay, long period) {
+        assert delay >= 0;
+        assert period > 0 || period == -1;
+
+        CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
+
+        addTimeoutObject(obj);
+
+        return obj;
+    }
+
+    /**
      * @param timeoutObj Timeout object.
      */
     public void removeTimeoutObject(GridTimeoutObject timeoutObj) {
@@ -173,4 +197,81 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         X.println(">>> Timeout processor memory stats [grid=" + ctx.gridName() + ']');
         X.println(">>>   timeoutObjsSize: " + timeoutObjs.size());
     }
+
+    /**
+     *
+     */
+    public class CancelableTask implements GridTimeoutObject, Closeable {
+        /** */
+        private final IgniteUuid id = new IgniteUuid();
+
+        /** */
+        private long endTime;
+
+        /** */
+        private final long period;
+
+        /** */
+        private volatile boolean cancel;
+
+        /** */
+        private final Runnable task;
+
+        /**
+         * @param firstTime First time.
+         * @param period Period.
+         * @param task Task to execute.
+         */
+        CancelableTask(Runnable task, long firstTime, long period) {
+            this.task = task;
+            endTime = firstTime;
+            this.period = period;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void onTimeout() {
+            if (cancel)
+                return;
+
+            long startTime = U.currentTimeMillis();
+
+            try {
+                task.run();
+            }
+            finally {
+                long executionTime = U.currentTimeMillis() - startTime;
+
+                if (executionTime > 10) {
+                    U.warn(log, "Timer task take a lot of time, tasks submitted to GridTimeoutProcessor must work " +
+                        "quickly [executionTime=" + executionTime + ']');
+                }
+
+                if (!cancel && period > 0) {
+                    endTime = U.currentTimeMillis() + period;
+
+                    addTimeoutObject(this);
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            cancel = true;
+
+            synchronized (this) {
+                // Just waiting for task execution end to make sure that task will not be executed anymore.
+                removeTimeoutObject(this);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index f7be340..eed076b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -845,6 +845,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If any error occurs.
      */
+    public void testThreads() throws Exception {
+        startServerNodes(1);
+        startClientNodes(1);
+
+        System.out.println("End");
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
     public void testGridStartTime() throws Exception {
         startServerNodes(2);
 


Mime
View raw message