ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [35/50] [abbrv] ignite git commit: Corrected ServiceProcessor shutdown sequence
Date Tue, 06 Jun 2017 08:59:53 GMT
Corrected ServiceProcessor shutdown sequence


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d0b0765
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d0b0765
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d0b0765

Branch: refs/heads/ignite-5075
Commit: 1d0b0765134a81e6626a9ef1c70939085f954847
Parents: 31ac7e9a
Author: Konstantin Dudkov <kdudkov@ya.ru>
Authored: Mon Jun 5 11:06:44 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Mon Jun 5 11:06:44 2017 +0300

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           | 86 +++++++++-----------
 1 file changed, 39 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0b0765/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index c2fee02..fd8141c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -281,56 +281,52 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         if (ctx.isDaemon())
             return;
 
+        // Will not release it.
         busyLock.block();
 
-        try {
-            U.shutdownNow(GridServiceProcessor.class, depExe, log);
+        U.shutdownNow(GridServiceProcessor.class, depExe, log);
 
-            if (!ctx.clientNode())
-                ctx.event().removeDiscoveryEventListener(topLsnr);
+        if (!ctx.clientNode())
+            ctx.event().removeDiscoveryEventListener(topLsnr);
 
-            Collection<ServiceContextImpl> ctxs = new ArrayList<>();
+        Collection<ServiceContextImpl> ctxs = new ArrayList<>();
 
-            synchronized (locSvcs) {
-                for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
-                    ctxs.addAll(ctxs0);
-            }
+        synchronized (locSvcs) {
+            for (Collection<ServiceContextImpl> ctxs0 : locSvcs.values())
+                ctxs.addAll(ctxs0);
+        }
 
-            for (ServiceContextImpl ctx : ctxs) {
-                ctx.setCancelled(true);
+        for (ServiceContextImpl ctx : ctxs) {
+            ctx.setCancelled(true);
 
-                Service svc = ctx.service();
+            Service svc = ctx.service();
 
-                if (svc != null)
-                    svc.cancel(ctx);
+            if (svc != null)
+                svc.cancel(ctx);
 
-                ctx.executor().shutdownNow();
-            }
+            ctx.executor().shutdownNow();
+        }
 
-            for (ServiceContextImpl ctx : ctxs) {
-                try {
-                    if (log.isInfoEnabled() && !ctxs.isEmpty())
-                    log.info("Shutting down distributed service [name=" + ctx.name() + ", execId8=" +
-                            U.id8(ctx.executionId()) + ']');
+        for (ServiceContextImpl ctx : ctxs) {
+            try {
+                if (log.isInfoEnabled() && !ctxs.isEmpty())
+                    log.info("Shutting down distributed service [name=" + ctx.name() + ", execId8=" +
+                        U.id8(ctx.executionId()) + ']');
 
-                    ctx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-                }
-                catch (InterruptedException ignore) {
-                    Thread.currentThread().interrupt();
+                ctx.executor().awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException ignore) {
+                Thread.currentThread().interrupt();
 
-                    U.error(log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " +
-                        ctx.name());
-                }
+                U.error(log, "Got interrupted while waiting for service to shutdown (will continue stopping node): " +
+                    ctx.name());
             }
+        }
 
-            Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
+        Exception err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
 
-            cancelFutures(depFuts, err);
-            cancelFutures(undepFuts, err);
-        }
-        finally {
-            busyLock.unblock();
-        }
+        cancelFutures(depFuts, err);
+        cancelFutures(undepFuts, err);
 
         if (log.isDebugEnabled())
             log.debug("Stopped service processor.");
@@ -462,10 +458,11 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
      * @param name Service name.
      * @param svc Service.
      * @param cacheName Cache name.
-     * @param  affKey Affinity key.
+     * @param affKey Affinity key.
      * @return Future.
      */
-    public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service svc, String cacheName, Object affKey) {
+    public IgniteInternalFuture<?> deployKeyAffinitySingleton(String name, Service svc, String cacheName,
+        Object affKey) {
         A.notNull(affKey, "affKey");
 
         ServiceConfiguration cfg = new ServiceConfiguration();
@@ -878,7 +875,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         Collection<ServiceContextImpl> ctxs;
 
         synchronized (locSvcs) {
-             ctxs = locSvcs.get(name);
+            ctxs = locSvcs.get(name);
         }
 
         if (ctxs == null)
@@ -921,9 +918,9 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
         while (true) {
             GridServiceAssignments assigns = new GridServiceAssignments(cfg, dep.nodeId(), topVer.topologyVersion());
 
-             Collection<ClusterNode> nodes;
+            Collection<ClusterNode> nodes;
 
-             // Call node filter outside of transaction.
+            // Call node filter outside of transaction.
             if (affKey == null) {
                 nodes = ctx.discovery().nodes(topVer);
 
@@ -1212,7 +1209,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
      * @param cancelCnt Number of contexts to cancel.
      */
     private void cancel(Iterable<ServiceContextImpl> ctxs, int cancelCnt) {
-        for (Iterator<ServiceContextImpl> it = ctxs.iterator(); it.hasNext();) {
+        for (Iterator<ServiceContextImpl> it = ctxs.iterator(); it.hasNext(); ) {
             ServiceContextImpl svcCtx = it.next();
 
             // Flip cancelled flag.
@@ -1285,7 +1282,7 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             GridCloseableIterator<Map.Entry<Object, Object>> iter = qry.executeScanQuery();
 
             return cache.context().itHolder().iterator(iter,
-                new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object,Object>>() {
+                new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object, Object>>() {
                     @Override protected Cache.Entry<Object, Object> convert(Map.Entry<Object, Object> e) {
                         return new CacheEntryImpl<>(e.getKey(), e.getValue());
                     }
@@ -1510,10 +1507,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             if (!busyLock.enterBusy())
                 return;
 
-            //Must check that threadpool was not shutdown.
-            if (depExe.isShutdown())
-                return;
-
             try {
                 final AffinityTopologyVersion topVer;
 
@@ -1726,7 +1719,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
             undeploy(e.getKey().name());
     }
 
-
     /**
      * @param name Name.
      */


Mime
View raw message