ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [39/42] ignite git commit: IGNITE-5306 - Persist SQL context along with cache configuration
Date Fri, 09 Jun 2017 14:21:17 GMT
IGNITE-5306 - Persist SQL context along with cache configuration


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87551427
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87551427
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87551427

Branch: refs/heads/ignite-5267-merge-ea
Commit: 87551427c5796a5d48f60a412a770b810269b797
Parents: fffad68
Author: Konstantin Dudkov <kdudkov@ya.ru>
Authored: Fri Jun 9 16:21:31 2017 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Fri Jun 9 16:21:31 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    | 138 +++++++++----------
 .../processors/cache/StoredCacheData.java       |  17 +++
 .../IgnitePersistentStoreSchemaLoadTest.java    |   2 +-
 3 files changed, 82 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/87551427/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e968074..8dc6026 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -270,7 +270,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (cfg.getCacheMode() == REPLICATED)
             cfg.setBackups(Integer.MAX_VALUE);
 
-        if( cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED)
+        if (cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED)
             throw new IgniteCheckedException("Segmented indices are supported for PARTITIONED
mode only.");
 
         if (cfg.getAffinityMapper() == null)
@@ -502,12 +502,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             else if (cc.getRebalanceMode() == SYNC) {
                 if (delay < 0) {
                     U.warn(log, "Ignoring SYNC rebalance mode with manual rebalance start
(node will not wait for " +
-                        "rebalancing to be finished): " + U.maskName(cc.getName()),
+                            "rebalancing to be finished): " + U.maskName(cc.getName()),
                         "Node will not wait for rebalance in SYNC mode: " + U.maskName(cc.getName()));
                 }
                 else {
                     U.warn(log, "Using SYNC rebalance mode with rebalance delay (node will
wait until rebalancing is " +
-                        "initiated for " + delay + "ms) for cache: " + U.maskName(cc.getName()),
+                            "initiated for " + delay + "ms) for cache: " + U.maskName(cc.getName()),
                         "Node will wait until rebalancing is initiated for " + delay + "ms
for cache: " + U.maskName(cc.getName()));
                 }
             }
@@ -646,7 +646,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (!F.isEmpty(ctx.config().getCacheConfiguration())) {
             if (depMode != CONTINUOUS && depMode != SHARED)
                 U.warn(log, "Deployment mode for cache is not CONTINUOUS or SHARED " +
-                    "(it is recommended that you change deployment mode and restart): " +
depMode,
+                        "(it is recommended that you change deployment mode and restart):
" + depMode,
                     "Deployment mode for cache is not CONTINUOUS or SHARED.");
         }
 
@@ -690,12 +690,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param cacheData Cache data.
-     * @param sql SQL flag - {@code true} if cache was created with {@code CREATE TABLE}.
      * @param caches Caches map.
      * @param templates Templates map.
      * @throws IgniteCheckedException If failed.
      */
-    private void addCacheOnJoin(StoredCacheData cacheData, boolean sql,
+    private void addCacheOnJoin(StoredCacheData cacheData,
         Map<String, CacheJoinNodeDiscoveryData.CacheInfo> caches,
         Map<String, CacheJoinNodeDiscoveryData.CacheInfo> templates) throws IgniteCheckedException
{
         CacheConfiguration<?, ?> cfg = cacheData.config();
@@ -734,7 +733,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             else
                 stopSeq.addFirst(cfg.getName());
 
-            caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cacheData,
cacheType, sql, 0));
+            caches.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cacheData,
cacheType, cacheData.sql(), 0));
         }
         else
             templates.put(cfg.getName(), new CacheJoinNodeDiscoveryData.CacheInfo(cacheData,
CacheType.USER, false, 0));
@@ -757,8 +756,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             CacheConfiguration<?, ?> cfg = new CacheConfiguration(cfgs[i]);
 
             cfgs[i] = cfg; // Replace original configuration value.
+            StoredCacheData cacheData = new StoredCacheData(cfg);
+            cacheData.sql(false);
 
