Return-Path: X-Original-To: apmail-geode-commits-archive@minotaur.apache.org Delivered-To: apmail-geode-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 229FB18D9B for ; Wed, 29 Jul 2015 21:18:36 +0000 (UTC) Received: (qmail 12860 invoked by uid 500); 29 Jul 2015 21:18:20 -0000 Delivered-To: apmail-geode-commits-archive@geode.apache.org Received: (qmail 12832 invoked by uid 500); 29 Jul 2015 21:18:20 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 12823 invoked by uid 99); 29 Jul 2015 21:18:20 -0000 Received: from Unknown (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Jul 2015 21:18:20 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id AF9FFC0721 for ; Wed, 29 Jul 2015 21:18:19 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.771 X-Spam-Level: * X-Spam-Status: No, score=1.771 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id w9bEzJJxG1dH for ; Wed, 29 Jul 2015 21:18:16 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 74DB74C0E5 for ; Wed, 29 Jul 2015 21:18:11 +0000 (UTC) Received: (qmail 12453 invoked by uid 99); 29 Jul 2015 21:18:11 -0000 
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 29 Jul 2015 21:18:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CE0F1E03FA; Wed, 29 Jul 2015 21:18:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ashvin@apache.org To: commits@geode.incubator.apache.org Date: Wed, 29 Jul 2015 21:18:14 -0000 Message-Id: <872e7d885cc249b39e29f44b63ddce1b@git.apache.org> In-Reply-To: <92805afe0fc8436fa1a1dc62f37bc28d@git.apache.org> References: <92805afe0fc8436fa1a1dc62f37bc28d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/7] incubator-geode git commit: GEODE-124: Add min size check and optimize calls GEODE-124: Add min size check and optimize calls * Member stats are needed for computing member load skew and total data size. The gathering step is common and need not be executed twice * If the total transfer size during rebalance is low, then avoid rebalance. 
This could happen when a new cluster is created and data is being loaded Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/1683361b Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/1683361b Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/1683361b Branch: refs/heads/develop Commit: 1683361bc87368e48d8cfefebf6dbcda932ed90c Parents: d307eda Author: Ashvin Agrawal Authored: Mon Jul 27 22:11:27 2015 -0700 Committer: Ashvin Agrawal Committed: Tue Jul 28 16:23:46 2015 -0700 ---------------------------------------------------------------------- .../gemfire/cache/util/AutoBalancer.java | 80 +++++++-- .../cache/util/AutoBalancerJUnitTest.java | 166 +++++++++++++++---- 2 files changed, 200 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1683361b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java ---------------------------------------------------------------------- diff --git a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java index 1de7031..ef795b0 100644 --- a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java +++ b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java @@ -1,6 +1,8 @@ package com.gemstone.gemfire.cache.util; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.CancellationException; @@ -74,9 +76,9 @@ public class AutoBalancer implements Declarable { public static final String SCHEDULE = "schedule"; /** - * Use this configuration to manage re-balance threshold. 
Rebalance operation + * Use this configuration to manage re-balance invocation. Rebalance operation * will be triggered if the total number of bytes rebalance operation may move - * is more than this threshold, percentage of the total data size. + * is more than this threshold, in percentage of the total data size. *

* Default {@value AutoBalancer#DEFAULT_SIZE_THRESHOLD_PERCENT} */ @@ -89,6 +91,26 @@ public class AutoBalancer implements Declarable { public static final int DEFAULT_SIZE_THRESHOLD_PERCENT = 10; /** + * In the initial data load phases, + * {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance invocation may + * be unnecessary. Rebalance should not be triggered if the total data size + * managed by cluster is too small. Rebalance operation will be triggered if + * the total number of bytes rebalance operation may move is more than this + * number of bytes. + *

+ * Default {@value AutoBalancer#DEFAULT_SIZE_MINIMUM} + */ + public static final String SIZE_MINIMUM = "size-minimum"; + + /** + * Default value of {@link AutoBalancer#SIZE_MINIMUM}. In the initial data + * load phases, {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance + * invocation may be unnecessary. Do not rebalance if the data to be moved is + * less than 100MB + */ + public static final int DEFAULT_SIZE_MINIMUM = 100 * 1024 * 1024; + + /** * Name of the DistributedLockService that {@link AutoBalancer} will use to * guard against concurrent maintenance activity */ @@ -193,6 +215,7 @@ public class AutoBalancer implements Declarable { */ class SizeBasedOOBAuditor implements OOBAuditor { private int sizeThreshold = DEFAULT_SIZE_THRESHOLD_PERCENT; + private int sizeMinimum = DEFAULT_SIZE_MINIMUM; @Override public void init(Properties props) { @@ -207,6 +230,12 @@ public class AutoBalancer implements Declarable { throw new GemFireConfigException(SIZE_THRESHOLD_PERCENT + " should be integer, 1 to 99"); } } + if (props.getProperty(SIZE_MINIMUM) != null) { + sizeMinimum = Integer.valueOf(props.getProperty(SIZE_MINIMUM)); + if (sizeMinimum <= 0) { + throw new GemFireConfigException(SIZE_MINIMUM + " should be greater than 0"); + } + } } } @@ -250,7 +279,12 @@ public class AutoBalancer implements Declarable { boolean needsRebalancing() { // test cluster level status long transferSize = cacheFacade.getTotalTransferSize(); - long totalSize = cacheFacade.getTotalDataSize(); + if (transferSize <= sizeMinimum) { + return false; + } + + Map details = cacheFacade.getRegionMemberDetails(); + long totalSize = cacheFacade.getTotalDataSize(details); if (totalSize > 0) { int transferPercent = (int) ((100.0 * transferSize) / totalSize); @@ -264,9 +298,13 @@ public class AutoBalancer implements Declarable { return false; } - public int getSizeThreshold() { + int getSizeThreshold() { return sizeThreshold; } + + public long getSizeMinimum() { + return sizeMinimum; + } } /** 
@@ -275,18 +313,30 @@ public class AutoBalancer implements Declarable { */ static class GeodeCacheFacade implements CacheOperationFacade { @Override - public long getTotalDataSize() { - long totalSize = 0; + public Map getRegionMemberDetails() { GemFireCacheImpl cache = getCache(); + Map detailsMap = new HashMap<>(); for (PartitionedRegion region : cache.getPartitionedRegions()) { LoadProbe probe = cache.getResourceManager().getLoadProbe(); InternalPRInfo info = region.getRedundancyProvider().buildPartitionedRegionInfo(true, probe); - Set membersInfo = info.getPartitionMemberInfo(); - for (PartitionMemberInfo member : membersInfo) { - if (logger.isDebugEnabled()) { - logger.debug("Region:{}, Member: {}, Size: {}", region.getFullPath(), member, member.getSize()); + detailsMap.put(region, info); + } + return detailsMap; + } + + @Override + public long getTotalDataSize(Map details) { + long totalSize = 0; + if (details != null) { + for (PartitionedRegion region : details.keySet()) { + InternalPRInfo info = details.get(region); + Set membersInfo = info.getPartitionMemberInfo(); + for (PartitionMemberInfo member : membersInfo) { + if (logger.isDebugEnabled()) { + logger.debug("Region:{}, Member: {}, Size: {}", region.getFullPath(), member, member.getSize()); + } + totalSize += member.getSize(); } - totalSize += member.getSize(); } } return totalSize; @@ -417,7 +467,9 @@ public class AutoBalancer implements Declarable { void incrementAttemptCounter(); - long getTotalDataSize(); + Map getRegionMemberDetails(); + + long getTotalDataSize(Map details); long getTotalTransferSize(); } @@ -456,4 +508,8 @@ public class AutoBalancer implements Declarable { public void setCacheOperationFacade(CacheOperationFacade facade) { this.cacheFacade = facade; } + + public CacheOperationFacade getCacheOperationFacade() { + return this.cacheFacade; + } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1683361b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java index 888ea20..db225cb 100644 --- a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java +++ b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java @@ -2,11 +2,14 @@ package com.gemstone.gemfire.cache.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.ByteArrayInputStream; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,10 +50,6 @@ import com.gemstone.gemfire.test.junit.categories.UnitTest; @Category(UnitTest.class) public class AutoBalancerJUnitTest { - - // OOB > threshold && size < min - // OOB > threshold && size < min - // OOB critical nodes GemFireCacheImpl cache; Mockery mockContext; @@ -355,19 +354,20 @@ public class AutoBalancerJUnitTest { public void testOOBWhenBelowSizeThreshold() { final long totalSize = 1000L; + final Map details = new HashMap<>(); final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class); mockContext.checking(new Expectations() { { + allowing(mockCacheFacade).getRegionMemberDetails(); + will(returnValue(details)); // first run - oneOf(mockCacheFacade).getTotalDataSize(); + oneOf(mockCacheFacade).getTotalDataSize(details); will(returnValue(totalSize)); 
oneOf(mockCacheFacade).getTotalTransferSize(); // half of threshold limit will(returnValue((AutoBalancer.DEFAULT_SIZE_THRESHOLD_PERCENT * totalSize / 100) / 2)); // second run - oneOf(mockCacheFacade).getTotalDataSize(); - will(returnValue(totalSize)); oneOf(mockCacheFacade).getTotalTransferSize(); // nothing to transfer will(returnValue(0L)); @@ -376,7 +376,9 @@ public class AutoBalancerJUnitTest { AutoBalancer balancer = new AutoBalancer(); balancer.setCacheOperationFacade(mockCacheFacade); - balancer.init(getBasicConfig()); + Properties config = getBasicConfig(); + config.put(AutoBalancer.SIZE_MINIMUM, "10"); + balancer.init(config); SizeBasedOOBAuditor auditor = (SizeBasedOOBAuditor) balancer.getOOBAuditor(); // first run @@ -387,22 +389,18 @@ public class AutoBalancerJUnitTest { } @Test - public void testOOBWhenBelowAboveThreshold() { + public void testOOBWhenAboveThresholdButBelowMin() { final long totalSize = 1000L; final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class); mockContext.checking(new Expectations() { { // first run - oneOf(mockCacheFacade).getTotalDataSize(); - will(returnValue(totalSize)); oneOf(mockCacheFacade).getTotalTransferSize(); // twice threshold will(returnValue((AutoBalancer.DEFAULT_SIZE_THRESHOLD_PERCENT * totalSize / 100) * 2)); // second run - oneOf(mockCacheFacade).getTotalDataSize(); - will(returnValue(totalSize)); oneOf(mockCacheFacade).getTotalTransferSize(); // more than total size will(returnValue(2 * totalSize)); @@ -411,17 +409,60 @@ public class AutoBalancerJUnitTest { AutoBalancer balancer = new AutoBalancer(); balancer.setCacheOperationFacade(mockCacheFacade); - balancer.init(getBasicConfig()); + Properties config = getBasicConfig(); + config.put(AutoBalancer.SIZE_MINIMUM, "" + (totalSize * 5)); + balancer.init(config); SizeBasedOOBAuditor auditor = (SizeBasedOOBAuditor) balancer.getOOBAuditor(); // first run - assertTrue(auditor.needsRebalancing()); + 
assertFalse(auditor.needsRebalancing()); // second run - assertTrue(auditor.needsRebalancing()); + assertFalse(auditor.needsRebalancing()); } @Test + public void testOOBWhenAboveThresholdAndMin() { + final long totalSize = 1000L; + + final Map details = new HashMap<>(); + final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class); + mockContext.checking(new Expectations() { + { + allowing(mockCacheFacade).getRegionMemberDetails(); + will(returnValue(details)); + + // first run + oneOf(mockCacheFacade).getTotalDataSize(details); + will(returnValue(totalSize)); + oneOf(mockCacheFacade).getTotalTransferSize(); + // twice threshold + will(returnValue((AutoBalancer.DEFAULT_SIZE_THRESHOLD_PERCENT * totalSize / 100) * 2)); + + // second run + oneOf(mockCacheFacade).getTotalDataSize(details); + will(returnValue(totalSize)); + oneOf(mockCacheFacade).getTotalTransferSize(); + // more than total size + will(returnValue(2 * totalSize)); + } + }); + + AutoBalancer balancer = new AutoBalancer(); + balancer.setCacheOperationFacade(mockCacheFacade); + Properties config = getBasicConfig(); + config.put(AutoBalancer.SIZE_MINIMUM, "10"); + balancer.init(config); + SizeBasedOOBAuditor auditor = (SizeBasedOOBAuditor) balancer.getOOBAuditor(); + + // first run + assertTrue(auditor.needsRebalancing()); + + // second run + assertTrue(auditor.needsRebalancing()); + } + + @Test public void testInitializerCacheXML() { String configStr = "())); + } + + @Test + public void testFacadeCollectMemberDetailsNoRegion() { final GemFireCacheImpl mockCache = mockContext.mock(GemFireCacheImpl.class); mockContext.checking(new Expectations() { { @@ -638,11 +697,11 @@ public class AutoBalancerJUnitTest { } }; - assertEquals(0, facade.getTotalDataSize()); + assertEquals(0, facade.getRegionMemberDetails().size()); } @Test - public void testFacadeTotalBytes2Regions() { + public void testFacadeCollectMemberDetails2Regions() { cache = createBasicCache(); final GemFireCacheImpl 
mockCache = mockContext.mock(GemFireCacheImpl.class); @@ -655,17 +714,9 @@ public class AutoBalancerJUnitTest { final PRHARedundancyProvider mockRedundancyProviderR1 = mockContext.mock(PRHARedundancyProvider.class, "prhaR1"); final InternalPRInfo mockR1PRInfo = mockContext.mock(InternalPRInfo.class, "prInforR1"); - final PartitionMemberInfo mockR1M1Info = mockContext.mock(PartitionMemberInfo.class, "r1M1"); - final PartitionMemberInfo mockR1M2Info = mockContext.mock(PartitionMemberInfo.class, "r1M2"); - final HashSet r1Members = new HashSet<>(); - r1Members.add(mockR1M1Info); - r1Members.add(mockR1M2Info); final PRHARedundancyProvider mockRedundancyProviderR2 = mockContext.mock(PRHARedundancyProvider.class, "prhaR2"); final InternalPRInfo mockR2PRInfo = mockContext.mock(InternalPRInfo.class, "prInforR2"); - final PartitionMemberInfo mockR2M1Info = mockContext.mock(PartitionMemberInfo.class, "r2M1"); - final HashSet r2Members = new HashSet<>(); - r2Members.add(mockR2M1Info); mockContext.checking(new Expectations() { { @@ -682,6 +733,55 @@ public class AutoBalancerJUnitTest { oneOf(mockRedundancyProviderR1).buildPartitionedRegionInfo(with(true), with(any(LoadProbe.class))); will(returnValue(mockR1PRInfo)); + + oneOf(mockRedundancyProviderR2).buildPartitionedRegionInfo(with(true), with(any(LoadProbe.class))); + will(returnValue(mockR2PRInfo)); + } + }); + + GeodeCacheFacade facade = new GeodeCacheFacade() { + @Override + GemFireCacheImpl getCache() { + return mockCache; + } + }; + + Map map = facade.getRegionMemberDetails(); + assertNotNull(map); + assertEquals(2, map.size()); + assertEquals(map.get(mockR1), mockR1PRInfo); + assertEquals(map.get(mockR2), mockR2PRInfo); + } + + @Test + public void testFacadeTotalBytes2Regions() { + final PartitionedRegion mockR1 = mockContext.mock(PartitionedRegion.class, "r1"); + final PartitionedRegion mockR2 = mockContext.mock(PartitionedRegion.class, "r2"); + final HashSet regions = new HashSet<>(); + regions.add(mockR1); + 
regions.add(mockR2); + + final InternalPRInfo mockR1PRInfo = mockContext.mock(InternalPRInfo.class, "prInforR1"); + final PartitionMemberInfo mockR1M1Info = mockContext.mock(PartitionMemberInfo.class, "r1M1"); + final PartitionMemberInfo mockR1M2Info = mockContext.mock(PartitionMemberInfo.class, "r1M2"); + final HashSet r1Members = new HashSet<>(); + r1Members.add(mockR1M1Info); + r1Members.add(mockR1M2Info); + + final InternalPRInfo mockR2PRInfo = mockContext.mock(InternalPRInfo.class, "prInforR2"); + final PartitionMemberInfo mockR2M1Info = mockContext.mock(PartitionMemberInfo.class, "r2M1"); + final HashSet r2Members = new HashSet<>(); + r2Members.add(mockR2M1Info); + + final Map details = new HashMap<>(); + details.put(mockR1, mockR1PRInfo); + details.put(mockR2, mockR2PRInfo); + + mockContext.checking(new Expectations() { + { + allowing(mockR1).getFullPath(); + allowing(mockR2).getFullPath(); + oneOf(mockR1PRInfo).getPartitionMemberInfo(); will(returnValue(r1Members)); atLeast(1).of(mockR1M1Info).getSize(); @@ -689,8 +789,6 @@ public class AutoBalancerJUnitTest { atLeast(1).of(mockR1M2Info).getSize(); will(returnValue(74L)); - oneOf(mockRedundancyProviderR2).buildPartitionedRegionInfo(with(true), with(any(LoadProbe.class))); - will(returnValue(mockR2PRInfo)); oneOf(mockR2PRInfo).getPartitionMemberInfo(); will(returnValue(r2Members)); atLeast(1).of(mockR2M1Info).getSize(); @@ -700,12 +798,12 @@ public class AutoBalancerJUnitTest { GeodeCacheFacade facade = new GeodeCacheFacade() { @Override - GemFireCacheImpl getCache() { - return mockCache; + public Map getRegionMemberDetails() { + return details; } }; - assertEquals(123 + 74 + 3475, facade.getTotalDataSize()); + assertEquals(123 + 74 + 3475, facade.getTotalDataSize(details)); } private Properties getBasicConfig() {