Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 86E8F200B20 for ; Wed, 11 May 2016 21:43:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 8539C160A18; Wed, 11 May 2016 19:43:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id ABD35160A17 for ; Wed, 11 May 2016 21:43:15 +0200 (CEST) Received: (qmail 71200 invoked by uid 500); 11 May 2016 19:43:14 -0000 Mailing-List: contact commits-help@geode.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@geode.incubator.apache.org Delivered-To: mailing list commits@geode.incubator.apache.org Received: (qmail 71191 invoked by uid 99); 11 May 2016 19:43:14 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 May 2016 19:43:14 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 560E6180361 for ; Wed, 11 May 2016 19:43:14 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -3.221 X-Spam-Level: X-Spam-Status: No, score=-3.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 43yBK2Bbx2RI for ; Wed, 11 May 2016 19:43:05 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 08B935F471 for ; Wed, 11 May 2016 19:43:03 +0000 (UTC) Received: (qmail 69335 invoked by uid 99); 11 May 2016 19:43:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 May 2016 19:43:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 480B5DFBA8; Wed, 11 May 2016 19:43:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sai_boorlagadda@apache.org To: commits@geode.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-geode git commit: GEODE-1153: BucketOperatorWrapper fails to invoke completion callback. Date: Wed, 11 May 2016 19:43:03 +0000 (UTC) archived-at: Wed, 11 May 2016 19:43:17 -0000 Repository: incubator-geode Updated Branches: refs/heads/develop 3222b0514 -> 02b768866 GEODE-1153: BucketOperatorWrapper fails to invoke completion callback. * Fixed BucketOperatorWrapper to invoke onFailure if bucket creation fails * Promoted BucketOperatorWrapper & BucketOperatorImpl to be higher level classes to ease unit test callback invocation * Added JUnit tests to test these higher level classes * Added a DUnit to test redundancy is met if redundant bucket creation fails Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/02b76886 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/02b76886 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/02b76886 Branch: refs/heads/develop Commit: 02b7688663f5bb2919c07183336704f300462b63 Parents: 3222b05 Author: Sai Boorlagadda Authored: Thu Mar 31 15:37:25 2016 -0700 Committer: Sai Boorlagadda Committed: Tue May 10 22:31:33 2016 -0700 ---------------------------------------------------------------------- .../PartitionedRegionRebalanceOp.java | 322 ++---------------- .../rebalance/BucketOperatorImpl.java | 78 +++++ .../rebalance/BucketOperatorWrapper.java | 235 ++++++++++++++ .../rebalance/PartitionedRegionLoadModel.java | 2 + .../control/RebalanceOperationDUnitTest.java | 148 ++++++++- .../rebalance/BucketOperatorImplTest.java | 138 ++++++++ .../rebalance/BucketOperatorWrapperTest.java | 323 +++++++++++++++++++ 7 files changed, 950 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02b76886/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java index 8642876..641d43d 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java @@ -47,6 +47,8 @@ import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessag import com.gemstone.gemfire.internal.cache.partitioned.MoveBucketMessage.MoveBucketResponse; import com.gemstone.gemfire.internal.cache.partitioned.RemoveBucketMessage.RemoveBucketResponse; import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator; +import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorImpl; +import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorWrapper; import com.gemstone.gemfire.internal.cache.partitioned.rebalance.ParallelBucketOperator; import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel; import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.AddressComparor; @@ -413,9 +415,9 @@ public class PartitionedRegionRebalanceOp { } BucketOperator operator = simulate ? new SimulatedBucketOperator() - : new BucketOperatorImpl(); + : new BucketOperatorImpl(this); BucketOperatorWrapper wrapper = new BucketOperatorWrapper( - operator, rebalanceDetails); + operator, rebalanceDetails, stats, leaderRegion); return wrapper; } @@ -509,17 +511,12 @@ public class PartitionedRegionRebalanceOp { * the member on which to create the redundant bucket * @param bucketId * the identifier of the bucket - * @param pr - * the partitioned region which contains the bucket - * @param forRebalance - * true if part of a rebalance operation * @return true if the redundant bucket was created */ - public static boolean createRedundantBucketForRegion( - InternalDistributedMember target, int bucketId, PartitionedRegion pr, - boolean forRebalance, boolean replaceOfflineData) { - return pr.getRedundancyProvider().createBackupBucketOnMember(bucketId, - target, forRebalance, replaceOfflineData,null, true); + public boolean createRedundantBucketForRegion( + InternalDistributedMember target, int bucketId) { + return getLeaderRegion().getRedundancyProvider().createBackupBucketOnMember(bucketId, + target, isRebalance, replaceOfflineData,null, true); } /** @@ -529,20 +526,18 @@ public class PartitionedRegionRebalanceOp { * the member on which to create the redundant bucket * @param bucketId * the identifier of the bucket - * @param pr - * the partitioned region which contains the bucket * @return true if the redundant bucket was removed */ - public static boolean removeRedundantBucketForRegion( - InternalDistributedMember target, int bucketId, PartitionedRegion pr) { + public boolean removeRedundantBucketForRegion( + InternalDistributedMember target, int bucketId) { boolean removed = false; - if (pr.getDistributionManager().getId().equals(target)) { + if (getLeaderRegion().getDistributionManager().getId().equals(target)) { // invoke directly on local member... - removed = pr.getDataStore().removeBucket(bucketId, false); + removed = getLeaderRegion().getDataStore().removeBucket(bucketId, false); } else { // send message to remote member... - RemoveBucketResponse response = RemoveBucketMessage.send(target, pr, + RemoveBucketResponse response = RemoveBucketMessage.send(target, getLeaderRegion(), bucketId, false); if (response != null) { removed = response.waitForResponse(); @@ -558,28 +553,23 @@ public class PartitionedRegionRebalanceOp { * the member which should be primary * @param bucketId * the identifier of the bucket - * @param pr - * the partitioned region which contains the bucket - * @param forRebalance - * true if part of a rebalance operation * @return true if the move was successful */ - public static boolean movePrimaryBucketForRegion( - InternalDistributedMember target, int bucketId, PartitionedRegion pr, - boolean forRebalance) { + public boolean movePrimaryBucketForRegion( + InternalDistributedMember target, int bucketId) { boolean movedPrimary = false; - if (pr.getDistributionManager().getId().equals(target)) { + if (getLeaderRegion().getDistributionManager().getId().equals(target)) { // invoke directly on local member... - BucketAdvisor bucketAdvisor = pr.getRegionAdvisor().getBucketAdvisor( + BucketAdvisor bucketAdvisor = getLeaderRegion().getRegionAdvisor().getBucketAdvisor( bucketId); if (bucketAdvisor.isHosting()) { - movedPrimary = bucketAdvisor.becomePrimary(forRebalance); + movedPrimary = bucketAdvisor.becomePrimary(isRebalance); } } else { // send message to remote member... BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage.send( - target, pr, bucketId, forRebalance); + target, getLeaderRegion(), bucketId, isRebalance); if (response != null) { movedPrimary = response.waitForResponse(); } @@ -596,20 +586,18 @@ public class PartitionedRegionRebalanceOp { * member which should receive the bucket * @param bucketId * the identifier of the bucket - * @param pr - * the partitioned region which contains the bucket * @return true if the bucket was moved */ - public static boolean moveBucketForRegion(InternalDistributedMember source, - InternalDistributedMember target, int bucketId, PartitionedRegion pr) { + public boolean moveBucketForRegion(InternalDistributedMember source, + InternalDistributedMember target, int bucketId) { boolean movedBucket = false; - if (pr.getDistributionManager().getId().equals(target)) { + if (getLeaderRegion().getDistributionManager().getId().equals(target)) { // invoke directly on local member... - movedBucket = pr.getDataStore().moveBucket(bucketId, source, false); + movedBucket = getLeaderRegion().getDataStore().moveBucket(bucketId, source, false); } else { // send message to remote member... - MoveBucketResponse response = MoveBucketMessage.send(target, pr, + MoveBucketResponse response = MoveBucketMessage.send(target, getLeaderRegion(), bucketId, source); if (response != null) { movedBucket = response.waitForResponse(); @@ -626,6 +614,10 @@ public class PartitionedRegionRebalanceOp { leaderRegion.getDataPolicy().withPersistence()); } + public PartitionedRegion getLeaderRegion() { + return leaderRegion; + } + private class MembershipChangeListener implements MembershipListener { public void memberDeparted(InternalDistributedMember id, boolean crashed) { @@ -650,262 +642,4 @@ public class PartitionedRegionRebalanceOp { public void quorumLost(Set failures, List remaining) { } } - - private class BucketOperatorImpl implements BucketOperator { - - @Override - public boolean moveBucket(InternalDistributedMember source, - InternalDistributedMember target, int bucketId, - Map colocatedRegionBytes) { - - InternalResourceManager.getResourceObserver().movingBucket( - leaderRegion, bucketId, source, target); - return moveBucketForRegion(source, target, bucketId, leaderRegion); - } - - @Override - public boolean movePrimary(InternalDistributedMember source, - InternalDistributedMember target, int bucketId) { - - InternalResourceManager.getResourceObserver().movingPrimary( - leaderRegion, bucketId, source, target); - return movePrimaryBucketForRegion(target, bucketId, leaderRegion, isRebalance); - } - - @Override - public void createRedundantBucket( - InternalDistributedMember targetMember, int bucketId, - Map colocatedRegionBytes, Completion completion) { - boolean result = false; - try { - result = createRedundantBucketForRegion(targetMember, bucketId, - leaderRegion, isRebalance,replaceOfflineData); - } finally { - if(result) { - completion.onSuccess(); - } else { - completion.onFailure(); - } - } - } - - @Override - public void waitForOperations() { - //do nothing, all operations are synchronous - } - - @Override - public boolean removeBucket(InternalDistributedMember targetMember, int bucketId, - Map colocatedRegionBytes) { - return removeRedundantBucketForRegion(targetMember, bucketId, - leaderRegion); - } - } - - /** - * A wrapper class which delegates actual bucket operations to the enclosed BucketOperator, - * but keeps track of statistics about how many buckets are created, transfered, etc. - * - */ - private class BucketOperatorWrapper implements - BucketOperator { - private final BucketOperator delegate; - private final Set detailSet; - private final int regionCount; - - public BucketOperatorWrapper( - BucketOperator delegate, - Set rebalanceDetails) { - this.delegate = delegate; - this.detailSet = rebalanceDetails; - this.regionCount = detailSet.size(); - } - @Override - public boolean moveBucket(InternalDistributedMember sourceMember, - InternalDistributedMember targetMember, int id, - Map colocatedRegionBytes) { - long start = System.nanoTime(); - boolean result = false; - long elapsed = 0; - long totalBytes = 0; - - - if (stats != null) { - stats.startBucketTransfer(regionCount); - } - try { - result = delegate.moveBucket(sourceMember, targetMember, id, - colocatedRegionBytes); - elapsed = System.nanoTime() - start; - if (result) { - if (logger.isDebugEnabled()) { - logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember); - } - for (PartitionRebalanceDetailsImpl details : detailSet) { - String regionPath = details.getRegionPath(); - Long regionBytes = colocatedRegionBytes.get(regionPath); - if(regionBytes != null) { - //only increment the elapsed time for the leader region - details.incTransfers(regionBytes.longValue(), - details.getRegion().equals(leaderRegion) ? elapsed : 0); - totalBytes += regionBytes.longValue(); - } - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember); - } - } - } finally { - if(stats != null) { - stats.endBucketTransfer(regionCount, result, totalBytes, elapsed); - } - } - - return result; - } - - @Override - public void createRedundantBucket( - final InternalDistributedMember targetMember, final int i, - final Map colocatedRegionBytes, final Completion completion) { - - if(stats != null) { - stats.startBucketCreate(regionCount); - } - - final long start = System.nanoTime(); - delegate.createRedundantBucket(targetMember, i, - colocatedRegionBytes, new Completion() { - - @Override - public void onSuccess() { - long totalBytes = 0; - long elapsed= System.nanoTime() - start; - if(logger.isDebugEnabled()) { - logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember); - } - for (PartitionRebalanceDetailsImpl details : detailSet) { - String regionPath = details.getRegionPath(); - Long lrb = colocatedRegionBytes.get(regionPath); - if (lrb != null) { // region could have gone away - esp during shutdow - long regionBytes = lrb.longValue(); - //Only add the elapsed time to the leader region. - details.incCreates(regionBytes, - details.getRegion().equals(leaderRegion) ? elapsed : 0); - totalBytes += regionBytes; - } - } - - if(stats != null) { - stats.endBucketCreate(regionCount, true, totalBytes, elapsed); - } - - } - - @Override - public void onFailure() { - long elapsed= System.nanoTime() - start; - - if (logger.isDebugEnabled()) { - logger.debug("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember); - } - - if(stats != null) { - stats.endBucketCreate(regionCount, false, 0, elapsed); - } - } - }); - } - - @Override - public boolean removeBucket( - InternalDistributedMember targetMember, int i, - Map colocatedRegionBytes) { - boolean result = false; - long elapsed = 0; - long totalBytes = 0; - - - if(stats != null) { - stats.startBucketRemove(regionCount); - } - try { - long start = System.nanoTime(); - result = delegate.removeBucket(targetMember, i, - colocatedRegionBytes); - elapsed= System.nanoTime() - start; - if (result) { - if(logger.isDebugEnabled()) { - logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember); - } - for (PartitionRebalanceDetailsImpl details : detailSet) { - String regionPath = details.getRegionPath(); - Long lrb = colocatedRegionBytes.get(regionPath); - if (lrb != null) { // region could have gone away - esp during shutdow - long regionBytes = lrb.longValue(); - //Only add the elapsed time to the leader region. - details.incRemoves(regionBytes, - details.getRegion().equals(leaderRegion) ? elapsed : 0); - totalBytes += regionBytes; - } - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i, targetMember); - } - } - } finally { - if(stats != null) { - stats.endBucketRemove(regionCount, result, totalBytes, elapsed); - } - } - - return result; - } - - @Override - public boolean movePrimary(InternalDistributedMember source, - InternalDistributedMember target, int bucketId) { - boolean result = false; - long elapsed = 0; - - if(stats != null) { - stats.startPrimaryTransfer(regionCount); - } - - try { - long start = System.nanoTime(); - result = delegate.movePrimary(source, target, bucketId); - elapsed = System.nanoTime() - start; - if (result) { - if (logger.isDebugEnabled()) { - logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target); - } - for (PartitionRebalanceDetailsImpl details : detailSet) { - details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target); - } - } - } finally { - if(stats != null) { - stats.endPrimaryTransfer(regionCount, result, elapsed); - } - } - - return result; - } - - @Override - public void waitForOperations() { - delegate.waitForOperations(); - } - - public Set getDetailSet() { - return this.detailSet; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02b76886/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java new file mode 100644 index 0000000..2f38752 --- /dev/null +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.partitioned.rebalance; + +import java.util.Map; + +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.internal.cache.control.InternalResourceManager; +import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp; + +public class BucketOperatorImpl implements BucketOperator { + + private PartitionedRegionRebalanceOp rebalanceOp; + + public BucketOperatorImpl(PartitionedRegionRebalanceOp rebalanceOp) { + this.rebalanceOp = rebalanceOp; + } + + @Override + public boolean moveBucket(InternalDistributedMember source, + InternalDistributedMember target, int bucketId, + Map colocatedRegionBytes) { + + InternalResourceManager.getResourceObserver().movingBucket( + rebalanceOp.getLeaderRegion(), bucketId, source, target); + return rebalanceOp.moveBucketForRegion(source, target, bucketId); + } + + @Override + public boolean movePrimary(InternalDistributedMember source, + InternalDistributedMember target, int bucketId) { + + InternalResourceManager.getResourceObserver().movingPrimary( + rebalanceOp.getLeaderRegion(), bucketId, source, target); + return rebalanceOp.movePrimaryBucketForRegion(target, bucketId); + } + + @Override + public void createRedundantBucket( + InternalDistributedMember targetMember, int bucketId, + Map colocatedRegionBytes, Completion completion) { + boolean result = false; + try { + result = rebalanceOp.createRedundantBucketForRegion(targetMember, bucketId); + } finally { + if(result) { + completion.onSuccess(); + } else { + completion.onFailure(); + } + } + } + + @Override + public void waitForOperations() { + //do nothing, all operations are synchronous + } + + @Override + public boolean removeBucket(InternalDistributedMember targetMember, int bucketId, + Map colocatedRegionBytes) { + return rebalanceOp.removeRedundantBucketForRegion(targetMember, bucketId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02b76886/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java new file mode 100644 index 0000000..d058a04 --- /dev/null +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.partitioned.rebalance; + +import java.util.Map; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl; +import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats; +import com.gemstone.gemfire.internal.logging.LogService; + +public class BucketOperatorWrapper implements BucketOperator { + private static final Logger logger = LogService.getLogger(); + + private final BucketOperator delegate; + private final Set detailSet; + private final int regionCount; + private final ResourceManagerStats stats; + private final PartitionedRegion leaderRegion; + + public BucketOperatorWrapper(BucketOperator delegate, Set rebalanceDetails, + ResourceManagerStats stats, PartitionedRegion leaderRegion) { + this.delegate = delegate; + this.detailSet = rebalanceDetails; + this.regionCount = detailSet.size(); + this.stats = stats; + this.leaderRegion = leaderRegion; + } + + @Override + public boolean moveBucket(InternalDistributedMember sourceMember, + InternalDistributedMember targetMember, int id, + Map colocatedRegionBytes) { + long start = System.nanoTime(); + boolean result = false; + long elapsed = 0; + long totalBytes = 0; + + if (stats != null) { + stats.startBucketTransfer(regionCount); + } + try { + result = delegate.moveBucket(sourceMember, targetMember, id, colocatedRegionBytes); + elapsed = System.nanoTime() - start; + if (result) { + if (logger.isDebugEnabled()) { + logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember); + } + for (PartitionRebalanceDetailsImpl details : detailSet) { + String regionPath = details.getRegionPath(); + Long regionBytes = colocatedRegionBytes.get(regionPath); + if (regionBytes != null) { + // only increment the elapsed time for the leader region + details.incTransfers(regionBytes.longValue(), + details.getRegion().equals(leaderRegion) ? elapsed : 0); + totalBytes += regionBytes.longValue(); + } + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember); + } + } + } finally { + if (stats != null) { + stats.endBucketTransfer(regionCount, result, totalBytes, elapsed); + } + } + + return result; + } + + @Override + public void createRedundantBucket( + final InternalDistributedMember targetMember, final int i, + final Map colocatedRegionBytes, final Completion completion) { + + if (stats != null) { + stats.startBucketCreate(regionCount); + } + + final long start = System.nanoTime(); + delegate.createRedundantBucket(targetMember, i, + colocatedRegionBytes, new Completion() { + + @Override + public void onSuccess() { + long totalBytes = 0; + long elapsed = System.nanoTime() - start; + if (logger.isDebugEnabled()) { + logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember); + } + for (PartitionRebalanceDetailsImpl details : detailSet) { + String regionPath = details.getRegionPath(); + Long lrb = colocatedRegionBytes.get(regionPath); + if (lrb != null) { // region could have gone away - esp during shutdow + long regionBytes = lrb.longValue(); + // Only add the elapsed time to the leader region. + details.incCreates(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0); + totalBytes += regionBytes; + } + } + + if (stats != null) { + stats.endBucketCreate(regionCount, true, totalBytes, elapsed); + } + + //invoke onSuccess on the received completion callback + completion.onSuccess(); + } + + @Override + public void onFailure() { + long elapsed = System.nanoTime() - start; + + if (logger.isDebugEnabled()) { + logger.info("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember); + } + + if (stats != null) { + stats.endBucketCreate(regionCount, false, 0, elapsed); + } + + //invoke onFailure on the received completion callback + completion.onFailure(); + } + }); + } + + @Override + public boolean removeBucket( + InternalDistributedMember targetMember, int i, + Map colocatedRegionBytes) { + boolean result = false; + long elapsed = 0; + long totalBytes = 0; + + if (stats != null) { + stats.startBucketRemove(regionCount); + } + try { + long start = System.nanoTime(); + result = delegate.removeBucket(targetMember, i, colocatedRegionBytes); + elapsed = System.nanoTime() - start; + if (result) { + if (logger.isDebugEnabled()) { + logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember); + } + for (PartitionRebalanceDetailsImpl details : detailSet) { + String regionPath = details.getRegionPath(); + Long lrb = colocatedRegionBytes.get(regionPath); + if (lrb != null) { // region could have gone away - esp during shutdow + long regionBytes = lrb.longValue(); + // Only add the elapsed time to the leader region. + details.incRemoves(regionBytes, + details.getRegion().equals(leaderRegion) ? elapsed : 0); + totalBytes += regionBytes; + } + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i, targetMember); + } + } + } finally { + if (stats != null) { + stats.endBucketRemove(regionCount, result, totalBytes, elapsed); + } + } + + return result; + } + + @Override + public boolean movePrimary(InternalDistributedMember source, + InternalDistributedMember target, int bucketId) { + boolean result = false; + long elapsed = 0; + + if (stats != null) { + stats.startPrimaryTransfer(regionCount); + } + + try { + long start = System.nanoTime(); + result = delegate.movePrimary(source, target, bucketId); + elapsed = System.nanoTime() - start; + if (result) { + if (logger.isDebugEnabled()) { + logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target); + } + for (PartitionRebalanceDetailsImpl details : detailSet) { + details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target); + } + } + } finally { + if (stats != null) { + stats.endPrimaryTransfer(regionCount, result, elapsed); + } + } + + return result; + } + + @Override + public void waitForOperations() { + delegate.waitForOperations(); + } + + public Set getDetailSet() { + return this.detailSet; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02b76886/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java index e024011..1cfa4a4 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/PartitionedRegionLoadModel.java @@ -402,6 +402,8 @@ public class PartitionedRegionLoadModel { //If the bucket creation failed, we need to undo the changes //we made to the model attemptedBucketCreations.add(move); + //remove the bucket from lowRedundancyBuckets before mutating the state + lowRedundancyBuckets.remove(bucket); bucket.removeMember(targetMember); if(bucket.getRedundancy() < requiredRedundancy) { lowRedundancyBuckets.add(bucket); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02b76886/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java index 6049e14..05febda 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java @@ -16,6 +16,13 @@ */ package com.gemstone.gemfire.internal.cache.control; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -43,7 +50,6 @@ import com.gemstone.gemfire.cache.DiskStoreFactory; import com.gemstone.gemfire.cache.EntryEvent; import com.gemstone.gemfire.cache.EvictionAction; import com.gemstone.gemfire.cache.EvictionAttributes; -import com.gemstone.gemfire.cache.GemFireCache; import com.gemstone.gemfire.cache.LoaderHelper; import com.gemstone.gemfire.cache.PartitionAttributes; import com.gemstone.gemfire.cache.PartitionAttributesFactory; @@ -63,11 +69,12 @@ import com.gemstone.gemfire.cache30.CacheTestCase; import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystem; import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.internal.cache.BucketRegion; import com.gemstone.gemfire.internal.cache.ColocationHelper; import com.gemstone.gemfire.internal.cache.DiskStoreImpl; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.InternalCache; +import com.gemstone.gemfire.internal.cache.PRHARedundancyProvider; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore; import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter; @@ -972,6 +979,143 @@ public class RebalanceOperationDUnitTest extends CacheTestCase { return (DistributedMember) vm.invoke(createPrRegion); } + public void testRecoverRedundancyBalancingIfCreateBucketFails() { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + + final DistributedMember member1 = createPrRegion(vm0, "region1", 100, null); + + vm0.invoke(new SerializableRunnable("createSomeBuckets") { + + public void run() { + Cache cache = getCache(); + Region region = cache.getRegion("region1"); + for(int i = 0; i < 1; i++) { + region.put(Integer.valueOf(i), "A"); + } + } + }); + + + SerializableRunnable checkRedundancy= new SerializableRunnable("checkRedundancy") { + + public void run() { + Cache cache = getCache(); + Region region = cache.getRegion("region1"); + PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region); + assertEquals(1, details.getCreatedBucketCount()); + assertEquals(0,details.getActualRedundantCopies()); + assertEquals(1,details.getLowRedundancyBucketCount()); + } + }; + + vm0.invoke(checkRedundancy); + + //Now create the region in 2 more VMs + //Let localMaxMemory(VM1) > localMaxMemory(VM2) + //so that redundant bucket will always be attempted on VM1 + final DistributedMember member2 = createPrRegion(vm1, "region1", 100, null); + final DistributedMember member3 = createPrRegion(vm2, "region1", 90, null); + + vm0.invoke(checkRedundancy); + + //Inject mock PRHARedundancyProvider to simulate createBucketFailures + vm0.invoke(new SerializableRunnable("injectCreateBucketFailureAndRebalance") { + + @Override + public void run() { + GemFireCacheImpl cache = spy(getGemfireCache()); + //set the spied cache instance + GemFireCacheImpl origCache = GemFireCacheImpl.setInstanceForTests(cache); + + PartitionedRegion origRegion = (PartitionedRegion) cache.getRegion("region1"); + PartitionedRegion spyRegion = spy(origRegion); + PRHARedundancyProvider redundancyProvider = spy(new PRHARedundancyProvider(spyRegion)); + + //return the spied region when ever getPartitionedRegions() is invoked + Set parRegions = cache.getPartitionedRegions(); + parRegions.remove(origRegion); + parRegions.add(spyRegion); + + doReturn(parRegions).when(cache).getPartitionedRegions(); + doReturn(redundancyProvider).when(spyRegion).getRedundancyProvider(); + + //simulate create bucket fails on member2 and test if it creates on member3 + doReturn(false).when(redundancyProvider).createBackupBucketOnMember(anyInt(), eq((InternalDistributedMember) member2), anyBoolean(), anyBoolean(), any(), anyBoolean()); + + //Now simulate a rebalance + //Create operationImpl and not factory as we need spied cache to be passed to operationImpl + RegionFilter filter = new FilterByPath(null, null); + RebalanceOperationImpl operation = new RebalanceOperationImpl(cache, false, filter); + operation.start(); + RebalanceResults results = null; + try { + results = operation.getResults(MAX_WAIT, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Assert.fail("Interrupted waiting on rebalance", e); + } catch (TimeoutException e) { + Assert.fail("Timeout waiting on rebalance", e); + } + assertEquals(1, results.getTotalBucketCreatesCompleted()); + assertEquals(0, results.getTotalPrimaryTransfersCompleted()); + assertEquals(0, results.getTotalBucketTransferBytes()); + assertEquals(0, results.getTotalBucketTransfersCompleted()); + Set detailSet = results.getPartitionRebalanceDetails(); + assertEquals(1, detailSet.size()); + PartitionRebalanceInfo details = detailSet.iterator().next(); + assertEquals(1, details.getBucketCreatesCompleted()); + assertEquals(0, details.getPrimaryTransfersCompleted()); + assertEquals(0, details.getBucketTransferBytes()); + assertEquals(0, details.getBucketTransfersCompleted()); + + Set afterDetails = details.getPartitionMemberDetailsAfter(); + assertEquals(3, afterDetails.size()); + for(PartitionMemberInfo memberDetails: afterDetails) { + if(memberDetails.getDistributedMember().equals(member1)) { + assertEquals(1, memberDetails.getBucketCount()); + assertEquals(1, memberDetails.getPrimaryCount()); + } else if(memberDetails.getDistributedMember().equals(member2)) { + assertEquals(0, memberDetails.getBucketCount()); + assertEquals(0, memberDetails.getPrimaryCount()); + } else if(memberDetails.getDistributedMember().equals(member3)) { + assertEquals(1, memberDetails.getBucketCount()); + assertEquals(0, memberDetails.getPrimaryCount()); + } + } + + ResourceManagerStats stats = cache.getResourceManager().getStats(); + + assertEquals(0, stats.getRebalancesInProgress()); + assertEquals(1, stats.getRebalancesCompleted()); + assertEquals(0, stats.getRebalanceBucketCreatesInProgress()); + assertEquals(results.getTotalBucketCreatesCompleted(), stats.getRebalanceBucketCreatesCompleted()); + assertEquals(1, stats.getRebalanceBucketCreatesFailed()); + + //set the original cache + GemFireCacheImpl.setInstanceForTests(origCache); + } + }); + + SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") { + + public void run() { + Cache cache = getCache(); + Region region = cache.getRegion("region1"); + PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region); + assertEquals(1, details.getCreatedBucketCount()); + assertEquals(1,details.getActualRedundantCopies()); + assertEquals(0,details.getLowRedundancyBucketCount()); + } + }; + + vm0.invoke(checkRedundancyFixed); + vm1.invoke(checkRedundancyFixed); + vm2.invoke(checkRedundancyFixed); + } + public void testRecoverRedundancyColocatedRegionsSimulation() { recoverRedundancyColocatedRegions(true); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02b76886/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java new file mode 100644 index 0000000..a5c8982 --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.partitioned.rebalance; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.control.InternalResourceManager; +import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp; +import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class BucketOperatorImplTest { + + private InternalResourceManager.ResourceObserver resourceObserver; + + private BucketOperatorImpl operator; + + private PartitionedRegion region; + private PartitionedRegionRebalanceOp rebalanceOp; + private Completion completion; + + private Map colocatedRegionBytes = new HashMap(); + private int bucketId = 1; + private InternalDistributedMember sourceMember, targetMember; + + @Before + public void setup() throws UnknownHostException { + region = mock(PartitionedRegion.class); + rebalanceOp = mock(PartitionedRegionRebalanceOp.class); + completion = mock(Completion.class); + + resourceObserver = spy(new InternalResourceManager.ResourceObserverAdapter()); + InternalResourceManager.setResourceObserver(resourceObserver); + + doReturn(region).when(rebalanceOp).getLeaderRegion(); + + operator = new BucketOperatorImpl(rebalanceOp); + + sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1); + targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1); + } + + @After + public void after() { + reset(resourceObserver); + } + + @Test + public void moveBucketShouldDelegateToParRegRebalanceOpMoveBucketForRegion() throws UnknownHostException { + doReturn(true).when(rebalanceOp).moveBucketForRegion(sourceMember, targetMember, bucketId); + + operator.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes); + + verify(resourceObserver, times(1)).movingBucket(region, bucketId, sourceMember, targetMember); + verify(rebalanceOp, times(1)).moveBucketForRegion(sourceMember, targetMember, bucketId); + } + + @Test + public void movePrimaryShouldDelegateToParRegRebalanceOpMovePrimaryBucketForRegion() throws UnknownHostException { + doReturn(true).when(rebalanceOp).movePrimaryBucketForRegion(targetMember, bucketId); + + operator.movePrimary(sourceMember, targetMember, bucketId); + + verify(resourceObserver, times(1)).movingPrimary(region, bucketId, sourceMember, targetMember); + verify(rebalanceOp, times(1)).movePrimaryBucketForRegion(targetMember, bucketId); + } + + @Test + public void createBucketShouldDelegateToParRegRebalanceOpCreateRedundantBucketForRegion() throws UnknownHostException { + doReturn(true).when(rebalanceOp).createRedundantBucketForRegion(targetMember, bucketId); + + operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion); + + verify(rebalanceOp, times(1)).createRedundantBucketForRegion(targetMember, bucketId); + } + + @Test + public void createBucketShouldInvokeOnSuccessIfCreateBucketSucceeds() { + doReturn(true).when(rebalanceOp).createRedundantBucketForRegion(targetMember, bucketId); + + operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion); + + verify(rebalanceOp, times(1)).createRedundantBucketForRegion(targetMember, bucketId); + verify(completion, times(1)).onSuccess(); + } + + @Test + public void createBucketShouldInvokeOnFailureIfCreateBucketFails() { + doReturn(false).when(rebalanceOp).createRedundantBucketForRegion(targetMember, bucketId); //return false for create fail + + operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion); + + verify(rebalanceOp, times(1)).createRedundantBucketForRegion(targetMember, bucketId); + verify(completion, times(1)).onFailure(); + } + + @Test + public void removeBucketShouldDelegateToParRegRebalanceOpRemoveRedundantBucketForRegion() { + doReturn(true).when(rebalanceOp).removeRedundantBucketForRegion(targetMember, bucketId); + + operator.removeBucket(targetMember, bucketId, colocatedRegionBytes); + + verify(rebalanceOp, times(1)).removeRedundantBucketForRegion(targetMember, bucketId); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/02b76886/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java new file mode 100644 index 0000000..558062b --- /dev/null +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.partitioned.rebalance; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.spy; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl; +import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats; +import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class BucketOperatorWrapperTest { + + private ResourceManagerStats stats; + private PartitionedRegion leaderRegion; + private PartitionedRegion colocatedRegion; + private Set rebalanceDetails; + private BucketOperatorWrapper wrapper; + private BucketOperatorImpl delegate; + + private Map colocatedRegionBytes; + private int bucketId = 1; + private InternalDistributedMember sourceMember, targetMember; + + private final static String PR_LEADER_REGION_NAME = "leadregion1"; + private final static String PR_COLOCATED_REGION_NAME = "coloregion1"; + + @Before + public void setUp() throws UnknownHostException { + colocatedRegionBytes = new HashMap(); + colocatedRegionBytes.put(PR_LEADER_REGION_NAME, 100L); + colocatedRegionBytes.put(PR_COLOCATED_REGION_NAME, 50L); + + sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1); + targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1); + + stats = mock(ResourceManagerStats.class); + doNothing().when(stats).startBucketCreate(anyInt()); + doNothing().when(stats).endBucketCreate(anyInt(), anyBoolean(), anyLong(), anyLong()); + + leaderRegion = mock(PartitionedRegion.class); + doReturn(PR_LEADER_REGION_NAME).when(leaderRegion).getFullPath(); + colocatedRegion = mock(PartitionedRegion.class); + doReturn(PR_COLOCATED_REGION_NAME).when(colocatedRegion).getFullPath(); + + rebalanceDetails = new HashSet(); + PartitionRebalanceDetailsImpl details = spy(new PartitionRebalanceDetailsImpl(leaderRegion)); + rebalanceDetails.add(details); + + delegate = mock(BucketOperatorImpl.class); + + wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, leaderRegion); + } + + @Test + public void bucketWrapperShouldDelegateCreateBucketToEnclosedOperator() { + Completion completionSentToWrapper = mock(Completion.class); + + doNothing().when(delegate).createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper); + + wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper); + + verify(delegate, times(1)).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class)); + } + + @Test + public void bucketWrapperShouldRecordNumberOfBucketsCreatedIfCreateBucketSucceeds() { + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket + ((Completion) invocation.getArguments()[3]).onSuccess(); + return null; + } + }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class)); + + Completion completionSentToWrapper = mock(Completion.class); + wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper); + + //verify create buckets is recorded + for( PartitionRebalanceDetailsImpl details : rebalanceDetails) { + if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME)) + verify(details, times(1)).incCreates(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong()); + else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME)) + verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader + } + } + + @Test + public void bucketWrapperShouldNotRecordNumberOfBucketsCreatedIfCreateBucketFails() { + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket + ((Completion) invocation.getArguments()[3]).onFailure(); + return null; + } + }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class)); + + Completion completionSentToWrapper = mock(Completion.class); + wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper); + + //verify create buckets is not recorded + for( PartitionRebalanceDetailsImpl details : rebalanceDetails) { + verify(details, times(0)).incTransfers(anyLong(), anyLong()); + } + } + + @Test + public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() { + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket + ((Completion) invocation.getArguments()[3]).onFailure(); + return null; + } + }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class)); + + Completion completionSentToWrapper = mock(Completion.class); + wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper); + + //verify onFailure is invoked + verify(completionSentToWrapper, times(1)).onFailure(); + } + + @Test + public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketSucceeds() { + doAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket + ((Completion) invocation.getArguments()[3]).onSuccess(); + return null; + } + }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class)); + + Completion completionSentToWrapper = mock(Completion.class); + wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper); + + verify(completionSentToWrapper, times(1)).onSuccess(); + } + + @Test + public void bucketWrapperShouldDelegateMoveBucketToEnclosedOperator() { + doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes); + + wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes); + + //verify the delegate is invoked + verify(delegate, times(1)).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes); + + //verify we recorded necessary stats + verify(stats, times(1)).startBucketTransfer(anyInt()); + verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong()); + } + + @Test + public void bucketWrapperShouldRecordBytesTransferredPerRegionAfterMoveBucketIsSuccessful() { + doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes); + + wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes); + + //verify the details is updated with bytes transfered + for( PartitionRebalanceDetailsImpl details : rebalanceDetails) { + if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME)) + verify(details, times(1)).incTransfers(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong()); + else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME)) + verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader + } + + //verify we recorded necessary stats + verify(stats, times(1)).startBucketTransfer(anyInt()); + verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong()); + } + + @Test + public void bucketWrapperShouldDoNotRecordBytesTransferedIfMoveBucketFails() { + doReturn(false).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes); + + wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes); + + //verify the details is not updated with bytes transfered + for( PartitionRebalanceDetailsImpl details : rebalanceDetails) { + verify(details, times(0)).incTransfers(anyLong(), anyLong()); + } + + //verify we recorded necessary stats + verify(stats, times(1)).startBucketTransfer(anyInt()); + verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong()); + } + + @Test + public void bucketWrapperShouldDelegateRemoveBucketToEnclosedOperator() { + wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes); + + //verify the delegate is invoked + verify(delegate, times(1)).removeBucket(targetMember, bucketId, colocatedRegionBytes); + + //verify we recorded necessary stats + verify(stats, times(1)).startBucketRemove(anyInt()); + verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong()); + } + + @Test + public void bucketWrapperShouldRecordBucketRemovesPerRegionAfterRemoveBucketIsSuccessful() { + doReturn(true).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes); + + wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes); + + //verify the details is updated with bytes transfered + for( PartitionRebalanceDetailsImpl details : rebalanceDetails) { + if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME)) + verify(details, times(1)).incRemoves((eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME))), anyLong()); + else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME)) + verify(details, times(1)).incRemoves(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader + } + + //verify we recorded necessary stats + verify(stats, times(1)).startBucketRemove(anyInt()); + verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong()); + } + + @Test + public void bucketWrapperShouldDoNotRecordBucketRemovesIfMoveBucketFails() { + doReturn(false).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes); + + wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes); + + //verify the details is not updated with bytes transfered + for( PartitionRebalanceDetailsImpl details : rebalanceDetails) { + verify(details, times(0)).incTransfers(anyLong(), anyLong()); + } + + //verify we recorded necessary stats + verify(stats, times(1)).startBucketRemove(anyInt()); + verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong()); + } + + @Test + public void bucketWrapperShouldDelegateMovePrimaryToEnclosedOperator() { + wrapper.movePrimary(sourceMember, targetMember, bucketId); + + //verify the delegate is invoked + verify(delegate, times(1)).movePrimary(sourceMember, targetMember, bucketId); + + //verify we recorded necessary stats + verify(stats, times(1)).startPrimaryTransfer(anyInt()); + verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong()); + } + + @Test + public void bucketWrapperShouldRecordPrimaryTransfersPerRegionAfterMovePrimaryIsSuccessful() { + doReturn(true).when(delegate).movePrimary(sourceMember, targetMember, bucketId); + + wrapper.movePrimary(sourceMember, targetMember, bucketId); + + //verify the details is updated with bytes transfered + for( PartitionRebalanceDetailsImpl details : rebalanceDetails) { + if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME)) + verify(details, times(1)).incPrimaryTransfers(anyLong()); + else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME)) + verify(details, times(1)).incPrimaryTransfers(0); //elapsed is recorded only if its leader + } + + //verify we recorded necessary stats + verify(stats, times(1)).startPrimaryTransfer(anyInt()); + verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong()); + } + + @Test + public void bucketWrapperShouldNotRecordPrimaryTransfersPerRegionAfterMovePrimaryFails() { + doReturn(false).when(delegate).movePrimary(sourceMember, targetMember, bucketId); + + wrapper.movePrimary(sourceMember, targetMember, bucketId); + + //verify the details is not updated with bytes transfered + for( PartitionRebalanceDetailsImpl details : rebalanceDetails) { + verify(details, times(0)).incTransfers(anyLong(), anyLong()); + } + + //verify we recorded necessary stats + verify(stats, times(1)).startPrimaryTransfer(anyInt()); + verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong()); + } +}