ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject ignite git commit: IGNITE-3024 - Fixed deadlock in ServiceProcessor
Date Thu, 28 Apr 2016 03:56:03 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3024 [created] 5053376ab


IGNITE-3024 - Fixed deadlock in ServiceProcessor


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5053376a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5053376a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5053376a

Branch: refs/heads/ignite-3024
Commit: 5053376ab0429eb867e4c47ff2ea7747ec67e287
Parents: f9bd9a3
Author: Valentin Kulichenko <valentin.lulichenko@gmail.com>
Authored: Wed Apr 27 20:55:19 2016 -0700
Committer: Valentin Kulichenko <valentin.lulichenko@gmail.com>
Committed: Wed Apr 27 20:55:19 2016 -0700

----------------------------------------------------------------------
 .../service/GridServiceProcessor.java           | 197 +++++++++++--------
 .../processors/service/GridServiceProxy.java    |  15 +-
 .../processors/service/ServiceContextImpl.java  |  23 ++-
 3 files changed, 140 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5053376a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index eded5e1..2e4ae6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -240,7 +240,11 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         for (ServiceContextImpl ctx : ctxs) {
             ctx.setCancelled(true);
-            ctx.service().cancel(ctx);
+
+            Service svc = ctx.service();
+
+            if (svc != null)
+                svc.cancel(ctx);
 
             ctx.executor().shutdownNow();
         }
@@ -653,7 +657,14 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             if (ctxs.isEmpty())
                 return null;
 
-            return (T)ctxs.iterator().next().service();
+            for (ServiceContextImpl ctx : ctxs) {
+                Service svc = ctx.service();
+
+                if (svc != null)
+                    return (T)svc;
+            }
+
+            return null;
         }
     }
 
@@ -675,7 +686,12 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             if (ctxs.isEmpty())
                 return null;
 
-            return ctxs.iterator().next();
+            for (ServiceContextImpl ctx : ctxs) {
+                if (ctx.service() != null)
+                    return ctx;
+            }
+
+            return null;
         }
     }
 
@@ -695,11 +711,15 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             ServiceContextImpl ctx = serviceContext(name);
 
             if (ctx != null) {
-                if (!svcItf.isAssignableFrom(ctx.service().getClass()))
-                    throw new IgniteException("Service does not implement specified interface [svcItf=" +
-                        svcItf.getName() + ", svcCls=" + ctx.service().getClass().getName() + ']');
+                Service svc = ctx.service();
+
+                if (svc != null) {
+                    if (!svcItf.isAssignableFrom(svc.getClass()))
+                        throw new IgniteException("Service does not implement specified interface [svcItf=" +
+                            svcItf.getName() + ", svcCls=" + svc.getClass().getName() + ']');
 
-                return (T)ctx.service();
+                    return (T)svc;
+                }
             }
         }
 
@@ -738,8 +758,12 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         synchronized (ctxs) {
             Collection<T> res = new ArrayList<>(ctxs.size());
 
-            for (ServiceContextImpl ctx : ctxs)
-                res.add((T)ctx.service());
+            for (ServiceContextImpl ctx : ctxs) {
+                Service svc = ctx.service();
+
+                if (svc != null)
+                    res.add((T)svc);
+            }
 
             return res;
         }
@@ -911,8 +935,6 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         if (assignCnt == null)
             assignCnt = 0;
 
-        Service svc = assigns.service();
-
         Collection<ServiceContextImpl> ctxs;
 
         synchronized (locSvcs) {
@@ -922,6 +944,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 locSvcs.put(svcName, ctxs = new ArrayList<>());
         }
 
