ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5075-pds
Date Wed, 31 May 2017 11:55:09 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-pds fdd76de24 -> 39c43f2d3


ignite-5075-pds


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39c43f2d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39c43f2d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39c43f2d

Branch: refs/heads/ignite-5075-pds
Commit: 39c43f2d375c4725c39eb799ac6adf27ffaedeb4
Parents: fdd76de
Author: sboikov <sboikov@gridgain.com>
Authored: Wed May 31 14:40:04 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed May 31 14:55:00 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |   2 +-
 .../cache/CacheGroupInfrastructure.java         |   7 +
 .../dht/preloader/GridDhtPartitionDemander.java |  68 ++++---
 .../dht/preloader/GridDhtPartitionSupplier.java |  12 +-
 .../GridDhtPartitionSupplyMessage.java          |  76 ++++++--
 .../cache/CacheGroupsMetricsRebalanceTest.java  | 140 +++++++++++++
 .../distributed/CacheGroupsPreloadTest.java     | 194 +++++++++++++++++++
 .../IgniteCacheMetricsSelfTestSuite.java        |   3 +
 .../testsuites/IgniteCacheTestSuite3.java       |   1 +
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 .../IgnitePersistentStoreCacheGroupsTest.java   | 114 +++++++++--
 11 files changed, 556 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 1efc4aa..ea49dbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1008,7 +1008,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 }
 
                 default:
-                    assert plc >= 0 : "Negative policy: " + plc;
+                    assert plc >= 0 : "Negative policy [plc=" + plc + ", msg=" + msg + ']';
 
                     if (isReservedGridIoPolicy(plc))
                         throw new IgniteCheckedException("Failed to process message with policy of reserved range. " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 1c8d85c..cdc2e9e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -700,6 +700,13 @@ public class CacheGroupInfrastructure {
     }
 
     /**
+     * @return Caches in this group.
+     */
+    public List<GridCacheContext> caches() {
+        return this.caches;
+    }
+
+    /**
      * @return {@code True} if group contains caches.
      */
     boolean hasCaches() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 2514130..0eaae75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
+import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -298,18 +299,19 @@ public class GridDhtPartitionDemander {
 
             fut.sendRebalanceStartedEvent();
 
-// TODO 5075.
-//            final boolean statsEnabled = cctx.config().isStatisticsEnabled();
-//
-//            if (statsEnabled) {
-//                cctx.cache().metrics0().clearRebalanceCounters();
-//
-//                rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
-//                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
-//                        cctx.cache().metrics0().clearRebalanceCounters();
-//                    }
-//                });
-//            }
+            for (GridCacheContext cctx : grp.caches()) {
+                if (cctx.config().isStatisticsEnabled()) {
+                    final CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                    metrics.clearRebalanceCounters();
+
+                    rebalanceFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                        @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                            metrics.clearRebalanceCounters();
+                        }
+                    });
+                }
+            }
 
             if (assigns.cancelled()) { // Pending exchange.
                 if (log.isDebugEnabled())
@@ -609,19 +611,35 @@ public class GridDhtPartitionDemander {
 
         final GridDhtPartitionTopology top = grp.topology();
 
-// TODO 5075.
-//        final boolean statsEnabled = cctx.config().isStatisticsEnabled();
-//
-//        if (statsEnabled) {
-//            if (supply.estimatedKeysCount() != -1)
-//                cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount());
-//
-//            cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize());
-//        }
+        if (grp.sharedGroup()) {
+            for (GridCacheContext cctx : grp.caches()) {
+                if (cctx.config().isStatisticsEnabled()) {
+                    long keysCnt = supply.keysForCache(cctx.cacheId());
+
+                    if (keysCnt != -1)
+                        cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt);
+
+                    // Can not be calculated per cache.
+                    cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize());
+                }
+            }
+        }
+        else {
+            GridCacheContext cctx = grp.singleCacheContext();
+
+            if (cctx.config().isStatisticsEnabled()) {
+                if (supply.estimatedKeysCount() != -1)
+                    cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount());
+
+                cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize());
+            }
+        }
 
         try {
             AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
+            GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext();
+
             // Preload.
             for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
                 int p = e.getKey();
@@ -661,9 +679,11 @@ public class GridDhtPartitionDemander {
                                     break;
                                 }
 
-// TODO 5075.
-//                                if (statsEnabled)
-//                                    cctx.cache().metrics0().onRebalanceKeyReceived();
+                                if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId()))
+                                    cctx = ctx.cacheContext(entry.cacheId());
+
+                                if(cctx != null && cctx.config().isStatisticsEnabled())
+                                    cctx.cache().metrics0().onRebalanceKeyReceived();
                             }
 
                             // If message was last for this partition,

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index a7ae3c5..6196746 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -28,7 +28,6 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
@@ -248,18 +247,19 @@ class GridDhtPartitionSupplier {
             Iterator<Integer> partIt = sctx != null ? sctx.partIt : d.partitions().iterator();
 
             if (sctx == null) {
-                long keysCnt = 0;
-
                 for (Integer part : d.partitions()) {
                     GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false);
 
                     if (loc == null || loc.state() != OWNING)
                         continue;
 
-                    keysCnt += grp.offheap().totalPartitionEntriesCount(part);
+                    if (grp.sharedGroup()) {
+                        for (int cacheId : grp.cacheIds())
+                            s.addKeysForCache(cacheId, grp.offheap().cacheEntriesCount(cacheId, part));
+                    }
+                    else
+                        s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part));
                 }
