Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 00D0F200C86 for ; Wed, 31 May 2017 13:55:12 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F38B7160BCB; Wed, 31 May 2017 11:55:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 50835160BBA for ; Wed, 31 May 2017 13:55:10 +0200 (CEST) Received: (qmail 70094 invoked by uid 500); 31 May 2017 11:55:09 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 70085 invoked by uid 99); 31 May 2017 11:55:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 31 May 2017 11:55:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6800CDFE5C; Wed, 31 May 2017 11:55:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Message-Id: <33ddaac4492747ef8a8a6a98345d2986@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: ignite-5075-pds Date: Wed, 31 May 2017 11:55:09 +0000 (UTC) archived-at: Wed, 31 May 2017 11:55:12 -0000 Repository: ignite Updated Branches: refs/heads/ignite-5075-pds fdd76de24 -> 39c43f2d3 ignite-5075-pds Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39c43f2d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39c43f2d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39c43f2d Branch: refs/heads/ignite-5075-pds Commit: 39c43f2d375c4725c39eb799ac6adf27ffaedeb4 Parents: fdd76de Author: sboikov Authored: Wed May 31 14:40:04 2017 +0300 Committer: sboikov Committed: Wed May 31 14:55:00 2017 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 2 +- .../cache/CacheGroupInfrastructure.java | 7 + .../dht/preloader/GridDhtPartitionDemander.java | 68 ++++--- .../dht/preloader/GridDhtPartitionSupplier.java | 12 +- .../GridDhtPartitionSupplyMessage.java | 76 ++++++-- .../cache/CacheGroupsMetricsRebalanceTest.java | 140 +++++++++++++ .../distributed/CacheGroupsPreloadTest.java | 194 +++++++++++++++++++ .../IgniteCacheMetricsSelfTestSuite.java | 3 + .../testsuites/IgniteCacheTestSuite3.java | 1 + .../testsuites/IgniteCacheTestSuite4.java | 2 + .../IgnitePersistentStoreCacheGroupsTest.java | 114 +++++++++-- 11 files changed, 556 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 1efc4aa..ea49dbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1008,7 +1008,7 @@ public class GridIoManager extends GridManagerAdapter= 0 : "Negative policy: " + plc; + assert plc >= 0 : "Negative policy [plc=" + plc + ", msg=" + msg + ']'; if (isReservedGridIoPolicy(plc)) throw new IgniteCheckedException("Failed to process message with policy of reserved range. " + http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 1c8d85c..cdc2e9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -700,6 +700,13 @@ public class CacheGroupInfrastructure { } /** + * @return Caches in this group. + */ + public List caches() { + return this.caches; + } + + /** * @return {@code True} if group contains caches. */ boolean hasCaches() { http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 2514130..0eaae75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; +import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; @@ -298,18 +299,19 @@ public class GridDhtPartitionDemander { fut.sendRebalanceStartedEvent(); -// TODO 5075. -// final boolean statsEnabled = cctx.config().isStatisticsEnabled(); -// -// if (statsEnabled) { -// cctx.cache().metrics0().clearRebalanceCounters(); -// -// rebalanceFut.listen(new IgniteInClosure>() { -// @Override public void apply(IgniteInternalFuture fut) { -// cctx.cache().metrics0().clearRebalanceCounters(); -// } -// }); -// } + for (GridCacheContext cctx : grp.caches()) { + if (cctx.config().isStatisticsEnabled()) { + final CacheMetricsImpl metrics = cctx.cache().metrics0(); + + metrics.clearRebalanceCounters(); + + rebalanceFut.listen(new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture fut) { + metrics.clearRebalanceCounters(); + } + }); + } + } if (assigns.cancelled()) { // Pending exchange. if (log.isDebugEnabled()) @@ -609,19 +611,35 @@ public class GridDhtPartitionDemander { final GridDhtPartitionTopology top = grp.topology(); -// TODO 5075. -// final boolean statsEnabled = cctx.config().isStatisticsEnabled(); -// -// if (statsEnabled) { -// if (supply.estimatedKeysCount() != -1) -// cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount()); -// -// cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); -// } + if (grp.sharedGroup()) { + for (GridCacheContext cctx : grp.caches()) { + if (cctx.config().isStatisticsEnabled()) { + long keysCnt = supply.keysForCache(cctx.cacheId()); + + if (keysCnt != -1) + cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(keysCnt); + + // Can not be calculated per cache. + cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); + } + } + } + else { + GridCacheContext cctx = grp.singleCacheContext(); + + if (cctx.config().isStatisticsEnabled()) { + if (supply.estimatedKeysCount() != -1) + cctx.cache().metrics0().onRebalancingKeysCountEstimateReceived(supply.estimatedKeysCount()); + + cctx.cache().metrics0().onRebalanceBatchReceived(supply.messageSize()); + } + } try { AffinityAssignment aff = grp.affinity().cachedAffinity(topVer); + GridCacheContext cctx = grp.sharedGroup() ? null : grp.singleCacheContext(); + // Preload. for (Map.Entry e : supply.infos().entrySet()) { int p = e.getKey(); @@ -661,9 +679,11 @@ public class GridDhtPartitionDemander { break; } -// TODO 5075. -// if (statsEnabled) -// cctx.cache().metrics0().onRebalanceKeyReceived(); + if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != entry.cacheId())) + cctx = ctx.cacheContext(entry.cacheId()); + + if(cctx != null && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onRebalanceKeyReceived(); } // If message was last for this partition, http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index a7ae3c5..6196746 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -28,7 +28,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; @@ -248,18 +247,19 @@ class GridDhtPartitionSupplier { Iterator partIt = sctx != null ? sctx.partIt : d.partitions().iterator(); if (sctx == null) { - long keysCnt = 0; - for (Integer part : d.partitions()) { GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); if (loc == null || loc.state() != OWNING) continue; - keysCnt += grp.offheap().totalPartitionEntriesCount(part); + if (grp.sharedGroup()) { + for (int cacheId : grp.cacheIds()) + s.addKeysForCache(cacheId, grp.offheap().cacheEntriesCount(cacheId, part)); + } + else + s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part)); } - - s.estimatedKeysCount(keysCnt); } while ((sctx != null && newReq) || partIt.hasNext()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 92c462b..ee5b190 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -28,7 +28,6 @@ import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; -import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure; @@ -80,6 +79,10 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple /** Estimated keys count. */ private long estimatedKeysCnt = -1; + /** Estimated keys count per cache in case the message is for shared group. */ + @GridDirectMap(keyType = int.class, valueType = long.class) + private Map keysPerCache; + /** * @param updateSeq Update sequence for this node. * @param grpId Cache group ID. @@ -295,30 +298,36 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple writer.incrementState(); case 6: - if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) + if (!writer.writeMap("keysPerCache", keysPerCache, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) return false; writer.incrementState(); case 7: - if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) + if (!writer.writeCollection("last", last, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 8: - if (!writer.writeInt("msgSize", msgSize)) + if (!writer.writeCollection("missed", missed, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 9: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("msgSize", msgSize)) return false; writer.incrementState(); case 10: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeLong("updateSeq", updateSeq)) return false; @@ -365,7 +374,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 6: - last = reader.readCollection("last", MessageCollectionItemType.INT); + keysPerCache = reader.readMap("keysPerCache", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); if (!reader.isLastRead()) return false; @@ -373,7 +382,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 7: - missed = reader.readCollection("missed", MessageCollectionItemType.INT); + last = reader.readCollection("last", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -381,7 +390,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 8: - msgSize = reader.readInt("msgSize"); + missed = reader.readCollection("missed", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -389,7 +398,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 9: - topVer = reader.readMessage("topVer"); + msgSize = reader.readInt("msgSize"); if (!reader.isLastRead()) return false; @@ -397,6 +406,14 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple reader.incrementState(); case 10: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: updateSeq = reader.readLong("updateSeq"); if (!reader.isLastRead()) @@ -416,7 +433,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** @@ -427,10 +444,43 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple } /** - * @param estimatedKeysCnt New estimated keys count. + * @param cnt Keys count to add. */ - public void estimatedKeysCount(long estimatedKeysCnt) { - this.estimatedKeysCnt = estimatedKeysCnt; + public void addEstimatedKeysCount(long cnt) { + this.estimatedKeysCnt += cnt; + } + + /** + * @return Estimated keys count for a given cache ID. + */ + public long keysForCache(int cacheId) { + if (this.keysPerCache == null) + return -1; + + Long cnt = this.keysPerCache.get(cacheId); + + return cnt != null ? cnt : 0; + } + + /** + * @param cacheId Cache ID. + * @param cnt Keys count. + */ + public void addKeysForCache(int cacheId, long cnt) { + assert cacheId != 0 && cnt >= 0; + + if (keysPerCache == null) + keysPerCache = new HashMap<>(); + + Long cnt0 = keysPerCache.get(cacheId); + + if (cnt0 == null) { + keysPerCache.put(cacheId, cnt); + + msgSize += 12; + } + else + keysPerCache.put(cacheId, cnt0 + cnt); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java new file mode 100644 index 0000000..c15fa5f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE1 = "cache1"; + + /** */ + private static final String CACHE2 = "cache2"; + + /** */ + private static final String GROUP = "group1"; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + CacheConfiguration cfg1 = new CacheConfiguration() + .setName(CACHE1) + .setGroupName(GROUP) + .setCacheMode(CacheMode.PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setRebalanceMode(CacheRebalanceMode.ASYNC) + .setStatisticsEnabled(true); + + CacheConfiguration cfg2 = new CacheConfiguration(cfg1) + .setName(CACHE2); + + cfg.setCacheConfiguration(cfg1, cfg2); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testRebalance() throws Exception { + Ignite ignite = startGrids(4); + + IgniteCache cache1 = ignite.cache(CACHE1); + IgniteCache cache2 = ignite.cache(CACHE2); + + for (int i = 0; i < 10000; i++) { + cache1.put(i, CACHE1 + "-" + i); + + if (i % 2 == 0) + cache2.put(i, CACHE2 + "-" + i); + } + + final CountDownLatch l1 = new CountDownLatch(1); + final CountDownLatch l2 = new CountDownLatch(1); + + startGrid(4).events().localListen(new IgnitePredicate() { + @Override public boolean apply(Event evt) { + l1.countDown(); + + try { + assertTrue(l2.await(5, TimeUnit.SECONDS)); + } + catch (InterruptedException e) { + throw new AssertionError(); + } + + return false; + } + }, EventType.EVT_CACHE_REBALANCE_STOPPED); + + assertTrue(l1.await(5, TimeUnit.SECONDS)); + + ignite = ignite(4); + + CacheMetrics metrics1 = ignite.cache(CACHE1).localMetrics(); + CacheMetrics metrics2 = ignite.cache(CACHE2).localMetrics(); + + l2.countDown(); + + long rate1 = metrics1.getRebalancingKeysRate(); + long rate2 = metrics2.getRebalancingKeysRate(); + + assertTrue(rate1 > 0); + assertTrue(rate2 > 0); + + // rate1 has to be roughly twice more than rate2. + double ratio = ((double)rate2 / rate1) * 100; + + log.info("Ratio: " + ratio); + + assertTrue(ratio > 40 && ratio < 60); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGroupsPreloadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGroupsPreloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGroupsPreloadTest.java new file mode 100644 index 0000000..8859638 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGroupsPreloadTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * + */ +public class CacheGroupsPreloadTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String CACHE1 = "cache1"; + + /** */ + private static final String CACHE2 = "cache2"; + + /** */ + private static final String GROUP1 = "group1"; + + /** */ + private static final String GROUP2 = "group2"; + + /** */ + private CacheAtomicityMode atomicityMode = CacheAtomicityMode.ATOMIC; + + /** */ + private CacheMode cacheMode = CacheMode.PARTITIONED; + + /** */ + private boolean sameGrp = true; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + CacheConfiguration cfg1 = defaultCacheConfiguration() + .setName(CACHE1) + .setGroupName(GROUP1) + .setCacheMode(cacheMode) + .setAtomicityMode(atomicityMode) + .setBackups(1); + + CacheConfiguration cfg2 = new CacheConfiguration(cfg1) + .setName(CACHE2); + + if (!sameGrp) + cfg2.setGroupName(GROUP2); + + cfg.setCacheConfiguration(cfg1, cfg2); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testCachePreload1() throws Exception { + cachePreloadTest(); + } + + /** + * @throws Exception If failed. + */ + public void testCachePreload2() throws Exception { + atomicityMode = CacheAtomicityMode.TRANSACTIONAL; + + cachePreloadTest(); + } + + /** + * @throws Exception If failed. + */ + public void testCachePreload3() throws Exception { + cacheMode = CacheMode.REPLICATED; + + cachePreloadTest(); + } + + /** + * @throws Exception If failed. + */ + public void testCachePreload4() throws Exception { + cacheMode = CacheMode.REPLICATED; + atomicityMode = CacheAtomicityMode.TRANSACTIONAL; + + cachePreloadTest(); + } + + /** + * @throws Exception If failed. + */ + public void testCachePreload5() throws Exception { + sameGrp = false; + + cachePreloadTest(); + } + + /** + * @throws Exception If failed. + */ + public void testCachePreload6() throws Exception { + sameGrp = false; + atomicityMode = CacheAtomicityMode.TRANSACTIONAL; + + cachePreloadTest(); + } + + /** + * @throws Exception If failed. + */ + public void testCachePreload7() throws Exception { + sameGrp = false; + cacheMode = CacheMode.REPLICATED; + + cachePreloadTest(); + } + + /** + * @throws Exception If failed. + */ + public void testCachePreload8() throws Exception { + sameGrp = false; + cacheMode = CacheMode.REPLICATED; + atomicityMode = CacheAtomicityMode.TRANSACTIONAL; + + cachePreloadTest(); + } + + /** + * @throws Exception If failed. + */ + private void cachePreloadTest() throws Exception { + IgniteCache cache = startGrid(0).cache(CACHE1); + + for (int i = 0; i < 1000; i++) + cache.put(i, CACHE1 + "-" + i); + + cache = startGrid(1).cache(CACHE1); + + for (int i = 0; i < 1000; i++) + assertEquals(CACHE1 + "-" + i, cache.get(i)); + + cache = ignite(1).cache(CACHE2); + + for (int i = 0; i < 1000; i++) + cache.put(i, CACHE2 + "-" + i); + + cache = startGrid(2).cache(CACHE1); + + for (int i = 0; i < 1000; i++) + assertEquals(CACHE1 + "-" + i, cache.get(i)); + + cache = ignite(2).cache(CACHE2); + + for (int i = 0; i < 1000; i++) + assertEquals(CACHE2 + "-" + i, cache.get(i)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java index ebcf1df..d3471ca 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMetricsSelfTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.CacheGroupsMetricsRebalanceTest; import org.apache.ignite.internal.processors.cache.CacheMetricsForClusterGroupSelfTest; import org.apache.ignite.internal.processors.cache.OffheapCacheMetricsForClusterGroupSelfTest; import org.apache.ignite.internal.processors.cache.distributed.near.GridCacheAtomicPartitionedMetricsSelfTest; @@ -57,6 +58,8 @@ public class IgniteCacheMetricsSelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheAtomicPartitionedTckMetricsSelfTestImpl.class); suite.addTestSuite(GridCacheAtomicLocalTckMetricsSelfTestImpl.class); + suite.addTestSuite(CacheGroupsMetricsRebalanceTest.class); + // Cluster wide metrics. suite.addTestSuite(CacheMetricsForClusterGroupSelfTest.class); suite.addTestSuite(OffheapCacheMetricsForClusterGroupSelfTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java index f0c0c5a..feb2cdf 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheGroupsTest; import org.apache.ignite.internal.processors.cache.IgniteCacheInterceptorSelfTestSuite; import org.apache.ignite.internal.processors.cache.IgniteCacheScanPredicateDeploymentSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAsyncOperationsTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheGroupsPreloadTest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMixedModeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteTxGetAfterStopTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDaemonNodePartitionedSelfTest; http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 6370a10..906d3ff 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -87,6 +87,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarl import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest; import org.apache.ignite.internal.processors.cache.distributed.CacheDiscoveryDataConcurrentJoinTest; import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheGroupsPreloadTest; import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; @@ -303,6 +304,7 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(GridCacheDhtTxPreloadSelfTest.class); suite.addTestSuite(GridCacheNearTxPreloadSelfTest.class); suite.addTestSuite(GridReplicatedTxPreloadTest.class); + suite.addTestSuite(CacheGroupsPreloadTest.class); suite.addTestSuite(IgniteDynamicCacheFilterTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/39c43f2d/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java index d2a5177..1b9ebc4 100644 --- a/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java +++ b/modules/pds/src/test/java/org/apache/ignite/cache/database/IgnitePersistentStoreCacheGroupsTest.java @@ -16,7 +16,6 @@ */ package org.apache.ignite.cache.database; - import java.io.Serializable; import java.util.Arrays; import java.util.List; @@ -24,6 +23,7 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import javax.cache.Cache; +import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheAtomicityMode; @@ -36,6 +36,8 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.internal.processors.cache.database.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -64,6 +66,9 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest /** */ private CacheConfiguration[] ccfgs; + /** */ + private boolean activeOnStart = true; + /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); @@ -84,6 +89,8 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest cfg.setConsistentId(gridName); + cfg.setActiveOnStart(activeOnStart); + MemoryConfiguration memCfg = new MemoryConfiguration(); memCfg.setPageSize(1024); memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024); @@ -145,6 +152,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest /** * @throws Exception If failed. */ + @SuppressWarnings("unchecked") public void testClusterRestartCachesWithH2Indexes() throws Exception { CacheConfiguration[] ccfgs1 = new CacheConfiguration[5]; @@ -168,17 +176,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest node.createCaches(Arrays.asList(ccfgs1)); - for (String cacheName : caches) { - IgniteCache cache = node.cache(cacheName); - - for (int i = 0; i < 10; i++) { - cache.put(i, new Person("" + i, cacheName)); - - assertEquals(new Person("" + i, cacheName), cache.get(i)); - } + putPersons(caches, node); - assertEquals(10, cache.size()); - } + checkPersons(caches, node); + checkPersonsQuery(caches, node); stopAllGrids(); @@ -194,21 +195,81 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest int idx = rnd.nextInt(caches.length); String cacheName = caches[idx]; + CacheConfiguration cacheCfg = ccfgs1[idx]; node.destroyCache(cacheName); - IgniteCache cache = node.createCache(ccfgs1[idx]); + node.createCache(cacheCfg); + + putPersons(new String[]{cacheName}, node); - for (int i = 0; i < 10; i++) { - cache.put(i, new Person("" + i, cacheName)); + checkPersons(caches, node); + checkPersonsQuery(caches, node); + } - assertEquals(new Person("" + i, cacheName), cache.get(i)); + /** + * @throws Exception If failed. + */ + public void testExpiryPolicy() throws Exception { + long ttl = 10000; + + activeOnStart = false; + + CacheConfiguration[] ccfgs1 = new CacheConfiguration[5]; + + // Several caches with the same indexed type (and index names) + ccfgs1[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1); + ccfgs1[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL, 1); + ccfgs1[2] = cacheConfiguration(GROUP2, "c3", PARTITIONED, ATOMIC, 1); + ccfgs1[3] = cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1); + ccfgs1[4] = cacheConfiguration(null, "c5", PARTITIONED, ATOMIC, 1); + + String[] caches = {"c1", "c2", "c3", "c4", "c5"}; + + startGrids(3); + + Ignite node = ignite(0); + + node.active(true); + + node.createCaches(Arrays.asList(ccfgs1)); + + ExpiryPolicy plc = new PlatformExpiryPolicy(ttl, -2, -2); + + for (String cacheName : caches) { + IgniteCache cache = node.cache(cacheName).withExpiryPolicy(plc); + + for (int i = 0; i < 10; i++) + cache.put(i, cacheName + i); } - assertEquals(10, cache.size()); + long deadline = System.currentTimeMillis() + (long)(ttl * 1.2); - checkPersons(caches, node); - checkPersonsQuery(caches, node); + stopAllGrids(); + + startGrids(3); + + node = ignite(0); + + node.active(true); + + for (String cacheName : caches) { + IgniteCache cache = node.cache(cacheName); + + for (int i = 0; i < 10; i++) + assertEquals(cacheName + i, cache.get(i)); + + assertEquals(10, cache.size()); + } + + // wait for expiration + Thread.sleep(Math.max(deadline - System.currentTimeMillis(), 0)); + + for (String cacheName : caches) { + IgniteCache cache = node.cache(cacheName); + + assertEquals(0, cache.size()); + } } /** @@ -226,6 +287,19 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest } /** + * @param caches Cache names to put data into. + * @param node Ignite node. + */ + private void putPersons(String[] caches, Ignite node) { + for (String cacheName : caches) { + IgniteCache cache = node.cache(cacheName); + + for (int i = 0; i < 10; i++) + cache.put(i, new Person("" + i, cacheName)); + } + } + + /** * @param caches Cache names to invoke a query against to. * @param node Ignite node. */ @@ -274,7 +348,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest ccfgs[3] = cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1); ccfgs[4] = cacheConfiguration(null, "c5", PARTITIONED, ATOMIC, 1); - String[] caches = {"c1", "c2", "c3", "c5", "c5"}; + String[] caches = {"c1", "c2", "c3", "c4", "c5"}; for (int i = 0; i < nodes; i++) { if (staticCaches) @@ -346,10 +420,12 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest */ static class Person implements Serializable { /** */ + @GridToStringInclude @QuerySqlField(index = true, groups = "full_name") String fName; /** */ + @GridToStringInclude @QuerySqlField(index = true, groups = "full_name") String lName;