+        Collection<ServiceContextImpl> toInit = new ArrayList<>();
+
         synchronized (ctxs) {
             if (ctxs.size() > assignCnt) {
                 int cancelCnt = ctxs.size() - assignCnt;
@@ -932,75 +956,84 @@ public class GridServiceProcessor extends GridProcessorAdapter {
                 int createCnt = assignCnt - ctxs.size();
 
                 for (int i = 0; i < createCnt; i++) {
-                    final Service cp = copyAndInject(svc);
+                    ServiceContextImpl svcCtx = new ServiceContextImpl(assigns.name(),
+                        UUID.randomUUID(), assigns.cacheName(), assigns.affinityKey(),
+                        Executors.newSingleThreadExecutor(threadFactory));
 
-                    final ExecutorService exe = Executors.newSingleThreadExecutor(threadFactory);
+                    ctxs.add(svcCtx);
 
-                    final ServiceContextImpl svcCtx = new ServiceContextImpl(assigns.name(),
-                        UUID.randomUUID(), assigns.cacheName(), assigns.affinityKey(), cp, exe);
+                    toInit.add(svcCtx);
+                }
+            }
+        }
 
-                    ctxs.add(svcCtx);
+        for (final ServiceContextImpl svcCtx : toInit) {
+            final Service svc = copyAndInject(assigns.service());
 
-                    try {
-                        // Initialize service.
-                        cp.init(svcCtx);
-                    }
-                    catch (Throwable e) {
-                        log.error("Failed to initialize service (service will not be deployed): " + assigns.name(), e);
+            try {
+                // Initialize service.
+                svc.init(svcCtx);
 
-                        ctxs.remove(svcCtx);
+                svcCtx.service(svc);
+            }
+            catch (Throwable e) {
+                log.error("Failed to initialize service (service will not be deployed): " + assigns.name(), e);
 
-                        if (e instanceof Error)
-                            throw (Error)e;
+                synchronized (ctxs) {
+                    ctxs.removeAll(toInit);
+                }
 
-                        if (e instanceof RuntimeException)
-                            throw (RuntimeException)e;
+                if (e instanceof Error)
+                    throw (Error)e;
 
-                        return;
-                    }
+                if (e instanceof RuntimeException)
+                    throw (RuntimeException)e;
 
-                    if (log.isInfoEnabled())
-                        log.info("Starting service instance [name=" + svcCtx.name() + ", execId=" +
-                            svcCtx.executionId() + ']');
+                return;
+            }
 
-                    // Start service in its own thread.
-                    exe.submit(new Runnable() {
-                        @Override public void run() {
-                            try {
-                                cp.execute(svcCtx);
-                            }
-                            catch (InterruptedException | IgniteInterruptedCheckedException ignore) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Service thread was interrupted [name=" + svcCtx.name() + ", execId=" +
-                                        svcCtx.executionId() + ']');
-                            }
-                            catch (IgniteException e) {
-                                if (e.hasCause(InterruptedException.class) ||
-                                    e.hasCause(IgniteInterruptedCheckedException.class)) {
-                                    if (log.isDebugEnabled())
-                                        log.debug("Service thread was interrupted [name=" + svcCtx.name() +
-                                            ", execId=" + svcCtx.executionId() + ']');
-                                }
-                                else {
-                                    U.error(log, "Service execution stopped with error [name=" + svcCtx.name() +
-                                        ", execId=" + svcCtx.executionId() + ']', e);
-                                }
-                            }
-                            catch (Throwable e) {
-                                log.error("Service execution stopped with error [name=" + svcCtx.name() +
-                                    ", execId=" + svcCtx.executionId() + ']', e);
+            if (log.isInfoEnabled())
+                log.info("Starting service instance [name=" + svcCtx.name() + ", execId=" +
+                    svcCtx.executionId() + ']');
 
-                                if (e instanceof Error)
-                                    throw (Error)e;
-                            }
-                            finally {
-                                // Suicide.
-                                exe.shutdownNow();
-                            }
+            // Start service in its own thread.
+            final ExecutorService exe = svcCtx.executor();
+
+            exe.submit(new Runnable() {
+                @Override public void run() {
+                    try {
+                        svc.execute(svcCtx);
+                    }
+                    catch (InterruptedException | IgniteInterruptedCheckedException ignore) {
+                        if (log.isDebugEnabled())
+                            log.debug("Service thread was interrupted [name=" + svcCtx.name() + ", execId=" +
+                                svcCtx.executionId() + ']');
+                    }
+                    catch (IgniteException e) {
+                        if (e.hasCause(InterruptedException.class) ||
+                            e.hasCause(IgniteInterruptedCheckedException.class)) {
+                            if (log.isDebugEnabled())
+                                log.debug("Service thread was interrupted [name=" + svcCtx.name() +
+                                    ", execId=" + svcCtx.executionId() + ']');
                         }
-                    });
+                        else {
+                            U.error(log, "Service execution stopped with error [name=" + svcCtx.name() +
+                                ", execId=" + svcCtx.executionId() + ']', e);
+                        }
+                    }
+                    catch (Throwable e) {
+                        log.error("Service execution stopped with error [name=" + svcCtx.name() +
+                            ", execId=" + svcCtx.executionId() + ']', e);
+
+                        if (e instanceof Error)
+                            throw (Error)e;
+                    }
+                    finally {
+                        // Suicide.
+                        exe.shutdownNow();
+                    }
                 }
-            }
+            });
         }
     }
 
