ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [13/20] ignite git commit: IGNITE-3440: Ignite Services: ServiceTopologyCallable is executed before system cache is started
Date Mon, 18 Jul 2016 09:37:54 GMT
IGNITE-3440: Ignite Services: ServiceTopologyCallable is executed before system cache is started


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/06b24a9b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/06b24a9b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/06b24a9b

Branch: refs/heads/ignite-3483
Commit: 06b24a9b3952598604e02d80d1ed76a52d85e743
Parents: 89d64e7
Author: Denis Magda <dmagda@gridgain.com>
Authored: Wed Jul 13 14:19:51 2016 +0300
Committer: Denis Magda <dmagda@gridgain.com>
Committed: Wed Jul 13 14:19:51 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |  2 +
 .../service/GridServiceProcessor.java           | 70 +++++++++++++++++++-
 2 files changed, 71 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/06b24a9b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 414a915..6484d4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -797,6 +797,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (!ctx.config().isDaemon())
             ctx.cacheObjects().onUtilityCacheStarted();
 
+        ctx.service().onUtilityCacheStarted();
+
         // Wait for caches in SYNC preload mode.
         for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) {
             GridCacheAdapter cache = caches.get(maskNull(cfg.getName()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/06b24a9b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index 53eaeb5..b418ba2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -38,8 +38,10 @@ import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJobContext;
 import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -81,6 +83,8 @@ import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
+import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceDescriptor;
@@ -125,6 +129,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     /** Deployment futures. */
     private final ConcurrentMap<String, GridFutureAdapter<?>> undepFuts = new
ConcurrentHashMap8<>();
 
+    /** Pending compute job contexts that waiting for utility cache initialization. */
+    private final List<ComputeJobContext> pendingJobCtxs = new ArrayList<>(0);
+
     /** Deployment executor service. */
     private final ExecutorService depExe;
 
@@ -1304,6 +1311,23 @@ public class GridServiceProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Called right after utility cache is started and ready for the usage.
+     */
+    public void onUtilityCacheStarted() {
+        synchronized (pendingJobCtxs) {
+            if (pendingJobCtxs.size() == 0)
+                return;
+
+            Iterator<ComputeJobContext> iter = pendingJobCtxs.iterator();
+
+            while (iter.hasNext()) {
+                iter.next().callcc();
+                iter.remove();
+            }
+        }
+    }
+
+    /**
      * Service deployment listener.
      */
     @SuppressWarnings("unchecked")
@@ -1783,9 +1807,20 @@ public class GridServiceProcessor extends GridProcessorAdapter {
         private final String svcName;
 
         /** */
+        private boolean waitedCacheInit;
+
+        /** */
         @IgniteInstanceResource
         private IgniteEx ignite;
 
+        /** */
+        @JobContextResource
+        private ComputeJobContext jCtx;
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
         /**
          * @param svcName Service name.
          */
@@ -1795,7 +1830,40 @@ public class GridServiceProcessor extends GridProcessorAdapter {
 
         /** {@inheritDoc} */
         @Override public Map<UUID, Integer> call() throws Exception {
-            return serviceTopology(ignite.context().cache().utilityCache(), svcName);
+            IgniteInternalCache<Object, Object> cache = ignite.context().cache().utilityCache();
+
+            if (cache == null) {
+                List<ComputeJobContext> pendingCtxs = ignite.context().service().pendingJobCtxs;
+
+                synchronized (pendingCtxs) {
+                    // Double check cache reference after lock acqusition.
+                    cache = ignite.context().cache().utilityCache();
+
+                    if (cache == null) {
+                        if (!waitedCacheInit) {
+                            log.debug("Utility cache hasn't been initialized yet. Waiting.");
+
+                            // waiting for a minute for cache initialization.
+                            jCtx.holdcc(60 * 1000);
+
+                            pendingCtxs.add(jCtx);
+
+                            waitedCacheInit = true;
+
+                            return null;
+                        }
+                        else {
+                            log.error("Failed to gather service topology. Utility " +
+                                "cache initialization is stuck.");
+
+                            throw new IgniteCheckedException("Failed to gather service topology.
Utility " +
+                                "cache initialization is stuck.");
+                        }
+                    }
+                }
+            }
+
+            return serviceTopology(cache, svcName);
         }
     }
 


Mime
View raw message