-            addCacheOnJoin(new StoredCacheData(cfg), false, caches, templates);
+            addCacheOnJoin(cacheData, caches, templates);
         }
     }
 
@@ -774,21 +775,20 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         assert !ctx.config().isDaemon();
 
         if (sharedCtx.pageStore() != null && sharedCtx.database().persistenceEnabled())
{
-            Map<String, StoredCacheData> ccfgs = sharedCtx.pageStore().readCacheConfigurations();
+            Map<String, StoredCacheData> cacheDataMap = sharedCtx.pageStore().readCacheConfigurations();
 
             for (String cache : caches.keySet())
-                ccfgs.remove(cache);
+                cacheDataMap.remove(cache);
 
             for (String cache : internalCaches)
-                ccfgs.remove(cache);
+                cacheDataMap.remove(cache);
 
-            if (!F.isEmpty(ccfgs)) {
+            if (!F.isEmpty(cacheDataMap)) {
                 if (log.isInfoEnabled())
-                    log.info("Register persistent caches: " + ccfgs.keySet());
+                    log.info("Register persistent caches: " + cacheDataMap.keySet());
 
-                // TODO IGNITE-5306 - set correct SQL flag below.
-                for (StoredCacheData ccfg : ccfgs.values())
-                    addCacheOnJoin(ccfg, false, caches, templates);
+                for (StoredCacheData cacheData : cacheDataMap.values())
+                    addCacheOnJoin(cacheData, caches, templates);
             }
         }
     }
@@ -981,7 +981,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param cancel Cancel.
      */
-    public void stopCaches(boolean cancel){
+    public void stopCaches(boolean cancel) {
         for (String cacheName : stopSeq) {
             GridCacheAdapter<?, ?> cache = stoppedCaches.remove(cacheName);
 
@@ -1040,7 +1040,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param cancel Cancel.
      */
-    public void onKernalStopCaches(boolean cancel){
+    public void onKernalStopCaches(boolean cancel) {
         IgniteCheckedException affErr =
             new IgniteCheckedException("Failed to wait for topology update, node is stopping.");
 
@@ -1235,14 +1235,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         cacheCtx.onStarted();
 
-
         if (log.isInfoEnabled()) {
             log.info("Started cache [name=" + cfg.getName() +
                 (cfg.getGroupName() != null ? ", group=" + cfg.getGroupName() : "") +
                 ", memoryPolicyName=" + cfg.getMemoryPolicyName() +
                 ", mode=" + cfg.getCacheMode() +
                 ", atomicity=" + cfg.getAtomicityMode() + ']');
-}
+        }
     }
 
     /**
@@ -1773,26 +1772,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param cacheDesc Cache start request.
-     * @param nearCfg Near cache configuration.
-     * @param exchTopVer Current exchange version.
-     * @throws IgniteCheckedException If failed.
-     */
-    void prepareCacheStart(DynamicCacheDescriptor cacheDesc,
-        @Nullable NearCacheConfiguration nearCfg,
-        AffinityTopologyVersion exchTopVer)
-        throws IgniteCheckedException {
-        prepareCacheStart(
-            cacheDesc.groupDescriptor(),
-            cacheDesc.cacheConfiguration(),
-            nearCfg,
-            cacheDesc,
-            exchTopVer,
-            cacheDesc.schema()
-        );
-    }
-
-    /**
      * @param exchTopVer Current exchange version.
      * @throws IgniteCheckedException If failed.
      */
@@ -1804,12 +1783,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 DynamicCacheDescriptor desc = t.get1();
 
                 prepareCacheStart(
-                    desc.groupDescriptor(),
-                    desc.cacheConfiguration(),
-                    t.get2(),
                     desc,
-                    exchTopVer,
-                    desc.schema()
+                    t.get2(),
+                    exchTopVer
                 );
             }
         }
@@ -1833,12 +1809,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 if (CU.affinityNode(ctx.discovery().localNode(), filter)) {
                     prepareCacheStart(
-                        desc.groupDescriptor(),
-                        desc.cacheConfiguration(),
-                        null,
                         desc,
-                        exchTopVer,
-                        desc.schema()
+                        null,
+                        exchTopVer
                     );
                 }
             }
