ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [13/46] ignite git commit: IGNITE-3440: Ignite Services: ServiceTopologyCallable is executed before system cache is started (cherry picked from commit 06b24a9)
Date Wed, 20 Jul 2016 09:06:58 GMT
IGNITE-3440: Ignite Services: ServiceTopologyCallable is executed before system cache is started
(cherry picked from commit 06b24a9)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c7eed885
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c7eed885
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c7eed885

Branch: refs/heads/ignite-1232
Commit: c7eed8859202b99cc64b7f6d6efe46acf71ad72a
Parents: cd1d618
Author: Denis Magda <dmagda@gridgain.com>
Authored: Wed Jul 13 14:19:51 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Wed Jul 13 15:28:50 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  2 +
 .../service/GridServiceProcessor.java           | 70 +++++++++++++++++++-
 2 files changed, 71 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c7eed885/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9451254..e104b87 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -821,6 +821,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (!ctx.config().isDaemon())
             ctx.cacheObjects().onUtilityCacheStarted();
 
+        ctx.service().onUtilityCacheStarted();
+
         // Wait for caches in SYNC preload mode.
         for (DynamicCacheDescriptor desc : registeredCaches.values()) {
             CacheConfiguration cfg = desc.cacheConfiguration();

http://git-wip-us.apache.org/repos/asf/ignite/blob/c7eed885/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index fd895ea..01b7302 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -38,8 +38,10 @@ import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -86,6 +88,8 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
+import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceDescriptor;
@@ -137,6 +141,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** Deployment futures. */
     private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new ConcurrentHashMap8<>();
 
+    /** Pending compute job contexts that waiting for utility cache initialization. */
+    private final List<ComputeJobContext> pendingJobCtxs = new ArrayList<>(0);
+
     /** Deployment executor service. */
     private final ExecutorService depExe;
 
@@ -1325,6 +1332,23 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Called right after utility cache is started and ready for the usage.
+     */
+    public void onUtilityCacheStarted() {
+        synchronized (pendingJobCtxs) {
+            if (pendingJobCtxs.size() == 0)
+                return;
+
+            Iterator<ComputeJobContext> iter = pendingJobCtxs.iterator();
+
+            while (iter.hasNext()) {
+                iter.next().callcc();
+                iter.remove();
+            }
+        }
+    }
+
+    /**
      * Service deployment listener.
      */
     @SuppressWarnings("unchecked")
@@ -1809,9 +1833,20 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         private final String svcName;
 
         /** */
+        private boolean waitedCacheInit;
+
+        /** */
         @IgniteInstanceResource
         private IgniteEx ignite;
 
+        /** */
+        @JobContextResource
+        private ComputeJobContext jCtx;
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
         /**
          * @param svcName Service name.
          */
@@ -1821,7 +1856,40 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         /** {@inheritDoc} */
         @Override public Map<UUID, Integer> call() throws Exception {
-            return serviceTopology(ignite.context().cache().utilityCache(), svcName);
+            IgniteInternalCache<Object, Object> cache = ignite.context().cache().utilityCache();
+
+            if (cache == null) {
+                List<ComputeJobContext> pendingCtxs = ignite.context().service().pendingJobCtxs;
+
+                synchronized (pendingCtxs) {
+                    // Double check cache reference after lock acqusition.
+                    cache = ignite.context().cache().utilityCache();
+
+                    if (cache == null) {
+                        if (!waitedCacheInit) {
+                            log.debug("Utility cache hasn't been initialized yet. Waiting.");
+
+                            // waiting for a minute for cache initialization.
+                            jCtx.holdcc(60 * 1000);
+
+                            pendingCtxs.add(jCtx);
+
+                            waitedCacheInit = true;
+
+                            return null;
+                        }
+                        else {
+                            log.error("Failed to gather service topology. Utility " +
+                                "cache initialization is stuck.");
+
+                            throw new IgniteCheckedException("Failed to gather service topology. Utility " +
+                                "cache initialization is stuck.");
+                        }
+                    }
+                }
+            }
+
+            return serviceTopology(cache, svcName);
         }
     }
 


Mime
View raw message