@@ -1040,22 +1073,26 @@ public class GridServiceProcessor extends GridProcessorAdapter {
             svcCtx.setCancelled(true);
 
             // Notify service about cancellation.
-            try {
-                svcCtx.service().cancel(svcCtx);
-            }
-            catch (Throwable e) {
-                log.error("Failed to cancel service (ignoring) [name=" + svcCtx.name() +
-                    ", execId=" + svcCtx.executionId() + ']', e);
+            Service svc = svcCtx.service();
 
-                if (e instanceof Error)
-                    throw e;
-            }
-            finally {
+            if (svc != null) {
                 try {
-                    ctx.resource().cleanup(svcCtx.service());
+                    svc.cancel(svcCtx);
                 }
-                catch (IgniteCheckedException e) {
-                    log.error("Failed to clean up service (will ignore): " + svcCtx.name(), e);
+                catch (Throwable e) {
+                    log.error("Failed to cancel service (ignoring) [name=" + svcCtx.name() +
+                        ", execId=" + svcCtx.executionId() + ']', e);
+
+                    if (e instanceof Error)
+                        throw e;
+                }
+                finally {
+                    try {
+                        ctx.resource().cleanup(svc);
+                    }
+                    catch (IgniteCheckedException e) {
+                        log.error("Failed to clean up service (will ignore): " + svcCtx.name(), e);
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5053376a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index 578c549..564a13a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -40,7 +40,7 @@ import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridClosureCallMode;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.services.Service;
 import org.jsr166.ThreadLocalRandom8;
 
 /**
@@ -157,8 +158,12 @@ public class GridServiceProxy<T> implements Serializable {
                     if (node.isLocal()) {
                         ServiceContextImpl svcCtx = ctx.service().serviceContext(name);
 
-                        if (svcCtx != null)
-                            return mtd.invoke(svcCtx.service(), args);
+                        if (svcCtx != null) {
+                            Service svc = svcCtx.service();
+
+                            if (svc != null)
+                                return mtd.invoke(svc, args);
+                        }
                     }
                     else {
                         // Execute service remotely.
@@ -372,9 +377,9 @@ public class GridServiceProxy<T> implements Serializable {
 
         /** {@inheritDoc} */
         @Override public Object call() throws Exception {
-            ServiceContextImpl svcCtx = ((IgniteKernal) ignite).context().service().serviceContext(svcName);
+            ServiceContextImpl svcCtx = ((IgniteEx)ignite).context().service().serviceContext(svcName);
 
-            if (svcCtx == null)
+            if (svcCtx == null || svcCtx.service() == null)
                 throw new GridServiceNotFoundException(svcName);
 
             GridServiceMethodReflectKey key = new GridServiceMethodReflectKey(mtdName, argTypes);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5053376a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java
index 6746e29..799c2d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceContextImpl.java
@@ -50,10 +50,6 @@ public class ServiceContextImpl implements ServiceContext {
     /** Affinity key. */
     private final Object affKey;
 
-    /** Service. */
-    @GridToStringExclude
-    private final Service svc;
-
     /** Executor service. */
     @GridToStringExclude
     private final ExecutorService exe;
@@ -61,25 +57,25 @@ public class ServiceContextImpl implements ServiceContext {
     /** Methods reflection cache. */
     private final ConcurrentMap<GridServiceMethodReflectKey, Method> mtds = new ConcurrentHashMap<>();
 
+    /** Service. */
+    @GridToStringExclude
+    private volatile Service svc;
+
     /** Cancelled flag. */
     private volatile boolean isCancelled;
 
-
     /**
      * @param name Service name.
      * @param execId Execution ID.
      * @param cacheName Cache name.
      * @param affKey Affinity key.
-     * @param svc Service.
      * @param exe Executor service.
      */
-    ServiceContextImpl(String name, UUID execId, String cacheName, Object affKey, Service svc,
-        ExecutorService exe) {
+    ServiceContextImpl(String name, UUID execId, String cacheName, Object affKey, ExecutorService exe) {
         this.name = name;
         this.execId = execId;
         this.cacheName = cacheName;
         this.affKey = affKey;
-        this.svc = svc;
         this.exe = exe;
     }
 
@@ -110,6 +106,13 @@ public class ServiceContextImpl implements ServiceContext {
     }
 
     /**
+     * @param svc Service instance.
+     */
+    void service(Service svc) {
+        this.svc = svc;
+    }
+
+    /**
      * @return Service instance.
      */
     Service service() {
@@ -136,7 +139,7 @@ public class ServiceContextImpl implements ServiceContext {
 
                 mtd.setAccessible(true);
             }
-            catch (NoSuchMethodException e) {
+            catch (NoSuchMethodException ignored) {
                 mtd = NULL_METHOD;
             }
 


Mime
View raw message