@@ -1848,22 +1821,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * @param grpDesc Cache group descriptor.
-     * @param startCfg Start configuration.
-     * @param reqNearCfg Near configuration if specified for client cache start request.
      * @param desc Cache descriptor.
+     * @param reqNearCfg Near configuration if specified for client cache start request.
      * @param exchTopVer Current exchange version.
-     * @param schema Query schema.
      * @throws IgniteCheckedException If failed.
      */
-    private void prepareCacheStart(
-        CacheGroupDescriptor grpDesc,
-        CacheConfiguration startCfg,
-        @Nullable NearCacheConfiguration reqNearCfg,
+    public void prepareCacheStart(
         DynamicCacheDescriptor desc,
-        AffinityTopologyVersion exchTopVer,
-        @Nullable QuerySchema schema
+        @Nullable NearCacheConfiguration reqNearCfg,
+        AffinityTopologyVersion exchTopVer
     ) throws IgniteCheckedException {
+        CacheConfiguration startCfg = desc.cacheConfiguration();
         assert !caches.containsKey(startCfg.getName()) : startCfg.getName();
 
         CacheConfiguration ccfg = new CacheConfiguration(startCfg);
@@ -1881,7 +1849,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
             ccfg.setNearConfiguration(null);
         }
-        else if (CU.affinityNode(ctx.discovery().localNode(), grpDesc.config().getNodeFilter()))
+        else if (CU.affinityNode(ctx.discovery().localNode(), desc.groupDescriptor().config().getNodeFilter()))
             affNode = true;
         else {
             affNode = false;
@@ -1889,8 +1857,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             ccfg.setNearConfiguration(reqNearCfg);
         }
 
-        if (sharedCtx.pageStore() != null  && affNode)
-            sharedCtx.pageStore().initializeForCache(grpDesc, new StoredCacheData(startCfg));
+        StoredCacheData cacheData = toStoredData(desc);
+
+        if (sharedCtx.pageStore() != null && affNode)
+            sharedCtx.pageStore().initializeForCache(desc.groupDescriptor(), cacheData);
 
         String grpName = startCfg.getGroupName();
 
@@ -1906,7 +1876,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
 
             if (grp == null) {
-                grp = startCacheGroup(grpDesc,
+                grp = startCacheGroup(desc.groupDescriptor(),
                     desc.cacheType(),
                     affNode,
                     cacheObjCtx,
@@ -1914,7 +1884,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             }
         }
         else {
-            grp = startCacheGroup(grpDesc,
+            grp = startCacheGroup(desc.groupDescriptor(),
                 desc.cacheType(),
                 affNode,
                 cacheObjCtx,
@@ -1938,7 +1908,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         caches.put(cacheCtx.name(), cache);
 
-        startCache(cache, schema != null ? schema : new QuerySchema());
+        startCache(cache, desc.schema() != null ? desc.schema() : new QuerySchema());
 
         grp.onCacheStarted(cacheCtx);
 
@@ -2079,7 +2049,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         final ExchangeActions.ActionData act,
         Throwable err
     ) {
-        ExchangeActions actions = new ExchangeActions(){
+        ExchangeActions actions = new ExchangeActions() {
             @Override List<ActionData> closeRequests(UUID nodeId) {
                 return Collections.singletonList(act);
             }
@@ -2168,7 +2138,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                             GridCacheContext<?, ?> stopCtx = prepareCacheStop(req.request(),
forceClose);
 
                             if (stopCtx != null && !stopCtx.group().hasCaches())
{
-                                assert  !stopCtx.group().affinityNode() : stopCtx.name();
+                                assert !stopCtx.group().affinityNode() : stopCtx.name();
 
                                 stopCacheGroup(stopCtx.groupId());
                             }
@@ -2631,12 +2601,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param cacheName Cache name to destroy.
-     * @param sql If the cache needs to be destroyed only if it was created as the result
-     *     of SQL {@code CREATE TABLE} command.
+     * @param sql If the cache needs to be destroyed only if it was created as the result
of SQL {@code CREATE TABLE}
+     * command.
      * @param checkThreadTx If {@code true} checks that current thread does not have active
transactions.
      * @return Future that will be completed when cache is destroyed.
      */
-    public IgniteInternalFuture<Boolean> dynamicDestroyCache(String cacheName, boolean
sql, boolean checkThreadTx, boolean restart) {
+    public IgniteInternalFuture<Boolean> dynamicDestroyCache(String cacheName, boolean
sql, boolean checkThreadTx,
+        boolean restart) {
         assert cacheName != null;
 
         if (checkThreadTx)
@@ -2770,7 +2741,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    public Collection<DynamicCacheChangeRequest> stopAllCachesRequests(){
+    public Collection<DynamicCacheChangeRequest> stopAllCachesRequests() {
         List<DynamicCacheChangeRequest> reqs = new ArrayList<>();
 
         for (String cacheName : cacheNames()) {
@@ -2794,7 +2765,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         cloneCheckSerializable(cfg);
 
-        if (needInit){
+        if (needInit) {
             CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(cfg);
 
             initialize(cfg, cacheObjCtx);
@@ -2821,6 +2792,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Form a {@link StoredCacheData} with all data to correctly restore cache params when
its configuration
+     * is read from page store. Essentially, this method takes from {@link DynamicCacheDescriptor}
all that's
+     * needed to start cache correctly, leaving out everything else.
+     *
+     * @param desc Cache descriptor to process.
+     * @return {@link StoredCacheData} based on {@code desc}.
+     */
+    private static StoredCacheData toStoredData(DynamicCacheDescriptor desc) {
+        A.notNull(desc, "desc");
+
+        StoredCacheData res = new StoredCacheData(desc.cacheConfiguration());
+
+        res.queryEntities(desc.schema() == null ? Collections.<QueryEntity>emptyList()
: desc.schema().entities());
+        res.sql(desc.sql());
+
+        return res;
+    }
+
+    /**
      * @param reqs Requests.
      * @param failIfExists Fail if exists flag.
      * @return Collection of futures.
@@ -3213,7 +3203,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param cacheName Cache name.
      * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if
cache is not started,
-     *        otherwise returns {@code null} in this case.
+     * otherwise returns {@code null} in this case.
      * @param checkThreadTx If {@code true} checks that current thread does not have active
transactions.
      * @return Cache instance for given name.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/87551427/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
index 2b3dcdc..39c3cd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/StoredCacheData.java
@@ -41,6 +41,9 @@ public class StoredCacheData implements Serializable {
     /** Query entities. */
     private Collection<QueryEntity> qryEntities;
 
+    /** SQL flag - {@code true} if cache was created with {@code CREATE TABLE}. */
+    private boolean sql;
+
     /**
      * Constructor.
      *
@@ -73,4 +76,18 @@ public class StoredCacheData implements Serializable {
     public void queryEntities(Collection<QueryEntity> qryEntities) {
         this.qryEntities = qryEntities;
     }
+
+    /**
+     * @return SQL flag - {@code true} if cache was created with {@code CREATE TABLE}.
+     */
+    public boolean sql() {
+        return sql;
+    }
+
+    /**
+     * @param sql SQL flag - {@code true} if cache was created with {@code CREATE TABLE}.
+     */
+    public void sql(boolean sql) {
+        this.sql = sql;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/87551427/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreSchemaLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreSchemaLoadTest.java
b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreSchemaLoadTest.java
index bdf64bb..7719f1a 100644
--- a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreSchemaLoadTest.java
+++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreSchemaLoadTest.java
@@ -64,7 +64,7 @@ public class IgnitePersistentStoreSchemaLoadTest extends GridCommonAbstractTest
     private static final String TBL_NAME = Person.class.getSimpleName();
 
     /** Schema name. */
-    private static final String SCHEMA_NAME = TBL_NAME;
+    private static final String SCHEMA_NAME = "PUBLIC";
 
     /** Cache name. */
     private static final String CACHE_NAME = TBL_NAME;


Mime
View raw message