-
-                s.estimatedKeysCount(keysCnt);
             }
 
             while ((sctx != null && newReq) || partIt.hasNext()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 92c462b..ee5b190 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
@@ -80,6 +79,10 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
     /** Estimated keys count. */
     private long estimatedKeysCnt = -1;
 
+    /** Estimated keys count per cache in case the message is for shared group. */
+    @GridDirectMap(keyType = int.class, valueType = long.class)
+    private Map<Integer, Long> keysPerCache;
+
     /**
      * @param updateSeq Update sequence for this node.
      * @param grpId Cache group ID.
@@ -295,30 +298,36 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
+                if (!writer.writeMap("keysPerCache", keysPerCache, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
+                if (!writer.writeCollection("last", last, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeInt("msgSize", msgSize))
+                if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("msgSize", msgSize))
                     return false;
 
                 writer.incrementState();
 
             case 10:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
                 if (!writer.writeLong("updateSeq", updateSeq))
                     return false;
 
@@ -365,7 +374,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
                 reader.incrementState();
 
             case 6:
-                last = reader.readCollection("last", MessageCollectionItemType.INT);
+                keysPerCache = reader.readMap("keysPerCache", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -373,7 +382,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
                 reader.incrementState();
 
             case 7:
-                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
+                last = reader.readCollection("last", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -381,7 +390,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
                 reader.incrementState();
 
             case 8:
-                msgSize = reader.readInt("msgSize");
+                missed = reader.readCollection("missed", MessageCollectionItemType.INT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -389,7 +398,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
                 reader.incrementState();
 
             case 9:
-                topVer = reader.readMessage("topVer");
+                msgSize = reader.readInt("msgSize");
 
                 if (!reader.isLastRead())
                     return false;
@@ -397,6 +406,14 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
                 reader.incrementState();
 
             case 10:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
                 updateSeq = reader.readLong("updateSeq");
 
                 if (!reader.isLastRead())
@@ -416,7 +433,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 12;
     }
 
     /**
@@ -427,10 +444,43 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
     }
 
     /**
-     * @param estimatedKeysCnt New estimated keys count.
+     * @param cnt Keys count to add.
      */
-    public void estimatedKeysCount(long estimatedKeysCnt) {
-        this.estimatedKeysCnt = estimatedKeysCnt;
+    public void addEstimatedKeysCount(long cnt) {
+        this.estimatedKeysCnt += cnt;
+    }
+
+    /**
+     * @return Estimated keys count for a given cache ID.
+     */
+    public long keysForCache(int cacheId) {
+        if (this.keysPerCache == null)
+            return -1;
+
+        Long cnt = this.keysPerCache.get(cacheId);
+
+        return cnt != null ? cnt : 0;
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param cnt Keys count.
+     */
+    public void addKeysForCache(int cacheId, long cnt) {
+        assert cacheId != 0 && cnt >= 0;
+
+        if (keysPerCache == null)
+            keysPerCache = new HashMap<>();
+
+        Long cnt0 = keysPerCache.get(cacheId);
+
+        if (cnt0 == null) {
+            keysPerCache.put(cacheId, cnt);
+
+            msgSize += 12;
+        }
+        else
+            keysPerCache.put(cacheId, cnt0 + cnt);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
new file mode 100644
index 0000000..c15fa5f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String CACHE1 = "cache1";
+
+    /** */
+    private static final String CACHE2 = "cache2";
+
+    /** */
+    private static final String GROUP = "group1";
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        CacheConfiguration cfg1 = new CacheConfiguration()
+            .setName(CACHE1)
+            .setGroupName(GROUP)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setRebalanceMode(CacheRebalanceMode.ASYNC)
+            .setStatisticsEnabled(true);
+
+        CacheConfiguration cfg2 = new CacheConfiguration(cfg1)
+            .setName(CACHE2);
+
+        cfg.setCacheConfiguration(cfg1, cfg2);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalance() throws Exception {
+        Ignite ignite = startGrids(4);
+
+        IgniteCache<Object, Object> cache1 = ignite.cache(CACHE1);
+        IgniteCache<Object, Object> cache2 = ignite.cache(CACHE2);
+
+        for (int i = 0; i < 10000; i++) {
+            cache1.put(i, CACHE1 + "-" + i);
+
+            if (i % 2 == 0)
+                cache2.put(i, CACHE2 + "-" + i);
+        }
+
+        final CountDownLatch l1 = new CountDownLatch(1);
+        final CountDownLatch l2 = new CountDownLatch(1);
+
+        startGrid(4).events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                l1.countDown();
+
+                try {
+                    assertTrue(l2.await(5, TimeUnit.SECONDS));
+                }
+                catch (InterruptedException e) {
+                    throw new AssertionError();
+                }
+
+                return false;
+            }
+        }, EventType.EVT_CACHE_REBALANCE_STOPPED);
+
+        assertTrue(l1.await(5, TimeUnit.SECONDS));
+
+        ignite = ignite(4);
+
+        CacheMetrics metrics1 = ignite.cache(CACHE1).localMetrics();
+        CacheMetrics metrics2 = ignite.cache(CACHE2).localMetrics();
+
+        l2.countDown();
+
+        long rate1 = metrics1.getRebalancingKeysRate();
+        long rate2 = metrics2.getRebalancingKeysRate();
+
+        assertTrue(rate1 > 0);
+        assertTrue(rate2 > 0);
+
+        // rate1 has to be roughly twice more than rate2.
+        double ratio = ((double)rate2 / rate1) * 100;
+
+        log.info("Ratio: " + ratio);
+
+        assertTrue(ratio > 40 && ratio < 60);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGroupsPreloadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGroupsPreloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGroupsPreloadTest.java
new file mode 100644
index 0000000..8859638
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGroupsPreloadTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class CacheGroupsPreloadTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final String CACHE1 = "cache1";
+
+    /** */
+    private static final String CACHE2 = "cache2";
+
+    /** */
+    private static final String GROUP1 = "group1";
+
+    /** */
+    private static final String GROUP2 = "group2";
+
+    /** */
+    private CacheAtomicityMode atomicityMode = CacheAtomicityMode.ATOMIC;
+
+    /** */
+    private CacheMode cacheMode = CacheMode.PARTITIONED;
+
+    /** */
+    private boolean sameGrp = true;
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        CacheConfiguration cfg1 = defaultCacheConfiguration()
+            .setName(CACHE1)
+            .setGroupName(GROUP1)
+            .setCacheMode(cacheMode)
+            .setAtomicityMode(atomicityMode)
+            .setBackups(1);
+
+        CacheConfiguration cfg2 = new CacheConfiguration(cfg1)
+            .setName(CACHE2);
+
+        if (!sameGrp)
+            cfg2.setGroupName(GROUP2);
+
+        cfg.setCacheConfiguration(cfg1, cfg2);
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCachePreload1() throws Exception {
+        cachePreloadTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCachePreload2() throws Exception {
+        atomicityMode = CacheAtomicityMode.TRANSACTIONAL;
+
+        cachePreloadTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCachePreload3() throws Exception {
+        cacheMode = CacheMode.REPLICATED;
+
+        cachePreloadTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCachePreload4() throws Exception {
+        cacheMode = CacheMode.REPLICATED;
+        atomicityMode = CacheAtomicityMode.TRANSACTIONAL;
+
+        cachePreloadTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCachePreload5() throws Exception {
+        sameGrp = false;
+
+        cachePreloadTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCachePreload6() throws Exception {
+        sameGrp = false;
+        atomicityMode = CacheAtomicityMode.TRANSACTIONAL;
+
+        cachePreloadTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCachePreload7() throws Exception {
+        sameGrp = false;
+        cacheMode = CacheMode.REPLICATED;
+
+        cachePreloadTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCachePreload8() throws Exception {
+        sameGrp = false;
+        cacheMode = CacheMode.REPLICATED;
+        atomicityMode = CacheAtomicityMode.TRANSACTIONAL;
+
+        cachePreloadTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void cachePreloadTest() throws Exception {
+        IgniteCache<Object, Object> cache = startGrid(0).cache(CACHE1);
+
+        for (int i = 0; i < 1000; i++)
+            cache.put(i, CACHE1 + "-" + i);
+
+        cache = startGrid(1).cache(CACHE1);
+
+        for (int i = 0; i < 1000; i++)
+            assertEquals(CACHE1 + "-" + i, cache.get(i));
+
+        cache = ignite(1).cache(CACHE2);
+
+        for (int i = 0; i < 1000; i++)
+            cache.put(i, CACHE2 + "-" + i);
+
+        cache = startGrid(2).cache(CACHE1);
+
+        for (int i = 0; i < 1000; i++)
+            assertEquals(CACHE1 + "-" + i, cache.get(i));
+
+        cache = ignite(2).cache(CACHE2);
+
+        for (int i = 0; i < 1000; i++)
+            assertEquals(CACHE2 + "-" + i, cache.get(i));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
index ebcf1df..d3471ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.CacheGroupsMetricsRebalanceTest;
 import org.apache.ignite.internal.processors.cache.CacheMetricsForClusterGroupSelfTest;
 import org.apache.ignite.internal.processors.cache.OffheapCacheMetricsForClusterGroupSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPartitionedMetricsSelfTest;
@@ -57,6 +58,8 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAtomicPartitionedTckMetricsSelfTestImpl.class);
         suite.addTestSuite(GridCacheAtomicLocalTckMetricsSelfTestImpl.class);
 
+        suite.addTestSuite(CacheGroupsMetricsRebalanceTest.class);
+
         // Cluster wide metrics.
         suite.addTestSuite(CacheMetricsForClusterGroupSelfTest.class);
         suite.addTestSuite(OffheapCacheMetricsForClusterGroupSelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index f0c0c5a..feb2cdf 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheGroupsTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheInterceptorSelfTestSuite;
 import org.apache.ignite.internal.processors.cache.IgniteCacheScanPredicateDeploymentSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheAsyncOperationsTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheGroupsPreloadTest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheMixedModeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteTxGetAfterStopTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDaemonNodePartitionedSelfTest;

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 6370a10..906d3ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarl
 import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheDiscoveryDataConcurrentJoinTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheGroupsPreloadTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
@@ -303,6 +304,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class);
         suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class);
         suite.addTestSuite(GridReplicatedTxPreloadTest.class);
+        suite.addTestSuite(CacheGroupsPreloadTest.class);
 
         suite.addTestSuite(IgniteDynamicCacheFilterTest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
index d2a5177..1b9ebc4 100644
--- a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
+++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java
@@ -16,7 +16,6 @@
  */
 
 package org.apache.ignite.cache.database;
-
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
@@ -24,6 +23,7 @@ import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.cache.Cache;
+import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -36,6 +36,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.internal.processors.cache.database.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -64,6 +66,9 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
     /** */
     private CacheConfiguration[] ccfgs;
 
+    /** */
+    private boolean activeOnStart = true;
+
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         super.beforeTestsStarted();
@@ -84,6 +89,8 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
 
         cfg.setConsistentId(gridName);
 
+        cfg.setActiveOnStart(activeOnStart);
+
         MemoryConfiguration memCfg = new MemoryConfiguration();
         memCfg.setPageSize(1024);
         memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024);
@@ -145,6 +152,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
     /**
      * @throws Exception If failed.
      */
+    @SuppressWarnings("unchecked")
     public void testClusterRestartCachesWithH2Indexes() throws Exception {
         CacheConfiguration[] ccfgs1 = new CacheConfiguration[5];
 
@@ -168,17 +176,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
 
         node.createCaches(Arrays.asList(ccfgs1));
 
-        for (String cacheName : caches) {
-            IgniteCache<Object, Object> cache = node.cache(cacheName);
-
-            for (int i = 0; i < 10; i++)  {
-                cache.put(i, new Person("" + i, cacheName));
-
-                assertEquals(new Person("" + i, cacheName), cache.get(i));
-            }
+        putPersons(caches, node);
 
-            assertEquals(10, cache.size());
-        }
+        checkPersons(caches, node);
+        checkPersonsQuery(caches, node);
 
         stopAllGrids();
 
@@ -194,21 +195,81 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         int idx = rnd.nextInt(caches.length);
 
         String cacheName = caches[idx];
+        CacheConfiguration cacheCfg = ccfgs1[idx];
 
         node.destroyCache(cacheName);
 
-        IgniteCache<Object, Object> cache = node.createCache(ccfgs1[idx]);
+        node.createCache(cacheCfg);
+
+        putPersons(new String[]{cacheName}, node);
 
-        for (int i = 0; i < 10; i++)  {
-            cache.put(i, new Person("" + i, cacheName));
+        checkPersons(caches, node);
+        checkPersonsQuery(caches, node);
+    }
 
-            assertEquals(new Person("" + i, cacheName), cache.get(i));
+    /**
+     * Checks that entries written through an expiry policy are still readable after a full cluster
+     * restart and are no longer counted once the expiration deadline has passed.
+     * <p>
+     * Scenario: start 3 nodes inactive, activate, create five caches, put 10 entries into each via
+     * {@code withExpiryPolicy}, stop all nodes, start and activate them again, verify all values and
+     * a size of 10 per cache, sleep until {@code ttl * 1.2} ms after the puts, then expect size 0.
+     * <p>
+     * NOTE(review): the policy is built as {@code new PlatformExpiryPolicy(ttl, -2, -2)}; presumably the
+     * first argument is the creation TTL and {@code -2} leaves update/access TTL unchanged - confirm
+     * against {@code PlatformExpiryPolicy}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testExpiryPolicy() throws Exception {
+        long ttl = 10000;
+
+        activeOnStart = false;
+
+        CacheConfiguration[] ccfgs1 = new CacheConfiguration[5];
+
+        // Several caches with the same indexed type (and index names)
+        ccfgs1[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1);
+        ccfgs1[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL, 1);
+        ccfgs1[2] = cacheConfiguration(GROUP2, "c3", PARTITIONED, ATOMIC, 1);
+        ccfgs1[3] = cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1);
+        ccfgs1[4] = cacheConfiguration(null, "c5", PARTITIONED, ATOMIC, 1);
+
+        String[] caches = {"c1", "c2", "c3", "c4", "c5"};
+
+        startGrids(3);
+
+        Ignite node = ignite(0);
+
+        node.active(true);
+
+        node.createCaches(Arrays.asList(ccfgs1));
+
+        ExpiryPolicy plc = new PlatformExpiryPolicy(ttl, -2, -2);
+
+        for (String cacheName : caches) {
+            IgniteCache<Object, Object> cache = node.cache(cacheName).withExpiryPolicy(plc);
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, cacheName + i);
         }
 
-        assertEquals(10, cache.size());
+        long deadline = System.currentTimeMillis() + (long)(ttl * 1.2);
 
-        checkPersons(caches, node);
-        checkPersonsQuery(caches, node);
+        stopAllGrids();
+
+        startGrids(3);
+
+        node = ignite(0);
+
+        node.active(true);
+
+        for (String cacheName : caches) {
+            IgniteCache<Object, Object> cache = node.cache(cacheName);
+
+            for (int i = 0; i < 10; i++)
+                assertEquals(cacheName + i, cache.get(i));
+
+            assertEquals(10, cache.size());
+        }
+
+        // wait for expiration
+        Thread.sleep(Math.max(deadline - System.currentTimeMillis(), 0));
+
+        for (String cacheName : caches) {
+            IgniteCache<Object, Object> cache = node.cache(cacheName);
+
+            assertEquals(0, cache.size());
+        }
     }
 
     /**
@@ -226,6 +287,19 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
     }
 
     /**
+     * Puts ten {@code Person} entries, keyed {@code 0..9}, into each of the given caches.
+     * Each person is created as {@code new Person("" + i, cacheName)}.
+     *
+     * @param caches Cache names to put data into.
+     * @param node Ignite node.
+     */
+    private void putPersons(String[] caches, Ignite node) {
+        for (String cacheName : caches) {
+            IgniteCache<Object, Object> cache = node.cache(cacheName);
+
+            for (int i = 0; i < 10; i++)
+                cache.put(i, new Person("" + i, cacheName));
+        }
+    }
+
+    /**
      * @param caches Cache names to invoke a query against to.
      * @param node Ignite node.
      */
@@ -274,7 +348,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
         ccfgs[3] = cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1);
         ccfgs[4] = cacheConfiguration(null, "c5", PARTITIONED, ATOMIC, 1);
 
-        String[] caches = {"c1", "c2", "c3", "c5", "c5"};
+        String[] caches = {"c1", "c2", "c3", "c4", "c5"};
 
         for (int i = 0; i < nodes; i++) {
             if (staticCaches)
@@ -346,10 +420,12 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest
      */
     static class Person implements Serializable {
         /** */
+        @GridToStringInclude
         @QuerySqlField(index = true, groups = "full_name")
         String fName;
 
         /** */
+        @GridToStringInclude
         @QuerySqlField(index = true, groups = "full_name")
         String lName;
 


Mime
View raw message